[Twisted-Python] hacking on flow

Clark C. Evans cce at clarkevans.com
Thu Apr 10 01:33:02 EDT 2003


work in progress on my understanding of extrepum's flow... not
in any way near perfect

-------------

''' Flow -- async data flows

    This is a flow module which is used for async data flows,
    in particular those coming from databases where blocking
    behavior is common.   Implementations will typically use
    generators (or anything implementing the iterator protocol)
    for each stage of the flow.

    Basically, communication within the data flow is carried
    out using the yield statement.  Besides using 'return' 
    or throwing a StopIteration exception, there are several
    items which yield can return:

       flow.Yield      This (singleton) value should be returned when an
                       operation would block; it gives other micro-threads
                       a chance to do their work.

       flow.Callable   A callable object should be returned when a function
                       should be executed.  This adds another stage to the
                       flow. These objects have a 'result' attribute which
                       is filled in with the output of the function, or with
                       a failure.Failure instance if the function raised.

       flow.Iteration  An iterable object, that is, a generator, or array,
                       or class which implements both next() and __iter__.
                       These objects, like Callable, have a 'result' 
                       attribute, but also have a stop attribute which
                       is true when StopIteration has been thrown.

       <anything>      Any other return value
  
'''
from twisted.python import failure
from twisted.python.compat import StopIteration, iter

class FlowItem:
    ''' Common ancestor of every object the flow machinery recognises '''
    def __init__(self):
        # Holds whatever value the item produces; empty until then.
        self.result = None

# Singleton sentinel: a stage hands this back to signal that it would
# block.  Flow.next() recognises it by identity ('result is Yield').
Yield = FlowItem()

class Iteration(FlowItem):
    ''' Flow item wrapping an iterable

        Every call advances the wrapped iterator by one step, so the
        position is remembered between calls.  The value obtained is
        kept in 'result'; 'stop' becomes true once the iterator has
        finished or raised.
    '''
    def __init__(self, iterable):
        FlowItem.__init__(self)
        iterator = iter(iterable)
        # Bind the iterator's next() once; __call__ goes through it.
        self.next = iterator.next
        self.stop = 0
    def __call__(self):
        ''' Advance one step, store the outcome in 'result', return it '''
        try:
            outcome = self.next()
        except StopIteration:
            # Normal exhaustion: flag it and leave no value behind.
            self.stop = 1
            outcome = None
        except:
            # Any other error also ends the iteration; it is recorded
            # as a Failure rather than propagated to the caller.
            self.stop = 1
            outcome = failure.Failure()
        self.result = outcome
        return outcome

class Callable(FlowItem):
    ''' A pending function call whose outcome is stored in 'result'

        The function and its positional/keyword arguments are captured
        at construction time; nothing is executed until the instance
        itself is called.
    '''
    def __init__(self, func, *args, **kwargs):
        FlowItem.__init__(self)
        self.args   = args
        self.kwargs = kwargs
        self.func   = func
    def __call__(self):
        ''' Run func(*args, **kwargs) and return what ends up in 'result'

            On success 'result' is the function's return value.  If the
            call raises, 'result' is a failure.Failure instead, and that
            object is returned rather than the exception propagating.
        '''
        try:
            # Extended call syntax; equivalent to the deprecated
            # apply(func, args, kwargs) builtin it replaces.
            self.result = self.func(*self.args, **self.kwargs)
        except:
            # Deliberately broad: every error raised by the call is
            # converted into a Failure for the flow to pass along.
            self.result = failure.Failure()
        return self.result

class Controller(Iteration):
    ''' An Iteration that stays on the flow's stack

        Ordinarily, when a Callable or an Iteration produces a
        FlowItem other than Yield, the producer is replaced on the
        stack by that item, so that the item's value is handed to
        the producer's caller.  A Controller is exempt from this
        replacement and is kept on the stack.
    '''

class Flow(object):
    ''' Drives a stack of flow items; call next() until StopIteration '''
    def __init__(self, controller):
        # The stack starts out holding only the item passed in.
        self._stack = [controller]
    def __iter__(self):
        return self
    def next(self):
        ''' Run items off the stack until one would block

            Returns 1 when the item on top produced Yield; that item
            is put back so it resumes on the following call.  Raises
            StopIteration once the stack is empty.
        '''
        stack = self._stack
        while stack:
            current = stack.pop()
            produced = current()
            if not isinstance(produced, FlowItem):
                # Plain value: 'current' is finished and stays popped.
                continue
            if produced is Yield:
                stack.append(current)
                return 1
            # A Controller remains underneath the item it produced;
            # any other producer is replaced by that item.
            if isinstance(current, Controller):
                stack.append(current)
            stack.append(produced)
        raise StopIteration

class DeferredFlow(object):
    ''' Placeholder for a Deferred-driven flow runner (not written yet) '''
    # Intended design:
    #   - create a Flow and loop on Flow.next()
    #   - for each item in the loop, do a reactor.callLater(0, next)
    #   - return the very last value, or the error, via a Deferred





More information about the Twisted-Python mailing list