[Twisted-Python] [p2p-hackers] source code (in python, using twisted) to my reliable udp rpc protocol (fwd from angryhicKclown at netscape.net)

Eugen Leitl eugen at leitl.org
Sat Aug 7 13:19:52 MDT 2004

----- Forwarded message from angryhicKclown at netscape.net -----

From: angryhicKclown at netscape.net
Date: Sat, 07 Aug 2004 14:23:26 -0400
To: p2p-hackers at zgp.org
Subject: [p2p-hackers] source code (in python,
	using twisted) to my reliable udp rpc protocol
X-Mailer: Atlas Mailer 2.0
Reply-To: "Peer-to-peer development." <p2p-hackers at zgp.org>

I was paging thru the archives and realized I never posted it.

Works for me, but I've never done a large-scale test of it. Perhaps someone would like to and share the results?

It consists of several layers, a stack, if you will:
Reliable datagram - handles acks, etc
Datagram stream - allows one to send messages larger than the MTU
CBOB - binary format for storing structured data, similar to xml, except it doesn't suck
TinyRPC - RPC protocol build upon all of those layers

Enjoy. Requires www.twistedmatrix.com.

---- cut here ----
# TinyRPC protocol

# bsd license

from twisted.internet import reactor, protocol, interfaces, defer
from twisted.protocols import policies

import socket
import struct
import binascii
import cStringIO as StringIO
import time

CBOB_INT    = 1
CBOB_STR    = 3
CBOB_OBJ    = 8

def cbob_encode(obj, buf=StringIO.StringIO()):
    if isinstance(obj, bool):
        if obj:
            buf.write(struct.pack("! BB", CBOB_BOOL, 1))
            buf.write(struct.pack("! BB", CBOB_BOOL, 0))
    elif isinstance(obj, int):
        buf.write(struct.pack("! Bi", CBOB_INT, obj))
    elif isinstance(obj, float):
        buf.write(struct.pack("! Bf", CBOB_FLOAT, obj))
    elif isinstance(obj, str):
        buf.write(struct.pack("! BH", CBOB_STR, len(obj)) + obj)
    elif isinstance(obj, unicode):
        buf.write(struct.pack("! BH", CBOB_USTR, len(obj)) + obj)
    elif isinstance(obj, tuple):
        buf.write(struct.pack("! BB", CBOB_TUPLE, len(obj)))
        for o in obj:
            cbob_encode(o, buf)
    elif isinstance(obj, list):
        buf.write(struct.pack("! BH", CBOB_LIST, len(obj)))
        for o in obj:
            cbob_encode(o, buf)
    elif isinstance(obj, dict):
        buf.write(struct.pack("! BH", CBOB_DICT, len(obj)))
        for o in obj:
            cbob_encode(o, buf)
            cbob_encode(obj[o], buf)
    elif isinstance(obj, object):
        members = obj.__dict__.items() #inspect.getmembers(obj)
        buf.write(struct.pack("! BHB", CBOB_OBJ, len(members), len(obj.__class__.__name__)))
        for member in members:
            cbob_encode(member[0], buf)
            cbob_encode(member[1], buf)
    elif isinstance(obj, NoneType):
        buf.write(struct.pack("! B", CBOB_NONE))
        raise "couldn't serialize "  + str(obj)
def cbob_decode(buf):
    d = buf.read(1)
    typ = struct.unpack("! B", d)[0]
    if typ == CBOB_BOOL:
        v = struct.unpack("! B", buf.read(1))[0]
        return v == 1
    elif typ == CBOB_INT:
        return struct.unpack("! i", buf.read(4))[0]
    elif typ == CBOB_FLOAT:
        return struct.unpack("! f", buf.read(4))[0]
    elif typ == CBOB_STR:
        l = struct.unpack("! H", buf.read(2))[0]
        return buf.read(l)
    elif typ == CBOB_USTR:
        l = struct.unpack("! H", buf.read(2))[0]
        return unicode(buf.read(l))
    elif typ == CBOB_TUPLE:
        l = struct.unpack("! B", buf.read(1))[0]
        v = ()
        for i in range(0, l):
            v = v + (cbob_decode(buf),)
        return v
    elif typ == CBOB_LIST:
        l = struct.unpack("! H", buf.read(2))[0]
        v = []
        for i in range(0, l):
        return v
    elif typ == CBOB_DICT:
        l = struct.unpack("! H", buf.read(2))[0]
        v = {}
        for i in range(0, l):
            key,value = cbob_decode(buf),cbob_decode(buf)
            v[key] = value
        return v
    elif typ == CBOB_OBJ:
        membercount,clsnamesize = struct.unpack("! HB", buf.read(3))
        clsname = buf.read(clsnamesize)
        v = globals()[clsname]()
        for i in range(0, membercount):
            name,value = cbob_decode(buf),cbob_decode(buf)
            setattr(v, name, value)
        return v
    elif typ == CBOB_NONE:
        return None
        raise "invalid typecode " + typ

