<font size="2" face="Arial, sans-serif"><div>How does the example below
look as a complete producer/consumer example? If it's well received, perhaps we can add it to the online
documentation at <a href="http://twistedmatrix.com/documents/current/core/howto/producers.html" target="_blank">http://twistedmatrix.com/documents/current/core/howto/producers.html</a> in the "Further Reading" section? I have always felt that the producer/consumer (a.k.a. high-volume
streaming) docs lacked a real example that users could download
and run.</div>
<div> </div>
<div>#!/usr/bin/env python</div>
<div>"""A sample implementation of a Twisted producer/consumer system: a</div>
<div>simple TCP server asks the client how many random integers it wants,</div>
<div>then sends them back, one per line."""</div>
<div> </div>
<div>import random</div>
<div> </div>
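<div>from zope.interface import implementer</div>
<div>from twisted.internet import interfaces, reactor</div>
<div>from twisted.internet.protocol import Factory</div>
<div>from twisted.protocols.basic import LineReceiver</div>
<div> </div>
<div>@implementer(interfaces.IPushProducer)</div>
<div>class Producer(object):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;"""Send back the requested number of random integers to the client."""</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def __init__(self, proto, cnt):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._proto = proto</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._goal = cnt</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._produced = 0</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._paused = False</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def pauseProducing(self):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""When we've produced data too fast, pauseProducing() will be</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;called (reentrantly from within resumeProducing's transport.write</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;method, most likely), so set a flag that causes production to pause</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;temporarily."""</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._paused = True</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;print('pausing connection from %s' % (self._proto.transport.getPeer(),))</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def resumeProducing(self):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._paused = False</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while not self._paused and self._produced &lt; self._goal:</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;next_int = random.randint(0, 10000)</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# Transports take bytes, so encode the line before writing it.</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._proto.transport.write(('%d\r\n' % next_int).encode('ascii'))</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._produced += 1</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if self._produced == self._goal:</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._proto.transport.unregisterProducer()</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._proto.transport.loseConnection()</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def stopProducing(self):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""When the consumer has gone away, stop producing data for good."""</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self._produced = self._goal</div>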
<div> </div>
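<div>class ServeRandom(LineReceiver):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;"""Serve up random data."""</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def connectionMade(self):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;print('connection made from %s' % (self.transport.getPeer(),))</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self.transport.write(b'how many random integers do you want?\r\n')</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def lineReceived(self, line):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;cnt = int(line.strip())</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;producer = Producer(self, cnt)</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;# True registers a streaming (push) producer with the transport.</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self.transport.registerProducer(producer, True)</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;producer.resumeProducing()</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;def connectionLost(self, reason):</div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;print('connection lost from %s' % (self.transport.getPeer(),))</div>
<div> </div>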
<div>factory = Factory()</div>
<div>factory.protocol = ServeRandom</div>
<div>reactor.listenTCP(1234, factory)</div>
<div>print('listening on 1234...')</div>
<div>reactor.run()</div>
<div> </div>
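<div>To watch the pause/resume handshake without starting a reactor, here is a rough sketch that drives the same producer logic against a stand-in transport. FakeTransport, IntProducer, run_simulation and the high_water threshold are names I made up for illustration; none of them are part of Twisted, and a real transport decides when to pause from its own write-buffer size.</div>

```python
import random


class FakeTransport:
    """Hypothetical stand-in for a Twisted transport (not a Twisted class).

    It buffers writes and pauses the registered producer once more than
    `high_water` bytes are pending.
    """

    def __init__(self, high_water=64):
        self.high_water = high_water
        self.pending = b""
        self.sent = b""
        self.producer = None
        self.pauses = 0

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.pending += data
        if self.producer is not None and len(self.pending) > self.high_water:
            self.pauses += 1
            # Reentrant call: we are still inside the producer's write loop.
            self.producer.pauseProducing()

    def drain(self):
        """Pretend the socket became writable: flush, then resume."""
        self.sent += self.pending
        self.pending = b""
        if self.producer is not None:
            self.producer.resumeProducing()


class IntProducer:
    """Same pause/resume logic as the Producer class in the listing."""

    def __init__(self, transport, goal):
        self._transport = transport
        self._goal = goal
        self._produced = 0
        self._paused = False

    def pauseProducing(self):
        self._paused = True

    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            line = "%d\r\n" % random.randint(0, 10000)
            self._transport.write(line.encode("ascii"))
            self._produced += 1
        if self._produced == self._goal:
            self._transport.unregisterProducer()

    def stopProducing(self):
        self._produced = self._goal


def run_simulation(goal, high_water=64):
    """Return (lines received, number of pauses) for `goal` integers."""
    transport = FakeTransport(high_water)
    producer = IntProducer(transport, goal)
    transport.registerProducer(producer, True)
    producer.resumeProducing()
    while transport.producer is not None:
        transport.drain()
    transport.drain()  # flush what is left after the producer unregisters
    return transport.sent.split(b"\r\n")[:-1], transport.pauses
```

<div>Calling run_simulation(1000) pauses many times, while run_simulation(5) never does, because five short lines fit under the 64-byte threshold assumed here.</div>
<div> </div>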
<div>Example session on the client:</div>
<div> </div>
<div>$ telnet localhost 1234</div>
<div>Trying 127.0.0.1...</div>
<div>Connected to localhost.</div>
<div>Escape character is '^]'.</div>
<div>how many random integers do you want?</div>
<div>5</div>
<div>431</div>
<div>7201</div>
<div>3289</div>
<div>9604</div>
<div>6659</div>
<div>Connection closed by foreign host.</div>
<div>$</div>
<div> </div>
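<div>If you'd rather script the client than use telnet, something like the following should work. It is only a sketch using the standard library: request_ints and fetch are hypothetical helper names, and the host, port, and prompt-then-count exchange are taken from the session above.</div>

```python
import socket


def request_ints(sock, count):
    """Ask the server on a connected socket for `count` integers.

    Returns the server's prompt and the list of integers it sent.
    """
    reader = sock.makefile("rb")
    prompt = reader.readline()  # the "how many ..." question
    sock.sendall(("%d\r\n" % count).encode("ascii"))
    # The server closes the connection when it is done, so read until EOF.
    values = [int(line.strip()) for line in reader if line.strip()]
    reader.close()
    return prompt.decode("ascii").strip(), values


def fetch(host="localhost", port=1234, count=5):
    """Connect to the sample server and fetch `count` integers."""
    sock = socket.create_connection((host, port))
    try:
        return request_ints(sock, count)
    finally:
        sock.close()
```

<div>With the server running, fetch(count=5) returns the prompt text and a list of five integers.</div>
<div> </div>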
<div>Output on the server. Note that the server sometimes pauses production;
this happens when the client requests a large data set and the transport's
write buffer fills up:</div>
<div> </div>
<div>$ ./streaming.py </div>
<div>listening on 1234...</div>
<div>connection made from IPv4Address(TCP, '127.0.0.1', 54859)</div>
<div>connection lost from IPv4Address(TCP, '127.0.0.1', 54859)</div>
<div>connection made from IPv4Address(TCP, '127.0.0.1', 54864)</div>
<div>pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)</div>
<div>pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)</div>
<div>pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)</div>
<div>pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)</div>
<div>connection lost from IPv4Address(TCP, '127.0.0.1', 54864)</div></font>[...]<br><br>Thanks.<br>-- <br>Benjamin Rutt<br>