[Twisted-Python] more thoughts on resumable async data flows
Clark C. Evans
cce at clarkevans.com
Thu Mar 13 23:50:03 MST 2003
Howdy. I've made quite a few changes to flow.py and it's
quickly approaching maturity. To summarize, flow.py is a
way to express sequential, streaming data flows that are
interruptable as a collection of small, atomic operations.
This is needed since sometimes an operation must block,
flow.py takes care of resuming your flow at a later time.
It's trivial, but here is an example...
def printResult(data): print data
def addOne(data): return data+1
def finished(): print "finished"
def dataSource(data): return [1, 1+data, 1+data*2]
a = Flow()
a.addBranch(dataSource, finished)
a.addCallable(addOne)
a.addCallable(printResult)
a.execute(2)
a.execute(8)
The above code constructs a flow, starting with a
'branch' operation that generates a series of events,
in this case, the lists [1,3,5] and then in a
second execution, [1,9,17]. For each one of these
events, a function addOne is called on them, and
then the output of that function is directed to be printed.
This mechanism works with generators...
def simpleGenerator(data):
for x in range(data):
yield x
b = Flow()
b.addBranch(simpleGenerator)
b.addCallable(printResult)
b.execute(5)
While this may not be all that useful, what Flow brings
to the table is the ability to PauseFlow within a
processing stage, for example
class simpleIterator:
def __init__(self, data):
self.data = data
def __iter__(self):
return self
def next(self):
print "."
if self.data < 0: raise StopIteration
ret = self.data
self.data -= 1
#
# imagine a blocking operation here... sometime
if ret % 2:
raise PauseFlow # <= Goes to main event loop
return ret
c = Flow()
c.addBranch(simpleIterator)
c.addCallable(printResult)
c.execute(5)
The above code produces 4, 2, 0 ; while this may not
be all that interesting, if there were N stages above
this code, they would all be resumed properly. By
tossing PauseFlow, the entire Flow event loop is
stopped, and a reactor.callLater(0, loop-again) is
called; giving other events in the queue to work.
Thus, the flow construct provides a way to not only manage
a bunch of useful code snippets into a process; but more
importantly gives a way that the entire flow can be
interruped and then resumed later when data arrives.
Anyway, it's in the sandbox if anyone wants to play...
Clark
P.S. Unfortunately, it looks like PauseFlow doesn't
work in the context of a generator... pity. Hopefully
I'm doing something wrong.
More information about the Twisted-Python
mailing list