[Twisted-Python] [p2p-hackers] source code (in python, using twisted) to my reliable udp rpc protocol (fwd from angryhicKclown at netscape.net)
Eugen Leitl
eugen at leitl.org
Sat Aug 7 15:19:52 EDT 2004
----- Forwarded message from angryhicKclown at netscape.net -----
From: angryhicKclown at netscape.net
Date: Sat, 07 Aug 2004 14:23:26 -0400
To: p2p-hackers at zgp.org
Subject: [p2p-hackers] source code (in python,
using twisted) to my reliable udp rpc protocol
X-Mailer: Atlas Mailer 2.0
Reply-To: "Peer-to-peer development." <p2p-hackers at zgp.org>
I was paging thru the archives and realized I never posted it.
Works for me, but I've never done a large-scale test of it. Perhaps someone would like to and share the results?
It consists of several layers, a stack, if you will:
Reliable datagram - handles acks, etc
Datagram stream - allows one to send messages larger than the MTU
CBOB - binary format for storing structured data, similar to xml, except it doesn't suck
TinyRPC - RPC protocol built upon all of those layers
Enjoy. Requires www.twistedmatrix.com.
---- cut here ----
# TinyRPC protocol
# bsd license
from twisted.internet import reactor, protocol, interfaces, defer
from twisted.protocols import policies
import socket
import struct
import binascii
import cStringIO as StringIO
import time
# CBOB type codes: the single leading byte of every encoded value.
# Scalar values.
CBOB_BOOL, CBOB_INT, CBOB_FLOAT, CBOB_STR, CBOB_USTR = 0, 1, 2, 3, 4
# Containers and objects (their elements are themselves CBOB values).
CBOB_TUPLE, CBOB_LIST, CBOB_DICT, CBOB_OBJ = 5, 6, 7, 8
# The None singleton (type byte only, no payload).
CBOB_NONE = 9
def cbob_encode(obj, buf=None):
    """Serialize obj into buf using the CBOB binary format.

    Every value is written as a one-byte type code followed by a
    type-specific payload, all in network byte order:

    - bool:    one byte, 1 or 0
    - int:     4-byte signed integer ("i")
    - float:   4-byte single-precision float ("f")
    - str:     2-byte length, then the raw bytes
    - unicode: 2-byte length of the UTF-8 encoding, then those bytes
    - tuple:   1-byte item count, then each item
    - list:    2-byte item count, then each item
    - dict:    2-byte pair count, then key, value for each pair
    - object:  2-byte member count, 1-byte class-name length, the class
               name, then (attribute name, value) for each entry of
               obj.__dict__
    - None:    type code only

    buf is any object with a write() method.  When omitted, a fresh
    StringIO buffer is created for this call.  The buffer that was
    written to is returned so the caller can fetch the encoded bytes.

    Raises TypeError for values that are none of the above and carry no
    __dict__.
    """
    if buf is None:
        # A default of StringIO.StringIO() in the signature would be
        # created once at import time and silently shared by every call.
        buf = StringIO.StringIO()
    if obj is None:
        # Checked first: None has no __dict__, so it must never reach the
        # generic object branch below.
        buf.write(struct.pack("! B", CBOB_NONE))
    elif isinstance(obj, bool):
        # bool is tested before int because bool is a subclass of int.
        buf.write(struct.pack("! BB", CBOB_BOOL, int(obj)))
    elif isinstance(obj, int):
        buf.write(struct.pack("! Bi", CBOB_INT, obj))
    elif isinstance(obj, float):
        buf.write(struct.pack("! Bf", CBOB_FLOAT, obj))
    elif isinstance(obj, str):
        buf.write(struct.pack("! BH", CBOB_STR, len(obj)))
        buf.write(obj)
    elif isinstance(obj, unicode):
        # Encode explicitly: the length prefix must count bytes on the
        # wire, not characters, and implicit str/unicode mixing would go
        # through the ASCII codec.
        raw = obj.encode("utf-8")
        buf.write(struct.pack("! BH", CBOB_USTR, len(raw)))
        buf.write(raw)
    elif isinstance(obj, tuple):
        buf.write(struct.pack("! BB", CBOB_TUPLE, len(obj)))
        for item in obj:
            cbob_encode(item, buf)
    elif isinstance(obj, list):
        buf.write(struct.pack("! BH", CBOB_LIST, len(obj)))
        for item in obj:
            cbob_encode(item, buf)
    elif isinstance(obj, dict):
        buf.write(struct.pack("! BH", CBOB_DICT, len(obj)))
        for key, value in obj.items():
            cbob_encode(key, buf)
            cbob_encode(value, buf)
    elif hasattr(obj, "__dict__"):
        # Generic instance: class name plus its instance attributes.
        members = obj.__dict__.items() #inspect.getmembers(obj)
        clsname = obj.__class__.__name__
        buf.write(struct.pack("! BHB", CBOB_OBJ, len(members), len(clsname)))
        buf.write(clsname)
        for name, value in members:
            cbob_encode(name, buf)
            cbob_encode(value, buf)
    else:
        raise TypeError("couldn't serialize " + str(obj))
    return buf
