Ticket #4171: streaming.py

File streaming.py, 2.2 KB (added by ruttbe, 5 years ago)

An example of a producer/consumer in Twisted.

Line 
1#!/usr/bin/env python
2"""Serve as a sample implementation of a twisted producer/consumer
3system, with a simple TCP server which asks the user how many random
4integers they want, and it sends the result set back to the user, one
5result per line, and then closes the connection.
6
7Requires python 2.6 and twisted 8.2.0 or above."""
8
9import random
10
11from zope.interface import implements
12from twisted.internet import interfaces, reactor
13from twisted.internet.protocol import Factory
14from twisted.protocols.basic import LineReceiver
15
class Producer:
    """Send back the requested number of random integers to the client.

    This is a push producer: once started it keeps writing lines to the
    protocol's transport until it has written ``cnt`` of them, it is asked
    to pause, or it is told to stop.  When the goal is reached it
    unregisters itself from the transport and closes the connection.
    """
    implements(interfaces.IPushProducer)

    def __init__(self, proto, cnt):
        """Remember where to write and how much to write.

        proto -- protocol whose ``transport`` receives the output
        cnt   -- number of integers to send; zero or a negative value
                 sends nothing and finishes immediately
        """
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
        # Set once the goal is reached or stopProducing() has been called;
        # after that resumeProducing() must never write again.
        self._done = False

    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
        called (reentrantly from within resumeProducing's transport.write
        method, most likely), so set a flag that causes production to
        pause temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))

    def resumeProducing(self):
        """Write integers until the goal is met or a pause is requested.

        Each integer is drawn from random.randint(0, 10000) and written as
        its own '\\r\\n'-terminated line.  Does nothing once the producer
        has finished or been stopped.
        """
        if self._done:
            return
        self._paused = False
        # _paused can flip to True in the middle of this loop if write()
        # calls back into pauseProducing(), hence the re-check on every
        # iteration.
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        # '>=' rather than '==': a goal of zero or less is already met
        # without a single write, and must still release the transport
        # instead of leaving the connection open with nothing to send.
        if self._produced >= self._goal:
            self._done = True
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()

    def stopProducing(self):
        """Stop for good; any later resumeProducing() call is ignored."""
        self._done = True
        # Also break out of a write loop that may currently be running.
        self._paused = True
42
class ServeRandom(LineReceiver):
    """Serve up random data.

    On connect the client is asked how many integers it wants.  The first
    line that parses as a non-negative integer starts a Producer for that
    many values; anything else gets an error line and the client may try
    again.  Lines received after a request has been accepted are ignored.
    """

    # Becomes True once a valid request has started streaming.  The
    # transport accepts only one registered producer at a time, so a
    # second request on the same connection must not register another.
    _request_accepted = False

    def connectionMade(self):
        """Log the new peer and send the prompt."""
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')

    def lineReceived(self, line):
        """Parse the requested count and start streaming the result.

        line -- one line of client input with the delimiter removed
        """
        if self._request_accepted:
            return
        try:
            cnt = int(line.strip())
        except ValueError:
            # Untrusted input: treat garbage like any other invalid count
            # rather than letting the exception escape into the reactor.
            cnt = -1
        if cnt < 0:
            self.transport.write('please enter a non-negative integer\r\n')
            return
        self._request_accepted = True
        producer = Producer(self, cnt)
        # True registers it as a streaming (push) producer.  The transport
        # does not start a push producer itself, so kick it off here.
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()

    def connectionLost(self, reason):
        """Log which peer went away; ``reason`` is not inspected."""
        print('connection lost from %s' % (self.transport.getPeer()))
# TCP port to serve on.  Named once so that the listener and the startup
# message below cannot drift apart.
PORT = 1234

# Every accepted connection gets its own ServeRandom protocol instance.
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(PORT, factory)
print('listening on %d...' % PORT)
# Blocks here, running the event loop until the reactor is stopped.
reactor.run()