Machine Learning In Hadoop: Association Rule Learning

What is Association Rule Learning?

Association rules are commonplace in today's world of targeted ads and search box suggestions, because they are designed to find patterns that link data together. The underlying principle is that, given a large historical list of item sets, probabilistic analysis can project which item sets are likely to be built now and in the future. An association rule typically takes the following form:

If a set X is known, then there is a probability P (based on historical data showing all known combinations) that the superset Y (where Y contains all elements in X and additional element(s)) will also be known.
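
Stated a little more formally, the probability P in this rule can be written as a ratio of counts. The notation supp(S), meaning the number of historical item sets that contain every element of S, is introduced here purely for illustration:

```latex
P(Y \mid X) = \frac{\operatorname{supp}(Y)}{\operatorname{supp}(X)}, \qquad X \subset Y
```

Because Y contains every element of X, any item set that contains Y also contains X, so the ratio is always between 0 and 1. This quantity is commonly called the confidence of the rule.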

The "Hello World" example for association rules is usually framed in terms of grocery store purchases:

We can use historical transaction data to analyze past purchases and create a list of "association rules" describing the sets of items customers typically purchase together. Then, if we know that a particular customer is currently buying both milk and eggs, we can reference those rules to calculate the probability that the customer will add butter to their cart. One would assume that the likelihood of adding butter is higher than the likelihood of adding barbecue sauce, but even the latter combination would probably have a non-zero probability (since there probably exists a cart containing those three items, among other things). Because the set {milk, eggs} will likely appear in many customers' purchases, a completed set of association rules is typically used in conjunction with a probability threshold in order to exclude historical occurrences that do not represent real trends.
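
The grocery example can be sketched in a few lines of Python. The carts, the `confidence` helper, and the 0.3 threshold below are all made up for illustration; real rules would be built from far more data.

```python
# Hypothetical shopping carts; real data would come from transaction history.
carts = [
    {"milk", "eggs", "butter"},
    {"milk", "eggs", "butter", "bread"},
    {"milk", "eggs", "barbecue sauce"},
    {"milk", "eggs"},
    {"bread", "butter"},
]

def confidence(known, extra, transactions):
    """Fraction of transactions containing `known` that also contain `extra`."""
    known = set(known)
    superset = known | set(extra)
    with_known = sum(1 for t in transactions if known <= t)
    with_superset = sum(1 for t in transactions if superset <= t)
    return with_superset / float(with_known) if with_known else 0.0

THRESHOLD = 0.3  # assumed cut-off, chosen only for this example

candidates = ["butter", "barbecue sauce", "bread"]
scores = {item: confidence({"milk", "eggs"}, {item}, carts) for item in candidates}

# Keep only the items whose probability clears the threshold.
recommended = sorted(item for item, score in scores.items() if score >= THRESHOLD)
```

With these carts, butter scores higher than barbecue sauce, yet barbecue sauce still has a non-zero probability; the threshold is what keeps it out of the recommendations.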

Why is this Problem Suitable For Hadoop?

Improving the accuracy of the projections requires analyzing large volumes of historical data in order to create the initial set of association rules. Because so much data must be processed up front, Hadoop is a natural fit for this problem.




Creating the Association Rules

One implementation of the association rule algorithm is available online here (note that this version includes the confidence calculation in its output, but not the support or lift calculations). It is written in Python and uses Hadoop Streaming to take advantage of the MapReduce paradigm. The implementation was originally written to work with example data in the form "a,b,c,d", so it needed to be adapted to work with the FAA dataset. This was done by transforming each input value into the form "columnName_value" so the rest of the original implementation could be used without further edits. The necessary changes to fpMap.py are shown below.

import sys

# create_fp_tree is provided by the original implementation
header = ['DAY_OF_WEEK','ORIGIN','DEST','DEP_TIME','DEP_DELAY','TAXI_OUT','TAXI_IN','ARR_DELAY']
arr = []

def emit_patterns(transactions):
   # create a tree with minimum frequency 0 and print each pattern with its count
   t = create_fp_tree(transactions,0)
   for p in t.fp_growth():
      print("%s\t%d" % (','.join(sorted(p[0])),p[1]))

# loop through records; every 10000, make an FP tree and generate patterns
for line in sys.stdin:
   splitLine = line.strip().split('\t')
   if "DAY_OF_WEEK" in splitLine:
      continue  # skip the header row
   # prefix each non-empty value with its column name, e.g. ORIGIN_BWI
   temp = ['_'.join([h,v]) for h,v in zip(header,splitLine) if v]
   arr.append(temp)
   if len(arr) >= 10000:
      emit_patterns(arr)
      arr = []

# flush any remaining records to a tree and generate patterns
if len(arr) > 0:
   emit_patterns(arr)
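
The mapper prints one line per pattern in the form "pattern, tab, count". How the original implementation turns those counts into rules is not shown in this post. As a rough sketch only, assuming the counts are summed per pattern and each rule pairs a pattern with the same pattern minus one item, the confidence could be derived as follows (`sum_counts` and `rules_from_counts` are hypothetical names, not part of the original code):

```python
from collections import defaultdict
from itertools import combinations

def sum_counts(lines):
    """Add up the counts emitted for each pattern across all batches."""
    totals = defaultdict(int)
    for line in lines:
        pattern, count = line.rstrip("\n").split("\t")
        totals[pattern] += int(count)
    return totals

def rules_from_counts(totals):
    """Yield (source, superset, confidence) for each pattern paired with
    every sub-pattern that has exactly one item removed."""
    for pattern, count in sorted(totals.items()):
        items = pattern.split(",")
        if len(items) < 2:
            continue
        for subset in combinations(items, len(items) - 1):
            source = ",".join(subset)
            if source in totals:
                yield source, pattern, count / float(totals[source])

# Made-up mapper output from two batches.
sample = ["A,B\t2\n", "A\t3\n", "A,B\t1\n", "A\t1\n", "B\t3\n"]
rules = list(rules_from_counts(sum_counts(sample)))
```

Because the mapper sorts the items in each pattern before printing, removing one item leaves a string that is still in sorted order, so it can be looked up directly among the summed patterns.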

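Association Rules

Each line of the resulting output holds three tab-separated fields: a known item set, a superset of that item set, and the probability (confidence) that the superset occurs given the known item set. A sample is shown below.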

ARR_DELAY_-1,DAY_OF_WEEK_1      ARR_DELAY_-1,DAY_OF_WEEK_1,TAXI_OUT_10  0.108108
ARR_DELAY_-1,DAY_OF_WEEK_1      ARR_DELAY_-1,DAY_OF_WEEK_1,TAXI_OUT_11  0.162162
ARR_DELAY_-1,DAY_OF_WEEK_1      ARR_DELAY_-1,DAY_OF_WEEK_1,DEST_SJU     0.027027
ARR_DELAY_-1,DAY_OF_WEEK_1      ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_2  0.027027
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,DEST_MCO        0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,TAXI_IN_9       0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,DEST_BOS        0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,DEP_TIME_1004   0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,TAXI_IN_31      0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,TAXI_OUT_12     0.333333
ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1 ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,ORIGIN_BWI      1.000000

Making the Recommendations

After the rules have been created, they can be used in conjunction with new data in order to determine recommendations.

FlightUtils.py

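# the outputSchema decorator is made available by Pig for Jython UDFs
@outputSchema("inputString:chararray")
def conColCom(day_of_week,fl_date,origin,dest,dep_time,dep_delay,taxi_out,taxi_in,arr_delay):
        # fl_date and origin are accepted as arguments but left out of the key
        values=[day_of_week,dest,dep_time,dep_delay,taxi_out,taxi_in,arr_delay]
        # drop empty items, then sort so the key matches the sorted rule format
        values=sorted(v for v in values if v)
        return ','.join(values)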
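
To see why the sorting matters, here is a stand-alone version of the same logic that can be run outside of Pig. The function name `build_join_key` and the sample values are made up for illustration.

```python
def build_join_key(*items):
    """Drop empty items, sort the rest, and join them with commas."""
    return ','.join(sorted(item for item in items if item))

# The empty string stands in for a column with no value in this record.
key = build_join_key('DAY_OF_WEEK_1', 'DEST_MCO', '', 'DEP_DELAY_-1', 'ARR_DELAY_-1')
```

Here `key` comes out as "ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,DEST_MCO", which has the same item order as the rule output. The later join compares these strings exactly, so a different item order would never match a rule.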

FAARules.pig

This Pig script formats the incoming data to match the format of the rules, joins it against the rules, and pulls back only the matching rules whose probability is above the threshold (0.5 in this case).

register 'FlightUtils.py' using jython as utils;

faarules = load '/user/kbell/faa_arules/tab_results.txt' as
(source:chararray, rec:chararray, prob:double);

flight_ontime = load '/user/kbell/flight_ontime_2013_1' using PigStorage(',') as
(year:double, quarter:double, month:double, day_of_month:double, day_of_week:chararray, fl_date:chararray,
airline_id:double, fl_num:chararray, origin_airport_id:double, origin_airport_seq_id:double,
origin_city_market_id:double,origin:chararray,origin_city_name:chararray, dest_airport_id:double,
dest_airport_seq_id:double, dest_city_market_id:double, dest:chararray, dest_city_name:chararray,
crs_dep_time:chararray, dep_time:chararray, dep_delay:int, dep_delay_new:double, taxi_out:int,
wheels_off:chararray, wheels_on:chararray, taxi_in:int, crs_arr_time:chararray, arr_time:chararray,
arr_delay:int, arr_delay_new:double, cancelled:double, cancellation_code:chararray, diverted:double,
crs_elapsed_time:double, actual_elapsed_time:double, air_time:double, distance:double, distance_group:double,
carrier_delay:double, weather_delay:double, nas_delay:double, security_delay:double, late_aircraft_delay:double);

flight_col =
    foreach flight_ontime
    generate
        CONCAT('DAY_OF_WEEK_',day_of_week) as day_of_week,
        CONCAT('FL_DATE_',fl_date) as fl_date,
        CONCAT('ORIGIN_',REPLACE(origin,'"','')) as origin,
        (CONCAT('DEP_TIME_',REGEX_EXTRACT(REPLACE(dep_time,'"',''),'(^0?)(.*)',2)) == 'DEP_TIME_' ? '' : CONCAT('DEP_TIME_',REGEX_EXTRACT(REPLACE(dep_time,'"',''),'(^0?)(.*)',2))) as dep_time,
        CONCAT('DEST_',REPLACE(dest,'"','')) as dest,
        CONCAT('DEP_DELAY_',(chararray)dep_delay) as dep_delay,
        CONCAT('TAXI_OUT_',(chararray)taxi_out) as taxi_out,
        CONCAT('TAXI_IN_',(chararray)taxi_in) as taxi_in,
        CONCAT('ARR_DELAY_',(chararray)distGrouped(arr_delay)) as arr_delay;


result =
    foreach flight_col
    generate
        utils.conColCom(day_of_week,fl_date,origin,dest,dep_time,dep_delay,taxi_out,taxi_in,arr_delay);


pre_rec = JOIN result BY $0, faarules BY source;

-- after a JOIN, fields are addressed as relation::field
post_pre_rec =
  foreach pre_rec
  generate
    faarules::source,
    faarules::rec,
    faarules::prob;

rec =
  filter post_pre_rec by prob > 0.5;

dump rec;
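
The join and filter steps at the end of the script can also be pictured in plain Python. This is only a sketch of the logic, not a replacement for the Pig job; the `recommend` function is a hypothetical name, and the three rules are copied from the sample output shown earlier.

```python
from collections import defaultdict

# (source, rec, prob) tuples taken from the sample rule output.
rules = [
    ("ARR_DELAY_-1,DAY_OF_WEEK_1",
     "ARR_DELAY_-1,DAY_OF_WEEK_1,TAXI_OUT_11", 0.162162),
    ("ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1",
     "ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,DEST_MCO", 0.333333),
    ("ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1",
     "ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1,ORIGIN_BWI", 1.000000),
]

def recommend(keys, rules, threshold=0.5):
    """Match each key against rule sources and keep rules above the threshold."""
    by_source = defaultdict(list)
    for source, rec, prob in rules:
        by_source[source].append((rec, prob))
    results = []
    for key in keys:
        for rec, prob in by_source.get(key, []):
            if prob > threshold:
                results.append((key, rec, prob))
    return results

recs = recommend(["ARR_DELAY_-1,DAY_OF_WEEK_1,DEP_DELAY_-1"], rules)
```

Only the ORIGIN_BWI rule survives, since it is the only match with a probability above 0.5. A key that does not appear as a rule source simply produces no recommendation, which mirrors the inner join in the script.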
