[Twisted-Python] Experimenting with tubes

Glyph Lefkowitz glyph at twistedmatrix.com
Thu Aug 14 15:29:56 MDT 2014


On Aug 11, 2014, at 6:51 AM, ccx at webprojekty.cz wrote:

> Hello, I've been playing with the new tubes that are being implemented:
> http://comments.gmane.org/gmane.comp.python.twisted/27248
> https://twistedmatrix.com/trac/ticket/1956

Thanks so much for taking the time to play with it, and taking some time to write feedback.

> Here are few things that I did with it. I won't publish the full code now,
> as in it's current shape it could implode eyeballs of twisted devs and
> possibly make them summon some of the elder gods, but I'll see if I can
> produce something less vile as I merge the ongoing changes to the tubes
> branch.

I'd be interested to see the code nevertheless.  If you had to do eyeball-imploding antics to get Tubes to work well for your use-case, being able to have a look at that would help us evaluate whether those antics were required by the code, encouraged by misfeatures of the API design, or just issues with lack of documentation.

> So far I wrote relatively simple app that read logfiles, parse them and
> insert what they got out of them into a database.

If it's actually reading a file, another nice to-do would be an IFount provider that provides the contents of a file with appropriate flow control, and maybe a thread or process in the background to do the file I/O.  Another thing you could contribute to the branch, possibly?  :-)  How did you implement this?

> First issue that I've
> dealt with is stopping the tubes. When I read the whole of the input I want
> to wait until all of it was parsed (atm synchronous code, but I can imagine
> eg. some expensive processing being done in thread / external process) and
> then wait until it's commited to the database before shutting the reactor
> down cleanly.

To translate this into Tubes terminology, what it sounds like you want is a way for a drain (in this case, the drain representing the database transaction) to authoritatively signal that it has completed consuming all of the inputs that it has received from its fount.  But the only notification that you can get now is stopFlow, which just means "cut it out" and not "I'm done" - not to mention that "stopFlow" is unnecessary after "flowStopped" which makes it meaningless in the current idiom.

> As of #42908 which I pulled for experimenting the support for passing
> flowStopped(reason) through pipeline (or series if you want) was not
> working, an issue with None being returned from stopped() ended the
> processing prematurely, which I fixed with:
> 
> === modified file 'tubes7/tube.py'
> --- tubes7/tube.py	2014-08-01 18:32:48 +0000
> +++ tubes7/tube.py	2014-08-01 21:20:44 +0000
> @@ -441,6 +446,8 @@
>                 downstream.flowStopped(f)
>             return
>         if iterableOrNot is None:
> +            if self._flowStoppingReason is not None:
> +                self._tfount.drain.flowStopped(self._flowStoppingReason)
>             return 0
>         self._pendingIterator = iter(iterableOrNot)
>         if self._tfount.drain is None:

I'm not sure I totally understand the case that you're describing right now.  Can you perhaps contribute a unit test which demonstrates why this line of code is necessary?

> Also the ProtocolFount didn't really do what it should, so I made it
> implement IHalfCloseableProtocol and made it call flowStopped() accordingly.

Yeah that is definitely a known issue on our to-do list.  I think it's even in the notes.rst in the branch.  Can we have your patch?  (You wrote tests, too, right? ;-))

> One more thing about it I did is that I made it invoke flowStopped() on any
> drain that is newly attached to it - apparently when I used the stdio
> endpoint it managed to close it when reading from /dev/null even before I
> managed to set up the series/pipeline.

Are you running into <https://twistedmatrix.com/trac/ticket/7546>?

> That still didn't make it possible for me to wait on DB being written to
> properly. What I had to do is to implement CloseableDrain that has
> waitDone() method that emits a Deferred that fires when the drain's
> flowStopped() was called and all it should do has been done. This makes it
> quite handy to use from react()-style setup since I can just return this
> Deferred, or DeferredList of all ongoing pipelines.

There are some thoughts I have about the database transaction thing:

Maybe this should be done out of band?  Right before I read this paragraph I was thinking of something like this; a database transaction is a separate thing.  The data has in fact flowed to the appropriate point.
Rather than having an "I'm completely done" notification as I proposed above, we could have an explicit notion of application-level acknowledgements of each receive(...) call?
Maybe those acknowledgements could themselves be coming from a Fount in the reverse direction of the data, rather than trying to put in actual bi-directional data flow into the core interfaces?  A recipe in terms of the existing abstraction, rather than an extension?  This is sort of how real app-level acknowledgements work: the recipient has to send its own message to indicate receipt of the message it received.

> For the next pipeline I had one more issue: this pipeline can be run either
> as a log reader, or as essential part of running program that emits such
> logs. In the latter case I need to generate confirmation messages for
> specific entries that are being inserted and send them back to the
> originator, after they has been safely written to the DB. This I resolved by
> adding another field into the values I pass into PostgreSQLDrain - deferred
> that will be fired as txpostgres's runOperation finishes. This resolution
> works pretty well but it took me quite a while to come up with it, so I'm
> not sure if it's intuitive design pattern or if we could come up with
> something better.

I'm glad I didn't read ahead in this message as I'm replying to it, because I can see that we're thinking along very convergent lines :-).  This sounds just like the the 3rd point in the proposal I was saying before: we should have a recipe present for acknowledgements somewhere.

