root / trunk / twisted / internet / udp.py

Revision 24441, 12.9 kB (checked in by thijs, 1 year ago)

Merge maintainer-email-2438: Get rid of references to maintainer email addresses from code.

Author: thijs
Reviewer: exarkun
Fixes: #2438

Line 
1 # -*- test-case-name: twisted.test.test_udp -*-
2
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """
8 Various asynchronous UDP classes.
9
10 Please do not use this module directly.
11
12 Maintainer: Itamar Shtull-Trauring
13 """
14
15 # System Imports
16 import os
17 import socket
18 import operator
19 import struct
20 import warnings
21 from zope.interface import implements
22
23 from twisted.python.runtime import platformType
24 if platformType == 'win32':
25     from errno import WSAEWOULDBLOCK as EWOULDBLOCK
26     from errno import WSAEINTR as EINTR
27     from errno import WSAEMSGSIZE as EMSGSIZE
28     from errno import WSAECONNREFUSED as ECONNREFUSED
29     from errno import WSAECONNRESET
30     EAGAIN=EWOULDBLOCK
31 else:
32     from errno import EWOULDBLOCK, EINTR, EMSGSIZE, ECONNREFUSED, EAGAIN
33
34 # Twisted Imports
35 from twisted.internet import protocol, base, defer, address
36 from twisted.persisted import styles
37 from twisted.python import log, reflect, failure
38
39 # Sibling Imports
40 import abstract, error, interfaces
41
42
43 class Port(base.BasePort):
44     """UDP port, listening for packets."""
45
46     implements(interfaces.IUDPTransport, interfaces.ISystemHandle)
47
48     addressFamily = socket.AF_INET
49     socketType = socket.SOCK_DGRAM
50     maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration
51
52     # Actual port number being listened on, only set to a non-None
53     # value when we are actually listening.
54     _realPortNumber = None
55
56     def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None):
57         """Initialize with a numeric port to listen on.
58         """
59         base.BasePort.__init__(self, reactor)
60         self.port = port
61         self.protocol = proto
62         self.maxPacketSize = maxPacketSize
63         self.interface = interface
64         self.setLogStr()
65         self._connectedAddr = None
66
67     def __repr__(self):
68         if self._realPortNumber is not None:
69             return "<%s on %s>" % (self.protocol.__class__, self._realPortNumber)
70         else:
71             return "<%s not connected>" % (self.protocol.__class__,)
72
73     def getHandle(self):
74         """Return a socket object."""
75         return self.socket
76
77     def startListening(self):
78         """Create and bind my socket, and begin listening on it.
79
80         This is called on unserialization, and must be called after creating a
81         server to begin listening on the specified port.
82         """
83         self._bindSocket()
84         self._connectToProtocol()
85
86     def _bindSocket(self):
87         try:
88             skt = self.createInternetSocket()
89             skt.bind((self.interface, self.port))
90         except socket.error, le:
91             raise error.CannotListenError, (self.interface, self.port, le)
92
93         # Make sure that if we listened on port 0, we update that to
94         # reflect what the OS actually assigned us.
95         self._realPortNumber = skt.getsockname()[1]
96
97         log.msg("%s starting on %s"%(self.protocol.__class__, self._realPortNumber))
98
99         self.connected = 1
100         self.socket = skt
101         self.fileno = self.socket.fileno
102
103     def _connectToProtocol(self):
104         self.protocol.makeConnection(self)
105         self.startReading()
106
107
108     def doRead(self):
109         """Called when my socket is ready for reading."""
110         read = 0
111         while read < self.maxThroughput:
112             try:
113                 data, addr = self.socket.recvfrom(self.maxPacketSize)
114             except socket.error, se:
115                 no = se.args[0]
116                 if no in (EAGAIN, EINTR, EWOULDBLOCK):
117                     return
118                 if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
119                     if self._connectedAddr:
120                         self.protocol.connectionRefused()
121                 else:
122                     raise
123             else:
124                 read += len(data)
125                 try:
126                     self.protocol.datagramReceived(data, addr)
127                 except:
128                     log.err()
129
130
131     def write(self, datagram, addr=None):
132         """Write a datagram.
133
134         @param addr: should be a tuple (ip, port), can be None in connected mode.
135         """
136         if self._connectedAddr:
137             assert addr in (None, self._connectedAddr)
138             try:
139                 return self.socket.send(datagram)
140             except socket.error, se:
141                 no = se.args[0]
142                 if no == EINTR:
143                     return self.write(datagram)
144                 elif no == EMSGSIZE:
145                     raise error.MessageLengthError, "message too long"
146                 elif no == ECONNREFUSED:
147                     self.protocol.connectionRefused()
148                 else:
149                     raise
150         else:
151             assert addr != None
152             if not addr[0].replace(".", "").isdigit():
153                 warnings.warn("Please only pass IPs to write(), not hostnames", DeprecationWarning, stacklevel=2)
154             try:
155                 return self.socket.sendto(datagram, addr)
156             except socket.error, se:
157                 no = se.args[0]
158                 if no == EINTR:
159                     return self.write(datagram, addr)
160                 elif no == EMSGSIZE:
161                     raise error.MessageLengthError, "message too long"
162                 elif no == ECONNREFUSED:
163                     # in non-connected UDP ECONNREFUSED is platform dependent, I think
164                     # and the info is not necessarily useful. Nevertheless maybe we
165                     # should call connectionRefused? XXX
166                     return
167                 else:
168                     raise
169
170     def writeSequence(self, seq, addr):
171         self.write("".join(seq), addr)
172
173     def connect(self, host, port):
174         """'Connect' to remote server."""
175         if self._connectedAddr:
176             raise RuntimeError, "already connected, reconnecting is not currently supported (talk to itamar if you want this)"
177         if not abstract.isIPAddress(host):
178             raise ValueError, "please pass only IP addresses, not domain names"
179         self._connectedAddr = (host, port)
180         self.socket.connect((host, port))
181
182     def _loseConnection(self):
183         self.stopReading()
184         if self.connected: # actually means if we are *listening*
185             from twisted.internet import reactor
186             reactor.callLater(0, self.connectionLost)
187
188     def stopListening(self):
189         if self.connected:
190             result = self.d = defer.Deferred()
191         else:
192             result = None
193         self._loseConnection()
194         return result
195
196     def loseConnection(self):
197         warnings.warn("Please use stopListening() to disconnect port", DeprecationWarning, stacklevel=2)
198         self.stopListening()
199
200     def connectionLost(self, reason=None):
201         """Cleans up my socket.
202         """
203         log.msg('(Port %s Closed)' % self._realPortNumber)
204         self._realPortNumber = None
205         base.BasePort.connectionLost(self, reason)
206         if hasattr(self, "protocol"):
207             # we won't have attribute in ConnectedPort, in cases
208             # where there was an error in connection process
209             self.protocol.doStop()
210         self.connected = 0
211         self.socket.close()
212         del self.socket
213         del self.fileno
214         if hasattr(self, "d"):
215             self.d.callback(None)
216             del self.d
217
218     def setLogStr(self):
219         self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
220
221     def logPrefix(self):
222         """Returns the name of my class, to prefix log entries with.
223         """
224         return self.logstr
225
226     def getHost(self):
227         """
228         Returns an IPv4Address.
229
230         This indicates the address from which I am connecting.
231         """
232         return address.IPv4Address('UDP', *(self.socket.getsockname() + ('INET_UDP',)))
233
234
235 class ConnectedPort(Port):
236     """DEPRECATED.
237
238     A connected UDP socket."""
239
240     implements(interfaces.IUDPConnectedTransport)
241
242     def __init__(self, (remotehost, remoteport), port, proto, interface='', maxPacketSize=8192, reactor=None):
243         Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
244         self.remotehost = remotehost
245         self.remoteport = remoteport
246
247     def startListening(self):
248         self._bindSocket()
249         if abstract.isIPAddress(self.remotehost):
250             self.setRealAddress(self.remotehost)
251         else:
252             self.realAddress = None
253             d = self.reactor.resolve(self.remotehost)
254             d.addCallback(self.setRealAddress).addErrback(self.connectionFailed)
255
256     def setRealAddress(self, addr):
257         self.realAddress = addr
258         self.socket.connect((addr, self.remoteport))
259         self._connectToProtocol()
260
261     def connectionFailed(self, reason):
262         self._loseConnection()
263         self.protocol.connectionFailed(reason)
264         del self.protocol
265
266     def doRead(self):
267         """Called when my socket is ready for reading."""
268         read = 0
269         while read < self.maxThroughput:
270             try:
271                 data, addr = self.socket.recvfrom(self.maxPacketSize)
272                 read += len(data)
273                 self.protocol.datagramReceived(data)
274             except socket.error, se:
275                 no = se.args[0]
276                 if no in (EAGAIN, EINTR, EWOULDBLOCK):
277                     return
278                 if (no == ECONNREFUSED) or (platformType == "win32" and no == WSAECONNRESET):
279                     self.protocol.connectionRefused()
280                 else:
281                     raise
282             except:
283                 log.deferr()
284
285     def write(self, data):
286         """Write a datagram."""
287         try:
288             return self.socket.send(data)
289         except socket.error, se:
290             no = se.args[0]
291             if no == EINTR:
292                 return self.write(data)
293             elif no == EMSGSIZE:
294                 raise error.MessageLengthError, "message too long"
295             elif no == ECONNREFUSED:
296                 self.protocol.connectionRefused()
297             else:
298                 raise
299
300     def getPeer(self):
301         """
302         Returns a tuple of ('INET_UDP', hostname, port), indicating
303         the remote address.
304         """
305         return address.IPv4Address('UDP', self.remotehost, self.remoteport, 'INET_UDP')
306
307
308 class MulticastMixin:
309     """Implement multicast functionality."""
310
311     def getOutgoingInterface(self):
312         i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
313         return socket.inet_ntoa(struct.pack("@i", i))
314
315     def setOutgoingInterface(self, addr):
316         """Returns Deferred of success."""
317         return self.reactor.resolve(addr).addCallback(self._setInterface)
318
319     def _setInterface(self, addr):
320         i = socket.inet_aton(addr)
321         self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
322         return 1
323
324     def getLoopbackMode(self):
325         return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
326
327     def setLoopbackMode(self, mode):
328         mode = struct.pack("b", operator.truth(mode))
329         self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
330
331     def getTTL(self):
332         return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
333
334     def setTTL(self, ttl):
335         ttl = struct.pack("B", ttl)
336         self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
337
338     def joinGroup(self, addr, interface=""):
339         """Join a multicast group. Returns Deferred of success."""
340         return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 1)
341
342     def _joinAddr1(self, addr, interface, join):
343         return self.reactor.resolve(interface).addCallback(self._joinAddr2, addr, join)
344
345     def _joinAddr2(self, interface, addr, join):
346         addr = socket.inet_aton(addr)
347         interface = socket.inet_aton(interface)
348         if join:
349             cmd = socket.IP_ADD_MEMBERSHIP
350         else:
351             cmd = socket.IP_DROP_MEMBERSHIP
352         try:
353             self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
354         except socket.error, e:
355             return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
356
357     def leaveGroup(self, addr, interface=""):
358         """Leave multicast group, return Deferred of success."""
359         return self.reactor.resolve(addr).addCallback(self._joinAddr1, interface, 0)
360
361
class MulticastPort(MulticastMixin, Port):
    """
    A UDP L{Port} with the multicast helpers of L{MulticastMixin}.
    """

    implements(interfaces.IMulticastTransport)

    def __init__(self, port, proto, interface='', maxPacketSize=8192, reactor=None, listenMultiple=False):
        """
        Like C{Port.__init__}, plus C{listenMultiple}: when true the
        socket is created with the address-reuse options set.
        """
        Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
        self.listenMultiple = listenMultiple

    def createInternetSocket(self):
        """
        Create the socket, enabling C{SO_REUSEADDR} (and C{SO_REUSEPORT}
        where the C{socket} module defines it) if C{listenMultiple} is set.
        """
        skt = Port.createInternetSocket(self)
        if not self.listenMultiple:
            return skt
        skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if hasattr(socket, "SO_REUSEPORT"):
            skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        return skt
378
379
class ConnectedMulticastPort(MulticastMixin, ConnectedPort):
    """
    DEPRECATED.

    A connected UDP port with multicast support: L{ConnectedPort} combined
    with the socket-option helpers of L{MulticastMixin}.
    """

    implements(interfaces.IMulticastTransport)
Note: See TracBrowser for help on using the browser.