views:

57

answers:

3

I'm working on a system which has to handle a number of race-conditions when serving jobs to a number of worker-machines.

The clients would query the system for jobs with status='0' (ToDo), then, in an atomic way, update the 'oldest' row with status='1' (Locked) and retrieve the id for that row (for updating the job with worker information like which machine is working on it etc.).

The main issue here is that there might be any number of clients updating at the same time. A solution would be to lock around 20 of the rows with status='0', update the oldest one and release all the locks again afterwards. I've been looking into the TransactionMiddleware but I don't see how this would prevent the case of the oldest one being updated from under me after I query it.

I've looked into the QuerySet.update() thing, and it looks promising, but in the case of two clients getting a hold of the same record, the status would simply updated, and we would have two workers working on the same job.. I'm really at a loss here.

I also found ticket #2705 which seems to handle the case nicely, but I have no idea how to get the code from there because of my limited SVN experience (the last updates are simply diffs, but I don't know how to merge that with the trunk of the code).

Code: Result = Job

class Result(models.Model):
"""
Result: completed- and pending runs

'ToDo': job hasn't been acquired by a client
'Locked': job has been acquired
'Paused'
"""
# relations
run = models.ForeignKey(Run)
input = models.ForeignKey(Input)

PROOF_CHOICES = (
    (1, 'Maybe'),
    (2, 'No'),
    (3, 'Yes'),
    (4, 'Killed'),
    (5, 'Error'),
    (6, 'NA'),
)
proof_status = models.IntegerField(
    choices=PROOF_CHOICES,
    default=6,
    editable=False)

STATUS_CHOICES = (
    (0, 'ToDo'),
    (1, 'Locked'),
    (2, 'Done'),
)
result_status = models.IntegerField(choices=STATUS_CHOICES, editable=False, default=0)

# != 'None' => status = 'Done'
proof_data = models.FileField(upload_to='results/',
    null=True, blank=True)
# part of the proof_data
stderr = models.TextField(editable=False,
    null=True, blank=True)

realtime = models.TimeField(editable=False,
    null=True, blank=True)
usertime = models.TimeField(editable=False,
    null=True, blank=True)
systemtime = models.TimeField(editable=False,
    null=True, blank=True)

# updated when client sets status to locked
start_time = models.DateTimeField(editable=False)

worker = models.ForeignKey('Worker', related_name='solved',
    null=True, blank=True)
A: 

You have two choices off the top of my head. One is to lock rows immediately upon retrieval and only release the lock once the appropriate one has been marked as in use. The problem here is that no other client process can even look at the jobs which don't get selected. If you're always just automatically selecting the last one then it may be a brief enough of a window to be o.k. for you.

The other option would be to bring back the rows that are open at the time of the query, but to then check again whenever the client tries to grab a job to work with. When a client attempts to update a job to work on it a check would first be done to see if it's still available. If someone else has already grabbed it then a notification would be sent back to the client. This allows all of the clients to see all of the jobs as snapshots, but if they are constantly grabbing the latest one then you might have the clients constantly receiving notifications that a job is already in use. Maybe this is the race condition to which you're referring?

One way to get around that would be to return the jobs in specific groups to the clients so that they are not always getting the same lists. For example, break them down by geographic area or even just randomly. For example, each client could have an ID of 0 to 9. Take the mod of an ID on the jobs and send back those jobs with the same ending digit to the client. Don't limit it to just those jobs though, as you don't want there to be jobs that you can't reach. So for example if you had clients of 1, 2, and 3 and a job of 104 then no one would be able to get to it. So, once there aren't enough jobs with the correct ending digit jobs would start coming back with other digits just to fill the list. You might need to play around with the exact algorithm here, but hopefully this gives you an idea.

How you lock the rows in your database in order to update them and/or send back the notifications will largely depend on your RDBMS. In MS SQL Server you could wrap all of that work nicely in a stored procedure as long as user intervention isn't needed in the middle of it.

I hope this helps.

Tom H.
The way we did it in the old system, was the locking of approx. 20 rows matching the status='0', selecting the oldest and so on, but this did introduce some race-conditions which were sort of odd, and I've been tasked to eliminate them. Plus, I would really like to do this in a Django-ish manner. We are using PostgreSQL as the DB backend, and if we have to go to custom SQL so be it, but is there a way to do the stuff I want to do while still guaranteeing atomicity?We have the works in categories, meaning only a subset of the workers are accessing any 'set' of 'ToDo' results at any one time.
Paddie
A: 

To merge #2705 into your django, you need to download it first:

cd <django-dir>
wget http://code.djangoproject.com/attachment/ticket/2705/for_update_11366_cdestigter.diff?format=raw

then rewind svn to the necessary django version:

svn update -r11366

then apply it:

patch -p1 for_update_11366_cdestigter.diff

It will inform you which files were patched successfully and which were not. In the unlikely case of conflicts you can fix them manually looking at http://code.djangoproject.com/attachment/ticket/2705/for_update_11366_cdestigter.diff

To unapply the patch, just write

svn revert --recursive . 
Antony Hatchkins
Thank you Anthony, this is a great help. I'll try this right away.
Paddie
A: 

If your django is running on one machine, there is a much simpler way to do it... Excuse the pseudo-code as the details of your implementation aren't clear.

from threading import Lock

workers_lock = Lock()

def get_work(request):
    workers_lock.acquire()
    try:
        # Imagine this method exists for brevity
        work_item = WorkItem.get_oldest()
        work_item.result_status = 1
        work_item.save()
    finally:
        workers_lock.release()

    return work_item
kibitzer
Ha, that is thinking outside the box! Love it! :)
Paddie