This is a relatively straightforward scheduling algorithm. A couple of issues seem tricky at first but really aren't (signal/wait and cache locality). I'll explain the techniques, then give some code I wrote to illustrate the concepts, then give some final notes on tuning.
Algorithms to use
Handling the signal/wait efficiently seems tricky at first but actually turns out to be extremely easy. Since signal/wait pairs can't nest or overlap, there can really only be two being satisfied and one being waited on at any given time. Simply keeping a "CurrentSignal" pointer to the most recent unsatisfied signal is all that's necessary to do the bookkeeping.
Making sure that cores don't jump around between lists too much and that a given list isn't shared between too many cores is also relatively easy: Each core keeps taking jobs from the same list until it blocks, then switches to another list. To keep all the cores from ganging up on a single list, a WorkerCount is kept for each list that tells how many cores are using it, and the lists are organized so cores select lists with fewer workers first.
Locking can be kept simple by locking only the scheduler or the list you are working on at any time, never both.
You expressed some concern about adding jobs to a list after the list has already started executing. It turns out that supporting this is almost trivial: all it needs is a call from the list to the scheduler whenever a job is added to a list that has already run out of work, so the scheduler can schedule the new job.
Data structures
Here are the basic data structures you'll need:
class Scheduler
{
LinkedList<JobList>[] Ready; // Indexed by number of cores working on list
LinkedList<JobList> Blocked;
int ReadyCount;
bool Exit;
public:
void AddList(JobList* joblist);
void DoWork();
internal:
void UpdateQueues(JobList* joblist);
void NotifyBlockedCores();
void WaitForNotifyBlockedCores();
}
class JobList
{
Scheduler Scheduler;
LinkedList<JobList> CurrentQueue;
LinkedList<Job> Jobs; // All jobs in the job list
LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list, plus a dummy
Job* NextJob; // The next job to schedule, if any
int NextJobIndex; // The index of NextJob
SignalPoint* CurrentSignal; // First signal not fully satisfied
int WorkerCount; // # of cores executing in this list
public:
void AddJob(Job* job);
void AddSignal();
void AddWait();
internal:
bool Ready { get; }
bool GetNextReadyJob(Job& job, int& jobIndex);
void MarkJobCompleted(Job job, int jobIndex);
}
class SignalPoint
{
int SignalJobIndex = int.MaxValue;  // Index of the first job after the signal (MaxValue until the signal is added)
int WaitJobIndex = int.MaxValue;    // Index of the first job after the wait (MaxValue until the wait is added)
int IncompleteCount = 0;            // # of not-yet-completed jobs before this signal point
}
Note that the signal points for a given joblist are most conveniently stored separately from the actual list of jobs.
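The structures above also use a Job type without defining it. Here is a minimal sketch of what it might look like; the Action payload and constructor are my own assumptions, and the Next link is just whatever chains jobs together in the list (GetNextReadyJob follows it later):
class Job
{
    public Job Next;     // Link to the next job in the list, followed by GetNextReadyJob
    Action work;         // The actual payload; assumed to be anything callable
    public Job(Action work) { this.work = work; }
    public void Execute() { work(); }
}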
Scheduler implementation
The scheduler keeps track of job lists, assigns them to cores, and executes jobs from the job lists.
AddList adds a job list to the scheduler. It must be placed on the Ready or Blocked queue depending on whether it has any work to do (i.e. whether any jobs have been added to it yet), so just call UpdateQueues.
void Scheduler.AddList(JobList* joblist)
{
joblist.Scheduler = this;
UpdateQueues(joblist);
}
UpdateQueues centralizes the queue update logic. Notice the algorithm for selecting a new queue, and also the notification to idle cores when work becomes available:
void Scheduler.UpdateQueues(JobList* joblist)
{
lock(this)
{
// Remove from prior queue, if any
if(joblist.CurrentQueue!=null)
{
if(joblist.CurrentQueue!=Blocked) ReadyCount--;
joblist.CurrentQueue.Remove(joblist);
}
// Select new queue
joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked;
// Add to new queue
joblist.CurrentQueue.Add(joblist);
if(joblist.CurrentQueue!=Blocked)
if(++ReadyCount==1) NotifyBlockedCores();
}
}
DoWork is a normal scheduler work loop, except that: (1) it selects the JobList with the fewest workers, (2) it keeps working jobs from that joblist until it can't any more, and (3) it stores the jobIndex as well as the job so the joblist can easily update its completion state (an implementation detail).
void Scheduler.DoWork()
{
while(!Exit)
{
// Get a job list to work on
JobList *list = null;
lock(this)
{
for(int i=0; i<Ready.Length; i++)
if(!Ready[i].Empty)
{
list = Ready[i].First;
break;
}
if(list==null) // No work to do
{
WaitForNotifyBlockedCores(); // Must release the scheduler lock while waiting (Monitor.Wait-style); see note below
continue;
}
list.WorkerCount++;
UpdateQueues(list);
}
// Execute jobs in the list as long as possible
while(true)
{
int jobIndex;
Job job;
if(!list.GetNextReadyJob(job, jobIndex)) break;
job.Execute();
list.MarkJobCompleted(job, jobIndex);
}
// Release the job list
lock(this)
{
list.WorkerCount--;
UpdateQueues(list);
}
}
}
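NotifyBlockedCores and WaitForNotifyBlockedCores are deliberately left abstract above. If you end up in C#, one minimal way to implement them is with Monitor on the same object the scheduler already locks; the important property is that Monitor.Wait releases the lock while blocked, which is why calling WaitForNotifyBlockedCores inside lock(this) in DoWork doesn't deadlock. (A condition variable or event would do the same job in C++; this is just a sketch.)
void Scheduler.NotifyBlockedCores()
{
    // Caller (UpdateQueues) already holds lock(this)
    Monitor.PulseAll(this);
}
void Scheduler.WaitForNotifyBlockedCores()
{
    // Caller (DoWork) already holds lock(this); Monitor.Wait releases it while
    // blocked and reacquires it before returning
    Monitor.Wait(this);
}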
JobList implementation
The JobList keeps track of how the signals and waits are interspersed with the jobs, and of which signal/wait pairs have had everything before their signal point completed.
The constructor creates a dummy signal point to add jobs to. This signal point becomes a real signal point (and a new dummy is added) whenever a new "signal" is added.
JobList.JobList()
{
// Always have a dummy signal point at the end
Signals.Add(CurrentSignal = new SignalPoint());
}
AddJob adds a job to the list. It is marked as incomplete in the SignalPoint. When the job is actually executed, the IncompleteCount of the same SignalPoint is decremented. It is also necessary to tell the scheduler that things might have changed, since the new job could be immediately executable. Note that the scheduler is called after the lock on "this" is released to avoid deadlock.
void JobList.AddJob(Job job)
{
lock(this)
{
Jobs.Add(job);
Signals.Last.IncompleteCount++;
if(NextJob == null)
NextJob = job;
}
if(Scheduler!=null)
Scheduler.UpdateQueues(this);
}
AddSignal and AddWait add signals and waits to the job list. Notice that AddSignal actually creates a new SignalPoint, and AddWait just fills in the wait point information in the previously created SignalPoint.
void JobList.AddSignal()
{
lock(this)
{
Signals.Last.SignalJobIndex = Jobs.Count; // Reify dummy signal point
Signals.Add(new SignalPoint()); // Create new dummy signal point
}
}
void JobList.AddWait()
{
lock(this)
{
Signals.Last.Previous.WaitJobIndex = Jobs.Count;
}
}
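To make the indices concrete, here is a small worked example of my own (not something from your description): jobs A and B, then a signal, then job C, then a wait, then job D.
list.AddJob(A);      // index 0, counted in signal point S0 (the initial dummy)
list.AddJob(B);      // index 1, counted in S0
list.AddSignal();    // S0.SignalJobIndex = 2; new dummy S1 appended
list.AddJob(C);      // index 2, counted in S1
list.AddWait();      // S0.WaitJobIndex = 3
list.AddJob(D);      // index 3, counted in S1
// Resulting state:
//   S0: SignalJobIndex=2, WaitJobIndex=3, IncompleteCount=2  (A, B)
//   S1: dummy (both indexes MaxValue),    IncompleteCount=2  (C, D)
// C may be scheduled at any time, but D (index 3) is held back until A and B
// complete and CurrentSignal advances past S0.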
The Ready property determines whether additional cores can be assigned to the list. There may be two or three cores already working on the list without the list being "ready", if the next job is waiting for a signal before it can start.
bool JobList.Ready
{
get
{
lock(this)
{
return NextJob!=null &&
(CurrentSignal==Signals.Last ||
NextJobIndex < CurrentSignal.WaitJobIndex);
}
}
}
GetNextReadyJob is very simple: If we are ready, just return the next job in the list.
bool JobList.GetNextReadyJob(Job& job, int& jobIndex)
{
lock(this)
{
if(!Ready) return false;
jobIndex = NextJobIndex++;
job = NextJob; NextJob = job.Next;
return true;
}
}
MarkJobCompleted is probably the most interesting of all. Because of the structure of the signals and waits, the current job is either before CurrentSignal or is between CurrentSignal and CurrentSignal.Next (if it is after the last actual signal, it will be counted as being between CurrentSignal and the dummy SignalPoint at the end). We need to reduce the count of incomplete jobs. We may also need to go on to the next signal if this count goes to zero. Of course we never pass the dummy SignalPoint at the end.
Note that this code doesn't have a call to Scheduler.UpdateQueues because we know the scheduler will be calling GetNextReadyJob in just a second, and if it returns false the scheduler will call UpdateQueues anyway.
void JobList.MarkJobCompleted(Job job, int jobIndex)
{
lock(this)
{
if(jobIndex >= CurrentSignal.SignalJobIndex)
CurrentSignal.Next.IncompleteCount--;
else
{
CurrentSignal.IncompleteCount--;
if(CurrentSignal.IncompleteCount==0)
if(CurrentSignal.WaitJobIndex < int.MaxValue)
CurrentSignal = CurrentSignal.Next;
}
}
}
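Putting it all together, a typical hookup might look like the sketch below. The thread spin-up and the lambda-based jobs are my own assumptions (they lean on the hypothetical Job constructor sketched earlier); the point is only to show the call pattern.
Scheduler scheduler = new Scheduler();
// One worker per core; each worker loops inside DoWork until Exit is set
for(int i = 0; i < Environment.ProcessorCount; i++)
    new Thread(scheduler.DoWork).Start();
// A and B run first, C may run at any time, and D must wait for A and B
JobList list = new JobList();
list.AddJob(new Job(() => Console.WriteLine("A")));
list.AddJob(new Job(() => Console.WriteLine("B")));
list.AddSignal();
list.AddJob(new Job(() => Console.WriteLine("C")));
list.AddWait();
list.AddJob(new Job(() => Console.WriteLine("D")));
// Hand the list to the scheduler; more jobs can still be added afterwards
scheduler.AddList(list);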
Tuning based on list length, job length estimates, etc
The code above doesn't pay any attention to how long the job lists are, so if there are a hundred tiny job lists and one huge one, it is possible for each core to take a separate tiny job list and then all congregate on the huge one, leading to inefficiency. This can be solved by making Ready[] an array of priority queues prioritized on (joblist.Jobs.Count - joblist.NextJobIndex), but with the priority only actually updated in normal UpdateQueues situations, for efficiency.
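As a concrete illustration of that priority (my own sketch, not something the algorithm above requires), a comparer could order job lists by remaining job count, longest first; the priority queue container itself is left out:
class RemainingWorkComparer : IComparer<JobList>
{
    // Longer remaining lists sort first, so cores drain big lists before piling
    // onto small ones; assumes Jobs.Count and NextJobIndex are accessible here
    public int Compare(JobList x, JobList y)
    {
        int rx = x.Jobs.Count - x.NextJobIndex;
        int ry = y.Jobs.Count - y.NextJobIndex;
        return ry.CompareTo(rx);
    }
}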
This could get even more sophisticated by creating a heuristic that takes into account the number and spacing of signal/wait combinations to determine the priority. This heuristic would be best tuned by using a distribution of job durations and resource usage.
If individual job durations are known, or if good estimates are available for them, then the heuristic could use the estimated remaining duration instead of just the list length.
Final notes
This is a rather standard solution to the problem you present. You can use the algorithms I gave and they will work, including the locking, but you won't be able to compile the code I wrote above for several reasons:
It is a crazy mix of C++ and C# syntax. I originally started writing in C# then changed a bunch of the syntax to C++ style since I thought that was more likely what you would be using for such a project. But I left in quite a few C#-isms. Fortunately no LINQ ;-).
The LinkedList details have some hand-waving. I assume the list can do First, Last, Add and Remove and that items in the list can do Previous and Next. But I didn't use the actual API for any real linked list class I know of.
I didn't compile or test it. I guarantee there is a bug or two in there somewhere.
Bottom line: You should treat the code above as pseudocode even though it looks like the real McCoy.
Enjoy!