[Twisted-Python] process output modulation

exarkun at twistedmatrix.com
Tue Dec 21 13:51:57 EST 2010


On 06:14 pm, dvkeeney at gmail.com wrote:
>We are streaming a large amount of program output to the browser via a
>twisted app, and we are seeing huge memory consumption.
>
>We have a database process that generates large amounts of data to
>stdout, and we are streaming that to the browser through a twisted
>web2 app.  We are using web2 because it supports upload streaming as
>well.
>
>Our code looks like:
>
>        env = {}
>        input = stream.MemoryStream('')
>        SQLDUMP = '/usr/bin/dump'
>
>        pstream = stream.ProcessStreamer(input, SQLDUMP,
>                             [SQLDUMP,'--format=p','--clean',
>                              '--host='+cfg.SOCKET,
>                              '--username='+self.role,dbname],
>                             env)
>        pstream.run()
>
>        outstream = WatchedStream(pstream.outStream)
>
>        response = http.Response( headers=headers, stream=outstream)
>
>
>        class WatchedStream(object):
>
>           def __init__(self,stream):
>                self.stream = stream
>           def split(self, point):
>                ... some implementation
>           def close(self):
>                ... some implementation
>           def read(self):
>                d = self.stream.read()
>                bufSize = sum( [len(b) for b in self.stream.buffer if b])
>                log.msg('buffer size: %s'%bufSize)
>                return d
>
>Watching the log shows us that the stream (a web2.ProducerStream)
>buffer is growing continuously to hundreds of MB.  Doesn't a stream
>object have a bufferSize attribute and the ability to throttle the
>flow of data based on buffer fullness?  Does that throttling behavior
>have to be triggered explicitly?
>
>Yes, I know that web2 is deprecated, but I don't know that the problem
>is in the web2 components.  The reactor.spawnProcess documentation
>does not seem to address the matter of modulating the read speed.  Any
>assistance will be appreciated.

You probably want to pause Twisted's child process reader when you 
notice your buffer is getting too large.  Offhand, I can't say how you 
might translate this advice into specific stream API calls, but 
ultimately you want to call `pauseProducing` on something.

It's also possible you'll have to go through some not-quite-documented 
interfaces (or implementations) to get there.  IProcessTransport has no 
pauseProducing method, but the POSIX implementation of that interface 
has a `pipes` dictionary where the values are `IProducer` providers, so 
you can call `pauseProducing` on them (or just the one for, say, stdout, 
if that's where you're getting bytes from).

Later, of course, you'll want to undo the pause with a call to 
`resumeProducing`.
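
As a rough illustration of the pause/resume idea, here is a minimal 
sketch.  The `StdoutThrottle` class, its `buffer_size_changed` method, 
and the two threshold values are hypothetical names made up for this 
example, not part of Twisted or web2.  It assumes `transport` is the 
POSIX process transport and that its `pipes` dictionary is keyed by 
child file descriptor, so that key 1 is the stdout reader; check both 
against your Twisted version.

```python
class StdoutThrottle(object):
    """Pause and resume one child-process pipe reader around two thresholds.

    Assumption: `transport.pipes` maps child file descriptors to objects
    that provide pauseProducing() and resumeProducing().
    """

    def __init__(self, transport, high_water=1024 * 1024,
                 low_water=256 * 1024, child_fd=1):
        if low_water > high_water:
            raise ValueError("low_water must not exceed high_water")
        # child_fd=1 is assumed to select the reader for the child's stdout.
        self.reader = transport.pipes[child_fd]
        self.high_water = high_water
        self.low_water = low_water
        self.paused = False

    def buffer_size_changed(self, size):
        """Call with the current number of buffered bytes.

        Returns True while the reader is paused, False otherwise.
        """
        if not self.paused and size >= self.high_water:
            # Buffer is too full: stop reading from the child.
            self.reader.pauseProducing()
            self.paused = True
        elif self.paused and size <= self.low_water:
            # Buffer has drained enough: start reading again.
            self.reader.resumeProducing()
            self.paused = False
        return self.paused
```

Something like the `read` method of a wrapper stream, which already 
computes the buffer size, could call `buffer_size_changed(bufSize)` on 
each read.  Using two thresholds rather than one keeps the reader from 
flapping between paused and resumed when the buffer hovers near the 
limit.  How to get at the process transport from a `ProcessStreamer` is 
not shown here and may require reaching into its internals.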

Jean-Paul


