Here is a simplified version of my application showing what I'm doing.

import java.io.IOException;

/*
in my app's main():

 Runner run = new Runner();

 run.dowork();

*/

class Runner
{
 private int totalWorkers = 2;
 private int workersDone = 0;

 public synchronized void workerDone()
 {
  workersDone++;
  notifyAll();
 }

 public synchronized void dowork() throws IOException, InterruptedException
 {
  workersDone = 0;

  //<code for opening a file here, other setup here, etc>
  //(this setup creates 'reader', which is used below)
  String line;

  Worker a = new Worker(this);
  Worker b = new Worker(this);

  while ((line = reader.readLine()) != null)
  {
   //<a large amount of processing on 'line'>

   a.setData(line);
   b.setData(line);

   while (workersDone < totalWorkers)
   {
    wait();
   }
  }
 }
}

class Worker implements Runnable
{
 private Runner runner;
 private String data;

 public Worker(Runner r)
 {
  this.runner = r;
  Thread t = new Thread(this);
  t.start();
 }

 public synchronized void setData(String s)
 {
  this.data = s;
  notifyAll();
 }

 public void run()
 {
  try
  {
   while (true)
   {
    synchronized(this)
    {
     wait();

     //<do work with this.data here>

     this.runner.workerDone();
    }
   }
  }
  catch (InterruptedException e)
  {
   // interrupted: let this worker thread exit
  }
 }
}

The basic concept here is that I have a bunch of workers which all do some processing on an incoming line of data, all independently, and write out the data wherever they like; they do not need to report any data back to the main thread or share data with each other.

The problem that I'm having is that this code deadlocks. I'm reading a file of over 1 million lines and I'm lucky to get 100 lines into it before my app stops responding.

In reality, the workers each do a different amount of work, so I want to wait until they have all finished before moving on to the next line.

I cannot let the workers process at different speeds and queue the data internally because the files I am processing are too large for this and won't fit in memory.

I cannot give each worker its own FileReader to independently get 'line', because I do a ton of processing on the line before the workers see it, and do not want to have to re-do the processing in each worker.

I know I'm missing some fairly simple aspect of synchronization in Java, but I'm stuck and out of ideas for fixing it. If someone could explain what I'm doing wrong here, I would appreciate it.

A: 

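IMHO "workersDone = 0" is in the wrong place: it needs to be reset at the start of every loop iteration, not just once before the loop. Otherwise, after the first line the count already equals totalWorkers, and the main thread never waits again.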

public synchronized void dowork()
{
    // workersDone = 0;

    //<code for opening a file here, other setup here, etc>

    Worker a = new Worker(this);
    Worker b = new Worker(this);

    while ((line = reader.readLine()) != null)
    {
        workersDone = 0;

        //<a large amount of processing on 'line'>

        a.setData(line);
        b.setData(line);

        while (workersDone < totalWorkers)
        {
            wait();
        }
    }
}
Dmitry Khalatov
I had workersDone in the wrong place for the example I made; in my actual code, it looks like your change.
gdm
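Since moving workersDone = 0 apparently did not fix the real code, the likely remaining culprit is that Worker.run() calls wait() without checking any condition: if setData() calls notifyAll() while the worker is between loop iterations rather than inside wait(), the notification is lost, the worker waits forever, and so does the main thread. The code also takes two monitors in opposite orders (the main thread holds Runner's monitor while calling Worker.setData(), and each worker holds its own monitor while calling Runner.workerDone()), which is a classic recipe for deadlock. Below is a minimal sketch, not the poster's code, that keeps wait()/notifyAll() but uses one shared lock object and a guarded wait on a generation counter. The class GuardedRunner, the in-memory list standing in for the file reader, and the upper-casing "work" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the question's Runner/Worker idea rewritten with a single
// monitor and guarded waits, so a notification can never be missed.
public class GuardedRunner {
    private final Object lock = new Object(); // the only monitor; guards all fields below
    private final int totalWorkers;
    private String data;                      // the current line
    private long generation = 0;              // incremented once per published line
    private int workersDone = 0;              // workers finished with the current line
    private boolean finished = false;         // set when there are no more lines
    final List<String> processed = new ArrayList<>();

    GuardedRunner(int totalWorkers) { this.totalWorkers = totalWorkers; }

    private void workerLoop() throws InterruptedException {
        long seen = 0; // last generation this worker processed
        while (true) {
            String line;
            synchronized (lock) {
                // Guarded wait: if the line was published before we got here,
                // the condition is already false and we don't wait at all.
                while (generation == seen && !finished) {
                    lock.wait();
                }
                if (generation == seen) {
                    return; // finished, and no unprocessed line is left
                }
                seen = generation;
                line = data;
            }
            String result = line.toUpperCase(); // stand-in for the real work, done outside the lock
            synchronized (lock) {
                processed.add(result);
                workersDone++;
                lock.notifyAll(); // wakes the main thread (idle workers just re-check and wait)
            }
        }
    }

    void run(List<String> lines) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < totalWorkers; i++) {
            Thread t = new Thread(() -> {
                try {
                    workerLoop();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (String line : lines) {
            synchronized (lock) {
                data = line;
                workersDone = 0;   // reset per line, as in the first answer
                generation++;
                lock.notifyAll();
                while (workersDone < totalWorkers) {
                    lock.wait();   // releases the lock so workers can report back
                }
            }
        }
        synchronized (lock) {
            finished = true;
            lock.notifyAll();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedRunner r = new GuardedRunner(2);
        r.run(Arrays.asList("Line 1", "Line 2", "Line 3"));
        System.out.println(r.processed);
    }
}
```

Because the main thread does not publish line N+1 until every worker has recorded line N, the results come out grouped by line, one entry per worker.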
A:

Working directly with synchronized, wait(), and notify() is definitely tricky.

Fortunately the Java Concurrency API provides some excellent control objects for this sort of thing that are much more intuitive. In particular, look at CyclicBarrier and CountDownLatch; one of them almost certainly will be what you're looking for.
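As a rough sketch of the CountDownLatch option (assuming a fixed thread pool, with the hypothetical names LatchRunner and process, and a character count standing in for the real work): a latch cannot be reset, so a fresh one is created for every line, and only the main thread waits on it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one CountDownLatch per line. Worker tasks count down
// when they finish; the main thread blocks in await() until the count hits zero.
public class LatchRunner {
    static int process(List<String> lines, int totalWorkers) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        AtomicInteger charsProcessed = new AtomicInteger(); // stand-in for real output
        try {
            for (String line : lines) {
                // A latch cannot be reset, so each line gets a new one.
                CountDownLatch done = new CountDownLatch(totalWorkers);
                for (int w = 0; w < totalWorkers; w++) {
                    executor.submit(() -> {
                        try {
                            charsProcessed.addAndGet(line.length()); // the "work"
                        } finally {
                            done.countDown(); // count down even if the work throws
                        }
                    });
                }
                done.await(); // don't move on until every worker has finished this line
            }
        } finally {
            executor.shutdown(); // let the pool threads exit
        }
        return charsProcessed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int total = process(Arrays.asList("Line 1", "Line 2", "Line 3"), 2);
        System.out.println("Characters processed: " + total);
    }
}
```

Since the pool threads never block here, the pool size does not have to match the number of worker tasks per line; extra tasks simply queue.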

You may also find a ThreadPoolExecutor handy for this situation.
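One way to lean on the thread pool alone is ExecutorService.invokeAll, which blocks until every task in the batch has completed, so no explicit barrier or latch is needed. This is a sketch: in the JDK, Executors.newFixedThreadPool is backed by a ThreadPoolExecutor, the name InvokeAllRunner is hypothetical, and the Callable return values exist only to make the result checkable (the question's workers don't need to report anything back).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: submit one batch of tasks per line and let invokeAll
// do the waiting.
public class InvokeAllRunner {
    static List<String> process(List<String> lines, int totalWorkers)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        List<String> log = new ArrayList<>();
        try {
            for (String line : lines) {
                List<Callable<String>> tasks = new ArrayList<>();
                for (int w = 0; w < totalWorkers; w++) {
                    final int id = w; // copy, because the loop variable is not effectively final
                    tasks.add(() -> "worker " + id + " did " + line); // stand-in for real work
                }
                // invokeAll returns only after every task for this line has completed;
                // the futures come back in the same order as the tasks.
                for (Future<String> f : executor.invokeAll(tasks)) {
                    log.add(f.get()); // rethrows a worker's exception as ExecutionException
                }
            }
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        for (String entry : process(Arrays.asList("Line 1", "Line 2", "Line 3"), 2)) {
            System.out.println(entry);
        }
    }
}
```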

Here's a simple conversion of your snippet. It produces output like the following (without deadlock, of course; the order of the "Waiting" and "Working" lines within each line can vary between runs):

Read line: Line 1
Waiting for work to be complete on line: Line 1
Working on line: Line 1
Working on line: Line 1
Read line: Line 2
Waiting for work to be complete on line: Line 2
Working on line: Line 2
Working on line: Line 2
Read line: Line 3
Waiting for work to be complete on line: Line 3
Working on line: Line 3
Working on line: Line 3
All work complete!

import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Runner
{

    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        // One party per worker task, plus one for the main thread.
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        // Every task blocks in barrier.await(), so the pool needs at least
        // totalWorkers threads or the barrier can never trip.
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;

        final Worker worker = new Worker();

        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>

            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }

            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }

        System.out.println("All work complete!");

        // The pool's threads are non-daemon; without shutdown() they keep the JVM alive.
        this.executor.shutdown();
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with 'line' here>
            System.out.println("Working on line: " + line);

            try {
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }
}
Greg Case
CyclicBarrier was incredibly easy to implement and seems to have worked perfectly. I've worked with wait/notify before but not in a situation like this, and using a barrier is definitely easier here. Thanks.
gdm
Agreed. I've struggled through wait/notify as well, and it's a real pain to debug. Since I started using the Concurrency API, I have yet to find a situation where wait/notify would have been more intuitive than one of the concurrency utility constructs.
Greg Case
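For comparison with the executor-based version above, here is a sketch that keeps the question's original shape of long-lived worker threads and uses two CyclicBarriers per line: one releases the workers once a line is published, the other holds the main thread until they have all finished it. A null line as the end-of-input signal is an assumption of this sketch, and BarrierWorkers and charsProcessed are illustrative names. The barriers also handle memory visibility: per the CyclicBarrier documentation, actions in a thread before await() happen-before actions after the corresponding await() returns in the other threads, so the shared data field needs no extra locking.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: long-lived worker threads coordinated per line by two barriers.
public class BarrierWorkers {
    private final int totalWorkers;
    private final CyclicBarrier start;  // trips when the main thread has published a line
    private final CyclicBarrier done;   // trips when every worker has finished that line
    private String data;                // current line; null means "no more input"
                                        // (visibility is guaranteed by the barriers)
    final AtomicInteger charsProcessed = new AtomicInteger();

    BarrierWorkers(int totalWorkers) {
        this.totalWorkers = totalWorkers;
        this.start = new CyclicBarrier(totalWorkers + 1); // workers + main thread
        this.done = new CyclicBarrier(totalWorkers + 1);
    }

    private void workerLoop() {
        try {
            while (true) {
                start.await();             // wait for the next line
                String line = data;
                if (line == null) {
                    return;                // end of input
                }
                charsProcessed.addAndGet(line.length()); // stand-in for the real work
                done.await();              // report that this line is finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // another party failed or was interrupted; stop this worker
        }
    }

    void run(List<String> lines) throws InterruptedException, BrokenBarrierException {
        Thread[] threads = new Thread[totalWorkers];
        for (int i = 0; i < totalWorkers; i++) {
            threads[i] = new Thread(this::workerLoop);
            threads[i].start();
        }
        for (String line : lines) {
            data = line;      // publish the line,
            start.await();    // release the workers,
            done.await();     // and block until all of them have finished it
        }
        data = null;
        start.await();        // one last release so the workers see the end marker
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void main(String[] args) throws Exception {
        BarrierWorkers bw = new BarrierWorkers(2);
        bw.run(Arrays.asList("Line 1", "Line 2", "Line 3"));
        System.out.println("Characters processed: " + bw.charsProcessed.get());
    }
}
```

Only one line is in flight at a time, which matches the question's constraint that the input cannot be queued in memory.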