Hi Text manipulation in Reduce phase seems not working correctly. I suspect problem could be in my code rather then hadoop itself but you never know... If you can spot any gotchas let me know. I wasted a day trying to figure out what’s wrong with this code.
my sample input file called simple.psv
12345 [email protected]|m|1975
12346 [email protected]|m|1981
my Mapper and reducer code
package simplemapreduce;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
/**
*
* @author
*/
public class Main {
public static class SimpleMap extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text inputs,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
String inputString = inputs.toString();
//System.out.println("CRM Map record:"+inputString);
StringTokenizer tokenizer = new StringTokenizer(inputString);
Text userid = new Text();
if (tokenizer.hasMoreTokens()) {
userid.set(tokenizer.nextToken());
Text data = new Text();
if (tokenizer.hasMoreTokens()) {
data.set(tokenizer.nextToken());
} else {
data.set("");
}
output.collect(userid, data);
}
}
}
/**
* A reducer class that just emits its input.
*/
public static class SimpleReduce extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
Text txt = values.next();
String inputString = "<start>" + txt.toString() + "<end>";
Text out = new Text();
out.set(inputString);
//System.out.println(inputString);
output.collect(key, out);
}
}
}
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: SimpleMapReduce <input path> <output path>");
System.exit(1);
}
JobConf conf = new JobConf(Main.class);
conf.setJobName("Simple Map reducer");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(SimpleMap.class);
conf.setCombinerClass(SimpleReduce.class);
conf.setReducerClass(SimpleReduce.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumReduceTasks(1);
JobClient.runJob(conf);
}
}
my sample launch script called simple.sh
#!/bin/bash
hadoop jar SimpleMapReduce.jar \
/full/path/to/input/simple.tsv /user/joebloggs/output
expected output
12345 <start>[email protected]|m|1975<end>
12346 <start>[email protected]|m|1981<end>
actual output
12345 <start><start>[email protected]|m|1975<end><end>
12346 <start><start>[email protected]|m|1981<end><end>
I tested this on Amazon s3 as well on Linux if you could spot the problem and let me know what it is... that will really save some hair on my head!