A Guide to User-Defined Functions in Apache Pig

What is Pig?

Pig is a component of the Hadoop ecosystem that abstracts some of the lower level Hadoop functions through the use of a functional scripting language called Pig Latin. We covered the basics already, so this post will dive into what happens when you discover the need to extend Pig Latin's base functionality through User Defined Functions (UDFs).

What do I need to get started?

This guide assumes that Pig and core Hadoop are installed and working properly. If you have everything set up, you will also need a dataset to manipulate. This tutorial uses a public dataset generated from FAA ontime flight data. The actual dataset referenced in the tutorial is the same as the one used in the previous post, and it is available for download by written request to expert@spryinc.com.

I have everything, what now?

Finding Custom UDFs

Since Apache Pig is open source software, a large library of custom User Defined Functions (UDFs) have already been written and published for public use. A collection of these functions is stored in the PiggyBank available through the Apache SVN website. The site organizes the functions into several categories describing their general purpose. The functions are not validated before being officially included in the PiggyBank, but the intent is for users to find and fix bugs as they are discovered. So if you have a need for a function that is not specific to your use case, it may be worthwhile to see what is already available before writing custom code.

Writing Custom UDFs

According to the Apache Pig documentation, most of the available support is for UDFs written in Java, and additional (limited) support is available for Python. In the next sections, we will go through detailed examples written in each language.

Java - Simple Return Type

The class containing the custom function must extend the org.apache.pig.EvalFunc class, which uses generics to declare the return type. For example, the function we are writing will return a Boolean value.

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

import java.io.IOException;
import java.util.Iterator;


public class CheckForIntegers extends EvalFunc<Boolean> {}

For functions that only return a single primitive object (integer, string, boolean, etc.), there is only one Java function that needs to be overridden: exec. This function contains the commands that will be executed, given the input from Pig. No matter the actual input, the only argument to this function is always a single "Tuple" object. Inside the function, the Tuple should be dissected so that it represents the input that the function expects.


@Override 
public Boolean exec(Tuple input) throws IOException {

The purpose of this example function will be to iterate over the elements of a bag and return whether or not they can all be successfully parsed as Integers. Inside the exec function, we will first convert the Tuple into the DataBag that we actually passed in.

    DataBag bag = (DataBag)input.get(0);

Next, we will want to retrieve an iterator for the bag so that we can scan through each value and make sure it can be parsed without any exceptions being thrown.

    Iterator it = bag.iterator();

    while (it.hasNext()){
        Tuple t = (Tuple)it.next();


Now that we have an element from the DataBag, we need to be able to access its data. Pig allows access to the individual fields through positional notation, and expects the code to cast the Object to the actual expected datatype.

        try{
            Integer.parseInt((String)t.get(0));
        } catch(Exception e){
            return false;
        }
    }

    return true;
}

Once this class is packaged up into a jar file, the "CheckForIntegers" function can be made accessible from inside a Pig script.

Java - Complex Return Type

When returning databags or tuples, the custom class must also override the outputSchema function. If this step is skipped, Pig will incorrectly assume that the complex object is simply a bytearray.

Let's now decide to write a slightly different UDF that acts more as a filter - reading the input databag, and creating an output bag that only contains valid Integers, along with a second field specifying whether the value is even or odd.

We will need to create a new class, FilterForIntegerBags, that extends EvalFunc and returns a DataBag.

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import java.io.IOException;
import java.util.Iterator;

public class FilterForIntegerBags extends EvalFunc<DataBag>{
    @Override
    public DataBag exec(Tuple input) throws IOException {


The next few lines introduce the factory methods that must be used when creating new Tuples and DataBags.

        TupleFactory mTupleFactory = TupleFactory.getInstance();
        BagFactory mBagFactory = BagFactory.getInstance();

        DataBag output = mBagFactory.newDefaultBag();


Next we will include the code to iterate over the input databag and determine whether or not the element can be parsed as an Integer.


        DataBag bag = (DataBag) input.get(0);

        Iterator it = bag.iterator();

        while (it.hasNext()) {
            Tuple t = (Tuple) it.next();

            try {
                String value = (String) t.get(0);
                int intValue = Integer.parseInt(value);


If the current element does not throw an exception, we will need to create a new tuple that should be included in our output databag. The newTuple function takes an argument specifying the size of the tuple, and the set function populates each field with our desired value.

                Tuple currentTuple = mTupleFactory.newTuple(2);
                currentTuple.set(0, value);
                currentTuple.set(1, (intValue % 2) == 0 ? "even" : "odd");


Finally, we will add the newly built tuple to our output databag and close our while loop.


                output.add(currentTuple);
            } catch (Exception e) {}
        }

        return output;
    }
}

The last step of implementing this class is to define the schema that Pig should use when interacting with the output databag. This is done through the outputSchema method, shown below. The major steps are:
  1. Define the list of field names and datatypes and then create a Schema object from that list to represent the tuple.
  2. Create a Schema object for the databag, including the name and the fact that it will contain the tuple(s) described in step 1.
  3. Create and return the overall schema that Pig will make available to the user, including the name and the fact that it will contain the databag described in step 2.
  4. Set up an error message that will be displayed to the user if the schema can't be parsed successfully.

