Debugging Cascading: ... PlannerException: not all head pipes bound to source taps

Overview

A common situation when using the Cascading framework for big data applications is declaring numerous data sources that merge into one or two primary transforms or operations through a complicated assembly of pipes. Along with this comes another common problem: a PlannerException complaining that one or more of these streams are not bound to their corresponding source taps, specifically:

Cascading: PlannerException not all head pipes bound to source taps. Remaining names []

This error can be frustrating, and within Cascading a number of scenarios can trigger it. The most common cause, however, is that within a tangle of pipe heads streaming through various aggregations and filters, one or more heads are never actually connected to an output tail.
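To make the cause concrete, here is a simplified, stand-alone model of the check. It assumes that the planner discovers head pipes by walking upstream from each declared tail and then matches those heads to the registered source taps by name. This is plain Java with no Cascading dependency. The class `HeadBindingCheck`, the `Node` type and the node labels `c1-filter`, `d` and `e` are hypothetical stand-ins for the pipes in the example below, not Cascading's API.

```java
import java.util.*;

// Hypothetical toy model of a pipe assembly; NOT Cascading's API or planner code.
public class HeadBindingCheck {

    // A node knows only its upstream (previous) pipes, similar to how a Pipe
    // references the pipes it was built from.
    static final class Node {
        final String name;
        final List<Node> previous;

        Node(String name, Node... previous) {
            this.name = name;
            this.previous = Arrays.asList(previous);
        }
    }

    // Walk backward from every tail and collect the names of the head nodes
    // (nodes with no upstream pipe) that can be reached.
    static Set<String> headNames(Collection<Node> tails) {
        Set<String> heads = new TreeSet<>();
        Set<Node> seen = new HashSet<>();
        Deque<Node> stack = new ArrayDeque<>(tails);
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            if (!seen.add(n)) continue;           // already visited via another branch
            if (n.previous.isEmpty()) heads.add(n.name);
            for (Node p : n.previous) stack.push(p);
        }
        return heads;
    }

    // Source names for which no head is reachable from any tail.
    static Set<String> unboundSources(Set<String> sourceNames, Collection<Node> tails) {
        Set<String> unbound = new TreeSet<>(sourceNames);
        unbound.removeAll(headNames(tails));
        return unbound;
    }

    public static void main(String[] args) {
        Node a = new Node("a1");
        Node b = new Node("b1");
        Node c = new Node("c1");
        Node cFiltered = new Node("c1-filter", c); // like the Each: never consumed downstream
        Node d = new Node("d", a, b);              // stands in for CoGroup(a, b)
        Node e = new Node("e", d);                 // stands in for Retain, the only tail

        Set<String> sources = new TreeSet<>(List.of("a1", "b1", "c1"));
        System.out.println("heads reachable from tail: " + headNames(List.of(e)));
        System.out.println("sources no tail reaches: " + unboundSources(sources, List.of(e)));
    }
}
```

Running `main` prints the heads reached from the tail (`[a1, b1]`) and reports `[c1]` as the source that no tail reaches, which mirrors the situation described above.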


Consider an Example with the Following FlowDef

Tap<?, ?, ?> sourceA = ...;
Tap<?, ?, ?> sourceB = ...;
Tap<?, ?, ?> sourceC = ...;
Tap<?, ?, ?> sinkE = ...;

Pipe a = new Pipe("a1");
Pipe b = new Pipe("b1");
Pipe c = new Pipe("c1");

// c is filtered here but never feeds the CoGroup or any other tail
c = new Each(c, Fields.ALL, new ExpressionFilter(...));

// d joins only a and b; e is the single tail
Pipe d = new CoGroup(a, new Fields(A), b, new Fields(B));
Pipe e = new Retain(d, new Fields(declared));

return FlowDef.flowDef()
    .addSource(a, sourceA)
    .addSource(b, sourceB)
    .addSource(c, sourceC)
    .addTail(e)
    .addSink(e, sinkE);

Here, the flow fails at planning time because pipe c is defined and bound to source C but is detached from the rest of the assembly. The declared tail e is built from pipes a and b, specifically their CoGroup, and writes to the corresponding sink E, so a and b are accounted for. Pipe c, on the other hand, is left out of every path leading to an output. Compare the following example, which does complete:

Tap<?, ?, ?> sourceA = ...;
Tap<?, ?, ?> sourceB = ...;
Tap<?, ?, ?> sourceC = ...;
Tap<?, ?, ?> sinkE = ...;

Pipe a = new Pipe("a1");
Pipe b = new Pipe("b1");
Pipe c = new Pipe("c1");

c = new Each(c, Fields.ALL, new ExpressionFilter(...));

// d now joins a, b and the filtered c, so every head reaches the tail e
Pipe d = new CoGroup(
    new Pipe[]{a, b, c},
    new Fields[]{new Fields(A), new Fields(B), new Fields(C)});
Pipe e = new Retain(d, new Fields(declared));

return FlowDef.flowDef()
    .addSource(a, sourceA)
    .addSource(b, sourceB)
    .addSource(c, sourceC)
    .addTail(e)
    .addSink(e, sinkE);

Here, pipe c has been attached to the tail by streaming it into a join of a, b and c. This flow now completes because every defined pipe, along with its associated source, is accounted for. When you run into this particular planner exception, check the final destination of each defined pipe in the flow to make sure that every stream eventually merges into a tail.
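Following each source to its final destination can be sketched as a forward walk over a toy graph. This is a hypothetical helper over plain string maps, not a Cascading utility. The labels `c1-filter` (the Each), `d` (the CoGroup) and `e` (the Retain tail) are illustrative names assumed for this sketch.

```java
import java.util.*;

// Hypothetical helper over a toy pipe graph; not part of Cascading.
public class SourceDestinationTrace {

    // Returns the terminal pipes (pipes with no downstream consumer) reachable from start.
    static Set<String> finalDestinations(String start, Map<String, List<String>> downstream) {
        Set<String> terminals = new TreeSet<>();
        Set<String> seen = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String pipe = stack.pop();
            if (!seen.add(pipe)) continue;
            List<String> next = downstream.getOrDefault(pipe, List.of());
            if (next.isEmpty()) terminals.add(pipe);
            for (String n : next) stack.push(n);
        }
        return terminals;
    }

    // Sources whose final destinations include none of the declared tails.
    static Set<String> sourcesMissingTail(Collection<String> sources, Set<String> tails,
                                          Map<String, List<String>> downstream) {
        Set<String> missing = new TreeSet<>();
        for (String s : sources) {
            Set<String> ends = finalDestinations(s, downstream); // fresh set per call
            ends.retainAll(tails);
            if (ends.isEmpty()) missing.add(s);
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> sources = List.of("a1", "b1", "c1");
        Set<String> tails = Set.of("e");

        // First assembly: c1 is filtered but never joined.
        Map<String, List<String>> broken = Map.of(
            "a1", List.of("d"),
            "b1", List.of("d"),
            "c1", List.of("c1-filter"),
            "d", List.of("e"));

        // Second assembly: the filtered c1 stream also feeds the CoGroup.
        Map<String, List<String>> fixed = Map.of(
            "a1", List.of("d"),
            "b1", List.of("d"),
            "c1", List.of("c1-filter"),
            "c1-filter", List.of("d"),
            "d", List.of("e"));

        for (String s : sources) {
            System.out.println(s + " -> " + finalDestinations(s, broken));
        }
        System.out.println("broken, missing a tail: " + sourcesMissingTail(sources, tails, broken));
        System.out.println("fixed, missing a tail: " + sourcesMissingTail(sources, tails, fixed));
    }
}
```

For the first assembly, `c1` ends at `[c1-filter]` instead of the tail `e`, so it is reported as missing a tail. For the second assembly, the report is empty.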


Summary  


When you run into Cascading planner exceptions such as this one, it is important to keep close track of the steps in the flow and of the final destination of each pipe that is bound to a source. Specifically, sources must drain into sinks, and any pipe carrying a source stream that is not connected to a tail will trigger a planner exception. If a Java IDE such as IntelliJ or Eclipse is used for development, some of these pipes are easy to spot, since unused variables are greyed out while everything that is eventually used is not. This approach does not catch every case, because a pipe that is referenced (for example, passed to addSource) may still fail to stream into a tail. It does, however, provide a starting point for checking source destinations from one point in the flow to the next.


  

 

 

 
