Hi,

Can someone walk me through the basic workflow of reading and writing data with classes generated from DDL?

I have defined some struct-like records using DDL. For example:

  class Customer {
     ustring FirstName;
     ustring LastName;
     ustring CardNo;
     long LastPurchase;
  }

I've compiled this to get a Customer class and included it in my project. I can easily see how to use this as input and output for mappers and reducers (the generated class implements Writable), but not how to read it from and write it to a file.

The JavaDoc for the org.apache.hadoop.record package talks about serializing these records in Binary, CSV or XML format. How do I actually do that? Say my reducer produces IntWritable keys and Customer values. What OutputFormat do I use to write the result in CSV format? What InputFormat would I use to read the resulting files in later, if I wanted to perform analysis over them?

A: 

To get yourself running, you might want to take a look at www.karmasphere.com. They have a little example in their community tool that shows you how to get going!

Gerald W
A: 

OK, so I think I have this figured out. I'm not sure if it is the most straightforward way, so please correct me if you know a simpler workflow.

Every class generated from DDL implements the Record interface, and consequently provides two methods:

  serialize(RecordOutput out) for writing
  deserialize(RecordInput in) for reading

RecordOutput and RecordInput are utility interfaces provided in the org.apache.hadoop.record package. There are a few implementations of each (e.g. XmlRecordOutput, BinaryRecordOutput, CsvRecordOutput).
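To make the pattern concrete, here is a minimal sketch with no Hadoop dependency. The names SketchRecordOutput, SketchCsvOutput, SketchCustomer and SerializeSketch are made up for illustration and are not part of org.apache.hadoop.record. The sketch also does no escaping of field values, so treat it only as a picture of how a record hands its fields, one at a time, to whatever output object it is given.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a RecordOutput-style interface (not Hadoop's own).
interface SketchRecordOutput {
    void writeString(String s, String tag) throws IOException;
    void writeLong(long l, String tag) throws IOException;
    void endRecord() throws IOException;
}

// Hypothetical CSV-like output: comma-separated fields, one record per line.
// Assumption: field values contain no commas or newlines (nothing is escaped).
class SketchCsvOutput implements SketchRecordOutput {
    private final Writer out;
    private boolean first = true;

    SketchCsvOutput(OutputStream stream) {
        this.out = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
    }

    private void writeField(String text) throws IOException {
        if (!first) {
            out.write(',');
        }
        out.write(text);
        first = false;
    }

    public void writeString(String s, String tag) throws IOException {
        writeField(s); // this sketch ignores the tag
    }

    public void writeLong(long l, String tag) throws IOException {
        writeField(Long.toString(l));
    }

    public void endRecord() throws IOException {
        out.write('\n');
        out.flush();
        first = true; // the next record starts a fresh line
    }
}

// Hypothetical hand-written equivalent of a DDL-generated record class.
class SketchCustomer {
    final String firstName;
    final String lastName;
    final String cardNo;
    final long lastPurchase;

    SketchCustomer(String firstName, String lastName, String cardNo, long lastPurchase) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.cardNo = cardNo;
        this.lastPurchase = lastPurchase;
    }

    // The record decides the field order; the output object decides the encoding.
    void serialize(SketchRecordOutput out) throws IOException {
        out.writeString(firstName, "FirstName");
        out.writeString(lastName, "LastName");
        out.writeString(cardNo, "CardNo");
        out.writeLong(lastPurchase, "LastPurchase");
        out.endRecord();
    }
}

class SerializeSketch {
    static String toCsv(SketchCustomer customer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            customer.serialize(new SketchCsvOutput(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SketchCustomer c = new SketchCustomer("Ada", "Lovelace", "4111", 1234567890L);
        System.out.print(toCsv(c)); // one comma-separated line per record
    }
}
```

Swapping in a different output object changes the file format without touching the record class, which is the same division of labour the real RecordOutput implementations rely on.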

As far as I know, you have to implement your own OutputFormat or InputFormat classes to use these. This is fairly easy to do.

For example, the OutputFormat I talked about in the original question (one that writes IntWritable keys and Customer values in CSV format) would be implemented like this:


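  // Imports for the enclosing source file:
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RecordWriter;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.TextOutputFormat;
  import org.apache.hadoop.record.CsvRecordOutput;
  import org.apache.hadoop.util.Progressable;

  // Nested inside the job class:
  private static class CustomerOutputFormat 
    extends TextOutputFormat<IntWritable, Customer> 
  {

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
      JobConf job,
      String name,
      Progressable progress)
    throws IOException {
      // Open this task's output file and hand the stream to our writer.
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new CustomerRecordWriter(fileOut);
    }

    protected static class CustomerRecordWriter 
      implements RecordWriter<IntWritable, Customer> 
    {

      protected DataOutputStream outStream;

      // The constructor name must match the class name.
      public CustomerRecordWriter(DataOutputStream out) {
        this.outStream = out;
      }

      public synchronized void write(IntWritable key, Customer value) throws IOException {
        // Write the key as the first CSV field, then the record's own fields.
        CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
        csvOutput.writeInt(key.get(), "id");
        value.serialize(csvOutput);
      }

      public synchronized void close(Reporter reporter) throws IOException {
        outStream.close();
      }
    }
  }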

Creating the InputFormat is much the same. Because the CSV format puts one record on each line, we can use a LineRecordReader internally to do most of the work.



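  // Imports for the enclosing source file:
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileSplit;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.LineRecordReader;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.record.CsvRecordInput;

  // Nested inside the job class:
  private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {

    public RecordReader<IntWritable, Customer> getRecordReader(
      InputSplit genericSplit, 
      JobConf job,
      Reporter reporter)
    throws IOException {

      reporter.setStatus(genericSplit.toString());
      return new CustomerRecordReader(job, (FileSplit) genericSplit);
    }

    // Static: the reader needs no reference to the enclosing InputFormat.
    private static class CustomerRecordReader implements RecordReader<IntWritable, Customer> {

      // Does the actual work of splitting the input into lines.
      private LineRecordReader lrr;

      public CustomerRecordReader(Configuration job, FileSplit split) 
      throws IOException {
        this.lrr = new LineRecordReader(job, split);
      }

      public IntWritable createKey() {
        return new IntWritable();
      }

      public Customer createValue() {
        return new Customer();
      }

      public synchronized boolean next(IntWritable key, Customer value)
      throws IOException {

        LongWritable offset = new LongWritable();
        Text line = new Text();

        if (!lrr.next(offset, line))
          return false;

        // LineRecordReader strips the line terminator; put it back in case
        // the CSV parser expects one at the end of a record.
        CsvRecordInput cri = new CsvRecordInput(
          new ByteArrayInputStream((line.toString() + "\n").getBytes("UTF-8")));
        // The first field is the key; the rest belong to the record.
        key.set(cri.readInt("id"));
        value.deserialize(cri);

        return true;
      }

      public float getProgress() throws IOException {
        return lrr.getProgress();
      }

      public synchronized long getPos() throws IOException {
        return lrr.getPos();
      }

      public synchronized void close() throws IOException {
        lrr.close();
      }
    }
  }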
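
If you want to try the line-per-record idea without a cluster, here is a rough sketch using only the JDK. LineRecordSketch and its Row class are hypothetical names of mine, and the parsing is a plain split on commas. It does not reproduce whatever escaping Hadoop's CSV classes apply, so it only shows the shape of the loop: read a line, take the first field as the key, parse the rest as the record.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class LineRecordSketch {

    // Hypothetical parsed row: the integer key plus the four Customer fields.
    static final class Row {
        final int id;
        final String firstName;
        final String lastName;
        final String cardNo;
        final long lastPurchase;

        Row(int id, String firstName, String lastName, String cardNo, long lastPurchase) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.cardNo = cardNo;
            this.lastPurchase = lastPurchase;
        }
    }

    // Reads one record per line. Assumption: fields contain no commas.
    static List<Row> parse(String text) {
        List<Row> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // skip blank lines
                }
                String[] f = line.split(",", -1);
                if (f.length != 5) {
                    throw new IllegalArgumentException("expected 5 fields: " + line);
                }
                rows.add(new Row(Integer.parseInt(f[0]), f[1], f[2], f[3],
                        Long.parseLong(f[4])));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        String text = "7,Ada,Lovelace,4111,1234567890\n8,Alan,Turing,4222,42\n";
        for (Row r : parse(text)) {
            System.out.println(r.id + " -> " + r.firstName + " " + r.lastName);
        }
    }
}
```

In the real reader, LineRecordReader plays the part of the BufferedReader loop and CsvRecordInput plays the part of the split-and-parse step.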

Dave