[Twisted-Python] Twisted scalability with hundreds of outbound (client) connections

Phil Mayers p.mayers at imperial.ac.uk
Sun Sep 26 14:32:46 EDT 2004


I have an SNMP poller written in Python that I'd like to turn into a 
long-running server process. The obvious choice (it really is obvious - I 
need the supporting infrastructure) is to convert it to Twisted, since it 
makes it easy for the process to act as a server as well as a client 
(e.g. an XML-RPC query mapping onto an SNMP fetch, using the DB-API to 
update node status, all hooked together via Deferreds, etc.).
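
As a sketch, the XML-RPC front end might look like this (snmp_get_table 
is a hypothetical helper that returns a Deferred):

from twisted.web import xmlrpc

class PollerAPI(xmlrpc.XMLRPC):
    # Returning a Deferred from an xmlrpc_ method is fine; the
    # XML-RPC response is sent when the Deferred fires
    def xmlrpc_ifTable(self, host):
        return snmp_get_table(host, 'ifTable')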

However, I've done a couple of proof-of-concept ports and am having 
scalability problems related to the way reactors work. I'm not entirely 
certain I've got the problem nailed, since the reactor sources are... 
complex... but I think what happens is this: the framework is heavily 
oriented towards getting IO out as soon as possible, and since there is 
no "scheduling" of receive events, protocol instances talking to "fast" 
clients eat all the CPU, and the socket receive buffer overflows because 
each datagramReceived triggers another write().
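
In protocol terms, I believe the pattern amounts to something like this 
(a rough sketch; decode_pdu and next_request are stand-ins for the real 
logic):

from twisted.internet import protocol

class SNMPProtocol(protocol.DatagramProtocol):
    def datagramReceived(self, data, addr):
        # Each response immediately provokes the next request, so a
        # fast agent keeps this exchange running back-to-back while
        # slower agents' replies pile up in the socket buffer
        pdu = decode_pdu(data)       # stand-in decoder
        nxt = next_request(pdu)      # stand-in: next PDU, if any
        if nxt:
            self.transport.write(nxt.encode(), addr)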

Possibly an example of the agent logic would help:

class agent:
    def __init__(self, host):
        self.host = host
    def start(self):
        # get_table returns a Deferred that fires with the table
        d = self.get_table('ifTable')
        d.addCallbacks(self.step2, self.die)
    def step2(self, ifTable):
        # Do some processing, then fetch more data
        d = self.get_table('ipAddrTable')
        d.addCallbacks(self.step3, self.die)
    def step3(self, ipAddrTable):
        # etc. etc.
        pass
    def die(self, failure):
        # Fetch failed or timed out; give up on this host
        pass

def do_timeouts(reactor):
    # pdus is the list of sent-but-unanswered PDUs
    now = time.time()
    for pdu in list(pdus):
        if pdu.due < now:
            pdus.remove(pdu)
            pdu.deferred.errback(Timeout())
    reactor.callLater(1, do_timeouts, reactor)

import sys
import time
from twisted.internet import reactor

for host in sys.argv[1:]:
    a = agent(host)
    reactor.callLater(0, a.start)
reactor.callLater(1, do_timeouts, reactor)
reactor.run()


Now, I know there are scalability problems with callLater, but it's only 
being used for startup, because I can't think of a cleaner way of doing 
it - all subsequent activity is triggered by deferreds. I'm handling 
timeouts by running a once-per-second expiry pass, which is sufficient 
for my needs.
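
For what it's worth, staggering the initial callLater calls would at 
least spread the startup burst, though it doesn't address the underlying 
fairness problem:

for i, host in enumerate(sys.argv[1:]):
    a = agent(host)
    # spread ~1000 start() calls over ~50s instead of one tick
    reactor.callLater(i * 0.05, a.start)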

The *problem* is that the SNMP agents I'm talking to vary enormously in 
performance, response time and quantity of data to retrieve, and the 
faster endpoints are consuming a disproportionate share of the CPU time; 
so much so, in fact, that the UDP socket receive queue limit is being 
exceeded. (I'm using a single UDP socket for all the clients and 
demultiplexing the responses based on SNMP message ID - this is to stay 
under FD limits.)

We're talking 750-1000 clients here BTW.
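
For reference, the dispatch looks roughly like this (simplified sketch; 
decode_pdu is a stand-in for the real decoder):

from twisted.internet import defer, protocol

class SNMPDispatch(protocol.DatagramProtocol):
    def __init__(self):
        self.pending = {}   # SNMP request-id -> Deferred

    def send(self, pdu, addr):
        # One shared socket: remember who is waiting for this id
        d = defer.Deferred()
        self.pending[pdu.request_id] = d
        self.transport.write(pdu.encode(), addr)
        return d

    def datagramReceived(self, data, addr):
        pdu = decode_pdu(data)   # stand-in decoder
        d = self.pending.pop(pdu.request_id, None)
        if d:
            d.callback(pdu)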

What I really need is some way of making the reactor do a fair-queuing 
style of transmit/receive; I have some not-very-clear notion in my head 
of transport.write not actually doing the write, but just putting the 
output data onto a per-agent queue, and the reactor emptying those 
queues in a strictly round-robin fashion, *but* stopping immediately to 
service input data if it arrives.

Hmm. I suppose I could implement that by subclassing the UDP transport:

class queuedtransport:
    def __init__(self, transport):
        self.transport = transport   # the real UDP transport
        self.host2queue = {}         # host -> index into self.queues
        self.queues = []             # one FIFO of datagrams per host
        self.pos = 0
    def write(self, data, host=None):
        # Don't send yet - just park the datagram on this host's queue
        if host not in self.host2queue:
            self.host2queue[host] = len(self.queues)
            self.queues.append([])
        self.queues[self.host2queue[host]].append((data, host))
    def queue(self):
        if not any(self.queues):
            # Nothing pending; poll again in a second
            reactor.callLater(1, self.queue)
            return
        # Strict round-robin: emit one datagram, then yield to the
        # reactor so input can be serviced before the next send
        while True:
            self.pos = (self.pos + 1) % len(self.queues)
            if self.queues[self.pos]:
                data, host = self.queues[self.pos].pop(0)
                self.transport.write(data, host)
                break
        reactor.callLater(0, self.queue)

Or something along those lines - the above is only a sketch, but 
something similar.
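
It would presumably be wired up along these lines (again a sketch - it 
just monkey-patches the protocol's transport attribute after listenUDP 
has set it):

port = reactor.listenUDP(0, proto)      # proto: the SNMP protocol
qt = queuedtransport(proto.transport)   # wrap the real transport
proto.transport = qt                    # writes now go via the queues
reactor.callLater(0, qt.queue)          # start the round-robin drain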

Does anyone have any suggestions on how to handle this? My existing 
(home-grown, nasty) async framework can handle several hundred queries a 
second, and it does a very good job of sharing out the load evenly, so 
that fast systems complete quickly but the slower ones still get enough 
time to a) not suffer PDU timeouts and b) not exhaust inbound socket 
buffers.

The way my current code works is:

import select

def receive():
    # Wait up to 1s for readable sockets (the timeout lets the
    # job loop below run even when the network is quiet)
    r, w, x = select.select(fds, [], [], 1)
    for rfd in r:
        # Drain everything currently queued on this socket
        while True:
            try:
                pdu = rfd.read()
            except Exception:
                break
            if not pdu:
                break
            mark_answered(pdu)
    if r:
        return len(r)
    return None

while self.jobs:
    # Receive IO takes absolute precedence: drain it all first
    while receive():
        pass
    # Then service exactly one job; 'job' is a pointer into a
    # circular list, so servicing is round-robin
    job = job.nextptr
    now = time.time()
    if job.waiting:
        if job.waiting.answered:
            job.receive()
        elif job.waiting.due < now:
            try:
                job.waiting.retry()
            except Exception:
                # Out of retries
                job.cancel()


You can probably see how this works - on every pass round the loop, 
receive IO takes absolute precedence (it just goes into an input queue), 
and each job gets at most one "receive" event per IO-empty; the pointer 
into the job list ensures that ready jobs are serviced in round-robin 
fashion. Sadly, the pseudo-code above makes it look much cleaner than 
the real code does, and I want the other protocol / service support 
Twisted provides, along with things like the deferred pattern and such.
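
The closest Twisted idiom I can think of is chaining callLater(0) calls, 
so that the reactor gets a chance to deliver pending datagrams between 
job steps. A sketch, where next_job and the job interface are stand-ins 
for the structures above:

from twisted.internet import reactor

def service_one_job():
    # One job per reactor turn; callLater(0) yields back to the
    # reactor, so socket reads are delivered between job steps
    job = next_job()          # stand-in round-robin pointer
    if job is not None and job.ready():
        job.receive()
    reactor.callLater(0, service_one_job)

reactor.callLater(0, service_one_job)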

I would welcome any suggestions.



