Hello

I'm trying to use map/reduce to process large amounts of binary data. The application has two defining characteristics: the number of records is potentially large, so I don't really want to store each record as a separate file in HDFS (I was planning to concatenate them all into a single binary sequence file), and each record is a large, coherent (i.e. non-splittable) blob, between one and several hundred MB in size. The records will be consumed and processed by a C++ executable. If it weren't for the size of the records, the Hadoop Pipes API would be fine, but it seems to be built around passing the input to map/reduce tasks as a contiguous block of bytes, which is impractical in this case.

I'm not sure of the best way to do this. Does any kind of buffered interface exist that would allow each M/R task to pull multiple blocks of data in manageable chunks? Otherwise I'm thinking of passing file offsets via the API and streaming in the raw data from HDFS on the C++ side.
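
For concreteness, the kind of thing I have in mind on the C++ side is roughly the sketch below, using libhdfs (hdfs.h). The NameNode settings, path, offset and record length are placeholders for whatever the map task would pass in; I haven't tested this.

    // Rough sketch: seek to a record's offset in an HDFS file and pull it in chunks.
    #include <hdfs.h>
    #include <fcntl.h>      // O_RDONLY
    #include <vector>

    bool read_record(const char* path, tOffset offset, long length) {
        hdfsFS fs = hdfsConnect("default", 0);            // use the default HDFS config
        if (!fs) return false;

        hdfsFile in = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
        if (!in) { hdfsDisconnect(fs); return false; }

        hdfsSeek(fs, in, offset);                          // jump to the record start

        const tSize CHUNK = 4 * 1024 * 1024;               // pull data in 4 MB pieces
        std::vector<char> buf(CHUNK);
        long remaining = length;
        while (remaining > 0) {
            tSize want = (tSize)(remaining < CHUNK ? remaining : CHUNK);
            tSize got  = hdfsRead(fs, in, &buf[0], want);
            if (got <= 0) break;                           // EOF or read error
            // process_chunk(&buf[0], got);                // hand off to the real processing code
            remaining -= got;
        }

        hdfsCloseFile(fs, in);
        hdfsDisconnect(fs);
        return remaining == 0;
    }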

I'd like to hear opinions from anyone who's tried anything similar; I'm pretty new to Hadoop.

A: 

Hadoop is not designed for individual records around 100 MB in size. You will hit OutOfMemoryError and get uneven splits because some records are 1 MB and some are 100 MB. By Amdahl's law your parallelism will suffer greatly, reducing throughput.

I see two options. You can use Hadoop Streaming to feed your large files to your C++ executable as-is. Since the data arrives via stdin, it will naturally be streamed and buffered. Your first map task should break the data up into smaller records for further processing; later tasks then operate on the smaller records.
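
For illustration only, a Streaming mapper skeleton in C++ might look like the sketch below. The chunk size is arbitrary, and because Streaming's stdin/stdout protocol is line-oriented, truly binary payloads would need an encoding such as base64 or hex before being emitted as values.

    // Sketch of a Hadoop Streaming mapper: read the blob from stdin in pieces
    // and emit each piece as a smaller key<TAB>value record on stdout.
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        std::ios::sync_with_stdio(false);

        const size_t RECORD_BYTES = 1 << 20;   // assumed size of the "smaller records" (1 MB)
        std::vector<char> buf(RECORD_BYTES);
        long record_id = 0;

        while (std::cin.read(&buf[0], buf.size()) || std::cin.gcount() > 0) {
            std::streamsize n = std::cin.gcount();
            // A real mapper would base64/hex-encode binary bytes here.
            std::cout << record_id++ << '\t' << std::string(&buf[0], (size_t)n) << '\n';
        }
        return 0;
    }

You would launch it with the hadoop-streaming jar, pointing -mapper at the compiled binary and shipping it to the task nodes with -file.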

If you really can't break it up, make your map/reduce job operate on file names. The first mapper gets some file names, runs them through your C++ executable, and stores the results in more files. The reducer is then given the names of those output files, and the pattern repeats with a reducer C++ executable. This will not run out of memory, but it will be slow. Besides the parallelism issue, you won't get reduce tasks scheduled onto nodes that already hold the data, resulting in non-local HDFS reads.
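
If it helps, a filename-driven mapper could be as small as the sketch below. It shells out to "hadoop fs -cat" so nothing is held in memory all at once; the processing hook, the output-path naming and the quoting are all placeholders.

    // Sketch of a Streaming mapper whose input records are HDFS paths, not data.
    #include <cstdio>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        std::string hdfs_path;
        while (std::getline(std::cin, hdfs_path)) {        // one HDFS file name per input line
            if (hdfs_path.empty()) continue;

            // Stream the file's bytes through the shell client instead of loading it whole.
            std::string cmd = "hadoop fs -cat '" + hdfs_path + "'";   // naive quoting
            FILE* pipe = popen(cmd.c_str(), "r");
            if (!pipe) continue;

            std::vector<char> buf(4 * 1024 * 1024);        // 4 MB read buffer
            size_t n;
            while ((n = fread(&buf[0], 1, buf.size(), pipe)) > 0) {
                // process_chunk(&buf[0], n);               // hand off to the real C++ code
            }
            pclose(pipe);

            // The real mapper would write its result to a new HDFS file and emit that
            // file's name, so the reduce stage can repeat the same pattern.
            std::cout << hdfs_path << ".out" << '\t' << "processed" << '\n';
        }
        return 0;
    }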

Spike Gronim
Thanks. Regarding non-local HDFS reads: is there no way a custom input format could be defined that understands that the records are actually pointers to file names in HDFS containing the actual data, and which can supply the required information about data locality? (It seems to me processing lists of file names must be a relatively common use case; I'm surprised there isn't already better support for it...)
Malcolm Wilkins
I don't see a simple way to do that. When you write HDFS files they end up on nodes all over the place, so there's no guarantee that all the files named in one input are on the same data nodes.
Spike Gronim
Many thanks again Spike; this is a very helpful discussion for me. One last quick question: I understand from the above that since all the data from one input split goes to one mapper, it's hard to define data locality if that input split contains multiple file names in HDFS. But what if each input split were constrained to contain just a single file name? Couldn't you then override the default definition of data locality for the input split to be that of the file it refers to? Or is there a reason why this would be a bad idea?
Malcolm Wilkins
At that point (one file per input split) you should just pass the file directly to your C++ executable via Hadoop Streaming; then you will get data locality in the map step. If you output multiple file names in one reduce input, you won't get reduce locality.
Spike Gronim