A 70-30, dual-queue Capacity Scheduler configuration for Hadoop job scheduling

A SELECT or COUNT query in Hive is executed as a MapReduce job even when it runs against a small table or dataset.  Imagine that you want to execute one of these queries that should only take a few seconds, a case where the setup and teardown of the Hadoop job probably takes longer than the actual work portion of the job.  Also imagine that another user already has a complex, long-running job executing on the cluster.  Bad news for your job.  If you're using mostly Hadoop default settings for the YARN scheduling algorithm, it's possible that your simple job won't be executed until the other one is finished.

A few months ago, one of our projects was in this exact situation.  What was worse was that we had multiple users needing to execute relatively simple and short running jobs in order to meet mini-milestones on their part of the project.  We had to find a solution.

Requirements and Validation

We came up with the following requirements and validation criteria for how jobs should be scheduled.  In particular, we wanted to prevent a small job from being starved while a very large job is already executing, while still giving these small jobs 100% of the cluster resources when the resources are free, because we definitely didn't want to impose some new limit on a small job.

Requirements:
1) There should be at most one high resource intensive job running, with enough capacity to allow less intensive jobs to run along side it.
2) Should be able to configure separate queues so that jobs executed on one queue have a higher allocation of resources.
3) Should be able to identify a specific queue that is used when a job is executed.
4) If one user launches a job, they should be able to use 100% of the cluster, but if a different user launches a job, the first job's share of the cluster should be scaled back so the new job can run.
5) Ability to configure a queue so that all users of the queue get equal access to cluster resources. Multiple users can execute jobs simultaneously and will always be given an equal share of resources until the point at which users would get less than some percentage of the cluster. At that point, jobs will have to wait before execution.

Validation steps:
1) Run a ("large") job that uses 100% of the cluster (or would use 100% of the cluster under the current, default config).
2) Next, run a ("small") job that only requires e.g. ~20% of the cluster.
3) Ensure the second job can be run and is not stuck waiting for the first job to finish.
4) Attempt to run another ("large") job.
5) Ensure this second ("large") job does not run until the first ("large") job finishes.

Capacity Scheduler configurations

Here is the default configuration for capacity-scheduler:
yarn.scheduler.capacity.maximum-am-resource-percent=0.2
yarn.scheduler.capacity.maximum-applications=10000
yarn.scheduler.capacity.node-locality-delay=40
yarn.scheduler.capacity.root.acl_administer_queues=*
yarn.scheduler.capacity.root.capacity=100
yarn.scheduler.capacity.root.default.acl_administer_jobs=*
yarn.scheduler.capacity.root.default.acl_submit_jobs=*
yarn.scheduler.capacity.root.default.capacity=100
yarn.scheduler.capacity.root.default.maximum-capacity=100
yarn.scheduler.capacity.root.default.state=RUNNING
yarn.scheduler.capacity.root.default.user-limit-factor=1
yarn.scheduler.capacity.root.queues=default
yarn.scheduler.capacity.root.unfunded.capacity=50
and here is our 70-30, dual-queue Capacity Scheduler configuration for capacity-scheduler:
yarn.scheduler.capacity.maximum-am-resource-percent=0.2
yarn.scheduler.capacity.maximum-applications=10000
yarn.scheduler.capacity.node-locality-delay=40
yarn.scheduler.capacity.root.acl_administer_queues=*
yarn.scheduler.capacity.root.big.acl_administer_jobs=*
yarn.scheduler.capacity.root.big.acl_submit_jobs=*
yarn.scheduler.capacity.root.big.capacity=70
yarn.scheduler.capacity.root.big.maximum-applications=1
yarn.scheduler.capacity.root.big.maximum-capacity=100
yarn.scheduler.capacity.root.big.state=RUNNING
yarn.scheduler.capacity.root.big.user-limit-factor=1.25
yarn.scheduler.capacity.root.capacity=100
yarn.scheduler.capacity.root.default.acl_administer_jobs=*
yarn.scheduler.capacity.root.default.acl_submit_jobs=*
yarn.scheduler.capacity.root.default.capacity=30
yarn.scheduler.capacity.root.default.maximum-capacity=100
yarn.scheduler.capacity.root.default.minimum-user-limit-percent=50
yarn.scheduler.capacity.root.default.state=RUNNING
yarn.scheduler.capacity.root.default.user-limit-factor=2
yarn.scheduler.capacity.root.queues=default,big

Requirement Satisfaction

The first four requirements are satisfied. For requirement three, if a user needs to submit a long-running, resource-intensive job, they can use the queue named "big".  To do this, include -Dmapred.job.queue.name=big in the job execution command.  For example, this will run a job in the queue named "big":
yarn jar  /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.2.0.2.0.6.0-101.jar pi -Dmapred.job.queue.name=big 20 1000000009
If this additional argument is not specified, a user's job simply goes to the queue named "default" and everything works as it did before.  For example, this will run a job in the queue named "default":
yarn jar  /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.2.0.2.0.6.0-101.jar pi 20 1000000009
Finally, the last requirement describes how the smaller queue (the "30" in "70-30") behaves; it is addressed by the minimum-user-limit-percent=50 setting on the default queue. Therefore, all requirements are satisfied.

Validation Outcome

The dual-queue configuration can be validated using the steps defined in an earlier section. When the first large job is submitted, 100% of the cluster will be used. After submitting the small job (in step 2), when a portion of the big job completes and releases a slot back to the cluster, the small job will get that slot and will begin execution. In other words, the small job does not preempt the large job in order to immediately obtain a slot to use.  The small job must wait until the next slot frees up. Validation succeeds.

Notes on Capacity Scheduler Configuration Directives

I found it difficult to find good documentation about the Capacity Scheduler configuration entries; most resources offered only a short explanation and were sometimes ambiguous or hard for our team to understand.

I discovered that the Capacity Scheduler configuration entry containing "unfunded.capacity" (see the last line in the default Capacity Scheduler box above) means nothing. There was actually a ticket to clean this up (https://issues.apache.org/jira/browse/AMBARI-6269), so you can safely remove this entry.  In short, if you're wondering what it means, it says that the capacity of the queue named "unfunded" should be set to 50 (percent of the parent queue's capacity).  But no queue named "unfunded" is ever defined, so the entry means and does nothing.

The official docs cover the full set of Capacity Scheduler configuration entries. Here are a couple of the important ones and their descriptions...

yarn.scheduler.capacity.<queue-path>.user-limit-factor
The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. [1]
In other words, if all jobs are submitted by the same user, that user by default cannot take more than the queue's configured capacity. This behavior can be modified with this setting in capacity-scheduler.xml.

yarn.scheduler.capacity.<queue-path>.minimum-user-limit-percent
Each queue enforces a limit on the percentage of resources allocated to a user at any given time, if there is demand for resources. The user limit can vary between a minimum and a maximum value. The former (the minimum value) is set to this property value and the latter (the maximum value) depends on the number of users who have submitted applications. For example, suppose the value of this property is 25. If two users have submitted applications to a queue, no single user can use more than 50% of the queue's resources. If a third user submits an application, no single user can use more than 33% of the queue's resources. With 4 or more users, no user can use more than 25% of the queue's resources. A value of 100 implies no user limits are imposed. The default is 100. Value is specified as an integer. [2]

Notes on Fair Scheduler

YARN can be configured to use a different scheduling module altogether. If it turns out that Capacity Scheduler doesn't work for you or maybe you want to try something else, have a look at the Fair Scheduler.

Notes about our Hadoop cluster



External links


Filters and Parameters in Tableau: When to Use Them

Overview

When building a dashboard in Tableau, the analyst may want to filter many worksheets with a single filter selection. Tableau has three ways of making this possible: global quick filters, filter actions, and parameters. Global quick filters filter every worksheet on a dashboard, as long as each contains the filtered dimension. Filter actions use a "control" worksheet to filter other worksheets on the dashboard based on the elements selected in that sheet. Parameters let the user apply what would otherwise be a quick filter across worksheets even when the underlying dimension differs from sheet to sheet.

In this post, we will explore the advantages and disadvantages of using each filter type and how to approach different use cases that call for dashboard filters.

Global Quick Filters

Advantages

Global quick filters are very useful when creating dashboards that contain worksheets that all use the same data source. For example, in a dashboard that displays the dataset in both text and visual forms, global quick filters give the flexibility to present the filter in a variety of formats: single value dropdown, multiple values list, wildcard match, etc. They also allow the user to show an aggregation of all marks with the "(All)" filter.

Disadvantages

Of course, the main disadvantage of global quick filters is that if the analyst has a dashboard with worksheets that each use a different data source, they do not work. This is especially problematic when the component worksheets in a dashboard are displaying similar concepts but are built using datasets optimized for the specific type of worksheet the user needs (table, bar chart, map, etc.). In this case, even if the datasets all have the same columns and data types, the analyst is forced to find an alternative solution.


Filter Actions

Advantages

Filter actions are best used when the user should interact with a specific sheet that acts as the "control." Examples of this may be a text table that lists product categories or a bar chart that lists sales numbers. In this case, the user may want to see other information about specific marks in these worksheets. By setting up filter actions, users can filter other worksheets in a dashboard by selecting the relevant marks.

Filter actions address the problem presented by global quick filters and the use of different datasets. Even if all the worksheets in a dashboard use different datasets, filter actions allow the analyst to assign connections between two fields in different datasets. Analysts can assign a single field connection, multiple fields, or all fields. This Tableau Knowledge Base article demonstrates how to set up the filter and the data relationships.

An example of this might be a text table of product sales in a given sales period. The user wants to see the various worksheets on the dashboard filtered by a specific product or products. In the product sales table, the user can CTRL-click the products they want to see and the other worksheets will be filtered accordingly. This can even work across dashboards.

Disadvantages

While filter actions allow the analyst to filter different datasets with a single click, it takes away the variety of ways the data can be filtered that global quick filters allow. The user must interact with the control worksheet to filter the other sheets connected to it. A simple quick filter (not a global one) that filters the control worksheet will not propagate to the other worksheets in the dashboard. For dashboards where a user might want to filter all worksheets by a given dimension, e.g., date or geographical region, a global quick filter or parameter would be better.


Parameters

Advantages

Parameters address the problems of both global quick filters and filter actions: they allow the user to use a single dropdown or list that affects all connected worksheets in a dashboard and allow the analyst to connect the filter across datasets. Essentially, they act as an independent filter that can then filter the various worksheets. We won't cover how to set them up (they are linked to above) but by using parameters, analysts have greater power over how their worksheets are filtered. As long as the data types match, filters on each worksheet can be controlled by a parameter.

Disadvantages

Unfortunately, parameters have their own limitations. Whereas global quick filters have seven ways to be represented on a dashboard, parameters only have four. Parameters cannot make multiple selections in a filter, e.g., with a list of checkboxes, and they do not have the "(All)" aggregate choice of quick filters. While the inability to select multiple items in a filter cannot be circumvented, the data can be structured to include an "All" row that aggregates the relevant data for that mark. This is not optimal, since the analyst must make this consideration when preparing their data for use in Tableau, but it is the only workaround we have come across.


Conclusion

We have covered the three types of dashboard filters in Tableau and their strengths and weaknesses. As we continue to use Tableau in future projects, we hope to find solutions to some of the shortcomings of these filtering methods. If you have any questions or have a suggestion to work around the challenges we've written about, feel free to leave them in the comments below!

Oozie job stays in PREP mode on HDP2

We recently ran into a weird problem whereby Oozie jobs would not progress past PREP mode. We were running the latest Hortonworks Data Platform v2.0.6.0 release. It turns out that the port number we were using for the jobtracker was not correct: the correct jobtracker port was 8050, while we were trying to use port 8021. Here is the job.properties file that ended up working:


oozie.wf.application.path=hdfs://node1:8020/mnt/WordCount
jobTracker=node2:8050
nameNode=hdfs://node1:8020


where node1 is the NameNode and node2 hosts the Oozie server.

Making Use of AspectJ to Test Hive UDTFs

Unit Testing Hive UDFs

As discussed in previous posts, a User Defined Table Function (UDTF) is a variant on the normal Hive UDF. Instead of reading one or more columns as input and writing a single column as output, the UDTF takes in one or more columns and writes multiple rows.

For UDFs, input records are passed to the following function for processing, with the result being used as the return value:

public Object evaluate(...);  // the argument and return types vary per UDF

This fits the normal JUnit testing framework, so traditional testing methods can be applied.

However, for UDTFs, the input records are passed to the following function:

public void process(Object[] record);

Notice that the return value is "void". In the case of UDTFs, output values are written through calls to the "forward" method:

protected final void forward(java.lang.Object o);

Since both the process and forward methods have a void return value, this does not conform to the JUnit testing process, and an alternative approach is required.

AspectJ

AspectJ is an extension to the Java language that allows programmers to define "Aspects" - structures that provide mechanisms for overriding functionality in particular methods, or for supplementing additional functionality before or after a particular event. Events can be method calls, modifications of variables, initialization of classes, or thrown exceptions.

This technology is applicable to the UDTF case because it will allow us to apply AspectJ "advice" around the forward method - calling the normal Hive method during normal execution and calling a custom method that will fit into the JUnit framework during the testing phase.

Unit Testing a UDTF

Extension of the GenericUDTF Class

In order to ensure that the AspectJ modification does not affect the implementation of the UDTF, a customized version of the GenericUDTF must be created. This custom class will contain the "testCompatibleForward" method around which the AspectJ advice should be applied.
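As a rough illustration, here is a minimal sketch of what that class might look like, under the assumption that it does nothing more than delegate to the normal Hive forward call (the package and class name match the pointcut used in the aspect below; everything else is an assumption):

package org.spryinc.hive.utils;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

public abstract class SpryGenericUDTF extends GenericUDTF {

    /**
     * Thin wrapper around GenericUDTF.forward. During normal execution this simply forwards
     * the record to Hive; during testing, the AspectJ advice intercepts this method and
     * captures the record instead.
     */
    protected void testCompatibleForward(Object[] record) throws HiveException {
        forward(record);
    }
}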

Aspect Definition

UDTFTestingAspect.java should be created with the following code. The "interceptAndSave" function will "intercept" all records intended for the forward method and save them to an ArrayList for JUnit to inspect.

@Aspect
public class UDTFTestingAspect {
    private static ArrayList<Object[]> result = new ArrayList<Object[]>(); // the ArrayList that will contain the output
    private static boolean initialized = false; // if this is not set to true, the values will be passed to the "forward" method

    /**
     * The user must call this in order to begin the interceptions
     */
    public static void initialize() {
        initialized = true;
    }

    /**
     * This advice is applied at the testCompatibleForward join point. The @Around annotation is used to show that the original
     * functionality will be entirely replaced by this, except in the case where invocation.proceed is called.
     *
     * The intercept functionality will be to capture the Object[] that was intended for the "forward" method and store it
     * into the "result" array for inspection by the unit test.
     * @param invocation Information about the join point where the advice is being applied
     * @throws Throwable If the join point throws an exception, this function will as well
     */
    @Around("execution(* org.spryinc.hive.utils.SpryGenericUDTF.testCompatibleForward(*) throws *)") // the function where the advice is applied
    public void interceptAndSave(ProceedingJoinPoint invocation) throws Throwable {
        if (initialized) {
            // store each of the output Object[] values into the ArrayList
            for (Object i : invocation.getArgs()) {
                Object[] iObj = (Object[]) i;
                result.add(iObj);
            }
        } else
            invocation.proceed(); // normal execution if the intercept behavior is not enabled
    }

    /**
     * @return Returns the ArrayList containing all output values intended for the GenericUDTF.forward method
     */
    public static ArrayList<Object[]> getResult() {
        return result;
    }

    /**
     * Clears the ArrayList so that the results are valid between method calls
     */
    public static void clearResult() {
        result.clear();
    }
}

Modifications to the Unit Tests

Unit tests can be implemented in the normal manner: defining the expected outputs, creating the required inputs, and calling the "process" method with the inputs passed in as an argument. In order to take advantage of the AspectJ handling, three actions must be taken:

Make use of the @Before annotation to initialize the testing process

The default behavior is to use the built-in forward method, so that normal usage of the UDTF is unaffected. In order to enable the use of the version of the forward method required for testing, the UDTFTestingAspect.initialize() function must be called. This is accomplished by placing the following lines into the unit test file:


@Before
public void initialize() {
    UDTFTestingAspect.initialize();
}

Retrieve the generated output using a custom function

Since the output from the interceptAndSave method is only available through the Aspect, the following method must be used in order to gain access:

UDTFTestingAspect.getResult()
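For example, a test body might then look like the following sketch (the UDTF class, its input, and the expected output rows are hypothetical):

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

@Test
public void testExplode() throws Exception {
    MyExplodeUDTF udtf = new MyExplodeUDTF();      // hypothetical UDTF under test

    udtf.process(new Object[] { "a,b" });          // push one input record through the UDTF

    ArrayList<Object[]> rows = UDTFTestingAspect.getResult();
    assertEquals(2, rows.size());                  // expect one captured output row per token
    assertArrayEquals(new Object[] { "a" }, rows.get(0));
    assertArrayEquals(new Object[] { "b" }, rows.get(1));
}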

Make use of the @After annotation to reset the testing process

Since the interceptAndSave method has no built-in mechanism for clearing the results, this must be performed after each test. The following lines will clear the gathered output:

@After
public void clearSavedResults() {
    UDTFTestingAspect.clearResult();
}

Where can I go for more information?

This post assumes that AspectJ is installed and accessible through the IDE. Instructions on installing AspectJ are available here.
For more information on AspectJ
For more information on Hive UDTFs
Or feel free to leave your question or comment below!

Lingual and Cascading Integration

Background

As mentioned in the previous post, Lingual 1.0 was released at the end of 2013 alongside Cascading 2.2.  It generates Cascading code for SQL queries, and Lingual flows can be integrated into a Cascading cascade together with other Cascading and Lingual flows.  There are some caveats in doing this, since Lingual is a 1.0 release, as well as some additional tasks that you might not think about for Cascading.  For our project, the developer could choose on a per-module basis whether to implement the workflow in Lingual.  In some cases, for moderately complex but simpler queries, the processing was first attempted using Lingual, with Cascading used as a fallback if a query plan could not be generated for the Lingual query.  We found that we had to make a few adjustments and discovered some limitations in Lingual's first release.  Teams will have to decide whether the benefits of using Lingual outweigh the additional caveats around schemas and early releases.  Lingual is a promising step in the evolution of tools for data processing on Hadoop.

Schemas
Since it performs SQL processing, Lingual needs to know the types (Java classes) for all of the fields its taps read and write.  This is more type-restrictive than Cascading.  In your Cascading application, if you don't specify the field names, it will parse them from a header such as the following by using Fields.ALL:

field1, field2, field3

So in Cascading you can get away with reading the field names from the header like this, since you don't need type information:

Tap tap = new Hfs( new TextDelimited( Fields.ALL, true, DataFile.DELIMITER  ), tapPath, sinkMode );


One way to automatically provide this type information to Lingual is to put the types in the header, separated from the field names by a colon:

field1:int, field2:int, field3:String

Then you can use the SQLTypeTextDelimited scheme, which will automatically parse the field names and types from the header:

Tap tap = new Hfs( new SQLTypeTextDelimited( ",", "\"" ), tapPath, SinkMode.KEEP );


Locking down the default warehouse in Hive

If you are using a secured cluster, you may wish to segregate the data in HDFS so that different users are able to access different data. If you do this, you probably want to use this directory structure for all your services and tools.

For example, you will want to use only external tables in Hive and have the table locations be in specific directories on HDFS in your segregated directory structure. You will also want to lock down the default Hive warehouse location (/apps/hive/warehouse or /user/hive/warehouse) so that users won't be putting data into an insecure location that is accessible by all.

The most intuitive way to lock down the default warehouse is "hadoop fs -chmod 000" on the warehouse directory. However, if you try to create an external table with the internal warehouse at 000 permissions, you will get an error similar to this:

Authorization failed:java.security.AccessControlException: action WRITE
not permitted on path hdfs://<hostname>:8020/apps/hive/warehouse for user
anastetsky. Use show grant to get more details.



Looks like Hive is still trying to write to the internal warehouse even when creating an external table!

This is a Hive bug. It seems as if Hive requires the warehouse to have "write" permission. If you unlock the warehouse and try again, it doesn't appear to actually write anything to it; it just wants the directory to have the "write" permission.



There is a workaround.

Hadoop Fundamentals - Custom Counters

Introduction

The Hadoop framework keeps track of a set of built-in counters that it uses to track job progress. As an example, some of the ones displayed in the summary web page in the JobTracker are: the number of bytes read by the input format, the number of launched reduce tasks, the number of data local map tasks, and the number of bytes written by the output format. There is a mechanism that allows user-defined custom counters to be tracked alongside the built-in ones using the existing counter framework.

Creating Custom Counters

In order to create and use a custom counter, the first step is to create an Enum that will contain the names of all custom counters for a particular job. This can be achieved as follows:

enum CustomCounters {RED, BLUE}

Inside the map or reduce task, the counter can be adjusted using the following:

if(red)
    context.getCounter(CustomCounters.RED).increment(1); // increase the counter by 1
else if(blue)
    context.getCounter(CustomCounters.BLUE).increment(-1); // decrease the counter by 1



Programmatically Retrieving Custom Counter Values

The custom counter values will be displayed alongside the built-in counter values on the summary web page for a job viewed through the JobTracker. In addition to this, the values can be accessed programmatically with the following:

long redCounterValue = job.getCounters().findCounter(CustomCounters.RED).getValue();
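To put that call in context, here is a minimal sketch of reading the counters after the job completes (the job setup is elided and the names are arbitrary); findCounter returns a Counter object, so getValue() is needed to obtain the numeric value:

Job job = Job.getInstance(new Configuration(), "custom-counter-example");
// ... set the mapper, reducer, input and output paths here ...

if (job.waitForCompletion(true)) {                 // run the job and block until it finishes
    long red = job.getCounters().findCounter(CustomCounters.RED).getValue();
    long blue = job.getCounters().findCounter(CustomCounters.BLUE).getValue();
    System.out.println("RED=" + red + ", BLUE=" + blue);
}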

An Introduction to Apache Giraph

A member of the Spry team was selected to write a guest post for the Safari Books Online Blog. Please view the introduction below, but click through here to view the rest of the post.

This blog provides you with an introduction to Apache Giraph, covering details about graphs in general and the benefits of using Giraph.

Graph Vertices and Edges

One task that is extremely important for any analyst is the process of discovering and understanding connections. This process typically manifests in the form of graph processing. A graph can be defined as a collection of vertices and edges, where the vertices can have values and the edges can have directions and weights. Two of the most common ways of interacting with graphs include traversal (following edges between a set of connected vertices) and search (finding vertices in the graph that meet a set of conditions).
This graph structure of vertices and edges is extremely useful in a wide variety of real world applications. For example, imagine the problem of finding driving directions between two addresses. In this scenario, a graph of the roadway system can be built by considering roads as the edges, and intersections as the vertices. A related, larger problem over that same graph might be the process of optimizing the order in which a business makes its deliveries.
Even a natural system like the brain can be treated as a graph – where the billions of neurons are the vertices and the trillions of synapses connecting them are the edges. Once the brain is represented in this manner, research can be conducted to help us understand general brain function in addition to diseases and conditions that affect the passageways – like Alzheimer’s.

Introduction To Machine Learning - Naive Bayes Classifier

This is my first post on machine learning algorithms, so please forgive my briefness in some concepts/examples.

The Naive Bayes model (NBM) is probably the simplest of all machine learning algorithms involving Bayesian statistics. It is a great primer model, in the sense that it should be used as a first take on your data. The NBM is a favorite for text mining/classification and fits those kinds of applications really well. It makes oversimplifying assumptions but is nonetheless a great tool for gaining preliminary insights into the data.

We first start off by defining conditional independence. Suppose we have three random variables A, B and C. These could be anything. For example, let them be the following:
A - will I have pizza for lunch today? (boolean variable, yes/no)
B - did I visit the cafeteria which has my favorite pizzeria (say, Tom's Delicious Pizzas)? (boolean variable, yes/no)
C - what's the name of the place on my receipt where I picked up lunch from today? (multivalued variable; say I usually visit 8 places in the cafeteria for lunch, of which Tom's Delicious Pizzas is one)

Now, if you look closely at the inter-relationship between these three variables, you'll observe that if I know whether my receipt has "Tom's Delicious Pizzas" on it, the variable B does not tell me anything new towards predicting A! This is exactly what conditional independence means. "Conditioned" on the fact that I know C, the knowledge of B does not help me in any way to predict A. Hence, we can completely ignore B! In probabilistic terms, this is written as:

P(A| B, C) = P(A|C) provided A and B are conditionally independent, given C

Note: the "given C" part is really important. Without any knowledge of C, B would suddenly become very relevant to predicting A. It is absolutely essential that we know C for the above formula to hold. If the following is true:

P(A|B) = P(A)

then it means that A and B are completely, or totally independent. This means, that B really has nothing to do with A. You can imagine A and B to be something like:
A - do you have a cold? (boolean variable, yes/no)
B - did the cow in your farm moo? (boolean variable, yes/no)
As you can see above, A and B have nothing to do with each other, they are completely independent.
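To connect this back to classification: the "naive" assumption is that every feature is conditionally independent of the others, given the class. Under that assumption (written in the same plain notation as above, where C stands for the class and X1 ... Xn for the features; these are generic placeholders, not the variables from the example), the model reduces to

P(C | X1, ..., Xn) is proportional to P(C) * P(X1 | C) * ... * P(Xn | C)

and the classifier simply picks the class C that maximizes this product.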

HOW TO access HDFS files from Cascading flow


One of the more difficult things to do in Cascading is to access small pieces of side data that you might not want to join into the main stream.  Cascading does have a mechanism for this, but it is limited.

Query Planning vs. Runtime

One of the things that beginners with Cascading have to understand is that most of the Java code they write actually builds the query plan.  So the places in your code where you can actually access HDFS are limited.  Access can be done through the FlowProcess object in Cascading, and you are provided the FlowProcess object only in certain locations precisely because so much of the code exists just to generate the query plan.

Mechanism

The mechanism in Cascading for accessing an HDFS file is the openForRead method on a Tap.  Note that any local file or HDFS file can be a Tap, but if the file is local it must be on the node where the flow is running.  Notice that the method takes a FlowProcess object.

public TupleEntryIterator openForRead(FlowProcess<Config> flowProcess)
                               throws IOException

http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/Tap.html#openForRead(cascading.flow.FlowProcess)

This method is also available on the flow itself.

http://docs.cascading.org/cascading/2.5/javadoc/cascading/flow/BaseFlow.html#openTapForRead(cascading.tap.Tap)

Once the method is called, you have an iterator over the tap with which to retrieve the tuple data.

The FlowProcess object roughly corresponds to a session of the flow while it is running, and it is only made available to programmers in certain Cascading operations, such as Buffers, Aggregators, Functions, and Filters.  Assemblies do not have this option.  The usual pattern is to read the HDFS file in the operation's prepare method and store the data in the operation context; the context should then be released in the cleanup method.
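As a minimal sketch of that pattern (Cascading 2.x API assumed; the class name, field names, and lookup path are all hypothetical), a Function might load a small HDFS file in prepare() and keep it in the operation context:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;

public class LookupFunction extends BaseOperation<Map<String, String>> implements Function<Map<String, String>> {

    private final String lookupPath;

    public LookupFunction(Fields fieldDeclaration, String lookupPath) {
        super(1, fieldDeclaration);
        this.lookupPath = lookupPath;
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall<Map<String, String>> operationCall) {
        Map<String, String> lookup = new HashMap<String, String>();
        try {
            // open the small HDFS file through the FlowProcess, which is legal here in prepare()
            Tap tap = new Hfs(new TextDelimited(new Fields("key", "value"), ","), lookupPath);
            TupleEntryIterator iterator = tap.openForRead(flowProcess);
            while (iterator.hasNext()) {
                TupleEntry entry = iterator.next();
                lookup.put(entry.getString("key"), entry.getString("value"));
            }
            iterator.close();
        } catch (IOException exception) {
            throw new RuntimeException("could not load lookup data from " + lookupPath, exception);
        }
        operationCall.setContext(lookup); // stash the side data in the operation context
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Map<String, String>> functionCall) {
        String key = functionCall.getArguments().getString(0);
        String value = functionCall.getContext().get(key); // use the pre-loaded side data per tuple
        functionCall.getOutputCollector().add(new Tuple(value));
    }

    @Override
    public void cleanup(FlowProcess flowProcess, OperationCall<Map<String, String>> operationCall) {
        operationCall.setContext(null); // release the side data, as described above
    }
}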

Introduction To Machine Learning - The Distributed Naive Bayes Classifier

In my last blog, we discussed the Naive Bayes algorithm. In this blog, we think about how we can parallelize the work on a distributed system to solve classification problems for enormous data sources.

So say we have documents of 3 classes (just to be more general than the usual binary classification case). First thing we want to do is get priors for our documents. In MapReduce, this would be an easy task:

MAP(docid, doc)
    EMIT(doc.class, 1)

So we basically emit a partial count for the class label for all documents residing on different nodes. On the reducer side, we now need to collect all these partial counts, and add them up:

REDUCE(doc.class, counts [1, 1, ....., 1, 1])
    sum = 0
    for each count in counts
        sum = sum + count
    EMIT(doc.class, sum / M)

We assume above that the text corpus size is M. Thus we've obtained our priors. Now, to get the word counts per document class, we need to iterate through each document and, for each word, output a ({word, class}, 1) pair. Let's see how this is done:

MAP(docid, doc)
    for each word w in doc
        EMIT( (word, doc.class) , 1)

The {word, doc.class} pair within the emitted pair is a data structure that can store two values (almost trivial to implement in Java). Note that for this pair to work as a key in Hadoop, it must implement the WritableComparable interface, Hadoop's serialization-plus-ordering contract (a sketch appears after the reducer below).
The reducer now just has to count the total number of 1's occurring for a given word-document class pair.

REDUCER((word, doc.class), counts [1, 1, 1, ..., 1])
    sum = 0
    for each count in counts
        sum = sum + count
    EMIT ((word, doc.class), sum / (PRIOR(doc.class) * M))

Here, PRIOR is the prior for the document class that we calculated above. PRIOR(doc.class) * M gives us the total number of docs in the entire corpus whose label is the same as doc.class.

By pre-computing these word probabilities for enormous texts, it is possible to have classification performance of the order of O(kn) where k = number of document classes and n = length of document (in words).
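As an aside, the {word, class} pair mentioned above is straightforward to write as a Hadoop key. Here is a minimal sketch (the class name is hypothetical) using Hadoop's WritableComparable contract:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class WordClassPair implements WritableComparable<WordClassPair> {

    private final Text word = new Text();
    private final Text docClass = new Text();

    public WordClassPair() {
    }

    public WordClassPair(String word, String docClass) {
        this.word.set(word);
        this.docClass.set(docClass);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        word.write(out);       // serialize both fields in a fixed order
        docClass.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word.readFields(in);   // deserialize in the same order
        docClass.readFields(in);
    }

    @Override
    public int compareTo(WordClassPair other) {
        int cmp = word.compareTo(other.word);                        // order by word first...
        return cmp != 0 ? cmp : docClass.compareTo(other.docClass);  // ...then by class
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordClassPair))
            return false;
        WordClassPair other = (WordClassPair) o;
        return word.equals(other.word) && docClass.equals(other.docClass);
    }

    @Override
    public int hashCode() {
        return 31 * word.hashCode() + docClass.hashCode(); // used by the default partitioner
    }
}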

That's all for now. Stay tuned for some code in my next blog (also on machine learning)!

Understanding an Apache Giraph Application

A member of the Spry team was selected to write a guest post for the Safari Books Online Blog. Please view the introduction below, but click through here to view the rest of the post.

Apache Giraph is a framework with methods that must be implemented in order to accomplish your graph processing goal. Two of the examples that come packaged with Giraph are Shortest Path and Page Rank. These examples are meant to be extended and altered to fit the needs of any new custom application. This post will go through these two example implementations in detail in order to explain the actions necessary to write a Giraph application.


Determine the Input Data Format

The first step of any Giraph application is to determine the format for the graph input data. There are many different built-in input formats, all defining data types for the following information describing the graph.

Vertex ID

The Vertex ID is the identifier for a vertex in the graph. The framework does not restrict the definition, so this can be something no more complex than a label, or it can be a fully initialized object with complex pieces. The only limitation is the ability to represent the information in a form that Giraph can parse from the input file.

Vertex Value

The Vertex Value is optional, and is another place to store additional information associated with a vertex. Typically, this field is used to store values or objects that should be updated during graph processing.

Edge Tuples

The final piece of input data is the collection of information necessary to define the set of out-edges associated with the source vertex ID. This information is composed of tuples with two elements per edge: the destination Vertex ID and the Edge Weight, where the Edge Weight is optional. Since Giraph only expects out-edges to be specified in this definition, any bi-directional edges must be defined as two separate out-edges with opposite directions.
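As a concrete illustration, assuming one of the JSON-based input formats that ship with Giraph (such as JsonLongDoubleFloatDoubleVertexInputFormat, where the vertex ID is a long, the vertex value a double, and each out-edge a [destination, weight] pair), a single line of input might look like:

[1,0.0,[[2,1.0],[3,3.0]]]

that is, vertex 1 with value 0.0 and out-edges to vertices 2 and 3 with weights 1.0 and 3.0.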
Click here to view the rest of the post.

SQL with Cascading: The NVL Function

Overview

 

With Apache Hadoop becoming more and more in demand for big data handling, it is likely that many conventional database systems, such as warehouses, RDBMSs and the like, will hand their functionality over to tools and applications that run within the Hadoop sphere.  Just as likely, in the interest of bypassing MapReduce programming, Cascading may become one of the primary tools used when performing these integrations.  If so, much of the integration work will involve recreating the common SQL functionality present in these conventional database systems.  With Cascading and its vast array of out-of-the-box functions and methods, much of this common functionality can be recreated quite easily. In the following example, and in additional blogs in this series, common SQL functions will be recreated in Cascading.

Function: NVL()   

 

Here, the NVL, or null value, function will be considered.  This function is used to replace any null values in a particular column with a declared replacement, and is used as follows:

SELECT ...
       NVL(column A, ${replacement}) AS A,
       NVL(column B, 0) AS B,
       NVL(column C, 'Y'),
       NVL(column D, 4.01),
       ...
FROM table A

Cascading Implementation

 

In Cascading, imagine table A as a source tap attached to pipe A.  The respective columns from table A have now become fields in the tuple stream.  Field A1, corresponding to column A from table A, has an unknown number of NULL values which should be replaced with a meaningful value, in this case the Integer value 1.
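A minimal sketch of one way to accomplish this (the class and field names are hypothetical) is a small custom Cascading Function that emits the incoming value unless it is null, in which case it emits the supplied replacement:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class NvlFunction extends BaseOperation implements Function {

    private final Object replacement;

    public NvlFunction(Fields fieldDeclaration, Object replacement) {
        super(1, fieldDeclaration);   // expects exactly one argument field
        this.replacement = replacement;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        Object value = functionCall.getArguments().getObject(0);
        // emit the incoming value unless it is null, in which case emit the replacement
        functionCall.getOutputCollector().add(new Tuple(value == null ? replacement : value));
    }
}

It could then be applied to field A1 with something like:

pipeA = new Each(pipeA, new Fields("A1"), new NvlFunction(new Fields("A1"), 1), Fields.REPLACE);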

Debugging Cascading: ... PlannerException: not all head pipes bound to source taps

Overview

 

A common encounter when using the Cascading framework for big data applications is the declaration of numerous data sources that merge into one or two primary transforms or operations using a complicated assembly of pipes.  With this, an additional common encounter is a PlannerException complaining that one or more of these streams are not bound to their corresponding source taps, specifically:

Cascading: PlannerException not all head pipes bound to source taps. Remaining names []

Such an alert can definitely frustrate the user, and within Cascading a multitude of scenarios can invoke this error, but the most common cause is that, within an entanglement of numerous pipe heads streaming through various aggregations and filters, one or more heads are not actually connected to an output tail.


Consider an Example with the Following FlowDef..

 

Tap<?, ?, ?> sourceA = ...
Tap<?, ?, ?> sourceB = ...
Tap<?, ?, ?> sourceC = ...

Pipe a = new Pipe("a1");
Pipe b = new Pipe("b1");
Pipe c = new Pipe("c1");

c = new Each(c, Fields.ALL, new ExpressionFilter(...));
Pipe d = new CoGroup(a, new Fields(A), b, new Fields(B));
Pipe e = new Retain(d, new Fields(declared));

return FlowDef.flowDef()
    .addSource(a, sourceA)
    .addSource(b, sourceB)
    .addSource(c, sourceC)   // the filtered branch "c1" is registered as a source here...
    .addTail(e)
    .addSink(e, sinkE);
    // ...but "c1" never joins the assembly that reaches tail "e", so its head is left
    // dangling, which is exactly the situation described above.

 

Configuring Kerberos Security in Hortonworks Data Platform 2.0

Introduction


Hadoop was originally created without any external security in mind. It was meant to be used by trusted users in a secure environment, and the constraints that were put in place were intended to prevent users from making mistakes, not from preventing malicious characters from harming the system.


Any person who wanted to wreak havoc on a Hadoop system could impersonate another user to circumvent the imposed constraints. In other words, while Hadoop had the means to impose restrictions as authorization policy, it lacked the ability to authenticate users.


As Hadoop became more widespread and saw use in the enterprise arena, security from outsiders became a real concern. The need for authentication became critical.


Enter Kerberos. Kerberos is a single sign-on authentication protocol that uses the concept of “tickets” to provide identity. A user provides their username and password once to the Kerberos server, either directly or through a keytab file, and receives a “ticket granting ticket” (TGT) that can be used to request tickets for any specific service.


The Hadoop community has accepted Kerberos as the de facto standard and it is supported in most Hadoop services and tools. For example, after receiving a TGT, a user can use it to obtain a ticket for Hive and then use the Hive ticket to interact with the Hive service.

Configuring Kerberos in HDP 2.0

Install, configure and start your Kerberos server

The following steps should be performed on the machine where your Kerberos services will run. It is recommended that this be a server separate from your Hadoop cluster.


1. Install the Kerberos server and client packages
sudo yum install krb5-server krb5-workstation

EC2 instances created from "Marketplace" disallow root volume reattach to different instance

The AWS Marketplace allows EC2 users to create instances based on AMIs offered by third parties.  Some instances come with a fee, which is in addition to the standard Amazon EC2 instance charges.  Others are free (though the standard EC2 instance charges still apply).


I am not entirely clear what the real value of "Marketplace" is, but we recently discovered a limitation that is quite precarious.

We've only tried a few of the Marketplace instances, but it seems that for any instance created from a Marketplace AMI, for example, your basic CentOS 6.4 minimal installation, you are unable to re-attach the root volume to another EC2 machine.


This is a pretty basic feature of anything that calls itself a "virtual disk".


We were sorely disappointed when we ran into it.  Thankfully, I wasn't in a bind; I wasn't in a situation where I needed this capability to salvage a system or its data.


After detaching the root volume from a Marketplace instance and attempting to re-attach it to another instance, you'll receive this error message:

   "'vol-ffffffff' with Marketplace codes may not be attached as a secondary device."




Here at Spry, we use EC2 instances from time to time.  I've had to perform this operation many times in order to get a system back in working order or to salvage its data.  In each of those situations, it was paramount to our business that we be able to perform those functions.


To us, something this major needs to be mentioned up front.  If it weren't for a random user's review of a specific Marketplace offering, we would never have known!  We would have gone ahead as normal, as if it were any other AMI.  Then the time would come when we needed to mount the EBS-backed storage to another machine: the EBS-backed storage that we've always been able to re-attach to another machine, the storage that we've been paying for.  We would be out of luck.


IMO, this does not meet the definition of a virtual machine.  With a virtual machine, you should be able to use a virtual disk in all the ways a virtual disk can and should be used; for instance, do the things with it that you would expect to be able to do with a physical hard disk drive.  Of course, there are things like instance storage, which disappears when the machine is turned off, but before you get too deep into EC2 you would probably investigate the differences between EBS storage and instance storage anyway.

Cascading: Break Point Debugging

Overview

One of the challenges of using Cascading to build large applications surfaces when debugging issues between streaming the source taps and writing the sink.  Since tangible data really isn't available until flow completion, attempting to pinpoint which part of a faulty flow definition is failing can be a frustrating experience.  In come Cascading's Debug "pipe" and the "break point" debugging technique.  This approach helps in stepping through a flow in order to isolate problems that are only obvious in the final writeout.

Cascading's Debugger Pipe

The Debug function is a debugging tool offered within Cascading's base libraries which allows tuples to be written to stdout, so a developer can see what is being streamed along a pipe without having to open a writeout file at the end of the process.  Here, the Debug "pipe" is called such because it is used within an assembly resembling the following:

Pipe A = ${some_assembly}

A = new Each(A, new Debug(true));

...

.addTail(A)
.addSink(A, sinkA);

The Break Point Technique 

The "break point" reference is used here because the above debugging assembly can be set up as a "break point" at various points in a flow, where the data stream is forked off to stdout in order to inspect the shape of the data at particular places within the flow.  More often than not, data streams undergo several transformations during the flow, and being able to drive a wedge between these steps can help tremendously in debugging.  The example below uses two break points:
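It is a minimal sketch; the pipe name, fields, and operations are hypothetical stand-ins for whatever the real flow does:

// first transformation: drop tuples with a null status
Pipe events = new Pipe("events");
events = new Each(events, Fields.ALL, new ExpressionFilter("status == null", String.class));
events = new Each(events, new Debug("after-filter", true));   // break point 1: inspect the filtered stream

// second transformation: count tuples per user
events = new GroupBy(events, new Fields("userId"));
events = new Every(events, Fields.ALL, new Count(new Fields("count")), Fields.ALL);
events = new Each(events, new Debug("after-count", true));    // break point 2: inspect the aggregated stream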

SQL with Cascading: Recreating the SQL "ROW ID" Functionality

Overview

For those who already have a basic understanding of Cascading dynamics, the contexts in which it is used, and what it aims to accomplish when working with structured data, it should be clear that Cascading may become one of the top go-to frameworks for reimplementing database programming on top of Apache Hadoop.  With this comes the task of restructuring and re-implementing many of the typical database concepts and operations in a Cascading-friendly fashion when the framework itself does not include such elements in its already well-extended function library.  When such obstacles are encountered, one may leverage one of the many operator extensions offered by Cascading, in which custom-built processes bridge the gap between a particular SQL action and its lack of direct support in the function library.

A list of common database operations that may need to be implemented in Cascading include:

1.) DECODE
2.) CASE
3.) ROW ID
4.) recreating primary keys (relates to # 3)
5.) MINUS

Lists such as this can easily be extended.  Nonetheless, this post will present an example of how to implement #3 on the list, along with a special problem that arises due to sequencing across a distributed system such as Apache Hadoop, where data files are split apart and then assembled back together by the MapReduce machinery.

The Case

A particular data stream is spawned from the re-implementation of a SQL statement that includes a ROW_ID column, or simply an assigned key corresponding to each incoming record.  This particular field may preexist, but more often than not it is appended on the fly through a SELECT ROWID statement.  In the latter case, one of many seemingly obvious solutions is to implement a custom function resembling the following:

public class AddRowID extends BaseOperation implements Function {

    long count;

    public AddRowID(Fields fields, long count) {
        super(fields);          // declares the new row-id field; accepts any number of arguments
        this.count = count;
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess flowProcess, FunctionCall functionCall) {
        // append the next id value as the declared field
        functionCall.getOutputCollector().add(new Tuple(count++));
    }
}


and possibly applied through the following assembly. Since the function needs no input arguments, Fields.NONE is used as the argument selector and Fields.ALL as the output selector so that the generated id is appended to each tuple:

Pipe pipe = new Each(previousPipe, Fields.NONE, new AddRowID(new Fields(A), 0), Fields.ALL);
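One caveat worth noting, with a hedged sketch of a workaround: because each map task runs its own copy of the function, the counter above restarts in every task once the input is split, producing duplicate IDs. One mitigation is to fold the task number into the ID inside prepare(), for example by reading the classic "mapred.task.partition" Hadoop property (the property name and the size of the reserved range are assumptions to verify on your cluster):

@Override
public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
    Object partition = flowProcess.getProperty("mapred.task.partition"); // the task's number within the job
    long taskId = partition == null ? 0L : Long.parseLong(partition.toString());
    // give each task its own non-overlapping id range so the generated keys never collide
    this.count = taskId * 1000000000L;
}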

PL/SQL Trick For PL/SQL DECODE To Lingual Conversion

I've recently been exploring bits and pieces of Lingual, another Concurrent Inc. effort to ease the transition from SQL/relational databases to Hadoop. Lingual is a library that converts ANSI SQL-99 code directly into MapReduce jobs to run on a cluster. Specifically, I've been exploring the use of Lingual to generate Cascading flows.

Lingual supports a specific set of ANSI SQL commands in the text query that you provide it. So, basically, if your query contains anything outside of those functions, it will not run with Lingual.

This is a short post on a trick that helps convert a very standard DECODE operation that is often used in PL/SQL code.

I had the following instance of code:

AND (    (ecd.code_a <> 'US'
          AND (ecd.code_b = DECODE (fas.code_b,
                                    'INTL', ecd.code_b,
                                    'ALL', ecd.code_b,
                                    fas.code_b)
              )
         )
      OR (ecd.code_a = 'US'
          AND (ecd.code_b = DECODE (fas.code_b,
                                    'US', ecd.code_b,
                                    'ALL', ecd.code_b,
                                    fas.code_b)
              )
         )
    )
So we have a moderately complicated query to convert into a Lingual-supportable format. One thing to observe here is that the DECODE is, in essence, just a set of SWITCH...CASE...DEFAULT statements clubbed together.  A clean way to convert it for Lingual is to do something like this:

AND (    (ecd.code_a <> 'US'
          AND (fas.code_b IN ('INTL', 'ALL') OR ecd.code_b = fas.code_b)
         )
      OR (ecd.code_a = 'US'
          AND (fas.code_b IN ('US', 'ALL') OR ecd.code_b = fas.code_b)
         )
    )


It is important to remember that any DECODE statement can be converted into a set of AND/OR conditions in this way, thereby emulating each of the cases within the DECODE. For me, this was the difference in my decision to proceed with Lingual or not!

Thanks, I hope this helped! Happy coding!

Lingual SQL on Hadoop

Background

There are many projects that attempt to use SQL for data analytics and management on Hadoop.  Lingual from Concurrent, Inc. is a batch-processing ANSI SQL-99 engine that went GA at the end of 2013.  With its high latency, it is more similar to Hive than to a real-time SQL processing tool such as Impala or Apache Drill.  Lingual uses the Optiq SQL query parser written by Julian Hyde to parse incoming queries; the Cascading team then wrote Lingual to use that information to generate Cascading code that actually performs the query.  Interestingly enough, the Apache Drill team also uses Optiq for query parsing.

Lingual has several parts that could be useful for specific use cases: 

1.  Command line shell similar to Hive which can be utilized for queries
2.  Integration with Cascading by means of Cascading flows
3.  JDBC integration for existing tools that need to utilize HDFS data

Use Case


The use case most relevant to one of our applications is integration with Cascading flows.   The team is developing a financial data workflow processing tool which contains a tremendous amount of business logic and SQL processing.   Depending on developer skill with Cascading and the complexity of the logic to implement, the developer might choose to utilize Lingual instead of writing Cascading code.  The decision is similar to the one made when choosing Cascading over pure MapReduce.

Apache Giraph: A Brief Troubleshooting Guide

Apache Giraph is a component in the Hadoop ecosystem that provides iterative graph processing on top of MapReduce or YARN. It is the open-source answer to Google's Pregel, and has been around since early 2012. The tool has a very active developer community that contributes to the code base. However, since the emphasis is on making Giraph as stable and robust as possible, there are some gaps in the documentation about how to use the tool - specifically in the realm of interpreting error messages.

This blog post will contribute information on some of the most common error messages that might be encountered when running the Giraph examples.

Class Not Found Exceptions

In order to avoid these types of errors:

Exception in thread "main" java.lang.ClassNotFoundException: org.giraph.examples.SimpleShortestPathsVertex

Ensure that all necessary Giraph jars are on the classpath. One way to do this is to copy the jar files from giraph-core/target and giraph-examples/target into the Hadoop lib folder with the following commands:

cp giraph-core/target/*.jar /usr/lib/hadoop/lib
cp giraph-examples/target/*.jar /usr/lib/hadoop/lib

Accessing Hadoop web interfaces as dr.who

If you enable Kerberos security on your Hadoop services but do not set up HTTP authentication for its web interfaces, you will see one or more of the following messages:
  • Logged in as: dr.who
  • Permission denied: user=dr.who
  • Access denied: User dr.who does not have permission to view job
  • No groups available for user dr.who
dr.who is the anonymous user identity that is used when a secured web interface is accessed without authentication.

This can happen if you rely on the Ambari Secure Wizard (ASW) to set up your Kerberos security, as this is one of the parts of the configuration that the ASW does not touch. You need to set this up manually. Here is a good guide on doing that: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.6.0/ds_Hadoop/hadoop-project-dist/hadoop-common/HttpAuthentication.html

Following that guide sets things up so that users access the web interfaces as their logged-in Kerberos principal.

The significant fields are the type (kerberos), the HTTP principal, the HTTP keytab path, the initializers, and the secret file. The secret file's contents can be anything for testing purposes, as long as the file is readable by all the Hadoop services.

Here is a good resource for testing and troubleshooting authenticated HTTP access: http://hadoop.apache.org/docs/r0.23.9/hadoop-auth/Examples.html.

Insights into Tableau Performance Testing

Introduction

Here at Spry, Tableau is one of our go-to data visualization tools. In many cases, however, we seem to have performance issues when it comes to loading and filtering Tableau workbooks. Although we don't always work with very large amounts of data (~5-8M rows in a typical data set), we do utilize a number of types of visualizations, filter types, etc. This can result in less than optimal performance conditions when a particular dashboard uses complex interactions.

The Experiment

To pinpoint performance bottlenecks, we set out to benchmark Tableau. At its core, the experiment is quite simple: create a dashboard "template", use increasing sizes of data sets and various connection types, and measure performance with Tableau's performance recording feature.

Here is our experiment setup:
  • Test workbook: one from a client that used line charts, tables, maps, dashboard filter actions, and parameter filtering
  • Data set sizes: 500,000 rows, 2 million rows, 5 million rows
  • Connection types: Hortonworks Hive, Oracle Database 11g Express Edition, Tableau Data Extract
 In our testing, we measured the processing time for the following:
  • Load time of the dashboard
  • Filtering a map and removing the filter
  • Parameter filters of a text table

 

The Results

Our results were not surprising: working with flat files is faster than working with a live database connection, and working with extracts is faster than working with live connections. However, an extract of a CSV (or similar) isn't always an option when working with clients. In this case, the live Oracle connection had acceptable performance on Tableau Server. We are still analyzing our results and will follow up in a future blog post.

Git for Beginners

Git is the de facto standard version control system for software development on Linux.  For newcomers to Git, there is often a lack of non-technical resources to quickly get them up to speed. This article attempts to provide some quick insight on how beginners can utilize Git without too much angst.

Optimistic vs. Pessimistic Locking

File locking models are used by source control systems to prevent developers from corrupting each other's work when they need to make changes to the same file. Two types of locking mechanisms have been used in the history of modern software development.

In pessimistic locking, the developer must lock the file every time they want to make a change.  This is said to be pessimistic because, from the version control standpoint, the change the developer is making is assumed to conflict with the existing version of the code.  To prevent the conflict, a lock is given to the developer through a checkout process.   Conflicts are prevented, so merges are not required.  However, the lock often inhibited development on the same file unnecessarily and slowed development, because the changes multiple developers were making frequently did not conflict and could be easily merged by a developer or a machine.

Optimistic locking version control systems allow multiple changes to be made to the same file at the same time, optimistic that the changes can be resolved through a future merge.  Optimistic locking has been the mechanism of choice in popular source control systems of recent years such as Git and SVN.  Developers like the flexibility of being able to change any file at any time, but they must remain cognizant of the ramifications of the optimistic locking approach on other developers.