I'm working on a simple job threading framework which is very similar to the one described in id Tech 5 Challenges. At the most basic level, I have a set of job lists, and I want to schedule these lists across a bunch of CPU threads (using a standard thread pool for the actual dispatching). However, I wonder how the signal/wait mechanism inside a job list can be implemented efficiently. As I understand it, a wait token blocks execution of the list until the preceding signal token has been raised. This implicitly means that everything before a signal has to finish before the signal can be raised. So let's say we have a list like this:

J1, J2, S, J3, W, J4

then the dispatching can go like this:

#1: J1, J2, J3
<wait for J1, J2, run other lists if possible>
#2: J4
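
To make the batching above concrete, here is a rough Java sketch. The class name BatchSplitter and the token strings "S" and "W" are made up for illustration, and it only covers the simple, non-nested case described here. It does not record which jobs a wait actually depends on (only J1 and J2 in the example), which is exactly the bookkeeping I am unsure about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSplitter {
    // Splits a token list into dispatch batches. Every job up to a wait
    // token ("W") may be started together; jobs after it go to the next batch.
    static List<List<String>> split(List<String> tokens) {
        List<List<String>> batches = new ArrayList<List<String>>();
        List<String> current = new ArrayList<String>();
        for (String token : tokens) {
            if (token.equals("W")) {
                batches.add(current);            // wait: close the current batch
                current = new ArrayList<String>();
            } else if (!token.equals("S")) {
                current.add(token);              // signal tokens carry no work
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("J1", "J2", "S", "J3", "W", "J4");
        System.out.println(split(list)); // [[J1, J2, J3], [J4]]
    }
}
```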

However, this isn't as easy as it seems. Given a set of lists, I would have to move some of them between ready and waiting, and I would also need special code to gather all jobs before a signal and tag something onto them, so that they trigger the signal if and only if they have all finished. This means, for instance, that it's no longer possible to add jobs to a list while it is being executed, because later signals access the previously inserted jobs.

Is there any "standard" way of implementing this efficiently? I also wonder how best to schedule job list execution. Right now, each core grabs a job list and schedules all jobs in it, which gives pretty good scaling (for 32k jobs at 0.7 ms each, I get 101%, which I guess is partly because the single-threaded version is sometimes scheduled onto different cores).

+3  A: 

This is a relatively straightforward scheduling algorithm. A couple of issues seem tricky at first but really aren't (signal/wait and cache locality). I'll explain the techniques, then give some code I wrote to illustrate the concepts, then give some final notes on tuning.

Algorithms to use

Handling signal/wait efficiently seems tricky at first but actually turns out to be extremely easy. Since signal/wait pairs can't nest or overlap, there can really be only two being satisfied and one being waited on at any given time. Simply keeping a "CurrentSignal" pointer to the most recent unsatisfied signal is all that is necessary to do the bookkeeping.

Making sure that cores don't jump around between lists too much and that a given list isn't shared between too many cores is also relatively easy: Each core keeps taking jobs from the same list until it blocks, then switches to another list. To keep all the cores from ganging up on a single list, a WorkerCount is kept for each list that tells how many cores are using it, and the lists are organized so cores select lists with fewer workers first.
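
As a rough illustration of "select lists with fewer workers first", here is a small Java sketch. It is not the scheduler itself: the class name ReadyBuckets, the use of plain strings as list names, and the maxWorkers cap are all assumptions made to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReadyBuckets {
    // buckets.get(n) holds the names of lists that currently have n workers.
    private final List<Deque<String>> buckets = new ArrayList<Deque<String>>();

    ReadyBuckets(int maxWorkers) {
        for (int i = 0; i <= maxWorkers; i++) {
            buckets.add(new ArrayDeque<String>());
        }
    }

    // Registers a list that currently has workerCount cores working on it.
    synchronized void add(String listName, int workerCount) {
        buckets.get(workerCount).addLast(listName);
    }

    // Picks a list from the lowest non-empty bucket and moves it one bucket
    // up, since the caller is about to become one more worker on it.
    // Returns null when no list is ready.
    synchronized String acquire() {
        for (int i = 0; i < buckets.size(); i++) {
            String name = buckets.get(i).pollFirst();
            if (name != null) {
                int next = Math.min(i + 1, buckets.size() - 1);
                buckets.get(next).addLast(name);
                return name;
            }
        }
        return null;
    }
}
```

With lists "A" and "B" both starting at zero workers, successive acquire() calls return A, B, A, B, so no core doubles up on a list while another list is still untouched.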

Locking can be kept simple by locking only the scheduler or the list you are working on at any time, never both.

You expressed some concern about adding jobs to a list after the list has already started executing. It turns out that supporting this is almost trivial: All it needs is a call from the list to the scheduler when a job is added to a list that is currently completed, so the scheduler can schedule the new job.

Data structures

Here are the basic data structures you'll need:

class Scheduler
{
  LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
  LinkedList<JobList> Blocked;
  int ReadyCount;
  bool Exit;

  public:
    void AddList(JobList* joblist);
    void DoWork();

  internal:
    void UpdateQueues(JobList* joblist);

    void NotifyBlockedCores();
    void WaitForNotifyBlockedCores();
}

class JobList
{
  Scheduler Scheduler;
  LinkedList<JobList> CurrentQueue;

  LinkedList<Job> Jobs;            // All jobs in the job list
  LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list,
                                   // plus a dummy

  Job* NextJob;                    // The next job to schedule, if any
  int NextJobIndex;                // The index of NextJob

  SignalPoint* CurrentSignal;      // First signal not fully satisfied

  int WorkerCount;                 // # of cores executing in this list

  public:
    void AddJob(Job* job);
    void AddSignal();
    void AddWait();

  internal:
    bool Ready { get; }
    bool GetNextReadyJob(Job& job, int& jobIndex);
    void MarkJobCompleted(Job job, int jobIndex);
}
class SignalPoint
{
  int SignalJobIndex = int.MaxValue;
  int WaitJobIndex = int.MaxValue;
  int IncompleteCount = 0;
}

Note that the signal points for a given joblist are most conveniently stored separately from the actual list of jobs.
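
To show what ends up in those signal points, here is a small Java sketch that builds them for the list from the question (J1, J2, S, J3, W, J4). The class name SignalPointLayout, the build helper and the "S"/"W" token strings are assumptions for illustration only. The field meanings follow the pseudocode in this answer.

```java
import java.util.ArrayList;
import java.util.List;

class SignalPointLayout {
    static class SignalPoint {
        int signalJobIndex = Integer.MAX_VALUE; // index of first job after the signal
        int waitJobIndex = Integer.MAX_VALUE;   // index of first job after the wait
        int incompleteCount = 0;                // unfinished jobs counted against this point

        @Override
        public String toString() {
            return "signal=" + signalJobIndex + " wait=" + waitJobIndex
                    + " incomplete=" + incompleteCount;
        }
    }

    // Builds the signal points for a token sequence; "S" and "W" are signal
    // and wait tokens, anything else counts as a job.
    static List<SignalPoint> build(String... tokens) {
        List<SignalPoint> signals = new ArrayList<SignalPoint>();
        signals.add(new SignalPoint());               // trailing dummy
        int jobCount = 0;
        for (String token : tokens) {
            SignalPoint last = signals.get(signals.size() - 1);
            if (token.equals("S")) {
                last.signalJobIndex = jobCount;       // dummy becomes a real point
                signals.add(new SignalPoint());       // new trailing dummy
            } else if (token.equals("W")) {
                if (signals.size() >= 2) {            // ignore a wait with no signal
                    signals.get(signals.size() - 2).waitJobIndex = jobCount;
                }
            } else {
                jobCount++;
                last.incompleteCount++;
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        for (SignalPoint p : build("J1", "J2", "S", "J3", "W", "J4")) {
            System.out.println(p);
        }
    }
}
```

For the example list this yields two entries: a real signal point with signal index 2, wait index 3 and two incomplete jobs (J1, J2), followed by the dummy, which counts J3 and J4.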

Scheduler implementation

The scheduler keeps track of job lists, assigns them to cores, and executes jobs from the job lists.

AddList adds a job list to the scheduler. The list must be placed on the Ready or Blocked queue depending on whether it has any work to do (i.e. whether any jobs have been added to it yet), so just call UpdateQueues.

void Scheduler.AddList(JobList* joblist)
{
  joblist.Scheduler = this;
  UpdateQueues(joblist);
}

UpdateQueues centralizes the queue update logic. Notice the algorithm for selecting a new queue, and also the notification to idle cores when work becomes available:

void Scheduler.UpdateQueues(JobList* joblist)
{
  lock(this)
  {
    // Remove from prior queue, if any
    if(joblist.CurrentQueue!=null)
    {
      if(joblist.CurrentQueue!=Blocked) ReadyCount--;
      joblist.CurrentQueue.Remove(joblist);
    }

    // Select new queue
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;

    // Add to new queue
    joblist.CurrentQueue.Add(joblist);
    if(joblist.CurrentQueue!=Blocked)
      if(++ReadyCount==1) NotifyBlockedCores();
  }
}

DoWork is a normal scheduler work loop, except that: 1. it selects the JobList with the fewest workers, 2. it works through jobs from a given job list until it can't any more, and 3. it stores the jobIndex as well as the job so the job list can easily update completion state (an implementation detail).

void Scheduler.DoWork()
{
  while(!Exit)
  {
    // Get a job list to work on
    JobList *list = null;
    lock(this)
    {
      for(int i=0; i<Ready.Length; i++)
        if(!Ready[i].Empty)
        {
          list = Ready[i].First;
          break;
        }
      if(list==null)  // No work to do
      {
        WaitForNotifyBlockedCores();
        continue;
      }
      list.WorkerCount++;
      UpdateQueues(list);
    }

    // Execute jobs in the list as long as possible
    while(true)
    {
      int jobIndex;
      Job job;
      if(!list.GetNextReadyJob(job, jobIndex)) break;  // out parameters, see JobList

      job.Execute();

      list.MarkJobCompleted(job, jobIndex);
    }

    // Release the job list
    lock(this)
    {
      list.WorkerCount--;
      UpdateQueues(list);
    }
  }
}

JobList implementation

The JobList keeps track of how the signal/wait are interspersed with the jobs and keeps track of which signal/wait pairs have already completed everything before their signal point.

The constructor creates a dummy signal point to add jobs to. This signal point becomes a real signal point (and a new dummy is added) whenever a new "signal" is added.

JobList.JobList()
{
  // Always have a dummy signal point at the end
  Signals.Add(CurrentSignal = new SignalPoint());
}

AddJob adds a job to the list. It is marked as incomplete in the SignalPoint. When the job is actually executed, the IncompleteCount of the same SignalPoint is decremented. It is also necessary to tell the scheduler that things might have changed, since the new job could be immediately executable. Note that the scheduler is called after the lock on "this" is released to avoid deadlock.

void JobList.AddJob(Job job)
{
  lock(this)
  {
    Jobs.Add(job);
    Signals.Last.IncompleteCount++;
    if(NextJob == null)
      NextJob = job;
  }
  if(Scheduler!=null)
    Scheduler.UpdateQueues(this);
}

AddSignal and AddWait add signals and waits to the job list. Notice that AddSignal actually creates a new SignalPoint, and AddWait just fills in the wait point information in the previously created SignalPoint.

void JobList.AddSignal()
{
  lock(this)
  {
    Signals.Last.SignalJobIndex = Jobs.Count;  // Reify dummy signal point
    Signals.Add(new SignalPoint());            // Create new dummy signal point
  }
}


void JobList.AddWait()
{
  lock(this)
  {
    Signals.Last.Previous.WaitJobIndex = Jobs.Count;
  }
}

The Ready property determines whether the list is ready for additional cores assigned to it. There may be two or three cores working on the list without the list being "ready" if the next job is waiting for a signal before it can start.

bool JobList.Ready
{
  get
  {
    lock(this)
    {
      return NextJob!=null &&
        (CurrentSignal==Signals.Last ||
         NextJobIndex < CurrentSignal.WaitJobIndex);
    }
  }
}

GetNextReadyJob is very simple: If we are ready, just return the next job in the list.

bool JobList.GetNextReadyJob(Job& job, int& jobIndex)
{
  lock(this)
  {
    if(!Ready) return false;         // blocked on a wait, or no jobs left
    jobIndex = NextJobIndex++;
    job = NextJob; NextJob = job.Next;
    return true;
  }
}

MarkJobCompleted is probably the most interesting of all. Because of the structure of the signals and waits, the current job is either before CurrentSignal or is between CurrentSignal and CurrentSignal.Next (if it is after the last actual signal, it will be counted as being between CurrentSignal and the dummy SignalPoint at the end). We need to reduce the count of incomplete jobs. We may also need to go on to the next signal if this count goes to zero. Of course we never pass the dummy SignalPoint at the end.

Note that this code doesn't have a call to Scheduler.UpdateQueues because we know the scheduler will be calling GetNextReadyJob in just a second, and if that returns false it will be calling UpdateQueues anyway.

void JobList.MarkJobCompleted(Job job, int jobIndex)
{
  lock(this)
  {
    if(jobIndex >= CurrentSignal.SignalJobIndex)
      CurrentSignal.Next.IncompleteCount--;
    else
    {
      CurrentSignal.IncompleteCount--;
      if(CurrentSignal.IncompleteCount==0)
        if(CurrentSignal.WaitJobIndex < int.MaxValue)
          CurrentSignal = CurrentSignal.Next;
    }
  }
}
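
To see how Ready, GetNextReadyJob and MarkJobCompleted interact, here is a single-threaded Java sketch that walks the question's list (J1, J2, S, J3, W, J4) through them. It is a simplification under stated assumptions: no locking, no scheduler, jobs are represented only by their index, and the names MiniJobList, takeNext and markCompleted are invented for this example. It also assumes every wait is preceded by a signal.

```java
import java.util.ArrayList;
import java.util.List;

class MiniJobList {
    static class SignalPoint {
        int signalJobIndex = Integer.MAX_VALUE;
        int waitJobIndex = Integer.MAX_VALUE;
        int incompleteCount = 0;
    }

    private final List<SignalPoint> signals = new ArrayList<SignalPoint>();
    private int jobCount = 0;
    private int nextJobIndex = 0;
    private int currentSignal = 0;   // index of first signal not fully satisfied

    MiniJobList() { signals.add(new SignalPoint()); }  // trailing dummy

    private SignalPoint last() { return signals.get(signals.size() - 1); }

    void addJob()    { jobCount++; last().incompleteCount++; }
    void addSignal() { last().signalJobIndex = jobCount; signals.add(new SignalPoint()); }
    void addWait()   { signals.get(signals.size() - 2).waitJobIndex = jobCount; }

    boolean ready() {
        return nextJobIndex < jobCount
            && (currentSignal == signals.size() - 1
                || nextJobIndex < signals.get(currentSignal).waitJobIndex);
    }

    // Returns the index of the next job, or -1 if the list is blocked or empty.
    int takeNext() { return ready() ? nextJobIndex++ : -1; }

    void markCompleted(int jobIndex) {
        SignalPoint cur = signals.get(currentSignal);
        if (jobIndex >= cur.signalJobIndex) {
            signals.get(currentSignal + 1).incompleteCount--;  // job after the signal
        } else {
            cur.incompleteCount--;
            if (cur.incompleteCount == 0 && cur.waitJobIndex < Integer.MAX_VALUE) {
                currentSignal++;                               // signal satisfied
            }
        }
    }

    public static void main(String[] args) {
        MiniJobList list = new MiniJobList();  // J1, J2, S, J3, W, J4
        list.addJob(); list.addJob(); list.addSignal();
        list.addJob(); list.addWait(); list.addJob();
        int j1 = list.takeNext(), j2 = list.takeNext(), j3 = list.takeNext();
        System.out.println(list.ready());   // false: J4 is behind the wait
        list.markCompleted(j3);
        list.markCompleted(j1);
        list.markCompleted(j2);
        System.out.println(list.ready());   // true: J1 and J2 are done
    }
}
```

Note that J3 finishing does not unblock anything on its own. Only the completion of the last job before the signal advances the current signal and makes J4 available.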

Tuning based on list length, job length estimates, etc

The code above doesn't pay any attention to how long the job lists are, so if there are a hundred tiny job lists and one huge one, it is possible for each core to take a separate tiny job list and then all congregate on the huge one, leading to inefficiency. This can be solved by making Ready[] an array of priority queues prioritized on (joblist.Jobs.Count - joblist.NextJobIndex), but with the priority only actually updated in normal UpdateQueues situations for efficiency.
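
A minimal Java sketch of such a priority queue follows. The names RemainingWorkQueue and ListInfo are invented for the example, and it assumes the list with the most unscheduled jobs should be handed out first, which is how I would read the priority above.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class RemainingWorkQueue {
    static class ListInfo {
        final String name;
        final int jobCount;      // stands in for joblist.Jobs.Count
        final int nextJobIndex;  // stands in for joblist.NextJobIndex

        ListInfo(String name, int jobCount, int nextJobIndex) {
            this.name = name;
            this.jobCount = jobCount;
            this.nextJobIndex = nextJobIndex;
        }

        int remaining() { return jobCount - nextJobIndex; }
    }

    // Creates a queue that hands out the list with the most unscheduled jobs first.
    static PriorityQueue<ListInfo> create() {
        Comparator<ListInfo> byRemaining = Comparator.comparingInt(ListInfo::remaining);
        return new PriorityQueue<ListInfo>(byRemaining.reversed());
    }

    public static void main(String[] args) {
        PriorityQueue<ListInfo> ready = create();
        ready.add(new ListInfo("tinyA", 3, 1));
        ready.add(new ListInfo("huge", 1000, 10));
        ready.add(new ListInfo("tinyB", 5, 0));
        System.out.println(ready.poll().name); // huge
    }
}
```

Because the snapshot fields are final, a list's priority changes only when it is removed and re-inserted with fresh values, which mirrors updating the priority only when the list changes queues.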

This could get even more sophisticated by creating a heuristic that takes into account the number and spacing of signal/wait combinations to determine the priority. This heuristic would be best tuned by using a distribution of job durations and resource usage.

If individual job durations are known, or if good estimates are available for them, then the heuristic could use the estimated remaining duration instead of just the list length.

Final notes

This is a rather standard solution to the problem you present. You can use the algorithms I gave and they will work, including the locking, but you won't be able to compile the code I wrote above for several reasons:

  1. It is a crazy mix of C++ and C# syntax. I originally started writing in C# then changed a bunch of the syntax to C++ style since I thought that was more likely what you would be using for such a project. But I left in quite a few C#-isms. Fortunately no LINQ ;-).

  2. The LinkedList details have some hand-waving. I assume the list can do First, Last, Add and Remove and that items in the list can do Previous and Next. But I didn't use the actual API for any real linked list class I know of.

  3. I didn't compile or test it. I guarantee there is a bug or two in there somewhere.

Bottom line: You should treat the code above as pseudocode even though it looks like the real McCoy.

Enjoy!

Ray Burns
Hi, nice post. Do you have any suggestions for reading material on this topic? Thanks.
Mark Simpson
@Mark: Sorry, I know I read several good articles related to this 25-30 years ago but couldn't give you a reference now. Things like this just stick in your brain and don't go away, if you know what I mean.
Ray Burns
+1  A: 

If you have access to a work-stealing framework in your environment (for example, Cilk if you are in C, or Doug Lea's fork/join framework in Java), you can easily get a simple and clean solution (compared to the low-level ad hoc attempts you will probably have to make if you can't use something like that), which gives you automatic load balancing and good data locality.

Here is a high-level description of a solution: you start one thread per core. Each one is assigned lists until they are exhausted (there are many ways to do this; that's the task of good concurrent queuing mechanisms, and a reason to avoid do-it-yourself solutions if possible). Each worker goes through the rows of its list one by one:

  - Two queues are maintained, one for the jobs before a signal token and one for those after it.
  - When a job is encountered, it is forked and added to the respective queue (depending on whether we have seen a signal token or not).
  - When a wait token is encountered, we join all jobs before the signal (that's the semantics you describe, if I understood correctly).

Note that the code uses helpJoin(), which means the thread will actually help (by popping forked tasks and executing them until the join can proceed).

"Fork" means putting the task in a thread-local queue, which will either be executed by the thread itself later, or it can be stolen by another thread that looks for some work to do.

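As a tiny, separate illustration of fork and join, here is a sketch that sums the integers below n. It uses the java.util.concurrent version of the framework that ships with Java 7 and later, not the jsr166y preview package used in the simulation in this answer. The names ForkSketch and SumTask and the cut-off of 1000 are arbitrary choices for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkSketch {
    static class SumTask extends RecursiveTask<Long> {
        private final int lo;
        private final int hi;

        SumTask(int lo, int hi) { this.lo = lo; this.hi = hi; }

        @Override
        protected Long compute() {
            if (hi - lo <= 1000) {               // small enough: sum directly
                long sum = 0;
                for (int i = lo; i < hi; i++) sum += i;
                return sum;
            }
            int mid = lo + (hi - lo) / 2;
            SumTask left = new SumTask(lo, mid);
            left.fork();                         // queued locally; an idle thread may steal it
            long right = new SumTask(mid, hi).compute();  // run the other half ourselves
            return right + left.join();          // wait for (or help finish) the forked half
        }
    }

    // Sums 0 + 1 + ... + (n - 1) using a fork/join pool.
    static long sum(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(0, n));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling ForkSketch.sum(100000) returns 4999950000, the same value a plain loop would give; the split into forked halves only changes who does the work.
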
For illustration purposes, here is a working simulation of this scenario in roughly 80 lines, using the aforementioned Java framework. It creates as many threads as there are available cores, creates some lists, and starts executing them. Note how simple the run() method is, while it still has the benefits of load balancing and of threads mostly executing tasks from their own list, unless they run out of work and start stealing to get some. Of course, if you are not in Java or C, you would have to find a similar framework, but the same set of core ideas would similarly simplify your code regardless of the language.

import java.util.*;
import java.util.concurrent.*;
import jsr166y.ForkJoinPool;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

public class FJTest {
    public static void main(String[] args) throws Exception {
        Iterable<List<TaskType>> lists = createLists(10);

        ForkJoinPool pool = new ForkJoinPool();

        for (final List<TaskType> list : lists) {
            pool.submit(new Runnable() {
                public void run() {
                    List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>();
                    List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>();
                    boolean signaled = false;
                    for (TaskType task : list) {
                        switch (task) {
                            case JOB:
                                ForkJoinTask job = new Job();
                                if (signaled == false)
                                    beforeSignal.add(job);
                                else
                                    afterSignal.add(job);
                                job.fork();
                                break;
                            case SIGNAL:
                                signaled = true;
                                break;
                            case WAIT:
                                signaled = false;
                                for (ForkJoinTask t : beforeSignal) {
                                    t.helpJoin();
                                }
                                beforeSignal = afterSignal;
                                afterSignal = new ArrayList<ForkJoinTask>();
                        }
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    private static Iterable<List<TaskType>> createLists(int size) {
        List<List<TaskType>> tasks = new ArrayList<List<TaskType>>();
        for (int i = 0; i < size; i++) {
            tasks.add(createSomeList());
        }
        return tasks;
    }

    private static List<TaskType> createSomeList() {
        return Arrays.asList(
                TaskType.JOB,
                TaskType.JOB,
                TaskType.SIGNAL,
                TaskType.JOB,
                TaskType.WAIT,
                TaskType.JOB);
    }

}

enum TaskType {
    JOB, SIGNAL, WAIT;
}
class Job extends RecursiveTask<Void> {
    @Override
    protected Void compute() {
        long x = 1;
        for (long i = 1; i < 200000001; i++) {
            x = i * x;
        }
        System.out.println(x); //just to use x
        return null;
    }
}
Dimitris Andreou