views:

365

answers:

2

I'm creating a threaded python script that has a collection of files that is put into a queue and then an unknown amount of threads (default is 3) to start downloading. When each of the threads complete it updates the stdout with the queue status and a percentage. All the files are being downloaded but the status information is wrong on the 3rd thread and I'm not sure why. I've been considering creating a work_completed queue to use for the calculation but don't think I should have to/that it would matter. Could someone point me in the right direction here?

download_queue = queue.Queue()

class Downloader(threading.Thread):
    def __init__(self,work_queue):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = work_queue.qsize()

    def run(self):
        while self.work_queue.qsize() > 0:
            url = self.work_queue.get(True)
            system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
            os.system(system_call)
            self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
            self.percent = (self.current_job / self.queue_size) * 100
            sys.stdout.flush()
            status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent,2)) + "%]"
        finally:
            self.work_queue.task_done()
def main:
    if download_queue.qsize() > 0:
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue)
            downloader.start()
        download_queue.join()
+1  A: 

You can't check the queue size in one statement, and then .get() from the queue in the next. In the meantime the whole world may have changed. The .get() method call is the single atomic operation you need to call. If it raises Empty or blocks, the queue is empty.

Your threads can overwrite each other's output. I would have another thread with an input queue whos only job is to print the items in the queue to stdout. It can also count off the number of completed items and produce status information.

I also tend not to subclass Thread, but instead just supply a plain Thread instance with a target= parameter and .start() the thread.

based on your response, try this:

download_queue = queue.Queue()


class Downloader(threading.Thread):
    def __init__(self,work_queue, original_size):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = original_size

    def run(self):
        while True:
            try:
                url = self.work_queue.get(False)
                system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
                os.system(system_call)
                # the following code is questionable. By the time we get here,
                #   many other items may have been taken off the queue. 
                self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
                self.percent = (self.current_job / self.queue_size) * 100
                sys.stdout.flush()
                status = ("\rDownloading " + url.split('/')[-1] + 
                          " [status: " + str(self.current_job) + 
                          "/" + str(self.queue_size) + ", " + 
                          str(round(self.percent,2)) + "%]" )            
            except queue.Empty:
                pass
            finally: 
                self.work_queue.task_done()




def main:
    if download_queue.qsize() > 0:
        original_size = download_queue.qsize()
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue, original_size)
            downloader.start()
        download_queue.join()
Joe Koberg
I'm aware that the threads will overwrite each other's output, that's fine as it's supposed to do that. I only want to show the latest file that was set to be downloaded and what number it is compared to the initial value of the queue size.Currently what's happening is that queuesize is wrong on the 3rd thread (if using defaults); it shows 2 less than the first two. For instance here is what each of the status lines look like when printed: Downloading file 1.txt [status: 1/10, 10%] Downloading file 2.txt [status: 2/10, 10%] Downloading file 3.txt [status: 3/8, 37.5%]
Yeah, by the time the 3rd worker is started, the other two have processed an item off the queue... You don't show the code that puts items into the queue in this snippet, but presumably that's where your total count should come from. Or simply store the total queue size BEFORE you start any threads, and don't read it inside a thread.
Joe Koberg
I wrote the script in Python3 and it works with the exception of a few things. Thank you for your input, I will modify my code with your suggestions when I get home later this evening.
A: 

If you'd like to use the multiprocessing module, it includes a very nice parallel imap_unordered, which would reduce your problem to the very elegant:

import multiprocessing, sys

class ParallelDownload:
    def __init__(self, urls, processcount=3):
        self.total_items = len(urls)
        self.pool = multiprocessing.Pool(processcount)
        for n, status in enumerate(self.pool.imap_unordered(self.download, urls)):
            stats = (n, self.total_items, n/self.total_items)
            sys.stdout.write(status + " [%d/%d = %0.2f %%]\n"%stats)


    def download(self, url):
        system_call = "wget -nc -q {0} -O {1}".format(url, local_file)
        os.system(system_call)
        status = "\rDownloaded " + url.split('/')[-1]
        return status
Joe Koberg