I've got a Python script that uses Django for database and memcache access, but it's notably running as a standalone daemon (i.e. it is not responding to webserver requests). The daemon checks the Django model Requisition for objects with status=STATUS_NEW, then marks them STATUS_WORKING and puts them into a queue.
A number of processes (created using the multiprocessing package) pull items out of the Queue and do work on the Requisition with the pr.id that was passed. I believe the memory leak is probably in the following code (though it could be in the 'Worker' code on the other side of the Queue; that seems unlikely, because the memory size is growing even when no Requisitions are coming up, i.e. when the workers are all blocking on Queue.get()).
from django.conf import settings
from requisitions.models import Requisition  # our Django model
from multiprocessing import Queue
import time

queue = Queue()

while True:
    # Wait for "N"ew requisitions, then pop their ids into the queue.
    for pr in Requisition.objects.filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)
    time.sleep(settings.DAEMON_POLL_WAIT)
where settings.DAEMON_POLL_WAIT = 0.01.
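For context, the worker side looks roughly like this (a sketch: the worker function, do_work, and NUM_WORKERS stand in for our actual code):

from multiprocessing import Process

def worker(queue):
    while True:
        pr_id = queue.get()  # blocks until the daemon enqueues an id
        pr = Requisition.objects.get(id=pr_id)
        do_work(pr)  # hypothetical stand-in for the real work

for _ in range(NUM_WORKERS):  # hypothetical worker count
    Process(target=worker, args=(queue,)).start()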
It seems that if I leave this running for a period of time (e.g. a couple of days), the Python process's memory grows without bound and eventually the system runs out of memory.
What's going on here (or how can I find out), and, more importantly, how can you run a daemon that does this without leaking?
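One thing I could check, though I'm not sure it's the culprit: if the daemon runs with settings.DEBUG = True, Django appends every executed query to django.db.connection.queries, and that list is never pruned in a long-running process. A sketch of how I might verify that:

from django.db import connection, reset_queries

# Inside the polling loop: connection.queries is only populated when
# settings.DEBUG is True, and it grows with every query the loop issues.
print(len(connection.queries))

# If the length climbs steadily, clearing the log each iteration (or
# running with DEBUG = False) should stop that particular growth.
reset_queries()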
My first thought is to change the dynamics of the function, notably by gating the check for new Requisition objects on a django.core.cache flag, i.e.:
from django.core.cache import cache

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
        # Possible race condition
        cache.clear()
        process_new_requisitions(queue)
def process_new_requisitions(queue):
    for pr in Requisition.objects.filter(status=Requisition.STATUS_NEW):
        pr.set_status(pr.STATUS_WORKING)
        pr.save()
        queue.put(pr.id)
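To narrow the race window noted in the loop above, I could delete just the flag key before processing rather than wiping the whole cache (cache.clear() drops every cached entry, not only the flag); a sketch:

from django.core.cache import cache

while True:
    time.sleep(settings.DAEMON_POLL_WAIT)
    if cache.get('new_requisitions'):
        # Delete only the flag key; a flag set after this point simply
        # triggers another pass on the next iteration instead of being lost.
        cache.delete('new_requisitions')
        process_new_requisitions(queue)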
The process that creates Requisitions with status=STATUS_NEW can then do a cache.set('new_requisitions', 1) (or, alternatively, we could catch a signal or Requisition.save() event wherever a new Requisition is created and set the flag in the cache from there).
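The signal variant might look roughly like this (flag_new_requisition is just an illustrative name):

from django.core.cache import cache
from django.db.models.signals import post_save

from requisitions.models import Requisition

def flag_new_requisition(sender, instance, created, **kwargs):
    # Flag the daemon whenever a Requisition is created with STATUS_NEW.
    if created and instance.status == Requisition.STATUS_NEW:
        cache.set('new_requisitions', 1)

post_save.connect(flag_new_requisition, sender=Requisition)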
However, I'm not sure that the solution I've proposed here addresses the memory issue (which is probably related to garbage collection, so the scoping introduced by process_new_requisitions may solve the problem).
I'm grateful for any thoughts and feedback.