Writing Hive GenericUDFs

Hive UDFs are often a blessing to the Hive developer, since they are an excellent means of customizing computation on your data. To illustrate, say we need to write a UDF that splits a string of numbers on a delimiter, sorts the numbers, and returns a string of the same numbers with the same delimiter, but now in sorted order. This is a relatively simple task to do using a UDF.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SortNumbers extends UDF {

    // Hive finds evaluate() by reflection; it is conventionally an instance method
    public Text evaluate(Text str, Text delim) {
        if (str == null || delim == null) return null; // NULL in, NULL out

        String d = delim.toString();
        if (str.toString().isEmpty()) return str; // nothing to sort

        // split() takes a regex, so quote the delimiter to match it literally
        String[] numbers = str.toString().split(Pattern.quote(d));

        if (numbers.length == 0) return str; // return the original string as is

        List<Integer> numArray = new ArrayList<Integer>();
        for (String number : numbers)
            numArray.add(Integer.parseInt(number)); // parse each token as an int

        Collections.sort(numArray); // well, that was easy. Sort all Integers

        StringBuilder sb = new StringBuilder(); // optimized for String concat
        for (Integer n : numArray) {
            if (sb.length() > 0) sb.append(d); // delimiter only between items
            sb.append(n);
        }
        return new Text(sb.toString());
    }
}




OK, so we have a few things going on above. First, we read the data into a list of Integers. Then we sort it using the Collections.sort() convenience method. After that, we build the output string with the help of a StringBuilder.
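
The same split, sort and join steps can also be tried outside Hive. The following is a minimal plain-Java sketch, not part of the UDF itself; the class name SortNumbersLogic and the method sortNumbers are made up for illustration. It treats the delimiter as a literal string rather than a regular expression, which matters for delimiters such as "|".

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: the sorting logic on plain Strings, with no Hive types.
class SortNumbersLogic {

    // Split on a literal delimiter, sort numerically, and re-join.
    public static String sortNumbers(String input, String delim) {
        if (input.isEmpty()) {
            return input; // nothing to sort
        }
        return Arrays.stream(input.split(Pattern.quote(delim)))
                .map(Integer::parseInt)   // "10" -> 10, so 10 sorts after 2
                .sorted()
                .map(String::valueOf)
                .collect(Collectors.joining(delim)); // no trailing delimiter
    }

    public static void main(String[] args) {
        System.out.println(sortNumbers("10,2,33,4", ","));
        System.out.println(sortNumbers("3|1|2", "|"));
    }
}
```

Keeping the logic in a helper like this makes it easy to unit test before wrapping it in a UDF class.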

While this UDF is super easy to write, there are a few limitations. Firstly, the evaluate() method of a class derived from UDF, such as SortNumbers, cannot understand complex structures like structs, unions, or even arrays of structs. Secondly, it is not possible to give evaluate() a variable number of arguments; i.e., in our case, evaluate() expects two input arguments and works only when given exactly that many.

What is the most complex data type that a UDF evaluate() method can handle, then? The answer is twofold: either a hash map or an array of primitive types. Let's look at a class whose evaluate() method takes an array of primitives.

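import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SortNumbersArray extends UDF {

    public ArrayList<IntWritable> evaluate(ArrayList<Text> arr) {
        if (arr == null) return null; // NULL in, NULL out

        ArrayList<IntWritable> numArray = new ArrayList<IntWritable>();

        // convert every string element into an IntWritable
        for (int i = 0; i < arr.size(); i++)
            numArray.add(new IntWritable(Integer.parseInt(arr.get(i).toString())));

        Collections.sort(numArray); // IntWritable is Comparable, so this sorts numerically
        return numArray;
    }
}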


So we have achieved the same objective as the SortNumbers.evaluate() method, except that SortNumbersArray.evaluate() now takes in an ArrayList<> object and returns an ArrayList<> object. This would be the best way to handle an array of primitive data types with the simplest of UDFs.

One point worth noting: the code above uses some Hadoop I/O data types, such as Text and IntWritable. What are these? Text is Hadoop's counterpart to the String class: it stores its contents as UTF-8 bytes and is designed to be serialized. In other words, a Text object can be written out as a stream of bytes, passed over a computer network, and rebuilt from that stream on the other side. This round trip is commonly referred to as SerDe (short for Serialization - Deserialization). IntWritable, as you can guess, is the serializable counterpart of the Integer class. To get a complete list of all "Writable" classes that come with the org.apache.hadoop.io package, refer to this page.
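
To make the serialize-and-rebuild round trip concrete, here is a minimal sketch that uses only the JDK. MiniIntWritable is a hypothetical stand-in, not Hadoop's IntWritable; it only mirrors the general shape of a Writable, which writes its fields to a DataOutput and reads them back from a DataInput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Writable-style integer wrapper (not a Hadoop class).
class MiniIntWritable {
    private int value;

    public MiniIntWritable() { }
    public MiniIntWritable(int value) { this.value = value; }

    public int get() { return value; }

    // Serialize: write the single int field as 4 bytes.
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    // Deserialize: overwrite this object's state from the byte stream.
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    // Serialize to a byte array, then rebuild a fresh object from those bytes.
    public static MiniIntWritable roundTrip(MiniIntWritable original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            MiniIntWritable copy = new MiniIntWritable();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams should not fail
        }
    }
}
```

In a real job the byte stream would travel between machines instead of through an in-memory buffer, but the write and read steps are the same idea.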

Now let's assume that our input is not just a list of Text objects, but a list of a complex data type: a struct with two fields, an integer field to sort on and a string that serves as metadata associated with the integer. A real-life example: the integer field is the number of times a user visited a particular posting on Craigslist, and the string is the user ID. We are trying to sort all users according to the number of visits to a posting over a day.

This problem cannot be modeled with the UDF classes we wrote above. How does Hive know whether what we are passing to the evaluate() method is an array of Integers, Strings or Structs? Furthermore, a developer may want to add an optional argument to the function that indicates the top K users that he/she is interested in. How can we incorporate this variability into our function?
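
Before turning to how Hive supports this, the behavior we are after can be sketched in plain Java. Everything here is an assumption made for illustration: the class TopVisitors, the Visit type standing in for the struct, and the choice to sort in descending order so that "top K" means the K most frequent visitors. The optional argument is modeled with Java varargs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java model of the sorting problem; no Hive types involved.
class TopVisitors {

    // Stand-in for the struct: an int to sort on plus a user ID as metadata.
    public static final class Visit {
        public final int visits;
        public final String userId;

        public Visit(int visits, String userId) {
            this.visits = visits;
            this.userId = userId;
        }
    }

    // topK is optional: callers pass zero or one extra argument.
    public static List<Visit> sortByVisits(List<Visit> input, int... topK) {
        if (topK.length > 1) {
            throw new IllegalArgumentException("expected at most one topK argument");
        }
        List<Visit> sorted = new ArrayList<>(input); // leave the input untouched
        // Assumption: most-visited first, so the head of the list is the "top"
        sorted.sort(Comparator.comparingInt((Visit v) -> v.visits).reversed());

        if (topK.length == 1) {
            int k = Math.max(0, Math.min(topK[0], sorted.size()));
            return new ArrayList<>(sorted.subList(0, k));
        }
        return sorted;
    }
}
```

A simple UDF cannot express either half of this: the struct elements or the optional second argument.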

The answer is GenericUDFs. A GenericUDF requires three methods to be implemented. These are:
  • public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException
  • public Object evaluate(DeferredObject[] arguments) throws HiveException
  • public String getDisplayString(String[] strings) 

initialize(ObjectInspector[] args): This function does four things in general -
  • checks the total number of arguments and verifies that it is as expected
  • checks the data types of all arguments, and the sub-data types of complex data type arguments if necessary
  • initializes the variable to return
  • returns an ObjectInspector that describes the type of the variable to be returned

evaluate(DeferredObject[] arguments): This function plays the same role as the evaluate function of a simple UDF, except that it takes in an array of type DeferredObject. Using the private ObjectInspectors saved during the initialize() call, we can recover the underlying objects and manipulate them to get our results. Make sure that the object that evaluate() returns is of the type described by the ObjectInspector returned by initialize().

getDisplayString(String[] strings): This function is called to print the name and arguments passed to the UDF whenever Hive logs error messages. Making sure that this function shows the right function usage can save a lot of debugging time when going through long logs.
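
The division of labor between the three methods can be modeled without Hive on the classpath. The sketch below is a deliberately simplified, hypothetical stand-in: MiniSortUdf, ArgType and the function name sort_ints are invented here and are not Hive classes, and a real GenericUDF works with ObjectInspector and DeferredObject instead of the enum and Object[] used below. What it does show is the pattern: validate the argument count and types once in initialize(), remember what was found, and keep evaluate() focused on the per-row work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Hive-free model of the GenericUDF life cycle.
class MiniSortUdf {

    // Stand-in for the type information an ObjectInspector would carry.
    public enum ArgType { INT_LIST, INT }

    private boolean hasTopK; // decided once in initialize(), reused for every row

    // Called once: check argument count and types, then report the return type.
    public ArgType initialize(ArgType[] args) {
        if (args.length < 1 || args.length > 2) {
            throw new IllegalArgumentException(
                    "sort_ints expects 1 or 2 arguments, got " + args.length);
        }
        if (args[0] != ArgType.INT_LIST) {
            throw new IllegalArgumentException("argument 1 must be a list of ints");
        }
        if (args.length == 2 && args[1] != ArgType.INT) {
            throw new IllegalArgumentException("argument 2 (top K) must be an int");
        }
        hasTopK = (args.length == 2);
        return ArgType.INT_LIST; // evaluate() must return a value of this type
    }

    // Called per row: arguments were already validated in initialize().
    @SuppressWarnings("unchecked")
    public List<Integer> evaluate(Object[] arguments) {
        List<Integer> sorted = new ArrayList<>((List<Integer>) arguments[0]);
        Collections.sort(sorted, Collections.reverseOrder()); // largest first
        if (hasTopK) {
            int k = Math.max(0, Math.min((Integer) arguments[1], sorted.size()));
            return new ArrayList<>(sorted.subList(0, k));
        }
        return sorted;
    }

    // Used in log and error messages: show the call as the user wrote it.
    public String getDisplayString(String[] children) {
        return "sort_ints(" + String.join(", ", children) + ")";
    }
}
```

Because the checks live in initialize(), a wrong argument count or type fails once, up front, instead of on every row.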

And that is all, really! Once these methods have been implemented, the function is easy to use. As with any UDF, first issue the following two commands:
  • ADD JAR /path/to/jar/MyJar.jar; 
  • CREATE TEMPORARY FUNCTION sortStructArray AS 'com.nkelkar.udf.hive.GenericUDFSortStructArray';
A liberally documented version of a GenericUDF that implements the sorting problem that we just viewed above can be found here

Thanks and I hope you enjoyed reading about GenericUDFs. Please post any questions/comments below. Happy coding!


 
