[Twisted-Python] complete producer/consumer example

Benjamin Rutt rutt.4 at osu.edu
Mon Dec 14 18:54:54 MST 2009


How does the example below look as a complete producer/consumer example?  If
it's well received, perhaps we can add it to the "Further Reading" section of
the online documentation at
http://twistedmatrix.com/documents/current/core/howto/producers.html?  I have
always felt that the producer/consumer (a.k.a. high-volume streaming) docs
lacked a real example that users could download and run.

#!/sw/external/python-2.6.1/bin/python
"""Sample implementation of a Twisted producer/consumer system: a
simple TCP server that asks the user how many random integers they
want and sends the result set back to the user, one result per
line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
        called (reentrantly from within resumeProducing's
        transport.write method, most likely), so set a flag that causes
        production to pause temporarily."""
        self._paused = True
        print('pausing connection from %s' %
              (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
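        if self._produced >= self._goal:
            # ">=" also covers a negative request, which would otherwise
            # never finish and leave the connection open.
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()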
    def stopProducing(self):
        # The consumer is gone, so make sure no more data is generated.
        self._produced = self._goal

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))

factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()

Use on the client:

$ telnet localhost 1234
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
how many random integers do you want?
5
431
7201
3289
9604
6659
Connection closed by foreign host.
$
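
For scripted testing, the telnet session above can be approximated with a
few lines of standard-library Python.  This is only a sketch in Python 3
(the server script above targets Python 2.6); the function name
fetch_random_ints and its parameters are made up for illustration, and it
assumes the server sends exactly one prompt line before reading the count.

```python
import socket


def fetch_random_ints(count, host="localhost", port=1234, timeout=5.0):
    """Ask the server for `count` integers and return them as a list.

    Hypothetical helper, not part of Twisted or of the script above.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("rb") as reader:
            reader.readline()                  # discard the prompt line
            sock.sendall(b"%d\r\n" % count)    # send the requested count
            # The server closes the connection when it is done, which
            # ends the iteration over the remaining lines.
            return [int(line.decode("ascii"))
                    for line in reader if line.strip()]


if __name__ == "__main__":
    print(fetch_random_ints(5))
```

With the server running on port 1234, fetch_random_ints(5) should return a
list of five integers.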

Output on the server (note that the server sometimes pauses production; this
happens when the client requests a large data set):

$ ./streaming.py
listening on 1234...
connection made from IPv4Address(TCP, '127.0.0.1', 54859)
connection lost from IPv4Address(TCP, '127.0.0.1', 54859)
connection made from IPv4Address(TCP, '127.0.0.1', 54864)
pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)
pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)
pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)
pausing connection from IPv4Address(TCP, '127.0.0.1', 54864)
connection lost from IPv4Address(TCP, '127.0.0.1', 54864)
[...]
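
The "pausing connection" lines above come from the transport calling
pauseProducing() once its write buffer grows too large.  The following is a
rough, Twisted-free sketch of that handshake in Python 3.  ToyTransport,
ToyProducer, run and the 64-byte high_water threshold are all invented for
illustration and are not Twisted APIs; a real transport drains its buffer
gradually as the socket becomes writable rather than all at once.

```python
import random


class ToyTransport:
    """Hypothetical stand-in for a transport; not a Twisted class."""

    def __init__(self, high_water=64):
        self.high_water = high_water  # buffered bytes that trigger a pause
        self.buffer = bytearray()     # data written but not yet "sent"
        self.sent = bytearray()       # data that has left the buffer
        self.producer = None
        self.pauses = 0

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.buffer += data
        # Reentrant call: the producer is still inside resumeProducing().
        if self.producer is not None and len(self.buffer) > self.high_water:
            self.pauses += 1
            self.producer.pauseProducing()

    def drain(self):
        """Pretend the socket became writable: flush, then resume."""
        self.sent += self.buffer
        self.buffer.clear()
        if self.producer is not None:
            self.producer.resumeProducing()


class ToyProducer:
    """Same control flow as the Producer class in the script above."""

    def __init__(self, transport, goal):
        self._transport = transport
        self._goal = goal
        self._produced = 0
        self._paused = False

    def pauseProducing(self):
        self._paused = True

    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            self._transport.write(b"%d\r\n" % random.randint(0, 10000))
            self._produced += 1
        if self._produced >= self._goal:
            self._transport.unregisterProducer()

    def stopProducing(self):
        self._produced = self._goal


def run(goal, high_water=64):
    """Drive one toy producer to completion and return its transport."""
    transport = ToyTransport(high_water)
    producer = ToyProducer(transport, goal)
    transport.registerProducer(producer, True)
    producer.resumeProducing()
    while transport.producer is not None:
        transport.drain()
    transport.drain()  # flush whatever was written last
    return transport


if __name__ == "__main__":
    for goal in (5, 1000):
        print(goal, "integers,", run(goal).pauses, "pauses")
```

Under these assumptions, run(5) finishes without a pause, while run(1000)
pauses repeatedly, which mirrors the two connections in the server log above.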

Thanks.
-- 
Benjamin Rutt