I want to write a map/reduce job to select a number of random samples from a large dataset based on a row-level condition. I want to minimize the number of intermediate keys.

Pseudocode:

for each row 
  if row matches condition
    put the row.id in the bucket if the bucket is not already full

Have you done something like this? Is there any well-known algorithm?

A sample containing sequential rows is also good enough.

Thanks.

+1  A: 

Mappers: Output all qualifying values, each with a random integer key.

Single reducer: Output the first N values, throwing away the keys.

The sorter will randomize the mapper output order for you. You don't know how many qualifying values a mapper will find, so each mapper has to output all qualifying values from its partition.
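
A minimal sketch of the mapper side of this approach, assuming the org.apache.hadoop.mapreduce API and a hypothetical matchesCondition() predicate standing in for the row-level condition:

// Sketch of the mapper: every qualifying row is emitted with a random
// integer key, so the shuffle/sort randomizes the order in which the
// single reducer sees the rows. matchesCondition() is a placeholder.
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RandomKeyMapper
    extends Mapper<LongWritable, Text, IntWritable, Text> {

  private final Random random = new Random();
  private final IntWritable outKey = new IntWritable();

  @Override
  protected void map(LongWritable offset, Text row, Context context)
      throws IOException, InterruptedException {
    if (matchesCondition(row)) {        // your row-level condition
      outKey.set(random.nextInt(Integer.MAX_VALUE));
      context.write(outKey, row);       // emit every qualifying row
    }
  }

  private boolean matchesCondition(Text row) {
    return true;                        // placeholder predicate
  }
}

The job would be configured with a single reduce task (job.setNumReduceTasks(1)); that reducer simply writes the first N values it receives and drops the keys.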

In general, I like to build up simple mapper/reducer tools like this which use as much of the Hadoop machinery as possible; I end up reusing them in different tasks.

Karl Anderson
I've done something like this, but I'm also using in-mapper counters to limit the number of intermediate keys emitted.
Andrei Savu
+1  A: 

Karl's approach works just fine, but we can greatly reduce the amount of data produced by the mappers.

Let K be the number of samples you want. We'll assume that this is small enough to hold in memory on one of your nodes. We'll assign a random value to each matching row, and then use a modification of the selection algorithm to find the K smallest values.

In the setup phase of each mapper, create a priority queue; a Fibonacci heap is a good choice for this. We'll be using floats as the priorities; if you have a huge amount of data, doubles may be more appropriate to reduce the chance of ties. For each row that matches your condition, insert that row into the priority queue with a randomly chosen float between 0 and 1 as the priority. If you have more than K items in your queue, remove the one with the highest priority (the opposite of the usual extract-min operation on a standard Fibonacci heap).

Finally, at the end of the mapper, emit everything in your queue. For each item you emit, use the priority as the key (a FloatWritable) and some representation of the corresponding row as the value (the row ID, or perhaps the entire row contents). You will emit only K values per mapper (or fewer if that mapper had fewer than K matching rows).
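
A sketch of such a mapper, under a couple of assumptions: the Java standard library has no Fibonacci heap, so java.util.PriorityQueue is used here as a bounded max-heap (equivalent behavior for a queue capped at K items), and matchesCondition() plus the sample.size configuration key are illustrative placeholders:

// Bounded-priority-queue mapper: keep the K rows with the smallest
// random priorities seen in this mapper, then emit them in cleanup().
import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopKSampleMapper
    extends Mapper<LongWritable, Text, FloatWritable, Text> {

  private static class Entry {
    final float priority;
    final String row;
    Entry(float priority, String row) { this.priority = priority; this.row = row; }
  }

  private int k;
  private Random random;
  private PriorityQueue<Entry> queue;   // max-heap: largest priority on top

  @Override
  protected void setup(Context context) {
    k = context.getConfiguration().getInt("sample.size", 100);
    random = new Random();
    queue = new PriorityQueue<>(k + 1,
        Comparator.comparingDouble((Entry e) -> e.priority).reversed());
  }

  @Override
  protected void map(LongWritable offset, Text row, Context context) {
    if (matchesCondition(row)) {
      // copy the row text, since Hadoop reuses the Text object
      queue.add(new Entry(random.nextFloat(), row.toString()));
      if (queue.size() > k) {
        queue.poll();                   // drop the largest priority seen so far
      }
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // Emit at most K rows, keyed by their random priority.
    for (Entry e : queue) {
      context.write(new FloatWritable(e.priority), new Text(e.row));
    }
  }

  private boolean matchesCondition(Text row) {
    return true;                        // placeholder predicate
  }
}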

In your single reducer, Hadoop will automatically scan through the keys in order from lowest to highest. Emit the rows corresponding to the first K keys you see (the K lowest), then quit.
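
A sketch of that single reducer, again treating the sample.size configuration key as an illustrative placeholder:

// Single reducer: keys (the random priorities) arrive in ascending order,
// so write the rows for the first K keys and ignore the rest.
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FirstKReducer
    extends Reducer<FloatWritable, Text, NullWritable, Text> {

  private int remaining;

  @Override
  protected void setup(Context context) {
    remaining = context.getConfiguration().getInt("sample.size", 100);
  }

  @Override
  protected void reduce(FloatWritable priority, Iterable<Text> rows, Context context)
      throws IOException, InterruptedException {
    for (Text row : rows) {
      if (remaining-- <= 0) {
        return;                         // already emitted K rows
      }
      context.write(NullWritable.get(), row);
    }
  }
}

The job needs job.setNumReduceTasks(1) so that one reducer sees every key; with the default comparator, the FloatWritable keys are presented in ascending order, so stopping after K emitted rows yields the rows with the K smallest priorities overall.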

This works because each matching row has the same probability of having one of the K smallest float values. We keep track of the K smallest floats for each mapper to make sure we don't miss any, and then send them to the reducer to find the K smallest overall.

Bkkbrad
thanks. great answer!
Andrei Savu