[Twisted-Python] Learning about IPushProducer

Rutt, Benjamin Benjamin.Rutt at gs.com
Tue Mar 6 12:03:53 EST 2007


When I run the following code (my 2nd Twisted program!), it works as I
had hoped: it doesn't starve any clients that want to receive data
back, even while a really long streaming server-to-client transfer is
in progress (i.e. one piggy client asking for millions of bytes).
Another client can get in and ask for just a few bytes while a large
payload is being delivered to a different client.  Which is great!
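
The non-starvation behaviour described above comes from sending the payload in chunks and giving control back between chunks. The following is only an analogy in plain Python, not Twisted's reactor: serve_round_robin, x_chunks and the client names 'piggy' and 'small' are hypothetical names made up for this sketch, and a real reactor schedules on socket readiness rather than strict round-robin.

```python
from collections import deque


def x_chunks(howmany, chunksz=1024):
    """Yield howmany 'x' characters in chunks of at most chunksz."""
    sent = 0
    while sent < howmany:
        n = min(chunksz, howmany - sent)
        yield 'x' * n
        sent += n


def serve_round_robin(requests):
    """Give each client one chunk per turn; return the order clients finish in.

    requests is a list of (client name, bytes requested) pairs.
    """
    pending = deque((name, x_chunks(n)) for name, n in requests)
    finished = []
    while pending:
        name, chunks = pending.popleft()
        chunk = next(chunks, None)
        if chunk is None:
            # This client's payload is exhausted, so it is done.
            finished.append(name)
        else:
            # A real server would write the chunk here; requeue the client.
            pending.append((name, chunks))
    return finished


# The small request completes long before the large one, even though
# the large one was queued first.
order = serve_round_robin([('piggy', 5000000), ('small', 3)])
```

Because no client is ever handed more than one chunk per turn, the cost of a huge request is spread across many turns instead of blocking everyone else.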

Here's a sample interaction from the client side:

$ telnet localhost 8007
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
x
2
xx
3
xxx
10
xxxxxxxxxx
99999
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[...lots of x's...]
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
bye
Connection closed by foreign host.
$ 

So I have 2 questions on my code:

1) Am I doing anything wrong in setting up the plumbing?
2) Does pauseProducing() get called by another thread whilst
resumeProducing() is running?  (I believe it must, otherwise my
resumeProducing() would only be entered once).  If so I should have an
appropriate mutex around the read/write of self.paused, no?
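
On question 2, it helps to separate concurrency from re-entrancy: Twisted's reactor normally runs in a single thread, and a transport can call pauseProducing() synchronously from inside its own write() once its buffer passes a limit. The sketch below illustrates only that mechanism. FakeConsumer, its buffer_limit and its drain() method are hypothetical stand-ins invented for illustration, not Twisted's transport code, and thread names are recorded just to show which thread each call arrives on.

```python
import threading


class FakeConsumer(object):
    """Hypothetical stand-in for a transport; NOT Twisted's implementation."""

    def __init__(self, buffer_limit):
        self.buffer_limit = buffer_limit  # assumed high-water mark, in bytes
        self.buffered = 0
        self.producer = None

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def write(self, data):
        self.buffered += len(data)
        if self.producer is not None and self.buffered > self.buffer_limit:
            # Synchronous call: we are still inside the producer's write().
            self.producer.pauseProducing()

    def drain(self):
        # Pretend the socket accepted everything, then ask for more data.
        self.buffered = 0
        if self.producer is not None:
            self.producer.resumeProducing()


class RecordingXGiver(object):
    """Same loop as NonStarvingXGiver, but records who called what."""

    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer
        self.events = []  # (method, thread name) pairs

    def _record(self, what):
        self.events.append((what, threading.current_thread().name))

    def pauseProducing(self):
        self._record('pause')
        self.paused = True

    def resumeProducing(self):
        self._record('resume')
        self.paused = False
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(1024, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)
            self.sent_already += chunksz


consumer = FakeConsumer(buffer_limit=4096)
producer = RecordingXGiver(10000, consumer)
consumer.registerProducer(producer, True)
producer.resumeProducing()
while producer.sent_already < producer.howmany:
    consumer.drain()

# Every pause/resume call was made on the same thread.
threads_seen = set(name for _, name in producer.events)
```

In this sketch the `while not self.paused` loop exits because write() flipped self.paused on the same call stack, so no second thread is involved. If the real transport behaves the same way, a mutex around self.paused would not be needed.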

Here is the code, and output from the server is at the end.  Thanks --
Benjamin

#!/usr/bin/env python
import os, os.path, sys, re, commands, pickle, tempfile, getopt, datetime
import socket, string, random, time, traceback, shutil, popen2

from zope.interface import implements
from twisted.internet import protocol, defer, interfaces, error, reactor
from twisted.internet.protocol import Protocol, Factory
from twisted.protocols.basic import LineReceiver

class NonStarvingXGiver:
    implements(interfaces.IPushProducer)
    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer
    def beginSendingXs(self):
        self.deferred = deferred = defer.Deferred()
        self.consumer.registerProducer(self, False)
        return deferred
    def pauseProducing(self):
        print 'pauseProducing: invoked'
        self.paused = True
    def resumeProducing(self):
        print 'resumeProducing: invoked'
        self.paused = False
        maxchunksz = 1024
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(maxchunksz, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)
            self.sent_already += chunksz
        if self.howmany == self.sent_already:
            self.consumer.write('\n')
            self.consumer.unregisterProducer()
            print 'resumeProducing: exiting for the last time'
    def stopProducing(self):
        print 'stopProducing: invoked'
        self.consumer.unregisterProducer()
        
class xgiver(LineReceiver):
    def lineReceived(self, howmany):
        print 'got line [%s] from client [%s]' % (howmany,
                                                  self.transport.getPeer())
        if howmany == 'bye':
            print 'goodbye to', self.transport.getPeer()
            self.transport.loseConnection()
            return
        try:
            howmany = int(howmany)
        except ValueError:
            # int() failed, so howmany is still the original string
            self.transport.write("invalid input " + howmany + "\n")
            return
        s = NonStarvingXGiver(howmany, self.transport)
        s.beginSendingXs()

# Next lines are magic:
factory = Factory()
factory.protocol = xgiver

# 8007 is the port you want to run under. Choose something >1024
reactor.listenTCP(8007, factory)
reactor.run()


-------------------------------------------------------------------
Server output:

$ ./xgiver.py 
got line [1] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [2] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [3] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [10] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [99999] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
pauseProducing: invoked
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [bye] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
goodbye to IPv4Address(TCP, '127.0.0.1', 51007)
