Cascading: Using BufferJoin To Perform Custom Joins

Custom join operations are often difficult to implement. They almost always require cross-join operations to be performed. Cascading 2.5.0 offers a unique solution to tackle custom join requirements.

This solution comes in the form of the BufferJoin class. BufferJoin allows a developer to specify a CoGroup of two input streams, and then traverse each combination of left-hand side (lhs) tuples and right-hand side (rhs) tuples for the specified grouping keys. With a CoGroup, the default grouping keys are the join keys. So, for example, if you have something like:

Pipe result = new CoGroup(lhs, new Fields(lhs_a, lhs_b), rhs, new Fields(rhs_a, rhs_b), 
                          new BufferJoin());

The above basically means that you want to join lhs with rhs on (lhs_a, lhs_b) and (rhs_a, rhs_b) like an inner join (only keep rows with these two tuples same on both sides) and then be allowed to iterate over all such rows in a separate Buffer (this is requested by the last part in code line above - BufferJoin)

To understand more about the BufferJoin, we will follow a (rather creative) example. Suppose we wish to start a matrimonial website that does matchmaking. Since we are really new to this market, we try to keep things simple. People come to our webpage, create a profile (photos, name, birth date, age, etc). Men and women can then search for good matches on our website. Also, we need to build a recommender system, that constantly displays recommendations to users (much like those annoying advertisements that show up to the right of any Facebook page). To make better recommendations, we ask our users to pay a tiny amount of money (they wouldn't pay us a lot anyway, since we are new to the market, remember?) and some personal information that we legally promise to keep secret forever (like salary information, family information, permissions to do a credit check, etc). How can we then create a good recommender system for this use case? 



The problem described in the above paragraph is indeed a difficult one. Firstly, our choice of data storage is HDFS. Secondly, we decide to keep all male profiles in one file, and all the female profiles in another. For a male user in the male table (we refer to a file in HDFS as a table, since it is easier to visualize data that way), we'd like to find all "good" matches from the female table (and of course, vice versa). Moreover, we have these two tables being updated every few minutes (since more and more people are creating accounts on our website, yay!).  How do we determine the "good" mentioned above, for our matches? Towards that, say we take a survey within our locality of different couples that just got married, and come up with the following business logic after a bit of research:

IF 0 <= [ age of male ] - [ age of female ] <= 3 (elder male within same age group)
     IF $110,000.00 < [ male salary ] + [ female salary ] < $135,000.00 (salary limits of couple)
          RECOMMEND male <--> female (alert the pair about each other)
     END IF
END IF

Disclaimer: The above business logic is for example purposes, and does not display the author's views on matrimony or gender in any kind.

The above business logic is too simple, highly unlikely to be the core business logic of a matrimonial website; but it is enough to convey the idea. The code for an implementation for the above can be found in the IntelliJ project here. Feel free to pull down the project like:

git clone https://github.com/kelkarn/blogcode.git

In the BufferJoinExample.java class, the following snippet is of great importance:

matches = new CoGroup(males, Fields.NONE, females, Fields.NONE, new BufferJoin());

matches = new Every(matches, new CustomJoin(new Fields(NAME + MALE_NAMESPACE,  
                    NAME + FEMALE_NAMESPACE)), Fields.RESULTS); // keep result fields

The above lines of code tell Cascading to join the two input streams males and females with no join field, and defer the actual joining to a Buffer implementation that follows next. The buffer (CustomJoin) then implements the actual business logic discussed above. These are some lines of code from CustomJoin.java
if(rhsTuple.getInteger( 1 ) <= lhsTuple.getInteger( 1 )) {  // is female age <= male age ?

        if((rhsTuple.getInteger( 2 ) + lhsTuple.getInteger( 2 ) > 110000) &&           (rhsTuple.getInteger( 2 ) + lhsTuple.getInteger( 2 ) < 135000)) {
        if(lhsTuple.getInteger( 1 ) - rhsTuple.getInteger( 1 ) <= 3) { 
           bufferCall.getOutputCollector().add(new Tuple(lhsTuple.getString( 0 ), 
           rhsTuple.getString( 0 )));
                        }
                    }
                } 

This is exactly what we had in the pseudocode. The buffer has access to both lhs and rhs stream tuples and so is easily customizable to fit whatever logic your application requires. 
Some notes about the BufferJoin
Since Cascading uses an in-memory table to maintain high speed access to the rightmost streams in the CoGroup, it is recommended that the smaller data files be kept on the right hand side. 

Secondly, Fields.NONE as join key within CoGroup should be used carefully. It essentially translates to a full cross-join in Cascading, which based on your data size and available cluster size, may slow down your process. 

Finally, if you observed in CustomJoin Buffer class, it is only allowable to access tuple field values numerically. This is one of the limitations of using a custom joiner. You need to know the exact positions of your fields within the tuple, to use it in the Buffer. 
 
The code, once you fork the repository, has liberal amounts of comments to explain in detail how the buffer works. Try running BufferJoinExampleTest test class on your local to execute a small test run of our BufferJoinExample class. You'll see the following output: 

['name_male', 'name_female']
['Kumar', 'Marissa']
['Aaron', 'Jane']
['Levon', 'Priya']

The input files can be found in: src/test/resources/input/buffer/bufferjoin/
So these are our match recommendations! We are off to a good start towards making a great recommender system! :)

Please leave your questions and comments about BufferJoin below. Thanks for reading, and happy coding!

No comments:

Post a Comment