A Guide To Hive User Defined Functions: UDTF

Hive provides the ability to supplement built-in functionality by allowing the user to implement custom user-defined functions, of which there are several types. This post focuses on the User Defined Table Function (UDTF), which takes a single input row and can emit multiple output rows.

Implementation

Initialize

In order for the correct methods to be available, the Java class needs to extend the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF class. Unlike UDFs, which come in both simple and generic variants, the generic class is the only option for a UDTF implementation.

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
public class SplitIntoMinutes extends GenericUDTF {


For each input argument, Hive expects the use of an ObjectInspector to access the value. The ObjectInspector also provides methods for type checking, and should be used inside the initialize method to ensure the input is valid.

private PrimitiveObjectInspector startTimeOI = null;
private PrimitiveObjectInspector endTimeOI = null;

@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {

    if (args.length != 2) {
         throw new UDFArgumentException("SplitIntoMinutes() takes exactly 2 arguments");
    }

    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
            || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING
            || args[1].getCategory() != ObjectInspector.Category.PRIMITIVE
            || ((PrimitiveObjectInspector) args[1]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
        throw new UDFArgumentException("SplitIntoMinutes() takes 2 strings as parameters");
    }

    startTimeOI = (PrimitiveObjectInspector) args[0];
    endTimeOI = (PrimitiveObjectInspector) args[1];

The initialize method should also be used to define the expected output for this function, including types and aliases. It is important to note that these aliases will not propagate up to the function call in HiveQL. The user will need to specify their own aliases for the result.

    List<String> fieldNames = new ArrayList<>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<>(2);
    fieldNames.add("hour");
    fieldNames.add("minute");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}




Process

Unlike the "evaluate" method used in other types of UDFs, the UDTF implementation requires the "process" method. The expectation is the same, though: this method is called once per input row, and the processing contained inside this method is used to generate the output. Output rows are emitted with the "forward" method rather than a return value.

@Override
public void process(Object[] record) throws HiveException {
    final String startTime = (String) startTimeOI.getPrimitiveJavaObject(record[0]);
    final String endTime = (String) endTimeOI.getPrimitiveJavaObject(record[1]);

    String currentTime = startTime;

    while (!currentTime.isEmpty() && !endTime.startsWith(currentTime)) {
        String[] parts = currentTime.split(":");
        String hour = parts[0];
        String min = parts[1];

        forward(new Object[]{hour, min});

        currentTime = incrementByMins(currentTime, 1);
    }
}
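The incrementByMins helper called at the bottom of the loop is not shown above. A minimal sketch of one possible implementation, assuming times are zero-padded "HH:mm" strings on a 24-hour clock, might look like this:

```java
// Hypothetical sketch of the incrementByMins helper referenced in process();
// it is not part of the Hive API. Assumes zero-padded "HH:mm" strings on a
// 24-hour clock, wrapping around midnight.
private static String incrementByMins(String time, int mins) {
    String[] parts = time.split(":");
    // Convert to total minutes since midnight, then add the increment.
    int total = Integer.parseInt(parts[0]) * 60 + Integer.parseInt(parts[1]) + mins;
    // Keep the result in the range [0, 1440) so "23:59" + 1 becomes "00:00".
    total = ((total % 1440) + 1440) % 1440;
    return String.format("%02d:%02d", total / 60, total % 60);
}
```

With this helper, calling SplitIntoMinutes('09:58', '10:01') would forward the rows (09, 58), (09, 59), and (10, 00) before the loop terminates at the end time.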

Close

The "close" method allows for any cleanup that is necessary before returning from the UDTF. It is important to note that no records can be written out from this method.

@Override
public void close() throws HiveException {}



Using A Custom UDTF

When using a UDTF as part of a query, it is important to note that no other columns can be accessed through the select statement. The arguments to the function should be passed normally, and the aliases for the columns in the output rows should be specified using the following syntax.

SELECT SplitIntoMinutes(start_time, end_time) AS (hour, minute) FROM time_table;
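If other columns from the source table are needed alongside the UDTF output, Hive's LATERAL VIEW syntax joins each input row to the rows the UDTF generates from it. A sketch, assuming time_table has an id column:

SELECT t.id, s.hour, s.minute
FROM time_table t
LATERAL VIEW SplitIntoMinutes(t.start_time, t.end_time) s AS hour, minute;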

Where can I go for more information?

Apache Hive Documentation

Or feel free to leave your question or comment below!
