In this post, we adapt the wget_fork.py program from the previous article to spawn a thread for each webpage download, rather than creating a new process for each one. This article takes its inspiration from the threading chapter of Programming Python by Mark Lutz. The examples, though hackneyed, are my own.
Python 2 has two modules devoted to threading: a low-level module called thread, and a higher-level module built on top of it called threading. (In Python 3, thread was renamed _thread.) In this first example, we use thread.start_new_thread() to spawn threads. Each thread downloads a single URL. To coordinate access to stdout among the threads, we create a global mutex called stdout_mutex that protects access to that shared stream. We also create a list of mutexes called exit_mutexes, which gives each thread a way to signal that it is complete: each thread acquires its own mutex when it finishes. main() can therefore poll this list, implementing a rudimentary join scheme. Mutexes were chosen for this implementation of join; a list of booleans would serve just as well.
wget_thread.py
#!/usr/bin/env python
import os
import sys
import thread
import time
import urlparse
import urllib
# Lock serializing writes to stdout so lines printed by different
# download threads do not interleave.
stdout_mutex = thread.allocate_lock()
# One lock per download thread. Each thread acquires its own lock when it
# finishes, and main() polls these locks to implement a simple join.
exit_mutexes = []


def download(tid, url):
    """Download url to a local file, then signal completion.

    The output filename is the URL's host plus path, with every path
    separator replaced by '__' so the file lands in the current directory.
    Retrieval errors (IOError / ContentTooShortError) are reported on stdout
    rather than raised.

    Args:
        tid: index of this thread's completion lock in exit_mutexes.
        url: the URL to retrieve.
    """
    try:
        # craft the output filename. URL paths always use '/', so replace it
        # explicitly: os.path.sep is '\\' on Windows and would leave the '/'
        # in place, making urlretrieve write into nonexistent subdirectories.
        o = urlparse.urlparse(url)
        filename = o.netloc + o.path
        for sep in ('/', os.path.sep):
            filename = filename.replace(sep, '__')
        # 'with' releases the lock even if print raises, so one failing
        # thread cannot deadlock all the others.
        with stdout_mutex:
            print('[*] saving %s to %s' % (url, filename))
        # retrieve the file
        try:
            urllib.urlretrieve(url, filename)
        except (IOError, urllib.ContentTooShortError) as e:
            with stdout_mutex:
                print('[-] ' + str(e))
    finally:
        # mark thread as complete even if an unexpected exception escaped
        # above; otherwise main() would keep polling forever.
        exit_mutexes[tid].acquire()
def main(argv):
    """Start one download thread per URL in argv[1:] and wait for them all.

    Completion is detected by polling exit_mutexes once per second. Each
    download thread acquires its own lock in that list when it is done.
    """
    # spawn the downloader threads; each lock is appended before its thread
    # starts, so exit_mutexes[tid] always exists when the thread reaches it
    for tid, target in enumerate(argv[1:]):
        exit_mutexes.append(thread.allocate_lock())
        thread.start_new_thread(download, (tid, target))

    # poll until every thread has flagged completion
    while True:
        if all(lock.locked() for lock in exit_mutexes):
            break
        time.sleep(1)

    print('main thread exiting')


if __name__ == '__main__':
    main(sys.argv)
