[Twisted-Python] Experimenting with tubes

ccx at webprojekty.cz
Mon Aug 11 07:51:04 MDT 2014


Hello, I've been playing with the new tubes that are being implemented:
http://comments.gmane.org/gmane.comp.python.twisted/27248
https://twistedmatrix.com/trac/ticket/1956

Here are a few things I did with it. I won't publish the full code yet, as
in its current shape it could implode the eyeballs of Twisted devs and
possibly make them summon some of the elder gods, but I'll see if I can
produce something less vile as I merge the ongoing changes to the tubes
branch.

So far I've written a relatively simple app that reads log files, parses
them and inserts what it gets out of them into a database. The first issue
I dealt with was stopping the tubes. When I've read all of the input, I want
to wait until all of it has been parsed (synchronous code at the moment, but
I can imagine e.g. some expensive processing being done in a thread or an
external process) and then wait until it's committed to the database before
shutting the reactor down cleanly.

As of #42908, which I pulled for experimenting, passing flowStopped(reason)
through a pipeline (or series, if you prefer) was not working: None being
returned from stopped() ended the processing prematurely. I fixed that with:

=== modified file 'tubes7/tube.py'
--- tubes7/tube.py	2014-08-01 18:32:48 +0000
+++ tubes7/tube.py	2014-08-01 21:20:44 +0000
@@ -441,6 +446,8 @@
                 downstream.flowStopped(f)
             return
         if iterableOrNot is None:
+            if self._flowStoppingReason is not None:
+                self._tfount.drain.flowStopped(self._flowStoppingReason)
             return 0
         self._pendingIterator = iter(iterableOrNot)
         if self._tfount.drain is None:

Also, ProtocolFount didn't really do what it should, so I made it implement
IHalfCloseableProtocol and call flowStopped() accordingly.
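A minimal sketch of the half-close idea, without importing Twisted. IHalfCloseableProtocol (in twisted.internet.interfaces) declares readConnectionLost() and writeConnectionLost(); HalfCloseFount and RecordingDrain below are hypothetical stand-ins that model only receive() and flowStopped(), and a plain string stands in for the Failure real code would pass:

```python
class RecordingDrain:
    """Hypothetical drain that just records what reaches it."""

    def __init__(self):
        self.items = []
        self.stopReason = None

    def receive(self, item):
        self.items.append(item)

    def flowStopped(self, reason):
        self.stopReason = reason


class HalfCloseFount:
    """Hypothetical stand-in for ProtocolFount.

    readConnectionLost()/writeConnectionLost() mirror the two
    IHalfCloseableProtocol callbacks. When the peer closes its write
    side, no more data will arrive, so the drain is told the flow has
    stopped even though our own output may stay open.
    """

    def __init__(self):
        self.drain = None

    def flowTo(self, drain):
        self.drain = drain

    def dataReceived(self, data):
        self.drain.receive(data)

    def readConnectionLost(self):
        # End of input: a plain string stands in for a Failure.
        self.drain.flowStopped("read side closed")

    def writeConnectionLost(self):
        # Our output side closed; the downstream drain is unaffected.
        pass


drain = RecordingDrain()
fount = HalfCloseFount()
fount.flowTo(drain)
fount.dataReceived(b"line 1\n")
fount.readConnectionLost()
```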

One more change I made is that it now invokes flowStopped() on any drain
that is newly attached to it: apparently, when I used the stdio endpoint,
reading from /dev/null managed to close it before I had even set up the
series/pipeline.
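Replaying an earlier stop to a late-attached drain can be sketched like this. ReplayingFount and RecordingDrain are hypothetical and model only flowTo() and flowStopped(); a string stands in for the Failure a real fount would pass:

```python
class RecordingDrain:
    """Hypothetical drain that remembers why its flow stopped."""

    def __init__(self):
        self.stopReason = None

    def flowStopped(self, reason):
        self.stopReason = reason


class ReplayingFount:
    """Hypothetical fount that remembers it has already stopped."""

    def __init__(self):
        self.drain = None
        self._stopReason = None  # set once the input has ended

    def stop(self, reason):
        self._stopReason = reason
        if self.drain is not None:
            self.drain.flowStopped(reason)

    def flowTo(self, drain):
        self.drain = drain
        # Input may have ended (e.g. stdin read from /dev/null) before
        # the series was wired up, so tell the new drain immediately.
        if self._stopReason is not None:
            drain.flowStopped(self._stopReason)
        return drain


fount = ReplayingFount()
fount.stop("EOF before any drain was attached")
late = RecordingDrain()
fount.flowTo(late)
```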

That still didn't let me wait properly for the DB writes to finish. What I
had to do was implement a CloseableDrain with a waitDone() method that
returns a Deferred which fires once the drain's flowStopped() has been
called and everything it should do has been done. This makes it quite handy
to use from a react()-style setup, since I can just return this Deferred, or
a DeferredList of all ongoing pipelines.
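A minimal sketch of such a CloseableDrain, assuming each write is an asynchronous operation that returns a completion handle. To keep it runnable without Twisted, concurrent.futures.Future stands in for Deferred (add_done_callback() in place of addCallback(), set_result() in place of callback()); the write callable and fakeInsert() are hypothetical. waitDone() resolves only once flowStopped() has been called and no write is still outstanding:

```python
from concurrent.futures import Future


class CloseableDrain:
    """Hypothetical drain whose waitDone() result fires only after
    flowStopped() was called *and* every write it started has finished.

    concurrent.futures.Future stands in for a Twisted Deferred here.
    """

    def __init__(self, write):
        self._write = write      # item -> Future, e.g. a DB insert
        self._pending = 0        # writes started but not yet finished
        self._stopped = False
        self._reason = None
        self._waiters = []

    def receive(self, item):
        self._pending += 1
        self._write(item).add_done_callback(self._writeFinished)

    def flowStopped(self, reason):
        self._stopped = True
        self._reason = reason
        self._maybeDone()

    def waitDone(self):
        waiter = Future()
        self._waiters.append(waiter)
        self._maybeDone()        # may already be done
        return waiter

    def _writeFinished(self, _future):
        self._pending -= 1
        self._maybeDone()

    def _maybeDone(self):
        if self._stopped and self._pending == 0:
            waiters, self._waiters = self._waiters, []
            for waiter in waiters:
                waiter.set_result(self._reason)


# Usage: writes complete "later", as an asynchronous DB driver would.
inFlight = []

def fakeInsert(row):
    f = Future()
    inFlight.append(f)
    return f

drain = CloseableDrain(fakeInsert)
done = drain.waitDone()
drain.receive({"msg": "a"})
drain.flowStopped("end of input")
# done is not finished yet: one insert is still outstanding.
inFlight[0].set_result(None)
```

With real Deferreds, the object returned by waitDone() is what a react() main function could return directly, or gather with others into a DeferredList.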

For the next pipeline I had one more issue: it can run either as a log
reader or as an essential part of a running program that emits such logs.
In the latter case I need to generate confirmation messages for specific
entries being inserted and send them back to the originator after they have
been safely written to the DB. I resolved this by adding another field to
the values I pass into PostgreSQLDrain: a Deferred that is fired when
txpostgres's runOperation finishes. This works pretty well, but it took me
quite a while to come up with, so I'm not sure whether it's an intuitive
design pattern or whether we could come up with something better.
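The confirmation pattern amounts to each value carrying its own completion handle, which the database drain resolves after the write. In this sketch LogEntry and FakeDBDrain are hypothetical, the write is synchronous rather than a txpostgres runOperation() call, and concurrent.futures.Future again stands in for a Deferred. With Twisted, the drain could instead chain the Deferred returned by runOperation() into the entry's Deferred (e.g. via Deferred.chainDeferred()):

```python
from collections import namedtuple
from concurrent.futures import Future

# Hypothetical log entry: the last field carries a completion handle
# (a Future standing in for a Deferred) that the DB drain resolves.
LogEntry = namedtuple("LogEntry", ["source", "message", "confirmed"])


class FakeDBDrain:
    """Stand-in for PostgreSQLDrain; stores rows synchronously."""

    def __init__(self):
        self.rows = []

    def receive(self, entry):
        self.rows.append((entry.source, entry.message))
        # Only after the row is stored is the originator notified.
        if entry.confirmed is not None:
            entry.confirmed.set_result(len(self.rows))


confirmations = []

drain = FakeDBDrain()
ack = Future()
# Hypothetical reply back to whoever produced the entry.
ack.add_done_callback(lambda f: confirmations.append(("client-1", f.result())))
drain.receive(LogEntry("client-1", "hello", ack))
drain.receive(LogEntry("logfile", "replayed line", None))  # no ack needed
```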

Then I had to run both pipelines in parallel. After implementing the fan-in
pattern (fan-out was already done by glyph), I wrote this helper function:

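def parallel(*tubes):
    out = Out()  # fan-out: every tube receives each incoming item
    in_ = In()   # fan-in: merges the tubes' outputs back together
    # _OutDrain.flowingFrom() returns nextFount, so this can sit in a series
    out._drain.nextFount = in_
    for tube in tubes:
        out.newFount().flowTo(series(tube, in_.newDrain()))
    # setup finished: stop once the last input fount has stopped
    in_.stop_on_empty = True
    return out.drain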

The nextFount attribute on _OutDrain is what flowingFrom() returns, so this
function can be used as part of a series. What I'm unsure about is how to
handle stopping of the fan-in. Currently I don't let it stop until
stop_on_empty is set (so I can add/remove things during its
initialization); after that, it stops when the last fount flowing in has
stopped (and been removed from the set of input founts), and I propagate the
reason that fount passes into flowStopped() along to the rest of the series,
effectively discarding the reason objects passed by all the founts except
the last one.
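The stopping rule just described can be sketched without Twisted. FanIn, _FanInDrain and RecordingDrain are hypothetical stand-ins for the In class that model only receive() and flowStopped(); making stop_on_empty a property is an assumption that also covers the case where every input stopped before the flag was set:

```python
class RecordingDrain:
    def __init__(self):
        self.items = []
        self.stopReason = None

    def receive(self, item):
        self.items.append(item)

    def flowStopped(self, reason):
        self.stopReason = reason


class FanIn:
    """Hypothetical fan-in (not the tubes API): merges several inputs
    into one downstream drain and stops it only once stop_on_empty is
    set and every input has stopped."""

    def __init__(self, downstream):
        self.downstream = downstream
        self._inputs = set()
        self._lastReason = None
        self._stopped = False
        self._stopOnEmpty = False

    @property
    def stop_on_empty(self):
        return self._stopOnEmpty

    @stop_on_empty.setter
    def stop_on_empty(self, value):
        self._stopOnEmpty = value
        self._maybeStop()  # inputs may all have stopped already

    def newDrain(self):
        inp = _FanInDrain(self)
        self._inputs.add(inp)
        return inp

    def _inputStopped(self, inp, reason):
        self._inputs.discard(inp)
        self._lastReason = reason  # earlier inputs' reasons are overwritten
        self._maybeStop()

    def _maybeStop(self):
        if self._stopOnEmpty and not self._inputs and not self._stopped:
            self._stopped = True
            self.downstream.flowStopped(self._lastReason)


class _FanInDrain:
    """One input of the fan-in; forwards items straight downstream."""

    def __init__(self, fanIn):
        self._fanIn = fanIn

    def receive(self, item):
        self._fanIn.downstream.receive(item)

    def flowStopped(self, reason):
        self._fanIn._inputStopped(self, reason)


out = RecordingDrain()
fan = FanIn(out)
a, b = fan.newDrain(), fan.newDrain()
a.receive(1)
b.receive(2)
a.flowStopped("a done")
fan.stop_on_empty = True   # b is still flowing, so nothing stops yet
b.flowStopped("b done")    # last input: its reason goes downstream
```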

What I'll have to deal with is the lack of sensible flow control in some
parts of the code. For example, the part that generates the log files should
not be stopped just because there's some delay in writing the logs. This
made me wonder whether flow control, and perhaps processing confirmation,
should not be part of the main interface but instead something that runs
alongside it, where applicable, in the opposite direction. But I don't have
any specific API in mind at the moment. On the other hand, both are
perfectly solvable with the current design: FIFO buffers or message droppers
for flow control, and the above-mentioned Deferred passing for
confirmations.
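A "message dropper" of this kind can be sketched with collections.deque and its maxlen argument: items from the producer are always accepted, so it never has to pause, and once the buffer is full the oldest entries are discarded. DroppingBuffer and drainTo() are hypothetical names; passing size=None gives the unbounded FIFO variant instead:

```python
from collections import deque


class DroppingBuffer:
    """Hypothetical message dropper: accepts every item so the producer
    is never paused, keeps at most `size` items for a slow consumer and
    discards the oldest ones when full (size=None means plain FIFO)."""

    def __init__(self, size):
        self._queue = deque(maxlen=size)
        self.dropped = 0

    def receive(self, item):
        if len(self._queue) == self._queue.maxlen:
            self.dropped += 1  # deque(maxlen=...) evicts the oldest item
        self._queue.append(item)

    def drainTo(self, consumer, limit):
        """Hand up to `limit` buffered items to a consumer with capacity."""
        for _ in range(min(limit, len(self._queue))):
            consumer(self._queue.popleft())


buf = DroppingBuffer(size=3)
for n in range(5):
    buf.receive(n)       # 0 and 1 get dropped
written = []
buf.drainTo(written.append, limit=10)
```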

As for the data representation I chose to pass between the tubes, I started
with simple namedtuples and then built a simple "datatype" class somewhat
reminiscent of
https://github.com/hynek/characteristic
which I learned of a few moments after I finished polishing my own
implementation. What I have there is an added layer on top of namedtuples
that autogenerates zope Interfaces (so I can have adaptation), does field
type and value validation/adaptation and possibly (as a future extension)
provides an easy way to turn them into AMP commands so the series can be
split into communicating processes as needed. (What would be interesting,
IMO, is something like ampoule for tubes, or perhaps a ThreadTube and
SubprocessTube for performing blocking operations.)
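A rough sketch of such a layer over namedtuples, limited to the type validation/adaptation part. datatype() is a hypothetical helper, adaptation is approximated by calling the declared type as a converter, and the zope Interface generation and AMP conversion are left out. It relies on keyword-argument order being preserved (Python 3.6+):

```python
from collections import namedtuple


def datatype(name, **fieldTypes):
    """Build a namedtuple subclass whose constructor adapts each field
    to its declared type, rejecting values that cannot be converted."""
    base = namedtuple(name, list(fieldTypes))

    def __new__(cls, *args, **kwargs):
        raw = base(*args, **kwargs)  # reuse namedtuple's argument handling
        values = []
        for field, value in zip(base._fields, raw):
            expected = fieldTypes[field]
            if not isinstance(value, expected):
                try:
                    value = expected(value)  # crude adaptation by conversion
                except (TypeError, ValueError):
                    raise TypeError("%s.%s: expected %s, got %r" % (
                        name, field, expected.__name__, value)) from None
            values.append(value)
        return base.__new__(cls, *values)

    return type(name, (base,), {"__new__": __new__, "__slots__": ()})


LogLine = datatype("LogLine", host=str, pid=int, message=str)
line = LogLine("web1", "4242", message="started")  # pid adapted to int
```

Conversion-based adaptation is deliberately crude: str() rarely fails, so a str field accepts almost anything, and stricter validation would need explicit rules per field.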

Also perhaps of note is the implementation of Pipes in the Async library for
OCaml, which I've been examining lately. What they seem to do there is push
values downstream, and the function called in each processing step may
return a Deferred signifying that a pause is requested until this Deferred
is fired. For those interested in the details, you can refer to:
https://ocaml.janestreet.com/ocaml-core/111.25.00/doc/async/#Std.Pipe
and the relevant section of the Real World OCaml book (available online).

Looking forward to further tubes development :-)
  CcxCZ (freenode) | Jan Pobříslo (IRL)
