[Twisted-Python] Flow - an approach to interruptible data flows
Clark C. Evans
cce at clarkevans.com
Wed Mar 12 21:12:13 EST 2003
Hello. Just checked in what I think is a stable version
of this 'flow' code I've been working on.
Problem: >
In twisted, one would like to have a mechanism for
managing large, perhaps blocking operations in
such a way that they can be resumable.
Context:
- >
When doing web page building, for example,
the process often gets nested quite deeply into
several layers of nested tags
- >
Many parts of a web page may have to block till
data is ready, as a database query may not have
finished [not necessarily implying threads, for
example Gerhard Häring's code async PostgreSQL
linkage]
- >
Some parts of the page building may also be
computationally expensive, and thus, it would be
polite to take a break now and then to let the
main event loop process other events
- >
Maintaining your context when constructing detailed
(and highly nested) information is easy when you
are using threads; but in Twisted's non-thread
setting, it is quite tedious
Solution:
- >
The construction of a 'Flow' object which describes
a particular nesting of sub-routines so that intermediate
stages could be added dynamically (for example, depending
on a user's security).
- >
A way of executing said Flow via an "execution stack", or
FlowStack which is not the program stack; in this way,
a given Flow can be paused to allow other events to
get processed and then 'resumed' automatically.
- >
A set of FlowStages which are "atomic", non-interruptible
operations. Each FlowStage has an 'input' and an
'output', if a Stage produces output, then the data moves
onto subsequent stages.
- >
A mechanism for doing explosions (one-to-many iteration)
and reductions (many-to-one aggregations) which are both
resumable. In particular support for built-in lists and
for 2.2, generators
- >
A third mechanism for linking said Flow execution with
a thread output, so that an iterator in a thread is
'transparently' marshalled into the main thread pump;
so that when the thread blocks, the Flow Pauses, allowing
other events to be handled within Twisted
Anyway, it's still experimental; but I'm rather happy with the
bugger, and it's producing some quite nice reports. Comments
would be helpful.
Clark
# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Axista, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
""" A resumable execution flow mechanism.
Within single-threaded twisted main-loop, all code shares the same
execution stack. Sometimes it is useful when writing a handler
to allow the handler to return (for example, if it must block), but
saving the handler's state so it can be resumed later.
"""
from __future__ import nested_scopes
class Flow:
    '''
    Maintains a sequence of FlowStages which are executed in order,
    where the output of one stage becomes the input of the next.
    A flow starts with a top-level FlowStage, usually a producer of
    some sort (perhaps a database query), followed by filter stages,
    until the data passed is eventually consumed and None is returned.
    '''
    def __init__(self):
        '''
        Initializes an empty Flow.  The stages are recorded as a
        singly-linked list of FlowItem nodes, from stageHead to
        stageTail.
        '''
        self.stageHead = None    # first FlowItem in the linked list
        self.stageTail = None    # last FlowItem, kept for O(1) appends
        self.waitInterval = 0    # delay (seconds) used when a paused flow resumes
    #
    def append(self, stage):
        '''
        Appends an additional stage to the singly-linked list,
        starting with stageHead.  Returns self so calls can chain.
        '''
        link = FlowItem(stage)
        if not self.stageHead:
            self.stageHead = link
            self.stageTail = link
        else:
            self.stageTail.next = link
            self.stageTail = link
        return self
    # The add* helpers below wrap append(); they now return self
    # (as append already did) so all builder methods chain consistently.
    def addFunction(self, callable, stop=None):
        '''adds a one-to-one stage wrapping 'callable' (see FlowFunction)'''
        return self.append(FlowFunction(callable, stop))
    def addSequence(self, callable, onFinish = None):
        '''adds a one-to-many stage producing an iterator (see FlowSequence)'''
        return self.append(FlowSequence(callable, onFinish))
    def addContext(self, onFlush = None):
        '''opens an accumulation context with an optional flush callback'''
        return self.append(FlowContext(onFlush))
    def addAccumulator(self, accum, start = None,
                       finish = None, bucket = None):
        '''adds a many-to-one stage folding data into a context bucket'''
        return self.append(FlowAccumulator(accum, start, finish, bucket))
    def addDiscard(self):
        '''adds a terminal stage which silently consumes its input'''
        return self.append(FlowStage())
    def execute(self, data = None):
        '''
        Executes the current flow, given the starting data and the
        default initial state; a no-op when no stages were added.
        '''
        if self.stageHead:
            stack = FlowStack(self.stageHead, data, self.waitInterval)
            stack.execute()
class FlowStack:
    '''
    A stack of pending (data, stage, next) frames plus the engine that
    executes them.  This explicit stack replaces the program stack, so
    a Flow can be paused (via PauseFlow) and resumed from the reactor.
    '''
    def __init__(self, flowitem, data = None, waitInterval = 0):
        '''
        Bootstraps the processing of the flow:
            flowitem      the very first FlowItem in the process
            data          starting argument passed to the first stage
            waitInterval  delay (seconds) used when rescheduling a
                          paused flow through reactor.callLater
        '''
        self._waitInterval = waitInterval
        self._stack = []
        self._context = []  # stack of open _FlowContext objects; see FlowContext
        self._stack.append((data, flowitem.stage, flowitem.next))
    #
    def context(self):
        '''
        Returns the innermost active _FlowContext, or None when no
        FlowContext stage is currently open.
        '''
        cntx = self._context
        if cntx:
            return cntx[-1]
    #
    def push(self, data, stage=None, next=None):
        '''
        Pushes a frame to be executed onto the stack:
            data    argument to be passed
            stage   callable to be executed; defaults to the stage
                    following the currently-running frame
            next    a FlowItem for subsequent stages; defaults to the
                    currently-running frame's successor
        '''
        if not stage:
            # assume the next stage in the process
            curr = self._current[2]
            if curr:
                stage = curr.stage
                next = curr.next
        elif not next:
            # assume same stage, different function
            next = self._current[2]
        self._stack.append((data, stage, next))
    #
    def execute(self):
        '''
        Pops and runs frames until the stack drains.  When a stage
        raises PauseFlow, the interrupted frame is re-pushed and
        execution is rescheduled so other reactor events can run.
        '''
        stack = self._stack
        while stack:
            self._current = stack.pop()
            (data, stage, next) = self._current
            if not stage:
                # data fell off the end of the flow with no consumer;
                # was previously a (long-deprecated) string exception
                raise TypeError("unconsumed data")
            try:
                stage(self, data)
            except PauseFlow:
                # imported here (not at module scope) so the module is
                # usable without a reactor until a flow actually pauses;
                # the original referenced a bare 'reactor' global that
                # was only bound when run as __main__
                from twisted.internet import reactor
                self.push(data, stage, next)
                reactor.callLater(self._waitInterval, self.execute)
                return
class PauseFlow(Exception):
    '''
    Raised by a stage to pause a Flow, returning control back to the
    main event loop.  FlowStack.execute catches it, re-pushes the
    interrupted frame, and reschedules itself via reactor.callLater,
    so execution resumes automatically at the stage where it left off.
    '''
class FlowStage:
    '''
    Operational unit in a flow: performs some work and may push further
    frames onto the FlowStack.  The base implementation schedules
    nothing, so it acts as a sink that terminates the current branch.
    '''
    def __call__(self, flow, data):
        '''Discard the data; the current branch of the flow is complete.'''
        return None
class FlowFunction(FlowStage):
    '''
    Wraps a one-argument function taking an input and returning a
    result; each input yields at most one output, giving one-to-one
    behavior.  When the function returns the sentinel 'stop' value,
    the result is dropped and the branch ends instead of feeding the
    next stage.
    '''
    def __init__(self, callable, stop = None):
        self.callable = callable  # function applied to each data item
        self.stop = stop          # sentinel return value meaning "do not propagate"
    #
    def __call__(self, flow, data):
        '''
        Applies the wrapped function and forwards its result to the
        next stage in the flow; a single push keeps this
        tail-recursive.
        '''
        result = self.callable(data)
        if result is not self.stop:
            flow.push(result)
class _FlowContext:
'''
innerds of the flow context, this object is created
for each descend of a FlowContext stage, and has
attached callbacks.
addOnFlush adds a function to be called, optionally
with the 'context' attribute
'''
def __init__(self):
self._flush = []
#
def addFlush(self, onFlush, bucket = None):
args = onFlush.func_code.co_argcount
if 0 == args:
fnc = lambda flow, cntx: onFlush()
elif 1 == args:
fnc = lambda flow, cntx: onFlush(getattr(cntx,bucket,None))
else:
fnc = onFlush
self._flush.append(fnc)
class FlowContext(FlowStage):
    '''
    represents a branch of execution which may hold accumulated
    results and may have 'flush' handlers attached, which fire
    when the context is closed
    '''
    def __init__(self, onFlush = None):
        # onFlush: optional callback registered on each new context;
        # fired when that context is flushed (see _FlowContext.addFlush)
        self.onFlush = onFlush
    def __call__(self, flow, data):
        '''
        adds the _FlowContext to the FlowStack's _context stack
        '''
        cntx = _FlowContext()
        if self.onFlush:
            cntx.addFlush(self.onFlush)
        flow._context.append(cntx)
        # the FlowStack is LIFO: 'data' is pushed last so the branch
        # under this context executes first; the flush frame pushed
        # below it only fires once that whole branch has drained
        flow.push(cntx, self.flush)
        flow.push(data)
    def flush(self, flow, cntx):
        '''
        cleans up the context and fires onFlush events
        '''
        # the context being flushed must be the innermost open one
        top = flow._context.pop()
        assert top is cntx
        fncs = cntx._flush
        # pushing newest-first onto the LIFO stack means the callbacks
        # ultimately execute in registration order
        while fncs: flow.push(cntx, fncs.pop())
class FlowSequence(FlowStage):
    '''
    allows callable objects returning an iterator to be used
    within the system; this implements one-to-many behavior
    '''
    def __init__(self, callable, onFinish = None):
        # callable: invoked with the incoming data; expected to return
        #   an iterable, or None meaning "nothing to produce"
        # onFinish: zero-argument callback fired after iteration ends
        self.callable = callable
        self.onFinish = onFinish
    #
    def __call__(self, flow, data):
        '''
        executes the callable, and if an iterator object
        is returned, schedules its next method
        '''
        ret = self.callable(data)
        if ret is not None:
            # .next is the Python 2 iterator protocol; a 2.1 fallback
            # for iter() is provided at the bottom of this module
            next = iter(ret).next
            flow.push(next, self.iterate)
    #
    def iterate(self, flow, next):
        '''
        if the next method has results, then schedule the
        next stage of the flow, otherwise finish up
        '''
        try:
            data = next()
            # LIFO stack: 'data' (pushed last) flows through the
            # following stages first, then this iterate frame resumes
            # to pull the next item
            flow.push(next, self.iterate)
            flow.push(data)
        except StopIteration:
            if self.onFinish:
                self.onFinish()
class FlowAccumulator(FlowStage):
    '''
    the opposite of a FlowSequence, this takes multiple calls
    and converges them into a single call; this implements
    many-to-one behavior; for the accumulator to work, it
    requires a FlowContext be higher up the call stack
    '''
    def __init__(self, accum, start = None, finish = None, bucket = None):
        # accum:  two-argument function (accumulated, data) -> accumulated
        # start:  initial accumulator value, or a callable producing one
        # finish: optional callback fired with the final value on flush
        # bucket: attribute name used to store state on the context;
        #         defaults to this stage's id() so instances don't clash
        if not bucket: bucket = id(self)
        self.bucket = str(bucket)
        self.start = start
        self.accum = accum
        self.finish = finish
    #
    def __call__(self, flow, data):
        '''
        folds 'data' into the bucket stored on the innermost context,
        creating the bucket (and registering the finish callback)
        on first use
        '''
        cntx = flow.context()
        # NOTE: assert disappears under 'python -O'; callers must
        # guarantee a FlowContext is higher up the stack
        assert cntx, "FlowAccumulator needs a prior FlowContext"
        if not hasattr(cntx, self.bucket):
            # first datum for this bucket: initialize the accumulator
            if self.finish: cntx.addFlush(self.finish, self.bucket)
            acc = self.start
            if callable(acc): acc = acc()
        else:
            acc = getattr(cntx, self.bucket)
        acc = self.accum(acc, data)
        setattr(cntx, self.bucket, acc)
class FlowItem:
    '''
    Link node of the singly-linked list of stages making up a Flow.
        stage   the FlowStage held by this node
        next    the following FlowItem, or None at the tail
    '''
    def __init__(self, stage):
        self.stage = stage
        self.next = None
class FlowIterator:
    '''
    Iterator base class for iterators that are constructed in the main
    thread but whose next() runs inside a Twisted worker thread; the
    produced values are marshalled back through a _TunnelIterator.
    '''
    def __init__(self, data = None):
        # imported lazily so merely defining subclasses does not
        # require a running twisted installation
        from twisted.internet.reactor import callInThread
        self.data = data
        self._tunnel = _TunnelIterator(self)
        callInThread(self._tunnel.process)
    #
    def __iter__(self):
        return self._tunnel
    #
    def next(self):
        '''
        Fetches the next value; subclasses override this.  The base
        implementation produces nothing at all.
        '''
        raise StopIteration
class _TunnelIterator:
    '''
    This is an iterator which tunnels output from an iterator
    executed in a thread to the main thread.  Note, unlike
    regular iterators, this one throws a PauseFlow exception
    which must be handled by calling reactor.callLater so that
    the producer threads can have a chance to send events to
    the main thread.
    '''
    def __init__(self, source):
        '''
        This is the setup; the source argument is the iterator
        being wrapped, which exists in another thread.
        '''
        self.source = source
        self.isFinished = 0   # set (via the reactor thread) once source is exhausted
        self.failure = None   # exception to re-raise in the consumer, if set
        self.buff = []        # values produced by the thread, drained by next()
        # FlowQueryIterator rebinds this to buff.extend for row batches
        self.append = self.buff.append
    #
    def process(self):
        '''
        This is called in the 'source' thread, and
        just basically sucks the iterator, appending
        items back to the main thread.
        '''
        from twisted.internet.reactor import callFromThread
        try:
            while 1:
                val = self.source.next()
                # hand each value to the reactor (main) thread
                callFromThread(self.append,val)
        except StopIteration:
            callFromThread(self.stop)
        except Exception, e:
            # NOTE(review): errors are only printed; setFailure exists
            # but the failure-forwarding path below is disabled
            print str(e)
            #failure = failure.Failure()
            #print "failing", failure
            #callFromThread(self.setFailure,failure)
    #
    def setFailure(self, failure):
        # called from the reactor thread; stored for next() to re-raise
        self.failure = failure
    #
    def stop(self):
        # called from the reactor thread when the source is exhausted
        self.isFinished = 1
    #
    def next(self):
        '''
        Called in the main thread: drains buffered values, signals
        StopIteration when the producer finished, re-raises a stored
        failure, or raises PauseFlow so the reactor can run while the
        producer thread catches up.
        '''
        if self.buff:
            return self.buff.pop(0)
        if self.isFinished:
            raise StopIteration
        if self.failure:
            raise self.failure
        raise PauseFlow
class FlowQueryIterator(FlowIterator):
    # Pulls rows from a twisted.enterprise ConnectionPool query, one
    # fetch per next() call, inside the iterator's worker thread.
    def __init__(self, pool, sql):
        FlowIterator.__init__(self)
        self.curs = None  # lazily-created cursor (opened on first next())
        self.sql = sql
        self.pool = pool
        self.data = None  # optional substitution values for the sql
    def __call__(self,data):
        # NOTE(review): FlowIterator defines no __call__, so this
        # delegation looks broken (AttributeError on a classic class);
        # verify the intended usage before relying on it
        ret = FlowIterator.__call__(self,data)
        ret.append = ret.buff.extend
        return ret
    def next(self):
        # runs in the worker thread (see FlowIterator/_TunnelIterator)
        if not self.curs:
            conn = self.pool.connect()
            self.curs = conn.cursor()
            # SECURITY: '%' interpolation builds SQL from raw values;
            # use the DB-API parameter mechanism for untrusted input
            if self.data: self.curs.execute(self.sql % self.data)
            else: self.curs.execute(self.sql)
        res = self.curs.fetchone() # TODO: change to fetchmany
        if not(res):
            self.curs.close()
            raise StopIteration
        return res
def testFlowIterator():
class CountIterator(FlowIterator):
def next(self): # this is run in a separate thread
print "."
from time import sleep
sleep(.5)
val = self.data
if not(val):
print "done counting"
raise StopIteration
self.data -= 1
return val
def printResult(data): print data
def finished(): print "finished"
f = Flow()
f.addSequence(CountIterator, onFinish=finished)
f.addFunction(printResult)
f.waitInterval = 1
f.execute(5)
def testFlow():
'''
primary tests of the Flow construct
'''
def addOne(data): return data+1
def printResult(data): print data
def finished(): print "finished"
def dataSource(data): return [1, 1+data, 1+data*2]
f = Flow()
f.execute()
f.addSequence(dataSource, finished)
f.addFunction(addOne)
f.addFunction(printResult)
f.execute(2)
f.execute(11)
class simpleIterator:
def __init__(self, data):
self.data = data
def __iter__(self):
return self
def next(self):
if self.data < 0: raise StopIteration
ret = self.data
self.data -= 1
return ret
import operator
f = Flow()
f.addContext(finished)
f.addSequence(simpleIterator)
f.addAccumulator(operator.add, 0, printResult)
def testFlowConnect():
from twisted.enterprise.adbapi import ConnectionPool
pool = ConnectionPool("mx.ODBC.EasySoft","PSICustomerProto")
def printResult(x): print x
def printDone(): print "done"
sql = "SELECT caption from vw_date"
f = Flow()
f.waitInterval = 1
f.addStage(FlowQueryIterator(pool,sql),onFinish=printDone)
f.addStage(printResult)
f.execute()
# support iterators for 2.1 (the iterator protocol arrived in 2.2):
# probe for the builtin names and, when absent, install minimal
# substitutes at module scope
try:
    StopIteration = StopIteration
    iter = iter
except NameError:  # narrowed from a bare except, which also hid real errors
    StopIteration = IndexError
    class _ListIterator:
        '''minimal .next()-style iterator over a list (or a dict's keys)'''
        def __init__(self,lst):
            self.idx = 0
            if getattr(lst,'keys',None): lst = lst.keys()
            self.lst = lst
        def next(self):
            idx = self.idx
            self.idx += 1
            # running off the end raises IndexError, which doubles as
            # StopIteration on 2.1 (see the assignment above)
            return self.lst[idx]
    def iter(lst):
        if hasattr(lst,'__iter__'):
            return lst.__iter__()
        else:
            return _ListIterator(lst)
if __name__ == '__main__':
    # run the self-tests under a live reactor, stopping after 5 seconds
    from twisted.internet import reactor
    testFlow()
    testFlowIterator()
    #testFlowConnect()
    reactor.callLater(5, reactor.stop)
    reactor.run()
More information about the Twisted-Python
mailing list