Opened 8 years ago

Last modified 32 hours ago

#1956 enhancement assigned

"Tubes": Create a composable, flow-control and back-pressure friendly way to arrange parsing, processing, and emitting data

Reported by: jknight
Owned by: glyph
Priority: normal
Milestone:
Component: core
Keywords:
Cc: glyph, exarkun, itamarst, radix, jknight, oubiwann, ashfall, twistedmatrix.com@…, tobias.oberstein@…, Julian+Twisted@…, free@…, daf@…
Branch: branches/tubes-1956-7
Author: ashfall, glyph
Launchpad Bug:

Description (last modified by glyph)

The current producer/consumer API has a couple of problems:

  • It's impossible to get notifications of buffer size changes, which are important for certain kinds of timeout logic. For example, if you buffer a bunch of output and then time out if the peer is inactive (isn't reading any data), you need to know not just when the buffer is empty but also when some data has been consumed.
  • The streaming flag (the streaming argument to IConsumer.registerProducer, which distinguishes push producers from pull producers) doesn't really make sense, and it complicates the implementation of nested producers. While it doesn't actually break anything, it's effectively a design bug: there's no reason to have it, and it should be removed.
  • It's awkward to chain producers and consumers with each other (XXX: this problem needs better documentation, use case, example of awkward code).
  • IProtocol, ITransport, IProducer and IConsumer all have redundant "here's some data" / "there will be no more data" methods which are subtly different. These should be unified so that the protocol consuming data from the transport and the transport consuming data from the protocol look the same.

We should design an API which does not have these problems.
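A minimal sketch of the first problem follows. It assumes a hypothetical write buffer that reports every partial drain through a callback. WriteBuffer, InactivityTimeout and FakeClock are illustrative names, not Twisted APIs. The inactivity timeout is reset whenever the peer reads *some* data. A notification that fired only when the buffer became empty would never fire in this run, because 90 bytes are still buffered after the read at t=20.

```python
class FakeClock:
    """Deterministic clock so the example runs without a reactor."""

    def __init__(self):
        self.now = 0.0


class WriteBuffer:
    """Hypothetical outgoing buffer that reports every partial drain."""

    def __init__(self, onConsumed):
        self._chunks = []
        self._onConsumed = onConsumed  # called with the number of bytes read

    def write(self, data):
        self._chunks.append(data)

    def pending(self):
        return sum(len(chunk) for chunk in self._chunks)

    def drain(self, n):
        """Simulate the peer reading up to n bytes."""
        taken = 0
        while self._chunks and taken < n:
            chunk = self._chunks[0]
            want = n - taken
            if len(chunk) <= want:
                self._chunks.pop(0)
                taken += len(chunk)
            else:
                self._chunks[0] = chunk[want:]
                taken += want
        if taken:
            self._onConsumed(taken)
        return taken


class InactivityTimeout:
    """Expires only if the peer has read nothing for `limit` seconds."""

    def __init__(self, limit, clock):
        self.limit = limit
        self.clock = clock
        self.lastActivity = clock.now

    def consumed(self, nbytes):
        self.lastActivity = self.clock.now

    def expired(self):
        return self.clock.now - self.lastActivity > self.limit


clock = FakeClock()
timeout = InactivityTimeout(30, clock)
buf = WriteBuffer(timeout.consumed)
buf.write(b"x" * 100)

clock.now = 20.0
buf.drain(10)                        # peer read 10 bytes; 90 are still buffered
clock.now = 45.0
stillAlive = not timeout.expired()   # 25s since the last read
clock.now = 51.0
timedOut = timeout.expired()         # 31s since the last read
```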

Attachments (1)

tubes-1956-3-howto-corrections.patch (6.8 KB) - added by rwall 12 months ago.
One or two corrections and suggestions for the tubes tutorial.


Change History (55)

comment:1 Changed 8 years ago by jknight

  • Cc itamarst added; itamar removed

comment:2 Changed 8 years ago by jknight

Here's a rough sketch to get started with.

Producer:

  • beProducing() -> Deferred: register the consumer and return a d called back when done producing (or errored), but don't start writing (badly named function)
  • bufferFull() -> None: when called, _MUST_ not call writeFn again. Called on state change only
  • bufferEmpty(writeFn) -> None: you can start writing data by calling writeFn. Called on state change only.
  • abort(failure): close down any internal state, and errback the deferred with the given failure.
  • split(offset) -> IProducer, IProducer: return two producers, one that stops at offset bytes, one that starts there. (optional? provided by ABC?). Only callable while not producing?

Consumer:

  • consumeFrom(Producer) -> Deferred: registers producer, adds internal callback to prod's returned deferred, and returns the d from the producer.
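The producer half of this sketch might look like the following in Python. This is only an illustration. Done is a hypothetical stand-in for a Twisted Deferred, so the example runs without Twisted. ListProducer is invented for the example, and split() is omitted. Only the method names beProducing, bufferFull, bufferEmpty and abort come from the proposal above.

```python
class Done:
    """Stand-in for a Deferred: fires once with a result or a failure."""

    def __init__(self):
        self.called = False
        self.result = None
        self.failure = None

    def callback(self, result):
        self.called, self.result = True, result

    def errback(self, failure):
        self.called, self.failure = True, failure


class ListProducer:
    """Hypothetical producer emitting items from a list on demand."""

    def __init__(self, items):
        self._items = list(items)
        self._done = None
        self._paused = True

    def beProducing(self):
        # Hand back the completion signal; do not start writing yet.
        self._done = Done()
        return self._done

    def bufferFull(self):
        # After this, writeFn must not be called until the next bufferEmpty().
        self._paused = True

    def bufferEmpty(self, writeFn):
        # Write until the consumer says its buffer is full or we run dry.
        self._paused = False
        while self._items and not self._paused:
            writeFn(self._items.pop(0))
        if not self._items and not self._done.called:
            self._done.callback(None)

    def abort(self, failure):
        # Drop internal state and errback the completion signal.
        self._items = []
        if not self._done.called:
            self._done.errback(failure)


received = []
producer = ListProducer([b"a", b"b", b"c"])
done = producer.beProducing()


def writeFn(data):
    received.append(data)
    if len(received) == 2:
        producer.bufferFull()  # the consumer's buffer just filled up


producer.bufferEmpty(writeFn)   # writes b"a", b"b", then pauses
afterFirst = list(received)
firedEarly = done.called
producer.bufferEmpty(writeFn)   # consumer drained; writes b"c" and finishes
```

The producer never sees a consumer object, only the writeFn it is handed.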

comment:4 Changed 8 years ago by glyph

Hooray a ticket.

AMP's streaming stuff will need something like this too. I will make a separate ticket for that with a detailed description tonight (I will try to do it with the current cons/prod API first).

I assume beProducing() should actually be produceTo(consumer) or something like that?

Can we modify this so that there are no Deferreds in the lowest level of the API? Instead, add finishedConsuming(consumer) and finishedProducing(producer) APIs to Producer and Consumer, respectively?

Sometimes, the producer wants to be "on top" (you have some data, you want to write it somewhere). Sometimes, the consumer wants to be "on top" (you want to get some data and you want to pass along a place to put it). How do we reconcile those two use cases? It sounds like it's pretty close in this sketch, but some detailed description of how beProducing and consumeFrom interact would be good.

comment:5 Changed 8 years ago by glyph

To clarify, I'm not saying there should be no Deferreds anywhere, just that they shouldn't be the "primitive" here. Convenience APIs that return Deferreds are good, and absolutely necessary for short scripts.
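One way to read this suggestion is sketched below, under some assumptions. The primitive is a plain method call (finishedProducing, the name proposed above), and a convenience function layers a future on top of it. concurrent.futures.Future stands in for a Deferred here. produceTo is borrowed from the question above. CollectingConsumer, ChunkProducer and collect are hypothetical names used only for illustration.

```python
from concurrent.futures import Future


class CollectingConsumer:
    """Hypothetical consumer exposing only the callback-style primitive."""

    def __init__(self):
        self.data = []
        self._onFinished = []

    def addFinishedHook(self, hook):
        self._onFinished.append(hook)

    def write(self, data):
        self.data.append(data)

    def finishedProducing(self, producer):
        # Primitive notification: this producer will send no more data.
        for hook in self._onFinished:
            hook(producer)


class ChunkProducer:
    """Hypothetical producer that writes all its chunks synchronously."""

    def __init__(self, chunks):
        self._chunks = chunks

    def produceTo(self, consumer):
        for chunk in self._chunks:
            consumer.write(chunk)
        consumer.finishedProducing(self)


def collect(producer):
    """Convenience layer: wrap the primitive notification in a future.

    concurrent.futures.Future stands in for a Deferred so this runs
    without Twisted installed.
    """
    consumer = CollectingConsumer()
    result = Future()
    consumer.addFinishedHook(
        lambda _producer: result.set_result(b"".join(consumer.data)))
    producer.produceTo(consumer)
    return result


body = collect(ChunkProducer([b"ab", b"cd"])).result()
```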

comment:6 Changed 8 years ago by glyph

  • Priority changed from normal to highest

comment:7 Changed 8 years ago by glyph

  • Cc radix added

comment:8 Changed 8 years ago by jknight

I was wondering how you were going to do AMP's streaming, because it has almost exactly the same issues I ran into with web2's streams.

I'm pretty sure calling it produceTo is not a good idea, because it doesn't actually take a consumer as an argument. One thing I like about my proposal is that the consumer actually has no API from the producer's standpoint. I think that is a good feature and shouldn't be discarded without thinking hard about it.

"beProducing" should never be called directly; it should always be called through a consumer's consumeFrom function. If you're passing around a data stream (producer), you call transport.consumeFrom(myDatastreamObject) when you finally find the place to put it. If you're passing around a data sink (consumer), you still call sink.consumeFrom(myDatastreamObject) when you finally find some data to send to it. I don't think that's actually a problem?

It should've said:

Consumer:

  • consumeFrom(Producer) -> Deferred: registers producer, calls producer.beProducing(), adds internal callbacks to returned deferred (to deregister producer), and returns the d.
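A rough sketch of the corrected consumeFrom flow follows. It assumes a minimal callback-only SimpleDeferred in place of Twisted's Deferred. SimpleDeferred, OneShotProducer and Transport are hypothetical names. The consumer registers the producer, calls beProducing(), and attaches an internal callback that deregisters the producer. It then returns the same deferred to the caller.

```python
class SimpleDeferred:
    """Callback-only stand-in for twisted.internet.defer.Deferred."""

    def __init__(self):
        self._callbacks = []
        self._fired = False
        self._result = None

    def addCallback(self, fn):
        if self._fired:
            self._result = fn(self._result)
        else:
            self._callbacks.append(fn)
        return self

    def callback(self, result):
        self._fired = True
        self._result = result
        for fn in self._callbacks:
            self._result = fn(self._result)
        self._callbacks = []


class OneShotProducer:
    """Hypothetical producer with a single chunk of data."""

    def __init__(self, data):
        self._data = data
        self._d = None

    def beProducing(self):
        self._d = SimpleDeferred()
        return self._d

    def bufferFull(self):
        pass  # a single write never needs to pause in this toy

    def bufferEmpty(self, writeFn):
        writeFn(self._data)
        self._d.callback(len(self._data))


class Transport:
    """Hypothetical consumer implementing consumeFrom() as described above."""

    def __init__(self):
        self.producer = None
        self.written = []

    def consumeFrom(self, producer):
        self.producer = producer                   # register the producer
        d = producer.beProducing()
        d.addCallback(self._unregister)            # internal bookkeeping
        producer.bufferEmpty(self.written.append)  # ready: start the flow
        return d

    def _unregister(self, result):
        self.producer = None
        return result  # pass the result through to the caller's callbacks


transport = Transport()
results = []
transport.consumeFrom(OneShotProducer(b"hello")).addCallback(results.append)
```

The deregistration callback returns its argument unchanged, so callers that add their own callbacks still see the producer's result.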

comment:9 Changed 8 years ago by jknight

To expand a bit on that, your finishedProducing(producer) would be the equivalent of calling back the deferred from beProducing on the producer. finishedConsuming() doesn't have any analog in the above API, and I can't see a need to have it, either. Either the consumer gets all the data streamed to it, or it pauses with bufferFull(), or it has an error and calls abort(). I'm not sure what a finishedConsuming() would be useful for?

comment:10 Changed 8 years ago by glyph

Hmm. I hadn't considered the "Has no API" feature. That's pretty cool.

.split() bugs me though. I think that split() is actually an API that wants to be on the transport, .switchProtocol(newProto, pushbackData). I am kind of in a hurry right now but if that doesn't make sense I will expound later.
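Below is a toy sketch of the switchProtocol(newProto, pushbackData) idea as stated here. It assumes a hypothetical ToyTransport; none of these classes exist in Twisted. The header protocol reads up to the first newline. It then hands the transport over to the body protocol, along with any bytes it had already read but not consumed. (jknight argues against putting this on the transport in comment:17.)

```python
class ToyTransport:
    """Hypothetical transport offering the proposed switchProtocol()."""

    def __init__(self, protocol):
        self.protocol = protocol
        protocol.transport = self

    def deliver(self, data):
        # Bytes arriving from the network go to the current protocol.
        self.protocol.dataReceived(data)

    def switchProtocol(self, newProto, pushbackData=b""):
        self.protocol = newProto
        newProto.transport = self
        if pushbackData:
            # Bytes the old protocol read but did not consume.
            newProto.dataReceived(pushbackData)


class HeaderProtocol:
    """Reads one newline-terminated header, then hands off the connection."""

    def __init__(self, bodyProtocol):
        self._buffer = b""
        self._body = bodyProtocol
        self.header = None

    def dataReceived(self, data):
        self._buffer += data
        if b"\n" in self._buffer:
            self.header, rest = self._buffer.split(b"\n", 1)
            self.transport.switchProtocol(self._body, rest)


class RawProtocol:
    def __init__(self):
        self.received = b""

    def dataReceived(self, data):
        self.received += data


body = RawProtocol()
header = HeaderProtocol(body)
transport = ToyTransport(header)
transport.deliver(b"GET\nabc")   # header plus the start of the body
transport.deliver(b"def")        # delivered straight to the body protocol
```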

What I think I'm going to be doing for AMP's streaming is creating an argument type of "protocol factory", which takes a client protocol factory on the requesting side. If you want to upload, you pass that and then start writing when you get a successful response; if you want to download, you pass it and call connectionMade as soon as you get a success response from the responder, then wait for data.

comment:11 Changed 8 years ago by radix

  • Cc jknight added

How about a P/C party day at Belmont and Highland this weekend? James, Glyph, Jp, Itamar?

comment:12 Changed 8 years ago by itamarst

saturday?

comment:13 Changed 8 years ago by radix

sounds good to me

comment:14 Changed 8 years ago by exarkun

There's some code in svn now

comment:15 Changed 8 years ago by glyph

OK so this weekend obviously isn't good but can we try to have another producer/consumer party on the 12th?

comment:16 Changed 8 years ago by glyph

  • Description modified (diff)

Updating the description a little bit (everybody else please feel free to add more).

comment:17 Changed 8 years ago by jknight

I looked at exarkun's code, and it's intriguing, but it's lacking a bit in explanation and rationale.

Glyph: split() is most certainly not something that belongs to a transport. See the uses of it in web2. It's mostly used there for splitting up _outgoing_ data. It is an operation that takes place on a stream. It can be implemented generically for any stream, but can also have a more optimized implementation for streams where the underlying data is seekable (like files).
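A generic version of split() can be written against any iterable of byte chunks, as in the sketch below. splitChunks is a hypothetical helper, not web2's actual implementation. It never seeks. It reads through the chunk containing the boundary and keeps the remainder for the second half. A seekable source such as a file could instead seek directly to the offset without reading the first part at all.

```python
def splitChunks(chunks, offset):
    """Generic split for any iterable of byte chunks.

    Returns (head, tail). `head` yields the first `offset` bytes (fewer if
    the stream is shorter); `tail` yields the rest. Both share one
    underlying iterator, so `head` must be exhausted before `tail` is read.
    """
    it = iter(chunks)
    leftover = []

    def head():
        remaining = offset
        for chunk in it:
            if len(chunk) <= remaining:
                remaining -= len(chunk)
                yield chunk
            else:
                if remaining:
                    yield chunk[:remaining]
                leftover.append(chunk[remaining:])  # save for the tail
                return
            if remaining == 0:
                return

    def tail():
        yield from leftover
        yield from it

    return head(), tail()


head, tail = splitChunks([b"abc", b"def", b"gh"], 4)
first = b"".join(head)   # consumed first, as required
rest = b"".join(tail)
```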

Also, I don't recommend modifying the description, as that doesn't have history, which is why I originally made it so terse. I do wish there was a "reply" option or somesuch for comments which would copy them into the text box with their original formatting, though.

Finally, as I said to itamar earlier, I think web2's stream module should be looked at as a use-case for this new thing. If the new P/C API can make the implementation of web2's stream module completely trivial, I think that will be a success.

comment:18 Changed 8 years ago by glyph

Re: split(): OK. I don't think this has anything to do with p/c; if I understand correctly, split() is used to implement range requests, which are a rather web-specific bit of functionality. You can pretty trivially create a file wrapper for that which is then passed to the generic file producer; why does this need to be dealt with inside the p/c layer itself?
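The wrapper approach described here could be sketched as follows. It assumes a hypothetical RangeFile class, plus a stand-in produceChunks generator in place of Twisted's file producer. The wrapper seeks to the start of the range and refuses to read past its end, so the generic reader needs no knowledge of ranges. An HTTP Range header of bytes=2-6 would correspond to start=2, length=5.

```python
import io


class RangeFile:
    """Hypothetical read-only view of bytes [start, start + length) of a file."""

    def __init__(self, f, start, length):
        self._f = f
        self._remaining = length
        f.seek(start)  # seekable source: skip the prefix without reading it

    def read(self, size=-1):
        # Never read past the end of the range.
        if size < 0 or size > self._remaining:
            size = self._remaining
        data = self._f.read(size)
        self._remaining -= len(data)
        return data


def produceChunks(f, chunkSize):
    """Stand-in for a generic file producer: read until EOF."""
    while True:
        chunk = f.read(chunkSize)
        if not chunk:
            return
        yield chunk


source = io.BytesIO(b"0123456789")
chunks = list(produceChunks(RangeFile(source, 2, 5), 2))
```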

Re: description: OK, I won't make too many changes to the description, although I think posting a current summary of the consensus there is good so that people trying to participate in the discussion don't have to go through the whole thing. (I suspect this discussion is going to get quite a bit longer.)

Re: stream module: That is definitely the motivating use-case. While the edges of the old API are a bit rough (see especially the description comment about the streaming flag) they are adequate to just about everything they've been put to so far. The web2 streams stuff seems to be the only area where there's major contention over this API.

comment:19 follow-up: Changed 8 years ago by glyph

  • Priority changed from highest to low

I'm dropping the priority on this because the only real reason I considered it "high" was my desire to remove IStream from web2 before declaring its API supported; practically speaking, Apple's use of this interface has eliminated our chance to do that.

comment:20 Changed 8 years ago by exarkun

  • Owner changed from glyph to exarkun
  • Priority changed from low to normal

comment:21 in reply to: ↑ 19 Changed 8 years ago by jknight

Replying to glyph:

As noted in the other ticket, it is not the case that it is too late.

comment:22 Changed 3 years ago by <automation>

  • Owner exarkun deleted

comment:23 Changed 3 years ago by glyph

  • Owner set to glyph
  • Status changed from new to assigned

I am Having Ideas about Things.

comment:24 Changed 3 years ago by itamar

You might want to take a look at Lamina: https://github.com/ztellman/lamina

comment:25 Changed 2 years ago by glyph

OK. I've looked at Lamina, and it looks like I had already independently invented most of its interesting ideas, in a way that is more directly relevant to Twisted.

I really need to get the sketches that I've written into a Twisted branch somewhere, but I'm hung up on naming everything.

comment:26 Changed 2 years ago by itamar

BTW, for BodyProducer we discovered that having an optional length attribute is very useful, e.g. for HTTP. I suspect that having length as an optional attribute for founts would either make the "progress bar" API simpler to implement, or possibly make it completely unnecessary.

comment:27 Changed 2 years ago by oubiwann

  • Cc oubiwann added

comment:28 Changed 2 years ago by glyph

  • Author set to glyph
  • Branch set to branches/tubes-1956

(In [34377]) Branching to 'tubes-1956'

comment:29 follow-up: Changed 2 years ago by itamar

I know you're still in the process of turning prototype into real code, but I'd like to reiterate that BodyProducer's optional length attribute might make a superior alternative to the floating point progress thing, since you can either say "I've gotten 100 out of the 923 items I expect" or "this fount has no length so tracking progress doesn't make sense" instead of arbitrary floats which don't even make sense in the unknown length case.

comment:30 Changed 2 years ago by glyph

OK, the prototype is in there now, and it's even got some half-decent test coverage and an example that a sufficiently imaginative and motivated contributor should be able to understand as a general statement of purpose.

I have obviously not had the time and energy to devote to this that it requires to get finished, so I encourage people to write tests for the un-tested parts; just dive in, don't ask for my permission, it's all in a branch and I'm sure we'll get it sorted out in review. Every little bit helps!

comment:31 in reply to: ↑ 29 Changed 2 years ago by glyph

Replying to itamar:

> I know you're still in the process of turning prototype into real code, but I'd like to reiterate that BodyProducer's optional length attribute might make a superior alternative to the floating point progress thing, since you can either say "I've gotten 100 out of the 923 items I expect" or "this fount has no length so tracking progress doesn't make sense" instead of arbitrary floats which don't even make sense in the unknown length case.

The floating-point progress thing and the length attribute are for two different use-cases. I'm not in love with floating point as the type which expresses progress, and if we can think of something better that would be great (maybe a fractions.Fraction? that strikes me as too-clever-by-half though...) but it does something different.

The current progress API, as implemented, is there to tell the IDrain provider that a certain amount of progress has been made towards the next call to receive(). The optional length attribute would be there to inform an IDrain that receives ISegments of the total len() of all the things that will be passed to its receive().

I am increasingly of the opinion that the optional attribute should simply be an additional interface, since not much about the general data-flow model is served by mixing it in to the initial implementation. Then, your IDrain can, in flowingFrom(), say, if IContentLength.providedBy(fount): doSomethingWith(fount.contentLength). (Or, hasattr(fount, "length") if that's what floats your boat).
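A minimal sketch of the duck-typed variant follows. It assumes a hypothetical contentLength attribute on finite founts; ByteFount, EndlessFount and ProgressDrain are illustrative. The getattr() check plays the role of IContentLength.providedBy(fount). With zope.interface, the drain would check the interface instead.

```python
class ByteFount:
    """Hypothetical finite fount, e.g. an HTTP response body."""

    def __init__(self, chunks):
        self.chunks = chunks
        self.contentLength = sum(len(chunk) for chunk in chunks)


class EndlessFount:
    """Hypothetical unbounded fount: it has no contentLength at all."""


class ProgressDrain:
    def __init__(self):
        self.expected = None
        self.received = 0

    def flowingFrom(self, fount):
        # Duck-typed stand-in for IContentLength.providedBy(fount).
        self.expected = getattr(fount, "contentLength", None)

    def receive(self, item):
        self.received += len(item)

    def fraction(self):
        if self.expected is None:
            return None  # unknown length: a progress fraction is meaningless
        if self.expected == 0:
            return 1.0
        return self.received / self.expected


finite = ProgressDrain()
fount = ByteFount([b"abcd", b"efgh"])
finite.flowingFrom(fount)
finite.receive(fount.chunks[0])

endless = ProgressDrain()
endless.flowingFrom(EndlessFount())
endless.receive(b"abcd")
```

For the unbounded fount, fraction() returns None rather than a meaningless float. This is the distinction itamar draws above.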

The progress() event is there to provide a very specific feature, and the docstring explains this, although I guess it could be clearer. I'll give it another shot here and maybe you can tell me what parts you like of each explanation.

The issue has to do with layering and information hiding. If you have an IFount which is delivering large-ish objects (let's say: parsed PIL images being downloaded via an AMP protocol), and you expect them to be continuously delivered, you want to have two timeouts: one for "some activity has transpired on the underlying connection" and one for "I haven't received an image in X seconds". Both of these timeouts should be controlled by the ultimate consumer, not by any intermediary parsing layer. However, the intermediary parsing layer needs to somehow communicate that some arbitrary, not-sure-if-it's-a-whole-image-yet bytes have arrived on the underlying connection, all the way from bytes through AMP to the consumer of the parsed images. I should note that in this example (maybe the images are being displayed to a user in a slideshow) neither the stream of images nor the stream of bytes is finite: there's a sequence of bytes, the bytes are composed into AMP boxes, and the sequence of AMP boxes is composed into images. In other words, the stream itself doesn't have a content length; there's only an implied length of the next message. By contrast, if you have an IFount that represents an HTTP response body, the fount itself is finite, will not keep delivering data forever, and has a content length.

This is why .progress() by default takes no argument; in most cases, I think, the argument won't actually be provided, since a lot of different data flows don't provide enough information to determine it. It's just a notification that something is happening. It can optionally be provided with a float to provide additional information for more specific use-cases where such information is available. Which, again, might not be precise even then; if you know you're going to receive 100 AMP boxes, you might not know how big each one is going to be in terms of bytes or time. So, even if we were to use .progress() to communicate content-length, we'd need to extend it a little bit to say "no, really, I'm quite sure about this, you can rely on it".
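The two-timeout scenario could be sketched like this. It assumes a hypothetical newline-delimited framing layer in place of AMP, and a fake clock instead of a reactor. SlideshowDrain, NewlineFramer and FakeClock are invented for illustration. The framer calls progress() with no argument whenever it has buffered a partial message. This keeps the connection-activity timeout alive without resetting the per-item timeout.

```python
class FakeClock:
    def __init__(self):
        self.now = 0.0


class SlideshowDrain:
    """Hypothetical ultimate consumer that owns both timeouts."""

    def __init__(self, clock, activityTimeout, itemTimeout):
        self.clock = clock
        self.activityTimeout = activityTimeout
        self.itemTimeout = itemTimeout
        self.lastActivity = self.lastItem = clock.now
        self.items = []

    def progress(self, amount=None):
        # Something happened underneath us; `amount` is only an optional hint.
        self.lastActivity = self.clock.now

    def receive(self, item):
        self.items.append(item)
        self.lastActivity = self.lastItem = self.clock.now

    def timedOut(self):
        now = self.clock.now
        if now - self.lastActivity > self.activityTimeout:
            return "connection idle"
        if now - self.lastItem > self.itemTimeout:
            return "no image"
        return None


class NewlineFramer:
    """Intermediary layer: frames bytes into items, reports partial input."""

    def __init__(self, drain):
        self.drain = drain
        self.buffer = b""

    def receive(self, data):
        self.buffer += data
        *complete, self.buffer = self.buffer.split(b"\n")
        for item in complete:
            self.drain.receive(item)
        if self.buffer:
            self.drain.progress()  # not a whole item yet, but bytes arrived


clock = FakeClock()
drain = SlideshowDrain(clock, activityTimeout=10, itemTimeout=60)
framer = NewlineFramer(drain)

clock.now = 5
framer.receive(b"img1\npart")   # one whole item, one partial
clock.now = 14
framer.receive(b"ial")          # still partial: progress() only
clock.now = 20
atTwenty = drain.timedOut()
clock.now = 30
atThirty = drain.timedOut()
clock.now = 66
framer.receive(b"more")         # bytes keep arriving, but no whole item
atSixtySix = drain.timedOut()
```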

comment:32 Changed 2 years ago by jml

  • Cc ashfall added

comment:33 Changed 2 years ago by glyph

comment:34 Changed 23 months ago by ralphm

  • Cc twistedmatrix.com@… added

comment:35 Changed 22 months ago by ashfall

  • Author changed from glyph to ashfall, glyph
  • Branch changed from branches/tubes-1956 to branches/tubes-1956-2

(In [35753]) Branching to 'tubes-1956-2'

comment:36 Changed 22 months ago by oberstet

  • Cc tobias.oberstein@… added

comment:37 Changed 16 months ago by glyph

  • Branch changed from branches/tubes-1956-2 to branches/tubes-1956-3

(In [37894]) Branching to 'tubes-1956-3'

comment:38 Changed 16 months ago by Julian

  • Cc Julian+Twisted@… added

comment:39 Changed 15 months ago by glyph

This might also make it less important to be able to compose protocols and transports themselves.

comment:40 Changed 15 months ago by glyph

  • Summary changed from Make a less sucky producer/consumer API to "Tubes": Create a composable, flow-control and back-pressure friendly way to arrange parsing, processing, and emitting data

comment:41 follow-up: Changed 15 months ago by radix

I have a use case, and I wonder what you think of it.

Several times, I've implemented client APIs that abstract over the connection to the service and queue up requests to be sent if there is no current connection (service is just starting up or there was a brief interruption in service, like the remote end is being restarted or whatever).

I think it would be nice for these client APIs to provide a consumer-like interface, so that they can indicate the current backpressure status to senders. On top of that, I've found it usually necessary to implement a *hard* limit on how many requests can be buffered, because in a high-volume service it's easy to build up so many buffered requests that your process begins swapping and thrashing.

I need to look at Tubes some more, but I think I've already determined that there's no interface-declared way to indicate when the buffer has been maxed out and the consumer will refuse to accept new requests. Of course, it's easy to provide this in the implementation: just make the method that accepts new requests raise an implementation-defined exception when the implementation-defined buffer-size limit is exceeded. Do you think there's a benefit to the Tubes interface specifying how this should be done?
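The approach described above might be sketched as follows. A soft limit signals back-pressure to senders, and a hard limit raises an implementation-defined exception. QueueingClient, QueueFull, pauseAt and maxBuffered are all hypothetical names; nothing here is part of the tubes branch.

```python
from collections import deque


class QueueFull(Exception):
    """Implementation-defined error: the request buffer is at capacity."""


class QueueingClient:
    """Hypothetical client that buffers requests while disconnected."""

    def __init__(self, maxBuffered, pauseAt):
        self.maxBuffered = maxBuffered  # hard limit: refuse beyond this
        self.pauseAt = pauseAt          # soft limit: ask senders to stop
        self.pending = deque()
        self.connection = None
        self.senderPaused = False       # back-pressure signal for senders

    def request(self, req):
        if self.connection is not None:
            self.connection.append(req)
            return
        if len(self.pending) >= self.maxBuffered:
            raise QueueFull(req)
        self.pending.append(req)
        if len(self.pending) >= self.pauseAt:
            self.senderPaused = True

    def connected(self, connection):
        # Flush everything buffered while disconnected, then resume senders.
        self.connection = connection
        while self.pending:
            connection.append(self.pending.popleft())
        self.senderPaused = False


client = QueueingClient(maxBuffered=3, pauseAt=2)
client.request("a")
pausedAfterOne = client.senderPaused
client.request("b")
client.request("c")
try:
    client.request("d")
    rejected = False
except QueueFull:
    rejected = True
wire = []
client.connected(wire)
client.request("e")
```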

comment:42 in reply to: ↑ 41 Changed 14 months ago by glyph

Replying to radix:

> I have a use case, and I wonder what you think of it.

> Several times, I've implemented client APIs that abstract over the connection to the service and queue up requests to be sent if there is no current connection (service is just starting up or there was a brief interruption in service, like the remote end is being restarted or whatever).

Sure, this makes sense.

> I think it would be nice for these client APIs to provide a consumer-like interface, so that they can indicate the current backpressure status to senders.

That should be pretty easy with the interfaces in the branch.

> On top of that, I've found it usually necessary to implement a *hard* limit on how many requests can be buffered, because in a high-volume service it's easy to build up so many buffered requests that your process begins swapping and thrashing.

> I need to look at Tubes some more, but I think I've already determined that there's no interface-declared way to indicate when the buffer has been maxed out and the consumer will refuse to accept new requests. Of course, it's easy to provide this in the implementation: just make the method that accepts new requests raise an implementation-defined exception when the implementation-defined buffer-size limit is exceeded. Do you think there's a benefit to the Tubes interface specifying how this should be done?

I'm still open to being convinced otherwise, but the current behavior is somewhat intentional.

Once you've gone to the trouble of synthesizing a request, we might as well buffer it; it's already in memory, and we've already sent the appropriate API call to ask you to stop synthesizing more requests.

It might make sense to raise some kind of catastrophic error like MemoryError to say "you really buffered too much here" or log a message saying "this producer is producing even though we told it not to". But if you're continuing to synthesize new requests that need to be buffered after a pauseFlow notification, then the only utility of such an error or message is to help you quickly locate the bug where said requests are being synthesized. So it doesn't make sense to me to set an arbitrary limit, but rather to figure out how to spot flow-control bugs on the first buffered message that is really out-of-bounds. Since clearly some buffering is OK, it would be good to figure out the earliest point we could reasonably fail so as to make detecting this sort of bug feasible without massive load testing.
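Below is a sketch of the alternative described here. It assumes that a drain can count items arriving after it has asked its fount to pause. CheckingBuffer, highWater and grace are hypothetical names, and a real drain would call its fount's pauseFlow() at the marked point. Every item is still buffered. However, the first item arriving more than grace items after the pause is logged once, which points straight at the misbehaving producer.

```python
import logging

log = logging.getLogger("flowcheck")


class CheckingBuffer:
    """Hypothetical buffering drain that spots producers ignoring a pause.

    Items are always buffered (they already exist in memory); the first item
    arriving more than `grace` items after pausing is reported once.
    """

    def __init__(self, highWater, grace):
        self.highWater = highWater
        self.grace = grace
        self.buffer = []
        self.paused = False
        self.afterPause = 0
        self.reported = False

    def receive(self, item):
        self.buffer.append(item)
        if self.paused:
            self.afterPause += 1
            if self.afterPause > self.grace and not self.reported:
                self.reported = True
                log.warning("producer kept producing after pause: %r", item)
        elif len(self.buffer) >= self.highWater:
            self.paused = True  # a real drain would ask its fount to pause here

    def drainAll(self):
        items, self.buffer = self.buffer, []
        self.paused, self.afterPause = False, 0
        return items


checker = CheckingBuffer(highWater=2, grace=1)
for item in [1, 2, 3, 4, 5]:
    checker.receive(item)   # item 4 is the first one reported
```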

comment:43 Changed 13 months ago by free.ekanayaka

  • Cc free@… added

comment:44 Changed 13 months ago by glyph

Progress is continuing; the branch is getting close to a pretty usable / final state. I could use some help with writing the documentation if anyone has time.

comment:45 Changed 12 months ago by daf

  • Cc daf@… added

comment:46 follow-up: Changed 12 months ago by rwall

I just read the tubes tutorial and added one or two corrections (and suggestions) as I went.

See attachment:tubes-1956-3-howto-corrections.patch

comment:47 in reply to: ↑ 46 ; follow-up: Changed 12 months ago by glyph

Replying to rwall:

> I just read the tubes tutorial and added one or two corrections (and suggestions) as I went.

> See attachment:tubes-1956-3-howto-corrections.patch

Please feel free to commit this (and any future changes you've got) straight to the branch. It's a team effort :).

comment:48 in reply to: ↑ 47 Changed 12 months ago by rwall

Replying to glyph:

> Please feel free to commit this (and any future changes you've got) straight to the branch. It's a team effort :).

OK, and BTW I think it's a very nice intro. I've seen lots of talk about Tubes, and I'm starting to get it.

I know it's a work in progress, but here are some comments about the tutorial.

  1. doc/core/howto/tube.xhtml
    1. r39609 - corrected some spelling and typos
    2. I think it needs breaking up - more headings.
    3. Needs a conclusion.
    4. "flowTo can return an IFount so we can chain flowTo calls": I may have misunderstood, but after reading the previous sections, I thought "series" did the same thing.
    5. "Note: If you're curious": This note is very difficult to read. Too many Tubes, ITubes, flowTos etc.
  2. doc/core/howto/listings/tubes/intparse.py
    1. The "product" function seems to be unused, but I think it's probably easier to understand than "reducer". "reducer" is neat, but it took me a while to understand and I think it may distract people from the Tubes themselves.
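Item 4 can be illustrated with a deliberately simplified, eager toy model. This is not the tubes API: ListFount, Tube, series and Collector are stand-ins, and there is no flow control. Because flowTo() returns the downstream fount, stages can be chained one call at a time. series() instead composes the stages into a single reusable object before any fount is attached. In this model, both spellings build the same pipeline.

```python
class ListFount:
    """Toy fount holding all of its items up front (no flow control)."""

    def __init__(self, items):
        self.items = items

    def flowTo(self, drain):
        # Return whatever fount the drain produces, so calls can be chained.
        return drain.flowingFrom(self)


class Tube:
    """Toy stage: a drain for its input and a fount for its output."""

    def __init__(self, fn):
        self.fn = fn

    def flowingFrom(self, fount):
        return ListFount([self.fn(item) for item in fount.items])


class Series:
    """Toy composite of several stages, usable wherever one stage is."""

    def __init__(self, *stages):
        self.stages = stages

    def flowingFrom(self, fount):
        for stage in self.stages:
            fount = stage.flowingFrom(fount)
        return fount


def series(*stages):
    return Series(*stages)


class Collector:
    """Toy terminal drain."""

    def __init__(self):
        self.items = []

    def flowingFrom(self, fount):
        self.items.extend(fount.items)
        return None  # nothing flows further downstream


double = Tube(lambda x: x * 2)
increment = Tube(lambda x: x + 1)

chained = Collector()
ListFount([1, 2, 3]).flowTo(double).flowTo(increment).flowTo(chained)

composed = Collector()
ListFount([1, 2, 3]).flowTo(series(double, increment)).flowTo(composed)
```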

comment:49 Changed 11 months ago by glyph

  • Branch changed from branches/tubes-1956-3 to branches/tubes-1956-4

(In [39934]) Branching to 'tubes-1956-4'

comment:50 Changed 5 months ago by glyph

  • Branch changed from branches/tubes-1956-4 to branches/tubes-1956-5

(In [41854]) Branching to tubes-1956-5.

comment:51 Changed 2 months ago by glyph

  • Branch changed from branches/tubes-1956-5 to branches/tubes-1956-6

(In [42641]) Branching to tubes-1956-6.

comment:52 Changed 4 weeks ago by glyph

  • Branch changed from branches/tubes-1956-6 to branches/tubes-1956-7

(In [42820]) Branching to tubes-1956-7.

comment:53 follow-up: Changed 39 hours ago by hodgestar

Would anything prevent pulling tubes out into its own package and starting to use it now?

comment:54 in reply to: ↑ 53 Changed 32 hours ago by glyph

Replying to hodgestar:

> Would anything prevent pulling tubes out into its own package and starting to use it now?

As I mentioned on the mailing list, the big problem is that I have no intention of supporting any sort of compatibility policy within the package until it's "done" (by which I mean: we've constructed some comprehensive documentation of how to use the API, with supporting examples, and there are no giant gaping holes in its test coverage or known flaws in its design). I don't like telling users that they can't expect things to keep working, so while I think it is OK for some projects to have looser compatibility constraints than Twisted, I don't want to have any compatibility constraints at the moment.

I do feel like we are asymptotically approaching correctness, though, as the last few iterations of work on the tubes branch have resulted in progressively less rewriting of the core abstractions.

However, while I don't feel like I'd get a lot of utility out of tubes being spun out into a separate project personally (as I don't think it would speed up development or make it markedly easier for others to contribute given the nature of the code), I also would not stand in the way of someone working on such a split in the hopes that we might get it done faster if they don't agree with that assessment. You, David Reid, Thomas Hervé, and Jean-Paul Calderone have all expressed such sentiments now, and I'm happy to be overruled.

A good first step would be to commit a change to the current branch that changed all the imports within tubes itself to be explicit relative imports, so that the package can be moved into and out of Twisted easily. (I do intend to propose this for inclusion in Twisted core when it's in good shape; I think that its main utility is going to be in refactoring Twisted itself, and I don't want a circular dependency on an external package, which is one reason I want to keep doing development here.)
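The self-contained sketch below shows why explicit relative imports make the package relocatable. It writes the same two-module package twice, once nested inside a larger project and once standalone, then imports both copies. The module names tube and fount, and the parent name twistedish, are hypothetical. The line `from .tube import double` resolves in either location. An absolute import naming the parent package would only work in one of them.

```python
import importlib
import os
import sys
import tempfile

# Source for a hypothetical two-module package.
TUBE_SOURCE = "def double(x):\n    return 2 * x\n"
FOUNT_SOURCE = (
    "from .tube import double  # explicit relative import\n"
    "VALUE = double(21)\n"
)


def writePackage(root, *names):
    """Create nested packages root/names[0]/.../names[-1] holding both modules."""
    path = root
    for name in names:
        path = os.path.join(path, name)
        os.makedirs(path, exist_ok=True)
        open(os.path.join(path, "__init__.py"), "a").close()
    with open(os.path.join(path, "tube.py"), "w") as f:
        f.write(TUBE_SOURCE)
    with open(os.path.join(path, "fount.py"), "w") as f:
        f.write(FOUNT_SOURCE)


def importFrom(root, dotted):
    """Import `dotted` with `root` temporarily at the front of sys.path."""
    sys.path.insert(0, root)
    importlib.invalidate_caches()
    try:
        return importlib.import_module(dotted)
    finally:
        sys.path.remove(root)


with tempfile.TemporaryDirectory() as inside, \
        tempfile.TemporaryDirectory() as outside:
    # The same package, once nested in a larger project, once standalone.
    writePackage(inside, "twistedish", "tubesdemo")
    writePackage(outside, "tubesdemo")
    nested = importFrom(inside, "twistedish.tubesdemo.fount")
    standalone = importFrom(outside, "tubesdemo.fount")
```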
