Hi All,

I would like to use threads to process a streaming input.

How can I make the code below work for an infinite input, generated for example with itertools.count?

The code below works if 'for i in itertools.count():' is replaced by 'for i in xrange(5):'.

from threading import Thread
from Queue import Queue, Empty
import itertools

def do_work(q):
  while True:
    try:
        x = q.get(block=False)
        print (x)
    except Empty:
        break

if __name__ == "__main__":
  work_queue = Queue()
  for i in itertools.count():
    work_queue.put(i)

  threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)]

  for t in threads: t.start()
  for t in threads: t.join()
A: 

Maybe I'm missing something, but isn't it as simple as creating and starting the threads before the for loop?

Also, having your threads terminate when there's no work seems like a bad idea, as there may be more work showing up in future. Surely you want them to block until some work is available?
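
A minimal sketch of that suggestion, not the answerer's own code: the workers are started before anything is queued, and `q.get()` blocks until work arrives rather than exiting on an empty queue. It is written for Python 3, where the module is `queue` (the question's Python 2 code imports `Queue`). The `STOP` sentinel, the `results` list and the `run` helper are hypothetical names added for illustration, and `itertools.islice` bounds the otherwise infinite counter only so that the sketch terminates.

```python
import itertools
import queue      # this module is named Queue in Python 2
import threading

STOP = object()   # hypothetical sentinel that tells a worker to exit


def do_work(q, results):
    while True:
        x = q.get()          # blocks until an item is available
        if x is STOP:
            break
        results.append(x)    # stand-in for real processing


def run(n_items=20, n_workers=4):
    work_queue = queue.Queue()
    results = []
    threads = [threading.Thread(target=do_work, args=(work_queue, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()            # workers are running before any input arrives

    # itertools.count() never ends; islice bounds it for this sketch only.
    for i in itertools.islice(itertools.count(), n_items):
        work_queue.put(i)

    for _ in threads:
        work_queue.put(STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

With a truly infinite input, the loop that calls `put` would simply never reach the sentinel step, and the workers would keep consuming for as long as items arrive.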

jchl
You make a good point about the `break` in `do_work`. Unless the queue has buffered a sufficient amount of data, it is possible that worker threads will all terminate before more information can be placed into the queue.
unholysampler
That's not really working for me. Could you please post some code?
Joey
@Joey This sounds like a homework question. If so, please tag it as such.
jchl
+1  A: 

The problem is that itertools.count generates an infinite sequence, so the for loop never ends and the code after it never runs. You should put that loop in its own function and run it in a separate thread. That way the queue is being filled while the worker threads take data off it.

unholysampler
You are right about the infinite loop; I thought so too. But it gives me a runtime error when I put the itertools loop in its own thread. Please post some code.
Joey
@Joey: What error?
MattH
Runtime Error (Visual C++): "This application has requested runtime to terminate it in an unusual way."
Joey
A: 

You need to fill the queue from a thread. You need to manage the queue size, especially if the workers take time to process items. You need to mark queue items done. If this is related to your other question about twitter and "extremely fast" input, then you have an awful lot more to do with regard to database inserts.

Your questions have been too vague on pretty complicated topics. You don't seem to understand enough about what you're trying to achieve to know that it's not easy. I recommend that you be a little more specific about what you are trying to do.

Here's an example of filling and consuming a queue with threads. The queue size isn't being managed.

from threading import Thread
from Queue import Queue, Empty, Full
import itertools
from time import sleep


def do_work(q,wkr):
  while True:
    try:
      x = q.get(block=True,timeout=10)
      print "Wkr %s: Consuming %s" % (wkr,x)
      sleep(0.01)
      # Mark the item done only after it has been processed
      q.task_done()
    except Empty:
      print "Wkr %s exiting, timeout/empty" % (wkr)
      break
    sleep(0.01)

def fill_queue(q,limit=1000):
  count = itertools.count()
  while True:
    n = count.next()
    try:
      q.put(n,block=True,timeout=10)
    except Full:
      print "Filler exiting, timeout/full"
      break
    if n >= limit:
      print "Filler exiting, reached limit - %s" % limit
      break
    sleep(0.01)

work_queue = Queue()

threads = [Thread(target=do_work, args=(work_queue,i)) for i in range(2)]
threads.insert(0,Thread(target=fill_queue,args=(work_queue,100)))

for t in threads:
  t.start()

for t in threads:
  t.join()

 Wkr 0: Consuming 0
 Wkr 1: Consuming 1
 Wkr 0: Consuming 2
 Wkr 1: Consuming 3
 ....
 Wkr 1: Consuming 99
 Filler exiting, reached limit - 100
 Wkr 0: Consuming 100
 Wkr 1 exiting, timeout/empty
 Wkr 0 exiting, timeout/empty
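
The example above leaves the queue unbounded. As a hedged sketch of what managing the queue size could look like, the variant below gives the queue a `maxsize`, so `put` blocks whenever the workers fall behind, and uses `task_done()` together with `Queue.join()` to wait until every item has been processed. It is written for Python 3 (`queue` rather than `Queue`), and `run_bounded`, `seen` and the default sizes are illustrative assumptions, not part of the original answer.

```python
import itertools
import queue      # this module is named Queue in Python 2
import threading


def fill_queue(q, limit):
    for n in itertools.count():
        if n >= limit:
            break
        q.put(n)              # blocks while the queue already holds maxsize items


def do_work(q, seen, lock):
    while True:
        x = q.get()           # blocks until an item is available
        try:
            with lock:
                seen.append(x)    # stand-in for real processing
        finally:
            q.task_done()     # mark the item done once processing has finished


def run_bounded(limit=50, n_workers=2, maxsize=5):
    q = queue.Queue(maxsize=maxsize)
    seen = []
    lock = threading.Lock()
    for _ in range(n_workers):
        # Daemon workers do not keep the process alive once the main thread ends.
        threading.Thread(target=do_work, args=(q, seen, lock), daemon=True).start()

    filler = threading.Thread(target=fill_queue, args=(q, limit))
    filler.start()
    filler.join()             # every item has been put on the queue
    q.join()                  # every item has been marked done
    return seen
```

Because `put` blocks on a full queue, a fast producer is throttled to the speed of the consumers, and memory use stays bounded by `maxsize`.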
MattH