Hi everybody,

Sorry for my poor English; I hope you'll understand my problem. I have a question about Hadoop development.

I have to train myself on a simple image processing project using Hadoop. All I want to do is rotate an image with Hadoop (of course, I don't want Hadoop to work on the whole image at once). I have a problem with the InputFormat. What should I do?

The solutions I found were:

  • Use a SequenceFile as the input format (but then do I need a first map/reduce job to build it, followed by a second one to do the rotation?)
  • Extend FileInputFormat and RecordReader to work with a class I developed, ImageWritable. It contains a BufferedImage (the original image), a BufferedImage[] holding the image split into any number of parts we want, and an int[] pixelsArray, which is the image's serialized form.
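
For the rotation itself, independent of Hadoop, a minimal sketch on a flat row-major pixel array (like the int[] mentioned above, but without any header values) might look like this. The class and method names are hypothetical, and it only covers a 90-degree clockwise turn:

```java
import java.util.Arrays;

final class PixelRotation {

    /** Rotates a row-major width x height pixel array 90 degrees clockwise. */
    public static int[] rotate90Clockwise(int[] pixels, int width, int height) {
        int[] rotated = new int[pixels.length];
        // The rotated image is `height` pixels wide and `width` pixels tall.
        for (int y = 0; y < height; y++) {
            for (int x = 0; x < width; x++) {
                int newX = height - 1 - y; // old rows become columns, right to left
                int newY = x;              // old columns become rows
                rotated[newY * height + newX] = pixels[y * width + x];
            }
        }
        return rotated;
    }

    public static void main(String[] args) {
        int[] src = {1, 2, 3,
                     4, 5, 6}; // 3 wide, 2 tall
        int[] dst = rotate90Clockwise(src, 3, 2);
        System.out.println(Arrays.toString(dst)); // prints [4, 1, 5, 2, 6, 3]
    }
}
```

Because each output pixel depends on exactly one input pixel, a tile can be rotated on its own; only its position in the final image has to be tracked separately.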

I'm also not very familiar with generics in Java, so I extended the classes like this:

public class ImageInputFormat extends FileInputFormat<IntWritable, ImageWritable>
public class ImageRecordReader extends RecordReader<IntWritable, ImageWritable>

Is it correct?

In fact, I'm a bit lost in this jungle and don't even know what to do or use next. I have read a lot of papers about Hadoop, but I still don't really understand how map/reduce works with input files. Is there someone here who can help me, please?

Thanks.

A: 

I had been looking for help for two weeks, but this afternoon I found an interesting page: http://wiki.apache.org/hadoop/HadoopMapReduce?highlight=(inputf)

So I explored the second option. Now I can reach the RecordReader without errors. I looked at the source code of the Text class, which really helped me. Here is part of ImageRecordReader:

final class ImageRecordReader extends RecordReader<IntWritable, ImageWritable>{

    ImageWritable iwri;
    IntWritable key;


    public ImageRecordReader(TaskAttemptContext context, FileSplit fileSplit) throws IOException, InterruptedException {
        System.out.println(fileSplit.getPath().toString());
        key = new IntWritable(0);
        iwri = new ImageWritable(4);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit fsplit = (FileSplit) split;
        iwri.CreateImageWritableFromSerial(fsplit.getPath());
        iwri.affiche();
    }

And here is the main code that makes the BufferedImage writable:

public final class ImageWritable implements Writable{

    private BufferedImage bi;
    private int[] pixelsArray;
    private BufferedImage[] biPart;
    private int nbPart;

    public int[] getPixelsArray(){return pixelsArray;}

    private void setPixelArray(){
        int width = bi.getWidth();
        int height = bi.getHeight();
        pixelsArray = new int[width * height +2];
        Image im = bi;
        PixelGrabber pg = new PixelGrabber(im, 0, 0, width, height, pixelsArray, 2, width);
        try{ pg.grabPixels(); }
        catch(InterruptedException ex){ ex.printStackTrace(); }
        pixelsArray[0] = width;
        pixelsArray[1] = height;
    }
    public int[] getPixelArray(Image im){
        int width = im.getWidth(null);
        int height = im.getHeight(null);

        pixelsArray = new int[width * height +2];

        PixelGrabber pg = new PixelGrabber(im, 0, 0, width, height, pixelsArray, 2, width);
        try{ pg.grabPixels(); }
        catch(InterruptedException ex) { ex.printStackTrace(); }

        pixelsArray[0] = width;
        pixelsArray[1] = height;
        System.out.println("Width = "+ width);
        System.out.println("Height = "+ height);
        return pixelsArray;
    }

    // Splits the pixel array into nbPart tiles laid out as 2 rows of nbPart/2 columns.
    // pixArr layout: [width, height, pixels in row-major order...]; assumes nbPart is even and >= 2.
    private void createPartfromPixelArray(int[] pixArr){
        Toolkit tk = Toolkit.getDefaultToolkit();
        int cols = nbPart/2;
        int wPart = pixArr[0]/cols;
        int hPart = pixArr[1]/2;
        int lgLi = pixArr[0];   // scanline length = full image width
        for(int i = 0; i < nbPart; ++i){
            int row = i / cols;
            int col = i % cols;
            // Skip the 2 header ints, then row*hPart full scanlines, then col tiles across.
            int offset = 2 + row*hPart*lgLi + col*wPart;
            MemoryImageSource mis = new MemoryImageSource(wPart, hPart, pixArr, offset, lgLi);
            biPart[i] = RotationToolKit.getBuffered(tk.createImage(mis));
            affiche(biPart[i], Integer.toString(i+1));
        }
    }

    public ImageWritable(int nbPart){
        this.nbPart = nbPart;
        bi = new BufferedImage(1, 1, BufferedImage.TYPE_INT_ARGB);
        biPart = new BufferedImage[this.nbPart];
    }
    public boolean CreateImageWritableFromSerial(Path path){
        try {
            System.out.println(path.toString());
            DataInput dataInput = new FileImageInputStream(new File(path.toString().substring(5)));
            readFields(dataInput);
            return true;
        } catch (FileNotFoundException ex) {
            Logger.getLogger(ImageWritable.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(ImageWritable.class.getName()).log(Level.SEVERE, null, ex);
        }
        return false;
    }


    public void write(DataOutput d) throws IOException {
        System.out.println("Array width = "+ pixelsArray[0]);
        System.out.println("Array height = "+ pixelsArray[1]);
        System.out.println("Array length = "+ pixelsArray.length);
        for(int o : pixelsArray){
            d.writeInt(o);
        }
        System.out.println();
    }

    public void readFields(DataInput di) throws IOException {
        int w = di.readInt();
        int h = di.readInt();
        int length = w * h;
        System.out.println("Width read = "+ w);
        System.out.println("Height read = "+ h);
        System.out.println("Computed array length = "+ length);
        pixelsArray = new int[length+2];
        pixelsArray[0] = w;
        pixelsArray[1] = h;
        for(int i = 2; i<pixelsArray.length; i++)
            pixelsArray[i] = di.readInt();

        System.out.println();

        createPartfromPixelArray(pixelsArray);
        bi = createfromPixelsArray(pixelsArray);
    }

    public ImageWritable getPart(int i){ return new ImageWritable(biPart[i]); }
    public BufferedImage getBi() { return bi; }
    public void setWithoutCreateNewBi(BufferedImage bi){ this.bi = bi; }
    public void setBi(BufferedImage bi) {
        Graphics2D g = bi.createGraphics();
        g.setComposite(AlphaComposite.DstIn);
        g.drawImage(this.bi, null, null);
        g.dispose();
    }

    public int getNbPart() {return nbPart; }
    public void setNbPart(int part) {  nbPart = part; }

}
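
To check the write/readFields logic without running a job, the same [width, height, pixels...] layout can be round-tripped through plain java.io streams, since DataOutputStream and DataInputStream implement the DataOutput and DataInput interfaces that Writable uses. This is only a sketch: PixelArrayRoundTrip and its methods are hypothetical names, not part of the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

final class PixelArrayRoundTrip {

    /** Writes the array as a sequence of ints: width, height, then every pixel. */
    public static byte[] write(int[] pixelsArray) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int value : pixelsArray) {
                out.writeInt(value);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads width and height first, then exactly width * height pixels. */
    public static int[] read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int w = in.readInt();
            int h = in.readInt();
            int[] pixelsArray = new int[w * h + 2];
            pixelsArray[0] = w;
            pixelsArray[1] = h;
            for (int i = 2; i < pixelsArray.length; i++) {
                pixelsArray[i] = in.readInt();
            }
            return pixelsArray;
        }
    }

    public static void main(String[] args) throws IOException {
        // A 2 x 2 image: black, white, red, green (ARGB).
        int[] original = {2, 2, 0xFF000000, 0xFFFFFFFF, 0xFFFF0000, 0xFF00FF00};
        int[] copy = read(write(original));
        System.out.println(Arrays.equals(original, copy)); // prints true
    }
}
```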

The remaining problems are:

  • I don't really know whether this is correct.
  • How do I split the image finely enough that each piece is no larger than an HDFS block?
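
One possible approach to the second point is to cut the image into horizontal strips sized from the block size. A rough sketch of the arithmetic, assuming uncompressed 4-byte pixels plus the two-int header used above, and a 64 MB block size chosen only for illustration (the real value comes from the cluster configuration); StripPlanner is a hypothetical name:

```java
final class StripPlanner {

    /** Largest number of whole image rows whose serialized form fits in one block. */
    public static int rowsPerStrip(int width, long blockSizeBytes) {
        long bytesPerRow = 4L * width;       // one int per pixel
        long usable = blockSizeBytes - 8;    // room left after the width/height header
        if (width <= 0 || usable < bytesPerRow) {
            throw new IllegalArgumentException("a single row does not fit in one block");
        }
        return (int) Math.min(Integer.MAX_VALUE, usable / bytesPerRow);
    }

    /** Number of strips needed to cover the full height (ceiling division). */
    public static int stripCount(int height, int rowsPerStrip) {
        return (height + rowsPerStrip - 1) / rowsPerStrip;
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;  // assumed block size, for illustration only
        int width = 10_000;
        int height = 20_000;
        int rows = rowsPerStrip(width, blockSize);
        System.out.println(rows);                     // prints 1677
        System.out.println(stripCount(height, rows)); // prints 12
    }
}
```

Strips keep each piece contiguous in a row-major array, so no per-tile offset bookkeeping is needed; the last strip is simply shorter than the others.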

Thanks to anyone who can help me.