I have created a concurrent, recursive directory traversal and file processing program which sometimes hangs: all parallel computations have finished, but the 'primary' thread never continues with its other tasks.
The code is basically a fork-join style concurrent aggregator, and after the parallel aggregation completes, it should display the results in a Swing window. The trouble with the aggregation is that it needs to generate a tree and aggregate the statistics of the leaf nodes up in the hierarchy.
I'm sure I made a concurrency mistake but can't find it. I included the relevant part of my code at the end of the post (code comments removed for brevity; sorry for the 150 lines. If required, I could move it to an external location).
Context: Java 6u13, Windows XP SP3, Core 2 Duo CPU.
My questions are:
What could be the cause for this random hang?
Is there a better way of doing a concurrent directory traversal, perhaps in a form of an already existing library?
Would the fork-join framework from Doug Lea (or Java 7) be a better framework for the aggregation / directory traversal, and if so, how should I rethink my implementation at the conceptual level?
Thank you for your time.
And the code excerpt:
/**
 * Evaluates the given files and directories in parallel, then runs the
 * aggregation pass over the resulting evaluator trees.
 *
 * <p>Work is done in two phases on one fixed-size pool (one thread per
 * available processor). Each phase is tracked by the shared
 * {@link CountUpDown}: every scheduled task increments it before submission
 * and decrements it on completion, and this method blocks on
 * {@code count.await()} between the phases.
 *
 * @param files the root files/directories to evaluate; one evaluator is
 *              created per element
 * @return one root evaluator per input element, in the same order
 * @throws InterruptedException if the calling thread is interrupted while
 *                              waiting for a phase or for pool termination
 */
private static JavaFileEvaluator[] processFiles(File[] files)
        throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor) Executors
            .newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    try {
        // Phase 1: traverse and evaluate; directories schedule their children
        // on the same executor and counter.
        for (int i = 0; i < jfes.length; i++) {
            count.increment();
            jfes[i] = new JavaFileEvaluator(files[i], count, ex);
            ex.execute(jfes[i]);
        }
        count.await();

        // Phase 2: aggregate each root; aggregate() fans out to the children.
        for (int i = 0; i < jfes.length; i++) {
            count.increment();
            final JavaFileEvaluator jfe = jfes[i];
            ex.execute(new Runnable() {
                public void run() {
                    jfe.aggregate();
                }
            });
        }
        // Blocks until every aggregation task has decremented the counter.
        count.await();
    } finally {
        // Always release the pool, even when await() is interrupted or a
        // submission fails; otherwise its non-daemon worker threads linger.
        ex.shutdown();
    }
    // A zero timeout would return immediately without waiting; use an
    // effectively unbounded wait so the pool has really terminated on return.
    ex.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    return jfes;
}
/**
 * A node in the evaluation tree: evaluates one source file, or lists one
 * directory and schedules a child evaluator for each accepted entry.
 *
 * <p>Every task scheduled by this class increments the shared
 * {@link CountUpDown} before submission and decrements it exactly once when
 * the task ends, including when it ends with an exception. A missed
 * decrement would leave the waiter in {@code CountUpDown.await()} blocked
 * forever.
 */
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    /** Child evaluators; stays {@code null} until a directory entry is found. */
    private List<JavaFileEvaluator> children;

    /**
     * @param srcFile the file or directory this node evaluates
     * @param count   shared counter of outstanding tasks
     * @param service executor on which child tasks are scheduled
     */
    public JavaFileEvaluator(File srcFile,
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }

    /**
     * Evaluates {@code srcFile}: parses it if it is a regular file, or
     * processes its entries if it is a directory. Always decrements the
     * shared counter on exit.
     */
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource) it.next();
                        js.toString();
                        processSource(js);
                    }
                    // Some catch clauses here
                }
            } else if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }

    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }

    /**
     * Lists {@code dir}, counts each accepted entry as a file or directory,
     * and schedules a child evaluator for it.
     *
     * <p>Accepted entries are readable {@code .java} files and directories
     * whose names do not start with {@code "CVS"} or {@code "."}. Entries are
     * ordered directories first, then by name.
     *
     * @param dir the directory to list; nothing happens if it cannot be listed
     */
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return
                    (pathname.isDirectory() && !pathname.getName().startsWith("CVS")
                        && !pathname.getName().startsWith("."))
                    || (pathname.isFile() && pathname.getName().endsWith(".java")
                        && pathname.canRead());
            }
        });
        if (files == null) {
            // listFiles returns null when dir is not a directory or on I/O error.
            return;
        }
        Arrays.sort(files, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                if (o1.isDirectory() && o2.isFile()) {
                    return -1;
                } else if (o1.isFile() && o2.isDirectory()) {
                    return 1;
                }
                return o1.getName().compareTo(o2.getName());
            }
        });
        for (File f : files) {
            if (f.isFile()) {
                counters.add(Constants.FILE, 1);
            } else {
                counters.add(Constants.DIR, 1);
            }
            JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
            if (children == null) {
                children = new ArrayList<JavaFileEvaluator>();
            }
            children.add(ev);
            schedule(ev);
        }
    }

    public Counters getCounters() {
        return counters;
    }

    /** @return {@code true} if at least one child evaluator was created */
    public boolean hasChildren() {
        return children != null && !children.isEmpty();
    }

    /**
     * Schedules {@code aggregate()} for every child on the executor, then
     * releases this task's own count.
     *
     * <p>The caller must have incremented the counter for this call. The
     * decrement runs in a {@code finally} block so that an exception while
     * fanning out cannot leave the counter above zero.
     */
    public void aggregate() {
        try {
            // recursively aggregate non-leaf nodes
            if (hasChildren()) {
                for (final JavaFileEvaluator e : children) {
                    schedule(new Runnable() {
                        @Override
                        public void run() {
                            e.aggregate();
                        }
                    });
                }
            }
        } finally {
            count.decrement();
        }
    }

    /**
     * Registers one outstanding task with the counter and submits it; undoes
     * the registration if the executor refuses the task, since a task that
     * never runs would never decrement.
     */
    private void schedule(Runnable task) {
        count.increment();
        try {
            service.execute(task);
        } catch (RuntimeException rejected) {
            count.decrement();
            throw rejected;
        }
    }
}
/**
 * A counter of outstanding tasks that threads can wait on until it reaches
 * zero. Unlike a one-shot latch, the count may be raised again after it has
 * reached zero.
 *
 * <p>The count itself is an {@link AtomicInteger}; the lock and condition
 * are used only to park and wake waiters.
 */
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();

    /** Registers one more outstanding task. */
    public void increment() {
        count.incrementAndGet();
    }

    /**
     * Marks one task as finished and wakes all waiters when the count
     * reaches zero.
     *
     * @throws IllegalStateException if the count drops below zero, i.e. there
     *         were more decrements than increments
     */
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            // Signal under the lock: a waiter that has already seen a positive
            // count holds the lock until it parks, so the wakeup cannot be lost.
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }

    /**
     * Blocks until the count is zero; returns immediately if it already is.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void await() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop: Condition.await() may wake spuriously, and
            // the count may have been raised again since the signal.
            while (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}
Edit: Added the hasChildren() method in JavaFileEvaluator.