def cbob_decode(buf):
    """Read one CBOB-encoded value from buf and return it.

    buf is a file-like object positioned at the type byte of a value;
    after the call it is positioned just past that value.  Containers are
    decoded recursively.  Unicode payloads are decoded as UTF-8 (which
    also accepts plain ASCII payloads).

    Raises ValueError for an unknown type code.  A truncated buffer
    surfaces as struct.error from the short read.
    """
    typ = struct.unpack("! B", buf.read(1))[0]
    if typ == CBOB_BOOL:
        return struct.unpack("! B", buf.read(1))[0] == 1
    elif typ == CBOB_INT:
        return struct.unpack("! i", buf.read(4))[0]
    elif typ == CBOB_FLOAT:
        return struct.unpack("! f", buf.read(4))[0]
    elif typ == CBOB_STR:
        size = struct.unpack("! H", buf.read(2))[0]
        return buf.read(size)
    elif typ == CBOB_USTR:
        size = struct.unpack("! H", buf.read(2))[0]
        return buf.read(size).decode("utf-8")
    elif typ == CBOB_TUPLE:
        count = struct.unpack("! B", buf.read(1))[0]
        # Build via a list: repeated tuple concatenation is quadratic.
        return tuple([cbob_decode(buf) for _ in range(count)])
    elif typ == CBOB_LIST:
        count = struct.unpack("! H", buf.read(2))[0]
        return [cbob_decode(buf) for _ in range(count)]
    elif typ == CBOB_DICT:
        count = struct.unpack("! H", buf.read(2))[0]
        result = {}
        for _ in range(count):
            # Key is read before value, matching the encoder's order.
            key = cbob_decode(buf)
            result[key] = cbob_decode(buf)
        return result
    elif typ == CBOB_OBJ:
        membercount, clsnamesize = struct.unpack("! HB", buf.read(3))
        clsname = buf.read(clsnamesize)
        # SECURITY(review): clsname comes straight from the wire and is
        # looked up in this module's globals and called with no arguments.
        # A peer can therefore invoke ANY zero-argument callable defined
        # at module level, not just data classes.  Consider an explicit
        # whitelist of decodable classes before exposing this to
        # untrusted peers.
        instance = globals()[clsname]()
        for _ in range(membercount):
            name = cbob_decode(buf)
            setattr(instance, name, cbob_decode(buf))
        return instance
    elif typ == CBOB_NONE:
        return None
    else:
        raise ValueError("invalid typecode %r" % (typ,))
