views:

22

answers:

2

I want to quickly bzip2 compress several hundred gigabytes of data using my 8 core , 16 GB ram workstation. Currently I am using a simple python script to compress a whole directory tree using bzip2 and an os.system call coupled to an os.walk call.

I see that the bzip2 only uses a single cpu while the other cpus remain relatively idle.

I am a newbie in queue and threaded processes . But I am wondering how I can implement this such that I can have four bzip2 running threads (actually I guess os.system threads ), each using probably their own cpu , that deplete files from a queue as they bzip them.

My single thread script is pasted here .

import os
import sys

for roots, dirlist , filelist in os.walk(os.curdir):
    for file in [os.path.join(roots,filegot) for filegot in filelist]:
        if "bz2" not in file:
            print "Compressing %s" % (file)
            os.system("bzip2 %s" % file)
            print ":DONE" 
+1  A: 

Use the subprocess module to spawn several processes at once. If N of them are running (N should a bit bigger than the number of CPUs you have, say 3 for 2 cores, 10 for 8), wait for one to terminate and then start another one.

Note that this might not help much since there will be a lot of disk activity which you can't parallelize. A lot of free RAM for caches helps.

Aaron Digulla
A: 

This code comes from MRAB on comp.lang.python

Try this: 
import os 
import sys 
from threading import Thread, Lock 
from Queue import Queue 
def report(message): 
     mutex.acquire() 
     print message 
     sys.stdout.flush() 
     mutex.release() 
class Compressor(Thread): 
     def __init__(self, in_queue, out_queue): 
         Thread.__init__(self) 
         self.in_queue = in_queue 
         self.out_queue = out_queue 
     def run(self): 
         while True: 
             path = self.in_queue.get() 
             sys.stdout.flush() 
             if path is None: 
                 break 
             report("Compressing %s" % path) 
             os.system("bzip2 %s" % path) 
             report("Done %s" %  path) 
             self.out_queue.put(path) 
in_queue = Queue() 
out_queue = Queue() 
mutex = Lock() 
THREAD_COUNT = 4 
worker_list = [] 
for i in range(THREAD_COUNT): 
     worker = Compressor(in_queue, out_queue) 
     worker.start() 
     worker_list.append(worker) 
for roots, dirlist, filelist in os.walk(os.curdir): 
     for file in [os.path.join(roots, filegot) for filegot in filelist]: 
         if "bz2" not in file: 
             in_queue.put(file) 
for i in range(THREAD_COUNT): 
     in_queue.put(None) 
for worker in worker_list: 
     worker.join() 
harijay