[Twisted-Python] Flow - an approach to interruptible data flows

Clark C. Evans cce at clarkevans.com
Wed Mar 12 21:12:13 EST 2003

Hello.  Just checked in what I think is a stable version
of this 'flow' code I've been working on. 

  Problem: >
    In twisted, one would like to have a mechanism for
    managing large, perhaps blocking operations in 
    such a way that they can be resumable.

    - >
      When building a web page, for example,
      the process often gets nested quite deeply into
      several layers of nested tags
    - >
      Many parts of a web page may have to block till
      data is ready, as a database query may not have
      finished (not necessarily implying threads; see, for
      example, Gerhard Häring's async PostgreSQL code)
    - >
      Some parts of the page building may also be 
      computationally expensive, and thus it would be
      polite to take a break now and then to let the
      main event loop process other events
    - > 
      Maintaining your context when constructing detailed
      (and highly nested) information is easy when you
      are using threads; but in Twisted's non-threaded
      setting, it is quite tedious

    - >
      The construction of a 'Flow' object which describes
      a particular nesting of sub-routines so that intermediate
      stages could be added dynamically (for example, depending
      on a user's security).
    - > 
      A way of executing said Flow via an "execution stack", or
      FlowStack which is not the program stack; in this way, 
      a given Flow can be paused to allow other events to 
      get processed and then 'resumed' automatically.
    - >
      A set of FlowStages which are "atomic", non-interruptible
      operations.   Each FlowStage has an 'input' and an 
      'output'; if a Stage produces output, then the data moves
      on to subsequent stages.
    - >
      A mechanism for doing explosions (one-to-many iteration)  
      and reductions (many-to-one aggregations) which are both
      resumable.  In particular, support for built-in lists and,
      for Python 2.2, generators
    - > 
      A third mechanism for linking said Flow execution with
      a thread output, so that an iterator in a thread is 
      'transparently' marshalled into the main thread pump;
      so that when the thread blocks, the Flow Pauses, allowing
      other events to be handled within Twisted
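
The execution-stack idea in the list above can be sketched without Twisted.
This is only an illustration in present-day Python, not the module below:
the names Pause, run and resume_later are hypothetical, each stage returns a
list of outputs (so one-to-one and one-to-many look alike), and a plain list
stands in for the event loop's timer queue.

```python
class Pause(Exception):
    """Raised by a stage that cannot make progress yet (hypothetical name)."""


def run(stack, resume_later):
    """Run (data, stages) frames from an explicit stack.

    Returns True when the stack is empty.  If a stage raises Pause, the
    frame is pushed back, a resume callback is handed to resume_later,
    and False is returned so the caller's event loop can do other work.
    """
    while stack:
        data, stages = stack.pop()
        if not stages:
            continue  # end of the pipeline: the value is consumed
        stage, rest = stages[0], stages[1:]
        try:
            outputs = list(stage(data))
        except Pause:
            stack.append((data, stages))  # retry the same stage later
            resume_later(lambda: run(stack, resume_later))
            return False
        # push in reverse so the first output is processed first
        for out in reversed(outputs):
            stack.append((out, rest))
    return True


results = []
pending = []           # stands in for the event loop's timer queue
attempts = {"n": 0}


def explode(n):
    return [1, 1 + n, 1 + n * 2]   # one-to-many


def flaky_add_one(x):
    attempts["n"] += 1
    if attempts["n"] == 2:
        raise Pause()              # pretend the data is not ready yet
    return [x + 1]                 # one-to-one


def collect(x):
    results.append(x)
    return []                      # consumes its input


finished = run([(3, [explode, flaky_add_one, collect])], pending.append)
while pending:                     # the "event loop" resumes the flow
    finished = pending.pop(0)()
```

Because the state lives in the stack list rather than on the program stack,
the paused flow loses nothing between the first call to run and the resume;
after the loop, results holds [2, 5, 8].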

Anyway, it's still experimental; but I'm rather happy with the
bugger, and it's producing some quite nice reports.  Comments 
would be helpful.


# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Axista, Inc.
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307

""" A resumable execution flow mechanism.

    Within the single-threaded Twisted main loop, all code shares the same
    execution stack.  Sometimes it is useful when writing a handler
    to allow the handler to return (for example, if it must block) while
    saving the handler's state so it can be resumed later. 
"""

from __future__ import nested_scopes

class Flow:
    """
       This object maintains a sequence of FlowStages which can be
       executed in order, where the output of one flow stage becomes
       the input of the next.   A flow starts with a top-level FlowStage,
       usually a producer of some sort, perhaps a database query, 
       followed by other filter stages until the data passed is 
       eventually consumed and None is returned.
    """
    def __init__(self):
        """
           Initializes a Flow object.  Processing starts at stageHead
           and then proceeds recursively.  Note that the stages are 
           recorded here as a singly-linked list of FlowItem nodes.
        """
        self.stageHead    = None
        self.stageTail    = None
        self.waitInterval = 0
    def append(self, stage):
        """
            This appends an additional stage to the singly-linked
            list, starting with stageHead.
        """
        link = FlowItem(stage)
        if not self.stageHead:
            self.stageHead = link
            self.stageTail = link
        else:
            self.stageTail.next = link
            self.stageTail = link
        return self

    def addFunction(self, callable, stop=None):
        self.append(FlowFunction(callable, stop))

    def addSequence(self, callable, onFinish = None):
        self.append(FlowSequence(callable, onFinish))

    def addContext(self, onFlush = None):
        self.append(FlowContext(onFlush))

    def addAccumulator(self, accum, start = None, 
                       finish = None, bucket = None):
        self.append(FlowAccumulator(accum, start, finish, bucket))

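    def addDiscard(self):
        # a bare FlowStage consumes its input and pushes nothing further
        self.append(FlowStage())

    def execute(self, data = None):
        """
           This executes the current flow, given empty
           starting data and the default initial state.
        """
        if self.stageHead:
            stack = FlowStack(self.stageHead, data, self.waitInterval)
            stack.execute()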

class FlowStack:
    """
       a stack of FlowStages and a means for their execution
    """
    def __init__(self, flowitem, data = None, waitInterval = 0):
        """
           bootstraps the processing of the flow:

             flowitem      the very first stage in the process
             data          starting argument
             waitInterval  delay before a paused flow is resumed;
                           a useful item to slow the flow
        """
        self._waitInterval = waitInterval
        self._stack   = []
        self._context = []  # see FlowContext
        self._stack.append((data, flowitem.stage, flowitem.next))
    def context(self):
        cntx = self._context
        if cntx: 
            return cntx[-1]
    def push(self, data, stage=None, next=None):
        """
           pushes a function to be executed onto the stack:

             data    argument to be passed
             stage   callable to be executed
             next    a FlowItem for subsequent stages
        """
        if not stage:
            # assume the next stage in the process
            curr = self._current[2]
            if curr:
                stage = curr.stage
                next  = curr.next
        elif not next:
            # assume same stage, different function
            next = self._current[2]
        self._stack.append((data, stage, next))
    def execute(self):
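        """
           This executes the current flow.  If a stage raises PauseFlow,
           the stage is pushed back and execution is rescheduled.
        """
        from twisted.internet import reactor
        stack = self._stack
        while stack:
            self._current = stack.pop()
            (data, stage, next) = self._current
            if not(stage): raise ValueError("unconsumed data")
            try:
                stage(self, data)
            except PauseFlow:
                # put the stage back and let the reactor resume us later
                self.push(data, stage, next)
                reactor.callLater(self._waitInterval, self.execute)
                return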

class PauseFlow(Exception):
    """
      This exception is used to pause a Flow, returning control
      back to the main event loop.  The flow automatically 
      reschedules itself to resume execution, resuming at the
      stage where it left off.
    """

class FlowStage:
    """
        operational unit in a flow, performs some sort of operation
        and optionally pushes other stages onto the call stack
    """
    def __call__(self, flow, data):
        """
            this is the minimum flow stage, it simply returns None,
            and thus indicates that the current branch is complete
        """

class FlowFunction(FlowStage):
    """
        wraps a function taking an input and returning a result; 
        in effect this implements one-to-one behavior
    """
    def __init__(self, callable, stop = None):
        self.callable  = callable
        self.stop      = stop
    def __call__(self, flow, data):
        """
            executes the callable and passes this data onto the next 
            stage in the flow; since this only pushes one item on
            to the stack, it is tail-recursive
        """
        ret = self.callable(data)
        if ret is not self.stop:
            flow.push(ret)

class _FlowContext:
    """
        innards of the flow context; this object is created
        for each descent into a FlowContext stage, and has 
        attached callbacks.

        addFlush     adds a function to be called, optionally
                     with the 'context' attribute
    """
    def __init__(self):
        self._flush = []
    def addFlush(self, onFlush, bucket = None):
        # adapt onFlush to the (flow, cntx) stage signature by arity
        args = onFlush.func_code.co_argcount
        if 0 == args: 
           fnc = lambda flow, cntx: onFlush()
        elif 1 == args:
           fnc = lambda flow, cntx: onFlush(getattr(cntx,bucket,None))
        else:
           fnc = onFlush
        self._flush.append(fnc)

class FlowContext(FlowStage):
    """
        represents a branch of execution which may hold accumulated
        results and may have 'flush' handlers attached, which fire
        when the context is closed
    """
    def __init__(self, onFlush = None):
        self.onFlush = onFlush

    def __call__(self, flow, data):
        """
            adds the _FlowContext to the FlowStack's _context stack
        """
        cntx = _FlowContext()
        if self.onFlush: 
            cntx.addFlush(self.onFlush)
        flow._context.append(cntx)
        # schedule the flush first so it runs after the stages below
        flow.push(cntx, self.flush)
        flow.push(data)

    def flush(self, flow, cntx):
        """
           cleans up the context and fires onFlush events
        """
        top = flow._context.pop()
        assert top is cntx
        fncs = cntx._flush
        while fncs: flow.push(cntx, fncs.pop())

class FlowSequence(FlowStage):
    """
        allows callable objects returning an iterator to be used
        within the system; this implements one-to-many behavior
    """
    def __init__(self, callable, onFinish = None):
        self.callable = callable
        self.onFinish = onFinish
    def __call__(self, flow, data):
        """
            executes the callable, and if an iterator object 
            is returned, schedules its next method
        """
        ret = self.callable(data)
        if ret is not None:
            next = iter(ret).next
            flow.push(next, self.iterate)
    def iterate(self, flow, next):
        """
            if the next method has results, then schedule the
            next stage of the flow, otherwise finish up
        """
        try:
            data = next()
            # re-push ourselves first so this item is handled before
            # the following one is fetched
            flow.push(next, self.iterate)
            flow.push(data)
        except StopIteration:
            if self.onFinish:
                self.onFinish()

class FlowAccumulator(FlowStage):
    """
        the opposite of a FlowSequence, this takes multiple calls
        and converges them into a single call; this implements
        many-to-one behavior;  for the accumulator to work, it
        requires a FlowContext be higher up the call stack
    """
    def __init__(self, accum, start = None, finish = None, bucket = None):
        if not bucket: bucket = id(self)
        self.bucket = str(bucket)
        self.start  = start
        self.accum  = accum
        self.finish = finish
    def __call__(self, flow, data):
        """
            executes the accum function
        """
        cntx = flow.context()
        assert cntx, "FlowAccumulator needs a prior FlowContext"
        if not hasattr(cntx, self.bucket):
             if self.finish: cntx.addFlush(self.finish, self.bucket)
             acc = self.start
             if callable(acc): acc = acc()
        else:
             acc = getattr(cntx, self.bucket)
        acc = self.accum(acc, data)
        setattr(cntx, self.bucket, acc)

class FlowItem:
    """
       a Flow is implemented as a series of FlowStage objects
       in a linked-list; this is the link node

         stage   a FlowStage in the linked list
         next    next FlowItem in this list
    """
    def __init__(self,stage):
        self.stage = stage
        self.next  = None

class FlowIterator:
    """
       This is an iterator base class which can be used to build
       iterators which are constructed and run within a Flow
    """
    def __init__(self, data = None):
        from twisted.internet.reactor import callInThread
        self.data = data  
        tunnel = _TunnelIterator(self)
        self._tunnel = tunnel
        # drain this iterator in a worker thread
        callInThread(tunnel.process)
    def __iter__(self): 
        return self._tunnel
    def next(self):
        """
            The method used to fetch the next value
        """
        raise StopIteration

class _TunnelIterator:
    """
       This is an iterator which tunnels output from an iterator
       executed in a thread to the main thread.   Note, unlike
       regular iterators, this one throws a PauseFlow exception
       which must be handled by calling reactor.callLater so that
       the producer threads can have a chance to send events to 
       the main thread.
    """
    def __init__(self, source):
        """
            This is the setup, the source argument is the iterator
            being wrapped, which exists in another thread.
        """
        self.source     = source
        self.isFinished = 0
        self.failure    = None
        self.buff       = []
        self.append     = self.buff.append
    def process(self):
        """
            This is called in the 'source' thread, and 
            just basically drains the iterator, appending
            items back to the main thread.
        """
        from twisted.internet.reactor import callFromThread
        try:
            while 1:
                val = self.source.next()
                callFromThread(self.append, val)
        except StopIteration:
            callFromThread(self.stop)
        except Exception, e:
            print str(e)
            # hand the error to the main thread so next() can raise it
            callFromThread(self.setFailure, e)
    def setFailure(self, failure):
        self.failure = failure
    def stop(self):
        self.isFinished = 1
    def next(self):
        if self.buff:
           return self.buff.pop(0)
        if self.isFinished:  
            raise StopIteration
        if self.failure:
            raise self.failure
        raise PauseFlow

class FlowQueryIterator(FlowIterator):
    def __init__(self, pool, sql):
        self.curs = None
        self.sql  = sql
        self.pool = pool
        self.data = None
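    def __call__(self,data):
        # FlowIterator defines no __call__; reusing its constructor to
        # record the data and start the tunnel is an assumed fix
        FlowIterator.__init__(self, data)
        ret = self._tunnel
        ret.append = ret.buff.extend
        return ret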
    def next(self):
        if not self.curs:
            conn = self.pool.connect()
            self.curs = conn.cursor()
            if self.data: self.curs.execute(self.sql % self.data) 
            else: self.curs.execute(self.sql)
        res = self.curs.fetchone() # TODO: change to fetchmany
        if not(res): 
            raise StopIteration
        return res

def testFlowIterator():
    class CountIterator(FlowIterator):
        def next(self): # this is run in a separate thread
            print "."
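            from time import sleep
            sleep(.1)  # simulate a slow producer (duration is illustrative)
            val = self.data
            if not(val):
                print "done counting"
                raise StopIteration
            self.data -= 1
            return val
    def printResult(data): print data
    def finished(): print "finished"
    f = Flow()
    f.addSequence(CountIterator, onFinish=finished)
    f.addFunction(printResult)
    f.waitInterval = 1
    f.execute(5)  # starting count is illustrative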

def testFlow():
    """
       primary tests of the Flow construct
    """
    def addOne(data): return  data+1
    def printResult(data): print data
    def finished(): print "finished"
    def dataSource(data):  return [1, 1+data, 1+data*2]
    f = Flow()
    f.addSequence(dataSource, finished)
    f.addFunction(addOne)
    f.addFunction(printResult)
    f.execute(2)  # starting value is illustrative

    class simpleIterator:
        def __init__(self, data): 
            self.data = data
        def __iter__(self): 
            return self
        def next(self): 
            if self.data < 0: raise StopIteration
            ret = self.data
            self.data -= 1
            return ret
    import operator
    f = Flow()
    f.addContext()  # the accumulator needs an enclosing context
    f.addSequence(simpleIterator)
    f.addAccumulator(operator.add, 0, printResult)
    f.execute(5)  # starting value is illustrative

def testFlowConnect():
    from twisted.enterprise.adbapi import ConnectionPool
    pool = ConnectionPool("mx.ODBC.EasySoft","PSICustomerProto")
    def printResult(x): print x
    def printDone(): print "done"
    sql = "SELECT caption from vw_date"
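    f = Flow()
    f.addSequence(FlowQueryIterator(pool, sql), onFinish=printDone)
    f.addFunction(printResult)
    f.waitInterval = 1
    f.execute()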

# support iterators for 2.1
try:
   StopIteration = StopIteration
   iter = iter
except NameError:
   StopIteration = IndexError
   class _ListIterator:
       def __init__(self,lst):
           self.idx = 0
           if getattr(lst,'keys',None): lst = lst.keys()
           self.lst = lst
       def next(self):
           idx = self.idx
           self.idx += 1
           return self.lst[idx]
   def iter(lst): 
       if hasattr(lst,'__iter__'):
           return lst.__iter__()
       else:
           return _ListIterator(lst)

if '__main__' == __name__:
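    from twisted.internet import reactor
    # run the tests that need no database, then start the event loop
    testFlow()
    testFlowIterator()
    reactor.run()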
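
The tunnelling done by _TunnelIterator can also be sketched with only the
standard library.  This is a rough illustration in present-day Python, not a
replacement for the code above: Tunnel, Pause, slow_count and drain_all are
hypothetical names, a lock-protected deque takes the place of
reactor.callFromThread, and errors raised by the source are not forwarded.

```python
import threading
import time
from collections import deque


class Pause(Exception):
    """Nothing is buffered yet; the caller should try again later."""


class Tunnel:
    """Drain `source` in a worker thread and expose it as an iterator."""

    def __init__(self, source):
        self._source = iter(source)
        self._buff = deque()
        self._finished = False
        self._lock = threading.Lock()
        worker = threading.Thread(target=self._drain)
        worker.daemon = True
        worker.start()

    def _drain(self):
        # runs in the worker thread
        for item in self._source:
            with self._lock:
                self._buff.append(item)
        with self._lock:
            self._finished = True

    def __iter__(self):
        return self

    def __next__(self):
        # runs in the consumer thread and never blocks on the source
        with self._lock:
            if self._buff:
                return self._buff.popleft()
            if self._finished:
                raise StopIteration
        raise Pause()


def slow_count(n):
    """A deliberately slow producer, counting down from n to 1."""
    while n:
        time.sleep(0.01)
        yield n
        n -= 1


def drain_all(tunnel):
    """Collect every item, retrying whenever the tunnel asks to pause."""
    out = []
    while True:
        try:
            out.append(next(tunnel))
        except Pause:
            time.sleep(0.005)  # an event loop would reschedule here instead
        except StopIteration:
            return out
```

Calling drain_all(Tunnel(slow_count(3))) returns [3, 2, 1].  The buffer is
checked before the finished flag, as in _TunnelIterator.next, so items that
arrive just before the producer ends are not lost.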
