Dear StackOverflow,

I'm trying to write a multithreaded web crawler.

My main entry class has the following code:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
        return;
    exec.execute(new URLCrawler(this, url));
}

The URLCrawler fetches the specified URL, parses the HTML, extracts links from it, and schedules unseen links back to the frontier.

The frontier is a queue of uncrawled URLs. The problem is how to write its get() method. If the queue is empty, get() should wait until some URLCrawler finishes and then try again. It should return null only when the queue is empty and there is no currently active URLCrawler.

My first idea was to use an AtomicInteger to count the number of currently working URLCrawlers, plus an auxiliary object for notifyAll()/wait() calls. On start, each crawler increments the counter; on exit, it decrements the counter and notifies on the object that it has completed.

But I read that notify()/notifyAll() and wait() are considered somewhat outdated ways of doing thread communication.

What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with the exhaustion of producers.

+2  A: 

I am not sure I understand your design, but this may be a job for a Semaphore.
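
One plausible reading of this suggestion (the answer doesn't spell it out, so the class and method names below are purely illustrative): pair a counter of outstanding work with a Semaphore that starts with zero permits and is released exactly once, when the counter drops to zero.

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: tracks outstanding crawl work and lets the
// main thread block until the whole crawl is quiescent.
public class CrawlTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final Semaphore done = new Semaphore(0);

    // call BEFORE submitting a URL (seeds and discovered links alike),
    // so the count can never falsely reach zero
    public void taskScheduled() {
        pending.incrementAndGet();
    }

    // call at the very end of URLCrawler.run(), after any newly found
    // links have been scheduled
    public void taskFinished() {
        if (pending.decrementAndGet() == 0) {
            done.release(); // last unit of work; wake the waiting thread
        }
    }

    // main thread blocks here until the crawl is over
    public void awaitCompletion() throws InterruptedException {
        done.acquire();
    }
}

The ordering matters: a crawler calls taskScheduled() for each discovered link before its own taskFinished(), so the pending count cannot dip to zero while work is still being handed off.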

finnw
+3  A: 

One option is to make "frontier" a blocking queue, so any thread trying to get() from it will block. As soon as another URLCrawler puts objects into that queue, waiting threads will be woken automatically (each receiving a dequeued object).

naikus
Yes, that is a solution for the steady state. But how then to deal with the situation where none of the URLCrawlers queues any URLs? With a blocking queue, the frontier will block forever.
Anton Kazennikov
In that case you can have a crawlerDone() method on your frontier object that is called every time a URLCrawler finishes its work. Using this method together with the counter approach you suggested, you can test (inside the frontier's get()) whether all crawlers have finished; if so, get() can return null without blocking (see the sketch after this comment thread).
naikus
frontier can be a fixed-capacity blocking queue; a good candidate for that capacity is numberOfCrawlers.
Loop
@Loop: If crawlers enqueue more than one URL (which seems likely), there will be a deadlock. If you use an unbounded blocking queue, you'll have to use "special messages" to pry the blocking thread from the queue when you detect that the activity is over (a problem you then still have to solve). So, all in all, I don't think a blocking queue will help here.
Enno Shioji
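
Putting the comment thread together, a minimal sketch of such a frontier (the method name crawlerDone() follows the comments; everything else is an assumption, and it uses wait()/notifyAll() internally, since, as noted above, a plain BlockingQueue alone cannot express the termination condition):

import java.net.URL;
import java.util.LinkedList;
import java.util.Queue;

public class Frontier {
    private final Queue<URL> queue = new LinkedList<URL>();
    private int activeCrawlers = 0;

    // Blocks while the queue is empty but some crawler is still running,
    // since that crawler may yet enqueue more URLs. Returns null only
    // when the queue is empty AND no crawler is active.
    public synchronized URL get() throws InterruptedException {
        while (queue.isEmpty() && activeCrawlers > 0) {
            wait();
        }
        URL url = queue.poll();
        if (url != null) {
            activeCrawlers++; // caller is about to hand this URL to a crawler
        }
        return url;
    }

    public synchronized void put(URL url) {
        queue.add(url);
        notifyAll(); // new work arrived: wake any blocked get()
    }

    // called by a URLCrawler when it has finished its URL
    public synchronized void crawlerDone() {
        activeCrawlers--;
        notifyAll(); // may be the last crawler: let get() re-check termination
    }
}

Counting a crawler as active from the moment get() hands out a URL closes the gap between dequeuing a URL and starting its crawler; otherwise another thread could observe an empty queue with zero active crawlers and terminate prematurely.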
+1  A: 

I think use of wait/notify is justified in this case. I can't think of any straightforward way to do this using j.u.c.
In a class, let's call it Coordinator:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting >= numOfCrawlers){
            // Everybody is waiting: nobody can produce new work, so wake
            // the other waiters (they must terminate too) and quit.
            notifyAll();
            return false;
        }else{
            try{
                wait(); // a spurious wake up is okay: the caller just retries
            }catch(InterruptedException e){
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
            // woken up for whatever reason. Try again
            waiting--;
            return true;
        }
    }
}

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
}

then,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)
Enno Shioji
+1  A: 

I think a basic building block for your use case is a "latch" similar to CountDownLatch, but one that also permits incrementing the count.

An interface for such a latch might be

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Legal values for the count would be 0 and up. The await() method would let you block until the count goes down to zero.

If you have such a latch, your use case can be described fairly easily. I also suspect the queue (frontier) can be eliminated in this solution (the executor provides one internally anyway, so it's somewhat redundant). I would rewrite your main routine as

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Your URLCrawler would use the latch in this manner:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

As for latch implementations, there are a number of possibilities, ranging from one based on wait() and notifyAll(), to one that uses Lock and Condition, to one built on AbstractQueuedSynchronizer. All of these, I think, would be pretty straightforward. Note that the wait()/notifyAll() version and the Lock/Condition version would be based on mutual exclusion, whereas the AQS version would use CAS (compare-and-swap) and thus might scale better in certain situations. A sketch of the wait()/notifyAll() version follows.
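
A minimal sketch of that flavor, implementing the Latch interface above (illustrative only; the class name is made up):

public class SimpleLatch implements Latch {
    private int count;

    public synchronized void countUp() {
        count++;
    }

    public synchronized void countDown() {
        if (count > 0 && --count == 0) {
            notifyAll(); // count hit zero: release all waiting threads
        }
    }

    // blocks until the count drops to zero; the loop guards against
    // spurious wake-ups
    public synchronized void await() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }

    public synchronized int getCount() {
        return count;
    }
}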

sjlee