root / trunk / twisted / internet / tcp.py

Revision 26296, 36.2 kB (checked in by exarkun, 4 months ago)

Merge failed-connectionLost-3654

Author: washort, exarkun
Reviewer: therve
Fixes: #3654

Take care to handle exceptions raised by FileDescriptor.connectionLost in
the delayed call to that method by Port.stopListening. Previously an exception
would cause the Deferred returned to never fire; now it will errback with
the exception instead.

Line 
1 # -*- test-case-name: twisted.test.test_tcp -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Various asynchronous TCP/IP classes.
7
8 End users shouldn't use this module directly - use the reactor APIs instead.
9
10 Maintainer: Itamar Shtull-Trauring
11 """
12
13
14 # System Imports
15 import os
16 import types
17 import socket
18 import sys
19 import operator
20
21 from zope.interface import implements, classImplements
22
23 try:
24     from OpenSSL import SSL
25 except ImportError:
26     SSL = None
27
28 from twisted.python.runtime import platformType
29
30
31 if platformType == 'win32':
32     # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
33     EPERM = object()
34     from errno import WSAEINVAL as EINVAL
35     from errno import WSAEWOULDBLOCK as EWOULDBLOCK
36     from errno import WSAEINPROGRESS as EINPROGRESS
37     from errno import WSAEALREADY as EALREADY
38     from errno import WSAECONNRESET as ECONNRESET
39     from errno import WSAEISCONN as EISCONN
40     from errno import WSAENOTCONN as ENOTCONN
41     from errno import WSAEINTR as EINTR
42     from errno import WSAENOBUFS as ENOBUFS
43     from errno import WSAEMFILE as EMFILE
44     # No such thing as WSAENFILE, either.
45     ENFILE = object()
46     # Nor ENOMEM
47     ENOMEM = object()
48     EAGAIN = EWOULDBLOCK
49     from errno import WSAECONNRESET as ECONNABORTED
50
51     from twisted.python.win32 import formatError as strerror
52 else:
53     from errno import EPERM
54     from errno import EINVAL
55     from errno import EWOULDBLOCK
56     from errno import EINPROGRESS
57     from errno import EALREADY
58     from errno import ECONNRESET
59     from errno import EISCONN
60     from errno import ENOTCONN
61     from errno import EINTR
62     from errno import ENOBUFS
63     from errno import EMFILE
64     from errno import ENFILE
65     from errno import ENOMEM
66     from errno import EAGAIN
67     from errno import ECONNABORTED
68
69     from os import strerror
70
71 from errno import errorcode
72
73 # Twisted Imports
74 from twisted.internet import defer, base, address, fdesc
75 from twisted.internet.task import deferLater
76 from twisted.python import log, failure, reflect
77 from twisted.python.util import unsignedID
78 from twisted.internet.error import CannotListenError
79 from twisted.internet import abstract, main, interfaces, error
80
81
82
83 class _SocketCloser:
84     _socketShutdownMethod = 'shutdown'
85
86     def _closeSocket(self):
87         # socket.close() doesn't *really* close if there's another reference
88         # to it in the TCP/IP stack, e.g. if it was was inherited by a
89         # subprocess. And we really do want to close the connection. So we
90         # use shutdown() instead, and then close() in order to release the
91         # filedescriptor.
92         skt = self.socket
93         try:
94             getattr(skt, self._socketShutdownMethod)(2)
95         except socket.error:
96             pass
97         try:
98             skt.close()
99         except socket.error:
100             pass
101
102
103
104 class _TLSMixin:
105     _socketShutdownMethod = 'sock_shutdown'
106
107     writeBlockedOnRead = 0
108     readBlockedOnWrite = 0
109     _userWantRead = _userWantWrite = True
110
111     def getPeerCertificate(self):
112         return self.socket.get_peer_certificate()
113
114     def doRead(self):
115         if self.disconnected:
116             # See the comment in the similar check in doWrite below.
117             # Additionally, in order for anything other than returning
118             # CONNECTION_DONE here to make sense, it will probably be necessary
119             # to implement a way to switch back to TCP from TLS (actually, if
120             # we did something other than return CONNECTION_DONE, that would be
121             # a big part of implementing that feature).  In other words, the
122             # expectation is that doRead will be called when self.disconnected
123             # is True only when the connection has been lost.  It's possible
124             # that the other end could stop speaking TLS and then send us some
125             # non-TLS data.  We'll end up ignoring that data and dropping the
126             # connection.  There's no unit tests for this check in the cases
127             # where it makes a difference.  The test suite only hits this
128             # codepath when it would have otherwise hit the SSL.ZeroReturnError
129             # exception handler below, which has exactly the same behavior as
130             # this conditional.  Maybe that's the only case that can ever be
131             # triggered, I'm not sure.  -exarkun
132             return main.CONNECTION_DONE
133         if self.writeBlockedOnRead:
134             self.writeBlockedOnRead = 0
135             self._resetReadWrite()
136         try:
137             return Connection.doRead(self)
138         except SSL.ZeroReturnError:
139             return main.CONNECTION_DONE
140         except SSL.WantReadError:
141             return
142         except SSL.WantWriteError:
143             self.readBlockedOnWrite = 1
144             Connection.startWriting(self)
145             Connection.stopReading(self)
146             return
147         except SSL.SysCallError, (retval, desc):
148             if ((retval == -1 and desc == 'Unexpected EOF')
149                 or retval > 0):
150                 return main.CONNECTION_LOST
151             log.err()
152             return main.CONNECTION_LOST
153         except SSL.Error, e:
154             return e
155
156     def doWrite(self):
157         # Retry disconnecting
158         if self.disconnected:
159             # This case is triggered when "disconnected" is set to True by a
160             # call to _postLoseConnection from FileDescriptor.doWrite (to which
161             # we upcall at the end of this overridden version of that API).  It
162             # means that while, as far as any protocol connected to this
163             # transport is concerned, the connection no longer exists, the
164             # connection *does* actually still exist.  Instead of closing the
165             # connection in the overridden _postLoseConnection, we probably
166             # tried (and failed) to send a TLS close alert.  The TCP connection
167             # is still up and we're waiting for the socket to become writeable
168             # enough for the TLS close alert to actually be sendable.  Only
169             # then will the connection actually be torn down. -exarkun
170             return self._postLoseConnection()
171         if self._writeDisconnected:
172             return self._closeWriteConnection()
173
174         if self.readBlockedOnWrite:
175             self.readBlockedOnWrite = 0
176             self._resetReadWrite()
177         return Connection.doWrite(self)
178
179     def writeSomeData(self, data):
180         try:
181             return Connection.writeSomeData(self, data)
182         except SSL.WantWriteError:
183             return 0
184         except SSL.WantReadError:
185             self.writeBlockedOnRead = 1
186             Connection.stopWriting(self)
187             Connection.startReading(self)
188             return 0
189         except SSL.ZeroReturnError:
190             return main.CONNECTION_LOST
191         except SSL.SysCallError, e:
192             if e[0] == -1 and data == "":
193                 # errors when writing empty strings are expected
194                 # and can be ignored
195                 return 0
196             else:
197                 return main.CONNECTION_LOST
198         except SSL.Error, e:
199             return e
200
201
202     def _postLoseConnection(self):
203         """
204         Gets called after loseConnection(), after buffered data is sent.
205
206         We try to send an SSL shutdown alert, but if it doesn't work, retry
207         when the socket is writable.
208         """
209         # Here, set "disconnected" to True to trick higher levels into thinking
210         # the connection is really gone.  It's not, and we're not going to
211         # close it yet.  Instead, we'll try to send a TLS close alert to shut
212         # down the TLS connection cleanly.  Only after we actually get the
213         # close alert into the socket will we disconnect the underlying TCP
214         # connection.
215         self.disconnected = True
216         if hasattr(self.socket, 'set_shutdown'):
217             # If possible, mark the state of the TLS connection as having
218             # already received a TLS close alert from the peer.  Why do
219             # this???
220             self.socket.set_shutdown(SSL.RECEIVED_SHUTDOWN)
221         return self._sendCloseAlert()
222
223
224     def _sendCloseAlert(self):
225         # Okay, *THIS* is a bit complicated.
226
227         # Basically, the issue is, OpenSSL seems to not actually return
228         # errors from SSL_shutdown. Therefore, the only way to
229         # determine if the close notification has been sent is by
230         # SSL_shutdown returning "done". However, it will not claim it's
231         # done until it's both sent *and* received a shutdown notification.
232
233         # I don't actually want to wait for a received shutdown
234         # notification, though, so, I have to set RECEIVED_SHUTDOWN
235         # before calling shutdown. Then, it'll return True once it's
236         # *SENT* the shutdown.
237
238         # However, RECEIVED_SHUTDOWN can't be left set, because then
239         # reads will fail, breaking half close.
240
241         # Also, since shutdown doesn't report errors, an empty write call is
242         # done first, to try to detect if the connection has gone away.
243         # (*NOT* an SSL_write call, because that fails once you've called
244         # shutdown)
245         try:
246             os.write(self.socket.fileno(), '')
247         except OSError, se:
248             if se.args[0] in (EINTR, EWOULDBLOCK, ENOBUFS):
249                 return 0
250             # Write error, socket gone
251             return main.CONNECTION_LOST
252
253         try:
254             if hasattr(self.socket, 'set_shutdown'):
255                 laststate = self.socket.get_shutdown()
256                 self.socket.set_shutdown(laststate | SSL.RECEIVED_SHUTDOWN)
257                 done = self.socket.shutdown()
258                 if not (laststate & SSL.RECEIVED_SHUTDOWN):
259                     self.socket.set_shutdown(SSL.SENT_SHUTDOWN)
260             else:
261                 #warnings.warn("SSL connection shutdown possibly unreliable, "
262                 #              "please upgrade to ver 0.XX", category=UserWarning)
263                 self.socket.shutdown()
264                 done = True
265         except SSL.Error, e:
266             return e
267
268         if done:
269             self.stopWriting()
270             # Note that this is tested for by identity below.
271             return main.CONNECTION_DONE
272         else:
273             # For some reason, the close alert wasn't sent.  Start writing
274             # again so that we'll get another chance to send it.
275             self.startWriting()
276             # On Linux, select will sometimes not report a closed file
277             # descriptor in the write set (in particular, it seems that if a
278             # send() fails with EPIPE, the socket will not appear in the write
279             # set).  The shutdown call above (which calls down to SSL_shutdown)
280             # may have swallowed a write error.  Therefore, also start reading
281             # so that if the socket is closed we will notice.  This doesn't
282             # seem to be a problem for poll (because poll reports errors
283             # separately) or with select on BSD (presumably because, unlike
284             # Linux, it doesn't implement select in terms of poll and then map
285             # POLLHUP to select's in fd_set).
286             self.startReading()
287             return None
288
289     def _closeWriteConnection(self):
290         result = self._sendCloseAlert()
291
292         if result is main.CONNECTION_DONE:
293             return Connection._closeWriteConnection(self)
294
295         return result
296
297     def startReading(self):
298         self._userWantRead = True
299         if not self.readBlockedOnWrite:
300             return Connection.startReading(self)
301
302     def stopReading(self):
303         self._userWantRead = False
304         if not self.writeBlockedOnRead:
305             return Connection.stopReading(self)
306
307     def startWriting(self):
308         self._userWantWrite = True
309         if not self.writeBlockedOnRead:
310             return Connection.startWriting(self)
311
312     def stopWriting(self):
313         self._userWantWrite = False
314         if not self.readBlockedOnWrite:
315             return Connection.stopWriting(self)
316
317     def _resetReadWrite(self):
318         # After changing readBlockedOnWrite or writeBlockedOnRead,
319         # call this to reset the state to what the user requested.
320         if self._userWantWrite:
321             self.startWriting()
322         else:
323             self.stopWriting()
324
325         if self._userWantRead:
326             self.startReading()
327         else:
328             self.stopReading()
329
330
331
332 class _TLSDelayed(object):
333     """
334     State tracking record for TLS startup parameters.  Used to remember how
335     TLS should be started when starting it is delayed to wait for the output
336     buffer to be flushed.
337
338     @ivar bufferedData: A C{list} which contains all the data which was
339         written to the transport after an attempt to start TLS was made but
340         before the buffers outstanding at that time could be flushed and TLS
341         could really be started.  This is appended to by the transport's
342         write and writeSequence methods until it is possible to actually
343         start TLS, then it is written to the TLS-enabled transport.
344
345     @ivar context: An SSL context factory object to use to start TLS.
346
347     @ivar extra: An extra argument to pass to the transport's C{startTLS}
348         method.
349     """
350     def __init__(self, bufferedData, context, extra):
351         self.bufferedData = bufferedData
352         self.context = context
353         self.extra = extra
354
355
356
357 def _getTLSClass(klass, _existing={}):
358     if klass not in _existing:
359         class TLSConnection(_TLSMixin, klass):
360             implements(interfaces.ISSLTransport)
361         _existing[klass] = TLSConnection
362     return _existing[klass]
363
364
365
366 class Connection(abstract.FileDescriptor, _SocketCloser):
367     """
368     Superclass of all socket-based FileDescriptors.
369
370     This is an abstract superclass of all objects which represent a TCP/IP
371     connection based socket.
372
373     @ivar logstr: prefix used when logging events related to this connection.
374     @type logstr: C{str}
375     """
376
377     implements(interfaces.ITCPTransport, interfaces.ISystemHandle)
378
379     TLS = 0
380
381     def __init__(self, skt, protocol, reactor=None):
382         abstract.FileDescriptor.__init__(self, reactor=reactor)
383         self.socket = skt
384         self.socket.setblocking(0)
385         self.fileno = skt.fileno
386         self.protocol = protocol
387
388     if SSL:
389         _tlsWaiting = None
390         def startTLS(self, ctx, extra):
391             assert not self.TLS
392             if self.dataBuffer or self._tempDataBuffer:
393                 # pre-TLS bytes are still being written.  Starting TLS now
394                 # will do the wrong thing.  Instead, mark that we're trying
395                 # to go into the TLS state.
396                 self._tlsWaiting = _TLSDelayed([], ctx, extra)
397                 return False
398
399             self.stopReading()
400             self.stopWriting()
401             self._startTLS()
402             self.socket = SSL.Connection(ctx.getContext(), self.socket)
403             self.fileno = self.socket.fileno
404             self.startReading()
405             return True
406
407
408         def _startTLS(self):
409             self.TLS = 1
410             self.__class__ = _getTLSClass(self.__class__)
411
412
413         def write(self, bytes):
414             if self._tlsWaiting is not None:
415                 self._tlsWaiting.bufferedData.append(bytes)
416             else:
417                 abstract.FileDescriptor.write(self, bytes)
418
419
420         def writeSequence(self, iovec):
421             if self._tlsWaiting is not None:
422                 self._tlsWaiting.bufferedData.extend(iovec)
423             else:
424                 abstract.FileDescriptor.writeSequence(self, iovec)
425
426
427         def doWrite(self):
428             result = abstract.FileDescriptor.doWrite(self)
429             if self._tlsWaiting is not None:
430                 if not self.dataBuffer and not self._tempDataBuffer:
431                     waiting = self._tlsWaiting
432                     self._tlsWaiting = None
433                     self.startTLS(waiting.context, waiting.extra)
434                     self.writeSequence(waiting.bufferedData)
435             return result
436
437
438     def getHandle(self):
439         """Return the socket for this connection."""
440         return self.socket
441
442
443     def doRead(self):
444         """Calls self.protocol.dataReceived with all available data.
445
446         This reads up to self.bufferSize bytes of data from its socket, then
447         calls self.dataReceived(data) to process it.  If the connection is not
448         lost through an error in the physical recv(), this function will return
449         the result of the dataReceived call.
450         """
451         try:
452             data = self.socket.recv(self.bufferSize)
453         except socket.error, se:
454             if se.args[0] == EWOULDBLOCK:
455                 return
456             else:
457                 return main.CONNECTION_LOST
458         if not data:
459             return main.CONNECTION_DONE
460         return self.protocol.dataReceived(data)
461
462
463     def writeSomeData(self, data):
464         """
465         Write as much as possible of the given data to this TCP connection.
466
467         This sends up to C{self.SEND_LIMIT} bytes from C{data}.  If the
468         connection is lost, an exception is returned.  Otherwise, the number
469         of bytes successfully written is returned.
470         """
471         try:
472             # Limit length of buffer to try to send, because some OSes are too
473             # stupid to do so themselves (ahem windows)
474             return self.socket.send(buffer(data, 0, self.SEND_LIMIT))
475         except socket.error, se:
476             if se.args[0] == EINTR:
477                 return self.writeSomeData(data)
478             elif se.args[0] in (EWOULDBLOCK, ENOBUFS):
479                 return 0
480             else:
481                 return main.CONNECTION_LOST
482
483
484     def _closeWriteConnection(self):
485         try:
486             getattr(self.socket, self._socketShutdownMethod)(1)
487         except socket.error:
488             pass
489         p = interfaces.IHalfCloseableProtocol(self.protocol, None)
490         if p:
491             try:
492                 p.writeConnectionLost()
493             except:
494                 f = failure.Failure()
495                 log.err()
496                 self.connectionLost(f)
497
498
499     def readConnectionLost(self, reason):
500         p = interfaces.IHalfCloseableProtocol(self.protocol, None)
501         if p:
502             try:
503                 p.readConnectionLost()
504             except:
505                 log.err()
506                 self.connectionLost(failure.Failure())
507         else:
508             self.connectionLost(reason)
509
510     def connectionLost(self, reason):
511         """See abstract.FileDescriptor.connectionLost().
512         """
513         abstract.FileDescriptor.connectionLost(self, reason)
514         self._closeSocket()
515         protocol = self.protocol
516         del self.protocol
517         del self.socket
518         del self.fileno
519         protocol.connectionLost(reason)
520
521     logstr = "Uninitialized"
522
523     def logPrefix(self):
524         """Return the prefix to log with when I own the logging thread.
525         """
526         return self.logstr
527
528     def getTcpNoDelay(self):
529         return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
530
531     def setTcpNoDelay(self, enabled):
532         self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
533
534     def getTcpKeepAlive(self):
535         return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
536                                                      socket.SO_KEEPALIVE))
537
538     def setTcpKeepAlive(self, enabled):
539         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
540
541 if SSL:
542     classImplements(Connection, interfaces.ITLSTransport)
543
544 class BaseClient(Connection):
545     """A base class for client TCP (and similiar) sockets.
546     """
547     addressFamily = socket.AF_INET
548     socketType = socket.SOCK_STREAM
549
550     def _finishInit(self, whenDone, skt, error, reactor):
551         """Called by base classes to continue to next stage of initialization."""
552         if whenDone:
553             Connection.__init__(self, skt, None, reactor)
554             self.doWrite = self.doConnect
555             self.doRead = self.doConnect
556             reactor.callLater(0, whenDone)
557         else:
558             reactor.callLater(0, self.failIfNotConnected, error)
559
560     def startTLS(self, ctx, client=1):
561         if Connection.startTLS(self, ctx, client):
562             if client:
563                 self.socket.set_connect_state()
564             else:
565                 self.socket.set_accept_state()
566
567
568     def stopConnecting(self):
569         """Stop attempt to connect."""
570         self.failIfNotConnected(error.UserError())
571
572     def failIfNotConnected(self, err):
573         """
574         Generic method called when the attemps to connect failed. It basically
575         cleans everything it can: call connectionFailed, stop read and write,
576         delete socket related members.
577         """
578         if (self.connected or self.disconnected or
579             not hasattr(self, "connector")):
580             return
581
582         self.connector.connectionFailed(failure.Failure(err))
583         if hasattr(self, "reactor"):
584             # this doesn't happen if we failed in __init__
585             self.stopReading()
586             self.stopWriting()
587             del self.connector
588
589         try:
590             self._closeSocket()
591         except AttributeError:
592             pass
593         else:
594             del self.socket, self.fileno
595
596     def createInternetSocket(self):
597         """(internal) Create a non-blocking socket using
598         self.addressFamily, self.socketType.
599         """
600         s = socket.socket(self.addressFamily, self.socketType)
601         s.setblocking(0)
602         fdesc._setCloseOnExec(s.fileno())
603         return s
604
605     def resolveAddress(self):
606         if abstract.isIPAddress(self.addr[0]):
607             self._setRealAddress(self.addr[0])
608         else:
609             d = self.reactor.resolve(self.addr[0])
610             d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
611
612     def _setRealAddress(self, address):
613         self.realAddress = (address, self.addr[1])
614         self.doConnect()
615
616     def doConnect(self):
617         """I connect the socket.
618
619         Then, call the protocol's makeConnection, and start waiting for data.
620         """
621         if not hasattr(self, "connector"):
622             # this happens when connection failed but doConnect
623             # was scheduled via a callLater in self._finishInit
624             return
625
626         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
627         if err:
628             self.failIfNotConnected(error.getConnectError((err, strerror(err))))
629             return
630
631
632         # doConnect gets called twice.  The first time we actually need to
633         # start the connection attempt.  The second time we don't really
634         # want to (SO_ERROR above will have taken care of any errors, and if
635         # it reported none, the mere fact that doConnect was called again is
636         # sufficient to indicate that the connection has succeeded), but it
637         # is not /particularly/ detrimental to do so.  This should get
638         # cleaned up some day, though.
639         try:
640             connectResult = self.socket.connect_ex(self.realAddress)
641         except socket.error, se:
642             connectResult = se.args[0]
643         if connectResult:
644             if connectResult == EISCONN:
645                 pass
646             # on Windows EINVAL means sometimes that we should keep trying:
647             # http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winsock/winsock/connect_2.asp
648             elif ((connectResult in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or
649                   (connectResult == EINVAL and platformType == "win32")):
650                 self.startReading()
651                 self.startWriting()
652                 return
653             else:
654                 self.failIfNotConnected(error.getConnectError((connectResult, strerror(connectResult))))
655                 return
656
657         # If I have reached this point without raising or returning, that means
658         # that the socket is connected.
659         del self.doWrite
660         del self.doRead
661         # we first stop and then start, to reset any references to the old doRead
662         self.stopReading()
663         self.stopWriting()
664         self._connectDone()
665
666     def _connectDone(self):
667         self.protocol = self.connector.buildProtocol(self.getPeer())
668         self.connected = 1
669         self.logstr = self.protocol.__class__.__name__ + ",client"
670         self.startReading()
671         self.protocol.makeConnection(self)
672
673     def connectionLost(self, reason):
674         if not self.connected:
675             self.failIfNotConnected(error.ConnectError(string=reason))
676         else:
677             Connection.connectionLost(self, reason)
678             self.connector.connectionLost(reason)
679
680
681 class Client(BaseClient):
682     """A TCP client."""
683
684     def __init__(self, host, port, bindAddress, connector, reactor=None):
685         # BaseClient.__init__ is invoked later
686         self.connector = connector
687         self.addr = (host, port)
688
689         whenDone = self.resolveAddress
690         err = None
691         skt = None
692
693         try:
694             skt = self.createInternetSocket()
695         except socket.error, se:
696             err = error.ConnectBindError(se[0], se[1])
697             whenDone = None
698         if whenDone and bindAddress is not None:
699             try:
700                 skt.bind(bindAddress)
701             except socket.error, se:
702                 err = error.ConnectBindError(se[0], se[1])
703                 whenDone = None
704         self._finishInit(whenDone, skt, err, reactor)
705
706     def getHost(self):
707         """Returns an IPv4Address.
708
709         This indicates the address from which I am connecting.
710         """
711         return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
712
713     def getPeer(self):
714         """Returns an IPv4Address.
715
716         This indicates the address that I am connected to.
717         """
718         return address.IPv4Address('TCP', *(self.realAddress + ('INET',)))
719
720     def __repr__(self):
721         s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
722         return s
723
724
725 class Server(Connection):
726     """
727     Serverside socket-stream connection class.
728
729     This is a serverside network connection transport; a socket which came from
730     an accept() on a server.
731     """
732
733     def __init__(self, sock, protocol, client, server, sessionno, reactor):
734         """
735         Server(sock, protocol, client, server, sessionno)
736
737         Initialize it with a socket, a protocol, a descriptor for my peer (a
738         tuple of host, port describing the other end of the connection), an
739         instance of Port, and a session number.
740         """
741         Connection.__init__(self, sock, protocol, reactor)
742         self.server = server
743         self.client = client
744         self.sessionno = sessionno
745         self.hostname = client[0]
746         self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
747                                     sessionno,
748                                     self.hostname)
749         self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
750                                           self.sessionno,
751                                           self.server._realPortNumber)
752         self.startReading()
753         self.connected = 1
754
755     def __repr__(self):
756         """A string representation of this connection.
757         """
758         return self.repstr
759
760     def startTLS(self, ctx, server=1):
761         if Connection.startTLS(self, ctx, server):
762             if server:
763                 self.socket.set_accept_state()
764             else:
765                 self.socket.set_connect_state()
766
767
768     def getHost(self):
769         """Returns an IPv4Address.
770
771         This indicates the server's address.
772         """
773         return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
774
775     def getPeer(self):
776         """Returns an IPv4Address.
777
778         This indicates the client's address.
779         """
780         return address.IPv4Address('TCP', *(self.client + ('INET',)))
781
782 class Port(base.BasePort, _SocketCloser):
783     """
784     A TCP server port, listening for connections.
785
786     When a connection is accepted, this will call a factory's buildProtocol
787     with the incoming address as an argument, according to the specification
788     described in L{twisted.internet.interfaces.IProtocolFactory}.
789
790     If you wish to change the sort of transport that will be used, the
791     C{transport} attribute will be called with the signature expected for
792     C{Server.__init__}, so it can be replaced.
793
794     @ivar deferred: a deferred created when L{stopListening} is called, and
795         that will fire when connection is lost. This is not to be used it
796         directly: prefer the deferred returned by L{stopListening} instead.
797     @type deferred: L{defer.Deferred}
798
799     @ivar disconnecting: flag indicating that the L{stopListening} method has
800         been called and that no connections should be accepted anymore.
801     @type disconnecting: C{bool}
802
803     @ivar connected: flag set once the listen has successfully been called on
804         the socket.
805     @type connected: C{bool}
806     """
807
808     implements(interfaces.IListeningPort)
809
810     addressFamily = socket.AF_INET
811     socketType = socket.SOCK_STREAM
812
813     transport = Server
814     sessionno = 0
815     interface = ''
816     backlog = 50
817
818     # Actual port number being listened on, only set to a non-None
819     # value when we are actually listening.
820     _realPortNumber = None
821
822     def __init__(self, port, factory, backlog=50, interface='', reactor=None):
823         """Initialize with a numeric port to listen on.
824         """
825         base.BasePort.__init__(self, reactor=reactor)
826         self.port = port
827         self.factory = factory
828         self.backlog = backlog
829         self.interface = interface
830
831     def __repr__(self):
832         if self._realPortNumber is not None:
833             return "<%s of %s on %s>" % (self.__class__, self.factory.__class__,
834                                          self._realPortNumber)
835         else:
836             return "<%s of %s (not listening)>" % (self.__class__, self.factory.__class__)
837
838     def createInternetSocket(self):
839         s = base.BasePort.createInternetSocket(self)
840         if platformType == "posix" and sys.platform != "cygwin":
841             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
842         return s
843
844
845     def startListening(self):
846         """Create and bind my socket, and begin listening on it.
847
848         This is called on unserialization, and must be called after creating a
849         server to begin listening on the specified port.
850         """
851         try:
852             skt = self.createInternetSocket()
853             skt.bind((self.interface, self.port))
854         except socket.error, le:
855             raise CannotListenError, (self.interface, self.port, le)
856
857         # Make sure that if we listened on port 0, we update that to
858         # reflect what the OS actually assigned us.
859         self._realPortNumber = skt.getsockname()[1]
860
861         log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
862
863         # The order of the next 6 lines is kind of bizarre.  If no one
864         # can explain it, perhaps we should re-arrange them.
865         self.factory.doStart()
866         skt.listen(self.backlog)
867         self.connected = True
868         self.socket = skt
869         self.fileno = self.socket.fileno
870         self.numberAccepts = 100
871
872         self.startReading()
873
874
875     def _buildAddr(self, (host, port)):
876         return address._ServerFactoryIPv4Address('TCP', host, port)
877
878
879     def doRead(self):
880         """Called when my socket is ready for reading.
881
882         This accepts a connection and calls self.protocol() to handle the
883         wire-level protocol.
884         """
885         try:
886             if platformType == "posix":
887                 numAccepts = self.numberAccepts
888             else:
889                 # win32 event loop breaks if we do more than one accept()
890                 # in an iteration of the event loop.
891                 numAccepts = 1
892             for i in range(numAccepts):
893                 # we need this so we can deal with a factory's buildProtocol
894                 # calling our loseConnection
895                 if self.disconnecting:
896                     return
897                 try:
898                     skt, addr = self.socket.accept()
899                 except socket.error, e:
900                     if e.args[0] in (EWOULDBLOCK, EAGAIN):
901                         self.numberAccepts = i
902                         break
903                     elif e.args[0] == EPERM:
904                         # Netfilter on Linux may have rejected the
905                         # connection, but we get told to try to accept()
906                         # anyway.
907                         continue
908                     elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
909
910                         # Linux gives EMFILE when a process is not allowed
911                         # to allocate any more file descriptors.  *BSD and
912                         # Win32 give (WSA)ENOBUFS.  Linux can also give
913                         # ENFILE if the system is out of inodes, or ENOMEM
914                         # if there is insufficient memory to allocate a new
915                         # dentry.  ECONNABORTED is documented as possible on
916                         # both Linux and Windows, but it is not clear
917                         # whether there are actually any circumstances under
918                         # which it can happen (one might expect it to be
919                         # possible if a client sends a FIN or RST after the
920                         # server sends a SYN|ACK but before application code
921                         # calls accept(2), however at least on Linux this
922                         # _seems_ to be short-circuited by syncookies.
923
924                         log.msg("Could not accept new connection (%s)" % (
925                             errorcode[e.args[0]],))
926                         break
927                     raise
928
929                 fdesc._setCloseOnExec(skt.fileno())
930                 protocol = self.factory.buildProtocol(self._buildAddr(addr))
931                 if protocol is None:
932                     skt.close()
933                     continue
934                 s = self.sessionno
935                 self.sessionno = s+1
936                 transport = self.transport(skt, protocol, addr, self, s, self.reactor)
937                 transport = self._preMakeConnection(transport)
938                 protocol.makeConnection(transport)
939             else:
940                 self.numberAccepts = self.numberAccepts+20
941         except:
942             # Note that in TLS mode, this will possibly catch SSL.Errors
943             # raised by self.socket.accept()
944             #
945             # There is no "except SSL.Error:" above because SSL may be
946             # None if there is no SSL support.  In any case, all the
947             # "except SSL.Error:" suite would probably do is log.deferr()
948             # and return, so handling it here works just as well.
949             log.deferr()
950
951     def _preMakeConnection(self, transport):
952         return transport
953
954     def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
955         """
956         Stop accepting connections on this port.
957
958         This will shut down the socket and call self.connectionLost().  It
959         returns a deferred which will fire successfully when the port is
960         actually closed, or with a failure if an error occurs shutting down.
961         """
962         self.disconnecting = True
963         self.stopReading()
964         if self.connected:
965             self.deferred = deferLater(
966                 self.reactor, 0, self.connectionLost, connDone)
967             return self.deferred
968
969     stopListening = loseConnection
970
971
972     def connectionLost(self, reason):
973         """
974         Cleans up the socket.
975         """
976         log.msg('(Port %s Closed)' % self._realPortNumber)
977         self._realPortNumber = None
978
979         base.BasePort.connectionLost(self, reason)
980         self.connected = False
981         self._closeSocket()
982         del self.socket
983         del self.fileno
984
985         try:
986             self.factory.doStop()
987         finally:
988             self.disconnecting = False
989
990
991     def logPrefix(self):
992         """Returns the name of my class, to prefix log entries with.
993         """
994         return reflect.qual(self.factory.__class__)
995
996     def getHost(self):
997         """Returns an IPv4Address.
998
999         This indicates the server's address.
1000         """
1001         return address.IPv4Address('TCP', *(self.socket.getsockname() + ('INET',)))
1002
1003 class Connector(base.BaseConnector):
1004     def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
1005         self.host = host
1006         if isinstance(port, types.StringTypes):
1007             try:
1008                 port = socket.getservbyname(port, 'tcp')
1009             except socket.error, e:
1010                 raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
1011         self.port = port
1012         self.bindAddress = bindAddress
1013         base.BaseConnector.__init__(self, factory, timeout, reactor)
1014
1015     def _makeTransport(self):
1016         return Client(self.host, self.port, self.bindAddress, self, self.reactor)
1017
1018     def getDestination(self):
1019         return address.IPv4Address('TCP', self.host, self.port, 'INET')
Note: See TracBrowser for help on using the browser.