A Guide to User-Defined Functions in Apache Hive

What is Hive?

Hive is a software package in the Hadoop ecosystem that abstracts files stored in HDFS by adding a schema definition, so that the files can be queried using an SQL-based syntax.

What do I need to get started?

This guide assumes that Hive and core Hadoop are installed and working properly. Once everything is set up, you will also need a dataset to manipulate. This tutorial uses a public dataset generated from FAA on-time flight data. The actual dataset referenced in the tutorial is available for download by written request to expert@spryinc.com.
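The examples that follow assume the data has been loaded into a tab-delimited Hive table named FLIGHT_ONTIME_2013_1. The full schema is not reproduced here, so the definition below is a hypothetical sketch covering only the delay columns this guide uses, with an assumed HDFS location:

-- Hypothetical sketch: only the columns used in this guide; the path is an assumption
CREATE EXTERNAL TABLE FLIGHT_ONTIME_2013_1 (
    carrier_delay DOUBLE,
    weather_delay DOUBLE,
    nas_delay DOUBLE,
    security_delay DOUBLE,
    late_aircraft_delay DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/faa/flight_ontime_2013_1';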

I have everything, what now?

There are two ways to make use of user-defined functions (UDFs) in HiveQL. The first is to write a function in Java or any other language that has a JVM implementation (Jython, JRuby, Clojure, Groovy). The second is to use the TRANSFORM...AS syntax, which allows any executable script to serve as the transformation step. The next sections walk through detailed examples of each approach.

Writing A Custom UDF

In Java

In order for Hive to pick up the function, the Java class needs to extend the org.apache.hadoop.hive.ql.exec.UDF class. Since the alias created later refers to the fully qualified name spryinc.training.udf.hive.FindMaxDelayType, the class also needs a matching package declaration.

package spryinc.training.udf.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

public class FindMaxDelayType extends UDF {


Hive UDFs are used to analyze information stored in one or more columns of a single row, typically relating the values of several columns to one another. The example described below takes multiple delay values as input and returns the text description of the column with the maximum value. Finding the maximum value itself can be done with built-in HiveQL functions, but finding the index of the column containing the maximum value and returning a particular string based on that result would be overly complex in pure HiveQL.
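To see why, a pure-HiveQL version would need a chain of pairwise comparisons along these lines (an illustrative sketch, not part of the tutorial):

SELECT CASE
    WHEN carrier_delay >= weather_delay AND carrier_delay >= nas_delay
         AND carrier_delay >= security_delay AND carrier_delay >= late_aircraft_delay THEN 'carrier'
    WHEN weather_delay >= nas_delay AND weather_delay >= security_delay
         AND weather_delay >= late_aircraft_delay THEN 'weather'
    WHEN nas_delay >= security_delay AND nas_delay >= late_aircraft_delay THEN 'nas'
    WHEN security_delay >= late_aircraft_delay THEN 'security'
    ELSE 'late_aircraft'
END AS maxDelayType
FROM FLIGHT_ONTIME_2013_1;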

Each UDF must be contained within its own class, and the functionality must be written inside a method called "evaluate". This method is not declared in the UDF superclass (Hive locates it by reflection), so the user has full control over its return type and arguments. Any number of helper functions and classes can be used in the implementation of the UDF. When Hive parses the HiveQL, the body of "evaluate" is what gets executed for each row.

    // The order of these constants must match the argument order of
    // evaluate below, since the winning index is used to look up the name.
    public enum DelayTypes {carrier, weather, nas, security, late_aircraft}

    public String evaluate(double carrier, double weather, double nas, double security, double late_aircraft){

        // Track the largest delay seen so far and the index of its column.
        double currentMax = carrier;
        int index = 0;

        if(weather > currentMax) {
            currentMax = weather;
            index = 1;
        }

        if(nas > currentMax) {
            currentMax = nas;
            index = 2;
        }

        if(security > currentMax) {
            currentMax = security;
            index = 3;
        }

        if(late_aircraft > currentMax) {
            currentMax = late_aircraft;
            index = 4;
        }

        // Map the winning index back to its delay type name.
        return DelayTypes.values()[index].name();
    }
}



In Python

Hive UDFs written in Python are invoked through Hadoop Streaming, which allows any executable to act as the mapper or reducer in a MapReduce job. Any Python script can be adapted for use with Hive, as long as it is set up to read input and write output in a particular manner.

There is no restriction on the packages or the number of helper functions used in the implementation, but the "sys" package must be imported so the script can read from standard input.

#!/usr/bin/python
import sys

def findMaxDelayType(carrier, weather, nas, security, late_aircraft):
    # Map each argument position to the human-readable name of its column.
    delayTypes = {0: 'carrier', 1: 'weather', 2: 'nas', 3: 'security', 4: 'late_aircraft'}
    delays = [carrier, weather, nas, security, late_aircraft]
    maxValue = max(delays)
    return delayTypes[delays.index(maxValue)]


The main part of the script (the code that runs first when the script is executed) must read input from STDIN, splitting each line on the table's field delimiter. Hive streams rows to the script as tab-separated text, so the values arrive as strings and need to be converted back to numbers before they are compared.

for line in sys.stdin:
    line = line.strip()
    # Fields arrive as tab-separated strings; convert them to floats so that
    # max() compares them numerically rather than lexicographically.
    carrier, weather, nas, security, late_aircraft = map(float, line.split('\t'))


Finally, anything the script writes to STDOUT, anywhere in its execution, will be interpreted as a row of output for the transformation.

    print findMaxDelayType(carrier, weather, nas, security, late_aircraft)
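Putting the pieces together, the complete FindMaxDelayType.py looks like this:

#!/usr/bin/python
import sys

def findMaxDelayType(carrier, weather, nas, security, late_aircraft):
    # Map each argument position to the human-readable name of its column.
    delayTypes = {0: 'carrier', 1: 'weather', 2: 'nas', 3: 'security', 4: 'late_aircraft'}
    delays = [carrier, weather, nas, security, late_aircraft]
    return delayTypes[delays.index(max(delays))]

for line in sys.stdin:
    line = line.strip()
    # Convert tab-separated string fields to floats for numeric comparison.
    carrier, weather, nas, security, late_aircraft = map(float, line.split('\t'))
    print findMaxDelayType(carrier, weather, nas, security, late_aircraft)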

Using A Custom UDF

Written In Java
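First, the class must be compiled against the Hive libraries and packaged into a jar. A minimal sketch, assuming the source file sits in a directory tree matching its package and that hive-exec.jar lives at /usr/lib/hive/lib/hive-exec.jar (both paths are assumptions that depend on your installation):

# Compile against the Hadoop and Hive libraries (paths are installation-specific)
javac -cp $(hadoop classpath):/usr/lib/hive/lib/hive-exec.jar \
    spryinc/training/udf/hive/FindMaxDelayType.java

# Package the compiled class into a jar, preserving the package structure
jar cf FindMaxDelayType.jar spryinc/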

Once the jar is built, and before launching Hive, the user must tell Hive where the jar files containing the UDF definitions live, using the following command:

export HIVE_AUX_JARS_PATH=<folder_path>
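For example, if the compiled jar was placed in /home/hardwick/udfs (a hypothetical path, chosen to match the home directory used elsewhere in this guide):

export HIVE_AUX_JARS_PATH=/home/hardwick/udfs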

Alternatively, the jar can be added as part of the script inside Hive with the following command. Note that it takes the path to the jar file itself, not to its containing folder:

hive> add jar <path_to_jar>;

Both methods are equivalent ways to make the jar available so that the function can be accessed inside the script. One way to make that access easier is to create an alias for the fully qualified class name using the following command:

hive> create temporary function FindMaxDelayType as 'spryinc.training.udf.hive.FindMaxDelayType';

Once the alias is created, the UDF can be used just like any function built into HiveQL.
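To confirm the registration before running a real query, the alias should show up in Hive's function listing and can be described:

hive> show functions;
hive> describe function FindMaxDelayType;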

Written In Python

The script can be added to Hive's distributed cache using the "add FILE" command. Once this has been done, there is no need to alias the script, since it can be referred to by its file name without specifying the entire path.

add FILE /home/hardwick/Desktop/FindMaxDelayType.py;

Once the script is registered, it can be used as part of the special Hive "TRANSFORM...AS" construct that supports the use of any executable file for transformations.

How do I know it worked?

Java UDF

After creating the UDF, registering the jar with Hive, and creating the alias, the following query should return reasonable results and show no errors.

hive> SELECT carrier_delay, weather_delay, nas_delay, security_delay, late_aircraft_delay,
             FindMaxDelayType(carrier_delay, weather_delay, nas_delay, security_delay, late_aircraft_delay) AS maxDelayType
      FROM FLIGHT_ONTIME_2013_1
      WHERE carrier_delay IS NOT NULL;

Python UDF

The TRANSFORM...AS construct used in conjunction with a Python UDF has four main pieces, each explained in detail below.

SELECT TRANSFORM (<columns>)
USING 'python <python_script>'
AS (<columns>)
FROM <table>;


The first line allows the user to select a subset of relevant columns that should be passed to the script as the input parameters. In the case of our example, the actual line should be:

SELECT TRANSFORM(carrier_delay, weather_delay, nas_delay, security_delay, late_aircraft_delay)

The second line specifies which executable should handle the actual transformation. For our purposes, the script is written in Python and has already been made available through the add FILE command, so the actual line should be:

USING 'python FindMaxDelayType.py'

The third line allows the user to specify the format of the output returned by the script. In our example, there is only one column returned (the string description of the column with the maximum delay), so the actual line should be:

AS maxDelayType

Finally, the last line should be treated as a normal FROM clause in a query. It should select from the table with criteria that creates valid input for the script. In our case, the actual line should be:

FROM FLIGHT_ONTIME_2013_1
WHERE carrier_delay IS NOT NULL;
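
Putting all four pieces together, the complete query is:

SELECT TRANSFORM(carrier_delay, weather_delay, nas_delay, security_delay, late_aircraft_delay)
USING 'python FindMaxDelayType.py'
AS maxDelayType
FROM FLIGHT_ONTIME_2013_1
WHERE carrier_delay IS NOT NULL;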

Where can I go for more information?

Apache Hive Documentation
Or feel free to leave your question or comment below!
