<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2//EN">
<HTML>
<HEAD>
<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=us-ascii">
<META NAME="Generator" CONTENT="MS Exchange Server version 6.5.7651.59">
<TITLE>Learning about IPushProducer</TITLE>
</HEAD>
<BODY>
<!-- Converted from text/rtf format -->

<P><FONT SIZE=2 FACE="Courier New">When running the following code (my 2nd twisted program!), it works as I had hoped - it doesn't starve any clients that want to receive data back, even with a simultaneously active really long streaming server-to-client communication (i.e. one piggy client asking for millions of bytes).&nbsp; i.e. another client can get in and ask for just a few bytes while a large payload is being delivered to a different client.&nbsp; Which is great!</FONT></P>

<P><FONT SIZE=2 FACE="Courier New">Here's a sample interaction from the client side:</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">$ telnet localhost 8007</FONT>

<BR><FONT SIZE=2 FACE="Courier New">Trying 127.0.0.1...</FONT>

<BR><FONT SIZE=2 FACE="Courier New">Connected to localhost.</FONT>

<BR><FONT SIZE=2 FACE="Courier New">Escape character is '^]'.</FONT>

<BR><FONT SIZE=2 FACE="Courier New">1</FONT>

<BR><FONT SIZE=2 FACE="Courier New">x</FONT>

<BR><FONT SIZE=2 FACE="Courier New">2</FONT>

<BR><FONT SIZE=2 FACE="Courier New">xx</FONT>

<BR><FONT SIZE=2 FACE="Courier New">3</FONT>

<BR><FONT SIZE=2 FACE="Courier New">xxx</FONT>

<BR><FONT SIZE=2 FACE="Courier New">10</FONT>

<BR><FONT SIZE=2 FACE="Courier New">xxxxxxxxxx</FONT>

<BR><FONT SIZE=2 FACE="Courier New">99999</FONT>

<BR><FONT SIZE=2 FACE="Courier New">xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx</FONT></P>

<P><FONT SIZE=2 FACE="Courier New">[...lots of x's...]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx</FONT>

<BR><FONT SIZE=2 FACE="Courier New">bye</FONT>

<BR><FONT SIZE=2 FACE="Courier New">Connection closed by foreign host.</FONT>

<BR><FONT SIZE=2 FACE="Courier New">$ </FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">So I have 2 questions on my code:</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">1) am I doing anything wrong in setting up the plumbing?</FONT>

<BR><FONT SIZE=2 FACE="Courier New">2) does pauseProducing() get called by another thread whilst resumeProducing() is running?&nbsp; (I believe it must, otherwise my resumeProducing() would only be entered once).&nbsp; If so I should have an appropriate mutex around the read/write of self.pause, no?</FONT></P>

<P><FONT SIZE=2 FACE="Courier New">Here is the code, and output from the server is at the end.&nbsp; Thanks -- Benjamin</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">#!/usr/bin/env python</FONT>

<BR><FONT SIZE=2 FACE="Courier New">import os, os.path, sys, re, commands, pickle, tempfile, getopt, datetime</FONT>

<BR><FONT SIZE=2 FACE="Courier New">import socket, string, random, time, traceback, shutil, popen2</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">from zope.interface import implements</FONT>

<BR><FONT SIZE=2 FACE="Courier New">from twisted.internet import protocol, defer, interfaces, error, reactor</FONT>

<BR><FONT SIZE=2 FACE="Courier New">from twisted.internet.protocol import Protocol, Factory</FONT>

<BR><FONT SIZE=2 FACE="Courier New">from twisted.protocols.basic import LineReceiver</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">class NonStarvingXGiver:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; implements(interfaces.IPushProducer)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def __init__(self, howmany, consumer):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.howmany = howmany</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.sent_already = 0</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.paused = False</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer = consumer</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def beginSendingXs(self):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.deferred = deferred = defer.Deferred()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer.registerProducer(self, False)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return deferred</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def pauseProducing(self):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'pauseProducing: invoked'</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.paused = True</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def resumeProducing(self):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'resumeProducing: invoked'</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.paused = False</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; maxchunksz = 1024</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while not self.paused and self.howmany &gt; self.sent_already:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; chunksz = min(maxchunksz, self.howmany - self.sent_already)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer.write('x' * chunksz)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.sent_already += chunksz</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if self.howmany == self.sent_already:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer.write('\n')</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer.unregisterProducer()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'resumeProducing: exiting for the last time'</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def stopProducing(self):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'stopProducing: invoked'</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.consumer.unregisterProducer()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </FONT>

<BR><FONT SIZE=2 FACE="Courier New">class xgiver(LineReceiver):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp; def lineReceived(self, howmany):</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'got line [%s] from client [%s]' % (howmany,</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.getPeer())</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if howmany == 'bye':</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print 'goodbye to', self.transport.getPeer()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.loseConnection()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; howmany = int(howmany)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; s = NonStarvingXGiver(howmany, self.transport)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; s.beginSendingXs()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; except Exception, ex:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.write(&quot;invalid input &quot; + howmany + &quot;\n&quot;)</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New"># Next lines are magic:</FONT>

<BR><FONT SIZE=2 FACE="Courier New">factory = Factory()</FONT>

<BR><FONT SIZE=2 FACE="Courier New">factory.protocol = xgiver</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New"># 8007 is the port you want to run under. Choose something &gt;1024</FONT>

<BR><FONT SIZE=2 FACE="Courier New">reactor.listenTCP(8007, factory)</FONT>

<BR><FONT SIZE=2 FACE="Courier New">reactor.run()</FONT>
</P>
<BR>

<P><FONT SIZE=2 FACE="Courier New">-------------------------------------------------------------------</FONT>

<BR><FONT SIZE=2 FACE="Courier New">Server output:</FONT>
</P>

<P><FONT SIZE=2 FACE="Courier New">$ ./xgiver.py </FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [1] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: exiting for the last time</FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [2] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: exiting for the last time</FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [3] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: exiting for the last time</FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [10] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: exiting for the last time</FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [99999] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">pauseProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: invoked</FONT>

<BR><FONT SIZE=2 FACE="Courier New">resumeProducing: exiting for the last time</FONT>

<BR><FONT SIZE=2 FACE="Courier New">got line [bye] from client [IPv4Address(TCP, '127.0.0.1', 51007)]</FONT>

<BR><FONT SIZE=2 FACE="Courier New">goodbye to IPv4Address(TCP, '127.0.0.1', 51007)</FONT>
</P>

</BODY>
</HTML>