tags:

views:

981

answers:

3

Here's my source code

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

public static final String MAGIC_STRING = ">>>>";
boolean overwrite = true;

/**
 * Creates a graph builder.
 *
 * @param overwrite value stored in the {@code overwrite} field, which
 *                  {@code roundB} consults when its output path already exists
 */
PageRank(boolean overwrite) {
    this.overwrite = overwrite;
}
public static class TextPair implements WritableComparable<TextPair>{
    Text x;
    int ordering;

    public TextPair(){
        x = new Text();
        ordering = 1;
    }

    public void setText(Text t, int o){
        x = t;
        ordering = o;
    }

    public void setText(String t, int o){
        x.set(t);
        ordering = o;
    }


    public void readFields(DataInput in) throws IOException {
        x.readFields(in);
        ordering = in.readInt();
    }


    public void write(DataOutput out) throws IOException {
        x.write(out);
        out.writeInt(ordering);
    }


    public int hashCode() {
        return x.hashCode();
    }


    public int compareTo(TextPair o) {
        int x = this.x.compareTo(o.x);
        if(x==0)
            return ordering-o.ordering;
        else
            return x;
    }
}

public static class MapperA extends Mapper<LongWritable, Text, TextPair, Text> {

private Text word = new Text();
Text title = new Text();
Text link = new Text();
TextPair textpair = new TextPair();

boolean start=false;
String currentTitle="";
private Pattern linkPattern = Pattern.compile("\\[\\[\\s*(.+?)\\s*\\]\\]");
private Pattern titlePattern = Pattern.compile("<title>\\s*(.+?)\\s*</title>");
private Pattern pagePattern = Pattern.compile("&ltpage&gt\\s*(.+?)\\s*&lt/page&gt");


public void map(LongWritable key, Text value,  Context context) throws IOException, InterruptedException {
    String line = value.toString();
    int startPage=line.lastIndexOf("<title>");  

    if(startPage<0)
    {           
        Matcher matcher = linkPattern.matcher(line);                
        int n = 0;
        title.set(currentTitle);
        while(matcher.find()){
            textpair.setText(matcher.group(1), 1);
            context.write(textpair, title);
        }
        link.set(MAGIC_STRING);     
        textpair.setText(title.toString(), 0);
        context.write(textpair, link);
    } 
    else
    {           
        String result=line.trim();
        Matcher titleMatcher = titlePattern.matcher(result);            
        if(titleMatcher.find()){
            currentTitle = titleMatcher.group(1);
        }
        else
        {
            currentTitle=result;
        }               
        }    
   }
    } 

   public static class ReducerA extends Reducer<TextPair, Text, Text, Text>{
    Text aw = new Text();
    boolean valid = false;
    String last = "";

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            TextPair key = context.getCurrentKey();
            Text value = context.getCurrentValue();
            if(key.ordering==0){
                last = key.x.toString();
            }
            else if(key.x.toString().equals(last)){
                context.write(key.x, value);
            }
        }
        cleanup(context);
         }
               }

  public static class MapperB extends Mapper<Text, Text, Text, Text>{
Text t = new Text();        
public void map(Text key, Text value, Context context) throws InterruptedException, IOException{
    context.write(value, key);
}
 }

   public static class ReducerB extends Reducer<Text, Text, Text, PageRankRecord>{
    ArrayList<String> q = new ArrayList<String>();

    public void reduce(Text key, Iterable<Text> values, Context context)throws InterruptedException, IOException{
        q.clear();
        for(Text value:values){
            q.add(value.toString());
        }

        PageRankRecord prr = new PageRankRecord();
        prr.setPageRank(1.0);

        if(q.size()>0){
            String[] a = new String[q.size()];
            q.toArray(a);

            prr.setlinks(a);
        }
        context.write(key, prr);
    }
}

/**
 * Runs the first job: {@code MapperA} over text input, {@code ReducerA}
 * writing (Text, Text) pairs as a sequence file.
 *
 * <p>If {@code outputPath} already exists it is deleted recursively when
 * {@code overwrite} is true; otherwise the job is skipped and {@code true}
 * is returned.
 *
 * @param conf       Hadoop configuration used for the file system and the job
 * @param inputPath  path of the input to read
 * @param outputPath path the job writes to
 * @param overwrite  whether an existing output path may be deleted (this
 *                   parameter, not the instance field, is what is consulted)
 * @return {@code true} if the job succeeded or was skipped, {@code false} if it failed
 */
