Hadoop Integration: Installing Apache Cassandra on HortonWorks Data Platform Part 2

Calling Cassandra in MapReduce

Cassandra lets Hadoop jobs read its data by providing its own implementations of the standard MapReduce input classes. Cassandra rows (a key plus a sorted map of columns) are handed to Map tasks as input values, and the job specifies which columns to fetch from each row. The bundled word_count example illustrates this: it selects a single configurable columnName from each row and looks like:
import org.apache.cassandra.avro.Column;
import org.apache.cassandra.avro.ColumnOrSuperColumn;
import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;  

...

static final String KEYSPACE = "database"; 
static final String COLUMN_FAMILY = "col1";
static final String OUTPUT_REDUCER_VAR = "output_reducer";
static final String OUTPUT_COLUMN_FAMILY = "col2"; 
private static final String OUTPUT_PATH_PREFIX = "/tmp/output";
private static final String CONF_COLUMN_NAME = "columnname"; 

...

public static class TokenizerMapper extends Mapper<byte[], SortedMap<byte[], IColumn>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private String columnName;


        public void map(byte[] key, SortedMap<byte[], IColumn> columns, Context context)
                        throws IOException, InterruptedException
        {
            IColumn column = columns.get(columnName.getBytes());
            if (column == null)
                return;
            String value = new String(column.value());
            logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());

            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

...
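The "..." above elides, among other things, the piece that gives columnName its value: the example overrides the mapper's setup() hook and reads the column name from the job configuration (the logger used in map() is likewise a field declared in the elided part of the class). A minimal sketch of that hook, along the lines of what the shipped word_count example does, would be:

        protected void setup(Context context) throws IOException, InterruptedException
        {
            // CONF_COLUMN_NAME ("columnname") is placed on the job configuration by the
            // driver, so every map task knows which column to read from each row
            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
        }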

ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
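The ConfigHelper calls above only configure the input side; the driver still has to build an ordinary Hadoop Job around them. The sketch below shows roughly how the pieces fit together. It assumes a few names that are not shown in this post: an enclosing WordCount class, a ReducerToFilesystem reducer that writes the counts to HDFS, a driver running inside a Tool implementation so getConf() is available, and the usual Hadoop imports (Job, Path, FileOutputFormat).

Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(ReducerToFilesystem.class);   // assumed reducer writing plain counts to HDFS
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + "/" + columnName));

// read input splits from Cassandra instead of HDFS
job.setInputFormatClass(ColumnFamilyInputFormat.class);

// make the column name available to every map task (read back in setup())
job.getConfiguration().set(CONF_COLUMN_NAME, columnName);

// the ConfigHelper/SlicePredicate lines shown above then tell the input format
// which keyspace, column family and column to fetch
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

job.waitForCompletion(true);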

Overall

In short, Hadoop integration for Apache Cassandra comes down to wiring Cassandra's input/output formats and ConfigHelper calls into custom MapReduce jobs. Cassandra rows become the inputs to Map tasks, and the job configuration specifies which columns to retrieve from each row.
