tags:

views:

62

answers:

1

Hi Text manipulation in Reduce phase seems not working correctly. I suspect problem could be in my code rather then hadoop itself but you never know... If you can spot any gotchas let me know. I wasted a day trying to figure out what’s wrong with this code.

my sample input file called simple.psv

12345   [email protected]|m|1975
12346   [email protected]|m|1981

my Mapper and reducer code

package simplemapreduce;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

/**
 *
 * @author
 */
public class Main {


    public static class SimpleMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text inputs,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String inputString = inputs.toString();
            //System.out.println("CRM Map record:"+inputString);
            StringTokenizer tokenizer = new StringTokenizer(inputString);
            Text userid = new Text();
            if (tokenizer.hasMoreTokens()) {
                userid.set(tokenizer.nextToken());
                Text data = new Text();
                if (tokenizer.hasMoreTokens()) {
                    data.set(tokenizer.nextToken());
                } else {
                    data.set("");
                }
                output.collect(userid, data);
            }
        }
    }

    /**
     * A reducer class that just emits its input.
     */
    public static class SimpleReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            while (values.hasNext()) {
                Text txt = values.next();
                String inputString = "<start>" + txt.toString() + "<end>";
                Text out = new Text();
                out.set(inputString);
                //System.out.println(inputString);
                output.collect(key, out);

            }
        }
    }

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
            System.err.println("Usage: SimpleMapReduce <input path> <output path>");
            System.exit(1);
        }
        JobConf conf = new JobConf(Main.class);
        conf.setJobName("Simple Map reducer");

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(SimpleMap.class);
        conf.setCombinerClass(SimpleReduce.class);
        conf.setReducerClass(SimpleReduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(1);
        JobClient.runJob(conf);
    }
}

my sample launch script called simple.sh

#!/bin/bash

hadoop jar SimpleMapReduce.jar \
  /full/path/to/input/simple.tsv  /user/joebloggs/output

expected output

12345   <start>[email protected]|m|1975<end>
12346   <start>[email protected]|m|1981<end>

actual output

12345   <start><start>[email protected]|m|1975<end><end>
12346   <start><start>[email protected]|m|1981<end><end>

I tested this on Amazon s3 as well on Linux if you could spot the problem and let me know what it is... that will really save some hair on my head!

+1  A: 

The basic flow of data through the system is:

Input -> Map -> Reduce -> output.

As a performance optimization the combiner has been added to allow a computer (one of the many in the hadoop cluster) to do a partial aggregation of the data before it is transmitted to the system where the actual reducer is run.

In the word count example it is fine to start with these values :

1 1 1 1 1 1 1 1 1 1

combine them into

3 4 2 1

and the reduce them into the final result

10

So the combiner is essentially a performance optimization. If you do not specify a combiner it will not change the information going through (i.e. it's an "identity reducer"). So you can only use the SAME class as both the combiner and reducer if the dataset remains valid that way. In your case: that is not true --> your data is now invalid.

You do:

conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

So this makes the output of your mapper go through your reducer twice. The first one adds: "start" & "end" The second one adds "start" & "end" again.

Simple solution:

// conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);

HTH

Niels Basjes