views:

91

answers:

4

Hi,

I have some code where i execute a several tasks using Executors and a Blocking Queue. The results have to be returned as an iterator because that is what the application that i work on expects. However, there is a 1:N relationship between the task and the results added to the queue, so i cannot use the ExecutorCompletionService. While calling hasNext(), i need to know when all the tasks have finished and added all the results to the queue, so that i can stop the retrieval of results from the queue. Note, that once items are put on the queue, another thread should be ready to consume (Executor.invokeAll(), blocks until all tasks have completed, which is not what i want, nor a timeout). This was my first attempt, i am using an AtomicInteger just to demonstrate the point even though it will not work. Could someone help me in undestanding how i can solve this issue?

public class ResultExecutor<T> implements Iterable<T> {
    private BlockingQueue<T> queue;
    private Executor executor;
    private AtomicInteger count;

    public ResultExecutor(Executor executor) {
        this.queue = new LinkedBlockingQueue<T>();
        this.executor = executor;
        count = new AtomicInteger();            
    }

    public void execute(ExecutorTask task) {
        executor.execute(task);
    }

    public Iterator<T> iterator() {
        return new MyIterator();
    }

    public class MyIterator implements Iterator<T> {
        private T current;          
        public boolean hasNext() {
            if (count.get() > 0 && current == null)
            {
                try {
                    current = queue.take();
                    count.decrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return current != null;
        }

        public T next() {
            final T ret = current;
            current = null;
            return ret;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

    public class ExecutorTask implements Runnable{
        private String name;

        public ExecutorTask(String name) {
            this.name = name;

        }

         private int random(int n)
         {
           return (int) Math.round(n * Math.random());
         }


        @SuppressWarnings("unchecked")
        public void run() {
            try {
                int random = random(500);
                Thread.sleep(random);
                queue.put((T) (name + ":" + random + ":1"));
                queue.put((T) (name + ":" + random + ":2"));
                queue.put((T) (name + ":" + random + ":3"));
                queue.put((T) (name + ":" + random + ":4"));
                queue.put((T) (name + ":" + random + ":5"));

                count.addAndGet(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }                   
        }           
    }       

}

And the calling code looks like:

    Executor e = Executors.newFixedThreadPool(2);
    ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);

    resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
    resultExecutor.execute(resultExecutor.new ExecutorTask("B"));

    Iterator<Result> iter = resultExecutor.iterator();
    while (iter.hasNext()) {
        System.out.println(iter.next());
    }
+1  A: 

I believe a Future is what you're looking for. It allows you to associate asynchronous tasks with a result object, and query the status of that result. For each task you begin, keep a reference to its Future and use that to determine whether or not it has completed.

purecharger
I understand what a future is and how it works, but within the context of the case i presented, the answer is too vague.
Asim
+1  A: 

If I understand your problem correctly (which I'm not sure I do), you can prevent an infinite wait on an empty queue by using [BlockingQueue.poll][1] instead of take(). This lets you specify a timeout, after which time null will be returned if the queue is empty.

If you drop this straight into your hasNext implementation (with an appropriately short timeout), the logic will be correct. An empty queue will return false while a queue with entities remaining will return true.

[1]: http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#poll(long, java.util.concurrent.TimeUnit)

Andrzej Doyle
Not sure i want to take the polling approach, on a system such as ours, if the timeout is too short and one or more of the tasks are still running then the results could be inconsistent. If the timeout is too long then that could lead to performance issues especially since these tasks are hitting an extrnal datasource.
Asim
+1  A: 

Use "poison" objects in the Queue to signal that a task will provide no more results.

class Client
{

  public static void main(String... argv)
    throws Exception
  {
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    ExecutorService workers = Executors.newFixedThreadPool(2);
    workers.execute(new ExecutorTask("A", queue));
    workers.execute(new ExecutorTask("B", queue));
    Iterator<String> results = 
      new QueueMarkersIterator<String>(queue, ExecutorTask.MARKER, 2);
    while (results.hasNext())
      System.out.println(results.next());
  }

}

class QueueMarkersIterator<T>
  implements Iterator<T>
{

  private final BlockingQueue<? extends T> queue;

  private final T marker;

  private int count;

  private T next;

  QueueMarkersIterator(BlockingQueue<? extends T> queue, T marker, int count)
  {
    this.queue = queue;
    this.marker = marker;
    this.count = count;
    this.next = marker;
  }

  public boolean hasNext()
  {
    if (next == marker)
      next = nextImpl();
    return (next != marker);
  }

  public T next()
  {
    if (next == marker)
      next = nextImpl();
    if (next == marker)
      throw new NoSuchElementException();
    T tmp = next;
    next = marker;
    return tmp;
  }

  /*
   * Block until the status is known. Interrupting the current thread 
   * will cause iteration to cease prematurely, even if elements are 
   * subsequently queued.
   */
  private T nextImpl()
  {
    while (count > 0) {
      T o;
      try {
        o = queue.take();
      }
      catch (InterruptedException ex) {
        count = 0;
        Thread.currentThread().interrupt();
        break;
      }
      if (o == marker) {
        --count;
      }
      else {
        return o;
      }
    }
    return marker;
  }

  public void remove()
  {
    throw new UnsupportedOperationException();
  }

}

class ExecutorTask
  implements Runnable
{

  static final String MARKER = new String();

  private static final Random random = new Random();

  private final String name;

  private final BlockingQueue<String> results;

  public ExecutorTask(String name, BlockingQueue<String> results)
  {
    this.name = name;
    this.results = results;
  }

  public void run()
  {
    int random = ExecutorTask.random.nextInt(500);
    try {
      Thread.sleep(random);
    }
    catch (InterruptedException ignore) {
    }
    final int COUNT = 5;
    for (int idx = 0; idx < COUNT; ++idx)
      results.add(name + ':' + random + ':' + (idx + 1));
    results.add(MARKER);
  }

}
erickson
As i understand invokeAll() blocks until all the tasks are completed which is not what i want. I have updated my post to try and clarify what i really want.
Asim
Yes that works quite well, i have also posted a version that uses a non-blocking queue + wait/notify.
Asim
A: 

Here is an alternate solution that uses a non-blocking queue with wait/notify, AtomicInteger and a callback.

public class QueueExecutor implements CallbackInterface<String> {

    public static final int NO_THREADS = 26;

    private Object syncObject = new Object();
    private AtomicInteger count;
    Queue<String> queue = new LinkedList<String>();

    public void execute() {
        count = new AtomicInteger(NO_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NO_THREADS/2);
        for(int i=0;i<NO_THREADS;i++)
            executor.execute(new ExecutorTask<String>("" + (char) ('A'+i), queue, this));

        Iterator<String> iter = new QueueIterator<String>(queue, count);
        int count = 0;
        while (iter.hasNext()) {

            System.out.println(iter.next());
            count++;
        }

        System.out.println("Handled " + count + " items");
    }

    public void callback(String result) {
        System.out.println(result);
        count.decrementAndGet();
        synchronized (syncObject) {
            syncObject.notify();
        }
    }

    public class QueueIterator<T> implements Iterator<T> {
        private Queue<T> queue;
        private AtomicInteger count;

        public QueueIterator(Queue<T> queue, AtomicInteger count) {
            this.queue = queue;
            this.count = count;
        }

        public boolean hasNext() {          
            while(true) {                   
                synchronized (syncObject) {
                    if(queue.size() > 0)
                        return true;

                    if(count.get() == 0)
                        return false;

                    try {
                        syncObject.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public T next() {

            synchronized (syncObject) {
                if(hasNext())
                    return queue.remove();
                else
                    return null;
            }
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

    class ExecutorTask<T> implements Runnable {
        private String name;
        private Queue<T> queue;
        private CallbackInterface<T> callback;

        public ExecutorTask(String name, Queue<T> queue,
                CallbackInterface<T> callback) {
            this.name = name;
            this.queue = queue;
            this.callback = callback;
        }

        @SuppressWarnings("unchecked")
        public void run() {
            try {
                Thread.sleep(1000);
                                    Random randomX = new Random();
                for (int i = 0; i < 5; i++) {
                    synchronized (syncObject) {
                        Thread.sleep(randomX.nextInt(10)+1);

                        queue.add((T) (name + ":" + ":" + i));
                        syncObject.notify();
                    }
                }

                callback.callback((T) (name + ": Done"));

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

public interface CallbackInterface<T> {
    void callback(T result);
}

And the calling code is simply:

    QueueExecutor exec = new QueueExecutor();
    exec.execute();
Asim