class DispatcherTransport:
    """Per-peer transport handed to each protocol built by a Dispatcher.

    Wraps the dispatcher's single UDP transport and pins every write to
    one remote address, so the protocol can behave as if it were
    connected to that peer.
    """
    __implements__ = interfaces.IUDPConnectedTransport

    def __init__(self, dispatcher, addr, proto):
        """Remember the owning dispatcher, the peer address tuple and the
        protocol instance this transport serves."""
        self.dispatcher = dispatcher
        self.addr = addr
        self.protocol = proto

    def getHost(self):
        """Return the local address, as reported by the dispatcher."""
        return self.dispatcher.getHost()

    def getPeer(self):
        """Return the peer address as ("INET",) + addr."""
        return ("INET",) + self.addr

    def write(self, packet):
        """Send packet to this transport's peer via the dispatcher's
        underlying transport."""
        return self.dispatcher.transport.write(packet, self.addr)

    def loseConnection(self):
        """Stop the protocol and detach it from the dispatcher.

        Safe to call more than once; later calls are no-ops.
        """
        proto = getattr(self, "protocol", None)
        if proto is None:
            # Already disconnected.
            return
        del self.protocol
        # Forget the handler so datagrams arriving from this peer later
        # get a freshly built protocol instead of the stopped one.  Only
        # remove the entry if it is still ours.
        handlers = self.dispatcher.handlers
        if handlers.get(self.addr) is proto:
            del handlers[self.addr]
        proto.stopProtocol()
class Dispatcher(protocol.DatagramProtocol):
    """Demultiplexes one UDP port into per-peer protocol instances.

    Each remote address gets its own protocol, built by the factory the
    first time that address is looked up (or first sends a datagram).
    """
    # TODO: call startFactory()?

    def __init__(self, factory):
        """Bind to factory and start with no known peers."""
        self.factory = factory
        factory.dispatcher = self
        self.handlers = {}

    def __getitem__(self, addr):
        """Return the protocol for addr, opening a new one if needed."""
        try:
            return self.handlers[addr]
        except KeyError:
            return self.open_new(addr)

    def datagramReceived(self, data, addr):
        """Hand an incoming datagram to the protocol for its sender."""
        handler = self[addr]
        handler.datagramReceived(data)

    def getHost(self):
        """Return the local address of the underlying transport."""
        return self.transport.getHost()

    def open_new(self, addr):
        """Build, wire up, start and register a protocol for addr."""
        assert addr not in self.handlers, "Already connected"
        handler = self.factory.buildProtocol(addr)
        handler.transport = DispatcherTransport(self, addr, handler)
        handler.startProtocol()
        self.handlers[addr] = handler
        return handler

    def startProtocol(self):
        """Forward the start notification to the factory."""
        return self.factory.startFactory()

    def stopProtocol(self):
        """Forward the stop notification to the factory."""
        return self.factory.stopFactory()
