[Twisted-Python] Re: hacking on flow

Clark C. Evans cce at clarkevans.com
Thu Apr 10 09:44:32 MDT 2003


On Thu, Apr 10, 2003 at 11:51:28AM +0200, Philippe Lafoucrière wrote:
| I had to manage different flows in one application.
| 
| DS1 ----> BLOC1 ---+
|                    |
| DS2 ----> BLOC2 ---+---- BLOC3 -----> DS3

Right.

| is an exemple of what it could be. Every thing is a bloc. DSx are 
| datasources (there are also blocs). BLOCx are some treatment bloc that 
| operate with their inputs, and provide the result on their outputs.
| 
| this is the algorithm comming with :
| 
| all bloc are "inactive"
| for bloc in blocs:
|     if len(bloc.inputs) is 0:
|         bloc is "active" # it should be a datasource/producer
| 
| gotData(bloc, lines) :
|     for line in lines:
|         fill in connected blocs's inputs with outputs
|         connected blocs are active
|         while connected blocs's inputs are set:
|             wait # I know, this sucks
|     bloc is inactive
| 
| while there are still active blocs  
|     for bloc in blocs
|         if all inputs are available (!= None)
|             bloc.proceed().addCallback(gotData)

Ok.  So, basically, you have an input queue for each
code block, and when a particular condition on the 
queue is triggered, the code block fires.  For BLOC1,
it'd be any input from DS1, for BLOC2, it'd be input
from DS2.  For BLOC3, it'd be both input from BLOC1
and BLOC2.  

This is interesting.  With the new-improved flow 
(aka extrepum's method), this would be listed as
3 generators... plus 2 built-in generators.

DS1 = flow.QueryIterator("some query")
DS2 = flow.QueryIterator("another query")

def BLOC1:
    ds1 = flow.Generator( DS1 )
    yield ds1   # think of as ds1.next()
    while ds1.active:
        result = ds1.result   # value of ds1.next()
        //
        // process ds1.result, possibly
        // sending the result to the next
        // stage using:   yield result
        //
        yield ds1

def BLOC2:
    ds2 = flow.Generator( DS2 )
    yield ds2
    while ds2.active:
        result = ds2.result
        //
        // process ds2, calling yield result
        // to pass on value to next stage
        //
        yield ds2

def BLOC3:
    bl1 = flow.Generator(BLOC1())
    bl2 = flow.Generator(BLOC2())
    yield bl1; yield bl2
    while bl1.active and bl2.active:
        // do somethign with bl1.result and bl2.result
        yield bl1; yield bl2

d = flow.DeferredFlow(BLOC3)
d.addBoth(lambda x: print 'done')
reactor.start()

I'll make a test like this for the flow module so that
it's more clear.   This is such a *HUGE* improvement over
what I was doing... thank you _so_ much extrepum.

Best,

;) Clark




More information about the Twisted-Python mailing list