Friday, August 20, 2010

Learning Twisted (Part 2): Async IO

    +---------------+
    |ILoggingContext| ........ logPrefix
    +-------+-------+
            |
    +-------+-------+
    |IFileDescriptor| ........ fileno, connectionLost
    +-------+-------+
            |
       +----+----------------------+
       |                           |
  +----+-----------+       +-------+-------+
  |IWriteDescriptor|       |IReadDescriptor|
  +----+-----------+       +-------+-------+
       |   doWrite                 |   doRead
       |   __init__ : open a       |   __init__ : open the
       |   connection              |   descriptor or
   addWriter                   addReader
       |                           |
       +-------------+-------------+
                     |
              +------+------+
              |IReactorFDSet|
              +-------------+


Well, this will become clear soon.

We know that the core of Twisted is the reactor loop, which listens for events and dispatches them to "event handlers" (the event-driven programming paradigm). We also learnt how to generate custom events and how to register methods as event handlers, both for custom events and for some special events.

Twisted provides various frameworks for registering events and for adding and removing the producers and consumers of those events. These frameworks usually follow well-known paradigms. We will walk through some of those implementations shortly as we continue exploring the default reactor (at the time of writing), SelectReactor.

Non-blocking I/O can perform better than blocking I/O in certain applications, for example a single process that has to serve many connections at once. select()-based non-blocking I/O lets us implement this paradigm, and SelectReactor supports it.
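To make the idea concrete outside Twisted, here is a minimal sketch using only the standard library (Python 3, Unix only). It puts the read end of a pipe into non-blocking mode with the same fcntl pattern the code later in this post uses on standard input. A non-blocking read on an empty pipe fails with EAGAIN (raised as BlockingIOError) instead of waiting, and select() tells us when a read will succeed. The function name demo is just for illustration.

```python
import fcntl
import os
import select


def demo():
    r, w = os.pipe()
    # Same fcntl pattern the post uses on stdin: add O_NONBLOCK to the flags.
    flags = fcntl.fcntl(r, fcntl.F_GETFL)
    fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # Nothing written yet: a blocking read would hang, a non-blocking one fails.
    try:
        os.read(r, 20)
        would_block = False
    except BlockingIOError:
        would_block = True

    # A zero timeout makes select() poll: the read end is not ready yet.
    ready_before, _, _ = select.select([r], [], [], 0)

    os.write(w, b"hello")

    # Now select() reports the read end as readable, so os.read() returns data.
    ready_after, _, _ = select.select([r], [], [], 0)
    data = os.read(r, 20)

    os.close(r)
    os.close(w)
    return would_block, ready_before, ready_after, data


if __name__ == "__main__":
    print(demo())
```

demo() reports that the first read would have blocked, that select() found nothing ready before the write, and that b"hello" was read once select() marked the pipe readable.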


What SelectReactor does is let you add and remove readers and writers, and it provides non-blocking I/O for this group of readers and writers by passing read and write events on to the handlers they supply. This means each reader and writer must provide a fileno() method returning a file descriptor on which SelectReactor can call select(). In addition, readers and writers must implement methods that act as event handlers, so that SelectReactor can invoke them. These interface requirements are documented using zope.interface.
The interfaces of interest to us at the moment are: IReactorFDSet (implemented by SelectReactor), IFileDescriptor, IReadDescriptor, IWriteDescriptor, IReadWriteDescriptor and ILoggingContext.

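Together, these interfaces let us implement non-blocking I/O on top of Twisted's event loop. I am not sure whether this capability should be called async I/O: strictly speaking, select() only reports which descriptors are ready, and the reads and writes themselves are ordinary non-blocking calls.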
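Conceptually, one turn of such a loop looks like the sketch below. It is a toy written for this post (Python 3, standard library only), loosely modelled on how a select()-based reactor dispatches events; MiniReactor, PipeReader and demo are hypothetical names, and this is not Twisted's implementation. As a simplifying assumption that mirrors Twisted's descriptor interfaces, a truthy return value from doRead() or doWrite() is treated as the reason the connection was lost.

```python
import os
import select


class MiniReactor:
    """Toy select()-based event loop; a sketch, not Twisted's SelectReactor."""

    def __init__(self):
        self.readers = set()
        self.writers = set()

    # Method names mirror IReactorFDSet for readability only.
    def addReader(self, reader):
        self.readers.add(reader)

    def addWriter(self, writer):
        self.writers.add(writer)

    def removeReader(self, reader):
        self.readers.discard(reader)

    def removeWriter(self, writer):
        self.writers.discard(writer)

    def iterate(self, timeout=0.0):
        # select() accepts any object that has a fileno() method.
        readable, writable, _ = select.select(
            list(self.readers), list(self.writers), [], timeout)
        for reader in readable:
            reason = reader.doRead()
            if reason:
                # A truthy result means the descriptor is finished.
                self.removeReader(reader)
                self.removeWriter(reader)
                reader.connectionLost(reason)
        for writer in writable:
            if writer not in self.writers:
                continue  # already removed while handling reads
            reason = writer.doWrite()
            if reason:
                self.removeWriter(writer)
                writer.connectionLost(reason)


class PipeReader:
    """Hypothetical reader wrapping the read end of a pipe."""

    def __init__(self, fd):
        self.fd = fd
        self.chunks = []
        self.lost = None

    def fileno(self):
        return self.fd

    def doRead(self):
        data = os.read(self.fd, 1024)
        if not data:
            return "EOF"  # hypothetical reason value
        self.chunks.append(data)
        return None

    def connectionLost(self, reason):
        self.lost = reason


def demo():
    r, w = os.pipe()
    loop = MiniReactor()
    reader = PipeReader(r)
    loop.addReader(reader)
    os.write(w, b"abc")
    loop.iterate()  # pipe is readable: doRead() collects b"abc"
    os.close(w)
    loop.iterate()  # EOF: doRead() returns "EOF" and the reader is removed
    os.close(r)
    return loop, reader
```

The loop never blocks in a read or write; it only waits inside select(), and it removes a reader as soon as that reader reports it is done.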

To start with, we import everything we need:

from zope.interface import implements
from twisted.internet.interfaces import (
    ILoggingContext,
    IFileDescriptor,
    IReadDescriptor,
    IWriteDescriptor,
    IReadWriteDescriptor,
)

import fcntl
import os
import sys
import time

from twisted.internet import reactor
from twisted.python import log

log.startLogging(sys.stdout)

# Global buffer: doRead() stores what it reads from standard input here,
# and doWrite() writes it out to a file.
dataToWrite = ""

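Twisted identifies where a log message comes from through a logging context: ILoggingContext requires a logPrefix() method, which returns a prefix used when formatting log messages. IFileDescriptor extends ILoggingContext, so every descriptor we hand to the reactor must provide it.
In practice Twisted relies on duck typing: the reactor simply calls the methods an interface describes, so a class "implements" an interface by providing those methods, and the implements() declaration documents that promise.
This is what we will be doing in the following code. It is a good idea to look up the interface definitions in the Twisted source code (twisted/internet/interfaces.py) to understand the implementation better.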
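zope.interface declarations such as implements() are explicit promises, while the code that calls logPrefix() only cares that the method exists. As a stdlib-only analogue (this swaps in typing.Protocol, not zope.interface), the sketch below checks for logPrefix() structurally. LoggingContextLike, Named, Quiet and format_log are hypothetical names. If you want zope.interface itself to check an object against an interface, zope.interface.verify.verifyObject does that.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class LoggingContextLike(Protocol):
    """Structural stand-in for ILoggingContext (hypothetical, stdlib only)."""

    def logPrefix(self) -> str:
        ...


class Named:
    # Does not inherit from LoggingContextLike; having logPrefix() is enough.
    def __init__(self, prefix):
        self.prefix = prefix

    def logPrefix(self):
        return self.prefix


class Quiet:
    pass


def format_log(obj, message):
    # isinstance() on a runtime_checkable Protocol only checks that the
    # method exists, which is essentially duck typing.
    if isinstance(obj, LoggingContextLike):
        prefix = obj.logPrefix()
    else:
        prefix = "-"
    return "[%s] %s" % (prefix, message)


if __name__ == "__main__":
    print(format_log(Named("Input"), "Got some data"))  # [Input] Got some data
    print(format_log(Quiet(), "Got some data"))         # [-] Got some data
```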

# Implementing ILoggingContext

class LogContext(object):
    implements(ILoggingContext)
    
    def __init__(self, prefix=''):
        self.prefix = prefix
    
    def logPrefix(self):
        return self.prefix

Next we implement a file descriptor class that provides two things: (1) a fileno() method returning the file descriptor on which the reactor can call select(), and (2) connectionLost(), a handler the reactor calls when the connection is lost, whether because of an error, an explicit close, or reactor shutdown.

class SampleFileDescriptor(LogContext):
    implements(IFileDescriptor)

    def __init__(self, prefix, fd):
        self.fd = fd
        super(SampleFileDescriptor, self).__init__(prefix)

    def fileno(self):
        # The reactor calls select() on the integer returned here.
        return self.fd

    def connectionLost(self, reason):
        # Called on error, on EOF and when the reactor shuts down.
        log.msg("Closing connection : reason : %s" % reason)
        self.closeDescriptor()

The following classes implement a separate reader and writer. They provide the event handlers doRead() and doWrite(), which the reactor calls when there is something to read or when the descriptor is ready to be written to.

from twisted.internet import main  # main.CONNECTION_DONE signals a clean EOF

class SampleReadDescriptor(SampleFileDescriptor):
    implements(IReadDescriptor)

    def __init__(self, prefix):
        # Read from standard input (fd 0) and make it non-blocking,
        # remembering the original flags so they can be restored later.
        fd = 0
        self._oldFlags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, self._oldFlags | os.O_NONBLOCK)
        super(SampleReadDescriptor, self).__init__(prefix, fd)

    def doRead(self):
        global dataToWrite
        # select() reported that data is available on our descriptor.
        data = os.read(self.fd, 20)
        if not data:
            # EOF (e.g. Ctrl-D): returning a reason tells the reactor to stop
            # watching this descriptor and call connectionLost().
            return main.CONNECTION_DONE
        dataToWrite = data
        log.msg("Please provide your inputs\n")

    def closeDescriptor(self):
        # Put stdin back into blocking mode, but leave it open so the
        # blocking os.read() at the end of the script still works.
        fcntl.fcntl(self.fd, fcntl.F_SETFL, self._oldFlags)

class SampleWriteDescriptor(SampleFileDescriptor):
    implements(IWriteDescriptor)

    def __init__(self, prefix):
        # Write to a file. select() always reports a regular file as
        # writable, so doWrite() is called on every reactor iteration.
        self._file = open("sampleinput", 'w')
        super(SampleWriteDescriptor, self).__init__(prefix, self._file.fileno())

    def doWrite(self):
        global dataToWrite
        # select() reported that our descriptor can be written to.
        if dataToWrite.rstrip("\n") != "":
            log.msg("Got something to write")
            self._file.write(dataToWrite)
            self._file.flush()
            dataToWrite = ""

    def closeDescriptor(self):
        self._file.close()
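The reader above switches standard input to non-blocking mode and later restores the original flags. A minimal sketch of that save-and-restore pattern as a reusable context manager follows (Python 3, Unix only); nonblocking is a hypothetical helper, not part of Twisted, and the example exercises it on a pipe rather than on stdin.

```python
import contextlib
import fcntl
import os


@contextlib.contextmanager
def nonblocking(fd):
    """Hypothetical helper: set O_NONBLOCK on fd, restore the old flags on exit."""
    old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
    try:
        yield fd
    finally:
        # Restore exactly the flags we found, so callers see blocking I/O again.
        fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)


if __name__ == "__main__":
    r, w = os.pipe()
    with nonblocking(r):
        try:
            os.read(r, 20)
        except BlockingIOError:
            print("empty pipe: read would block")
    os.close(r)
    os.close(w)
```

Restoring the saved flags, rather than just clearing O_NONBLOCK, puts the descriptor back exactly as it was. This matters for stdin, whose open file description is usually shared with the shell that started the script.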


The following timed callback was added to show that the reactor keeps firing timed events while it waits for I/O, i.e. it is not blocked on I/O.

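def writeToStdInput(count):
    # Despite its name, this only logs the current time; it shows that
    # timed calls still fire while the reactor waits for input.
    now = time.localtime(time.time())
    log.msg(time.strftime("%y/%m/%d %H:%M:%S", now))
    if count < 6:
        reactor.callLater(4, writeToStdInput, count + 1)
    else:
        # After the sixth call, stop the reactor.
        reactor.stop()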
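How can a single thread wait on I/O and still fire timed calls on schedule? A select()-based loop can pass the time remaining until the next scheduled call as select()'s timeout. The sketch below (Python 3, standard library, Unix) illustrates that idea; TimerLoop and its callLater()/stop() methods only mimic the shape of reactor.callLater() and reactor.stop() and are not Twisted's implementation.

```python
import heapq
import itertools
import select
import time


class TimerLoop:
    """Toy sketch: timed calls interleaved with select(); not Twisted's reactor."""

    def __init__(self):
        self._calls = []               # heap of (when, seq, func, args)
        self._seq = itertools.count()  # tie-breaker so funcs are never compared
        self.readers = []              # objects with fileno() and doRead()
        self._running = False

    def callLater(self, delay, func, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._calls, (when, next(self._seq), func, args))

    def stop(self):
        self._running = False

    def run(self):
        self._running = True
        while self._running and (self._calls or self.readers):
            # Wait in select() only until the next timed call is due.
            timeout = None
            if self._calls:
                timeout = max(0.0, self._calls[0][0] - time.monotonic())
            ready, _, _ = select.select(self.readers, [], [], timeout)
            for reader in ready:
                reader.doRead()
            # Fire every timed call whose deadline has passed.
            now = time.monotonic()
            while self._calls and self._calls[0][0] <= now:
                _, _, func, args = heapq.heappop(self._calls)
                func(*args)


def demo():
    loop = TimerLoop()
    fired = []

    def tick(count):
        # Same shape as writeToStdInput: reschedule until a limit, then stop.
        fired.append(count)
        if count < 3:
            loop.callLater(0.01, tick, count + 1)
        else:
            loop.stop()

    loop.callLater(0.01, tick, 1)
    loop.run()
    return fired
```

demo() schedules three ticks 10 ms apart and returns [1, 2, 3]; any readers added to loop.readers would be serviced between ticks without delaying them.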

Finally, we add the reader and writer to the reactor. The reactor runs select() on their file descriptors and calls the appropriate handler when there is something to read, when a descriptor can be written to, or when a connection is lost. Note the last line of the script: it runs after the reactor has stopped and is an example of blocking I/O.

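stdReader = SampleReadDescriptor("Input")
stdWriter = SampleWriteDescriptor("Output")

# Log the time every 4 seconds; the sixth call stops the reactor.
reactor.callLater(4, writeToStdInput, 1)
# IReactorFDSet: ask the reactor to watch our descriptors.
reactor.addReader(stdReader)
reactor.addWriter(stdWriter)

reactor.run()

# Blocking I/O: with stdin back in blocking mode, this call waits
# until input is available.
t = os.read(0, 20)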



This paradigm lets us perform I/O asynchronously; it works only because each individual read or write is non-blocking.

Going forward, we will explore other computing paradigms supported by the Twisted framework.
