I have created a concurrent, recursive directory traversal and file processing program that sometimes hangs: all parallel computations finish, but the 'primary' thread never continues with its other tasks.

The code is basically a fork-join style concurrent aggregator. After the parallel aggregation completes, it should display the results in a Swing window. What complicates the aggregation is that it has to build a tree and aggregate the statistics of the leaf nodes up the hierarchy.

I'm sure I made a concurrency mistake but can't find it. The relevant part of my code is at the end of the post (code comments removed for brevity; sorry for the 150 lines, I can move it to an external location if required).

Context: Java 6u13, Windows XP SP3, Core 2 Duo CPU.

My questions are:

1. What could be the cause of this random hang?

2. Is there a better way of doing a concurrent directory traversal, perhaps in the form of an already existing library?

3. Would the fork-join framework from Doug Lea (or Java 7) be a better framework for the aggregation / directory traversal, and if so, how should I rethink my implementation at the conceptual level?

Thank you for your time.

And the code excerpt:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        jfes[i] = new JavaFileEvaluator(files[i], count, ex);
        ex.execute(jfes[i]);
    }
    count.await();
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        final JavaFileEvaluator jfe = jfes[i];
        ex.execute(new Runnable() {
            public void run() {
                jfe.aggregate();
            }
        });

    }
    // -------------------------------------
    // this await sometimes fails to wake up
    count.await(); // <---------------------
    // -------------------------------------
    ex.shutdown();
    ex.awaitTermination(0, TimeUnit.MILLISECONDS);
    return jfes;
}
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    private List<JavaFileEvaluator> children;
    public JavaFileEvaluator(File srcFile, 
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource)it.next();
                        js.toString();
                        processSource(js);
                    }
                // Some catch clauses here
                }
            } else
            if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }
    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return 
                (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
                 && !pathname.getName().startsWith("."))
                || (pathname.isFile() && pathname.getName().endsWith(".java") 
                 && pathname.canRead());
            }
        });
        if (files != null) {
            Arrays.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    if (o1.isDirectory() && o2.isFile()) {
                        return -1;
                    } else
                    if (o1.isFile() && o2.isDirectory()) {
                        return 1;
                    }
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : files) {
                if (f.isFile()) {
                    counters.add(Constants.FILE, 1);
                } else {
                    counters.add(Constants.DIR, 1);
                }
                JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
                if (children == null) {
                    children = new ArrayList<JavaFileEvaluator>();
                }
                children.add(ev);
                count.increment();
                service.execute(ev);
            }
        }
    }
    public Counters getCounters() {
        return counters;
    }
    public boolean hasChildren() {
        return children != null && children.size() > 0;
    }
    public void aggregate() {
        // recursively aggregate non-leaf nodes
        if (!hasChildren()) {
            count.decrement();
            return;
        }
        for (final JavaFileEvaluator e : children) {
            count.increment();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    e.aggregate();
                }
            });
        }
        count.decrement();
    }
}
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else
        if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }
    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

Edit: Added the hasChildren() method to JavaFileEvaluator.

A: 

In the aggregate method of JavaFileEvaluator, count.decrement() is not called in a finally block. If a RuntimeException is thrown anywhere inside aggregate() (possibly in the hasChildren method, whose body I can't see), the decrement never happens, the counter never reaches zero, and the main thread stays blocked in CountUpDown.await() indefinitely. This may be the cause of the random hang you are seeing.
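To illustrate the fix, here is a minimal, self-contained sketch rather than the asker's actual classes: `Counter` is a cut-down copy of `CountUpDown`, while `Node` and `runDemo()` are hypothetical names, and one node deliberately throws. Because the decrement sits in a `finally` block, the counter still reaches zero. Unlike the original `CountUpDown`, this `await()` re-checks the count in a loop and takes a timeout, so a lost decrement would show up as `false` instead of a hang.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class FinallyDecrementDemo {

    /** Cut-down CountUpDown; await() loops and times out so a lost decrement shows as false. */
    static class Counter {
        private final Lock lock = new ReentrantLock();
        private final Condition zero = lock.newCondition();
        private final AtomicInteger count = new AtomicInteger();

        void increment() { count.incrementAndGet(); }

        void decrement() {
            if (count.decrementAndGet() == 0) {
                lock.lock();
                try { zero.signalAll(); } finally { lock.unlock(); }
            }
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            lock.lock();
            try {
                while (count.get() > 0) {
                    if (nanos <= 0L) {
                        return false; // timed out with work still outstanding
                    }
                    nanos = zero.awaitNanos(nanos);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Hypothetical tree node; only the shape of aggregate() follows the question. */
    static class Node {
        final List<Node> children = new ArrayList<Node>();
        final boolean fails;

        Node(boolean fails) { this.fails = fails; }

        void aggregate(final Counter count, final ExecutorService service) {
            try {
                if (fails) {
                    throw new IllegalStateException("simulated RuntimeException in aggregate()");
                }
                for (final Node child : children) {
                    count.increment();
                    service.execute(new Runnable() {
                        public void run() { child.aggregate(count, service); }
                    });
                }
            } finally {
                count.decrement(); // runs on normal return and when the body throws
            }
        }
    }

    /** Returns true if the waiting thread was woken up, false if it would have hung. */
    public static boolean runDemo() {
        final Node root = new Node(false);
        for (int i = 0; i < 5; i++) {
            Node child = new Node(i == 2); // exactly one node throws
            child.children.add(new Node(false));
            root.children.add(child);
        }
        final Counter count = new Counter();
        final ExecutorService service = Executors.newFixedThreadPool(2);
        count.increment();
        service.execute(new Runnable() {
            public void run() { root.aggregate(count, service); }
        });
        try {
            return count.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("finished = " + runDemo());
    }
}
```

The pool thread that runs the failing node still prints the exception's stack trace; the point is only that the count reaches zero. If the decrement were moved to the end of the try body instead, the failing node would skip it and runDemo() would return false after the 5-second timeout.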

As for the second question, I don't know of any Java library for this, though I haven't really looked. Sorry for the non-answer; this isn't something I've had occasion to use before.

As for the third question, I think that whether you use a fork-join framework provided by someone else or keep your own concurrency framework, the biggest gain would come from separating the logic that traverses the directories from the logic that manages the parallelism. The code you provided uses the CountUpDown class to track when all tasks have finished, and you end up with increment/decrement calls sprinkled throughout the methods that traverse the directories, which makes bugs like this one a nightmare to track down. Moving to the Java 7 fork-join framework would force you to write a class that deals only with the traversal logic and leave the concurrency logic to the framework, which may be a good way for you to go. The other option is to keep what you have, but draw a clear line between the management logic and the work logic, which would help you track down and fix these sorts of bugs.
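A rough sketch of that separation using Java 7's `ForkJoinPool` and `RecursiveTask` (Doug Lea's jsr166y package offered the same classes for Java 6 under a different package name). `DirStatsTask` and its `Stats` class are hypothetical simplifications of `JavaFileEvaluator` and `Counters`: a file returns its own statistics, a directory `fork()`s one subtask per entry and adds up the `join()` results, so the tree aggregation happens on the way back up and there is no manual counting at all. The file filter mirrors the one in the question; the per-file parsing is left out.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class DirStatsTask extends RecursiveTask<DirStatsTask.Stats> {

    /** Hypothetical stand-in for the question's Counters class. */
    public static final class Stats {
        public long files;
        public long dirs;
        public long bytes;

        void add(Stats other) {
            files += other.files;
            dirs += other.dirs;
            bytes += other.bytes;
        }
    }

    /** Same acceptance rules as the FileFilter in processDirectory(). */
    private static final FileFilter JAVA_OR_DIR = new FileFilter() {
        public boolean accept(File f) {
            String name = f.getName();
            return (f.isDirectory() && !name.startsWith("CVS") && !name.startsWith("."))
                || (f.isFile() && name.endsWith(".java") && f.canRead());
        }
    };

    private final File file;

    public DirStatsTask(File file) {
        this.file = file;
    }

    @Override
    protected Stats compute() {
        Stats stats = new Stats();
        if (file.isFile()) {
            // Leaf: the real per-file work (line counting, parsing) would go here.
            stats.files = 1;
            stats.bytes = file.length();
            return stats;
        }
        File[] entries = file.listFiles(JAVA_OR_DIR);
        if (entries == null) {
            return stats; // not a directory, or it could not be read
        }
        List<DirStatsTask> subtasks = new ArrayList<DirStatsTask>();
        for (File entry : entries) {
            if (entry.isDirectory()) {
                stats.dirs++;
            }
            DirStatsTask task = new DirStatsTask(entry);
            task.fork(); // schedule the child; the pool decides which thread runs it
            subtasks.add(task);
        }
        for (DirStatsTask task : subtasks) {
            stats.add(task.join()); // aggregation happens as results come back up the tree
        }
        return stats;
    }

    /** Blocks until the whole tree below root has been processed. */
    public static Stats scan(File root) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new DirStatsTask(root));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Stats s = scan(new File(args.length > 0 ? args[0] : "."));
        System.out.println(s.files + " .java files, " + s.dirs + " directories, " + s.bytes + " bytes");
    }
}
```

Since scan() only returns once every subtask has been joined, the caller needs no separate latch before handing the results on, for example to the Swing window.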

joe p
Wow, you are right! I added the missing hasChildren() method. I'll also wrap the body in a try-finally and run the code in a loop for a while, thanks! +1. Could you also address the second and third questions?
kd304
I didn't find a library either when I started this code a year ago. It seems this kind of parallel traversal is simply left to the fork-join framework. Thank you.
kd304