[Twisted-Python] UDP and multiple access

Phil Mayers p.mayers at imperial.ac.uk
Sun Oct 10 20:48:13 EDT 2004


Sebastien Kirche wrote:

> Hi,
> 
> I am working on a little project to list the servers of the online game
> «Enemy Territory» and also to list/find a known player. I am trying to use
> Twisted to simplify the network access.
> I am new to both Twisted and Python, but as I am also a programmer, learning
> is quite easy.
> 
> The task consists of two steps:
> 1) one UDP request to the master server at idsoftware, which returns the
>    list of the active (slave) servers (currently around 2020 hosts)
> 2) one UDP request to *each* slave server to obtain its characteristics
>    (name, map being played, ...) and the list of players
> 
> For now I have managed to perform step 1) and I can obtain the list of
> host:port pairs to query.
> 
> If someone would like to take a look at my code, I have uploaded it [1].
> 
> My problem now is to send around 2000+ non-blocking requests and to take the
> answers as they come back.
> 
> Processing the data should not be a problem, but I don't really see how to
> perform the mass request.
> 
> I don't clearly understand the Twisted terminology (protocols, factories,
> ...). I have seen in another script [2] a case where there is just one
> datagramReceived that checks the host:port of the datagram to distinguish
> the answers, but that is kind of a library, and it only shows 2 requests
> for a test. I have no idea how to do the mass request.
> 
> Would someone be kind enough to guide me a little? For example, by giving
> me a snippet for multiple UDP requests where the hosts are in a list, and
> telling me whether datagramReceived will suit handling the responses.
> 
> I hope I am clear enough. TIA.
> 

Ah ha. This is extremely similar to what I've been doing recently (SNMP, 
with many many agents, lots of concurrent UDP clients). Here's the setup 
I used (warning: comes with no guarantee, may end your marriage, etc. - 
specifically, read the warning at the bottom).

This is very, very pseudo-code - my actual code has a lot of very 
confusing non-relevant stuff in it. It uses a queue to buffer the 
receive events and ensure the UDP socket queue is emptied asap (the 
queue function exits quickly, but reschedules itself a short time in the 
future; this short time is enough to let select() run and the data be 
received - see the recent thread on the mailing list about "scalability 
with hundreds of clients" and callLater(0, ...) not doing what you might 
think). That same queue is used to start off the clients' first xmit, 
meaning they'll be relatively well interspersed and you won't overload 
the socket *output* buffer either.

import time

from twisted.internet import protocol

class Timeout(Exception): pass   # placeholder exception for "no reply in time"

class Protocol(protocol.DatagramProtocol):
     def __init__(self):
         self._queue = []
         self.timeouts = []
         from twisted.internet import reactor
         self.reactor = reactor
         reactor.callLater(1, self.dotimeouts)
     def dotimeouts(self):
         now = time.time()
         while self.timeouts:
             due, deferred = self.timeouts[0]
             if due > now:
                 break
             due, deferred = self.timeouts.pop(0)
             # Might have been answered already; don't time it out if so
             if not deferred.called:
                 deferred.errback(Timeout())
         # Check the outstanding requests again in a second
         self.reactor.callLater(1, self.dotimeouts)
     def queue(self, callable, pargs=(), kwargs=None):
         if not self._queue:
             # Nothing queued yet, so no dequeue is scheduled yet either
             # WARNING: this number is important...
             self.reactor.callLater(0.001, self.dequeue)
         self._queue.append((callable, pargs, kwargs or {}))
     def dequeue(self):
         if not self._queue:
             # Shouldn't happen
             return
         callable, pargs, kwargs = self._queue.pop(0)
         callable(*pargs, **kwargs)
         if self._queue:
             # If there's more to dequeue, reschedule ourselves
             # WARNING: this number is also important...
             self.reactor.callLater(0.001, self.dequeue)
     def datagramReceived(self, data, addr):
         # Do stuff, then...
         pdu = self.parse(data)
         deferred = self.get_outstanding(pdu, addr)
         self.queue(deferred.callback, (pdu,))
     def query(self, host, op, args):
         # Do stuff, then
         pdu = self.encode(op, args)
         self.transport.write(pdu.bytes(), host)
         return self.set_outstanding(pdu, host)

class Client:
     def __init__(self, host, protocol):
         self.protocol = protocol
         self.host = host
         self.protocol.queue(self.step1)
     def step1(self):
         deferred = self.protocol.query(self.host, 'op', 'args')
         deferred.addCallbacks(self.step2, self.fail)
     def step2(self, pdu):
         for thing, value in pdu.items():
             # blah, blah
             pass
         deferred = self.protocol.query(self.host, 'op2', None)
         # step3/fail (not shown) carry on the conversation / handle errors
         deferred.addCallbacks(self.step3, self.fail)


if __name__=='__main__':
     import sys
     from twisted.internet import reactor

     proto = Protocol()
     reactor.listenUDP(0, proto)
     for hostname in sys.argv[1:]:
         if ':' in hostname:
             hostname, port = hostname.split(':')
             port = int(port)
         else:
             port = defaultport   # whatever your protocol's default port is
         client = Client((hostname, port), proto)
     reactor.run()
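
As an aside, the hand-rolled callLater() chaining in queue()/dequeue() can
also be expressed with twisted.internet.task.LoopingCall, which takes care
of re-arming the timer for you. Something like this, keeping the same
0.001s interval as the tunable knob (the PacedQueue name and shape are
mine, adapt to taste):

from twisted.internet import task

class PacedQueue:
    def __init__(self, interval=0.001):
        self._queue = []
        self.interval = interval
        self._loop = task.LoopingCall(self._dequeue)

    def queue(self, callable, pargs=(), kwargs=None):
        self._queue.append((callable, pargs, kwargs or {}))
        if not self._loop.running:
            # now=False: don't run synchronously here; give the reactor a
            # chance to get back to select() first
            self._loop.start(self.interval, now=False)

    def _dequeue(self):
        callable, pargs, kwargs = self._queue.pop(0)
        callable(*pargs, **kwargs)
        if not self._queue:
            # Nothing left; stop until something is queued again
            self._loop.stop()

The Protocol above could hold one of these in place of its _queue list and
the paired callLater() calls.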

Now, I make no claims the pseudo-code above is the perfect Twisted app - it 
ain't. What it *does* show is the only way I've found (which may be entirely 
my lack of ability) to scalably send many hundreds of UDP PDUs without:

a) Starving the Twisted mainloop of CPU, meaning select() doesn't get 
run often enough, and the UDP socket buffer overflows, dropping replies 
and necessitating retransmits
b) Using a lot of sockets, which runs into problems with select() and 
poll() as well as the system fd limit (solvable with ulimit, I'll grant)
c) Starving slow-responding clients of "CPU" (queue) time

The warning, however: UDP, lacking flow control, is very easy to get wrong, 
and it is easy to accidentally DDoS the hosts you're trying to talk to. 
Specifically, the value in reactor.callLater(0.001, ...) determines how 
often a "task" is dequeued from the protocol, and therefore how many PDUs 
you send and how many receive events you process per second - 0.001s means 
at most roughly 1000 dequeues, and hence on the order of 1000 transmits, per 
second. Best to start with a) a small number of servers and b) a larger 
value (i.e. a lower rate) for that.

You should probably also implement some form of variable per-host timeout 
to get some kind of rate control.
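
One simple form of that (names and numbers made up, and assuming query()
errbacks - via the Timeout mechanism above - when no reply arrives in time)
is to retry each query a few times, backing off a little longer on each
failure:

from twisted.internet import reactor, defer

def queryWithBackoff(proto, host, op, args, tries=3, delay=2.0):
    """Try proto.query() up to 'tries' times, doubling the delay each time."""
    d = defer.Deferred()

    def attempt(triesLeft, wait):
        proto.query(host, op, args).addCallbacks(
            d.callback, retry, errbackArgs=(triesLeft, wait))

    def retry(failure, triesLeft, wait):
        if triesLeft <= 1:
            # Out of retries: hand the last failure to the caller
            d.errback(failure)
        else:
            # Back off before bothering this host again
            reactor.callLater(wait, attempt, triesLeft - 1, wait * 2)

    attempt(tries, delay)
    return d

Client.step1 would then call queryWithBackoff(self.protocol, self.host,
'op', 'args') rather than query() directly.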

However, I've had a great deal of difficulty making these points understood 
by other coders, for which I have two possible explanations: either I'm 
totally wrong, or it's a very subtle issue. Guess which one I think it is :o)
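
To bring it back to Sebastien's original question, here's a much smaller,
self-contained sketch of the same approach for the game servers: one
socket, one datagramReceived() for every reply (the sender's host:port
tells you which server answered), and the transmits paced out with
callLater. The names are made up, the '\xff\xff\xff\xffgetstatus' packet
is my recollection of the Quake3-style query Enemy Territory uses (check
it against a real server), and it expects IP:port arguments, which is what
the master server hands back anyway:

import sys
from twisted.internet import reactor, protocol

class MassQuery(protocol.DatagramProtocol):
    def __init__(self, hosts, interval=0.01):
        self.pending = list(hosts)   # (ip, port) tuples still to send
        self.results = {}            # addr -> raw response
        self.interval = interval     # seconds between transmits (100/s here)

    def startProtocol(self):
        # Socket is bound; start the paced transmit loop
        self.sendNext()

    def sendNext(self):
        if not self.pending:
            # All sent; give stragglers a few seconds, then stop
            reactor.callLater(5, reactor.stop)
            return
        addr = self.pending.pop(0)
        self.transport.write('\xff\xff\xff\xffgetstatus\n', addr)
        reactor.callLater(self.interval, self.sendNext)

    def datagramReceived(self, data, addr):
        # One handler for all replies; addr says which server this is
        self.results[addr] = data
        print addr, repr(data[:60])

if __name__ == '__main__':
    hosts = []
    for arg in sys.argv[1:]:          # e.g. 192.0.2.1:27960
        ip, port = arg.split(':')
        hosts.append((ip, int(port)))
    reactor.listenUDP(0, MassQuery(hosts))
    reactor.run()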