class DispatcherTransport:
    __implements__ = interfaces.IUDPConnectedTransport
    def __init__(self, dispatcher, addr, proto):
        self.dispatcher = dispatcher
        self.addr = addr
        self.protocol = proto
    def getHost(self):
        return self.dispatcher.getHost()
    def getPeer(self):
        return ("INET",) + self.addr
    def write(self, packet):
        return self.dispatcher.transport.write(packet, self.addr)
    def loseConnection(self):
        del self.protocol
class Dispatcher(protocol.DatagramProtocol):
    # TODO: call startFactory()?
    def __init__(self, factory):
        self.handlers = {}
        factory.dispatcher = self
        self.factory = factory
    def __getitem__(self, addr):
        if addr not in self.handlers:
            return self.open_new(addr)
        return self.handlers[addr]
    def datagramReceived(self, data, addr):
    def getHost(self):
        return self.transport.getHost()
    def open_new(self, addr):
        assert addr not in self.handlers, "Already connected"
        p = self.factory.buildProtocol(addr)
        p.transport = DispatcherTransport(self, addr, p)
        self.handlers[addr] = p
        return p
    def startProtocol(self):
        return self.factory.startFactory()
    def stopProtocol(self):
        return self.factory.stopFactory()

class ReliableDatagramProtocol(protocol.ConnectedDatagramProtocol):
    "acks and stuff"
    OP_SEND = 0
    OP_ACK = 128 # msb set
    def startProtocol(self):
        self.retransmits = {} # crc->(data,num retransmits, bits, delayedcall for retransmit, Deferred when packet is acked)
        self.last_received = []
    def stopProtocol(self, reason):
        print "protocol stopped",reason
    def send(self, data, bits=0):
        crc = binascii.crc32(data)
        d = defer.Deferred()
        self.retransmits[crc] = (data, self.NUM_RETRANSMITS, bits, reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), d)
        self.send_packet(self.OP_SEND, bits, data)
        return d
    def retransmit(self, crc):
        t = self.retransmits[crc]
        self.send_packet(self.OP_SEND, t[2], t[0])
        if t[1] > 0:
            self.retransmits[crc] = (t[0], t[1] - 1, t[2], reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), t[4])
            del self.retransmits[crc]
    def send_packet(self, op, bits, data):
        self.transport.write(struct.pack("! B", op | bits) + data)
    def send_ack(self, crc):
        if len(self.last_received) >= self.MAX_LAST_RECEIVED:
            del self.last_received[0]
        self.transport.write(struct.pack("! Bi", self.OP_ACK, crc))
    def got_ack(self, crc):
        d = self.retransmits[crc][4]
        del self.retransmits[crc]
    def datagramReceived(self, data):
        opbyte, = struct.unpack("! B", data[0])
        op = opbyte & 128
        data = data[1:]
        if op == self.OP_SEND:
            crc = binascii.crc32(data)
            if crc not in self.last_received:
                self.dataReceived(data, opbyte)
            self.got_ack(struct.unpack("! i", data)[0])
    def dataReceived(self, data, bits):
        print "received",data,"from",self.transport.getPeer()
    def packetLost(self):

