i.e functions that have substantial waiting period and are non-blocking. Typically, network or file I/O can be are non-blocking.
Twisted provides facility to interleave execution of various functions, where the period of waiting can be used to do something useful for other functions that are waiting to be serviced.
+--------------------------++--------------------------+ |task1 | wait period | comp||task2 | wait period | comp| +--------------------------++--------------------------+ +--------------------------+ |task1 | wait period | comp| +--------------------------+ +--------------------------+ |task2 | wait period | comp| +--------------------------+
Particularly, the constraint "every event handler must take a very small slice of time and should return immediately" should be respected.
This constraint can be violated in at least two ways:
- You can call functions that can block (non blocking system calls). We will come back to this later.
- Functions that process large data set before they return the result. Simplest of the cases is when we deal with "for" loop that runs over a collection of items.
If the collection of items is large, then it may take a long time before each one is processed.
Twisted framework provides a way where the functions loop over large collection can work cooperatively with other functions by yielding time for others to get executed along with ongoing loop execution.
twisted.internet.task provides two methods - "cooperate" and "coiterate" to allow cooperative interleaving of the various functions that loop over large collections.
from twisted.internet import reactor from twisted.python import log import sys, time from string import ascii_lowercase from twisted.internet.task import cooperate, coiterate log.startLogging(sys.stdout)
The iterator objects themselves are required to support the following two methods,
which together form the iterator protocol - __iter__ and __next__
class Counter(object): def __init__(self, start, end): self.iterVal = start self.end = end def __iter__(self): return self def next(self): if self.iterVal > self.end: raise StopIteration else: self.iterVal += 1 return self.iterVal - 1 class AlphaCounter(object): def __init__(self, start, end): assert(str(start) in ascii_lowercase) assert(str(end) in ascii_lowercase) assert(ord(str(start)) < ord(str(end))) self.iterVal = ord(str(start)) self.end = ord(str(end)) def __iter__(self): return self def next(self): if self.iterVal > self.end: raise StopIteration else: self.iterVal += 1 return chr(self.iterVal - 1)The following function results in calling the loop on counterObj completely before executing the other loop printing certain alphabets.
This is contrived example but you see that there is no parallelism in this execution.
counterObj = Counter(1,11) alphaCounterObj = AlphaCounter('d', 'p') def simplyPrint(data): print data def loopFunction(iterObj): for obj in iterObj: simplyPrint(obj) reactor.callWhenRunning(loopFunction, counterObj) reactor.callWhenRunning(loopFunction, alphaCounterObj)This will run the two loops sequentially :
1 2 3 4 ... 11 d e f g h .. pNow, if we use the cooperate facility provided in task.py, the output will be interleaved among the various loops that are executed. There is certain amount of parallelism achieved through this model.
def loopCooperatively(iterObj): def copIterate(): for obj in iterObj: simplyPrint(obj) yield None return cooperate(copIterate()) task2 = loopCooperatively(AlphaCounter('k', 'z')) task2_d = task2.whenDone() task2.pause() task2.resume() def simpleIterate(iterObj): for obj in iterObj: simplyPrint(obj) yield None task3 = coiterate(simpleIterate(Counter(100,111))) reactor.run()
Here, both the loops are advanced in parallel:
k 100 l 101 m 102 ...It may not be a good idea to wrap every "for" loop this way but you may want to look at implementing some amount of loop parallelism in your code.
No comments:
Post a Comment