It's a bit complicated to explain, but here we go. The issue is how to break problems up into subproblems efficiently, where "efficient" means that the resulting subproblems are as big as possible. Ideally I wouldn't have to break the problems up at all. However, because a worker can only work on specific blocks of a problem, I do need to break them up, and I want to do so as coarsely as possible.

Here is some pseudo code..

We have problems like this (Sorry it's in Java. If you don't understand, I'd be glad to explain).

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

And a subproblem is:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

Work will look like this, then.

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.subSections = subSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

Now we have instances of 'Worker', each of which has its own state: the sections it holds (sectionsIHave).

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        executor.execute(new Work(sectionsNeeded, problem.data));
    }

}

phew.

So, we have a lot of Problems, and Workers are constantly asking for more SubProblems. My task is to break each Problem up into SubProblems and hand them out. The difficulty, however, is that I later have to collect all the results for the SubProblems and merge (reduce) them into a Result for the whole Problem.

This merging is costly, however, so I want to give the workers "chunks" that are as big as possible (that is, with as many targetedSectionIds as possible).

It doesn't have to be perfect (mathematically optimal or anything like that). I guess a perfect solution is impossible anyway, because you can't predict how long each computation will take, etc. But is there a good heuristic for this? Or maybe some resources I can read up on before I start designing?

Any advice is highly appreciated!

EDIT: We have control over the section allocation too, so controlling that is another option. The only restriction is that a worker can only hold a fixed number of sections.

+1  A: 

Okay, it appears that you have a sharding model for your network services. We do something similar: we use a reverse index from "entityId" (sectionId) to the "client" (worker) that connects to the particular network service dealing with that specific entity. The simplest method (see below) is to use a reverse map of id to worker. If memory is a constraint, another possibility is to use a function (e.g. sectionId % number of services).
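To make the two routing options concrete, here is a minimal sketch of both. The class name `SectionRouting`, the method names, and the use of plain worker names instead of worker objects are assumptions made for illustration only. Note that the function-based variant only works if sections are actually allocated to services according to the same formula.

```java
import java.util.HashMap;
import java.util.Map;

public class SectionRouting {

    // Function-based routing: no lookup table is kept in memory, but the
    // section-to-service assignment is dictated by the formula.
    public static int serviceFor(int sectionId, int numberOfServices) {
        return Math.floorMod(sectionId, numberOfServices);
    }

    // Map-based routing: one entry per section id, pointing at the worker
    // that holds it. Works for any allocation of sections to workers.
    public static Map<Integer, String> reverseIndex(Map<String, int[]> sectionsByWorker) {
        Map<Integer, String> index = new HashMap<>();
        for (Map.Entry<String, int[]> entry : sectionsByWorker.entrySet()) {
            for (int sectionId : entry.getValue()) {
                index.put(sectionId, entry.getKey());
            }
        }
        return index;
    }

    public static void main(String[] args) {
        // Hypothetical allocation, loosely following the question's example.
        Map<String, int[]> sectionsByWorker = new HashMap<>();
        sectionsByWorker.put("workerA", new int[] {1, 5, 8});
        sectionsByWorker.put("workerB", new int[] {2, 4});

        Map<Integer, String> index = reverseIndex(sectionsByWorker);
        System.out.println(index.get(5));       // prints "workerA"
        System.out.println(serviceFor(10, 3));  // prints 1
    }
}
```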

To give the services as much work as possible, there is a simple batching algorithm that fills a batch up to some user-specified maximum. This allows chunks of work to be sized roughly according to how fast the remote services are able to consume them.

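import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Worker implements Runnable {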

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

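            try {
                // Block until at least one problem is available.
                batch.add(problemQ.take());
                // Then drain without blocking, up to batchSize.
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }
                }

                // Submit the batch once it is full or the queue is empty.
                process(batch);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }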
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
        // Load workers, where each worker is bound to a single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}
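The code above routes sub-problems that each carry a single section id. Since the question asks for sub-problems with as many targeted sections as possible, the same reverse index could also be used to group a problem's section ids by owning worker, so that each worker receives one coarse chunk. The following is only a sketch under the assumption that every section is held by exactly one worker; `CoarseSplitter`, `split`, and the worker names are hypothetical and not part of the code above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CoarseSplitter {

    // Groups a problem's section ids by the worker that holds each section.
    // Each resulting set could back one SubProblem, so a problem is split
    // into at most one chunk per worker.
    public static Map<String, Set<Integer>> split(Set<Integer> allSectionIds,
                                                  Map<Integer, String> ownerBySection) {
        Map<String, Set<Integer>> chunks = new TreeMap<>();
        for (Integer sectionId : allSectionIds) {
            String owner = ownerBySection.get(sectionId);
            if (owner == null) {
                throw new IllegalArgumentException("No worker holds section " + sectionId);
            }
            chunks.computeIfAbsent(owner, k -> new TreeSet<>()).add(sectionId);
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Hypothetical allocation of the question's section ids to three workers.
        Map<Integer, String> ownerBySection = new HashMap<>();
        for (int id : new int[] {1, 2, 4}) ownerBySection.put(id, "w1");
        for (int id : new int[] {6, 7}) ownerBySection.put(id, "w2");
        for (int id : new int[] {8, 10}) ownerBySection.put(id, "w3");

        Set<Integer> allSectionIds = new TreeSet<>(Arrays.asList(1, 2, 4, 6, 7, 8, 10));
        System.out.println(split(allSectionIds, ownerBySection));
        // prints {w1=[1, 2, 4], w2=[6, 7], w3=[8, 10]}
    }
}
```

With this grouping, the number of partial results to merge per problem is bounded by the number of workers that hold at least one of its sections, rather than by the number of sections.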
Michael Barker
Thank you! But you know, the problem is that our `threads` have state (they are actually distinct physical machines). If I just chop up the sections in the middle, the thread that executes the left or the right half might not have that "section"...
Enno Shioji