In this post, we adapt the wget_fork.py program from the previous article so that it spawns a thread, rather than a new process, for each webpage it downloads. This article takes the threading chapter of Programming Python by Mark Lutz as its inspiration. The examples, though hackneyed, are my own.
Python 2 has two modules devoted to threading: a low-level module called thread, and a high-level module built on top of it called threading. (Python 3 renames thread to _thread.) In this first example, we use thread.start_new_thread() to spawn threads, each of which downloads a single URL. To coordinate the threads' access to stdout, we create a global mutex called stdout_mutex that protects the shared stream. We also create a list of mutexes called exit_mutexes, one per thread; each thread acquires its own mutex to signal that it is complete. main() polls this list until every mutex is locked, implementing a rudimentary join. Mutexes were chosen for this implementation of join, but a list of booleans would serve just as well.
wget_thread.py
```python
#!/usr/bin/env python2
# Python 2 code: uses the thread, urllib and urlparse modules
import sys
import thread
import time
import urllib
import urlparse

stdout_mutex = thread.allocate_lock()
exit_mutexes = []

def download(tid, url):
    try:
        # craft the output filename; URL paths always use '/', whatever os.path.sep is
        o = urlparse.urlparse(url)
        filename = (o.netloc + o.path).replace('/', '__')

        # print message to stdout (could use 'with stdout_mutex' instead)
        stdout_mutex.acquire()
        print('[*] saving %s to %s' % (url, filename))
        stdout_mutex.release()

        # retrieve the file
        try:
            urllib.urlretrieve(url, filename)
        except (IOError, urllib.ContentTooShortError), e:
            stdout_mutex.acquire()
            print('[-] ' + str(e))
            stdout_mutex.release()
    finally:
        # mark thread as complete, even if an unexpected exception occurred,
        # so that main() cannot wait forever
        exit_mutexes[tid].acquire()

def main(argv):
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        exit_mutexes.append(thread.allocate_lock())
        thread.start_new_thread(download, (i, url))

    # wait for each thread to complete
    while not all(mutex.locked() for mutex in exit_mutexes):
        time.sleep(1)
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)
```
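The remark that a list of booleans would serve as well as exit_mutexes can be checked with a small sketch. It assumes Python 3, where the low-level module is named _thread, and it replaces download() with a hypothetical work() function so that it runs without network access. The try/finally guarantees that a failing thread still sets its flag, so the polling loop in main() always terminates.

```python
import _thread  # Python 3 name of Python 2's low-level thread module
import time

stdout_mutex = _thread.allocate_lock()


def work(tid, item, results, done):
    """Hypothetical stand-in for download(): no network access."""
    try:
        results[tid] = item.upper()
        with stdout_mutex:  # lock objects also support the with statement
            print('[*] thread %d handled %s' % (tid, item))
    finally:
        done[tid] = True  # signal completion even if the work above fails


def main(items):
    results = [None] * len(items)
    done = [False] * len(items)  # one boolean per thread instead of one lock
    for i, item in enumerate(items):
        _thread.start_new_thread(work, (i, item, results, done))
    while not all(done):  # rudimentary join: poll the flags
        time.sleep(0.1)
    with stdout_mutex:
        print('main thread exiting')
    return results


if __name__ == '__main__':
    print(main(['a', 'b', 'c']))
```

Allocating all the flags before starting any thread means no thread ever indexes past the end of done, which is the same ordering guarantee the original gets by appending each lock just before calling start_new_thread().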
The following shows sample output from the program.
```
$ ./wget_thread.py http://smherwig.org http://python.org
[*] saving http://smherwig.org to smherwig.org
[*] saving http://python.org to python.org
main thread exiting
```
In this example, we write the same program using the threading module. The threading module lets you subclass threading.Thread, so that each thread object can carry its own state (here, its id, its URL, and the shared stdout mutex). The run() method plays roughly the role of the callable you would pass to thread.start_new_thread(): calling the object's start() method creates a new thread, which then invokes run().
With the threading module, we no longer have to implement our own join; instead, we reap each thread with threading.Thread's join() method, which blocks until that thread has finished.
wget_threading.py
```python
#!/usr/bin/env python2
__metaclass__ = type  # new-style classes

import sys
import threading
import urllib
import urlparse

class Downloader(threading.Thread):
    def __init__(self, tid, url, stdout_mutex):
        threading.Thread.__init__(self)
        self.tid = tid
        self.url = url
        self.stdout_mutex = stdout_mutex

    def run(self):
        # craft the output filename; URL paths always use '/', whatever os.path.sep is
        o = urlparse.urlparse(self.url)
        filename = (o.netloc + o.path).replace('/', '__')
        with self.stdout_mutex:
            print('[*] saving %s to %s' % (self.url, filename))

        # retrieve the file
        try:
            urllib.urlretrieve(self.url, filename)
        except (IOError, urllib.ContentTooShortError), e:
            with self.stdout_mutex:
                print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []

    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = Downloader(i, url, stdout_mutex)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)
```
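The difference between start() and run() is easy to see with a hypothetical Worker subclass that records, as instance state, which thread executed its run() method. This is a Python 3 sketch, not part of the original program: calling run() directly is an ordinary method call in the caller's thread, while start() spawns a new thread that invokes run(), and join() waits for it.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical Thread subclass: per-thread state lives on the instance."""

    def __init__(self, tid):
        threading.Thread.__init__(self)  # the base constructor must run first
        self.tid = tid
        self.ran_in = None  # name of the thread that executed run()

    def run(self):
        self.ran_in = threading.current_thread().name


def demo():
    direct = Worker(0)
    direct.run()  # plain method call: runs in the caller's thread
    started = Worker(1)
    started.start()  # spawns a new thread, which invokes run()
    started.join()  # block until that thread has finished
    return direct.ran_in, started.ran_in, threading.current_thread().name


if __name__ == '__main__':
    print(demo())
```

Because run() stores its result on self, main code can read it safely after join() returns; this is the "thread with state" that subclassing buys over a bare callable.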
Our last example takes advantage of the fact that threading.Thread's constructor can be passed a callable (target) and a tuple of arguments (args). If our thread is relatively simple, this saves us from having to write our own subclass. Notice how closely this use of threading.Thread resembles thread.start_new_thread().
wget_threading2.py
```python
#!/usr/bin/env python2
import sys
import threading
import urllib
import urlparse

def download(tid, url, stdout_mutex):
    # craft the output filename; URL paths always use '/', whatever os.path.sep is
    o = urlparse.urlparse(url)
    filename = (o.netloc + o.path).replace('/', '__')
    with stdout_mutex:
        print('[*] saving %s to %s' % (url, filename))

    # retrieve the file
    try:
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError), e:
        with stdout_mutex:
            print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []

    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = threading.Thread(target=download, args=(i, url, stdout_mutex))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)
```
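For readers on Python 3, here is a minimal sketch of the same target/args pattern, assuming the Python 3 module layout (urlparse lives in urllib.parse, and a real download would call urllib.request.urlretrieve). To keep it runnable offline, the hypothetical report() function only computes each output filename with the article's naming rule and stores it in a per-thread slot of a shared list instead of fetching anything.

```python
import threading
from urllib.parse import urlparse  # Python 3 home of Python 2's urlparse.urlparse


def url_to_filename(url):
    """Article's naming rule: netloc + path, with '/' replaced by '__'."""
    o = urlparse(url)
    return (o.netloc + o.path).replace('/', '__')


def report(tid, url, stdout_mutex, results):
    """Hypothetical stand-in for download(): computes the filename, fetches nothing."""
    filename = url_to_filename(url)
    with stdout_mutex:
        print('[*] thread %d would save %s to %s' % (tid, url, filename))
    results[tid] = filename  # each thread writes only its own slot


def main(urls):
    stdout_mutex = threading.Lock()
    results = [None] * len(urls)
    threads = [threading.Thread(target=report, args=(i, url, stdout_mutex, results))
               for i, url in enumerate(urls)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread before reading results
    return results


if __name__ == '__main__':
    print(main(['http://smherwig.org', 'http://python.org/about/']))
```

Giving each thread its own index into results avoids any need for a lock around the list; only the shared stdout stream still needs stdout_mutex.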