I have a list/queue of 200 commands that I need to run in a shell on a Linux server.

I only want to have a maximum of 10 processes running (from the queue) at once. Some processes will take a few seconds to complete, other processes will take much longer.

When a process finishes I want the next command to be "popped" from the queue and executed.

Does anyone have code to solve this problem?

Further elaboration:

There are 200 pieces of work that need to be done, in a queue of some sort. I want at most 10 pieces of work in progress at once. When a thread finishes a piece of work, it should ask the queue for the next piece. If there's no more work in the queue, the thread should die. When all the threads have died, it means all the work has been done.

The actual problem I'm trying to solve is using imapsync to synchronize 200 mailboxes from an old mail server to a new mail server. Some users have large mailboxes that take a long time to sync, while others have very small mailboxes that sync quickly.

+1  A: 

If you are going to use Python, I recommend using Twisted for this: http://twistedmatrix.com/trac/

Specifically Twisted Runner: http://twistedmatrix.com/trac/wiki/TwistedRunner

apphacker
A: 

Can you elaborate on what you mean by "in parallel"? It sounds like you need to implement some sort of locking on the queue so that entries are not selected twice and each command runs only once.

Most queue systems cheat -- they just write a giant to-do list, then select e.g. ten items, work them, and select the next ten items. There's no parallelization.

If you provide some more details, I'm sure we can help you out.

Till
+4  A: 

GNU make (and perhaps other implementations as well) has the -j argument, which governs how many jobs it will run at once. When a job completes, make will start another one.
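For example, to run the targets in the current makefile with at most 10 jobs in flight:

make -j 10

make keeps 10 jobs running and starts the next one as soon as any job exits.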

Ignacio Vazquez-Abrams
+2  A: 

Well, if they are largely independent of each other, I'd think in terms of:

Initialize an array of jobs pending (queue, ...) - 200 entries
Initialize an array of jobs running - empty

while (jobs still pending and queue of jobs running still has space)
    take a job off the pending queue
    launch it in background
    if (queue of jobs running is full)
        wait for a job to finish
        remove from jobs running queue
while (queue of jobs running is not empty)
    wait for job to finish
    remove from jobs running queue

Note that the tail test in the main loop ensures that the 'jobs running queue' has space whenever the while loop iterates, preventing premature termination of the loop. I think the logic is sound.

I can see how to do that in C fairly easily - it wouldn't be all that hard in Perl, either (and therefore not too hard in the other scripting languages - Python, Ruby, Tcl, etc). I'm not at all sure I'd want to do it in shell - the wait command in shell waits for all children to terminate, rather than for some child to terminate.
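Newer bash (4.3 and later) does have wait -n, which waits for any single child to exit; with that, the logic above becomes workable in shell. A minimal sketch, assuming the 200 commands sit one per line in a file commands.txt:

#!/bin/bash
# Sketch of the pseudocode above; assumes bash 4.3+ for 'wait -n'
# and a file commands.txt with one shell command per line.
max_jobs=10
running=0
while read -r cmd; do
    if (( running >= max_jobs )); then
        wait -n              # block until any one background job exits
        (( running-- ))
    fi
    bash -c "$cmd" &         # pop the next command and run it in background
    (( running++ ))
done < commands.txt
wait                         # drain the remaining jobs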

Jonathan Leffler
+13  A: 

On the shell, xargs can be used to queue parallel command processing. For example, to always keep 3 sleeps running in parallel, each sleeping for 1 second, and to execute 10 sleeps in total, do

echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _

And it would sleep for 4 seconds in total. If you have a list of names, and want to pass those names to the commands being executed, again running 3 commands in parallel, do

cat names | xargs -n1 -P3 process_name

This would execute the commands process_name alice, process_name bob, and so on.
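Applied to the question, a sketch assuming a file userlist with one mailbox name per line, and that imapsync can be invoked with just a username (as in the make answer below):

cat userlist | xargs -n1 -P10 imapsync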

Johannes Schaub - litb
+11  A: 

I would imagine you could do this using make and the make -j xx command.

Perhaps a makefile like this

all : usera userb userc....

usera:
       imapsync usera
userb:
       imapsync userb
....

make -j 10 -f makefile
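One caveat: if files named usera, userb, and so on happen to exist in the working directory, make will consider those targets up to date and skip them; a .PHONY line listing the targets avoids that. A sketch of generating such a makefile in shell, assuming a file userlist with one name per line:

{
  printf '.PHONY: all '
  tr '\n' ' ' < userlist
  echo
  printf 'all: '
  tr '\n' ' ' < userlist
  echo
  while read -r user; do
    printf '%s:\n\timapsync %s\n' "$user" "$user"
  done < userlist
} > makefile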

ErgoSum
This worked exactly as I hoped it would. I wrote some code to generate the Makefile. It ended up being over 1000 lines. Thanks!
mlambie
+2  A: 

In python, you could try:

import Queue, os, threading

# synchronised queue
queue = Queue.Queue(0)    # 0 means no maximum size

# do stuff to initialise queue with strings
# representing os commands
queue.put('sleep 10')
queue.put('echo Sleeping..')
# etc
# or use python to generate commands, e.g.
# for username in ['joe', 'bob', 'fred']:
#    queue.put('imapsync %s' % username)

def go():
  while True:
    try:
      # False here means no blocking: raise exception if queue empty
      command = queue.get(False)
      # Run command.  python also has subprocess module which is more
      # featureful but I am not very familiar with it.
      # os.system is easy :-)
      os.system(command)
    except Queue.Empty:
      return

for i in range(10):   # change this to run more/fewer threads
  threading.Thread(target=go).start()

Untested...

(of course, CPython's GIL means only one thread executes Python code at a time. You should still get the benefit of multiple threads here, though, since they spend most of their time waiting on IO.)

John Fouhy
1) os.system could be replaced with the new, improved subprocess module. 2) It doesn't matter that CPython has a GIL, because you're running external commands, not Python code.
Cristian Ciupitu
If you replace `threading.Thread` by `multiprocessing.Process` and `Queue` by `multiprocessing.Queue` then the code will run using multiple processes.
J.F. Sebastian
A: 

Python's multiprocessing module would seem to fit your issue nicely. It's a high-level package that provides thread-like concurrency using separate processes.

Jeff Bauer
+6  A: 

PPSS, the Parallel Processing Shell Script, was written for exactly this kind of job. Google for the name and you will find it; I won't linkspam.

Greets from the author.

haha, great. greets back from litb :P
Johannes Schaub - litb
This looks ideal. I'll check it out next time I'm faced with a similar problem. Thanks for updating this thread with your recommendation.
mlambie
awesome! Can we set it up with dynamic threading, e.g. using as many workers as 80% of CPU/RAM allows?
Devrim
A: 

Here's something that works in Ruby (I know some Ruby things may not be thread-safe, but it all seems to work here):

Create an array whose entries each contain a shell command:

@assemblyCommands = []
@assemblyCommands.push("cd #{assembly1Dir}; cap3 *.fasta #{assembly1params} > cap3.stdout 2> cap3.stderr")
...

Then run the commands in, e.g., 4 threads (each thread executes a command, then checks whether more commands remain to be run):

threads = []
['proc1', 'proc2', 'proc3', 'proc4'].each do |processor| threads << Thread.new(processor) do |threadid| %x[#{@assemblyCommands.pop}] until @assemblyCommands.empty? end end threads.each { |aThread| aThread.join }

Yannick Wurm
+1  A: 

Parallel (https://savannah.nongnu.org/projects/parallel/) is made exactly for this purpose.

cat userlist | parallel imapsync

One of the beauties of Parallel compared to other solutions is that it makes sure output is not mixed. Running traceroute in parallel, for example, works fine:

(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute
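Parallel also takes a -j flag to cap the number of simultaneous jobs, which matches the 10-at-a-time requirement here:

cat userlist | parallel -j10 imapsync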

Ole Tange