Hi all!

I have an algorithm that goes through a large data set, reads some text files, and searches for specific terms in those lines. I have it implemented in Java, but I didn't want to post the code so that it doesn't look like I'm asking someone to implement it for me, but the truth is I really need a lot of help!!! This wasn't planned for my project, but the data set turned out to be huge, so my teacher told me I have to do it this way.

EDIT (I did not clarify this in the previous version): the data set I have is on a Hadoop cluster, and I should write a MapReduce implementation of it.

I was reading about MapReduce and thought that I would first do the standard implementation and then it would be more or less easy to convert it to MapReduce. But that didn't happen, because the algorithm is quite stupid and nothing special, and MapReduce... I can't wrap my mind around it.

So here, briefly, is the pseudocode of my algorithm:

LIST termList   (there is a method that creates this list from a Lucene index)
FOLDER topFolder

INPUT topFolder
IF it is a folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
ENDIF


METHOD analyze(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    GET third word from line
    FOR EACH term in termList
        IF third word = term
            append whole line to string buffer
        ENDIF
    ENDFOR
ENDWHILE
OUTPUT string buffer to file

Also, as you can see, each time "analyze" is called a new output file has to be created, and I understood that it is difficult for MapReduce to write to many output files???

I understand the MapReduce intuition, and my example seems perfectly suited for MapReduce, but when it comes to actually doing it, obviously I do not know enough and I am STUCK!

Please please help.

+3  A: 

You can just use an empty reducer, and partition your job to run a single mapper per file. Each mapper will create its own output file in your output folder.
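A rough sketch of what that could look like with the org.apache.hadoop.mapreduce API (not a complete solution; the "term.list" property name and the comma-separated encoding of the terms are assumptions, as is the glob used for the input paths):

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TermFilterJob {

  public static class TermFilterMapper
      extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Set<String> terms = new HashSet<String>();

    @Override
    protected void setup(Context context) {
      //Terms are passed in through the job configuration (assumed encoding)
      String csv = context.getConfiguration().get("term.list", "");
      terms.addAll(Arrays.asList(csv.split(",")));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] tokens = value.toString().split(" ");
      //Keep the whole line if its third word is one of the search terms
      if (tokens.length >= 3 && terms.contains(tokens[2])) {
        context.write(value, NullWritable.get());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("term.list", "terma,termb"); //build this from your Lucene index instead

    Job job = new Job(conf, "term filter");
    job.setJarByClass(TermFilterJob.class);
    job.setMapperClass(TermFilterMapper.class);
    job.setNumReduceTasks(0); //map-only: the "empty reducer"
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    //A glob such as topFolder/*/CheckedFile.txt should pick up one file per sub folder
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With no reducers, each mapper writes its own part-m-NNNNN file straight into the output folder, which is the behaviour described above.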

SquareCog
Hi! Thanks for the answer!!! But I am not sure I understand :/ Can you give me some more info? Do you maybe have some examples like that???
Julia
+2  A: 

Hi Julia,

MapReduce is easily implemented using some nice Java 6 concurrency features, especially Future, Callable and ExecutorService.

I created a Callable that analyses a file in the way you specified:

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;

public class FileAnalyser implements Callable<String> {

  private final Scanner scanner;
  private final List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      //Keep the whole line if its third word is in the term list
      if ((tokens.length >= 3) && (inTermList(tokens[2])))
        buffer.append(line).append('\n');
    }
    scanner.close();
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

We need to create a new Callable for each file found and submit it to the executor service. The result of the submission is a Future, which we can use later to obtain the result of the file parse.

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Analyser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) throws Exception {

    //All callables will be submitted to this executor service
    //Play around with THREAD_COUNT for optimum performance
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    //Store all futures in this list so we can refer to them easily
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    //Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    //However you enumerate your files goes here
    List<String> filenames = new ArrayList<String>();

    //For each file you find, create a new FileAnalyser callable and submit
    //it to the executor service. Add the future to the list
    //so we can check back on the result later
    for (String filename : filenames) {
      try {
        Callable<String> worker = new FileAnalyser(filename, termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      }
      catch (FileNotFoundException fnfe) {
        //If the file doesn't exist at this point we can probably ignore it,
        //but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + filename);
        fnfe.printStackTrace(System.err);
      }
    }

    //Stop accepting new tasks; already-submitted tasks will still run
    executor.shutdown();

    //You may want to wait at this point until all threads have finished,
    //for example by looping through each future until isDone() holds true
    //for each of them.

    //Loop over all finished futures and do something with the result
    //from each. get() blocks until the result is available.
    for (Future<String> current : futureList) {
      String result = current.get();
      //Do something with the result from this future
    }
  }
}

My example here is far from complete, and far from efficient. I haven't considered the sample size; if it's really huge, you could keep looping over the futureList, removing elements that have finished, with something like this:

while (!futureList.isEmpty()) {
  for (Future<String> current : futureList) {
    if (current.isDone()) {
      String result = current.get();
      //Do something with result
      futureList.remove(current);
      break; //We have modified the list during iteration, so break out of the for-loop
    }
  }
}

Alternatively you could implement a producer-consumer type setup where the producer submits callables to the executor service and produces a future, and the consumer takes the result of the future and then discards the future.

This would maybe require the producer and consumer to be threads themselves, and a synchronized list for adding/removing futures.
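Not from the original answer, but a minimal sketch of that idea using java.util.concurrent.CompletionService, which hands futures back as their tasks complete, so you don't need your own synchronized list. It reuses the FileAnalyser class above; the placeholder filenames are hypothetical:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceExample {

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    //The completion service queues up futures as their tasks finish
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(executor);

    List<String> termList = Arrays.asList("terma", "termb");
    List<String> filenames = Arrays.asList("file1.txt", "file2.txt"); //however you enumerate your files

    //Producer side: submit one FileAnalyser per file
    int submitted = 0;
    for (String filename : filenames) {
      completionService.submit(new FileAnalyser(filename, termList));
      submitted++;
    }

    //Consumer side: take() blocks until the next result is ready,
    //in whatever order the analyses happen to finish
    for (int i = 0; i < submitted; i++) {
      String result = completionService.take().get();
      //Do something with result
    }

    executor.shutdown();
  }
}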

Any questions please ask.

Karl Walsh
Hi! Thank you very much for the proposed solution!! I am sorry, I probably didn't specify the problem clearly, although I tried. My mistake: I just mentioned Hadoop in the title, but my data set is on a cluster running Hadoop, so I should implement it with the Hadoop MapReduce framework... I will edit my post now. The data set I am analyzing is 6GB :/ Too much for concurrency to cope with?????
Julia
Oops, I'm a noob here :D To redeem myself slightly, I ran my code on 100 files, ~61MB each, ~6GB in total. I'm not entirely sure what your file parser does, so I left out the gory detail and just scanned each line and returned an empty string. A bit contrived, I know. The performance wasn't too terrible: the thread pool size was 100, so all 100 files were parsed without being queued by the executor service. Total running time was 17 minutes on my Atom processor. Sorry I couldn't answer your question properly. I don't have experience with Hadoop, but after reading up, SquareCog's answer makes sense.
Karl Walsh
Hi! Thank you very much, you helped a lot, because I cannot cope with Hadoop MR with the brain and time I have. I will have a few more similar algorithms to implement, so I have to do it in a way I am capable of. I couldn't get Hadoop help anywhere :/ So I have adopted your code, and on my Intel 2GHz, with a thread pool of 42, it took about 20 minutes to parse and output the results into new files, but on only 200MB of data (42 files). Again, I have to make some modifications to the parser: it has to do some stricter matching, not a pure "contains" of the term, so when I run it all, I'll let you know the results :)
Julia