[Twisted-Python] process output modulation

David dvkeeney at gmail.com
Tue Dec 21 13:51:04 MST 2010


Thank you.

I was thinking along those lines, but wasn't sure how the
producer/consumer methods related to the streams model.  I guess I
still don't know, but I now have more confidence that a solution is
there to find.

I was considering a clunky workaround, where the db output is piped to
a throttling program which pipes to the web app.  I would still have the
problem of telling that filter how fast to go, but that is more of a
Unix problem and less of a Twisted problem.  Doable, but yuck.

I guess it's time to dig into the reactor.spawnProcess code and find
where the producer-related method calls are.
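
As a starting point, here is a rough sketch of the pause/resume
bookkeeping suggested in the reply quoted below.  It is only a sketch:
BufferThrottle, FakeProducer, and the high/low thresholds are names made
up for illustration, not part of Twisted or web2.  The one thing assumed
about the real producer is that it has pauseProducing and
resumeProducing methods, as the reply says the values in the POSIX
process transport's `pipes` dictionary do.  Guessing that key 1 (the
child's stdout descriptor) is the right entry is my assumption too.

```python
class BufferThrottle(object):
    """Hypothetical helper: pause a producer once too many bytes are
    buffered, and resume it after the consumer has drained enough."""

    def __init__(self, producer, high=1024 * 1024, low=256 * 1024):
        # producer: anything with pauseProducing()/resumeProducing(),
        # e.g. (assumption) the stdout entry of the transport's `pipes`.
        self.producer = producer
        self.high = high      # pause at or above this many buffered bytes
        self.low = low        # resume at or below this many buffered bytes
        self.buffered = 0
        self.paused = False

    def dataBuffered(self, data):
        # Call when bytes from the child process are added to the buffer.
        self.buffered += len(data)
        if not self.paused and self.buffered >= self.high:
            self.paused = True
            self.producer.pauseProducing()

    def dataConsumed(self, nbytes):
        # Call when the downstream reader has taken nbytes from the buffer.
        self.buffered = max(0, self.buffered - nbytes)
        if self.paused and self.buffered <= self.low:
            self.paused = False
            self.producer.resumeProducing()


class FakeProducer(object):
    """Stand-in for the real pipe reader, so the sketch runs without
    Twisted; it just records which calls were made."""

    def __init__(self):
        self.calls = []

    def pauseProducing(self):
        self.calls.append('pause')

    def resumeProducing(self):
        self.calls.append('resume')
```

The two thresholds are there so the reader is not paused and resumed on
every chunk; where exactly dataBuffered and dataConsumed would hook into
the stream classes is the part I still have to work out.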

David

On Tue, Dec 21, 2010 at 11:51 AM,  <exarkun at twistedmatrix.com> wrote:
> On 06:14 pm, dvkeeney at gmail.com wrote:
>>We are streaming a large amount of program output to the browser via a
>>twisted app, and we are seeing huge memory consumption.
>>
>>We have a database process that generates large amounts of data to
>>stdout, and we are streaming that to the browser through a twisted
>>web2 app.  We are using web2 because it supports upload streaming as
>>well.
>>
>>Our code looks like:
>>
>>        env = {}
>>        input = stream.MemoryStream('')
>>        SQLDUMP = '/usr/bin/dump'
>>
>>        pstream = stream.ProcessStreamer(input, SQLDUMP,
>>                             [SQLDUMP,'--format=p','--clean',
>>                              '--host='+cfg.SOCKET,
>>                              '--username='+self.role,dbname],
>>                             env)
>>        pstream.run()
>>
>>        outstream = WatchedStream(pstream.outStream)
>>
>>        response = http.Response( headers=headers, stream=outstream)
>>
>>
>>        class WatchedStream(object):
>>
>>           def __init__(self,stream):
>>                self.stream = stream
>>           def split(self, point):
>>                ... some implementation
>>           def close(self):
>>                ... some implementation
>>           def read(self):
>>                d = self.stream.read()
>>                bufSize = sum([len(b) for b in self.stream.buffer if b])
>>                log.msg('buffer size: %s'%bufSize)
>>                return d
>>
>>Watching the log shows us that the stream (a web2.ProducerStream)
>>buffer is growing continuously to hundreds of MB.  Doesn't a stream
>>object have a bufferSize attribute and the ability to throttle the
>>flow of data based on buffer fullness?  Does that throttling behavior
>>have to be triggered explicitly?
>>
>>Yes, I know that web2 is deprecated, but I don't know that the problem
>>is in the web2 components.  The reactor.spawnProcess documentation
>>does not seem to address the matter of modulating the read speed.  Any
>>assistance will be appreciated.
>
> You probably want to pause Twisted's child process reader when you
> notice your buffer is getting too large.  Offhand, I can't say how you
> might translate this advice into specific stream API calls, but
> ultimately you want to call `pauseProducing` on something.
>
> It's also possible you'll have to go through some not-quite-documented
> interfaces(/implementations) to get there.  IProcessTransport has no
> pauseProducing method, but the POSIX implementation of that interface
> has a `pipes` dictionary where the values are `IProducer` providers, so
> you can call `pauseProducing` on them (or just the one for, say, stdout,
> if that's where you're getting bytes from).
>
> Later, of course, you'll want to undo the pause with a call to
> `resumeProducing`.
>
> Jean-Paul
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>



-- 
dkeeney at travelbyroad.net
Rdbhost -> SQL databases as a webservice [www.rdbhost.com]