tags:

views:

60

answers:

1

Hey everyone, I am having a little trouble debugging my code. Please look below:

import globalFunc
from globalFunc import systemPrint
from globalFunc import out
from globalFunc import debug
import math
import time
import multiprocessing

"""
Somehow this is not working well
"""
class urlServerM( multiprocessing.Process):

    """
    This calculates how many links get put into the priority queue.
    To reach the level that we intend, for every query result set
    we put a certain number of links into visitNext first, so that
    even if every result set is full we still reach the link level
    that we intended. The rest are pushed into another list; if the
    first set of lists is not always full, the remaining capacity is
    spent on these links.
    """
    def getPriorityCounter(self, level, constraint):
        return int( math.exp( ( math.log(constraint) / (level - 1) ) ) )


    def __init__( self, level, constraint, urlQ):
        """limit is obtained via ngCrawler.getPriorityNum"""
        multiprocessing.Process.__init__(self)
        self.constraint = int( constraint)
        self.limit = self.getPriorityCounter( level, self.constraint)
        self.visitNext = []
        self.visitLater = []
        self._count = 0
        self.urlQ = urlQ


    """
    puts the next into the Queue
    """
    def putNextIntoQ(self):
        debug('putNextIntoQ', str(self.visitNext) + str(self.visitLater) )
        if self.visitNext != []:
            _tmp = self.visitNext[0]
            self.visitNext.remove(_tmp)
            self.urlQ.put(_tmp)

        elif self.visitLater != []:
            _tmp = self.visitLater[0]
            self.visitLater.remove(_tmp)
            self.urlQ.put(_tmp)


    def run(self):
        while True:
            if self.hasNext():
                time.sleep(0.5)
                self.putNextIntoQ()
                debug('process', 'put something in Q already')
            else:
                out('process', 'Nothing in visitNext or visitLater, sleeping')
                time.sleep(2)
        return


    def hasNext(self):
        debug( 'hasnext', str(self.visitNext) + str(self.visitLater) )
        if self.visitNext != []:
            return True
        elif self.visitLater != []:
            return True
        return False


    """
    This function resets the counter 
    which is used to keep track of how much is already inside the 
    visitNext vs visitLater
    """
    def reset(self): 
        self._count = 0


    def store(self, linkS):
        """Stores a link into one of these lists"""
        if self._count < self.limit:
            self.visitNext.append( linkS)
            debug('put', 'something is put inside visitNext')
        else: 
            self.visitLater.append( linkS)
            debug('put', 'something is put inside visitLater')
        self._count += 1



if __name__ == "__main__":
    #   def __init__( self, level, constraint, urlQ):

    from multiprocessing import Queue
    q = Queue(3)
    us = urlServerM( 3, 6000, q)

    us.start()
    time.sleep(2)

    # only one thread will do this
    us.store('http://www.google.com')
    debug('put', 'put completed')

    time.sleep(3)

    print q.get_nowait()
    time.sleep(3)

And this is the output

OUTPUT
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
DEBUG put: something is put inside visitNext
DEBUG put: put completed
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
Traceback (most recent call last):
  File "urlServerM.py", line 112, in <module>
    print q.get_nowait()
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 122, in get_nowait
    return self.get(False)
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 104, in get
    raise Empty
Queue.Empty
DEBUG hasnext: [][]

I find this really weird. Basically, when tested in main(), this code starts the process, then stores http://www.google.com into the class's visitNext, and then I just want to see that URL being pushed into the Queue.

However, according to the output, even though my class has finished storing a URL, hasNext doesn't show anything. Does anybody know why? Is a continual while loop the best way to write run()? Is it actually necessary? I am basically trying to experiment with the multiprocessing module, and I have a pool of workers (from multiprocessing.Pool) which need to obtain these URLs from this class (a single point of entry). Is using a queue the best way? Do I need to make this a "live" process? Every worker is asking from the Queue, and unless I have a way to signal my urlServer to put something into the Queue, I cannot think of a less troublesome way.

A: 

You're using multiprocessing, so memory is not shared between the main process and your urlServerM process.

I.e. I think this is effectively a no-op: {us.store('http://www.google.com')}, because when it's executed in the main process it modifies only the main process's copy of {us}. You can confirm that the URL is in the main process's memory by calling {us.hasNext()} before {q.get_nowait()}.
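
To see the effect in isolation, here is a minimal sketch. The Holder class, its items list, and the demo() helper are made up for illustration and are not part of the question's code. The child only reports its list after the parent has appended to its own copy, so timing cannot explain the difference. It is written so that it should also run on Python 3.

```python
import multiprocessing


class Holder(multiprocessing.Process):
    """Hypothetical stand-in for urlServerM: keeps a plain list as state."""

    def __init__(self, report_q, go):
        multiprocessing.Process.__init__(self)
        self.items = []
        self.report_q = report_q
        self.go = go

    def run(self):
        # Runs in the child process, on the child's own copy of self.items.
        self.go.wait()                       # wait until the parent has appended
        self.report_q.put(list(self.items))  # report what the child can see


def demo():
    report_q = multiprocessing.Queue()
    go = multiprocessing.Event()
    h = Holder(report_q, go)
    h.start()
    h.items.append('http://www.google.com')  # changes the parent's copy only
    go.set()
    seen_by_child = report_q.get(timeout=10)
    h.join()
    return h.items, seen_by_child


if __name__ == "__main__":
    parent_view, child_view = demo()
    print(parent_view)  # the parent's list holds the URL
    print(child_view)   # the child's list is still empty
```

The same thing happens with {us.store(...)}: the call succeeds, but only in the process that made it.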

To make it work, you'll have to turn every list that you want to share into a Queue or a Pipe. Alternatively, you can just change your model to {threading} and it should work more or less without changes: you'll have to do locking around the visit lists, and you get GIL issues again.
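
A rough sketch of the Queue variant, under some assumptions of mine: the UrlServer class, the inbox queue, the STOP sentinel and demo() are hypothetical names, and the flush-on-STOP behaviour is a simplification rather than your original polling loop. The point is only that the lists live inside run(), so they belong to the child process, and the main process talks to it through queues instead of calling store() directly.

```python
import multiprocessing

STOP = None  # hypothetical sentinel: tells the server to flush and exit


class UrlServer(multiprocessing.Process):
    def __init__(self, limit, inbox, url_q):
        multiprocessing.Process.__init__(self)
        self.limit = limit
        self.inbox = inbox   # main process -> server
        self.url_q = url_q   # server -> consumers

    def run(self):
        # These lists are created in the child, so the child really owns them.
        visit_next, visit_later = [], []
        # iter(callable, sentinel) keeps calling inbox.get() until STOP arrives.
        for link in iter(self.inbox.get, STOP):
            if len(visit_next) < self.limit:
                visit_next.append(link)
            else:
                visit_later.append(link)
        # Priority links go out first, then the rest, then the sentinel.
        for link in visit_next + visit_later:
            self.url_q.put(link)
        self.url_q.put(STOP)


def demo(links, limit=2):
    inbox = multiprocessing.Queue()
    url_q = multiprocessing.Queue()
    server = UrlServer(limit, inbox, url_q)
    server.start()
    for link in links:
        inbox.put(link)      # replaces the direct us.store(link) call
    inbox.put(STOP)
    received = list(iter(lambda: url_q.get(timeout=10), STOP))
    server.join()
    return received


if __name__ == "__main__":
    print(demo(['http://www.google.com', 'http://www.python.org']))
```

A long-running server would keep reading the inbox instead of stopping at the first sentinel, but the data flow stays the same.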

(And yeah - please edit your questions better next time. I knew what might be your problem as soon as I saw "multiprocessing", but otherwise I wouldn't bother looking at the code at all...)

viraptor