[Twisted-Python] Twisted scalability with hundreds of outbound (client) connections

Phil Mayers p.mayers at imperial.ac.uk
Sun Oct 3 09:26:15 EDT 2004

Itamar Shtull-Trauring wrote:

>>I'm also slightly concerned about the number of function calls involved 
>>in jumping in and out of the reactor that many times a second (several 
>>thousand, if I can get it to go as fast as my previous code) given how 
>>expensive they are under Python. It would certainly be quicker to 
>>implement this inside the reactor.mainLoop.
> Just have a single reactor.callLater(0, f), and f() then calls all the
> functions you want done in that iteration.

Ok, just a quick note to people - I solved this as suggested, however 
reactor.callLater(0, func) does not work; because 0 always means *now*, 
you get the queuing problem:

class SNMP(protocol.DatagramProtocol):
     def datagramReceived(self, data, addr):
         pdu = self.decode(data)
         self.queue(pdu.deferred.callback, (pdu,))

     def queue(self, func, pargs):
         if not self.calls:
             # Schedule a receive at some later date
             reactor.callLater(0.001, self.dequeue)
         self.calls.append((func, pargs))

     def dequeue(self):
         if not self.calls:
         func, pargs = self.calls.pop(0)
         # The problem is here - this function will almost certainly
         # be a protocol action that will generate another transmit PDU
         # With many clients, the many transmits can overflow the input
         # queue while we're spinning inside code
         if self.calls:
             # To avoid the problem, wait "delta" (some small number)
             # rather than zero; this will ensure a select() happens
             # before the callLater
             reactor.callLater(0.001, self.dequeue)

class Agent:
     def __init__(self, host, proto):
         self.proto = proto
     def start(self):
         d = self.proto.query(self.host, 'get', oid1, oid2)
         d.addCallbacks(self.step2, self.error)
     def step2(self, pdu):
         # Do some stuff
         d = self.proto.query(self.host, 'get', self.whatnow[pdu])
         d.addCallbacks(self.step3, self.error)

proto = SNMP()
for hostname in sys.argv[1:]:
     a = Agent(hostname, proto)
     # Start up one at a time, to avoid startup surge
     proto.queue(a.start, None)
from twisted.internet import reactor

I hope I'm explaining what's going on here - but if not, don't worry, 
the problem is more or less solved for me, thanks for the assistance. 
The only minor remaining niggle is that the static 0.001 value to 
callLater limits my theoretical max throughput to 1000 queries/sec. The 
only way to do without that parameter would be to execute a select() 
inside every function call in runUntilCurrent I think. As it happens, 
1000/sec is more than the box can do anyway, so it's not a problem at 
the moment!

