Hello. I have a problem with CompletionService. My task is to parse about 300 HTML pages in parallel. I need to wait for the results for only 5 seconds, then return whatever I have to the main code. I've decided to use CompletionService + Callable for this. The question is how to stop all the threads started through the CompletionService and return the results from the pages that were successfully parsed. I removed the print statements from this code, but I can say that 5 seconds is enough (there are good results, but the program waits until all threads have completed). My code runs for about 2 minutes.

My calling code:

Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>();
for (final String currentUrl : allUrls) {
    solvers.add(new Callable<HCard>() {
        public HCard call() throws ParserException {
            HCard hCard = HCardParser.parseOne(currentUrl);                      
            if (hCard != null) {
                return hCard;
            } else {
                return null;
            }
        }
    });
}
ExecutorService execService = Executors.newCachedThreadPool();
Helper helper = new Helper();
List<HCard> result = helper.solve(execService, solvers);
// then I do something with the result list

My called code:

public class Helper {
    List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers) throws InterruptedException {
        CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e);
        int n = solvers.size();

        Future<HCard> future = null;
        HCard hCard = null;
        ArrayList<HCard> result = new ArrayList<HCard>();

        for (Callable<HCard> s : solvers) {
            cs.submit(s);
        }
        for (int i = 0; i < n; ++i) {
            try {
                future = cs.take(); // blocks until the next task has finished
                hCard = future.get();
                if (hCard != null) {
                    result.add(hCard);
                }
            } catch (ExecutionException e1) {
                future.cancel(true);
            }
        }
        return result;
    }
}

I attempted to use:

  • awaitTermination(5000, TimeUnit.MILLISECONDS)
  • future.cancel(true)
  • execService.shutdownNow()
  • future.get(5000, TimeUnit.MILLISECONDS);
  • TimeoutException: I can't get a TimeoutException.

Please help me in the context of my code.
Thanks in advance!
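About the TimeoutException item in the list: a Future returned by cs.take() has, by definition, already completed, so calling get(5000, TimeUnit.MILLISECONDS) on it returns immediately and can never time out. Only a future that is still running can throw TimeoutException. A small self-contained sketch (Java 8+) contrasting the two cases; TimeoutDemo and its methods are made-up names, and a 10-second sleep stands in for a slow page:

```java
import java.util.concurrent.*;

public class TimeoutDemo {
    // Returns true if get(timeout) on a still-running task throws TimeoutException.
    public static boolean pendingFutureTimesOut() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(10_000); // simulates a page that takes far too long
                return "late";
            });
            try {
                slow.get(100, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException expected) {
                slow.cancel(true); // interrupts the sleeping worker
                return true;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    // A future that has already completed (like one returned by take()) never times out.
    public static boolean completedFutureTimesOut() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        try {
            cs.submit(() -> "done");
            Future<String> f = cs.take();    // blocks until the task has finished
            f.get(1, TimeUnit.MILLISECONDS); // returns at once: the result is already there
            return false;
        } catch (TimeoutException unexpected) {
            return true;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pending future timed out:   " + pendingFutureTimesOut());
        System.out.println("completed future timed out: " + completedFutureTimesOut());
    }
}
```

So the timeout has to apply to the waiting step itself (for example poll with a timeout, or a latch), not to get() on futures that take() has already handed back.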

+2  A: 

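You need to ensure that the tasks you submit respond properly to interruption, i.e., they check Thread.currentThread().isInterrupted() or are otherwise "interruptible" (for example, they block in calls such as Thread.sleep() or Object.wait(), which throw InterruptedException).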

I'm not sure that you need a completion service for this.

ExecutorService service = ...

// Submit all your tasks
for (Task t : tasks) {
    service.submit(t);
}

service.shutdown();

// Wait for termination
boolean success = service.awaitTermination(5, TimeUnit.SECONDS);
if (!success) {
    // awaitTermination timed out, interrupt everyone
    service.shutdownNow();
}

At this point, there is not much you can do if your Task objects don't respond to interruption.
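For comparison, ExecutorService.invokeAll(tasks, timeout, unit) bundles the "wait, then cancel the stragglers" steps: it returns when all tasks have completed or the timeout expires, whichever comes first, and cancels any task that has not completed (OpenJDK does this with cancel(true), which interrupts it). Like the shutdownNow() approach, it only stops tasks that respond to interruption. A minimal sketch (Java 8+); fakeParse is a hypothetical stand-in for HCardParser.parseOne that sleeps, which is interruptible, and the pool size of 4 is arbitrary:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllWithTimeout {
    // Hypothetical stand-in for HCardParser.parseOne: "parses" for delayMillis.
    static String fakeParse(String url, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis); // throws InterruptedException when the task is cancelled
        return "HCard[" + url + "]";
    }

    public static List<String> parseAll(List<Long> delays, long timeoutMillis) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < delays.size(); i++) {
            final String url = "url" + i;
            final long delay = delays.get(i);
            tasks.add(() -> fakeParse(url, delay));
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> results = new ArrayList<>();
        try {
            // Returns when every task is done or the timeout elapses; unfinished tasks are cancelled.
            for (Future<String> f : pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (f.isCancelled()) {
                    continue; // did not finish in time
                }
                try {
                    results.add(f.get()); // already complete, so get() does not block
                } catch (ExecutionException ex) {
                    // this parse failed; skip it
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // Two fast "pages" and two that would take 10 seconds; the call returns after about 1 second.
        System.out.println(parseAll(Arrays.asList(10L, 20L, 10_000L, 10_000L), 1000));
        // prints [HCard[url0], HCard[url1]]
    }
}
```

The returned futures are in the same order as the submitted tasks, so results keep the URL order.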

Kevin
+1 For mentioning interruptibility: that one bit me in a similar problem I was having (see my profile). I'm not sure what HCardParser.parseOne is doing, but in my case the function I was calling was unaffected by interruption. By using HTTP connection timeouts, or whatever fits what you're doing, you might have better luck actually making your Callables halt.
Tim
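On the timeout suggestion: on ordinary threads, a read blocked on a java.net socket is generally not woken up by Thread.interrupt(), so cancelling the task may not stop a stuck download. If the pages are fetched with HttpURLConnection (an assumption: we don't know how HCardParser.parseOne fetches them), setConnectTimeout and setReadTimeout bound the blocking calls, which then throw java.net.SocketTimeoutException. A sketch; openWithTimeouts, fetch, and the 2 s / 3 s values are made up for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class TimedFetch {
    // Creates (but does not yet connect) an HTTP connection with both timeouts set.
    public static HttpURLConnection openWithTimeouts(String url, int connectMillis, int readMillis)
            throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(connectMillis); // limit for establishing the connection
        conn.setReadTimeout(readMillis);       // limit for each blocking read once connected
        return conn;
    }

    // Downloads the page body; throws SocketTimeoutException if either limit is exceeded.
    public static String fetch(String url) throws IOException {
        HttpURLConnection conn = openWithTimeouts(url, 2000, 3000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8); // assumes UTF-8 pages
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            System.out.println(fetch(args[0]).length() + " characters read");
        }
    }
}
```

Note that the read timeout applies to each individual read, not to the whole download, so a server that trickles data can still exceed five seconds in total; the overall deadline still has to come from the executor side.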
A: 

I have never used CompletionService, but there is a poll(long timeout, TimeUnit unit) method for a bounded wait, which returns null if no task completed within the timeout. Check for null, measure the time waited, and stop waiting after 5 seconds. Approximately:

public class Helper {
    List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers)
            throws InterruptedException {
        CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e);
        int n = solvers.size();

        Future<HCard> future = null;
        HCard hCard = null;
        ArrayList<HCard> result = new ArrayList<HCard>();

        for (Callable<HCard> s : solvers) {
            cs.submit(s);
        }
        long timeleft = 5000; // remaining time budget in milliseconds
        for (int i = 0; i < n; ++i) {
            if (timeleft <= 0) {
                break;
            }
            try {
                long t = System.currentTimeMillis();
                future = cs.poll(timeleft, TimeUnit.MILLISECONDS); // null if nothing finished in time
                timeleft -= System.currentTimeMillis() - t;
                if (future != null) {
                    hCard = future.get();
                    if (hCard != null) {
                        result.add(hCard);
                    }
                } else {
                    break;
                }
            } catch (ExecutionException e1) {
                future.cancel(true);
            }
        }
        return result;
    }
}

Not tested though.

kd304
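One gap in the poll-based version: when the five seconds run out, solve() returns, but the tasks that have not finished keep running in the pool. Keeping the Future returned by each submit() makes it possible to cancel them afterwards. A sketch (Java 8+), assuming the tasks respond to interruption; collectWithin is a hypothetical name, and it uses System.nanoTime() for the deadline instead of subtracting elapsed milliseconds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class DeadlineCollector {
    // Collects non-null results that arrive before the deadline, then cancels whatever is left.
    public static <T> List<T> collectWithin(ExecutorService pool, List<Callable<T>> tasks,
                                            long timeout, TimeUnit unit) throws InterruptedException {
        CompletionService<T> cs = new ExecutorCompletionService<>(pool);
        List<Future<T>> submitted = new ArrayList<>();
        for (Callable<T> task : tasks) {
            submitted.add(cs.submit(task));
        }
        List<T> results = new ArrayList<>();
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            for (int i = 0; i < tasks.size(); i++) {
                long left = deadline - System.nanoTime();
                if (left <= 0) {
                    break;
                }
                Future<T> f = cs.poll(left, TimeUnit.NANOSECONDS); // null means the deadline passed
                if (f == null) {
                    break;
                }
                try {
                    T value = f.get(); // already complete, so this does not block
                    if (value != null) {
                        results.add(value);
                    }
                } catch (ExecutionException ex) {
                    // this task failed; skip its result
                }
            }
        } finally {
            for (Future<T> f : submitted) {
                f.cancel(true); // no effect on finished tasks; interrupts running ones, drops queued ones
            }
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            final int id = i;
            // Even ids finish quickly; odd ids would take 10 seconds.
            tasks.add(() -> {
                Thread.sleep(id % 2 == 0 ? 10 : 10_000);
                return "page" + id;
            });
        }
        System.out.println(collectWithin(pool, tasks, 1, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

The results come back in completion order, which is not necessarily the submission order.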
A: 

The problem is that you always wait for every single result: the loop calls take() once per task, so the code always runs until every task has completed. I'd do it with a CountDownLatch, as in the code below.

Also, don't use Executors.newCachedThreadPool: chances are it will spawn a lot of threads (up to 300 if your tasks take any amount of time, because a cached pool starts a new thread whenever a task is submitted and no idle thread is available to take it).
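To see the thread-count difference, the sketch below builds ThreadPoolExecutors with the same parameters that OpenJDK's Executors.newCachedThreadPool() and Executors.newFixedThreadPool(4) use, submits 50 tasks that all block on a latch (standing in for slow downloads), and reads getPoolSize(). The cached pool's SynchronousQueue can only hand a task to an idle thread, and since none ever becomes idle here, every submission starts a new thread. PoolGrowth and threadsStarted are made-up names for this illustration (Java 8+):

```java
import java.util.concurrent.*;

public class PoolGrowth {
    // Submits `tasks` jobs that all block until released and reports how many threads the pool started.
    public static int threadsStarted(ThreadPoolExecutor pool, int tasks) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stands in for a slow page download
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int started = pool.getPoolSize(); // threads alive while every task is still blocked
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        // Same parameters as Executors.newCachedThreadPool() in OpenJDK.
        ThreadPoolExecutor cached = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        // Same parameters as Executors.newFixedThreadPool(4) in OpenJDK.
        ThreadPoolExecutor fixed = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        System.out.println("cached: " + threadsStarted(cached, 50)); // prints "cached: 50"
        System.out.println("fixed:  " + threadsStarted(fixed, 50));  // prints "fixed:  4"
    }
}
```

The fixed pool parks the other 46 tasks in its queue instead of creating threads for them.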

All the classes are inline to make it easier: paste the whole code block into a file called LotsOfTasks.java and run it.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LotsOfTasks {
    private static final int SIZE = 300;

    public static void main(String[] args) throws InterruptedException {

        String[] allUrls = generateUrls(SIZE);

        Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>();
        for (final String currentUrl : allUrls) {
            solvers.add(new Callable<HCard>() {
                public HCard call() {
                    HCard hCard = HCardParser.parseOne(currentUrl);
                    if (hCard != null) {
                        return hCard;
                    } else {
                        return null;
                    }
                }
            });
        }
        ExecutorService execService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // One thread per CPU suits compute-bound work; I/O-bound page fetching usually benefits from a larger fixed pool
        Helper helper = new Helper();

        System.out.println("Starting..");
        long start = System.nanoTime();
        List<HCard> result = helper.solve(execService, solvers, 5);
        long stop = System.nanoTime();
        for (HCard hCard : result) {
            System.out.println("hCard = " + hCard);
        }

        System.out.println("Took: " + TimeUnit.SECONDS.convert((stop - start), TimeUnit.NANOSECONDS) + " seconds");
    }

    private static String[] generateUrls(final int size) {
        String[] urls = new String[size];
        for (int i = 0; i < size; i++) {
            urls[i] = "" + i;
        }
        return urls;
    }

    private static class HCardParser {
        private static final Random random = new Random();

        public static HCard parseOne(String currentUrl) {
            try {
                Thread.sleep(random.nextInt(1000)); // Wait for a random time up to 1 second per task (simulate some activity)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and give up: the task was cancelled
                return null;
            }
            return new HCard(currentUrl);
        }
    }

    private static class HCard {
        private final String currentUrl;

        public HCard(String currentUrl) {
            this.currentUrl = currentUrl;
        }

        @Override
        public String toString() {
            return "HCard[" + currentUrl + "]";
        }
    }

    private static class Helper {
        List<HCard> solve(ExecutorService e, Collection<Callable<HCard>> solvers, int timeoutSeconds) throws InterruptedException {

            final CountDownLatch latch = new CountDownLatch(solvers.size());

            final ConcurrentLinkedQueue<HCard> executionResults = new ConcurrentLinkedQueue<HCard>();

            for (final Callable<HCard> s : solvers) {
                e.submit(new Callable<HCard>() {
                    public HCard call() throws Exception {
                        try {
                            HCard hCard = s.call();
                            if (hCard != null) { // ConcurrentLinkedQueue does not accept null elements
                                executionResults.add(hCard);
                            }
                        } finally {
                            latch.countDown();
                        }
                        return null;
                    }
                });
            }

            latch.await(timeoutSeconds, TimeUnit.SECONDS);

            final List<Runnable> unfinishedTasks = e.shutdownNow();
            System.out.println("There were " + unfinishedTasks.size() + " urls not processed");

            return Arrays.asList(executionResults.toArray(new HCard[executionResults.size()]));
        }
    }
}

Typical output on my system looks something like this:

Starting..
There were 279 urls not processed
hCard = HCard[0]
hCard = HCard[1]
hCard = HCard[2]
hCard = HCard[3]
hCard = HCard[5]
hCard = HCard[4]
hCard = HCard[6]
hCard = HCard[8]
hCard = HCard[7]
hCard = HCard[10]
hCard = HCard[11]
hCard = HCard[9]
hCard = HCard[12]
hCard = HCard[14]
hCard = HCard[15]
hCard = HCard[13]
hCard = HCard[16]
hCard = HCard[18]
hCard = HCard[17]
hCard = HCard[20]
hCard = HCard[19]
Took: 5 seconds
Darren