SQL with Cascading: Recreating the SQL "ROW ID" Functionality

Overview

For those who already have a basic understanding of Cascading, the contexts in which it is used, and what it aims to accomplish when working with structured data, it should be clear that Cascading is one of the top go-to frameworks for reimplementing database-style programming on top of Apache Hadoop.  With that comes the task of recreating many typical database concepts and operations in a Cascading-friendly fashion whenever the framework's already well-extended function library does not include them.  When such gaps are encountered, one can leverage the operator extension points offered by Cascading, through which custom-built operations bridge the gap between a particular SQL action and its lack of direct support in the function library.

Common database operations that may need to be re-implemented in Cascading include:

1.) DECODE
2.) CASE
3.) ROW ID
4.) recreating primary keys (relates to # 3)
5.) MINUS

Lists such as this can be easily extended.  Nonetheless, this post presents an example of how to implement # 3 on the list, along with a special problem that arises from sequencing across a distributed system such as Apache Hadoop, where data files are split apart and then assembled back together by the MapReduce machinery.
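For a quick flavor of the custom-operator pattern before diving into # 3, item # 1 (DECODE) might be bridged with a small lookup Function along the lines below.  This is only a hypothetical sketch for illustration; the Decode class and its lookup-table constructor are assumptions of this post, not part of Cascading's library:

import java.util.HashMap;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Hypothetical DECODE-style lookup: emits the mapped value for the single
// input argument, or a default when no mapping exists.  A HashMap is used
// because operations are serialized out to the cluster tasks.
public class Decode extends BaseOperation implements Function {

    private final HashMap<Object, Object> mapping;
    private final Object defaultValue;

    public Decode(Fields fields, HashMap<Object, Object> mapping, Object defaultValue) {
        super(1, fields);
        this.mapping = mapping;
        this.defaultValue = defaultValue;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        Object key = functionCall.getArguments().getObject(0);
        Object value = mapping.containsKey(key) ? mapping.get(key) : defaultValue;
        functionCall.getOutputCollector().add(new Tuple(value));
    }
}

Like any other function, it would be applied per record through an Each pipe.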

The Case

A particular data stream is spawned from the re-implementation of a SQL statement that includes a ROW_ID column, or simply an assigned key corresponding to each incoming record.  This field may preexist, but more often than not it is appended on the fly, as in a SELECT ROWID statement.  In the latter case, one seemingly obvious solution is to implement a custom function resembling the following:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class AddRowID extends BaseOperation implements Function {

    long count;

    public AddRowID(Fields fields, long count) {
        super(1, fields);
        this.count = count;
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess flowProcess, FunctionCall functionCall) {
        // Emit the current counter value as the row ID, then advance it.
        functionCall.getOutputCollector().add(new Tuple(count++));
    }
}


and possibly applied through an assembly along these lines, appending the generated ID to each record:

Pipe pipe = new Each(previousPipe, Fields.FIRST, new AddRowID(new Fields("A"), 0), Fields.ALL);

The Problem

The issue that arises with the previous method is that the count sequence is skewed once the data is distributed: each map task receives its own instance of the function, so every task restarts the count at the initial value.  The IDs are therefore not unique across the board and become repeated in the final output, generating overlapping ranges whose extent depends on how the incoming data file or artifact is split.  For example, two map splits of three records each would both emit the IDs 0, 1, and 2.

A Solution!

Once again, Cascading offers no direct implementation of the often-encountered ROW ID/ROWID operator characteristic of many procedural SQL dialects, which would ideally handle the sequencing issue across the platform and generate an independent key when such an element is needed.  Though seemingly a barrier, the missing functionality can be supplemented quite easily through Cascading's Buffer interface.  A buffer runs behind a GroupBy, so all the tuples of a group stream through a single reducer in order, which is what preserves the sequence across the platform.  The Row ID buffer resembles the following:

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class RowID extends BaseOperation implements Buffer {

    public long rowCount;

    public RowID(Fields fields, int startCount) {
        super(1, fields);
        // Start one below the desired first ID; the first increment brings it up.
        rowCount = startCount - 1;
    }

    @Override
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        long sum = 0;
        Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

        while (arguments.hasNext()) {
            // The "lead" operation: summing the (numeric) input field gives the
            // buffer an operation to track as it walks the group.
            sum += arguments.next().getInteger(0);
            rowCount++;
            // Emit the current count so each record receives its own row ID.
            bufferCall.getOutputCollector().add(new Tuple(rowCount));
        }
    }
}
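The buffer can then be wired in behind a GroupBy.  One way to do so is sketched below; the pipe name, the "const" grouping field, and the starting value of 1 are assumptions for illustration, not prescriptions.  Grouping every record under the same constant key forces the whole stream through a single reducer, which is what keeps the count sequential:

// Tag every tuple with a constant grouping key so all records fall into
// one group, and therefore one reducer.
Pipe pipe = new Each(previousPipe, new Insert(new Fields("const"), 1), Fields.ALL);
pipe = new GroupBy(pipe, new Fields("const"));
// Sum the numeric constant as the lead operation and append the generated
// "row_id" to each record as it streams through the buffer.
pipe = new Every(pipe, new Fields("const"), new RowID(new Fields("row_id"), 1), Fields.ALL);

The obvious trade-off is that funneling everything through one reducer serializes the count at the cost of parallelism for this step.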

Note

The row count operation assigns the "row_id" to each record and, most importantly, needs to follow a lead operation, that is, a particular operation it can "track" in order to increment the count.  In this case it uses a "sum" of the input field.  As the lead operation is performed on each record, the count is incremented by 1 and recorded.  Different operations may serve as the lead operation; what matters is that the count is added to the tuple for every record.


 
