[Twisted-Python] more thoughts on resumable async data flows

Clark C. Evans cce at clarkevans.com
Thu Mar 13 23:50:03 MST 2003


Howdy.  I've made quite a few changes to flow.py and it's
quickly approaching maturity.  To summarize, flow.py is a
way to express sequential, streaming data flows as a
collection of small, atomic operations that can be
interrupted.  This is needed because an operation sometimes
has to block; when it does, flow.py takes care of resuming
your flow at a later time.

It's trivial, but here is an example...

    # Flow, addBranch and addCallable come from flow.py (sandbox).
    def printResult(data): print(data)
    def addOne(data): return data + 1
    def finished(): print("finished")
    def dataSource(data): return [1, 1 + data, 1 + data * 2]

    a = Flow()
    a.addBranch(dataSource, finished)
    a.addCallable(addOne)
    a.addCallable(printResult)
    a.execute(2)
    a.execute(8)

The above code constructs a flow that starts with a
'branch' operation generating a series of events: the
list [1,3,5] in the first execution and [1,9,17] in the
second.  Each event is passed to addOne, and its output
is then handed to printResult, so the flow prints 2, 4, 6
and then 2, 10, 18.
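
For readers without the sandbox handy, here is a minimal
Python 3 sketch of the pipeline described above.  MiniFlow
and its methods are hypothetical stand-ins that only mimic
the names used in the example; this is not flow.py's
implementation.  It assumes the second argument to
addBranch is a callback run once the branch is exhausted,
and it leaves out pausing entirely.

```python
class MiniFlow:
    """Hypothetical, simplified stand-in for flow.py's Flow (no PauseFlow support)."""

    def __init__(self):
        self.branch = None    # callable that turns an input into an iterable of events
        self.onFinish = None  # assumed: called once the branch is exhausted
        self.stages = []      # callables applied to every event, in order

    def addBranch(self, source, onFinish=None):
        self.branch = source
        self.onFinish = onFinish

    def addCallable(self, func):
        self.stages.append(func)

    def execute(self, data):
        for event in self.branch(data):
            value = event
            for stage in self.stages:
                value = stage(value)  # each stage's output feeds the next stage
        if self.onFinish is not None:
            self.onFinish()


results = []  # collects output instead of printing, so it is easy to check


def addOne(data):
    return data + 1


def dataSource(data):
    return [1, 1 + data, 1 + data * 2]


a = MiniFlow()
a.addBranch(dataSource, lambda: results.append("finished"))
a.addCallable(addOne)
a.addCallable(results.append)
a.execute(2)
a.execute(8)
# results is now [2, 4, 6, 'finished', 2, 10, 18, 'finished']
```

Because the branch only has to return something iterable,
the same runner accepts a list-returning function like
dataSource or a generator function.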

This mechanism works with generators...

    def simpleGenerator(data):
        for x in range(data):
            yield x

    b = Flow()
    b.addBranch(simpleGenerator)
    b.addCallable(printResult)
    b.execute(5)

While this may not be all that useful, what Flow brings
to the table is the ability to raise PauseFlow within a
processing stage, for example:

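    class simpleIterator:
        def __init__(self, data):
            self.data = data
        def __iter__(self):
            return self
        def next(self):
            print(".")
            if self.data < 0:
                raise StopIteration
            ret = self.data
            self.data -= 1
            # Imagine a blocking operation here: odd values stand
            # in for "data not ready yet".  The value has already
            # been consumed, so it never reaches printResult.
            if ret % 2:
                raise PauseFlow   # <= returns to the main event loop
            return ret
        __next__ = next           # Python 3 name for the same method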

    c = Flow()
    c.addBranch(simpleIterator)
    c.addCallable(printResult)
    c.execute(5)

The above code prints 4, 2, 0 (plus the "." markers); the
odd values are consumed before PauseFlow is raised, so they
never reach printResult.  While this may not be all that
interesting, if there were N stages above this code, they
would all be resumed properly.  Raising PauseFlow stops
the entire Flow event loop and schedules a
reactor.callLater(0, loop-again), giving other events in
the queue a chance to run.
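
The pause-and-resume behaviour described above can be
sketched without Twisted.  In this Python 3 sketch,
ToyReactor is a hypothetical stand-in for the reactor with
a callLater-shaped method (the delay is ignored), and
runFlow is a hypothetical driver, not flow.py's code.  The
point it illustrates is that the iterator object keeps its
own state, so rescheduling the same loop resumes it where
it stopped, while other queued calls get a turn in between.

```python
from collections import deque


class PauseFlow(Exception):
    """Raised by a stage that cannot produce a value yet."""


class simpleIterator:
    # Same logic as the example above, minus the "." output.
    def __init__(self, data):
        self.data = data

    def __iter__(self):
        return self

    def __next__(self):
        if self.data < 0:
            raise StopIteration
        ret = self.data
        self.data -= 1
        if ret % 2:
            raise PauseFlow  # the odd value is already consumed and is lost
        return ret


class ToyReactor:
    """Hypothetical stand-in for the Twisted reactor: runs queued calls in order."""

    def __init__(self):
        self.queue = deque()

    def callLater(self, delay, func, *args):
        self.queue.append((func, args))  # delay is ignored in this sketch

    def run(self):
        while self.queue:
            func, args = self.queue.popleft()
            func(*args)


def runFlow(reactor, source, sink):
    it = iter(source)

    def loopAgain():
        try:
            for value in it:
                sink(value)
        except PauseFlow:
            # Give up control; the same iterator picks up where it left off.
            reactor.callLater(0, loopAgain)

    reactor.callLater(0, loopAgain)


out = []
reactor = ToyReactor()
runFlow(reactor, simpleIterator(5), out.append)
reactor.callLater(0, out.append, "other event")
reactor.run()
# out is now ['other event', 4, 2, 0]
```

The "other event" lands first because the flow paused on
its very first value (5) and went to the back of the queue.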

Thus, the flow construct not only provides a way to
organize a bunch of useful code snippets into a process,
but, more importantly, lets the entire flow be interrupted
and then resumed later when data arrives.

Anyway, it's in the sandbox if anyone wants to play...

Clark

P.S.  Unfortunately, it looks like PauseFlow doesn't
work in the context of a generator... pity.  Hopefully
I'm doing something wrong.
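
The P.S. is most likely explained by Python's generator
rules rather than by flow.py: PEP 255 specifies that once
an unhandled exception passes out of a generator, the
generator is finished, and later attempts to resume it
raise StopIteration.  A generator that raises PauseFlow can
therefore only be restarted, never resumed.  A minimal
Python 3 sketch of the effect follows, plus one possible
workaround: yield a sentinel instead of raising, so the
generator is suspended rather than terminated.  The names
pausingGenerator, tryToResume, PAUSE, cooperativeGenerator
and drain are hypothetical.

```python
class PauseFlow(Exception):
    """Stand-in for flow.py's PauseFlow exception."""


def pausingGenerator(data):
    for x in range(data):
        if x % 2:
            raise PauseFlow  # propagates out of the frame and ends the generator
        yield x


def tryToResume():
    gen = pausingGenerator(5)
    first = next(gen)                # 0
    try:
        next(gen)                    # x == 1, so PauseFlow escapes
    except PauseFlow:
        pass
    return first, next(gen, "done")  # the generator is finished, not paused


PAUSE = object()  # hypothetical sentinel meaning "not ready, call me again later"


def cooperativeGenerator(data):
    for x in range(data):
        if x % 2:
            yield PAUSE  # suspends normally, so local state survives
        yield x


def drain(gen):
    values, pauses = [], 0
    for item in gen:
        if item is PAUSE:
            pauses += 1  # a real runner would go back to the event loop here
            continue
        values.append(item)
    return values, pauses
```

tryToResume() returns (0, "done"), while
drain(cooperativeGenerator(5)) returns ([0, 1, 2, 3, 4], 2):
with the sentinel, the odd values are delivered after the
pause instead of being lost.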