class ReliableDatagramProtocol(protocol.ConnectedDatagramProtocol):
"acks and stuff"
NUM_RETRANSMITS = 2 #10
RETRANSMIT_INTERVAL = .200
MAX_LAST_RECEIVED = 50
OP_SEND = 0
OP_ACK = 128 # msb set
def startProtocol(self):
self.retransmits = {} # crc->(data,num retransmits, bits, delayedcall for retransmit, Deferred when packet is acked)
self.last_received = []
def stopProtocol(self, reason):
print "protocol stopped",reason
def send(self, data, bits=0):
crc = binascii.crc32(data)
d = defer.Deferred()
self.retransmits[crc] = (data, self.NUM_RETRANSMITS, bits, reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), d)
self.send_packet(self.OP_SEND, bits, data)
return d
def retransmit(self, crc):
t = self.retransmits[crc]
self.send_packet(self.OP_SEND, t[2], t[0])
if t[1] > 0:
self.retransmits[crc] = (t[0], t[1] - 1, t[2], reactor.callLater(self.RETRANSMIT_INTERVAL, self.retransmit, crc), t[4])
else:
self.packetLost(retransmits[crc][0])
del self.retransmits[crc]
def send_packet(self, op, bits, data):
self.transport.write(struct.pack("! B", op | bits) + data)
def send_ack(self, crc):
if len(self.last_received) >= self.MAX_LAST_RECEIVED:
del self.last_received[0]
self.last_received.append(crc)
self.transport.write(struct.pack("! Bi", self.OP_ACK, crc))
def got_ack(self, crc):
self.retransmits[crc][3].cancel()
d = self.retransmits[crc][4]
del self.retransmits[crc]
d.callback(crc)
def datagramReceived(self, data):
opbyte, = struct.unpack("! B", data[0])
op = opbyte & 128
data = data[1:]
if op == self.OP_SEND:
crc = binascii.crc32(data)
if crc not in self.last_received:
self.dataReceived(data, opbyte)
self.send_ack(crc)
else:
self.got_ack(struct.unpack("! i", data)[0])
def dataReceived(self, data, bits):
print "received",data,"from",self.transport.getPeer()
def packetLost(self):
self.transport.loseConnection()
class ReliableDatagramStreamMessageProtocol(ReliableDatagramProtocol):
"Sends large messages (> mtu size) and sends them in a buffered block..."
MTU = 512
OP_BEGINMSG = 64 # 2nd msb set
OP_DATA = 0
def startProtocol(self):
ReliableDatagramProtocol.startProtocol(self)
self.outmessagequeue = [] # data len, stream, bits
self.inmessage = None
self.packetcount = 0
self.numpackets = 0
def sendMessage(self, data, bits=0):
self.outmessagequeue.append((len(data), StringIO.StringIO(data), bits))
if len(self.outmessagequeue) == 1:
self.send_first_block()
def send_first_block(self, dummy=""):
length,stream,bits = self.outmessagequeue[0]
length = length - self.MTU + 4
if length < 0:
length = 0
length = length + 1
self.send(struct.pack("! I", length / self.MTU + length % self.MTU) + stream.read(self.MTU - 4), self.OP_BEGINMSG | bits).addCallback(self.send_next_block)
def send_next_block(self, dummy=""):
length,stream,bits = self.outmessagequeue[0]
d = stream.read(self.MTU)
if len(d) > 0:
self.send(d, self.OP_DATA | bits).addCallback(self.send_next_block)
else:
stream.close()
del self.outmessagequeue[0]
if len(self.outmessagequeue) > 0:
self.send_next_block()
def dataReceived(self, data, bits):
if bits & 64 == self.OP_DATA:
self.packetcount = self.packetcount + 1
self.inmessage.write(data)
else:
self.inmessage = StringIO.StringIO()
self.packetcount = 1
mtu = len(data) # the entire packet size is the size of the mtu
self.numpackets = struct.unpack("! I", data[:4])[0]
self.inmessage.write(data[4:])
if self.packetcount >= self.numpackets:
self.messageReceived(self.inmessage.getvalue(), bits)
self.inmessage = StringIO.StringIO()
def messageReceived(self, message, bits):
print "got message", message
class DatagramRPCProtocol(ReliableDatagramStreamMessageProtocol):
    """Request/response RPC on top of the message stream layer.

    A call is the CBOB tuple (call id, object name, method name, args,
    kwargs) sent with the OP_CALL flag; the reply is the CBOB tuple
    (call id, result) sent with OP_RETURN.

    NOTE(review): there is no error reply.  If the remote method raises,
    or the Deferred it returns fails, nothing is sent back and the
    caller's Deferred never fires.
    """
    OP_CALL = 32
    OP_RETURN = 0
    MAX_CALLID = 512

    def startProtocol(self):
        """Set up the exported objects and the table of pending calls."""
        ReliableDatagramStreamMessageProtocol.startProtocol(self)
        # Always defined, so an incoming call on a protocol whose factory
        # exports nothing fails on the lookup rather than on a missing
        # attribute.
        self.remotes = {}
        if hasattr(self.factory, "buildRemote"):
            self.remotes = self.factory.buildRemote()
        self.calls = {}
        self.callid = 0

    def messageReceived(self, message, bits):
        """Execute an incoming call or resolve a pending one."""
        message = StringIO.StringIO(message)
        if bits & 32 == self.OP_CALL:
            t, objname, methodname, args, kwargs = cbob_decode(message)
            # SECURITY(review): objname and methodname come from the
            # peer; any attribute of an exported object can be invoked,
            # including ones not meant to be remotely callable.
            r = getattr(self.remotes[objname], methodname)(*args,**kwargs)
            #print "calling " + objname + "." + methodname
            if isinstance(r, defer.Deferred):
                r.addCallback(lambda result: self.send_result(t, result))
            else:
                self.send_result(t, r)
        else:
            t,r = cbob_decode(message)
            # Pop so finished calls do not accumulate; replies for ids we
            # are not waiting on are ignored.
            d = self.calls.pop(t, None)
            if d is not None:
                d.callback(r)

    def send_result(self, t, r):
        """Send result r back for call id t."""
        sio = StringIO.StringIO()
        cbob_encode((t, r), sio)
        self.sendMessage(sio.getvalue(), self.OP_RETURN)

    def callRemote(self, objname, methodname, *args, **kwargs):
        """Invoke methodname on the peer's exported object objname.

        Returns a Deferred that fires with the remote result.  Call ids
        cycle through 0..MAX_CALLID.
        """
        t = self.callid
        if self.callid == self.MAX_CALLID:
            self.callid = 0
        else:
            self.callid = self.callid + 1
        d = defer.Deferred()
        self.calls[t] = d
        sio = StringIO.StringIO()
        cbob_encode((t, objname, methodname, args, kwargs), sio)
        self.sendMessage(sio.getvalue(), self.OP_CALL)
        return d