public boolean roundA(Configuration conf, String inputPath, String outputPath, boolean overwrite) throws IOException, InterruptedException, ClassNotFoundException{
    Path output = new Path(outputPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output)) {
        if (!overwrite) {
            return true;
        }
        fs.delete(output, true);
        System.err.println("The target file is dirty, overwriting!");
    }

    Job job = new Job(conf, "closure graph build round A");

    // Tell Hadoop which jar holds the job classes so it is shipped to the
    // task nodes; without it tasks fail with
    // ClassNotFoundException: PageRank$MapperA.
    job.setJarByClass(PageRank.class);
    job.setMapperClass(MapperA.class);
    job.setReducerClass(ReducerA.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setNumReduceTasks(30);

    FileInputFormat.addInputPath(job, new Path(inputPath));
    SequenceFileOutputFormat.setOutputPath(job, output);
    return job.waitForCompletion(true);
}

/**
 * Runs the second job: {@code MapperB} over sequence-file input,
 * {@code ReducerB} writing (Text, PageRankRecord) pairs as a sequence file.
 *
 * <p>If {@code outputPath} already exists it is deleted recursively when the
 * instance's {@code overwrite} field is true; otherwise the job is skipped
 * and {@code true} is returned.
 *
 * @param conf       Hadoop configuration used for the file system and the job
 * @param inputPath  path of the sequence-file input
 * @param outputPath path the job writes to
 * @return {@code true} if the job succeeded or was skipped, {@code false} if it failed
 */
public boolean roundB(Configuration conf, String inputPath, String outputPath) throws IOException, InterruptedException, ClassNotFoundException{
    Path output = new Path(outputPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(output)) {
        if (!overwrite) {
            return true;
        }
        fs.delete(output, true);
        System.err.println("The target file is dirty, overwriting!");
    }

    Job job = new Job(conf, "closure graph build round B");

    // Tell Hadoop which jar holds the job classes so it is shipped to the
    // task nodes; without it tasks cannot load the nested mapper/reducer
    // classes and fail with ClassNotFoundException.
    job.setJarByClass(PageRank.class);
    job.setMapperClass(MapperB.class);
    job.setReducerClass(ReducerB.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(PageRankRecord.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setNumReduceTasks(30);

    SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
    SequenceFileOutputFormat.setOutputPath(job, output);
    return job.waitForCompletion(true);
}

/**
 * Runs round A and then, only if it reports success, round B.
 *
 * <p>Round A always runs with overwrite enabled and writes to the fixed
 * intermediate path {@code "cgb"}, which round B then reads. The input path
 * is echoed to standard error first.
 *
 * @param conf       Hadoop configuration passed to both rounds
 * @param inputPath  input of round A
 * @param outputPath output of round B
 * @return {@code false} if round A fails, otherwise the result of round B
 */
public boolean build(Configuration conf, String inputPath, String outputPath) throws IOException, InterruptedException, ClassNotFoundException{
    System.err.println(inputPath);

    final String intermediate = "cgb";
    boolean firstRoundOk = roundA(conf, inputPath, intermediate, true);
    if (!firstRoundOk) {
        return false;
    }
    return roundB(conf, intermediate, outputPath);
}

/**
 * Command-line entry point: {@code PageRank [generic options] <input> <output>}.
 *
 * <p>Hadoop generic options are applied to the configuration first and
 * stripped from the argument list; the first two remaining arguments are the
 * input and output paths. The process exits with status 0 when the build
 * succeeds, 1 when it fails and 2 when the paths are missing.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
    Configuration conf = new Configuration();
    // Lets generic Hadoop options on the command line take effect and
    // leaves only the application's own arguments.
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (remaining.length < 2) {
        System.err.println("Usage: PageRank [generic options] <input path> <output path>");
        System.exit(2);
    }

    PageRank cgb = new PageRank(true);
    boolean succeeded = cgb.build(conf, remaining[0], remaining[1]);
    // Surface job failure to the caller instead of always exiting with 0.
    System.exit(succeeded ? 0 : 1);
}


 }

Here's how I compile and run it:

javac -classpath hadoop-0.20.1-core.jar -d pagerank_classes PageRank.java PageRankRecord.java

jar -cvf pagerank.jar -C pagerank_classes/ .

bin/hadoop jar pagerank.jar PageRank pagerank result

but I am getting the following errors:

 INFO mapred.JobClient: Task Id : attempt_201001012025_0009_m_000001_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: PageRank$MapperA

Can someone tell me what's wrong?

Thanks

+1  A: 

Did "PageRank$MapperA.class" end up inside that jar file? It should be in the same place as "PageRank.class".

ZoFreX
A: 

Are inner classes laid out as separate class files? Please excuse my insufficient knowledge :-/

Peter Wippermann
That is not exactly an answer, is it? It should be posted as a question... but yes, each class, including nested, inner or anonymous ones, is compiled into its own file. *There can be only one class inside a class file*
Carlos Heuberger
A: 

Try adding "-libjars pagerank.jar". The mapper and reducer run on different machines, so you need to distribute your jar to every machine; "-libjars" helps to do that.

Victor