Lingual SQL on Hadoop

Background

There are many projects that attempt to utilize SQL for data analytics and management on Hadoop.  Lingual, from Concurrent, Inc., is a batch processing ANSI SQL-99 engine.  It was made generally available at the end of 2013.   With its high latency, it is more similar to Hive than to a realtime SQL processing tool such as Impala or Apache Drill.   Lingual utilizes the Optiq SQL query parser, written by Julian Hyde, to parse incoming queries.  The Cascading team then wrote Lingual to utilize this parse information to generate Cascading code that actually performs the query.   Interestingly enough, the Apache Drill team also utilizes Optiq for their query parsing.

Lingual has several parts that could be useful for specific use cases: 

1.  A command line shell, similar to Hive's, which can be utilized for queries
2.  Integration with Cascading by means of Cascading flows
3.  JDBC integration with existing tools that need to utilize HDFS data
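For the third item, Lingual ships a JDBC driver so existing SQL tools can read HDFS data.  The sketch below assumes the driver class name and URL scheme from the Lingual documentation (`cascading.lingual.jdbc.Driver` and `jdbc:lingual:<platform>`); the table name is borrowed from the flow example later in this article, and the `buildUrl` helper is our own convenience method, not part of the Lingual API.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LingualJdbcExample {

    // Our own helper: the Lingual JDBC URL is the "jdbc:lingual:" scheme
    // followed by the platform name ("local" or "hadoop").
    static String buildUrl(String platform) {
        return "jdbc:lingual:" + platform;
    }

    public static void main(String[] args) throws Exception {
        // Register the Lingual driver (assumed class name from the docs).
        Class.forName("cascading.lingual.jdbc.Driver");

        try (Connection con = DriverManager.getConnection(buildUrl("hadoop"));
             Statement st = con.createStatement();
             // Lower case identifiers must be double-quoted (see below).
             ResultSet rs = st.executeQuery(
                     "SELECT * FROM \"x\".\"mf_special_info\"")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```

From there, any JDBC-aware reporting or BI tool can point at the same catalog without writing Cascading code.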

Use Case


The use case most relevant to one of our applications is integration with Cascading flows.   The team is developing a financial data workflow processing tool which contains a tremendous amount of business logic and SQL processing.   Depending on developer skill with Cascading and the complexity of the logic to implement, a developer might choose to utilize Lingual instead of writing Cascading code.  The decision is similar to the one made when choosing Cascading over pure Map Reduce.


Implementation


Since Lingual can be integrated into the workflow at any point through its Cascading integration, developers can decide which tool is best for each piece of a particular workflow.  Lingual takes Cascading input taps, performs the SQL query against them, and writes the results to a Cascading output tap.  It is important to note that the input taps can be produced from the output of other Cascading flows, and the output taps from Lingual can be consumed by Cascading flows further down the pipeline.

Here is the typical usage pattern for creating a Lingual Flow.

   public static FlowDef createFlowDefUsing(Tap<?, ?, ?> mfOtherDetails2Source ,
                                             Tap<?, ?, ?> mfOtherDetailsSource,
                                             Tap<?, ?, ?> mfSpecialInfoSource,
                                             Tap<?, ?, ?> mfAccountInfoSource,
                                             int month,
                                             int year) {


        /**
         * The SQL statement.
         *
         * Since we are using lower case field names in our Cascading flow,
         * Lingual requires that they all be escaped with double quotes;
         * otherwise they are converted to upper case during query parsing.
         *
         * Lingual also requires a schema prefix on each table name, in our
         * case "x".
         */

        String statement = "INSERT INTO \"mf_account_info\" (\"col_1\", \"col_2\", \"sub_col_1\", \"col_3\" )" +
                "    SELECT a.\"col_1\", a.\"col_1\" AS \"col_2\", a.\"sub_col_1\",\n" +
                "          a.\"col_3\"\n" +
                "     FROM \"x\".\"mf_special_info\" a, \"x\".\"mf_other_details_2\" b\n" +
                "    WHERE b.\"col_5\" = 'MF'\n" +
                "      AND a.\"col_1\" = b.\"col_1\"\n" +
                "      AND b.\"col_4\" <> 'A'\n"  +
                "   UNION ALL\n" +
                "   SELECT b.\"col_1\", a.\"col_1\" AS \"col_2\", a.\"sub_col_1\",\n" +
                "          a.\"col_3\"\n" +
                "     FROM \"x\".\"mf_other_details_2\" a, \"x\".\"mf_special_info\" b\n" +
                "    WHERE b.\"col_4\" = 'C' AND a.\"col_1\" = b.\"sub_col_1\"";

        // Lingual requires a prefix for the table names
        FlowDef flowDef = FlowDef.flowDef()
                .setName(VwRaresAccountSplitVw.class.getSimpleName())
                .addSource( "x.mf_other_details_2", mfOtherDetails2Source )
                .addSource( "x.mf_other_details", mfOtherDetailsSource )
                .addSource( "x.mf_special_info", mfSpecialInfoSource )
                .addSink( "mf_account_info", mfAccountInfoSource );

        SQLPlanner sqlPlanner = new SQLPlanner()
                .setSql( statement );

        flowDef.addAssemblyPlanner( sqlPlanner );

        return flowDef;
    }
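As the comment in the flow definition above notes, lower case identifiers must be double-quoted or Lingual's parser folds them to upper case.  Keeping the escaping in one place makes long statements like the one above easier to maintain; the helper below is our own convenience code, not part of the Lingual API.

```java
public class LingualIdentifiers {

    // Wrap an identifier in double quotes so Lingual preserves its case,
    // doubling any embedded quote characters per standard SQL escaping.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Qualify a table name with its schema prefix, quoting both parts.
    static String qualify(String schema, String table) {
        return quote(schema) + "." + quote(table);
    }

    public static void main(String[] args) {
        // Produces: "x"."mf_special_info"
        System.out.println(qualify("x", "mf_special_info"));
    }
}
```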

The input taps correspond to table names in the query, and the output tap is where the records produced by the query are written.  The flow can then be run in a Cascade or on its own, in the same manner as other Cascading flows.

Conclusion

Lingual should definitely be evaluated for implementing your complex SQL-based data workflows before embarking on writing Cascading code.  Frameworks continuously evolve to help developers get their work done, and the need for hand-written, low-level Map Reduce code will continue to decrease.  As with any first release of a product, there are some caveats, and integration with Cascading took some testing.  Those caveats will be discussed in a future article.

