Ticket #454: udpbroadcast.py

File udpbroadcast.py, 3.3 KB (added by rwall, 3 years ago)

Send UDP broadcast messages for tracking a cluster of hosts

Line 
1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5An example demonstrating how to send UDP broadcast messages for tracking a
6cluster of hosts at Layer 2.
7
8Each host broadcasts its ID on start up and upon receiving a broadcast, a host
9will unicast it's ID to the broadcaster.
10Additionally, each host periodically unicasts to each of its peers.
11
12Run using twistd. Eg
13
14* twistd -noy doc/core/examples/udpbroadcast.py
15"""
16
17from datetime import datetime, timedelta
18from uuid import uuid1, UUID
19
20from twisted.application import internet, service
21from twisted.internet.protocol import DatagramProtocol
22from twisted.python import log
23
24
25BROADCAST_INTERVAL = timedelta(seconds=1)
26STALE_INTERVAL = BROADCAST_INTERVAL * 2
27MYID = uuid1()
28PORT = 8555
29
30
31class Peer(object):
32    def __init__(self, id, address):
33        self.id = id
34        self.address = address
35
36
37    def __repr__(self):
38        return '<%s id=%r address=%r>' % (
39            self.__class__.__name__, self.id, self.address)
40
41
42class PeerTrackerProtocol(DatagramProtocol):
43    noisy = False
44
45    def __init__(self, controller, myid, port):
46        self.controller = controller
47        self.myid = myid
48        self.port = port
49
50
51    def startProtocol(self):
52        self.transport.setBroadcastAllowed(True)
53        self.broadcast()
54
55
56    def datagramReceived(self, datagram, addr):
57        uuid = UUID(bytes=datagram)
58        if uuid != self.myid:
59            peer = Peer(id=uuid, address=addr)
60            self.controller.peerReceived(peer, self)
61
62
63    def broadcast(self):
64        self.transport.write(self.myid.bytes, ('<broadcast>', self.port))
65
66
67    def ping(self, peer):
68        self.transport.write(self.myid.bytes, (peer.address[0], self.port))
69
70
71
72class Broadcaster(object):
73    def __init__(self):
74        self.peers = {}
75
76
77    def peerReceived(self, peer, proto):
78        if peer.id not in self.peers:
79            log.msg(format='NEW_PEER %(peer)r', peer=peer)
80            proto.broadcast()
81        self.peers[peer.id] = (datetime.utcnow(), peer)
82
83
84    def removeStalePeers(self, maxage):
85        now = datetime.utcnow()
86        for peerID, (lastSeen, peer) in self.peers.items():
87            diff = now - lastSeen
88            if diff > maxage:
89                del self.peers[peerID]
90                log.msg(
91                    format='REMOVED_PEER: %(peer)r, %(lastSeen)s',
92                    peer=peer, lastSeen=lastSeen)
93
94
95    def pingPeers(self, proto):
96        for lastSeen, peer in self.peers.values():
97            log.msg(
98                format='PING_PEER: %(address)r',
99                address=peer.address[0])
100            proto.ping(peer)
101
102
103    def makeService(self, myid, port, staleInterval):
104        application = service.Application('Broadcaster')
105
106        root = service.MultiService()
107        root.setServiceParent(application)
108
109        proto = PeerTrackerProtocol(controller=self, myid=myid, port=port)
110        root.addService(internet.UDPServer(port, proto))
111
112        root.addService(
113            internet.TimerService(
114                staleInterval.total_seconds(),
115                self.removeStalePeers, staleInterval))
116
117        root.addService(
118            internet.TimerService(
119                staleInterval.total_seconds(),
120                self.pingPeers, proto))
121
122        return application
123
124
125application = Broadcaster().makeService(
126    myid=MYID, port=PORT, staleInterval=STALE_INTERVAL)