I have a script (worker.py) that prints unbuffered output in the form...

1
2
3
.
.
.
n

where n is some constant number of iterations a loop in this script will make. In another script (service_controller.py) I start a number of threads, each of which starts a subprocess using subprocess.Popen(stdout=subprocess.PIPE, ...). Now, in my main thread (service_controller.py), I want to read the output of each thread's worker.py subprocess and use it to estimate the time remaining until completion.

I have all of the logic working that reads the stdout from worker.py and determines the last printed number. The problem is that I cannot figure out how to do this in a non-blocking way. If I read a constant bufsize, then each read ends up waiting for that much data from each of the workers. I have tried numerous approaches, including fcntl and select + os.read. What is my best option here? I can post my source if needed, but I figured the explanation describes the problem well enough.

Thanks for any help here.

EDIT
Adding sample code

I have a worker that starts a subprocess.

class WorkerThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.completed = 0
        self.data = ""  # accumulated stdout; get_completed() appends to this
        self.process = None
        self.lock = threading.RLock()

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        with self.lock:
            # select() returns a list of ready descriptors, not a single fd
            ready = select.select([self.process.stdout.fileno()], [], [], 5)[0]
            if ready:
                self.data += os.read(ready[0], 1)
                try:
                    # the last complete line is the second-to-last split element
                    self.completed = int(self.data.split("\n")[-2])
                except (IndexError, ValueError):
                    pass
        return self.completed

I then have a ThreadManager.

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        for worker in [x for x in pool if not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool

    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
        for worker in self.running + self.pool:
            worker.join()

and some code to run it.

threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()

I have stripped out a lot of the other code in the hope of pinpointing the issue.

+2  A: 

Instead of letting service_controller block on I/O, have each worker thread's run loop be the only thing that reads the output of the process it controls.

Then you can add a method to the thread object controlling the process that returns the last polled output.

Of course, in that case don't forget to use some locking mechanism to protect the buffer, since it is filled by the thread and read by the method the controller calls.
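
As a rough sketch of that idea (not the asker's actual code): the blocking reads live in run(), and get_completed() only returns the last value stored under the lock. It is written for Python 3; the cmd parameter, DEMO_CMD and run_demo are made-up names for illustration, and DEMO_CMD is just a stand-in for worker.py that prints 1 to 5.

```python
import subprocess
import sys
import threading


class WorkerThread(threading.Thread):
    """Runs one subprocess and remembers the last number it printed."""

    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd
        self._completed = 0
        self._lock = threading.Lock()

    def run(self):
        # The blocking reads happen here, in the worker's own thread,
        # so they never stall the controller.
        process = subprocess.Popen(
            self.cmd, stdout=subprocess.PIPE, universal_newlines=True
        )
        for line in process.stdout:
            line = line.strip()
            if line.isdigit():
                with self._lock:
                    self._completed = int(line)
        process.stdout.close()
        process.wait()

    def get_completed(self):
        # Called from the controller; returns immediately with the
        # most recent value seen by run().
        with self._lock:
            return self._completed


# Hypothetical stand-in for worker.py: prints 1..5 with unbuffered output.
DEMO_CMD = [sys.executable, "-u", "-c", "for i in range(1, 6): print(i)"]


def run_demo(count=3):
    """Start `count` workers, wait for them, and return their final counts."""
    workers = [WorkerThread(DEMO_CMD) for _ in range(count)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return [worker.get_completed() for worker in workers]
```

While the workers are still running, the controller can call get_completed() on each one as often as it likes (for example in the ThreadManager loop) without ever touching the pipes itself.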

dweeves
Am I far off from what you are suggesting? I have the threaded object controlling the process to get the last polled output...
sberry2A
Your get_completed method only fills self.completed, so I would suggest renaming it update_completed and then adding a get_completed method that returns self.completed (with a threading.RLock to protect access to it). Then, in your thread manager, you can periodically call get_completed on your workers.
dweeves
The get_completed method was actually supposed to return self.completed (I omitted it by mistake when retyping). I have added the RLock around the reading, but I still have the same issue.
sberry2A
What about running the ThreadManager in its own thread?
dweeves
I just can't get my head wrapped around this. Perhaps some sleep tonight will make me have the breakthrough I am looking for.
sberry2A