[Twisted-Python] rewrite of flow.py completed

Clark C. Evans cce at clarkevans.com
Fri Apr 11 03:38:34 EDT 2003

Ok.  The complete re-write of flow.py based on the work of 
extrepum ("Bob Ippolito") is finished, with a test case
checked in.  This differs from extrepum's work [1] in 
some ways, but is almost identical in others.

1.  The implementation and test suite do not use generators,
    thus it is safe for 2.1 usage... although I can't imagine
    anyone using it without generators.   But the advantage 
    is that it could go into the current Twisted code base.
    The test suite uses iterators (with the corresponding
    generator commented out).

2.  This implementation has 4 distinct components:

    a) The core 'Flow' class and its supporting items could be
       put into something like twisted.python.flow, as they only
       depend upon other twisted.python stuff.  

    b) There is a DeferredFlow which uses Flow as a mix-in with
       Deferred, with behavior similar to DeferredList.  This 
       could go into twisted.internet.defer

    c) There is a ThreadedIterator which depends upon DeferredFlow, 
       and can be used to merge blocking behavior into the 
       framework.   This could go into twisted.internet.threads

    d) There is a very small class which builds upon ThreadedIterator
       that uses a pool from twisted.enterprise.adbapi; however, 
       in this context the async features of adbapi more or less
       get in the way.   So, perhaps this is best left as an example.

    In particular, extrepum's work builds on top of Deferred, which
    I think is unnecessary; thus quite a bit of simplification 
    occurred when stripping out the Deferred stuff, and instead
    treating DeferredFlow as a cross product or mix-in of 
    Flow and Deferred.

3.  Extrepum's work was more 'flat' and didn't take recursive
    use of the technique into consideration; or perhaps I just 
    didn't understand how it would handle recursive usage.
    In any case, this implementation allows generators to call
    other generators and provides a nice stack unwinding, etc.

4.  I'd like to eventually look more at extrepum's work to 
    see if he had additional aspects that I've missed.  If 
    those aspects are generally usable, then I'll include them.
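
As a rough illustration of point 3, here is a minimal sketch of driving
nested generators with an explicit stack.  It is written in present-day
Python 3 rather than the 2.1-compatible style of the attached module, and
it is not the attached implementation: the `run` function and the
stripped-down `Generator` wrapper are hypothetical stand-ins, and
Cooperate and Failure handling are left out.

```python
class Generator:
    """Wraps an iterable so a stage can yield it as a nested producer."""
    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self.result = None
        self.stop = True   # true until a value has been fetched

    def _generate(self):
        """Fetch the next value, or mark the stage as stopped."""
        try:
            self.result = next(self._iterator)
            self.stop = False
        except StopIteration:
            self.result = None
            self.stop = True


def run(top_level):
    """Drive nested stages with an explicit stack; return top-level results."""
    results = []
    stack = [Generator(top_level)]
    while stack:
        head = stack[-1]
        head._generate()
        if head.stop:
            stack.pop()                  # stage exhausted: unwind one level
        elif isinstance(head.result, Generator):
            stack.append(head.result)    # descend into the nested producer
        elif len(stack) > 1:
            stack.pop()                  # value ready: resume the consumer below
        else:
            results.append(head.result)  # plain value from the top-level stage
    return results


def producer():
    numbers = Generator([1, 2, 3])
    names = Generator(["one", "two", "three"])
    while True:
        yield numbers
        yield names
        if numbers.stop or names.stop:
            return
        yield (numbers.result, names.result)


def consumer():
    pairs = Generator(producer())
    while True:
        yield pairs
        if pairs.stop:
            return
        yield pairs.result   # pass each pair up as a top-level result


print(run(consumer()))
```

Here the producer is itself a consumer of two nested stages, so the stack
grows to three levels and unwinds again for every pair.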

Anyway, that's about it.  I'm now rebuilding some of my internal
code to use this module and ripping the old flow.py out.  Overall,
this flow has been *greatly* simpler to use -- and I can't
thank extrepum enough for pointing me to his usage of generators
to create async flows within Twisted.

I'd also like to thank Itamar, Moshez, Radix, and Exarkun for
helping me along and pointing me in the right directions.



[1] twistedmatrix.com/pipermail/twisted-python/2003-February/002808.html

# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
""" Flow -- async data flow

    This module provides a mechanism for using async data flows through
    the use of generators.  While this module does not use generators in
    its implementation, it isn't very useable without them.   A data flow
    is constructed with a top level generator, which can have three 
    types of yield statements:  flow.Cooperate, flow.Generator, or
    any other return value, with exceptions wrapped using
    failure.Failure.  An example program...

    from __future__ import generators
    import flow
    def producer():
        lst = flow.Generator([1,2,3])
        nam = flow.Generator(['one','two','three'])
        while 1:
            yield lst; yield nam
            if lst.stop or nam.stop: 
                return
            yield (lst.result, nam.result)

    def consumer():
        title = flow.Generator(['Title'])
        yield title
        print title.getResult()
        lst = flow.Generator(producer())
        try:
            while 1:
                yield lst
                print lst.getResult()
        except flow.StopIteration: pass

    flow.Flow(consumer()).execute()
"""

from twisted.python import failure
from twisted.python.compat import StopIteration, iter

class FlowCommand: 
    """ Objects given special meaning when returned from yield """

class Cooperate(FlowCommand):
    """ Represents a request to delay and let other events process

        Objects of this type are returned within a flow when
        the flow would block, or needs to sleep.  This object
        is then used as a signal to the flow mechanism to pause
        and perhaps let other delayed operations proceed.
    """
    def __init__(self, timeout = 0):
        self.timeout = timeout

class Generator(FlowCommand):
    """ Wraps a generator or other iterator for use in a flow 

        Creates a nested generation stage (a producer) which can provide
        zero or more values to the current stage (the consumer).  After 
        a yield of this object when control has returned to the caller,
        this object will have two attributes:

            stop    This is true if the underlying generator has not 
                    been started (a yield is needed) or if the underlying
                    generator has raised StopIteration

            result  This is the result of the generator if it is active; 
                    the result may be a failure.Failure object if an 
                    exception was thrown in the nested generator.
    """
    def __init__(self, iterable):
        self._next  = iter(iterable).next
        self.result = None
        self.stop   = 1
    def isFailure(self):
        """ return a boolean value if the result is a Failure """ 
        if self.stop: raise StopIteration()
        return isinstance(self.result, failure.Failure)
    def getResult(self):
        """ return the result, or re-throw an exception on Failure """
        if self.isFailure():
            raise (self.result.value or self.result.type)
        return self.result
    def _generate(self):
        """ update the stop and result member variables """ 
        try:
            self.result = self._next()
            self.stop = 0
        except StopIteration:
            self.stop = 1
            self.result = None
        except Cooperate, coop:
            self.stop = 0
            self.result = coop
        except failure.Failure, fail:
            self.stop = 1
            self.result = fail
        except:
            # any other exception is wrapped in a Failure
            self.stop = 1
            self.result = failure.Failure()

class Flow:
    """ A flow construct, created with a top-level generator/iterator

        The iterable provided to this flow is the top-level consumer
        object.  From within the consumer, multiple 'yield' calls can
        be made returning either Cooperate or Generator.  If a Generator
        object is returned, then it becomes the current context and
        the process is continued.  Communication from the producer 
        back to the consumer is done by yield of a non-FlowCommand
        value.
    """
    def __init__(self, iterable):
        self.results = []
        self._stack  = [Generator(iterable)]
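    def _addResult(self, result):
        """ private called as top-level results are added"""
        self.results.append(result)
    def _execute(self):
        """ private execute, execute flow till a Cooperate is found """
        while self._stack:
            head = self._stack[-1]
            head._generate()
            if head.stop:
                # stage exhausted: unwind to its consumer
                self._stack.pop()
            else:
                result = head.result
                if isinstance(result, FlowCommand):
                    if isinstance(result, Cooperate):
                        return result.timeout
                    assert(isinstance(result, Generator))
                    # nested producer becomes the current context
                    self._stack.append(result)
                else:
                    if len(self._stack) > 1:
                        # value for the consumer: resume the stage below
                        self._stack.pop()
                    else:
                        if self._addResult(result):
                            return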
    def execute(self):
        """ continually execute, using sleep for Cooperate """
        from time import sleep
        while 1:
            timeout = self._execute()
            if timeout is None: break
            sleep(timeout)

from twisted.internet import defer
class DeferredFlow(Flow, defer.Deferred):
    """ a version of Flow using Twisted's reactor and Deferreds

        In this version, a call to execute isn't required.  Instead,
        the iterable is scheduled right away using the reactor.  And,
        the Cooperate is implemented through the reactor's callLater.
        Since more than one (possibly failing) result could be returned,
        this uses the same semantics as DeferredList.
    """
    def __init__(self, iterable, delay = 0, 
                 fireOnOneCallback=0, fireOnOneErrback=0):
        """initialize a DeferredFlow
        @param iterable:          top level iterator / generator
        @param delay:             delay when scheduling reactor.callLater
        @param fireOnOneCallback: a flag indicating that the first good 
                                  yielded result should be sent via Callback
        @param fireOnOneErrback:  a flag indicating that the first failing
                                  yield result should be sent via Errback
        """
        from twisted.internet import reactor
        defer.Deferred.__init__(self)
        Flow.__init__(self, iterable)
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback  = fireOnOneErrback
        reactor.callLater(delay, self._execute)
    def execute(self): 
        raise TypeError("Deferred Flow is auto-executing") 
    def _addResult(self, result):
        """ emulate DeferredList behavior, short circuit if event is fired """
        if not self.called:
            if self.fireOnOneCallback:
                if not isinstance(result, failure.Failure):
                    self.callback(result)
                    return 1
            if self.fireOnOneErrback:
                if isinstance(result, failure.Failure):
                    self.errback(result)
                    return 1
            self.results.append(result)
    def _execute(self):
        timeout = Flow._execute(self)
        if timeout is None:
            # flow finished: fire with the collected results
            if not self.called:
                self.callback(self.results)
        else:
            from twisted.internet import reactor
            reactor.callLater(timeout, self._execute)

# The following is a thread package which really is orthogonal to
# Flow.  Flow does not depend on it, and it does not depend on Flow.
# Although, if you are trying to bring the output of a thread into
# a Flow, it is exactly what you want.   The QueryIterator is 
# just an obvious application of the ThreadedIterator.

class ThreadedIterator:
    """
       This is an iterator base class which can be used to build
       iterators which are constructed and run within a Flow
    """
    def __init__(self):
        tunnel = _TunnelIterator(self)
        self._tunnel = tunnel
    def __iter__(self): 
        from twisted.internet.reactor import callInThread
        callInThread(self._tunnel.process)
        return self._tunnel
    def next(self):
        """
            The method used to fetch the next value, make sure
            to return a list of rows, not just a row
        """
        raise StopIteration

class _TunnelIterator:
    """
       This is an iterator which tunnels output from an iterator
       executed in a thread to the main thread.   Note, unlike
       regular iterators, this one throws a Cooperate exception
       which must be handled by calling reactor.callLater so that
       the producer threads can have a chance to send events to 
       the main thread.
    """
    def __init__(self, source):
        """
            This is the setup, the source argument is the iterator
            being wrapped, which exists in another thread.
        """
        self.source     = source
        self.isFinished = 0
        self.failure    = None
        self.buff       = []
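    def process(self):
        """
            This is called in the 'source' thread, and 
            just basically sucks the iterator, appending
            items back to the main thread.
        """
        from twisted.internet.reactor import callFromThread
        try:
            while 1:
                val = self.source.next()
                self.buff.extend(val)    # lists are thread safe
        except StopIteration:
            callFromThread(self.stop)
        except:
            # hand any other exception to the main thread as a Failure
            callFromThread(self.setFailure, failure.Failure())
        self.source = None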
    def setFailure(self, failure):
        self.failure = failure
    def stop(self):
        self.isFinished = 1
    def next(self):
        if self.buff:
           return self.buff.pop(0)
        if self.isFinished:  
            raise StopIteration
        if self.failure:
            raise self.failure
        raise Cooperate()

class QueryIterator(ThreadedIterator):
    def __init__(self, pool, sql, fetchall=0):
        ThreadedIterator.__init__(self)   # sets up the tunnel used by __iter__
        self.curs = None
        self.sql  = sql
        self.pool = pool
        self.data = None
        self.fetchall = fetchall
    def __call__(self,data):
        self.data = data
        return self
    def next(self):
        if not self.curs:
            conn = self.pool.connect()
            self.curs = conn.cursor()
            if self.data: self.curs.execute(self.sql % self.data) 
            else: self.curs.execute(self.sql)
        if self.fetchall:
            res = self.curs.fetchall()
        else:
            res = self.curs.fetchmany()
        if not(res): 
            raise StopIteration
        return res
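
The tunnelling idea behind ThreadedIterator can also be sketched outside
of Twisted.  What follows is a minimal Python 3 illustration, not the
module above.  It assumes plain `threading` in place of the reactor's
callInThread and callFromThread, it makes `Cooperate` an Exception
subclass (Python 3 only allows raising exceptions), and the names
`TunnelIterator`, `slow_batches`, and `drain` are hypothetical.

```python
import threading
import time


class Cooperate(Exception):
    """Raised by the tunnel when no value is ready yet (assumed stand-in)."""


class TunnelIterator:
    """Main-thread view of an iterator that is drained in a worker thread."""
    def __init__(self, source):
        self._source = iter(source)
        self._buffer = []
        self._finished = False
        self._error = None
        self._lock = threading.Lock()

    def start(self):
        threading.Thread(target=self._process, daemon=True).start()
        return self

    def _process(self):
        # Runs in the worker thread: pull batches and buffer their items.
        try:
            for batch in self._source:
                with self._lock:
                    self._buffer.extend(batch)
        except Exception as exc:
            with self._lock:
                self._error = exc
        finally:
            with self._lock:
                self._finished = True

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            if self._buffer:
                return self._buffer.pop(0)
            if self._error is not None:
                raise self._error
            if self._finished:
                raise StopIteration
        raise Cooperate()   # nothing buffered yet: ask the caller to come back


def slow_batches():
    # Hypothetical blocking source; each batch is a list of rows.
    for start in (0, 2, 4):
        time.sleep(0.01)
        yield [start, start + 1]


def drain(tunnel):
    """Poll the tunnel from the main thread until it is exhausted."""
    rows = []
    while True:
        try:
            rows.append(next(tunnel))
        except Cooperate:
            time.sleep(0.001)   # a reactor would reschedule with callLater
        except StopIteration:
            return rows


print(drain(TunnelIterator(slow_batches()).start()))
```

The source yields lists and the tunnel hands out single items, which is
the same reason ThreadedIterator.next is asked to return a list of rows.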

