views:

276

answers:

3

I am working on development of an application to process (and merge) several large java serialized objects (size of order GBs) using Hadoop framework. Hadoop stores distributes blocks of a file on different hosts. But as deserialization will require the all the blocks to be present on single host, its gonna hit the performance drastically. How can I deal this situation where different blocks have to cant be individually processed, unlike text files ?

+1  A: 

I think the basic (unhelpful) answer is that you can't really do this, since this runs directly counter to the MapReduce paradigm. Units of input and output for mappers and reducers are records, which are relatively small. Hadoop operates in terms of these, not file blocks on disk.

Are you sure your process needs everything on one host? Anything that I'd describe as a merge can be implemented pretty cleanly as a MapReduce where there is no such requirement.

If you mean that you want to ensure certain keys (and their values) end up on the same reducer, you can use a Partitioner to define how keys are mapped onto reducer instances. Depending on your situation, this may be what you really are after.

I'll also say it kind of sounds like you are trying to operate on HDFS files, rather than write a Hadoop MapReduce. So maybe your question is really about how to hold open several SequenceFiles on HDFS, read their records and merge, manually. This isn't a Hadoop question then, but, still doesn't need blocks to be on one host.

Sean Owen
Let me rephrase my question. So, typically in a the input file is capable of being partially read and processed by Mapper function (as in text files). In my case, I intend to uses 2 Mapper functions, the first one to split the Binary file into smaller (keys, value) pairs and the second one for more traditional Mapper purpose. My question is what i can do to handle binaries (say images, serialized objects) which would require all the blocks to be on same host, before the processing can start. Hope that explains my problem. I appreciate your response.
restrictedinfinity
I don't think this requires all blocks to be on one host. It does require the worker to transfer enough blocks from HDFS to read at least one full record. But that's true no matter what, the data has to reach the worker eventually. I'd say let HDFS just handle it.
Sean Owen
+1  A: 

There's two issues: one is that each file must (in the initial stage) be processed in whole: the mapper that sees the first byte must handle all the rest of that file. The other problem is locality: for best efficiency, you'd like all the blocks for each such file to reside on the same host.


Processing files in whole:

One simple trick is to have the first-stage mapper process a list of filenames, not their contents. If you want 50 map jobs to run, make 50 files each with that fraction of the filenames. This is easy and works with java or streaming hadoop.

Alternatively, use a non-splittable input format such as NonSplitableTextInputFormat.

For more details, see "How do I process files, one per map?" and "How do I get each of my maps to work on one complete input-file?" on the hadoop wiki.


Locality:

This leaves a problem, however, that the blocks you are reading from are disributed all across the HDFS: normally a performance gain, here a real problem. I don't believe there's any way to chain certain blocks to travel together in the HDFS.

Is it possible to place the files in each node's local storage? This is actually the most performant and easiest way to solve this: have each machine start jobs to process all the files in e.g. /data/1/**/*.data (being as clever as you care to be about efficiently using local partitions and number of CPU cores).

If the files originate from a SAN or from say s3 anyway, try just pulling from there directly: it's built to handle the swarm.


A note on using the first trick: If some of the files are much larger than others, put them alone in the earliest-named listing, to avoid issues with speculative execution. You might turn off speculative execution for such jobs anyway if the tasks are dependable and you don't want some batches processed multiple times.

mrflip
+1  A: 

It sounds like your input file is one big serialized object. Is that the case? Could you make each item its own serialized value with a simple key?

For example, if you were wanting to use Hadoop to parallelize the resizing of images you could serialize each image individually and have a simple index key. Your input file would be a text file with the key values pairs being index key and then serialized blob would be the value.

I use this method when doing simulations in Hadoop. My serialized blob is all the data needed for the simulation and the key is simply an integer representing a simulation number. This allows me to use Hadoop (in particular Amazon Elastic Map Reduce) like a grid engine.

JD Long