> Then I had to run both pipelines in parallel, after implementing the fan-in
> pattern (fan-out was already done by glyph), I wrote this helper function:
> 
> def parallel(*tubes):
>    out = Out()
>    in_ = In()
>    out._drain.nextFount = in_
>    for tube in tubes:
>        out.newFount().flowTo(series(tube, in_.newDrain()))
>    in_.stop_on_empty = True
>    return out.drain

We actually started working on twisted.tubes.fan to implement something very much like this that we hadn't gotten to yet!  David was calling this pattern "fork/join" and we were debating whether we needed infrastructure code to do this.

> The nextFount attribute on _OutDrain is what is returned from flowingFrom()
> so this function can be used as a part of series. What I'm unsure about is
> how to handle stopping of the fan-in. Currently I don't make it stop until
> the stop_on_empty is set

That ... definitely sounds kind of gross.  As does actually setting the nextFount attribute directly on the fan.Out.

> (so I can add/remove things during it's
> initialization) and then I make it stop when the last fount that's flowing
> in has stopped (and removed from input founts set) and I use the reason it
> passes into flowStopped() to propagate along to the rest of series,
> effectively discarding any reason objects passed to all the founts except
> the last one.

twisted.web.client.Agent has a solution to this where there's a multi-failure object that aggregates multiple errors into one thing.  I think we have to do something similar.  Unfortunately this is a very confusing interface in addition to being poorly documented and relies on private classes that expose ostensibly public attributes.  We need to very carefully document this within fan.In.

> What I'll have to deal with is a lack of sensible flow control in some parts
> of the code. For example the part that generates the log files should not be
> stopped just because there's some delay in writing the logs. This made me
> wonder if the flow control and perhaps processing confirmation should not be
> run not as a part of the main interface but instead something that runs
> alongside, where applicable, in the opposite direction. But I don't have any
> specific API in my mind at the moment. On the other hand, both are perfectly
> solvable with current design - implementing FIFO buffers or message droppers
> for flow control and the above mentioned deferred passing for confirmations.

So, at some level, yes it should be stopped because there's a delay in writing the logs.  By which I mean that if you don't want to stop it, you have to choose an explicit, finite amount of memory or disk to use for buffering.  FIFO buffers and message droppers are (obviously) very important flow-control intermediaries, but at the end of the day you have a finite amount of resources and the idea that flow-control should be "optional" means that sometimes (read: "usually") you have an infinite amount of RAM and disk and you're perfectly happy to put it all to work whenever you experience a network partition that stalls your TCP stack.

> As for data representation that I choose to pass between each tube I've
> started with simple namedtuples and following that I've built a simple
> "datatype" class somewhat reminiscent of
> https://github.com/hynek/characteristic
> which I learned of few moments after I finished polishing my own
> implementation. What I have there is added layer above namedtuples that
> autogenerate zope Interfaces (so I can have adaptation), do field type and
> value validation/adaptation and possibly (as a future extension) provide
> easy way to make them into AMP commands so the series can be split into
> communicating processes as needed. (What would be interesting imo is
> something like ampoule for tubes, or perhaps a ThreadTube and SubprocessTube
> for performing blocking operations)

I think it's likely we'll acquire a dependency on Characteristic sometime soon, I have promised to look at the issues on <https://github.com/hynek/characteristic/pull/13> and try to address them already :).

> Also maybe of note is the implementation of Pipes in Async library for OCaml
> which I've been examining lately. What they seem to do there is that they
> push values downstream and the function called in each processing step may
> return deferred signifying a pause is requested until this deferred is
> fired. For those interested in the details you can refer to:
> https://ocaml.janestreet.com/ocaml-core/111.25.00/doc/async/#Std.Pipe
> and the relevant section of Real World OCaml book (available online).

Creating a token for every single call to .receive() makes life hard.  Deferred could go to some trouble to be a cheaper token to pass around (especially on PyPy) but doing it this way is also error-prone as a mistaken error-handler in the Deferred chain means that the default behavior of buggy code un-hooks your loop and leaves idle data sources that will never be cleaned up.

(The fact that each call to .pauseFlow returns a token is me trying to rehabilitate myself from worrying about the performance side of this argument and worry more about the correctness / error-prone-ness part.  The PyPy developers, especially Alex Gaynor, have almost convinced me that it is OK to malloc things, sometimes.  Sometimes.)

I worked quite a bit with the 'Streams' interface in web2 on Calendar Server, and my conclusion there is that while this is better than nothing (it was very nice to be able to just return a Stream rather than cobble together something that returned NOT_DONE_YET every time) it was (A) slow and (B) error prone.  Tubes are designed specifically to avoid this error.  Although you can return Deferreds internally, no consumer ever needs to write the callback-loop that calls .read() again from a callback on .read().

> Looking forward to further tubes development :-)

As am I.  Thanks for all the feedback and encouragement!  This was very useful.  My main takeaway is that we definitely have some missing utility classes (file fount, FIFO queue, message dropper, fan.In, an idiom and supporting code for processing message acknowledgements at the application level), some bugs (something about flowStopped not propagating correctly?), but that the interfaces as they stand are largely on the right track.

-glyph

-------------- next part --------------
An HTML attachment was scrubbed...
URL: </pipermail/twisted-python/attachments/20140814/33b72ec2/attachment-0002.html>


More information about the Twisted-Python mailing list