The following shows a sample output from the program.
$ ./wget_thread.py http://smherwig.org http://python.org
[*] saving http://smherwig.org to smherwig.org
[*] saving http://python.org to python.org
main thread exiting
In the next example, we write the same program using the threading module. The threading module lets you subclass threading.Thread, so that each thread object can carry its own state. The run() method is roughly comparable to the callable that you would pass to thread.start_new_thread(). run() is invoked in the new thread when you call the object's start() method.
With the threading module, we no longer have to implement our own join; instead, we can wait for each thread to finish with threading.Thread's join() method.
wget_threading.py
#!/usr/bin/env python
# Python 2 only: make classes defined in this module new-style by default.
# This module-level hook applies only to classes declared without explicit
# bases; Downloader below already subclasses threading.Thread.
__metaclass__ = type # new-style classes
import os
import sys
import threading
import urllib
import urlparse
class Downloader(threading.Thread):
    """Thread that downloads a single URL into the current directory.

    Args:
        tid: numeric id of this downloader (stored on the instance only).
        url: the URL to retrieve.
        stdout_mutex: lock shared by all downloaders to serialize output.
    """

    def __init__(self, tid, url, stdout_mutex):
        threading.Thread.__init__(self)
        self.tid = tid
        self.url = url
        self.stdout_mutex = stdout_mutex

    def run(self):
        """Save self.url under a flattened host+path filename.

        Retrieval errors (IOError / ContentTooShortError) are printed under
        the shared stdout lock instead of propagating.
        """
        # craft the output filename. URL paths always use '/', so replace it
        # explicitly: os.path.sep is '\\' on Windows and would leave the '/'.
        o = urlparse.urlparse(self.url)
        filename = o.netloc + o.path
        for sep in ('/', os.path.sep):
            filename = filename.replace(sep, '__')
        with self.stdout_mutex:
            print('[*] saving %s to %s' % (self.url, filename))
        # retrieve the file
        try:
            urllib.urlretrieve(self.url, filename)
        except (IOError, urllib.ContentTooShortError) as e:
            with self.stdout_mutex:
                print('[-] ' + str(e))
def main(argv):
    """Start a Downloader for each URL in argv[1:], then join them all.

    Each thread is started as soon as it is constructed. The joins happen
    in creation order, so 'main thread exiting' is printed last.
    """
    # one lock shared by every downloader (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    workers = []
    for tid, target in enumerate(argv[1:]):
        worker = Downloader(tid, target, stdout_mutex)
        workers.append(worker)
        worker.start()
    # block until every downloader has finished
    for worker in workers:
        worker.join()
    print('main thread exiting')


if __name__ == '__main__':
    main(sys.argv)
Our last example takes advantage of the fact that threading.Thread's constructor can be passed a callable and a tuple of arguments. If our thread is relatively simple, this constructor saves us from having to write our own subclass. Notice that this use of threading.Thread is quite similar to the semantics of thread.start_new_thread().
wget_threading2.py
#!/usr/bin/env python
import os
import sys
import threading
import urllib
import urlparse
def download(tid, url, stdout_mutex):
    """Download url into the current directory; used as a Thread target.

    The output filename is the URL's host plus path, with path separators
    replaced by '__'. Retrieval errors (IOError / ContentTooShortError) are
    printed under stdout_mutex instead of propagating.

    Args:
        tid: numeric id of the calling thread (not used in the body).
        url: the URL to retrieve.
        stdout_mutex: lock shared by all threads to serialize output.
    """
    # craft the output filename. URL paths always use '/', so replace it
    # explicitly: os.path.sep is '\\' on Windows and would leave the '/'.
    o = urlparse.urlparse(url)
    filename = o.netloc + o.path
    for sep in ('/', os.path.sep):
        filename = filename.replace(sep, '__')
    with stdout_mutex:
        print('[*] saving %s to %s' % (url, filename))
    # retrieve the file
    try:
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError) as e:
        with stdout_mutex:
            print('[-] ' + str(e))
def main(argv):
    """Run download() on a plain threading.Thread for each URL in argv[1:].

    Threads are started as they are created and then joined in order, so
    'main thread exiting' is printed only after every download returns.
    """
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    workers = []
    for tid, target in enumerate(argv[1:]):
        worker = threading.Thread(target=download,
                                  args=(tid, target, stdout_mutex))
        workers.append(worker)
        worker.start()
    # wait for every thread to exit
    for worker in workers:
        worker.join()
    print('main thread exiting')


if __name__ == '__main__':
    main(sys.argv)
No comments:
Post a Comment