Julien Palard

Python Developer and Trainer

Python: Introducing ppipe : Parallel Pipe

/!\ This was highly experimental code written in 2011. Today you should NOT use it; just look at it if the subject amuses you. You had better take a look at asyncio if you want this kind of tool.

I'll talk about my pipe Python module, so if you don't know it, you should first read the first article about pipes.

The idea behind ppipe (parallel pipe) is to transparently make an asynchronous pipe, multithreaded or not. Multithreading isn't an easy piece of code for everybody: it comes with loads of pollution like locks and queues, giving code far away from the actual simple task you tried to do. But one kind of multithreading can be nicely handled with a simple design pattern that is well implemented in Python: the queue. The queue handles all the locking so you don't have to worry about it: just make your workers work, enqueue, dequeue, work... but you still have to create the workers!

A pipe is not far from the concept of a queue, since every part of a pipe command works on a piece of data and then gives it to the next worker. So it's not hard to imagine an asynchronous pipe in which every part of a pipe command can work at the same time. Then it's not hard to imagine that n threads can be started for a single step of a pipe command, leading to a completely multithreaded application with ~0 lines of code bloat for thread creation / synchronization in your actual code.

So I tried to implement it, keeping the existing contract, which is very simple: every pipe should take an iterable as input. (I was tempted to change it to 'every pipe must take a Queue as input', but if I don't change the contract, normal pipes and parallel pipes can be mixed.) I created a branch, which you'll find on GitHub, with a single new file, 'ppipe.py'. It is not actually importable; it's only a proof of concept that can be launched. Here is the test I wrote using ppipe:

print "Normal execution :"
xrange(4) | where(fat_big_condition1) \
          | where(fat_big_condition2) \
          | add | lineout

print "Parallel with 1 worker"
xrange(4) | parallel_where(fat_big_condition1) \
          | where(fat_big_condition2) \
          | add | lineout

print "Parallel with 2 workers"
xrange(4) | parallel_where(fat_big_condition1, qte_of_workers=2) \
          | parallel_where(fat_big_condition2, qte_of_workers=2) | add | stdout

print "Parallel with 4 workers"
xrange(4) | parallel_where(fat_big_condition1, qte_of_workers=4) \
          | parallel_where(fat_big_condition2, qte_of_workers=4) | add | stdout

The idea is to compare a normal pipe (Normal execution) with an asynchronous pipe (Parallel with 1 worker, as 1 worker is the default), and then with 2 and 4 workers, which can be given to a ppipe using 'qte_of_workers='. fat_big_condition1 and fat_big_condition2 are just f*cking long-running pieces of code, like fetching something far, far away on the internet... but for our tests, let's use time.sleep:

import time

def fat_big_condition1(x):
    log(1, "Working...")
    time.sleep(2)
    log(1, "Done !")
    return 1

def fat_big_condition2(x):
    log(2, "Working...")
    time.sleep(2)
    log(2, "Done !")
    return 1

They always return 1... and they log using a simple log function that makes fat_big_condition1 log in the left column and fat_big_condition2 log in the right column:

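from datetime import datetime
from threading import Lock

stdoutlock = Lock()
def log(column, text):
    # Hold the lock so that lines from different threads don't get mixed up.
    with stdoutlock:
        print ' ' * column * 10,
        print str(datetime.now().time().strftime("%S")),
        print text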

And here is the output (the integers are the current second, so the times don't start at 0...):

Normal execution :
           57 Working...
           59 Done !
                     59 Working...
                     01 Done !
           01 Working...
           03 Done !
                     03 Working...
                     05 Done !
           05 Working...
           07 Done !
                     07 Working...
                     09 Done !
           09 Working...
           11 Done !
                     11 Working...
                     13 Done !

// As you can see here, only one condition is executed at a time,
// which is normal behavior for a non-threaded program.

Parallel with 1 worker
           13 Working...
           15 Done !
                     15 Working...
           15 Working...
                     17 Done !
           17 Done !
           17 Working...
                     17 Working...
                     19 Done !
           19 Done !
           19 Working...
                     19 Working...
                     21 Done !
           21 Done !
                     21 Working...
                     23 Done !

// Just by adding parallel_ to the first where, you can now see that it's
// asynchronous and that the two conditions can work at the
// same time, interleaving the output a bit.

Parallel with 2 workers
           23 Working...
           23 Working...
           25 Done !
           25 Working...
           25 Done !
           25 Working...
                     25 Working...
                     25 Working...
           27 Done !
           27 Done !
                     27 Done !
                     27 Working...
                     27 Done !
                     27 Working...
                     29 Done !
                     29 Done !


Parallel with 4 workers
           55 Working...
           55 Working...
           55 Working...
           55 Working...
           57 Done !
           57 Done !
           57 Done !
           57 Done !
                     57 Working...
                     57 Working...
                     57 Working...
                     57 Working...
                     59 Done !
                     59 Done !
                     59 Done !
                     59 Done !

// And now with 2 and 4 workers you can clearly see what
// happens: with 2 workers, the input is computed in pairs,
// and with 4 workers, all the input can be computed at once,
// but the 4 workers of the 2nd condition have to wait for the data
// before starting to work. So in the last test you have 8 threads:
// only the first 4 are working during the first 2 seconds, then only
// the other 4 work.
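
The timestamps in these four runs can be checked with a back-of-the-envelope model. The sketch below is written in Python 3 and is only an approximation: the function names are made up for this illustration, and it assumes that every item passes both conditions and that every stage takes the same 2 seconds, which is the case in this test.

```python
import math


def sequential_seconds(items, stage_seconds=2, stages=2):
    # A normal pipe is lazy and single-threaded: each item goes through
    # every stage, one after the other.
    return items * stages * stage_seconds


def parallel_seconds(items, workers, stage_seconds=2, stages=2):
    # With `workers` threads per stage, items move through in batches.
    # The first batch has to cross every stage; each following batch
    # only adds one more stage time, because the stages overlap.
    batches = math.ceil(items / workers)
    return (batches + stages - 1) * stage_seconds


print(sequential_seconds(4))    # 16, matches 57 -> 13 in the output
print(parallel_seconds(4, 1))   # 10, matches 13 -> 23
print(parallel_seconds(4, 2))   # 6, matches 23 -> 29
print(parallel_seconds(4, 4))   # 4, matches 55 -> 59
```

With only 4 items, going beyond 4 workers per stage would not help: the first batch still has to cross both stages, so 4 seconds is the floor for this test.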

To make the creation of a ppipe simple, I extracted all the 'threading' part into a function usable as a decorator, so writing a parallel_where gives:

@Pipe
@Threaded
def parallel_where(item, output, condition):
    if condition(item):
        output.put(item)

You can see the queue here! :-) Enjoy!
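
For the curious, here is a minimal sketch of how a decorator of this kind could be built on top of queues. It is not the code from ppipe.py: it is written for Python 3, the `Pipe` class is a tiny stand-in for the one in the pipe module, and the names `threaded` (lowercase on purpose) and `_DONE` are invented for this illustration. The only thing taken from the post is the shape of the decorated function, `(item, output, *args)`, and the `qte_of_workers` keyword.

```python
import threading
from queue import Queue

_DONE = object()  # hypothetical sentinel marking the end of a stream


class Pipe:
    """Tiny stand-in for the pipe module's Pipe class (an assumption)."""

    def __init__(self, function):
        self.function = function

    def __ror__(self, iterable):
        # `iterable | pipe` calls the wrapped function on the iterable.
        return self.function(iterable)

    def __call__(self, *args, **kwargs):
        # `pipe(x, y)` returns a new Pipe with the arguments bound.
        return Pipe(lambda iterable: self.function(iterable, *args, **kwargs))


def threaded(function):
    """Run function(item, output, *args) in worker threads (sketch)."""

    def runner(iterable, *args, qte_of_workers=1):
        inputs, output = Queue(), Queue()

        def feed():
            # Consume the upstream iterable in its own thread, so the
            # previous step of the pipe can overlap with this one.
            for item in iterable:
                inputs.put(item)
            for _ in range(qte_of_workers):
                inputs.put(_DONE)  # one stop marker per worker

        def work():
            try:
                while True:
                    item = inputs.get()
                    if item is _DONE:
                        break
                    function(item, output, *args)
            finally:
                output.put(_DONE)  # always report that this worker ended

        threading.Thread(target=feed, daemon=True).start()
        for _ in range(qte_of_workers):
            threading.Thread(target=work, daemon=True).start()

        # The contract is kept: the result is still a plain iterable.
        finished = 0
        while finished < qte_of_workers:
            item = output.get()
            if item is _DONE:
                finished += 1
            else:
                yield item

    return runner


@Pipe
@threaded
def parallel_where(item, output, condition):
    if condition(item):
        output.put(item)


# With several workers the output order is not guaranteed, hence sorted().
evens = sorted(range(10) | parallel_where(lambda x: x % 2 == 0, qte_of_workers=3))
print(evens)  # [0, 2, 4, 6, 8]
```

Because `runner` is a generator, the threads only start when something begins to iterate over the result, and the output can be fed to a normal pipe or to another parallel one. A worker whose function raises simply stops and its items are lost, which is one more reason to treat this as a toy.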