Cascading: Implementing Counter Logic

Overview


Cascading is a Java-based data transformation framework that runs on Apache Hadoop.  Given a stream of data, Cascading offers functions and tools for reshaping records, performing aggregations and executing custom transformations that would be tedious to write directly in MapReduce.  This is arguably the main advantage the framework offers to developers.

Developers familiar with Cascading will know that a pipe assembly only describes a plan: the transformed data does not exist in concrete form until a flow completes.  This poses a challenge in a pure Cascading application whenever logic on data values needs to run mid-flow, before the flow definition has finished executing.  One very common instance is obtaining a count of the tuples in a stream.  For example, business logic may dictate that transformation "B" runs only if the number of tuples in the output pipe of transformation "A" is greater than or equal to "x", and that transformation "C" runs otherwise.  Cascading, up to the current version (2.5), does not appear to offer any straightforward "tuple count" functionality.  This may become available in future versions, but for now one possible solution is as follows:




Tracking Tuple Counts Through Hadoop Counters 

One solution is to use a Hadoop counter together with multiple flow definitions, one for each transformation that depends on the tuple count at a given step.  Suppose the desired logic looks like this:

FlowDef()...

Pipe A = new Each(pipe, new CustomFilter1());

If the resulting tuple count on "A" is >= 500, then:

Pipe B = new Each(A, new CustomFilter2());

If not, keep pipe A as it exists after filter 1.
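To make the intended semantics concrete, here is a minimal in-memory sketch of the same rule in plain Java.  It is not Cascading code: the class name, the predicates and the threshold parameter are all made up for illustration, and the predicates keep matching rows (the opposite sense of a Cascading Filter, which removes them).  On a local list the count is available immediately, which is exactly what a Cascading pipe assembly lacks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** In-memory illustration of the branching rule; not Cascading code. */
final class ConditionalFilterSketch {

    /**
     * Applies filter1, then applies filter2 only when at least
     * `threshold` rows survived filter1. Predicates keep matching rows.
     */
    static <T> List<T> apply(List<T> rows, Predicate<T> filter1,
                             Predicate<T> filter2, long threshold) {
        List<T> a = rows.stream().filter(filter1).collect(Collectors.toList());
        if (a.size() < threshold) {
            return a; // too few rows: keep "pipe A" as it is
        }
        return a.stream().filter(filter2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Hypothetical filters: keep even values, then keep values above 2.
        System.out.println(apply(rows, n -> n % 2 == 0, n -> n > 2, 3)); // prints [4, 6]
        System.out.println(apply(rows, n -> n % 2 == 0, n -> n > 2, 4)); // prints [2, 4, 6]
    }
}
```

In the first call three rows survive the first filter, so the second filter runs; in the second call the threshold of four is not reached and the result of the first filter is returned unchanged.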

Define a counter function:

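import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.Tap;

public class TransformWithAOrB {

  // Counter group and name, shared with the code that reads the counter.
  public static final String GROUP = "rows";
  public static final String COUNT = "count";

  // Increments a Hadoop counter once per tuple; it emits no tuples itself.
  private static final class RowCount extends BaseOperation implements Function {

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      flowProcess.increment(GROUP, COUNT, 1);
    }
  }

  public static FlowDef flowDefWithFilterA(Tap source, Tap sink, Tap countSink) {

    // Head pipe; the name "input" is a placeholder.
    Pipe pipe = new Pipe("input");

    Pipe A = new Each(pipe, new CustomFilterA());

    // Give the counting branch its own name so each tail binds to its own sink.
    Pipe countPipe = new Pipe("count", A);
    countPipe = new Each(countPipe, new RowCount());

    return FlowDef.flowDef()
        .addSource(pipe, source)
        .addTail(A)
        .addSink(A, sink)
        .addTail(countPipe)
        .addSink(countPipe, countSink);
  }

  public static FlowDef flowDefWithFilterB(Tap source, Tap sink) {

    Pipe pipe = new Pipe("input");

    Pipe B = new Each(pipe, new CustomFilterB());

    return FlowDef.flowDef()
        .addSource(pipe, source)
        .addTail(B)
        .addSink(B, sink);
  }
}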

   
Here, two flow definitions are created, one for each transformation.  Definition "A" uses the counter function to count the tuples in pipe A after the first filter has run.  Note that a "count sink" is also assigned, because the counting branch is a tail of its own and needs a sink.  Next, the logic is implemented in a flow package.
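For readers who have not used Hadoop counters, the idea can be modelled in a few lines of plain Java.  The sketch below is only a stand-in, not the Hadoop or Cascading API: the class CounterSketch is made up, and its two methods merely mirror the shape of the increment and getCounterValue calls used in this post.  It shows why one increment per tuple, issued from many parallel tasks, adds up to the tuple count.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

/** Plain-Java stand-in for group/name counters; not the Hadoop API. */
final class CounterSketch {

    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    /** Adds `amount` to the counter identified by group and name. */
    public void increment(String group, String counter, long amount) {
        counters.computeIfAbsent(group + ":" + counter, k -> new LongAdder())
                .add(amount);
    }

    /** Returns the aggregated value, or 0 if the counter was never touched. */
    public long getCounterValue(String group, String counter) {
        LongAdder adder = counters.get(group + ":" + counter);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        CounterSketch stats = new CounterSketch();
        // Simulate 1,000 tuples handled by parallel tasks, one increment each.
        IntStream.range(0, 1000).parallel()
                 .forEach(i -> stats.increment("rows", "count", 1));
        System.out.println(stats.getCounterValue("rows", "count")); // prints 1000
    }
}
```

As in the real flow, the total is only meaningful once all of the work has finished, which is why the counter is read after the flow completes.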


Implementing Logic in a Flow Package

Once the flow definitions have been established, they can be passed to a run method that applies logic to the counter.  For this particular case, the method should resemble the following:

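import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.tap.Tap;

public class RunFlowDefs {

  public static int run(Tap source, Tap sink, Tap countSink) {

    FlowDef flowDef = TransformWithAOrB.flowDefWithFilterA(source, sink, countSink);

    FlowConnector flowConnector = new HadoopFlowConnector();

    Flow filterA = flowConnector.connect(flowDef);

    // The counter is only final once the flow has completed.
    filterA.complete();

    long pass1 = filterA.getFlowStats().getCounterValue(TransformWithAOrB.GROUP, TransformWithAOrB.COUNT);

    if (pass1 >= 500) {

      // As written, this flow reads the original source again, not the output of filter A.
      FlowDef flowDef2 = TransformWithAOrB.flowDefWithFilterB(source, sink);

      Flow filterB = flowConnector.connect(flowDef2);

      filterB.complete();

    } else {

      System.out.println("tuple count below threshold.  bypassing second transform");

    }

    return 0;
  }
}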
 


Here, this run method is called from a main function that sets up the incoming and outgoing taps and executes the associated flows with:

RunFlowDefs.run(source, sink, countSink); 
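Because the run method needs a Hadoop environment, the branch decision itself is awkward to unit test.  One option, sketched below under my own assumptions, is to pull the decision into a small helper that takes the two flows and the counter lookup as plain functional interfaces.  ThresholdRunner and its parameters are hypothetical names and are not part of Cascading; in real use the arguments would wrap calls such as filterA.complete() and the getCounterValue lookup.

```java
import java.util.function.LongSupplier;

/** Sketch of the branch decision, decoupled from Cascading for unit testing. */
final class ThresholdRunner {

    /**
     * Runs firstFlow, reads the counter, and runs secondFlow only when the
     * count reaches the threshold. Returns true when secondFlow ran.
     */
    static boolean run(Runnable firstFlow, LongSupplier counter,
                       long threshold, Runnable secondFlow) {
        firstFlow.run();                   // e.g. filterA.complete()
        long count = counter.getAsLong();  // read only after completion
        if (count >= threshold) {
            secondFlow.run();              // e.g. filterB.complete()
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] rows = {0};
        // Fake first flow that "produces" 750 rows; threshold is 500.
        boolean ran = run(() -> rows[0] = 750, () -> rows[0], 500,
                          () -> System.out.println("second transform ran"));
        System.out.println(ran); // prints true
    }
}
```

With this split, the threshold logic can be exercised with fake flows in an ordinary test, and only a thin wrapper has to touch the cluster.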