#class LazyRMIProtocol(DatagramRPCProtocol):
# raise "Todo: implement"
class TestObject:
    """Sample object exported over RPC by the demo below."""

    def hello(self, name):
        """Return a greeting built from name.first and name.last."""
        pieces = ["Hello, ", name.first, " ", name.last]
        return "".join(pieces)
class Name:
    """Simple record with a first and a last name, both "" by default."""
    first = last = ""
def test():
    """Entry point of the demo: schedule test2 and run the reactor."""
    # test2 is deferred via callLater so it executes inside the running
    # reactor rather than before it starts.
    reactor.callLater(0, test2)
    reactor.run()
def test2():
    """Set up two dispatchers on local UDP ports 8888/8889 and schedule
    test3 with a protocol from each side."""
    f = protocol.ServerFactory()
    #f.protocol = ReliableDatagramProtocol
    #f.protocol = ReliableDatagramStreamMessageProtocol
    f.protocol = DatagramRPCProtocol
    f.buildRemote = lambda: {"myObj" : TestObject()}
    d = Dispatcher(f)
    d2 = Dispatcher(f)
    host = socket.gethostbyname("localhost")
    # d listens on 8888 and talks to 8889; d2 is the mirror image.  Each
    # protocol is opened on its own dispatcher, addressed at the other.
    p = d[host, 8889]
    p2 = d2[host, 8888]
    reactor.listenUDP(8888, d)
    reactor.listenUDP(8889, d2)
    reactor.callLater(0, test3, p, p2)
def test3(p, p2):
    """Call myObj.hello over protocol p and pass the reply to test4.

    p2 is accepted but not used here.
    """
    who = Name()
    who.first = "great"
    who.last = "world"
    reply = p.callRemote("myObj", "hello", who)
    reply.addCallback(test4)
def test4(r):
print "result of call", r
if __name__ == "__main__":
    # Run the loopback demo only when executed as a script.
    test()
__________________________________________________________________
Switch to Netscape Internet Service.
As low as $9.95 a month -- Sign up today at http://isp.netscape.com/register
Netscape. Just the Net You Need.
New! Netscape Toolbar for Internet Explorer
Search from anywhere on the Web and block those annoying pop-ups.
Download now at http://channels.netscape.com/ns/search/install.jsp
_______________________________________________
p2p-hackers mailing list
p2p-hackers at zgp.org
http://zgp.org/mailman/listinfo/p2p-hackers
_______________________________________________
Here is a web page listing P2P Conferences:
http://www.neurogrid.net/twiki/bin/view/Main/PeerToPeerConferences
----- End forwarded message -----
--
Eugen* Leitl <a href="http://leitl.org">leitl</a>
______________________________________________________________
ICBM: 48.07078, 11.61144 http://www.leitl.org
8B29F6BE: 099D 78BA 2FD3 B014 B08A 7779 75B0 2443 8B29 F6BE
http://moleculardevices.org http://nanomachines.net
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: not available
Url : http://twistedmatrix.com/pipermail/twisted-python/attachments/20040807/bc6025da/attachment.pgp
More information about the Twisted-Python
mailing list