Tuesday, August 24, 2010

Learning Twisted (part 4) : Cooperative multitasking

The event loop dispatches events to event handlers. If a handler misbehaves and does not return promptly, the event (reactor) loop cannot service other events. This means that all our event handlers need to return quickly for this model to work.
This suits functions that spend a substantial part of their time waiting, provided the wait is non-blocking. Typically, network or file I/O can be non-blocking.
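
To make the cost of a slow handler concrete, here is a minimal sketch of a toy event loop running on a simulated clock. It is not Twisted code: run_event_loop and the (name, cost) event format are hypothetical names invented for this illustration, and "cost" simply stands in for how long a handler takes to return.

```python
from collections import deque


def run_event_loop(events):
    """Dispatch (name, cost) events one at a time on a simulated clock.

    Returns a list of (name, start_time) pairs showing when each
    handler finally got its turn.
    """
    clock = 0
    started = []
    queue = deque(events)
    while queue:
        name, cost = queue.popleft()
        started.append((name, clock))
        # Nothing else can run until this handler returns.
        clock += cost
    return started


# Three quick handlers versus the same handlers with one slow one in the middle.
quick = run_event_loop([("a", 1), ("b", 1), ("c", 1)])
slow = run_event_loop([("a", 1), ("b", 50), ("c", 1)])
print(quick)
print(slow)
```

In the second run, handler "c" has to wait for the whole of "b" even though its own work is tiny, which is why every handler has to return quickly.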

Twisted provides a facility to interleave the execution of various functions, so that one function's waiting period can be used to do useful work for other functions that are waiting to be serviced.

Sequential execution:

+--------------------------++--------------------------+
|task1 | wait period | comp||task2 | wait period | comp|
+--------------------------++--------------------------+

Interleaved execution:

+--------------------------+
|task1 | wait period | comp|
+--------------------------+
       +--------------------------+
       |task2 | wait period | comp|
       +--------------------------+




In particular, the constraint "every event handler must take only a very small slice of time and return immediately" must be respected.
This constraint can be violated in at least two ways:


  1. You can call functions that block (blocking system calls). We will come back to this later.
  2. You can call functions that process a large data set before they return a result. The simplest case is a "for" loop that runs over a collection of items.

If the collection is large, it may take a long time before every item has been processed.

Twisted provides a way for functions that loop over large collections to work cooperatively with other functions: the loop periodically yields, so that others get a chance to execute while it is still in progress.
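
The idea of a loop that yields can be imitated without Twisted. The following is a minimal sketch, assuming each loop is written as a generator that yields after every item; worker and run_round_robin are hypothetical helpers made up for this illustration and are not part of Twisted, whose real scheduler works through the reactor.

```python
from collections import deque


def worker(label, items, output):
    """Record each item, then yield so that other workers get a turn."""
    for item in items:
        output.append((label, item))
        yield


def run_round_robin(generators):
    """Advance each generator one step at a time until all are exhausted."""
    pending = deque(generators)
    while pending:
        gen = pending.popleft()
        try:
            next(gen)
        except StopIteration:
            continue  # this loop is finished, drop it
        pending.append(gen)  # not finished, queue it for another turn


output = []
run_round_robin([
    worker("numbers", [1, 2, 3], output),
    worker("letters", "ab", output),
])
print(output)
```

Each call to next() runs one loop body and then returns, so no single loop can monopolise the scheduler.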

twisted.internet.task provides two functions, "cooperate" and "coiterate", that allow cooperative interleaving of functions that loop over large collections.

from twisted.internet import reactor
from twisted.python import log
import sys, time
from string import ascii_lowercase
from twisted.internet.task import cooperate, coiterate
log.startLogging(sys.stdout) 

Iterator objects are required to support the following two methods,
which together form the iterator protocol: __iter__ and next (spelled __next__ in Python 3).
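
As a quick illustration of the protocol, here is a small sketch of an iterator that works under both spellings of the method. Countdown is a hypothetical class used only for this example.

```python
class Countdown(object):
    """Hypothetical iterator that counts down from `start` to 1."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # An iterator returns itself from __iter__.
        return self

    def __next__(self):
        if self.current < 1:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

    next = __next__  # Python 2 spelling of the same method


it = Countdown(3)
first = next(it)  # the same call a "for" loop makes on every pass
rest = list(it)   # consumes whatever is left
print(first)
print(rest)
```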

class Counter(object):
    
    def __init__(self, start, end):
        self.iterVal = start
        self.end = end
        
    def __iter__(self):
        return self

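    def next(self):
        # Return the current value and advance; stop once past the end.
        if self.iterVal > self.end:
            raise StopIteration
        else:
            self.iterVal += 1
            return self.iterVal - 1

    __next__ = next  # Python 3 name for the same method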
            
            
class AlphaCounter(object):
    
    def __init__(self, start, end):
        assert(str(start) in ascii_lowercase)
        assert(str(end) in ascii_lowercase)
        assert(ord(str(start)) < ord(str(end)))
        self.iterVal = ord(str(start))
        self.end = ord(str(end))
        
    def __iter__(self):
        return self

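    def next(self):
        # Return the current letter and advance; stop once past the end.
        if self.iterVal > self.end:
            raise StopIteration
        else:
            self.iterVal += 1
            return chr(self.iterVal - 1)

    __next__ = next  # Python 3 name for the same method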
            
The following code runs the loop over counterObj to completion before it executes the other loop, which prints a range of letters.
This is a contrived example, but you can see that there is no interleaving in this execution.

counterObj = Counter(1,11)  
alphaCounterObj = AlphaCounter('d', 'p')

def simplyPrint(data):
    print(data)  # the parenthesised form works on Python 2 and 3
            
def loopFunction(iterObj):
    for obj in iterObj:
        simplyPrint(obj)  
        
reactor.callWhenRunning(loopFunction, counterObj)
reactor.callWhenRunning(loopFunction, alphaCounterObj) 

This will run the two loops sequentially:
1 2 3 4 ... 11 d e f g h .. p
Now, if we use the cooperate facility provided in task.py, the output of the various loops is interleaved. The loops are not truly parallel, since everything still runs in a single thread, but each one makes progress while the others are still running.

def loopCooperatively(iterObj):
    
    def copIterate():
        for obj in iterObj:
            simplyPrint(obj) 
            yield None
            
    return cooperate(copIterate())
        
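task2 = loopCooperatively(AlphaCounter('k', 'z'))
task2_d = task2.whenDone()  # Deferred that fires once the iterator is exhausted
task2.pause()               # a cooperative task can be suspended...
task2.resume()              # ...and resumed later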

def simpleIterate(iterObj):
    for obj in iterObj:
        simplyPrint(obj)
        yield None

# coiterate returns a Deferred (not a task object) that fires when the iterator is exhausted.
task3 = coiterate(simpleIterate(Counter(100,111)))

reactor.run()

Here, both loops advance in an interleaved fashion:
k 100 l 101 m 102 ...
It may not be a good idea to wrap every "for" loop this way, but you may want to consider introducing some cooperative looping into your code.
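
One way to keep the overhead down, assuming that yielding after every single item is too fine-grained, is to yield only once per batch. The sketch below is plain Python; process_in_batches and its batch_size parameter are hypothetical names chosen for this illustration, not something Twisted provides.

```python
def process_in_batches(items, handle, batch_size=100):
    """Call handle(item) for every item, yielding after each full batch."""
    count = 0
    for item in items:
        handle(item)
        count += 1
        if count % batch_size == 0:
            yield  # give other tasks a turn before the next batch


seen = []
# Driving the generator by hand: each step processes up to one batch.
steps = list(process_in_batches(range(10), seen.append, batch_size=4))
print(len(steps))
print(seen)
```

Since this is an ordinary generator, it could presumably be handed to cooperate() in the same way as copIterate() above.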
