In many real-life situations where you apply MapReduce, the final algorithm ends up consisting of several MapReduce steps.

I.e. Map1, Reduce1, Map2, Reduce2, etc.

So the output from the last reduce is needed as the input for the next map.

The intermediate data is something you (in general) do not want to keep once the pipeline has completed successfully. Also, because this intermediate data is in general some data structure (like a 'map' or a 'set'), you don't want to put too much effort into writing and reading these key-value pairs.

What is the recommended way of doing that in Hadoop?

Is there a (simple) example that shows how to handle this intermediate data in the correct way, including the cleanup afterward?

Thanks.

A: 

You can use Oozie for batch processing your MapReduce jobs. http://issues.apache.org/jira/browse/HADOOP-5303

Reko
Thanks for the suggestion. This seems like a lot more than I was looking for.
Niels Basjes
+1  A: 

I think this tutorial on Yahoo's developer network will help you with this: Chaining Jobs

You use JobClient.runJob(). The output path of the data from the first job becomes the input path to your second job. These need to be passed in as arguments to your jobs, with appropriate code to parse them and set up the parameters for each job.

I think the above method might, however, be the way the now older mapred API did it, but it should still work. There will be a similar method in the new mapreduce API, but I'm not sure what it is.
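
For what it's worth, a minimal sketch of the same chaining with the newer org.apache.hadoop.mapreduce API, where Job.waitForCompletion() appears to play the role of JobClient.runJob(), might look like this (the paths are placeholders and the mapper/reducer setup is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiChainDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // First job: reads "input", writes "intermediate".
        Job job1 = new Job(conf, "step1");
        // set mapper, reducer and output key/value classes for the first step here
        FileInputFormat.addInputPath(job1, new Path("input"));
        FileOutputFormat.setOutputPath(job1, new Path("intermediate"));
        if (!job1.waitForCompletion(true)) {   // blocks until the first job has finished
            System.exit(1);
        }

        // Second job: the output of the first job is its input.
        Job job2 = new Job(conf, "step2");
        // set mapper, reducer and output key/value classes for the second step here
        FileInputFormat.addInputPath(job2, new Path("intermediate"));
        FileOutputFormat.setOutputPath(job2, new Path("output"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}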

As for removing intermediate data after a job has finished, you can do this in your code. The way I've done it before is using something like:

FileSystem.delete(Path f, boolean recursive);

where the path is the location of the data on HDFS. You need to make sure that you only delete this data once no other job requires it.
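
For example, a minimal cleanup sketch along those lines (the "intermediate" path is just a placeholder for wherever your pipeline writes its temporary output) could look like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanupIntermediate {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path intermediate = new Path("intermediate");
        if (fs.exists(intermediate)) {
            // recursive delete; only do this once no other job needs the data
            fs.delete(intermediate, true);
        }
    }
}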

Binary Nerd
Thanks for the link to the Yahoo tutorial. Chaining Jobs is indeed what you want if the two run as part of the same job. What I was looking for is an easy way to do it if you want to be able to run them separately. In the mentioned tutorial I found SequenceFileOutputFormat ("Writes binary files suitable for reading into subsequent MapReduce jobs") and the matching SequenceFileInputFormat, which make it all very easy to do. Thanks.
Niels Basjes
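
A minimal sketch of what that comment describes, assuming the old mapred API used in the tutorial (the key/value types and paths are only placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class SequenceFileHandOff {
    public static void main(String[] args) throws Exception {
        // First job writes its key-value pairs as a binary SequenceFile.
        JobConf job1 = new JobConf(SequenceFileHandOff.class);
        // set mapper/reducer for the first step here
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        job1.setOutputFormat(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job1, new Path("input"));
        FileOutputFormat.setOutputPath(job1, new Path("intermediate"));
        JobClient.runJob(job1);

        // Second job (which could just as well be a separate program run later)
        // reads those key-value pairs back without any parsing code.
        JobConf job2 = new JobConf(SequenceFileHandOff.class);
        // set mapper/reducer for the second step here
        job2.setInputFormat(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(job2, new Path("intermediate"));
        FileOutputFormat.setOutputPath(job2, new Path("output"));
        JobClient.runJob(job2);
    }
}

Because the intermediate directory is an ordinary HDFS path, it can be deleted with FileSystem.delete() once the second job has completed, as described in the answer above.
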
A: 

There are actually a number of ways to do this. I'll focus on two.

One is via Riffle ( http://github.com/cwensel/riffle ) an annotation library for identifying dependent things and 'executing' them in dependency (topological) order.

Or you can use a Cascade (and MapReduceFlow) in Cascading ( http://www.cascading.org/ ). A future version will support Riffle annotations, but it works great now with raw MR JobConf jobs.

A variant on this is to not manage MR jobs by hand at all, but develop your application using the Cascading API. Then the JobConf and job chaining is handled internally via the Cascading planner and Flow classes.

This way you spend your time focusing on your problem, not on the mechanics of managing Hadoop jobs, etc. You can even layer different languages on top (like Clojure or JRuby) to simplify your development and applications even further. http://www.cascading.org/modules.html
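
As a rough sketch (based on my reading of the Cascading 1.x API, so treat the exact class names and constructors as assumptions to check against the docs), wrapping two existing JobConf jobs so a Cascade runs them in dependency order might look like:

import org.apache.hadoop.mapred.JobConf;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.MapReduceFlow;

public class CascadeDriver {
    public static void main(String[] args) {
        // buildFirstJobConf()/buildSecondJobConf() are hypothetical helpers that set up
        // your ordinary MapReduce jobs (paths, mapper, reducer, ...).
        JobConf jobConf1 = buildFirstJobConf();
        JobConf jobConf2 = buildSecondJobConf();

        // Wrap the raw MR jobs so the Cascading planner can schedule them.
        Flow flow1 = new MapReduceFlow("step1", jobConf1);
        Flow flow2 = new MapReduceFlow("step2", jobConf2);

        // The cascade derives the dependency order from the flows' source and sink paths.
        Cascade cascade = new CascadeConnector().connect(flow1, flow2);
        cascade.complete();   // runs both jobs in topological order
    }

    private static JobConf buildFirstJobConf()  { return new JobConf(); }
    private static JobConf buildSecondJobConf() { return new JobConf(); }
}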

cwensel
A: 

There are many ways you can do it.

(1) Cascading jobs

Create the JobConf object "job1" for the first job and set all the parameters with "input" as the input directory and "temp" as the output directory. Execute this job: JobClient.runJob(job1).

Immediately below it, create the JobConf object "job2" for the second job and set all the parameters with "temp" as the input directory and "output" as the output directory. Execute this job: JobClient.runJob(job2).

(2) Create two JobConf objects and set all the parameters in them just like in (1), except that you don't use JobClient.runJob().

Then create two Job objects with the JobConfs as parameters: Job job1 = new Job(jobconf1); Job job2 = new Job(jobconf2);

Using a JobControl object, you specify the job dependencies and then run the jobs: JobControl jbcntrl = new JobControl("jbcntrl"); jbcntrl.addJob(job1); jbcntrl.addJob(job2); job2.addDependingJob(job1); jbcntrl.run();
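
A slightly fuller sketch of option (2). Running the JobControl in its own thread and polling allFinished(), as shown, is one common way to drive it; the JobConf setup is left as a placeholder:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class JobControlDriver {
    public static void main(String[] args) throws Exception {
        JobConf jobconf1 = new JobConf();   // configure "input" -> "temp", mapper, reducer, ...
        JobConf jobconf2 = new JobConf();   // configure "temp" -> "output", mapper, reducer, ...

        Job job1 = new Job(jobconf1);
        Job job2 = new Job(jobconf2);
        job2.addDependingJob(job1);         // job2 only starts once job1 has finished

        JobControl jbcntrl = new JobControl("jbcntrl");
        jbcntrl.addJob(job1);
        jbcntrl.addJob(job2);

        Thread runner = new Thread(jbcntrl);
        runner.start();
        while (!jbcntrl.allFinished()) {    // poll until both jobs are done
            Thread.sleep(1000);
        }
        jbcntrl.stop();
    }
}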

(3) If you need a structure somewhat like Map+ | Reduce | Map*, you can use the ChainMapper and ChainReducer classes that come with Hadoop from version 0.19 onwards.
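
A rough sketch of option (3), the Map+ | Reduce | Map* pattern with the old-API ChainMapper and ChainReducer; AMap, BMap, TheReducer and CMap stand for your own (here hypothetical) Mapper and Reducer implementations:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainExample {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ChainExample.class);
        // input/output paths and formats would be configured here as usual

        // Map+ : two mappers running back to back inside the map phase.
        ChainMapper.addMapper(job, AMap.class,
            LongWritable.class, Text.class, Text.class, Text.class,
            true, new JobConf(false));
        ChainMapper.addMapper(job, BMap.class,
            Text.class, Text.class, Text.class, Text.class,
            true, new JobConf(false));

        // Reduce : a single reducer ...
        ChainReducer.setReducer(job, TheReducer.class,
            Text.class, Text.class, Text.class, Text.class,
            true, new JobConf(false));

        // Map* : ... followed by another mapper applied to the reducer's output.
        ChainReducer.addMapper(job, CMap.class,
            Text.class, Text.class, Text.class, Text.class,
            true, new JobConf(false));

        JobClient.runJob(job);
    }
}

The nice property of such a chain is that it all runs inside a single MapReduce job, so there is no extra intermediate data set on HDFS to clean up afterwards.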

Cheers