class ReliableDatagramStreamMessageProtocol(ReliableDatagramProtocol):
    "Sends large messages (> mtu size) and sends them in a buffered block..."
    MTU = 512
    OP_BEGINMSG = 64 # 2nd msb set
    OP_DATA = 0

    def startProtocol(self):
        self.outmessagequeue = [] # data len, stream, bits
        self.inmessage = None
        self.packetcount = 0
        self.numpackets = 0
    def sendMessage(self, data, bits=0):
        self.outmessagequeue.append((len(data), StringIO.StringIO(data), bits))
        if len(self.outmessagequeue) == 1:
    def send_first_block(self, dummy=""):
        length,stream,bits = self.outmessagequeue[0]
        length = length - self.MTU + 4
        if length < 0:
            length = 0
        length = length + 1
        self.send(struct.pack("! I", length / self.MTU + length % self.MTU) + stream.read(self.MTU - 4), self.OP_BEGINMSG | bits).addCallback(self.send_next_block)
    def send_next_block(self, dummy=""):
        length,stream,bits = self.outmessagequeue[0]
        d = stream.read(self.MTU)
        if len(d) > 0:
            self.send(d, self.OP_DATA | bits).addCallback(self.send_next_block)
            del self.outmessagequeue[0]
            if len(self.outmessagequeue) > 0:
    def dataReceived(self, data, bits):
        if bits & 64 == self.OP_DATA:
            self.packetcount = self.packetcount + 1
            self.inmessage = StringIO.StringIO()
            self.packetcount = 1
            mtu = len(data) # the entire packet size is the size of the mtu
            self.numpackets = struct.unpack("! I", data[:4])[0]
        if self.packetcount >= self.numpackets:
            self.messageReceived(self.inmessage.getvalue(), bits)
            self.inmessage = StringIO.StringIO()
    def messageReceived(self, message, bits):
        print "got message", message

class DatagramRPCProtocol(ReliableDatagramStreamMessageProtocol):
    OP_CALL = 32
    OP_RETURN = 0
    MAX_CALLID = 512
    def startProtocol(self):
        if hasattr(self.factory, "buildRemote"):
            self.remotes = self.factory.buildRemote()
        self.calls = {}
        self.callid = 0
    def messageReceived(self, message, bits):
        message = StringIO.StringIO(message)
        if bits & 32 == self.OP_CALL:
            t, objname, methodname, args, kwargs = cbob_decode(message)
            r = getattr(self.remotes[objname], methodname)(*args,**kwargs)
            #print "calling " + objname + "." + methodname
            if isinstance(r, defer.Deferred):
                r.addCallback(lambda result: self.send_result(t, result))
                self.send_result(t, r)
            t,r = cbob_decode(message)
    def send_result(self, t, r):
        sio = StringIO.StringIO()
        cbob_encode((t, r), sio)
        self.sendMessage(sio.getvalue(), self.OP_RETURN)
    def callRemote(self, objname, methodname, *args, **kwargs):
        t = self.callid
        if self.callid == self.MAX_CALLID:
            self.callid = 0
            self.callid = self.callid + 1
        self.calls[t] = defer.Deferred()
        sio = StringIO.StringIO()
        cbob_encode((t, objname, methodname, args, kwargs), sio)
        self.sendMessage(sio.getvalue(), self.OP_CALL)
        return self.calls[t]

#class LazyRMIProtocol(DatagramRPCProtocol):
#    raise "Todo: implement"

class TestObject:
    def hello(self, name):
        return "Hello, " + name.first + " " + name.last

class Name:
    first = ""
    last = ""
def test():
    reactor.callLater(0, test2)
def test2():
    f = protocol.ServerFactory()
    #f.protocol = ReliableDatagramProtocol
    #f.protocol = ReliableDatagramStreamMessageProtocol
    f.protocol = DatagramRPCProtocol
    f.buildRemote = lambda: {"myObj" : TestObject()}
    d = Dispatcher(f)
    d2 = Dispatcher(f)
    p = d[socket.gethostbyname("localhost"), 8889]
    p2 = d[socket.gethostbyname("localhost"), 8888]
    reactor.listenUDP(8888, d)
    reactor.listenUDP(8889, d2)
    reactor.callLater(0, test3, p, p2)

def test3(p, p2):
    name = Name()
    name.first = "great"
    name.last = "world"
    p.callRemote("myObj", "hello", name).addCallback(test4)

def test4(r):
    print "result of call", r
if __name__ == "__main__":

Switch to Netscape Internet Service.
As low as $9.95 a month -- Sign up today at http://isp.netscape.com/register

Netscape. Just the Net You Need.

New! Netscape Toolbar for Internet Explorer
Search from anywhere on the Web and block those annoying pop-ups.
Download now at http://channels.netscape.com/ns/search/install.jsp
p2p-hackers mailing list
p2p-hackers at zgp.org
Here is a web page listing P2P Conferences:

----- End forwarded message -----
Eugen* Leitl <a href="http://leitl.org">leitl</a>
ICBM: 48.07078, 11.61144            http://www.leitl.org
8B29F6BE: 099D 78BA 2FD3 B014 B08A  7779 75B0 2443 8B29 F6BE
http://moleculardevices.org         http://nanomachines.net
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: not available
URL: </pipermail/twisted-python/attachments/20040807/bc6025da/attachment.sig>

More information about the Twisted-Python mailing list