Friday, September 28, 2012

Searching Files with bash, find, and grep

I often find myself in the situation where I am working in a deeply nested project, and need to search all of the source files for a keyword. One solution is to pipe together the find and grep commands. For instance, one could invoke

find . -type f -name "*.c" -print | xargs grep -n printf
to find all occurrences of the printf function. For ease of use, I decided to wrap this functionality in a bash script.
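As an aside, GNU grep can perform the same search in one step:

grep -rn --include="*.c" printf .

The explicit find pipeline, however, is more portable and easier to extend.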

findgrep.sh


#!/bin/bash

usage() {
    prog=`basename $0`
    cat << EOF

    usage: $prog [-d ROOT_DIR] [-s] FILE KEYWORD

    Options:
        -d ROOT_DIR     root directory to start the search from
        -s              case sensitive search

    Search for all files under the ROOT_DIR directory with a name matching
    the FILE pattern.  Among the files found, grep for the KEYWORD pattern.

    Examples:

        # search for C source files under /home/joe that mention 'socket'
        $prog -d /home/joe "*.c" socket

        # search for use of 'synchronized' keyword in all Java source files
        # under the current working directory
        $prog -s "*.java" synchronized 

EOF
}


findgrep() {
    # $1=root directory, $2=file pattern, $3=keyword, $4=case-sensitive flag
    if [[ $4 -eq 0 ]]
    then
        grep_args="-in"
    else
        grep_args="-n"
    fi
    find "$1" -type f -name "$2" -print | xargs grep $grep_args "$3"
}

root_dir=`pwd`
case_sensitive=0

while getopts ":hd:s" option
do
    case $option in
    h)
        usage
        exit 1
        ;;
    d)
        root_dir=$OPTARG
        ;;
    s)
        case_sensitive=1
        ;;
    \?)
        echo "invalid option: -$OPTARG" >&2
        exit 1
        ;;
    esac
done


# shift command line arguments so that first positional argument is $1
shift $((OPTIND-1))

if [[ $# -ne 2 ]]
then
    usage
    exit 1
fi

findgrep "$root_dir" "$1" "$2" "$case_sensitive"

Example:
$ pwd
/Users/smherwig/github
$ ls
lua-daemon  lua-mnt     lua-proc    pytk-editor
$ findgrep.sh -s "*.c" statvfs
/Users/smherwig/github/lua-mnt/src/mnt.c:7:#include <sys/statvfs.h>
/Users/smherwig/github/lua-mnt/src/mnt.c:154:    struct statvfs  sbuf;
/Users/smherwig/github/lua-mnt/src/mnt.c:157:    ret = statvfs(path, &sbuf);

Thursday, September 27, 2012

Producer-Consumer Model with Python

In this article, we investigate the producer-consumer model in Python. The producer-consumer model describes a situation where one or more players (the producers) are responsible for placing data into a buffer, and one or more players (the consumers) are responsible for removing data from that buffer. The chief hurdles to this arrangement are
  1. Ensuring the producer(s) do not add more data than the buffer can hold.
  2. Ensuring the consumer(s) do not take from an empty buffer.
  3. Ensuring the activities of all players are synchronized.

In Python, the combined use of the Queue and threading modules provides an elegant way to approach the problem that abstracts away many of the details. In particular, the Queue.Queue datatype ensures synchronized access to its data, freeing the user from having to use a locking scheme.
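
To make the point concrete, here is a minimal sketch (assuming Python 2, like the program below) in which two threads share a Queue.Queue with no explicit locking around the queue itself:

#!/usr/bin/env python

# Minimal sketch: Queue.Queue provides its own locking, so the filler
# and drainer threads below need no explicit Lock around the queue.
import Queue
import threading

q = Queue.Queue(maxsize=5)          # put() blocks once 5 items are waiting

def fill():
    for i in range(10):
        q.put(i)                    # blocks while the queue is full

def drain():
    for _ in range(10):
        print('got %d' % q.get())   # blocks while the queue is empty

t1 = threading.Thread(target=fill)
t2 = threading.Thread(target=drain)
t1.start(); t2.start()
t1.join(); t2.join()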

The program below is slightly redundant in that we specify the consumer threads as daemons, but then indirectly wait for them to complete. I will explain this in a moment. First, let's talk about the purpose of daemon threads.

The Python documentation states that "the entire Python program exits when no alive non-daemon threads are left." A clearer phrasing is: the program exits when all non-daemon threads have completed. By default, all threads (including the 'main' thread) are non-daemon. By specifying that the consumers are daemon threads, we are saying that the program can terminate even though the consumers have not completed. In many programs (think GUIs) this is the desired behavior. We, however, also delay the 'main' thread from exiting until the producers have stopped and the queue is empty, which effectively waits for the consumers to complete.
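
As a quick illustration of daemon semantics, the minimal sketch below (again Python 2) exits shortly after the main thread returns, even though its daemon thread loops forever:

#!/usr/bin/env python

# Sketch: a daemon thread does not keep the program alive.
import threading
import time

def forever():
    while True:
        time.sleep(0.1)

t = threading.Thread(target=forever)
t.daemon = True     # must be set before start()
t.start()

time.sleep(0.5)
print('main thread done; the program exits despite the daemon thread')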

The example program below lets the user spawn an arbitrary number of producer and consumer threads. Each producer thread generates random pairs of integers and places each pair in a queue. The consumers remove the pairs from the queue and compute their greatest common divisor.

gcd_processor.py


#!/usr/bin/env python

from optparse import OptionParser
import Queue
import random
import sys
import threading
import time

data_queue = Queue.Queue()
stdout_mutex = threading.Lock()

def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a

def consumer(idnum):
    while True:
        try:
            data = data_queue.get(block=False)
        except Queue.Empty:
            pass
        else:
            with stdout_mutex:
                print('\tconsumer %d: computed gcd(%d, %d) = %d' % \
                        (idnum, data[0], data[1], gcd(data[0], data[1])))
        time.sleep(0.1)

def producer(idnum, count):
    for i in range(count):
        a, b  = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
        with stdout_mutex:
            print('\tproducer %d: generated (%d, %d)' % (idnum, a, b))
        data_queue.put((a, b))
        time.sleep(0.1)


if __name__ == '__main__':
    usage = '''usage: %prog [options]
   
Demonstrate producer-consumer model.

Each producer generates a sequence of integer pairs.  The integer pair is
placed on a global queue.  Each consumer removes pairs from the queue and
computes the greatest common divisor for that pair.
'''
    version = '%prog 1.0'
    parser = OptionParser(usage=usage, version=version)
    parser.add_option('-c', '--num-consumers', dest='num_consumers', 
            type='int', default=2, 
            help='number of consumer threads')
    parser.add_option('-p', '--num-producers', dest='num_producers',
            type='int', default=4,
            help='number of producer threads')
    parser.add_option('-n', '--num-integer-pairs', dest='num_integer_pairs',
            type='int', default=10,
            help='the number of integer pairs each producer generates')
    (options, args) = parser.parse_args()

    print('[*] beginning main thread')
    for i in range(options.num_consumers):
        t = threading.Thread(target=consumer, args=(i, ))
        t.daemon = True
        t.start()

    joinable = []
    for i in range(options.num_producers):
        t = threading.Thread(target=producer,
                args=(i, options.num_integer_pairs));
        joinable.append(t)
        t.start()

    # wait for all of the producer threads to finish
    for thread in joinable: thread.join()
    with stdout_mutex:
        print('[*] all producer threads finished')

    # busy-wait until the consumers have drained the queue; sleeping
    # between checks keeps the loop from pegging a CPU core
    while not data_queue.empty():
        time.sleep(0.1)
    with stdout_mutex:
        print('[*] all consumer threads finished')
        print('[*] main thread exited')

Below is a sample run of the program with two consumers and three producers, where each producer produces four integer pairs.

$ ./gcd_processor.py -c 2 -p 3 -n 4
[*] beginning main thread
 producer 0: generated (8650083748717510955, 7042817999880859144)
 producer 1: generated (8411030783605146193, 5649382909553114176)
 producer 2: generated (7598317520065638838, 1583073951150684026)
 consumer 1: computed gcd(8650083748717510955, 7042817999880859144) = 1
 producer 2: generated (9132283067627759945, 5550586898968097650)
 consumer 0: computed gcd(8411030783605146193, 5649382909553114176) = 1
 producer 0: generated (941876034000530679, 2254827834336877783)
 producer 1: generated (3831435553181605295, 6532409849491920585)
 producer 1: generated (89084273097138564, 7529330907292728475)
 producer 0: generated (810343105420840002, 7098464443848996296)
 producer 2: generated (1459983564619067730, 7336893082805612268)
 consumer 0: computed gcd(7598317520065638838, 1583073951150684026) = 2
 consumer 1: computed gcd(9132283067627759945, 5550586898968097650) = 5
 producer 0: generated (5748866744760111634, 4998219244374494451)
 consumer 0: computed gcd(941876034000530679, 2254827834336877783) = 1
 producer 1: generated (8257193796991062027, 218310341014437559)
 consumer 1: computed gcd(3831435553181605295, 6532409849491920585) = 5
 producer 2: generated (4387473705826720899, 3521353578419199591)
 consumer 0: computed gcd(89084273097138564, 7529330907292728475) = 1
 consumer 1: computed gcd(810343105420840002, 7098464443848996296) = 2
[*] all producer threads finished
 consumer 0: computed gcd(1459983564619067730, 7336893082805612268) = 6
 consumer 1: computed gcd(5748866744760111634, 4998219244374494451) = 1
 consumer 1: computed gcd(8257193796991062027, 218310341014437559) = 1
 consumer 0: computed gcd(4387473705826720899, 3521353578419199591) = 9
[*] all consumer threads finished
[*] main thread exited
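
As an aside, polling data_queue.empty() in a loop works, but the Queue module also offers task_done() and join() (available since Python 2.5): each consumer acknowledges every item it processes, and the main thread blocks in join() until all items are acknowledged. A minimal sketch of the pattern, separate from the program above:

#!/usr/bin/env python

# Sketch: waiting for consumers with Queue.task_done()/Queue.join()
# instead of busy-waiting on Queue.empty().
import Queue
import threading

q = Queue.Queue()

def consumer():
    while True:
        item = q.get()          # block until an item arrives
        print('consumed %d' % item)
        q.task_done()           # acknowledge one completed unit of work

t = threading.Thread(target=consumer)
t.daemon = True                 # let the program exit while it blocks in get()
t.start()

for i in range(5):
    q.put(i)
q.join()                        # block until every item is acknowledged
print('all items processed')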

Sunday, September 23, 2012

Threading with Python

In this post, we adapt the forkwget.py program from the previous article to spawn threads, rather than processes, to download multiple webpages. This article uses the threading chapter of Programming Python by Mark Lutz as inspiration. The examples, though hackneyed, are my own.

Python has two modules devoted to threading: a low-level module called thread, and a high-level module called threading that is built on top of thread. In this first example, we use thread.start_new_thread() to spawn threads. Each thread downloads a single URL. To coordinate access to stdout among the threads, we create a global mutex called stdout_mutex that protects the shared stream. We also create a list of mutexes called exit_mutexes, which gives each thread a way to signal that it is complete. main() can monitor this list, implementing a rudimentary join scheme. Mutexes were chosen for this implementation of join, but a list of booleans would serve just as well.

wget_thread.py


#!/usr/bin/env python

import os
import sys 
import thread
import time
import urlparse
import urllib

stdout_mutex = thread.allocate_lock()
exit_mutexes = []  

def download(tid, url):
    global stdout_mutex
    global exit_mutexes

    # craft the output filename
    o = urlparse.urlparse(url)
    filename = o.netloc + o.path
    filename = filename.replace(os.path.sep, '__')
    
    # print message to stdout (could use 'with stdout_mutex:' instead)
    stdout_mutex.acquire()
    print '[*] saving %s to %s' % (url, filename)
    stdout_mutex.release()

    # retrieve the file
    try:
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError), e:
        stdout_mutex.acquire()
        print('[-] ' + str(e))
        stdout_mutex.release()

    # mark thread as complete
    exit_mutexes[tid].acquire()

def main(argv):
    global stdout_mutex
    global exit_mutexes
    
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        exit_mutexes.append(thread.allocate_lock())
        thread.start_new_thread(download, (i, url))

    # wait for each thread to complete
    while not all(mutex.locked() for mutex in exit_mutexes): time.sleep(1)
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)

The following shows a sample output from the program.

$ ./wget_thread.py http://smherwig.org http://python.org
[*] saving http://smherwig.org to smherwig.org
[*] saving http://python.org to python.org
main thread exiting

In this example, we code the same program using the threading module. The threading module allows you to subclass threading.Thread, thereby allowing a thread to contain state. The run() method is somewhat comparable to the callable that you would pass to thread.start_new_thread. The run method is invoked when you call the class's start() method.

With the threading module, we no longer have to implement our own join, but can instead reap threads with threading.Thread's join() method.

wget_threading.py


#!/usr/bin/env python

__metaclass__ = type # new-style classes

import os
import sys 
import threading
import urllib
import urlparse

class Downloader(threading.Thread):
    def __init__(self, tid, url, stdout_mutex):
        threading.Thread.__init__(self)
        self.tid = tid 
        self.url = url 
        self.stdout_mutex = stdout_mutex

    def run(self):
        # craft the output filename
        o = urlparse.urlparse(self.url)
        filename = o.netloc + o.path
        filename = filename.replace(os.path.sep, '__')
        with self.stdout_mutex:
            print('[*] saving %s to %s' % (self.url, filename))
        # retrieve the file
        try: 
            urllib.urlretrieve(self.url, filename)
        except (IOError, urllib.ContentTooShortError), e:
            with self.stdout_mutex:
                print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []  
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = Downloader(i, url, stdout_mutex)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()   # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)

Our last example takes advantage of the fact that threading.Thread's constructor can be passed a callable and a tuple of arguments. If our thread is relatively simple, this constructor can save us from having to write our own subclass. Notice that this use of threading.Thread is quite similar to the semantics of thread.start_new_thread().

wget_threading2.py


#!/usr/bin/env python

import os
import sys 
import threading
import urllib
import urlparse

def download(tid, url, stdout_mutex):
    o = urlparse.urlparse(url)
    filename = o.netloc + o.path
    filename = filename.replace(os.path.sep, '__')
    with stdout_mutex:
        print('[*] saving %s to %s' % (url, filename))
    # retrieve the file
    try: 
        urllib.urlretrieve(url, filename)
    except (IOError, urllib.ContentTooShortError), e:
        with stdout_mutex:
            print('[-] ' + str(e))

def main(argv):
    # create mutex for stdout access (same as thread.allocate_lock())
    stdout_mutex = threading.Lock()
    threads = []  
    # spawn the downloader threads
    for i, url in enumerate(argv[1:]):
        thread = threading.Thread(target=download, args=(i, url, stdout_mutex))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()   # wait for thread exit
    print('main thread exiting')

if __name__ == '__main__':
    main(sys.argv)

Thursday, September 20, 2012

Fork Exec Wait with Python

The following program demonstrates the use of os.fork() and os.wait() in Python. The program forks several processes, each of which counts to a number. The parent process waits for each child process to end, and prints that process's return value.
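
forkwait.py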

#!/usr/bin/env python

from optparse import OptionParser
import os
import sys 
import time

def worker(count):
    for i in range(count):
        time.sleep(1)
        print '[%s] => %s' % (os.getpid(), i)

def boss(num_workers, count):
    child_pids = []
    for i in range(num_workers):
        pid = os.fork()
        if pid == 0:
            worker(count)
            os._exit(i)
        else:
            child_pids.append(pid)
    for pid in child_pids:
        pid, status = os.waitpid(pid, 0)
        if os.WIFEXITED(status):
            print 'parent: child with pid %d exited with value %d' % \
                (pid, os.WEXITSTATUS(status))

if __name__ == '__main__':
    usage = '''usage: %prog [options]
   
Demonstrate fork and wait system calls.'''
    version = '%prog 1.0'
    parser = OptionParser(usage=usage, version=version)
    parser.add_option('-n', '--num-workers', dest='num_workers', type='int',
            default=5, 
            help='number of child worker processes to fork (default=5)')
    parser.add_option('-c', '--toil-count', dest='toil_count', type='int', 
            default=10,
            help='the number each worker process must count to (default=10)')
    (options, args) = parser.parse_args()
    boss(options.num_workers, options.toil_count)

A sample output of running the program with seven worker processes, each of which counts to three, is shown below.

$ ./forkwait.py -n 7 -c 3
[692] => 0
[695] => 0
[694] => 0
[693] => 0
[697] => 0
[696] => 0
[698] => 0
[695] => 1
[692] => 1
[693] => 1
[697] => 1
[698] => 1
[694] => 1
[696] => 1
[695] => 2
[692] => 2
[697] => 2
[693] => 2
[698] => 2
[694] => 2
[696] => 2
parent: child with pid 692 exited with value 0
parent: child with pid 693 exited with value 1
parent: child with pid 694 exited with value 2
parent: child with pid 695 exited with value 3
parent: child with pid 696 exited with value 4
parent: child with pid 697 exited with value 5
parent: child with pid 698 exited with value 6

Our next program is a bit more interesting. In this program, the user specifies a list of webpages to download. For each webpage, the program forks a new process and uses os.execlp() to execute wget.
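
forkwget.py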

#!/usr/bin/env python

from optparse import OptionParser
import os
import sys 
import time
import urlparse

def wget(webpages):
    child_pids = []
    for webpage in webpages:
        pid = os.fork()
        if pid == 0:
            # generate output name to prevent many duplicate index.html's
            o = urlparse.urlparse(webpage)
            output = o.netloc + o.path
            output = output.replace(os.path.sep, '__')
            print '[*] saving %s to %s' % (webpage, output)
            os.execlp('wget', 'wget', '--quiet', '--output-document', output,
                    webpage)
            assert False, 'error starting wget'
        else:
            child_pids.append(pid)
    for pid in child_pids:
        pid, status = os.waitpid(pid, 0)
        if os.WIFEXITED(status):
            print 'parent: child with pid %d exited with value %d' % \
                (pid, os.WEXITSTATUS(status))

if __name__ == '__main__':
    usage = '''usage: %prog [url ...]
  
Retrieve one or more webpages.
'''
    version = '%prog 1.0'
    parser = OptionParser(usage=usage, version=version)
    (options, args) = parser.parse_args()
    wget(args)

In the sample run below, we attempt to download the homepages of smherwig.org and lua.org, along with the nonexistent URL foo.

$ ./forkwget.py http://smherwig.org/index.html http://lua.org foo
[*] saving http://smherwig.org/index.html to smherwig.org__index.html
[*] saving http://lua.org to lua.org
[*] saving foo to foo
parent: child with pid 1153 exited with value 0
parent: child with pid 1154 exited with value 0
parent: child with pid 1155 exited with value 4
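
The parent learns of the failed download through the child's exit status, recovered via os.WEXITSTATUS: in recent versions of wget, an exit status of 4 indicates a network failure (here, the inability to resolve the host foo).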