[Twisted-Python] Re: Server with several outgoing connections

Paul Campbell paul at ref.nmedia.net
Fri Nov 5 07:51:09 MST 2004


On Thu, Nov 04, 2004 at 02:37:57AM -0700, twisted-python-request at twistedmatrix.com wrote:
> I'm new to Twisted and I have tried to read and understand the
> intricate sides of it.
> And I have found, as so many others have, that the learning curve is
> pretty steep, especially I guess if one comes from being a C programmer :-)

I've been there myself recently. It also doesn't help that the standard
idioms (design patterns?) in Python are significantly different in some
ways from C.

> Writing the 'client' side is not so easy and here I guess my C legacy 
> is hampering me.
> 
> So I'd like someone to give me, or point me to, an example of how to
> dynamically create new connections to receivers. I first thought these
> connections should be short-lived, that is 'open the connection, send the
> message, receive the ACK and close the connection', but now I think it
> might be wiser to keep the connections open as long as possible. That is,
> until that receiver disappears.

Not really. You just have to do a lot of tracking by creating the protocols
in the protocol factory. But there's an easier way for your particular
case, since essentially you are doing RPCs.

> Another side of this is that the influx of messages might be higher 
> than the possible output, that is each node has to keep queues for 
> messages not yet sent and messages sent but not ack'ed.
> It might also happen that receivers of messages are not accessible, in
> which case the message has to be queued until the receiver pops up
> again. The system is not allowed to drop messages.
> Note that each incoming message might have several receivers, so one
> incoming message might end up in the send queue of several receivers.
>
> And herein lies one of the problems: I guess I have to keep a
> 'centrally' managed queue, which should then be notified of any change
> of status of a message en route to a receiver. How should I implement
> this?

All of this is a lot easier to do at a higher level. Implement your code
on top of Perspective Broker (PB, twisted.spread.pb). Then all the ACKs and
such are handled automatically. In PB, the code is "almost symmetric", so
there's no real difference between implementing a "server" and a "client";
the two end up looking almost identical. Plus, PB semi-automatically handles
packet formatting so that you can just pass around high-level messages,
and it does active cache updating for you semi-automatically. You can also
implement a login protocol fairly easily if you need that kind of
function.

Otherwise, RPCs are already implemented in the XML-RPC code
(twisted.web.xmlrpc). Simply steal the XML/HTTP stuff and use it for
"non-web" related functions. This is a common technique, and issues about
client/server management go away (they are handled automatically in the
HTTP code). In your case, there's no real "return" since you are simply
passing messages; a "return" is just an ACK for you.

Otherwise, if you want something more primitive and you can forgo TCP,
it's a lot easier to implement in UDP. What you described sounds very
un-TCP-like: it doesn't really fit the mold of a bidirectional stream of
bytes in a client/server framework.

In the datagram protocol, keep your messages within the limits of a packet
or else you have to deal with packet fragmenting and reassembly issues.
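
A small guard on the sending side makes that limit explicit. This is only
a sketch: the 508-byte figure is my assumption (the 576-byte datagram every
IPv4 host must accept, minus a maximum 60-byte IP header and the 8-byte UDP
header), and check_fits is a hypothetical helper name. On a network whose
MTU you know, a larger limit may be fine.

```python
# Assumed conservative limit: 576 - 60 (max IPv4 header) - 8 (UDP header).
MAX_PAYLOAD = 508


def check_fits(data, header_len=1):
    """Return data unchanged if header + data fits in one safe datagram.

    header_len is the size of the protocol's own header (one type byte
    in the simple protocol described here). Raises ValueError otherwise,
    so an oversized message is rejected instead of being fragmented.
    """
    total = header_len + len(data)
    if total > MAX_PAYLOAD:
        raise ValueError(
            "message of %d bytes exceeds %d byte limit" % (total, MAX_PAYLOAD)
        )
    return data
```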

Here's how I got around it:

from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol

class queueNode:
    "Placeholder for holding data"
    pass

class MyMesgProtocol(DatagramProtocol):
    sendTimeout = 30 # Seconds to wait before each retry
    def __init__(self):
        self.sendQueue = dict()
    def sendMessage(self, data, address):
        "Sends outgoing messages (data is a byte string, address is (host, port))"
        host, port = address
        self.transport.write(b"W" + data, (host, port))
        q = queueNode() # a fresh node per message, not the class itself
        q.message = data
        q.retries = 3
        q.address = (host, port)
        m = mangle(host, port) # mangle() is left for the reader, see the notes below
        q.call = reactor.callLater(self.sendTimeout, self.retryMesg, m)
        self.sendQueue[m] = q
    def datagramReceived(self, data, address):
        "Handles incoming packets"
        host, port = address
        kind = data[:1] # slice, so this is a byte string on Python 2 and 3
        if kind == b"W":
            self.transport.write(b"A", (host, port))
            self.handleMessage(data[1:], (host, port))
        elif kind == b"A":
            m = mangle(host, port)
            if m in self.sendQueue:
                q = self.sendQueue[m]
                q.call.cancel()
                del self.sendQueue[m]
            else:
                pass # Hmmm...got an acknowledgement but we didn't have a message queued
        else:
            pass # Hmmm...not a valid packet type
    def handleMessage(self, data, address):
        "User-level handler. Override this in your subclass."
        pass
    def retryMesg(self, index):
        "Handles retry timeouts"
        q = self.sendQueue[index]
        q.retries -= 1
        if not q.retries:
            # Out of retries: drop the entry first, then report the failure
            del self.sendQueue[index]
            self.nodeFailure(q.address)
            return
        self.transport.write(b"W" + q.message, q.address)
        q.call = reactor.callLater(self.sendTimeout, self.retryMesg, index)
    def nodeFailure(self, address):
        "User-level node failure handler. Override this in your subclass."
        pass

To use the code, simply subclass it and define the two methods
handleMessage and nodeFailure. Send messages via sendMessage. Note that,
for brevity, the code as written will NOT properly handle multiple messages
queued to the same destination address. To fix this, you'll need to
insert nonces in the messages so that the ACKs can be paired up with the
corresponding entries in the message queue.
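
One way the nonce could be carried is sketched below. The 5-byte header
layout (one type byte plus a 4-byte counter) and the helper names are my
own assumptions, not part of the code above; any scheme works as long as
the ACK echoes the nonce of the message it acknowledges.

```python
import itertools
import struct

# Assumed header: 1-byte packet type + 4-byte unsigned nonce, network order.
HEADER = struct.Struct("!cI")

_counter = itertools.count(1)


def next_nonce():
    "Returns a fresh nonce, wrapping around at 32 bits."
    return next(_counter) & 0xFFFFFFFF


def pack_packet(kind, nonce, payload=b""):
    "Builds a datagram; kind is b'W' for a message or b'A' for an ACK."
    return HEADER.pack(kind, nonce) + payload


def unpack_packet(datagram):
    "Splits a datagram into (kind, nonce, payload)."
    if len(datagram) < HEADER.size:
        raise ValueError("datagram shorter than the header")
    kind, nonce = HEADER.unpack_from(datagram)
    return kind, nonce, datagram[HEADER.size:]
```

The receiver would answer a b"W" packet with pack_packet(b"A", nonce), and
the sender would look up its queue entry by address plus that nonce.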

I made several references to a "mangle" function. This function takes an
(address, port) pair and returns a string or a long integer or whatever
you feel appropriate to use for indexing the dictionary structure. It's
a simple function to write, so I left it for the reader. It is also a simple
matter to extend the same code to add a few bytes for a nonce to the
message "header" (currently a single ASCII letter). Then add the nonce to
your address mangling code for the queue, and the same queue is now
extended to handle multiple messages in flight to the same destination
as well.
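
For what it's worth, a possible mangle could look like the sketch below.
The string format and the optional nonce argument are just one choice I am
assuming here; a plain (host, port, nonce) tuple would serve equally well
as a dictionary key.

```python
def mangle(host, port, nonce=None):
    """Turns an address (and optionally a nonce) into a dictionary key.

    Without a nonce there is one key per destination, which matches the
    single-message-in-flight behaviour. With a nonce, each message to the
    same destination gets its own key.
    """
    if nonce is None:
        return "%s:%d" % (host, port)
    return "%s:%d#%d" % (host, port, nonce)


# Two messages in flight to the same destination no longer collide:
queue = {}
queue[mangle("10.0.0.1", 9000, 1)] = b"first"
queue[mangle("10.0.0.1", 9000, 2)] = b"second"
```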

In a similar fashion, the ACKs are tiny right now (just a single byte of
payload behind the 8-byte UDP header). There's nothing to say that you can't
extend the code into a full-blown RPC system. Simply edit the code that calls
the message handler to accept a return string and then send that string back
in the ACK. The send message code has to be changed to return a
deferred (q.deferred = defer.Deferred()). Then later on, the ACK code calls
q.deferred.callback() to return the data to the original caller.


I noticed that you had a lot of things referencing buffer sizes and such.
This version doesn't pay any attention to buffers at all. If you want to
control buffer sizes, then not only will you need the above extra code to
insert and delete nonces, but you'll also need to use the HTB (hierarchical
token bucket) code in Twisted, twisted.protocols.htb, or roll your own
similar function. Just subclass/wrap the above code as appropriate in the
HTB libraries.



