I've had the same need a few times now and wanted to get other thoughts on the right way to structure a solution. The need is to perform some operation on many elements on many threads without having all elements in memory at once, just the ones under computation. For example, Guava's Iterables.partition is insufficient because it brings all elements into memory up front.

Expressing it in code, I want to write a BulkCalc2 that does the same thing as BulkCalc1, just in parallel. Below is sample code that illustrates my best attempt. I'm not satisfied because it's big and ugly, but it does seem to accomplish my goals of keeping threads highly utilized until the work is done, propagating any exceptions during computation, and not having more than numThreads instances of BigThing necessarily in memory at once.

I'll accept the answer which meets the stated goals in the most concise way, whether it's a way to improve my BulkCalc2 or a completely different solution.

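import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

interface BigThing {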

    int getId();

    String getString();
}

class Calc {

    // somewhat expensive computation
    double calc(BigThing bigThing) {
        Random r = new Random(bigThing.getString().hashCode());
        double d = 0;
        for (int i = 0; i < 100000; i++) {
            d += r.nextDouble();
        }
        return d;
    }
}

class BulkCalc1 {

    final Calc calc;

    public BulkCalc1(Calc calc) {
        this.calc = calc;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        TreeMap<Integer, Double> results = Maps.newTreeMap();
        while (in.hasNext()) {
            BigThing o = in.next();
            results.put(o.getId(), calc.calc(o));
        }
        return results;
    }
}

class SafeIterator<T> {

    final Iterator<T> in;

    SafeIterator(Iterator<T> in) {
        this.in = in;
    }

    synchronized T nextOrNull() {
        if (in.hasNext()) {
            return in.next();
        }
        return null;
    }
}

class BulkCalc2 {

    final Calc calc;
    final int numThreads;

    public BulkCalc2(Calc calc, int numThreads) {
        this.calc = calc;
        this.numThreads = numThreads;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        ExecutorService e = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = Lists.newLinkedList();

        final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
        final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
        for (int i = 0; i < numThreads; i++) {
            futures.add(e.submit(new Runnable() {

                @Override
                public void run() {
                    while (true) {
                        BigThing o = it.nextOrNull();
                        if (o == null) {
                            return;
                        }
                        results.put(o.getId(), calc.calc(o));
                    }
                }
            }));
        }

        e.shutdown();

        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException ex) {
                // swallowing is OK
            } catch (ExecutionException ex) {
                throw Throwables.propagate(ex.getCause());
            }
        }

        return new TreeMap<Integer, Double>(results);
    }
}
A: 

Edit: modified, faster version

Notes: This is actually LESS concise, but should run significantly faster. To run it on an iterator, call the static method BulkCalcRunner.runBulkCalc(Iterator, Calc), or the overload that also takes a number of threads. Clean, fairly concise, and probably the fastest solution you can get.

Reasons this is faster:

  • Results are collected in a per-thread HashMap, so no synchronization is needed while collecting them; otherwise every stored result would need synchronization. This improves per-thread scaling and gives better locality of reference (each thread's HashMap can largely stay in its own processor's cache, with no cross-core communication until the final merge).
  • A plain HashMap is used for the per-thread results instead of a less efficient concurrent Map.
  • Errors are bundled up into a collection for later handling, so one failing element does not stop a worker. (With a thread pool, an exception that escapes a task run through execute() kills the worker thread, which then has to be recreated.)
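The first point (per-thread result maps, merged once at the end) can also be expressed with an ExecutorService whose tasks each return their local map, so that exception propagation comes for free through Future.get(). This is a minimal JDK-only sketch, assuming Java 8 or later; `LocalMapBulkCalc` and its `key`/`calc` function parameters are hypothetical names, with `Function` standing in for `getId()` and `Calc.calc()`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper (not from the answer): each worker fills its own HashMap
// without locking, and the caller merges the per-worker maps once at the end.
final class LocalMapBulkCalc {
    static <T, K extends Comparable<K>, V> TreeMap<K, V> run(
            Iterator<T> in, Function<T, K> key, Function<T, V> calc, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<Future<Map<K, V>>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < numThreads; i++) {
                futures.add(pool.submit(() -> {
                    Map<K, V> local = new HashMap<>(); // only this worker touches it
                    while (!Thread.currentThread().isInterrupted()) {
                        T item;
                        synchronized (in) { // the only lock taken per element
                            if (!in.hasNext()) {
                                break;
                            }
                            item = in.next();
                        }
                        local.put(key.apply(item), calc.apply(item)); // computed outside the lock
                    }
                    return local;
                }));
            }
            TreeMap<K, V> merged = new TreeMap<>();
            for (Future<Map<K, V>> f : futures) {
                merged.putAll(f.get()); // one merge per worker; rethrows a worker's failure
            }
            return merged;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow(); // on failure, interrupts the remaining workers
        }
    }
}
```

With BigThing and Calc from the question, a call would look like `LocalMapBulkCalc.run(in, BigThing::getId, calc::calc, numThreads)`.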

import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

interface BigThing { int getId(); String getString(); }

class Calc {
    // somewhat expensive computation
    double calc(BigThing bigThing) {
        Random r = new Random(bigThing.getString().hashCode());
        double d = 0;
        for (int i = 0; i < 100000; i++) {
            d += r.nextDouble();
        }
        return d;
    }
}

class BulkCalcRunner implements Runnable {
    final Calc calc;
    final CountDownLatch latch;
    final Iterator<BigThing> it;
    final Collection<Throwable> errors;
    final Map<Integer, Double> results;

    public BulkCalcRunner(Calc calc, Iterator<BigThing> it, CountDownLatch latch,
                          Map<Integer, Double> results, Collection<Throwable> errors) {
        this.calc = calc;
        this.it = it; // without this, synchronized (it) throws a NullPointerException
        this.latch = latch;
        this.errors = errors;
        this.results = results;
    }

    public void run() {
        // Per-thread buffers: no locking while computing.
        List<Throwable> errorLocal = new ArrayList<Throwable>();
        Map<Integer, Double> resultsLocal = new HashMap<Integer, Double>();
        try {
            while (true) {
                BigThing thing = null;
                try {
                    synchronized (it) { // the shared iterator need not be thread-safe
                        if (it.hasNext()) {
                            thing = it.next();
                        }
                    }
                } catch (Exception e) { // record iterator errors instead of looping forever
                    errorLocal.add(e);
                }
                // finished when the first null BigThing is encountered
                if (thing == null) {
                    break;
                }
                try {
                    resultsLocal.put(thing.getId(), calc.calc(thing));
                } catch (Throwable t) {
                    errorLocal.add(t);
                }
            }
        } finally {
            // Merge once per thread, and always count down so runBulkCalc cannot hang.
            synchronized (errors) {
                errors.addAll(errorLocal);
            }
            synchronized (results) {
                results.putAll(resultsLocal);
            }
            latch.countDown();
        }
    }

    public static Map<Integer, Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation, int numThreads) {
        final Map<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
        final List<Throwable> errors = new ArrayList<Throwable>();
        final CountDownLatch latch = new CountDownLatch(numThreads);

        // start up the worker threads
        for (int i = 0; i < numThreads; i++) {
            new Thread(new BulkCalcRunner(calculation, iterator, latch, results, errors)).start();
        }

        try {
            // the latch waits for all the worker threads to check in as "done"
            latch.await();
        } catch (InterruptedException ex) {
            // restore the interrupt flag instead of returning partial results
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }

        // finally, propagate the first error (these are the original exceptions, not wrappers)
        if (!errors.isEmpty()) {
            throw Throwables.propagate(errors.get(0));
        }
        return results;
    }

    public static Map<Integer, Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation) {
        return runBulkCalc(iterator, calculation, Runtime.getRuntime().availableProcessors());
    }
}
BobMcGee
BulkCalcAwesome.calc throws this exception when I run it: Exception in thread "main" java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1760) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:78)
Brian Harris
Oh, bugger. I was expecting it to block on submit until Queue space opened up. Let me try another approach tomorrow. By the way, can you say anything about how the Iterator works? If it's pulling from a collection of lazy-loading entities or something, there may be a better way...
BobMcGee
Nothing is known about the Iterator given to BulkCalc. It may or may not be thread-safe or lazy-loading.
Brian Harris
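For the RejectedExecutionException above and the point that the Iterator may not be thread-safe, one common pattern (not proposed in this thread) is to have only the calling thread read the iterator and to make it wait on a Semaphore for a free worker before pulling the next element, so submission blocks instead of being rejected. A minimal sketch, assuming Java 8 or later; `BoundedSubmit` and the `key`/`calc` function parameters are hypothetical:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical helper: the calling thread is the only one that reads the iterator,
// and a Semaphore makes it wait for a free worker before pulling the next element.
final class BoundedSubmit {
    static <T, K, V> Map<K, V> run(Iterator<T> in, Function<T, K> key,
                                   Function<T, V> calc, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        Semaphore permits = new Semaphore(numThreads); // one permit per element in flight
        Map<K, V> results = new ConcurrentHashMap<>();  // note: rejects null keys/values
        AtomicReference<Throwable> failure = new AtomicReference<>();
        try {
            while (failure.get() == null && in.hasNext()) {
                permits.acquire();  // blocks here instead of RejectedExecutionException
                T item = in.next(); // pulled only once a worker slot is free
                pool.execute(() -> {
                    try {
                        results.put(key.apply(item), calc.apply(item));
                    } catch (Throwable t) {
                        failure.compareAndSet(null, t); // keep the first failure
                    } finally {
                        permits.release();
                    }
                });
            }
            permits.acquire(numThreads); // returns once every submitted task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
        Throwable first = failure.get();
        if (first != null) {
            throw new RuntimeException(first);
        }
        return results;
    }
}
```

Because a permit is taken before each `next()` and returned only after that element's computation, at most numThreads elements are pulled but unfinished at any time.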
I like the use of CountDownLatch in this edited approach! It's not the most concise solution as you say, but it does seem to meet the stated goals. I doubt it's significantly faster as you claim, but perhaps a micro-benchmark could demonstrate it? Thanks for the submission!
Brian Harris
Thanks. If you'd like to thank me for the work, you might vote this up. Also, I should clarify "much faster": the work *Calc* does can't run any faster (duh), but the threaded portion here minimizes time spent in synchronized blocks and the number of times locks are acquired (only a few times per thread for the errors and results, plus once on the iterator per element pulled). This translates into more efficient parallelization. I'll post a second version that is more succinct but demonstrates less efficient synchronization.
BobMcGee
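On the lock-count point above: the iterator lock is still taken once per element. One way to reduce that further (an extension, not something proposed in the thread) is to pull several elements per lock acquisition. The trade-off is that up to batchSize × numThreads elements may be in memory at once, which loosens the question's "at most numThreads" goal. A minimal sketch, assuming Java 8 or later; `BatchingPuller` is a hypothetical name. For example, 10 elements with a batch size of 4 take 4 acquisitions: batches of 4, 4 and 2, then the empty batch that signals the end.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: hands out up to batchSize elements per lock acquisition.
// Trade-off: up to batchSize * numThreads elements may be in memory at once.
final class BatchingPuller<T> {
    private final Iterator<T> in;
    private final int batchSize;
    private int lockAcquisitions; // guarded by this; counted only for illustration

    BatchingPuller(Iterator<T> in, int batchSize) {
        this.in = in;
        this.batchSize = batchSize;
    }

    /** Returns the next batch, or an empty list once the iterator is exhausted. */
    synchronized List<T> nextBatch() {
        lockAcquisitions++;
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && in.hasNext()) {
            batch.add(in.next());
        }
        return batch;
    }

    synchronized int lockAcquisitions() {
        return lockAcquisitions;
    }

    /** A worker loop that takes the lock once per batch instead of once per element. */
    static <E> Runnable worker(BatchingPuller<E> puller, Consumer<? super E> action) {
        return () -> {
            for (List<E> batch = puller.nextBatch(); !batch.isEmpty(); batch = puller.nextBatch()) {
                for (E item : batch) {
                    action.accept(item); // runs outside the lock
                }
            }
        };
    }
}
```

Several threads would share one `BatchingPuller` and each run a `worker(...)`.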
A: 

While I cannot figure out a way to improve the design, at least we can pull out the generic component into a utility class. With the threading code pulled out, BulkCalc3 is sufficiently concise.

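import com.google.common.collect.MapMaker;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;

class BulkCalc3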
{
    final Calc calc;

    public BulkCalc3(Calc calc)
    {
        this.calc = calc;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in)
    {
        final ConcurrentMap<Integer, Double> resultMap = new MapMaker().makeMap();
        ThreadedIteratorProcessor<BigThing> processor = new ThreadedIteratorProcessor<BigThing>();
        processor.processIterator(in, new ThreadedIteratorProcessor.ElementProcessor<BigThing>()
        {
            @Override
            public void processElement(BigThing o)
            {
                resultMap.put(o.getId(), calc.calc(o));
            }
        });
        return new TreeMap<Integer, Double>(resultMap);
    }
}

Here's the utility class:

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * A utility class to process each element in an iterator in an efficient manner.
 */
public class ThreadedIteratorProcessor<T>
{
    public static interface ElementProcessor<T>
    {
        /**
         * Process an element.
         * @param element The element to process.
         */
        public void processElement(T element);
    }
    private final int numThreads;

    /**
     * Create an instance which uses a specified number of threads.
     * @param numThreads The number of processing threads.
     */
    public ThreadedIteratorProcessor(int numThreads)
    {
        this.numThreads = numThreads;
    }

    /**
     * Create an instance which uses a number of threads equal to the number of system processors.
     */
    public ThreadedIteratorProcessor()
    {
        this(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Process each element in an iterator in parallel.  The number of worker threads depends on how this object was
     * constructed.  This method will re-throw any exception thrown in the supplied ElementProcessor.  An element will
     * not be requested from the iterator any earlier than is absolutely necessary.  In other words, the last element in
     * the iterator will not be consumed until all of the other elements are completely processed, excluding elements
     * currently being processed by the worker threads.
     * @param iterator The iterator from which to get elements.  This iterator need not be thread-safe.
     * @param elementProcessor The element processor.
     */
    public void processIterator(Iterator<T> iterator, ElementProcessor<T> elementProcessor)
    {
        // Use an ExecutorService for proper exception handling.
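        // Daemon threads, so idle workers never keep the JVM alive.
        ExecutorService e = Executors.newFixedThreadPool(numThreads,
                new com.google.common.util.concurrent.ThreadFactoryBuilder().setDaemon(true).build());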
        List<Future<?>> futures = Lists.newLinkedList();

        // Get a thread-safe iterator
        final SafeIterator<T> safeIterator = new SafeIterator<T>(iterator);

        // Submit numThreads new worker threads to pull work from the iterator.
        for (int i = 0; i < numThreads; i++)
        {
            futures.add(e.submit(new Consumer<T>(safeIterator, elementProcessor)));
        }

        e.shutdown();

        // Calling .get() on the futures accomplishes two things:
        // 1. awaiting completion of the work
        // 2. discovering an exception during calculation, and rethrowing to the client in this thread.
        for (Future<?> future : futures)
        {
            try
            {
                future.get();
            }
            catch (InterruptedException ex)
            {
                // swallowing is OK
            }
            catch (ExecutionException ex)
            {
                // Re-throw the underlying exception to the client.
                throw Throwables.propagate(ex.getCause());
            }
        }
    }

    // A runnable that sits in a loop consuming and processing elements from an iterator.
    private static class Consumer<T> implements Runnable
    {
        private final SafeIterator<T> it;
        private final ElementProcessor<T> elementProcessor;

        public Consumer(SafeIterator<T> it, ElementProcessor<T> elementProcessor)
        {
            this.it = it;
            this.elementProcessor = elementProcessor;
        }

        @Override
        public void run()
        {
            while (true)
            {
                T o = it.nextOrNull();
                if (o == null)
                {
                    return;
                }
                elementProcessor.processElement(o);
            }
        }
    }

    // a thread-safe iterator-like object.
    private static class SafeIterator<T>
    {
        private final Iterator<T> in;

        SafeIterator(Iterator<T> in)
        {
            this.in = in;
        }

        synchronized T nextOrNull()
        {
            if (in.hasNext())
            {
                return in.next();
            }
            return null;
        }
    }
}
Brian Harris
I would have appreciated you waiting for my results (see below) before submitting and accepting your own. I think my result is cleaner and less classy, and I can guarantee it is faster.
BobMcGee
Oops, hope I didn't discourage you! I believe this site allows me to change the accepted answer as new ones roll in. Seeing as I get notified on new answers and comments, I'll be able to evaluate each submission as they come in. Fair enough? Let's keep discussion related to your submission in the comments under your submission.
Brian Harris
I don't think you can switch your accepted answer unless you do it quickly (it'll let you toggle it on/off for maybe an hour or two). You can however vote answers up/down...
BobMcGee
I should be clearer: the producer-consumer pattern is useful, but you don't really need the extra classes here unless you're reusing instances or extending the classes. I think the logical, succinct approach is a single function that takes an `Iterator<BigThing>` and a `Calc`, with an optional thread count. If you're piping something through multiple stages of processing, it's probably worth keeping the SafeIterator, but some of the other interfaces and classes probably won't get used.
BobMcGee
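Along the lines of the "single function" suggested in the last comment: assuming Java 8 or later, java.util.function.Consumer can play the role of ElementProcessor, so the utility shrinks to one static method that takes a lambda. A minimal sketch; `ParallelIterators.forEach` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical single-method counterpart of ThreadedIteratorProcessor.
final class ParallelIterators {
    static <T> void forEach(Iterator<T> in, Consumer<? super T> action, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < numThreads; i++) {
                futures.add(pool.submit(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        T item;
                        synchronized (in) { // serialize access to the iterator
                            if (!in.hasNext()) {
                                return;
                            }
                            item = in.next();
                        }
                        action.accept(item); // runs outside the lock
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // waits, and rethrows a worker's exception wrapped in ExecutionException
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow(); // on failure, also asks the remaining workers to stop
        }
    }
}
```

With this, BulkCalc3's body would reduce to `ParallelIterators.forEach(in, o -> resultMap.put(o.getId(), calc.calc(o)), numThreads);` followed by the TreeMap copy.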
A: 

And, the succinct way (slower, not as robust or clean, but still fairly OK):

33 lines for the computation, all in one method. It's not as efficient due to unneeded synchronization, and (unlike the above) an exception in processing ends that worker's loop, leaving one fewer thread pulling from the iterator, and the method rethrows the first failure it finds. The previous one I posted collects all the exceptions into a tidy bundle for later handling, so every worker keeps processing when exceptions occur only sometimes.

/** More succinct */
public static Map<Integer, Double> bulkCalcSuccincter(final Iterator<BigThing> it, final Calc calc, final int numThreads) {
    final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
    final java.util.List<Future<?>> futures = new ArrayList<Future<?>>();
    final ExecutorService e = Executors.newFixedThreadPool(numThreads);

    for (int i = 0; i < numThreads; i++) {
        futures.add(e.submit(new Runnable() {
            public void run() {
                while (true) {
                    BigThing thing = null;
                    synchronized (it) {
                        thing = (it.hasNext()) ? it.next() : null;
                    }
                    if (thing == null) {
                        break;
                    }
                    results.put(thing.getId(), calc.calc(thing));
                }
            }
        }));
    }
    e.shutdown();

    for (Future<?> f : futures) {
        try {
            f.get();
        } catch (InterruptedException ex) {
            // swallowing is better than spitting it out
        } catch (ExecutionException ex) {
            throw Throwables.propagate(ex.getCause());
        }
    }
    return results;
}
BobMcGee
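On the cost of exceptions discussed in this thread: with ExecutorService.submit(), which both the question's code and the succinct version use, a task's exception is stored in its Future, get() rethrows it wrapped in ExecutionException, and the pool thread itself keeps running. A small JDK-only check, assuming Java 8 or later; `SubmitExceptionDemo` is a hypothetical name:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Shows what ExecutorService.submit() does with a task's exception.
final class SubmitExceptionDemo {
    /** Returns {thread that ran the failing task, message seen via get(), thread that ran the next task}. */
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] out = new String[3];
        try {
            Runnable failing = () -> {
                out[0] = Thread.currentThread().getName();
                throw new IllegalStateException("boom");
            };
            try {
                pool.submit(failing).get();
            } catch (ExecutionException e) {
                out[1] = e.getCause().getMessage(); // the original exception, unwrapped
            }
            // The exception was captured in the Future, so the same pool thread
            // is still alive and runs the next task.
            pool.submit(() -> { out[2] = Thread.currentThread().getName(); }).get();
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

By contrast, an exception escaping a task passed to execute() terminates the worker thread, and the ThreadPoolExecutor replaces it with a new one.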