A simple wordcount reducer in Ruby looks like this:

#!/usr/bin/env ruby
wordcount = Hash.new

STDIN.each_line do |line|
  keyval = line.split("|")
  wordcount[keyval[0]] = wordcount[keyval[0]].to_i + keyval[1].to_i
end

wordcount.each_pair do |word, count|
  puts "#{word}|#{count}"
end

It receives on STDIN the intermediate values from all mappers, not just those for a specific key. So in effect there is only ONE reducer for everything (not one reducer per word or per set of words).

However, in the Java examples I saw this interface that gets a key and a list of values as input, which means intermediate map values are grouped by key before being reduced, and reducers can run in parallel:

public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

Is this a Java only feature? Or can I do it with Hadoop Streaming using Ruby?

+1  A: 

I haven't tried Hadoop Streaming myself, but from reading the docs I think you can achieve similar parallel behavior.

Instead of passing each reducer a key together with its list of associated values, streaming groups the mapper output by key. It also guarantees that values with the same key won't be split over multiple reducers. This is somewhat different from the normal Hadoop (Java) behavior, but even so, the reduce work will be distributed over multiple reducers.

Try the -verbose option to get more information about what's really going on. You can also experiment with the -D mapred.reduce.tasks=X option, where X is the desired number of reducers.
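For illustration, a streaming job for a Ruby word count might be launched roughly like this; the jar location, HDFS paths, and script names below are placeholders for your own setup, and the -D generic option has to come before the streaming-specific options:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
    -D mapred.reduce.tasks=4 \
    -input /user/me/wordcount/input \
    -output /user/me/wordcount/output \
    -mapper mapper.rb \
    -reducer reducer.rb \
    -file mapper.rb \
    -file reducer.rb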

sris
+3  A: 

Reducers will always run in parallel, whether you're using streaming or not (if you're not seeing this, verify that the job configuration is set to allow multiple reduce tasks -- see mapred.reduce.tasks in your cluster or job configuration). The difference is that the framework packages things up a little more nicely for you when you use Java versus streaming.

For Java, the reduce task gets an iterator over all the values for a particular key. This makes it easy to walk the values if you are, say, summing the map output in your reduce task. In streaming, you literally just get a stream of key-value pairs. You are guaranteed that the values will be ordered by key, and that the values for a given key will not be split across reduce tasks, but any state tracking you need is up to you. For example, in Java your map output comes to your reducer symbolically in the form

key1, {val1, val2, val3}
key2, {val7, val8}

With streaming, your output instead looks like

key1, val1
key1, val2
key1, val3
key2, val7
key2, val8

For example, to write a reducer that computes the sum of the values for each key, you'll need a variable to store the last key you saw and a variable to store the running sum. Each time you read a new key-value pair, you do the following (see the Ruby sketch after this list):

  1. Check whether the key is different from the last key you saw.
  2. If so, output the last key with its accumulated sum, and reset the sum to zero.
  3. Add the current value to the sum and set the last key to the current key.
  4. When the input is exhausted, output the final key and its sum.
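
As a rough sketch (not tested on a cluster), a streaming reducer in Ruby following these steps might look like this. It assumes the mapper emits "word|count" lines with the "|" separator used in the question; with streaming's default tab separator you would split on "\t" instead:

#!/usr/bin/env ruby
# Streaming reducer: lines arrive sorted by key, one "word|count" per line.
last_key = nil
sum = 0

STDIN.each_line do |line|
  key, value = line.chomp.split("|")
  if last_key && key != last_key
    # Key changed: emit the finished key and start a new sum.
    puts "#{last_key}|#{sum}"
    sum = 0
  end
  last_key = key
  sum += value.to_i
end

# Emit the final key once the input is exhausted.
puts "#{last_key}|#{sum}" if last_key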

HTH.

Kevin Weil