public Schema outputSchema(Schema input) {

        try {

            List<Schema.FieldSchema> fields = new ArrayList<Schema.FieldSchema>();
            fields.add(new Schema.FieldSchema("integerValue", DataType.INTEGER));
            fields.add(new Schema.FieldSchema("indicator", DataType.CHARARRAY));

            Schema tupleSchema = new Schema(fields);
            Schema bagSchema = new Schema(new Schema.FieldSchema("pair", tupleSchema, DataType.TUPLE));
            bagSchema.setTwoLevelAccessRequired(true);
            Schema.FieldSchema bagFs = new Schema.FieldSchema(
                    "pairs", bagSchema, DataType.BAG);

            return new Schema(bagFs);
        } catch (FrontendException e) {
            throw new RuntimeException("Unable to compute FilterForIntegerBags schema.");

        }

    }

Python

User defined functions written in Python are accessible through Pig just as the Java ones are. Schema definitions are handled through an annotation above the function definition. The Python function can be implemented normally. The following example takes in several parameters representing each type of delay in relation to the total delay time, with the intention of returning a string containing the most common delay type for a particular group.

@outputSchema("delayType:chararray")
def findMostCommonDelayType(carrier_delay_ratio, weather_delay_ratio, nas_delay_ratio, security_delay_ratio, late_aircraft_delay_ratio):
    delayTypes = {0 : 'carrier_delay', 1 : 'weather_delay', 2 : 'nas_delay', 3 : 'security_delay', 4 : 'late_aircraft'}
    delays = [carrier_delay_ratio, weather_delay_ratio, nas_delay_ratio, security_delay_ratio, late_aircraft_delay_ratio]
    maxValue = max(delays)
    return delayTypes[delays.index(maxValue)]


Other UDF Languages

In Pig 0.10.0, support for UDFs written in Ruby was added. Pig 0.11.0 supports Groovy UDFs. The development process was done with extensibility in mind, so additional languages may be added with the next release as well.

Using A Custom UDF

Whether the function is downloaded from PiggyBank, or custom code, there are two main steps that are required in order to use one in your Pig script.

First, you must use the REGISTER command to import the UDF into the workspace that Pig uses while running the script. Once Pig knows how to reference the function, the second step is to invoke the custom code using either the alias defined when registering the script or the fully qualified path name to the function in the jar file. Some examples of this are below:

REGISTER hdfs://<NameNodeIP:Port>/user/spry-user/pig/udfs/FlightUtils.jar;
REGISTER 'hdfs://<NameNodeIP:Port>/user/spry-user/pig/udfs/FlightUtils.py' using jython as utils;

If you receive no feedback after running a register command, that means that your file was successfully registered.

How do I know it worked?

If you are able to register your custom function in a Pig script, then you should be able to use it as if it were any function built into Pig Latin. If Pig is unable to find your function, or if it expects arguments different from what your script is making available, errors will be displayed.

For example, the following Pig script should print data with no errors once you update the path to the Python UDF and the FAA input data.


register '<path>/FlightUtils.py' using jython as utils;

flight_ontime = load '<FAA_DATA>' using PigStorage('\\u0001') as (year:double, quarter:double, month:double, day_of_month:double, day_of_week:double, fl_date:chararray, airline_id:double, fl_num:chararray, origin_airport_id:double, origin_airport_seq_id:double,origin_city_market_id:double,origin:chararray,origin_city_name:chararray, dest_airport_id:double, dest_airport_seq_id:double, dest_city_market_id:double, dest:chararray, dest_city_name:chararray, crs_dep_time:chararray, dep_time:chararray, dep_delay:double, dep_delay_new:double, taxi_out:double, wheels_off:chararray, wheels_on:chararray, taxi_in:double, crs_arr_time:chararray, arr_time:chararray, arr_delay:double, arr_delay_new:double, cancelled:double, cancellation_code:chararray, diverted:double, crs_elapsed_time:double, actual_elapsed_time:double, air_time:double, distance:double, distance_group:double, carrier_delay:double, weather_delay:double, nas_delay:double, security_delay:double, late_aircraft_delay:double); 
flight_col_subset = foreach flight_ontime generate  
origin,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay;



airports = group flight_col_subset by origin;


delay_sums = foreach airports generate
group as origin,
SUM(flight_col_subset.carrier_delay) as carrier_delay_sum, SUM(flight_col_subset.weather_delay) as weather_delay_sum, SUM(flight_col_subset.nas_delay) as nas_delay_sum, SUM(flight_col_subset.security_delay) as security_delay_sum, SUM(flight_col_subset.late_aircraft_delay) as late_aircraft_delay_sum;

with_delay_totals = foreach delay_sums generate
origin,
carrier_delay_sum,
weather_delay_sum,
nas_delay_sum,
security_delay_sum,
late_aircraft_delay_sum,
(carrier_delay_sum + weather_delay_sum + nas_delay_sum + security_delay_sum + late_aircraft_delay_sum) as total;

delay_ratios = foreach with_delay_totals generate
origin,
(carrier_delay_sum/total) as carrier_delay_ratio,
(weather_delay_sum/total) as weather_delay_ratio,
(nas_delay_sum/total) as nas_delay_ratio,
(security_delay_sum/total) as security_delay_ratio,(late_aircraft_delay_sum/total) as late_aircraft_delay_ratio; 

result = foreach delay_ratios generate origin,
utils.findMostCommonDelayType(carrier_delay_ratio,weather_delay_ratio, nas_delay_ratio, security_delay_ratio,late_aircraft_delay_ratio);

dump result;

Where can I go for more information?

Or feel free to leave your question or comment below!

No comments:

Post a Comment