Sunday, September 23, 2012

Threading with Python

In this post, we adapt the wget_fork.py program from the previous article so that it spawns a thread for each download instead of creating a new process for each one. This article uses the threading chapter of Programming Python by Mark Lutz as inspiration. The examples, though hackneyed, are my originals.

Python has two modules devoted to threading: a low-level module called thread, and a high-level module called threading, which is built on top of thread. In this first example, we use thread.start_new_thread() to spawn threads. Each thread downloads a single URL. To coordinate access to stdout among the threads, we create a global mutex called stdout_mutex, which protects that shared stream. We also create a list of mutexes called exit_mutexes, one per thread, which gives each thread a way to signal that it is complete: a thread acquires its own mutex as its last action. main() polls this list until every mutex is locked, implementing a rudimentary join scheme. Mutexes were chosen for this implementation of join; a list of booleans would serve just as well.

wget_thread.py


#!/usr/bin/env python

import os
import sys 
import thread
import time
import urlparse
import urllib

stdout_mutex = thread.allocate_lock()
exit_mutexes = []  

def download(tid, url):
    global stdout_mutex
    global exit_mutexes

    # craft the output filename
    o = urlparse.urlparse(url)
    filename = o.netloc + o.path
    filename = filename.replace(os.path.sep, '__')
    
    # print message to stdout (could use 'with stdout_mutex' instead)
    stdout_mutex.acquire()
    print('[*] saving %s to %s' % (url, filename))
    stdout_mutex.release()

    # retrieve the file
    try:
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError) as e:
        stdout_mutex.acquire()
        print('[-] ' + str(e))
        stdout_mutex.release()

    # mark thread as complete
    exit_mutexes[tid].acquire()

def main(argv):
    global stdout_mutex
    global exit_mutexes
    
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        exit_mutexes.append(thread.allocate_lock())
        thread.start_new_thread(download, (i, url))

    # wait for each thread to complete
    while not all(mutex.locked() for mutex in exit_mutexes):
        time.sleep(1)
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)

The following shows a sample output from the program.

$ ./wget_thread.py http://smherwig.org http://python.org
[*] saving http://smherwig.org to smherwig.org
[*] saving http://python.org to python.org
main thread exiting
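
The remark above that a list of booleans would serve just as well as a list of mutexes can be sketched as follows. This is a minimal illustration, not a drop-in replacement for wget_thread.py: the download is replaced by a hypothetical stand-in, fake_download(), so that the sketch runs without network access, and the names exit_flags and run_all() are likewise my own. The import shim is an addition so that the sketch also runs on Python 3, where the thread module was renamed _thread.

```python
import time

try:
    import thread             # Python 2
except ImportError:
    import _thread as thread  # Python 3 renamed the module

stdout_mutex = thread.allocate_lock()
exit_flags = []   # exit_flags[tid] becomes True when thread tid is done

def fake_download(tid, url):
    # hypothetical stand-in for the real download
    try:
        time.sleep(0.01)
        with stdout_mutex:
            print('[*] finished %s' % url)
    finally:
        # mark thread as complete, even if the work above raised
        exit_flags[tid] = True

def run_all(urls):
    # create every flag before starting any thread
    del exit_flags[:]
    exit_flags.extend([False] * len(urls))
    for i, url in enumerate(urls):
        thread.start_new_thread(fake_download, (i, url))
    # rudimentary join: poll until every flag is set
    while not all(exit_flags):
        time.sleep(0.01)
    return list(exit_flags)
```

Each thread writes only its own slot in exit_flags, which is why this sketch puts no extra lock around the flags. Setting the flag in a finally clause is a small departure from the program above: it keeps the main thread from polling forever if the worker raises an exception that it does not catch.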

In the next example, we write the same program using the threading module. The threading module lets you subclass threading.Thread, so that a thread can carry its own state. The run() method plays roughly the role of the callable that you would pass to thread.start_new_thread(); it is invoked in the new thread when you call the instance's start() method.

With the threading module, we no longer have to implement our own join; instead, we wait for each thread to finish by calling threading.Thread's join() method.

wget_threading.py


#!/usr/bin/env python

__metaclass__ = type # new-style classes

import os
import sys 
import threading
import urllib
import urlparse

class Downloader(threading.Thread):
    def __init__(self, tid, url, stdout_mutex):
        threading.Thread.__init__(self)
        self.tid = tid 
        self.url = url 
        self.stdout_mutex = stdout_mutex

    def run(self):
        # craft the output filename
        o = urlparse.urlparse(self.url)
        filename = o.netloc + o.path
        filename = filename.replace(os.path.sep, '__')
        with self.stdout_mutex:
            print('[*] saving %s to %s' % (self.url, filename))
        # retrieve the file
        try: 
            urllib.urlretrieve(self.url, filename)
        except (IOError, urllib.ContentTooShortError) as e:
            with self.stdout_mutex:
                print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []  
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = Downloader(i, url, stdout_mutex)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()   # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)
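
Because a subclass instance can hold state, the main thread can also read results back from each thread once join() returns. The following is a minimal sketch of that idea under stated assumptions: the Worker class, its error attribute, run_workers(), and the fake_fetch() stand-in are all hypothetical names introduced for illustration, and the fetch is simulated so that the sketch needs no network access.

```python
import threading

class Worker(threading.Thread):
    def __init__(self, tid, url, fetch):
        threading.Thread.__init__(self)
        self.tid = tid
        self.url = url
        self.fetch = fetch    # hypothetical callable: fetch(url)
        self.error = None     # set by run() if fetch fails

    def run(self):
        try:
            self.fetch(self.url)
        except IOError as e:
            self.error = e

def run_workers(urls, fetch):
    workers = [Worker(i, url, fetch) for i, url in enumerate(urls)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # after join(), run() has returned, so reading w.error is safe
    return [(w.url, w.error) for w in workers]

def fake_fetch(url):
    # hypothetical stand-in: fails for any URL containing 'bad'
    if 'bad' in url:
        raise IOError('cannot fetch ' + url)
```

Calling run_workers(['http://ok.example', 'http://bad.example'], fake_fetch) returns one (url, error) pair per thread, with None as the error for the URL that succeeded.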

Our last example takes advantage of the fact that threading.Thread's constructor can be passed a callable and a tuple of arguments. If our thread is relatively simple, such a constructor might save us from having to write our own subclass. Notice that this use of threading.Thread closely mirrors the semantics of thread.start_new_thread().

wget_threading2.py


#!/usr/bin/env python

import os
import sys 
import threading
import urllib
import urlparse

def download(tid, url, stdout_mutex):
    o = urlparse.urlparse(url)
    filename = o.netloc + o.path
    filename = filename.replace(os.path.sep, '__')
    with stdout_mutex:
        print('[*] saving %s to %s' % (url, filename))
    # retrieve the file
    try: 
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError) as e:
        with stdout_mutex:
            print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []  
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = threading.Thread(target=download, args=(i, url, stdout_mutex))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()   # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)
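
All three programs derive the output filename from the URL in the same way, so that step can be looked at on its own. The sketch below isolates it in a function with the hypothetical name url_to_filename(). Two things here are my assumptions rather than part of the programs above: the import shim, which lets the sketch run on Python 3 (where urlparse moved to urllib.parse), and the replacement of '/' instead of os.path.sep, since URL paths use '/' regardless of the local path separator.

```python
try:
    from urlparse import urlparse        # Python 2
except ImportError:
    from urllib.parse import urlparse    # Python 3

def url_to_filename(url):
    o = urlparse(url)
    # host plus path, e.g. 'python.org' + '/about/'
    filename = o.netloc + o.path
    # URL paths always use '/', whatever the local path separator is
    return filename.replace('/', '__')
```

For example, url_to_filename('http://smherwig.org') gives 'smherwig.org', matching the sample output shown earlier, while url_to_filename('http://python.org/about/') gives 'python.org__about__'.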
