I'm thinking about building a small testing application in Hadoop to get the hang of the system.

The application I have in mind will be in the realm of statistics. I want my reducer function to produce "the 10 worst values for each key" (where I must assume the possibility of a huge number of values for some keys).

What I have planned is that the values that go into my reducer will basically be the combination of "The actual value" and "The quality/relevance of the actual value". Based on the relevance I "simply" want to take the 10 worst/best values and output them from the reducer.

How do I go about doing that (assuming a huge number of values for a specific key)? Is there a way that I can sort all values BEFORE they are sent into the reducer (and simply stop reading the input when I have read the first 10) or must this be done differently?

Can someone here point me to a piece of example code I can have a look at?


Update: I found two interesting Jira issues: HADOOP-485 and HADOOP-686.

Does anyone have a code fragment showing how to use this in the Hadoop 0.20 API?

+1  A: 

It sounds like you want to use a Combiner, which defines what to do with the values you create on the map side before they are sent to the Reducer, but after they are grouped by key. The combiner is often set to just be the reducer class (so you reduce on the map side, and then again on the reduce side).

Take a look at how the WordCount example uses the combiner to pre-compute partial counts:

http://wiki.apache.org/hadoop/WordCount
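
In the new (0.20) API the wiring looks roughly like this -- a minimal sketch, where TokenizerMapper and IntSumReducer are the classes from that example:

    Job job = new Job(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // the reducer doubles as the combiner
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);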


Update: Here's what I have in mind for your problem; it's possible I misunderstood what you are trying to do, though.

Every mapper emits <key, {score, data}> pairs.

The combiner gets a partial set of these pairs, <key, [set of {score, data}]>, does a local sort (still on the mapper nodes), and outputs <key, [sorted set of top 10 local {score, data}]> pairs.

The reducer will get <key, [set of top-10-sets]> -- all it has to do is perform the merge step of sort-merge (no sorting needed) for each of the members of the value sets, and stop merging when the first 10 values are pulled.
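
Sketched in the 0.20 API, the combiner could look something like this (ScoreData is a hypothetical custom Writable holding {score, data}, assumed to implement Comparable<ScoreData> ordered by score and to have a copy constructor):

    import java.io.IOException;
    import java.util.PriorityQueue;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Keep only the 10 best-scoring {score, data} values per key.
    public class TopTenCombiner extends Reducer<Text, ScoreData, Text, ScoreData> {
      private static final int N = 10;

      @Override
      protected void reduce(Text key, Iterable<ScoreData> values, Context context)
          throws IOException, InterruptedException {
        // Min-heap: the head is the smallest of the current top N.
        PriorityQueue<ScoreData> top = new PriorityQueue<ScoreData>(N + 1);
        for (ScoreData v : values) {
          top.add(new ScoreData(v)); // copy -- Hadoop reuses the value object
          if (top.size() > N) {
            top.poll(); // evict the current minimum
          }
        }
        // Heap iteration order is arbitrary; only the 10 survivors matter.
        for (ScoreData v : top) {
          context.write(key, v);
        }
      }
    }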


Update 2

So, now that we know the score is cumulative and that, as a result, you can't filter the data early using combiners, the only thing to do is what you suggested -- get a secondary sort going. You've found the right tickets; there is an example of how to do this in Hadoop 0.20 in src/examples/org/apache/hadoop/examples/SecondarySort.java (or, if you don't want to download the whole source tree, you can look at the example patch in https://issues.apache.org/jira/browse/HADOOP-4545 )
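
From memory, the job wiring in that example looks roughly like this (IntPair, FirstPartitioner and FirstGroupingComparator are classes defined in SecondarySort.java; conf is your Configuration):

    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(SecondarySort.MapClass.class);
    job.setReducerClass(SecondarySort.Reduce.class);
    // Partition on the natural key only, so all {key, score} pairs
    // for one key land on the same reducer...
    job.setPartitionerClass(FirstPartitioner.class);
    // ...and group on the natural key only, so they share one reduce() call.
    job.setGroupingComparatorClass(FirstGroupingComparator.class);
    job.setMapOutputKeyClass(IntPair.class); // composite {natural key, score} key
    job.setMapOutputValueClass(IntWritable.class);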

SquareCog
Hmm, as far as I understand it, the combiner is intended to be a 'partial reducer' that runs on a specific node. I cannot truncate the results at that moment because I do not yet know the total "quality" of the values at that point.
Niels Basjes
Update: Interesting suggestion. Doing it this way (combining already-truncated subsets) will in general produce a different output than the 'exact' way of doing it, but it may just be good enough for my situation. I'll consider it. Thanks.
Niels Basjes
Could you explain why this can result in a different output? I am thinking that the top 10 items globally are definitely contained in the set of top 10 items from each partition (possibly as top 3 from one, top 2 from another, and top 5 from a third -- but they are in there).
SquareCog
Assume I want the top 2 values (top 2 to make it fit in this comment box) and I have only 1 key 'K'. Mapper 1 produces the values (notation: {data,score}): {A,12}, {B,11}, {C,10}. Mapper 2 produces: {D,11}, {E,10}, {C,9}. If you do it the proposed way, the final result will be {A,12},{D,11}. The desired result is {C,19},{A,12}: in both mappers the {C,x} was cut off by the combiner. The impact of this can be reduced by making the combiner truncate to a bit more than the final count -- say the combiner truncates to 25 and the reducer to 10 -- but the effect will still occur.
Niels Basjes
Oh, I didn't realize the score was cumulative. Slammed right now, back with ideas in a day or two.
SquareCog
A: 

If I understand the question properly, you'll need to use a TotalOrderPartitioner.
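
Roughly, with the old "mapred" API, the setup would look like this sketch (MyJob and the partition file path are placeholders):

    JobConf conf = new JobConf(MyJob.class); // MyJob is a placeholder
    conf.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, new Path("/tmp/partitions.lst"));
    // Sample the input to pick partition boundaries: ~10% of records,
    // at most 10000 samples from at most 10 splits.
    InputSampler.writePartitionFile(
        conf, new InputSampler.RandomSampler(0.1, 10000, 10));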

bajafresh4life
+1  A: 

Sounds definitely like a secondary sort problem. Take a look into "Hadoop: The Definitive Guide" (O'Reilly) if you like; you can also access it online. It describes a pretty good implementation.

I implemented it myself too. Basically it works this way: the partitioner makes sure that all key-value pairs with the same key go to one single reducer. Nothing special here. But there is also the GroupingComparator, which forms the groups: one group is passed as an iterator to a single reduce() call, so a partition can contain multiple groups, while the number of partitions should equal the number of reducers. The grouping also allows some sorting, since it implements a compareTo() method.

With this method, you can make sure that the 10 best/worst/highest/lowest keys reach the reducer first. So after you have read those 10, you can leave the reduce() method without any further iterations.
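
A rough sketch of such a reducer (CompositeKey is an assumed {natural key, score} WritableComparable with a getNaturalKey() accessor returning Text; the partitioner and grouping comparator must be set up as described above):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Values arrive sorted by score thanks to the secondary sort,
    // so the 10 worst are simply the first 10 in the iterator.
    public class TopTenReducer extends Reducer<CompositeKey, Text, Text, Text> {
      @Override
      protected void reduce(CompositeKey key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        int seen = 0;
        for (Text value : values) {
          context.write(key.getNaturalKey(), value);
          if (++seen == 10) {
            return; // leave reduce() early; the remaining values are never read
          }
        }
      }
    }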

Hope that was helpful :-)

Peter Wippermann