[Twisted-Python] hacking on flow

Clark C. Evans cce at clarkevans.com
Thu Apr 10 04:51:11 EDT 2003

On Thu, Apr 10, 2003 at 09:04:59AM +0200, Philippe Lafoucrière wrote:
| Your post is in a wrong thread...

Oops.  Responded to the wrong message.  ;(

| BTW, how do you test this ? It seems to be good (well written), but I'm
| not sure to exactly understand what you wanted to do.

The basic pattern, as put forth by extrepum in his post a while back...
is that you implement your Flow object using a generator...

   def mygenerator():
       print "dosomething"
       d = SubGenerator()
       yield d
       print d.result
       d = AnotherSubGenerator()
       yield d
       print d.result
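
For anyone who wants to try the pattern above without Twisted installed, here is a rough Python 3 sketch. The names `Sub` and `run` are made up for illustration and only stand in for flow.Generator and Flow.execute. It assumes the parent is resumed after every value a sub-flow produces, and it stores a bare exception where the module stores a failure.Failure.

```python
# Python 3 sketch. Sub and run are hypothetical stand-ins for
# flow.Generator and Flow.execute, not the module's actual code.

class Sub:
    """Wraps an iterable; each step() stores the next value in .result."""
    def __init__(self, iterable):
        self._it = iter(iterable)
        self.result = None
        self.active = True

    def step(self):
        try:
            self.result = next(self._it)
        except StopIteration:
            self.active = False
            self.result = None
        except Exception as exc:
            # Assumption: a bare exception instance replaces failure.Failure.
            self.active = False
            self.result = exc


def run(top):
    """Drive `top`, stepping any Sub it yields before resuming it."""
    stack = [Sub(top)]
    while stack:
        head = stack[-1]
        head.step()
        if head.active and isinstance(head.result, Sub):
            stack.append(head.result)   # descend into the sub-flow
            continue
        stack.pop()                     # a value was produced, or head ended


seen = []

def mygenerator():
    d = Sub([1, 2])
    yield d
    seen.append(d.result)               # first value of the sub-flow
    yield d
    seen.append(d.result)               # second value
    yield d
    seen.append((d.result, d.active))   # sub-flow is exhausted by now

run(mygenerator())
print(seen)
```

In this sketch a top-level generator that yields a plain value is simply popped, which ends the run. That is a simplification chosen here, not something the post states.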

''' Flow -- async data flow

    This module provides a mechanism for using async data flows through
    the use of generators.  While this module does not use generators in
    its implementation, it isn't very useable without them.

    A data flow starts with a top level generator, which has numerous
    yield statements.   Each yield can return one of several types:

        flow.Cooperate  This (singleton) value should be returned when 
                        the flow would like to give up control of the 
                        call stack to allow other events to be handled.

        flow.Generator  This is a sub-flow (iterator) to be executed.  
                        This object has a 'result' value which can be 
                        checked for each value yielded.   If the last
                        iteration of the sub-flow produced an exception,
                        then a failure.Failure object will be returned.

                        While the sub-flow is generating (it has not
                        returned or raised StopIteration), it will have
                        an 'active' state of true.  Once it is finished,
                        the 'active' state will be false.

        <anything>      Any other return value, which is not a FlowItem
'''

from twisted.python import failure
from twisted.python.compat import StopIteration, iter

class FlowItem: pass
Cooperate = FlowItem() 

class Generator(FlowItem):
    def __init__(self, iterable):
        self.__next  = iter(iterable).next
        self.result  = None
        self.active  = 1
    def isFailure(self):
        return isinstance(self.result, failure.Failure)
    def getResult(self):
        if self.isFailure():
            res = self.result
            if res.value:  raise res.value
            raise res.type
        return self.result
    def generate(self):
        try:
            self.result = self.__next()
        except StopIteration:
            self.active = 0
            self.result = None
        except:
            self.active = 0
            self.result = failure.Failure()

class Flow(Generator):
    ''' a top-level generator which can handle subordinates '''
    def __init__(self, iterable):
        Generator.__init__(self, iterable)
        self._stack = [self]
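    def execute(self):
        while self._stack:
            head = self._stack[-1]
            head.generate()
            if head.active:
                result = head.result
                if isinstance(result, FlowItem):
                    if result is Cooperate:
                        # give up the call stack; the caller should
                        # invoke execute() again later
                        return 1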

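from twisted.internet import defer, reactor

class DeferredFlow(Flow, defer.Deferred):
    ''' a version of Flow using Twisted's reactor and Deferreds '''
    def __init__(self, iterable):
        Flow.__init__(self, iterable)
        defer.Deferred.__init__(self)
        reactor.callLater(0, self.execute)
    def execute(self):
        if Flow.execute(self):
            # the flow cooperated; resume on the next reactor iteration
            reactor.callLater(0, self.execute)
        else:
            if self.isFailure():
                self.errback(self.result)
            else:
                self.callback(self.result)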
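
To see what rescheduling on Cooperate buys you, here is a small Python 3 sketch with no Twisted dependency. A deque-based loop stands in for reactor.callLater(0, ...). `MiniFlow`, `run_all`, `COOPERATE` and `worker` are hypothetical names, and the sub-flow stack is left out on purpose to keep it short.

```python
from collections import deque

COOPERATE = object()   # stand-in for the flow.Cooperate singleton


class MiniFlow:
    """Hypothetical, simplified Flow: no sub-flow stack, only Cooperate."""
    def __init__(self, iterable):
        self._it = iter(iterable)

    def execute(self):
        """Run until the flow cooperates (True) or is exhausted (False)."""
        for value in self._it:
            if value is COOPERATE:
                return True         # ask to be scheduled again
        return False


def run_all(flows):
    """Round-robin loop, assumed here as a stand-in for the reactor."""
    pending = deque(flows)
    while pending:
        flow = pending.popleft()
        if flow.execute():
            pending.append(flow)    # plays the role of callLater(0, execute)


log = []

def worker(name, steps):
    for i in range(steps):
        log.append((name, i))
        yield COOPERATE             # give the other flows a turn

run_all([MiniFlow(worker("a", 2)), MiniFlow(worker("b", 2))])
print(log)
```

The two workers end up interleaved in the log, since each one hands control back after every step instead of running to completion.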
