Ticket #5085: twisted-tcp6.patch

File twisted-tcp6.patch, 75.9 KB (added by chjurk, 3 years ago)
  • twisted/internet/address.py

    diff --git a/twisted/internet/address.py b/twisted/internet/address.py
    index b9192d4..ef3b63b 100644
    a b class IPv6Address(_IPAddress): 
    6060    """ 
    6161    Object representing an IPv6 socket endpoint. 
    6262    """ 
     63    def __init__(self, type, host, port, flow_info=0, scope_id=0): 
     64        self.flow_info = flow_info 
     65        self.scope_id = scope_id 
     66        _IPAddress.__init__(self, type, host, port) 
     67 
     68    def __hash__(self): 
     69        return hash((self.type, self.host, self.port, self.flow_info, self.scope_id)) 
    6370 
    6471 
    6572 
  • twisted/internet/tcp.py

    diff --git a/twisted/internet/base.py b/twisted/internet/base.py
    diff --git a/twisted/internet/tcp.py b/twisted/internet/tcp.py
    index 16a096c..682f03e 100644
    a b class BaseClient(_TLSClientMixin, Connection): 
    258258    A base class for client TCP (and similiar) sockets. 
    259259    """ 
    260260    _base = Connection 
     261    _addressType = address.IPv4Address 
    261262 
    262263    addressFamily = socket.AF_INET 
    263264    socketType = socket.SOCK_STREAM 
    class BaseClient(_TLSClientMixin, Connection): 
    270271            Connection.__init__(self, skt, None, reactor) 
    271272            self.doWrite = self.doConnect 
    272273            self.doRead = self.doConnect 
    273             reactor.callLater(0, whenDone) 
     274            self.doConnect() 
    274275        else: 
    275276            reactor.callLater(0, self.failIfNotConnected, error) 
    276277 
    class BaseClient(_TLSClientMixin, Connection): 
    312313        return s 
    313314 
    314315    def resolveAddress(self): 
    315         if abstract.isIPAddress(self.addr[0]): 
     316        if abstract.isIPAddress(self.addr[0]) or abstract.isIPv6Address(self.addr[0]): 
    316317            self._setRealAddress(self.addr[0]) 
    317318        else: 
    318319            d = self.reactor.resolve(self.addr[0]) 
    319320            d.addCallbacks(self._setRealAddress, self.failIfNotConnected) 
    320321 
    321     def _setRealAddress(self, address): 
    322         self.realAddress = (address, self.addr[1]) 
    323         self.doConnect() 
     322    def _setRealAddress(self, addr): 
     323        """ 
     324        Set the real IP address for this client. 
     325        Once the IP address is set, the socket is created using the correct 
     326        address family. 
     327        """ 
     328        if abstract.isIPv6Address(addr): 
     329            self.addressFamily = socket.AF_INET6 
     330            self._addressType = address.IPv6Address 
     331        self.realAddress = (addr, self.addr[1]) 
     332 
     333        # create the socket and wait finish init after that 
     334        self.initConnection() 
     335 
     336    def initConnection(self): 
     337        """ 
     338        Initialize connection by creating the appropriate socket. 
     339        """ 
     340        err = None 
     341        skt = None 
     342        result = True 
     343 
     344        try: 
     345            skt = self.createInternetSocket() 
     346        except socket.error, se: 
     347            err = error.ConnectBindError(se[0], se[1]) 
     348            result = None 
     349        if result and self.bindAddress is not None: 
     350            try: 
     351                skt.bind(self.bindAddress) 
     352            except socket.error, se: 
     353                err = error.ConnectBindError(se[0], se[1]) 
     354                result = None 
     355        self._finishInit(result, skt, err, self.reactor) 
    324356 
    325357    def doConnect(self): 
    326358        """I connect the socket. 
    class Client(BaseClient): 
    394426        # BaseClient.__init__ is invoked later 
    395427        self.connector = connector 
    396428        self.addr = (host, port) 
     429        self.bindAddress = bindAddress 
     430        self.reactor = reactor 
    397431 
    398         whenDone = self.resolveAddress 
    399         err = None 
    400         skt = None 
    401  
    402         try: 
    403             skt = self.createInternetSocket() 
    404         except socket.error, se: 
    405             err = error.ConnectBindError(se[0], se[1]) 
    406             whenDone = None 
    407         if whenDone and bindAddress is not None: 
    408             try: 
    409                 skt.bind(bindAddress) 
    410             except socket.error, se: 
    411                 err = error.ConnectBindError(se[0], se[1]) 
    412                 whenDone = None 
    413         self._finishInit(whenDone, skt, err, reactor) 
     432        # Do outstanding initialization when real address is resolved 
     433        self.resolveAddress() 
    414434 
    415435    def getHost(self): 
    416         """Returns an IPv4Address. 
     436        """ 
     437        Returns an L{IPv4Address} or L{IPv6Address}. 
    417438 
    418439        This indicates the address from which I am connecting. 
    419440        """ 
    420         return address.IPv4Address('TCP', *self.socket.getsockname()) 
     441        return self._addressType('TCP', *self.socket.getsockname()) 
    421442 
    422443    def getPeer(self): 
    423         """Returns an IPv4Address. 
     444        """ 
     445        Returns an L{IPv4Address} or L{IPv6Address}. 
    424446 
    425447        This indicates the address that I am connected to. 
    426448        """ 
    427         return address.IPv4Address('TCP', *self.realAddress) 
     449        return self._addressType('TCP', *self.realAddress) 
    428450 
    429451    def __repr__(self): 
    430452        s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self)) 
    class Port(base.BasePort, _SocketCloser): 
    739761 
    740762 
    741763class Connector(base.BaseConnector): 
     764    _addressType = address.IPv4Address 
     765 
    742766    def __init__(self, host, port, factory, timeout, bindAddress, reactor=None): 
    743         self.host = host 
    744767        if isinstance(port, types.StringTypes): 
    745768            try: 
    746769                port = socket.getservbyname(port, 'tcp') 
    747770            except socket.error, e: 
    748771                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port)) 
    749         self.port = port 
    750         self.bindAddress = bindAddress 
     772 
     773        # do a host lookup to make sure we have the correct address family 
     774        try: 
     775            addressInfo = socket.getaddrinfo(host, port) 
     776        except socket.gaierror: 
     777            raise error.DNSLookupError(host) 
     778        else: 
     779            assert len(addressInfo) > 0 
     780 
     781            if addressInfo[0][0] == socket.AF_INET6: 
     782                self._addressType = address.IPv6Address 
     783 
     784            self.host, self.port = addressInfo[0][4][:2] 
     785            self.bindAddress = bindAddress 
    751786        base.BaseConnector.__init__(self, factory, timeout, reactor) 
    752787 
    753788    def _makeTransport(self): 
    754789        return Client(self.host, self.port, self.bindAddress, self, self.reactor) 
    755790 
    756791    def getDestination(self): 
    757         return address.IPv4Address('TCP', self.host, self.port) 
     792        return self._addressType('TCP', self.host, self.port) 
  • new file twisted/test/test_tcp6.py

    diff --git a/twisted/test/test_tcp6.py b/twisted/test/test_tcp6.py
    new file mode 100644
    index 0000000..4761d7d
    - +  
     1# Copyright (c) Twisted Matrix Laboratories. 
     2# See LICENSE for details. 
     3 
     4""" 
     5Tests for implementations of L{IReactorTCP} using IPv6. 
     6""" 
     7 
     8import socket, random, errno 
     9 
     10from zope.interface import implements 
     11 
     12from twisted.trial import unittest 
     13 
     14from twisted.python.log import msg 
     15from twisted.internet import protocol, reactor, defer, interfaces 
     16from twisted.internet import error 
     17from twisted.internet.address import IPv6Address 
     18from twisted.internet.interfaces import IHalfCloseableProtocol, IPullProducer 
     19from twisted.protocols import policies 
     20from twisted.test.proto_helpers import AccumulatingProtocol 
     21 
     22 
     23def loopUntil(predicate, interval=0): 
     24    """ 
     25    Poor excuse for an event notification helper.  This polls a condition and 
     26    calls back a Deferred when it is seen to be true. 
     27 
     28    Do not use this function. 
     29    """ 
     30    from twisted.internet import task 
     31    d = defer.Deferred() 
     32    def check(): 
     33        res = predicate() 
     34        if res: 
     35            d.callback(res) 
     36    call = task.LoopingCall(check) 
     37    def stop(result): 
     38        call.stop() 
     39        return result 
     40    d.addCallback(stop) 
     41    d2 = call.start(interval) 
     42    d2.addErrback(d.errback) 
     43    return d 
     44 
     45 
     46 
     47class ClosingProtocol(protocol.Protocol): 
     48 
     49    def connectionMade(self): 
     50        msg("ClosingProtocol.connectionMade") 
     51        self.transport.loseConnection() 
     52 
     53    def connectionLost(self, reason): 
     54        msg("ClosingProtocol.connectionLost") 
     55        reason.trap(error.ConnectionDone) 
     56 
     57 
     58 
     59class ClosingFactory(protocol.ServerFactory): 
     60    """ 
     61    Factory that closes port immediately. 
     62    """ 
     63 
     64    _cleanerUpper = None 
     65 
     66    def buildProtocol(self, conn): 
     67        self._cleanerUpper = self.port.stopListening() 
     68        return ClosingProtocol() 
     69 
     70 
     71    def cleanUp(self): 
     72        """ 
     73        Clean-up for tests to wait for the port to stop listening. 
     74        """ 
     75        if self._cleanerUpper is None: 
     76            return self.port.stopListening() 
     77        return self._cleanerUpper 
     78 
     79 
     80 
     81class MyProtocolFactoryMixin(object): 
     82    """ 
     83    Mixin for factories which create L{AccumulatingProtocol} instances. 
     84 
     85    @type protocolFactory: no-argument callable 
     86    @ivar protocolFactory: Factory for protocols - takes the place of the 
     87        typical C{protocol} attribute of factories (but that name is used by 
     88        this class for something else). 
     89 
     90    @type protocolConnectionMade: L{NoneType} or L{defer.Deferred} 
     91    @ivar protocolConnectionMade: When an instance of L{AccumulatingProtocol} 
     92        is connected, if this is not C{None}, the L{Deferred} will be called 
     93        back with the protocol instance and the attribute set to C{None}. 
     94 
     95    @type protocolConnectionLost: L{NoneType} or L{defer.Deferred} 
     96    @ivar protocolConnectionLost: When an instance of L{AccumulatingProtocol} 
     97        is created, this will be set as its C{closedDeferred} attribute and 
     98        then this attribute will be set to C{None} so the L{defer.Deferred} is 
     99        not used by more than one protocol. 
     100 
     101    @ivar protocol: The most recently created L{AccumulatingProtocol} instance 
     102        which was returned from C{buildProtocol}. 
     103 
     104    @type called: C{int} 
     105    @ivar called: A counter which is incremented each time C{buildProtocol} 
     106        is called. 
     107 
     108    @ivar peerAddresses: A C{list} of the addresses passed to C{buildProtocol}. 
     109    """ 
     110    protocolFactory = AccumulatingProtocol 
     111 
     112    protocolConnectionMade = None 
     113    protocolConnectionLost = None 
     114    protocol = None 
     115    called = 0 
     116 
     117    def __init__(self): 
     118        self.peerAddresses = [] 
     119 
     120 
     121    def buildProtocol(self, addr): 
     122        """ 
     123        Create a L{AccumulatingProtocol} and set it up to be able to perform 
     124        callbacks. 
     125        """ 
     126        self.peerAddresses.append(addr) 
     127        self.called += 1 
     128        p = self.protocolFactory() 
     129        p.factory = self 
     130        p.closedDeferred = self.protocolConnectionLost 
     131        self.protocolConnectionLost = None 
     132        self.protocol = p 
     133        return p 
     134 
     135 
     136 
     137class MyServerFactory(MyProtocolFactoryMixin, protocol.ServerFactory): 
     138    """ 
     139    Server factory which creates L{AccumulatingProtocol} instances. 
     140    """ 
     141 
     142 
     143 
     144class MyClientFactory(MyProtocolFactoryMixin, protocol.ClientFactory): 
     145    """ 
     146    Client factory which creates L{AccumulatingProtocol} instances. 
     147    """ 
     148    failed = 0 
     149    stopped = 0 
     150 
     151    def __init__(self): 
     152        MyProtocolFactoryMixin.__init__(self) 
     153        self.deferred = defer.Deferred() 
     154        self.failDeferred = defer.Deferred() 
     155 
     156    def clientConnectionFailed(self, connector, reason): 
     157        self.failed = 1 
     158        self.reason = reason 
     159        self.failDeferred.callback(None) 
     160 
     161    def clientConnectionLost(self, connector, reason): 
     162        self.lostReason = reason 
     163        self.deferred.callback(None) 
     164 
     165    def stopFactory(self): 
     166        self.stopped = 1 
     167 
     168 
     169 
     170class ListeningTestCase(unittest.TestCase): 
     171 
     172    def test_listen(self): 
     173        """ 
     174        L{IReactorTCP.listenTCP} returns an object which provides 
     175        L{IListeningPort}. 
     176        """ 
     177        f = MyServerFactory() 
     178        p1 = reactor.listenTCP(0, f, interface="::1") 
     179        self.addCleanup(p1.stopListening) 
     180        self.failUnless(interfaces.IListeningPort.providedBy(p1)) 
     181 
     182 
     183    def testStopListening(self): 
     184        """ 
     185        The L{IListeningPort} returned by L{IReactorTCP.listenTCP} can be 
     186        stopped with its C{stopListening} method.  After the L{Deferred} it 
     187        (optionally) returns has been called back, the port number can be bound 
     188        to a new server. 
     189        """ 
     190        f = MyServerFactory() 
     191        port = reactor.listenTCP(0, f, interface="::1") 
     192        n = port.getHost().port 
     193 
     194        def cbStopListening(ignored): 
     195            # Make sure we can rebind the port right away 
     196            port = reactor.listenTCP(n, f, interface="::1") 
     197            return port.stopListening() 
     198 
     199        d = defer.maybeDeferred(port.stopListening) 
     200        d.addCallback(cbStopListening) 
     201        return d 
     202 
     203 
     204    def testNumberedInterface(self): 
     205        f = MyServerFactory() 
     206        # listen only on the loopback interface 
     207        p1 = reactor.listenTCP(0, f, interface='::1') 
     208        return p1.stopListening() 
     209 
     210    def testPortRepr(self): 
     211        f = MyServerFactory() 
     212        p = reactor.listenTCP(0, f) 
     213        portNo = str(p.getHost().port) 
     214        self.failIf(repr(p).find(portNo) == -1) 
     215        def stoppedListening(ign): 
     216            self.failIf(repr(p).find(portNo) != -1) 
     217        d = defer.maybeDeferred(p.stopListening) 
     218        return d.addCallback(stoppedListening) 
     219 
     220 
     221    def test_serverRepr(self): 
     222        """ 
     223        Check that the repr string of the server transport get the good port 
     224        number if the server listens on 0. 
     225        """ 
     226        server = MyServerFactory() 
     227        serverConnMade = server.protocolConnectionMade = defer.Deferred() 
     228        port = reactor.listenTCP(0, server, 50, interface="::1") 
     229        self.addCleanup(port.stopListening) 
     230 
     231        client = MyClientFactory() 
     232        clientConnMade = client.protocolConnectionMade = defer.Deferred() 
     233        connector = reactor.connectTCP("::1", 
     234                                       port.getHost().port, client) 
     235        self.addCleanup(connector.disconnect) 
     236        def check((serverProto, clientProto)): 
     237            portNumber = port.getHost().port 
     238            self.assertEquals( 
     239                repr(serverProto.transport), 
     240                "<AccumulatingProtocol #0 on %s>" % (portNumber,)) 
     241            serverProto.transport.loseConnection() 
     242            clientProto.transport.loseConnection() 
     243 
     244        return defer.gatherResults([serverConnMade, clientConnMade] 
     245            ).addCallback(check) 
     246 
     247 
     248    def test_restartListening(self): 
     249        """ 
     250        Stop and then try to restart a L{tcp.Port}: after a restart, the 
     251        server should be able to handle client connections. 
     252        """ 
     253        serverFactory = MyServerFactory() 
     254        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     255        self.addCleanup(port.stopListening) 
     256 
     257        def cbStopListening(ignored): 
     258            port.startListening() 
     259 
     260            client = MyClientFactory() 
     261            serverFactory.protocolConnectionMade = defer.Deferred() 
     262            client.protocolConnectionMade = defer.Deferred() 
     263            connector = reactor.connectTCP("::1", 
     264                                           port.getHost().port, client) 
     265            self.addCleanup(connector.disconnect) 
     266            return defer.gatherResults([serverFactory.protocolConnectionMade, 
     267                                        client.protocolConnectionMade] 
     268                ).addCallback(close) 
     269 
     270        def close((serverProto, clientProto)): 
     271            clientProto.transport.loseConnection() 
     272            serverProto.transport.loseConnection() 
     273 
     274        d = defer.maybeDeferred(port.stopListening) 
     275        d.addCallback(cbStopListening) 
     276        return d 
     277 
     278 
     279    def test_exceptInStop(self): 
     280        """ 
     281        If the server factory raises an exception in C{stopFactory}, the 
     282        deferred returned by L{tcp.Port.stopListening} should fail with the 
     283        corresponding error. 
     284        """ 
     285        serverFactory = MyServerFactory() 
     286        def raiseException(): 
     287            raise RuntimeError("An error") 
     288        serverFactory.stopFactory = raiseException 
     289        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     290 
     291        return self.assertFailure(port.stopListening(), RuntimeError) 
     292 
     293 
     294    def test_restartAfterExcept(self): 
     295        """ 
     296        Even if the server factory raise an exception in C{stopFactory}, the 
     297        corresponding C{tcp.Port} instance should be in a sane state and can 
     298        be restarted. 
     299        """ 
     300        serverFactory = MyServerFactory() 
     301        def raiseException(): 
     302            raise RuntimeError("An error") 
     303        serverFactory.stopFactory = raiseException 
     304        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     305        self.addCleanup(port.stopListening) 
     306 
     307        def cbStopListening(ignored): 
     308            del serverFactory.stopFactory 
     309            port.startListening() 
     310 
     311            client = MyClientFactory() 
     312            serverFactory.protocolConnectionMade = defer.Deferred() 
     313            client.protocolConnectionMade = defer.Deferred() 
     314            connector = reactor.connectTCP("::1", 
     315                                           port.getHost().port, client) 
     316            self.addCleanup(connector.disconnect) 
     317            return defer.gatherResults([serverFactory.protocolConnectionMade, 
     318                                        client.protocolConnectionMade] 
     319                ).addCallback(close) 
     320 
     321        def close((serverProto, clientProto)): 
     322            clientProto.transport.loseConnection() 
     323            serverProto.transport.loseConnection() 
     324 
     325        return self.assertFailure(port.stopListening(), RuntimeError 
     326            ).addCallback(cbStopListening) 
     327 
     328 
     329    def test_directConnectionLostCall(self): 
     330        """ 
     331        If C{connectionLost} is called directly on a port object, it succeeds 
     332        (and doesn't expect the presence of a C{deferred} attribute). 
     333 
     334        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. 
     335        """ 
     336        serverFactory = MyServerFactory() 
     337        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     338        portNumber = port.getHost().port 
     339        port.connectionLost(None) 
     340 
     341        client = MyClientFactory() 
     342        serverFactory.protocolConnectionMade = defer.Deferred() 
     343        client.protocolConnectionMade = defer.Deferred() 
     344        reactor.connectTCP("::1", portNumber, client) 
     345        def check(ign): 
     346            client.reason.trap(error.ConnectionRefusedError) 
     347        return client.failDeferred.addCallback(check) 
     348 
     349 
     350    def test_exceptInConnectionLostCall(self): 
     351        """ 
     352        If C{connectionLost} is called directory on a port object and that the 
     353        server factory raises an exception in C{stopFactory}, the exception is 
     354        passed through to the caller. 
     355 
     356        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. 
     357        """ 
     358        serverFactory = MyServerFactory() 
     359        def raiseException(): 
     360            raise RuntimeError("An error") 
     361        serverFactory.stopFactory = raiseException 
     362        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     363        self.assertRaises(RuntimeError, port.connectionLost, None) 
     364 
     365 
     366 
     367def callWithSpew(f): 
     368    from twisted.python.util import spewerWithLinenums as spewer 
     369    import sys 
     370    sys.settrace(spewer) 
     371    try: 
     372        f() 
     373    finally: 
     374        sys.settrace(None) 
     375 
     376class LoopbackTestCase(unittest.TestCase): 
     377    """ 
     378    Test loopback connections. 
     379    """ 
     380    def test_closePortInProtocolFactory(self): 
     381        """ 
     382        A port created with L{IReactorTCP.listenTCP} can be connected to with 
     383        L{IReactorTCP.connectTCP}. 
     384        """ 
     385        f = ClosingFactory() 
     386        port = reactor.listenTCP(0, f, interface="::1") 
     387        f.port = port 
     388        self.addCleanup(f.cleanUp) 
     389        portNumber = port.getHost().port 
     390        clientF = MyClientFactory() 
     391        reactor.connectTCP("::1", portNumber, clientF) 
     392        def check(x): 
     393            self.assertTrue(clientF.protocol.made) 
     394            self.assertTrue(port.disconnected) 
     395            clientF.lostReason.trap(error.ConnectionDone) 
     396        return clientF.deferred.addCallback(check) 
     397 
     398    def _trapCnxDone(self, obj): 
     399        getattr(obj, 'trap', lambda x: None)(error.ConnectionDone) 
     400 
     401 
     402    def _connectedClientAndServerTest(self, callback): 
     403        """ 
     404        Invoke the given callback with a client protocol and a server protocol 
     405        which have been connected to each other. 
     406        """ 
     407        serverFactory = MyServerFactory() 
     408        serverConnMade = defer.Deferred() 
     409        serverFactory.protocolConnectionMade = serverConnMade 
     410        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     411        self.addCleanup(port.stopListening) 
     412 
     413        portNumber = port.getHost().port 
     414        clientF = MyClientFactory() 
     415        clientConnMade = defer.Deferred() 
     416        clientF.protocolConnectionMade = clientConnMade 
     417        reactor.connectTCP("::1", portNumber, clientF) 
     418 
     419        connsMade = defer.gatherResults([serverConnMade, clientConnMade]) 
     420        def connected((serverProtocol, clientProtocol)): 
     421            callback(serverProtocol, clientProtocol) 
     422            serverProtocol.transport.loseConnection() 
     423            clientProtocol.transport.loseConnection() 
     424        connsMade.addCallback(connected) 
     425        return connsMade 
     426 
     427 
     428    def test_tcpNoDelay(self): 
     429        """ 
     430        The transport of a protocol connected with L{IReactorTCP.connectTCP} or 
     431        L{IReactor.TCP.listenTCP} can have its I{TCP_NODELAY} state inspected 
     432        and manipulated with L{ITCPTransport.getTcpNoDelay} and 
     433        L{ITCPTransport.setTcpNoDelay}. 
     434        """ 
     435        def check(serverProtocol, clientProtocol): 
     436            for p in [serverProtocol, clientProtocol]: 
     437                transport = p.transport 
     438                self.assertEquals(transport.getTcpNoDelay(), 0) 
     439                transport.setTcpNoDelay(1) 
     440                self.assertEquals(transport.getTcpNoDelay(), 1) 
     441                transport.setTcpNoDelay(0) 
     442                self.assertEquals(transport.getTcpNoDelay(), 0) 
     443        return self._connectedClientAndServerTest(check) 
     444 
     445 
     446    def test_tcpKeepAlive(self): 
     447        """ 
     448        The transport of a protocol connected with L{IReactorTCP.connectTCP} or 
     449        L{IReactor.TCP.listenTCP} can have its I{SO_KEEPALIVE} state inspected 
     450        and manipulated with L{ITCPTransport.getTcpKeepAlive} and 
     451        L{ITCPTransport.setTcpKeepAlive}. 
     452        """ 
     453        def check(serverProtocol, clientProtocol): 
     454            for p in [serverProtocol, clientProtocol]: 
     455                transport = p.transport 
     456                self.assertEquals(transport.getTcpKeepAlive(), 0) 
     457                transport.setTcpKeepAlive(1) 
     458                self.assertEquals(transport.getTcpKeepAlive(), 1) 
     459                transport.setTcpKeepAlive(0) 
     460                self.assertEquals(transport.getTcpKeepAlive(), 0) 
     461        return self._connectedClientAndServerTest(check) 
     462 
     463 
     464    def testFailing(self): 
     465        clientF = MyClientFactory() 
     466        # XXX we assume no one is listening on TCP port 69 
     467        reactor.connectTCP("::1", 69, clientF, timeout=5) 
     468        def check(ignored): 
     469            clientF.reason.trap(error.ConnectionRefusedError) 
     470        return clientF.failDeferred.addCallback(check) 
     471 
     472 
     473    def test_connectionRefusedErrorNumber(self): 
     474        """ 
     475        Assert that the error number of the ConnectionRefusedError is 
     476        ECONNREFUSED, and not some other socket related error. 
     477        """ 
     478 
     479        # Bind a number of ports in the operating system.  We will attempt 
     480        # to connect to these in turn immediately after closing them, in the 
     481        # hopes that no one else has bound them in the mean time.  Any 
     482        # connection which succeeds is ignored and causes us to move on to 
     483        # the next port.  As soon as a connection attempt fails, we move on 
     484        # to making an assertion about how it failed.  If they all succeed, 
     485        # the test will fail. 
     486 
     487        # It would be nice to have a simpler, reliable way to cause a 
     488        # connection failure from the platform. 
     489        # 
     490        # On Linux (2.6.15), connecting to port 0 always fails.  FreeBSD 
     491        # (5.4) rejects the connection attempt with EADDRNOTAVAIL. 
     492        # 
     493        # On FreeBSD (5.4), listening on a port and then repeatedly 
     494        # connecting to it without ever accepting any connections eventually 
     495        # leads to an ECONNREFUSED.  On Linux (2.6.15), a seemingly 
     496        # unbounded number of connections succeed. 
     497 
     498        serverSockets = [] 
     499        for i in xrange(10): 
     500            serverSocket = socket.socket(socket.AF_INET6) 
     501            serverSocket.bind(('::1', 0)) 
     502            serverSocket.listen(1) 
     503            serverSockets.append(serverSocket) 
     504        random.shuffle(serverSockets) 
     505 
     506        clientCreator = protocol.ClientCreator(reactor, protocol.Protocol) 
     507 
     508        def tryConnectFailure(): 
     509            def connected(proto): 
     510                """ 
     511                Darn.  Kill it and try again, if there are any tries left. 
     512                """ 
     513                proto.transport.loseConnection() 
     514                if serverSockets: 
     515                    return tryConnectFailure() 
     516                self.fail("Could not fail to connect - could not test errno for that case.") 
     517 
     518            serverSocket = serverSockets.pop() 
     519            serverHost, serverPort = serverSocket.getsockname()[:2] 
     520            serverSocket.close() 
     521 
     522            connectDeferred = clientCreator.connectTCP(serverHost, serverPort) 
     523            connectDeferred.addCallback(connected) 
     524            return connectDeferred 
     525 
     526        refusedDeferred = tryConnectFailure() 
     527        self.assertFailure(refusedDeferred, error.ConnectionRefusedError) 
     528        def connRefused(exc): 
     529            self.assertEqual(exc.osError, errno.ECONNREFUSED) 
     530        refusedDeferred.addCallback(connRefused) 
     531        def cleanup(passthrough): 
     532            while serverSockets: 
     533                serverSockets.pop().close() 
     534            return passthrough 
     535        refusedDeferred.addBoth(cleanup) 
     536        return refusedDeferred 
     537 
     538 
     539    def test_connectByServiceFail(self): 
     540        """ 
     541        Connecting to a named service which does not exist raises 
     542        L{error.ServiceNameUnknownError}. 
     543        """ 
     544        self.assertRaises( 
     545            error.ServiceNameUnknownError, 
     546            reactor.connectTCP, 
     547            "::1", "thisbetternotexist", MyClientFactory()) 
     548 
     549 
     550    def test_connectByService(self): 
     551        """ 
     552        L{IReactorTCP.connectTCP} accepts the name of a service instead of a 
     553        port number and connects to the port number associated with that 
     554        service, as defined by L{socket.getservbyname}. 
     555        """ 
     556        serverFactory = MyServerFactory() 
     557        serverConnMade = defer.Deferred() 
     558        serverFactory.protocolConnectionMade = serverConnMade 
     559        port = reactor.listenTCP(0, serverFactory, interface="::1") 
     560        self.addCleanup(port.stopListening) 
     561        portNumber = port.getHost().port 
     562        clientFactory = MyClientFactory() 
     563        clientConnMade = defer.Deferred() 
     564        clientFactory.protocolConnectionMade = clientConnMade 
     565 
     566        def fakeGetServicePortByName(serviceName, protocolName): 
     567            if serviceName == 'http' and protocolName == 'tcp': 
     568                return portNumber 
     569            return 10 
     570        self.patch(socket, 'getservbyname', fakeGetServicePortByName) 
     571 
     572        reactor.connectTCP('::1', 'http', clientFactory) 
     573 
     574        connMade = defer.gatherResults([serverConnMade, clientConnMade]) 
     575        def connected((serverProtocol, clientProtocol)): 
     576            self.assertTrue( 
     577                serverFactory.called, 
     578                "Server factory was not called upon to build a protocol.") 
     579            serverProtocol.transport.loseConnection() 
     580            clientProtocol.transport.loseConnection() 
     581        connMade.addCallback(connected) 
     582        return connMade 
     583 
     584 
     585class StartStopFactory(protocol.Factory): 
     586 
     587    started = 0 
     588    stopped = 0 
     589 
     590    def startFactory(self): 
     591        if self.started or self.stopped: 
     592            raise RuntimeError 
     593        self.started = 1 
     594 
     595    def stopFactory(self): 
     596        if not self.started or self.stopped: 
     597            raise RuntimeError 
     598        self.stopped = 1 
     599 
     600 
     601class ClientStartStopFactory(MyClientFactory): 
     602 
     603    started = 0 
     604    stopped = 0 
     605 
     606    def startFactory(self): 
     607        if self.started or self.stopped: 
     608            raise RuntimeError 
     609        self.started = 1 
     610 
     611    def stopFactory(self): 
     612        if not self.started or self.stopped: 
     613            raise RuntimeError 
     614        self.stopped = 1 
     615 
     616 
     617class FactoryTestCase(unittest.TestCase): 
     618    """Tests for factories.""" 
     619 
     620    def test_serverStartStop(self): 
     621        """ 
     622        The factory passed to L{IReactorTCP.listenTCP} should be started only 
     623        when it transitions from being used on no ports to being used on one 
     624        port and should be stopped only when it transitions from being used on 
     625        one port to being used on no ports. 
     626        """ 
     627        # Note - this test doesn't need to use listenTCP.  It is exercising 
     628        # logic implemented in Factory.doStart and Factory.doStop, so it could 
     629        # just call that directly.  Some other test can make sure that 
     630        # listenTCP and stopListening correctly call doStart and 
     631        # doStop. -exarkun 
     632 
     633        f = StartStopFactory() 
     634 
     635        # listen on port 
     636        p1 = reactor.listenTCP(0, f, interface='::1') 
     637        self.addCleanup(p1.stopListening) 
     638 
     639        self.assertEqual((f.started, f.stopped), (1, 0)) 
     640 
     641        # listen on two more ports 
     642        p2 = reactor.listenTCP(0, f, interface='::1') 
     643        p3 = reactor.listenTCP(0, f, interface='::1') 
     644 
     645        self.assertEqual((f.started, f.stopped), (1, 0)) 
     646 
     647        # close two ports 
     648        d1 = defer.maybeDeferred(p1.stopListening) 
     649        d2 = defer.maybeDeferred(p2.stopListening) 
     650        closedDeferred = defer.gatherResults([d1, d2]) 
     651        def cbClosed(ignored): 
     652            self.assertEqual((f.started, f.stopped), (1, 0)) 
     653            # Close the last port 
     654            return p3.stopListening() 
     655        closedDeferred.addCallback(cbClosed) 
     656 
     657        def cbClosedAll(ignored): 
     658            self.assertEquals((f.started, f.stopped), (1, 1)) 
     659        closedDeferred.addCallback(cbClosedAll) 
     660        return closedDeferred 
     661 
     662 
     663    def test_clientStartStop(self): 
     664        """ 
     665        The factory passed to L{IReactorTCP.connectTCP} should be started when 
     666        the connection attempt starts and stopped when it is over. 
     667        """ 
     668        f = ClosingFactory() 
     669        p = reactor.listenTCP(0, f, interface="::1") 
     670        f.port = p 
     671        self.addCleanup(f.cleanUp) 
     672        portNumber = p.getHost().port 
     673 
     674        factory = ClientStartStopFactory() 
     675        reactor.connectTCP("::1", portNumber, factory) 
     676        self.assertTrue(factory.started) 
     677        return loopUntil(lambda: factory.stopped) 
     678 
     679 
     680 
     681class ConnectorTestCase(unittest.TestCase): 
     682 
     683    def test_connectorIdentity(self): 
     684        """ 
     685        L{IReactorTCP.connectTCP} returns an object which provides 
     686        L{IConnector}.  The destination of the connector is the address which 
     687        was passed to C{connectTCP}.  The same connector object is passed to 
     688        the factory's C{startedConnecting} method as to the factory's 
     689        C{clientConnectionLost} method. 
     690        """ 
     691        serverFactory = ClosingFactory() 
     692        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") 
     693        serverFactory.port = tcpPort 
     694        self.addCleanup(serverFactory.cleanUp) 
     695        portNumber = tcpPort.getHost().port 
     696 
     697        seenConnectors = [] 
     698        seenFailures = [] 
     699 
     700        clientFactory = ClientStartStopFactory() 
     701        clientFactory.clientConnectionLost = ( 
     702            lambda connector, reason: (seenConnectors.append(connector), 
     703                                       seenFailures.append(reason))) 
     704        clientFactory.startedConnecting = seenConnectors.append 
     705 
     706        connector = reactor.connectTCP("::1", portNumber, clientFactory) 
     707        self.assertTrue(interfaces.IConnector.providedBy(connector)) 
     708        dest = connector.getDestination() 
     709        self.assertEquals(dest.type, "TCP") 
     710        self.assertEquals(dest.host, "::1") 
     711        self.assertEquals(dest.port, portNumber) 
     712 
     713        d = loopUntil(lambda: clientFactory.stopped) 
     714        def clientFactoryStopped(ignored): 
     715            seenFailures[0].trap(error.ConnectionDone) 
     716            self.assertEqual(seenConnectors, [connector, connector]) 
     717        d.addCallback(clientFactoryStopped) 
     718        return d 
     719 
     720 
     721    def test_userFail(self): 
     722        """ 
     723        Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting} 
     724        results in C{Factory.clientConnectionFailed} being called with 
     725        L{error.UserError} as the reason. 
     726        """ 
     727        serverFactory = MyServerFactory() 
     728        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") 
     729        self.addCleanup(tcpPort.stopListening) 
     730        portNumber = tcpPort.getHost().port 
     731 
     732        def startedConnecting(connector): 
     733            connector.stopConnecting() 
     734 
     735        clientFactory = ClientStartStopFactory() 
     736        clientFactory.startedConnecting = startedConnecting 
     737        reactor.connectTCP("::1", portNumber, clientFactory) 
     738 
     739        d = loopUntil(lambda: clientFactory.stopped) 
     740        def check(ignored): 
     741            self.assertEquals(clientFactory.failed, 1) 
     742            clientFactory.reason.trap(error.UserError) 
     743        return d.addCallback(check) 
     744 
     745 
     746    def test_reconnect(self): 
     747        """ 
     748        Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes 
     749        a new connection attempt to be made. 
     750        """ 
     751        serverFactory = ClosingFactory() 
     752        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") 
     753        serverFactory.port = tcpPort 
     754        self.addCleanup(serverFactory.cleanUp) 
     755        portNumber = tcpPort.getHost().port 
     756 
     757        clientFactory = MyClientFactory() 
     758 
     759        def clientConnectionLost(connector, reason): 
     760            connector.connect() 
     761        clientFactory.clientConnectionLost = clientConnectionLost 
     762        reactor.connectTCP("::1", portNumber, clientFactory) 
     763 
     764        d = loopUntil(lambda: clientFactory.failed) 
     765        def reconnectFailed(ignored): 
     766            p = clientFactory.protocol 
     767            self.assertEqual((p.made, p.closed), (1, 1)) 
     768            clientFactory.reason.trap(error.ConnectionRefusedError) 
     769            self.assertEqual(clientFactory.stopped, 1) 
     770        return d.addCallback(reconnectFailed) 
     771 
     772 
     773 
     774class CannotBindTestCase(unittest.TestCase): 
     775    """ 
     776    Tests for correct behavior when a reactor cannot bind to the required TCP 
     777    port. 
     778    """ 
     779 
     780    def test_cannotBind(self): 
     781        """ 
     782        L{IReactorTCP.listenTCP} raises L{error.CannotListenError} if the 
     783        address to listen on is already in use. 
     784        """ 
     785        f = MyServerFactory() 
     786 
     787        p1 = reactor.listenTCP(0, f, interface='::1') 
     788        self.addCleanup(p1.stopListening) 
     789        n = p1.getHost().port 
     790        dest = p1.getHost() 
     791        self.assertEquals(dest.type, "TCP") 
     792        self.assertEquals(dest.host, "::1") 
     793        self.assertEquals(dest.port, n) 
     794 
     795        # make sure new listen raises error 
     796        self.assertRaises(error.CannotListenError, 
     797                          reactor.listenTCP, n, f, interface='::1') 
     798 
     799 
     800 
     801    def _fireWhenDoneFunc(self, d, f): 
     802        """Returns closure that when called calls f and then callbacks d. 
     803        """ 
     804        from twisted.python import util as tputil 
     805        def newf(*args, **kw): 
     806            rtn = f(*args, **kw) 
     807            d.callback('') 
     808            return rtn 
     809        return tputil.mergeFunctionMetadata(f, newf) 
     810 
     811 
     812    def test_clientBind(self): 
     813        """ 
     814        L{IReactorTCP.connectTCP} calls C{Factory.clientConnectionFailed} with 
     815        L{error.ConnectBindError} if the bind address specified is already in 
     816        use. 
     817        """ 
     818        theDeferred = defer.Deferred() 
     819        sf = MyServerFactory() 
     820        sf.startFactory = self._fireWhenDoneFunc(theDeferred, sf.startFactory) 
     821        p = reactor.listenTCP(0, sf, interface="::1") 
     822        self.addCleanup(p.stopListening) 
     823 
     824        def _connect1(results): 
     825            d = defer.Deferred() 
     826            cf1 = MyClientFactory() 
     827            cf1.buildProtocol = self._fireWhenDoneFunc(d, cf1.buildProtocol) 
     828            reactor.connectTCP("::1", p.getHost().port, cf1, 
     829                               bindAddress=("::1", 0)) 
     830            d.addCallback(_conmade, cf1) 
     831            return d 
     832 
     833        def _conmade(results, cf1): 
     834            d = defer.Deferred() 
     835            cf1.protocol.connectionMade = self._fireWhenDoneFunc( 
     836                d, cf1.protocol.connectionMade) 
     837            d.addCallback(_check1connect2, cf1) 
     838            return d 
     839 
     840        def _check1connect2(results, cf1): 
     841            self.assertEquals(cf1.protocol.made, 1) 
     842 
     843            d1 = defer.Deferred() 
     844            d2 = defer.Deferred() 
     845            port = cf1.protocol.transport.getHost().port 
     846            cf2 = MyClientFactory() 
     847            cf2.clientConnectionFailed = self._fireWhenDoneFunc( 
     848                d1, cf2.clientConnectionFailed) 
     849            cf2.stopFactory = self._fireWhenDoneFunc(d2, cf2.stopFactory) 
     850            reactor.connectTCP("::1", p.getHost().port, cf2, 
     851                               bindAddress=("::1", port)) 
     852            d1.addCallback(_check2failed, cf1, cf2) 
     853            d2.addCallback(_check2stopped, cf1, cf2) 
     854            dl = defer.DeferredList([d1, d2]) 
     855            dl.addCallback(_stop, cf1, cf2) 
     856            return dl 
     857 
     858        def _check2failed(results, cf1, cf2): 
     859            self.assertEquals(cf2.failed, 1) 
     860            cf2.reason.trap(error.ConnectBindError) 
     861            self.assertTrue(cf2.reason.check(error.ConnectBindError)) 
     862            return results 
     863 
     864        def _check2stopped(results, cf1, cf2): 
     865            self.assertEquals(cf2.stopped, 1) 
     866            return results 
     867 
     868        def _stop(results, cf1, cf2): 
     869            d = defer.Deferred() 
     870            d.addCallback(_check1cleanup, cf1) 
     871            cf1.stopFactory = self._fireWhenDoneFunc(d, cf1.stopFactory) 
     872            cf1.protocol.transport.loseConnection() 
     873            return d 
     874 
     875        def _check1cleanup(results, cf1): 
     876            self.assertEquals(cf1.stopped, 1) 
     877 
     878        theDeferred.addCallback(_connect1) 
     879        return theDeferred 
     880 
     881 
     882 
     883class MyOtherClientFactory(protocol.ClientFactory): 
     884    def buildProtocol(self, address): 
     885        self.address = address 
     886        self.protocol = AccumulatingProtocol() 
     887        return self.protocol 
     888 
     889 
     890 
     891class LocalRemoteAddressTestCase(unittest.TestCase): 
     892    """ 
     893    Tests for correct getHost/getPeer values and that the correct address is 
     894    passed to buildProtocol. 
     895    """ 
     896    def test_hostAddress(self): 
     897        """ 
     898        L{IListeningPort.getHost} returns the same address as a client 
     899        connection's L{ITCPTransport.getPeer}. 
     900        """ 
     901        serverFactory = MyServerFactory() 
     902        serverFactory.protocolConnectionLost = defer.Deferred() 
     903        serverConnectionLost = serverFactory.protocolConnectionLost 
     904        port = reactor.listenTCP(0, serverFactory, interface='::1') 
     905        self.addCleanup(port.stopListening) 
     906        n = port.getHost().port 
     907 
     908        clientFactory = MyClientFactory() 
     909        onConnection = clientFactory.protocolConnectionMade = defer.Deferred() 
     910        connector = reactor.connectTCP('::1', n, clientFactory) 
     911 
     912        def check(ignored): 
     913            self.assertEquals([port.getHost()], clientFactory.peerAddresses) 
     914            self.assertEquals( 
     915                port.getHost(), clientFactory.protocol.transport.getPeer()) 
     916        onConnection.addCallback(check) 
     917 
     918        def cleanup(ignored): 
     919            # Clean up the client explicitly here so that tear down of 
     920            # the server side of the connection begins, then wait for 
     921            # the server side to actually disconnect. 
     922            connector.disconnect() 
     923            return serverConnectionLost 
     924        onConnection.addCallback(cleanup) 
     925 
     926        return onConnection 
     927 
     928 
     929 
     930class WriterProtocol(protocol.Protocol): 
     931    def connectionMade(self): 
     932        # use everything ITransport claims to provide. If something here 
     933        # fails, the exception will be written to the log, but it will not 
     934        # directly flunk the test. The test will fail when maximum number of 
     935        # iterations have passed and the writer's factory.done has not yet 
     936        # been set. 
     937        self.transport.write("Hello Cleveland!\n") 
     938        seq = ["Goodbye", " cruel", " world", "\n"] 
     939        self.transport.writeSequence(seq) 
     940        peer = self.transport.getPeer() 
     941        if peer.type != "TCP": 
     942            print "getPeer returned non-TCP socket:", peer 
     943            self.factory.problem = 1 
     944        us = self.transport.getHost() 
     945        if us.type != "TCP": 
     946            print "getHost returned non-TCP socket:", us 
     947            self.factory.problem = 1 
     948        self.factory.done = 1 
     949 
     950        self.transport.loseConnection() 
     951 
     952class ReaderProtocol(protocol.Protocol): 
     953    def dataReceived(self, data): 
     954        self.factory.data += data 
     955    def connectionLost(self, reason): 
     956        self.factory.done = 1 
     957 
     958class WriterClientFactory(protocol.ClientFactory): 
     959    def __init__(self): 
     960        self.done = 0 
     961        self.data = "" 
     962    def buildProtocol(self, addr): 
     963        p = ReaderProtocol() 
     964        p.factory = self 
     965        self.protocol = p 
     966        return p 
     967 
     968class WriteDataTestCase(unittest.TestCase): 
     969    """ 
     970    Test that connected TCP sockets can actually write data. Try to exercise 
     971    the entire ITransport interface. 
     972    """ 
     973 
     974    def test_writer(self): 
     975        """ 
     976        L{ITCPTransport.write} and L{ITCPTransport.writeSequence} send bytes to 
     977        the other end of the connection. 
     978        """ 
     979        f = protocol.Factory() 
     980        f.protocol = WriterProtocol 
     981        f.done = 0 
     982        f.problem = 0 
     983        wrappedF = WiredFactory(f) 
     984        p = reactor.listenTCP(0, wrappedF, interface="::1") 
     985        self.addCleanup(p.stopListening) 
     986        n = p.getHost().port 
     987        clientF = WriterClientFactory() 
     988        wrappedClientF = WiredFactory(clientF) 
     989        reactor.connectTCP("::1", n, wrappedClientF) 
     990 
     991        def check(ignored): 
     992            self.failUnless(f.done, "writer didn't finish, it probably died") 
     993            self.failUnless(f.problem == 0, "writer indicated an error") 
     994            self.failUnless(clientF.done, 
     995                            "client didn't see connection dropped") 
     996            expected = "".join(["Hello Cleveland!\n", 
     997                                "Goodbye", " cruel", " world", "\n"]) 
     998            self.failUnless(clientF.data == expected, 
     999                            "client didn't receive all the data it expected") 
     1000        d = defer.gatherResults([wrappedF.onDisconnect, 
     1001                                 wrappedClientF.onDisconnect]) 
     1002        return d.addCallback(check) 
     1003 
     1004 
     1005    def test_writeAfterShutdownWithoutReading(self): 
     1006        """ 
     1007        A TCP transport which is written to after the connection has been shut 
     1008        down should notify its protocol that the connection has been lost, even 
     1009        if the TCP transport is not actively being monitored for read events 
     1010        (ie, pauseProducing was called on it). 
     1011        """ 
     1012        # This is an unpleasant thing.  Generally tests shouldn't skip or 
     1013        # run based on the name of the reactor being used (most tests 
     1014        # shouldn't care _at all_ what reactor is being used, in fact).  The 
     1015        # Gtk reactor cannot pass this test, though, because it fails to 
     1016        # implement IReactorTCP entirely correctly.  Gtk is quite old at 
     1017        # this point, so it's more likely that gtkreactor will be deprecated 
     1018        # and removed rather than fixed to handle this case correctly. 
     1019        # Since this is a pre-existing (and very long-standing) issue with 
     1020        # the Gtk reactor, there's no reason for it to prevent this test 
     1021        # being added to exercise the other reactors, for which the behavior 
     1022        # was also untested but at least works correctly (now).  See #2833 
     1023        # for information on the status of gtkreactor. 
     1024        if reactor.__class__.__name__ == 'IOCPReactor': 
     1025            raise unittest.SkipTest( 
     1026                "iocpreactor does not, in fact, stop reading immediately after " 
     1027                "pauseProducing is called. This results in a bonus disconnection " 
     1028                "notification. Under some circumstances, it might be possible to " 
     1029                "not receive this notifications (specifically, pauseProducing, " 
     1030                "deliver some data, proceed with this test).") 
     1031        if reactor.__class__.__name__ == 'GtkReactor': 
     1032            raise unittest.SkipTest( 
     1033                "gtkreactor does not implement unclean disconnection " 
     1034                "notification correctly.  This might more properly be " 
     1035                "a todo, but due to technical limitations it cannot be.") 
     1036 
     1037        # Called back after the protocol for the client side of the connection 
     1038        # has paused its transport, preventing it from reading, therefore 
     1039        # preventing it from noticing the disconnection before the rest of the 
     1040        # actions which are necessary to trigger the case this test is for have 
     1041        # been taken. 
     1042        clientPaused = defer.Deferred() 
     1043 
     1044        # Called back when the protocol for the server side of the connection 
     1045        # has received connection lost notification. 
     1046        serverLost = defer.Deferred() 
     1047 
     1048        class Disconnecter(protocol.Protocol): 
     1049            """ 
     1050            Protocol for the server side of the connection which disconnects 
     1051            itself in a callback on clientPaused and publishes notification 
     1052            when its connection is actually lost. 
     1053            """ 
     1054            def connectionMade(self): 
     1055                """ 
     1056                Set up a callback on clientPaused to lose the connection. 
     1057                """ 
     1058                msg('Disconnector.connectionMade') 
     1059                def disconnect(ignored): 
     1060                    msg('Disconnector.connectionMade disconnect') 
     1061                    self.transport.loseConnection() 
     1062                    msg('loseConnection called') 
     1063                clientPaused.addCallback(disconnect) 
     1064 
     1065            def connectionLost(self, reason): 
     1066                """ 
     1067                Notify observers that the server side of the connection has 
     1068                ended. 
     1069                """ 
     1070                msg('Disconnecter.connectionLost') 
     1071                serverLost.callback(None) 
     1072                msg('serverLost called back') 
     1073 
     1074        # Create the server port to which a connection will be made. 
     1075        server = protocol.ServerFactory() 
     1076        server.protocol = Disconnecter 
     1077        port = reactor.listenTCP(0, server, interface='::1') 
     1078        self.addCleanup(port.stopListening) 
     1079        addr = port.getHost() 
     1080 
     1081        class Infinite(object): 
     1082            """ 
     1083            A producer which will write to its consumer as long as 
     1084            resumeProducing is called. 
     1085 
     1086            @ivar consumer: The L{IConsumer} which will be written to. 
     1087            """ 
     1088            implements(IPullProducer) 
     1089 
     1090            def __init__(self, consumer): 
     1091                self.consumer = consumer 
     1092 
     1093            def resumeProducing(self): 
     1094                msg('Infinite.resumeProducing') 
     1095                self.consumer.write('x') 
     1096                msg('Infinite.resumeProducing wrote to consumer') 
     1097 
     1098            def stopProducing(self): 
     1099                msg('Infinite.stopProducing') 
     1100 
     1101 
     1102        class UnreadingWriter(protocol.Protocol): 
     1103            """ 
     1104            Trivial protocol which pauses its transport immediately and then 
     1105            writes some bytes to it. 
     1106            """ 
     1107            def connectionMade(self): 
     1108                msg('UnreadingWriter.connectionMade') 
     1109                self.transport.pauseProducing() 
     1110                clientPaused.callback(None) 
     1111                msg('clientPaused called back') 
     1112                def write(ignored): 
     1113                    msg('UnreadingWriter.connectionMade write') 
     1114                    # This needs to be enough bytes to spill over into the 
     1115                    # userspace Twisted send buffer - if it all fits into 
     1116                    # the kernel, Twisted won't even poll for OUT events, 
     1117                    # which means it won't poll for any events at all, so 
     1118                    # the disconnection is never noticed.  This is due to 
     1119                    # #1662.  When #1662 is fixed, this test will likely 
     1120                    # need to be adjusted, otherwise connection lost 
     1121                    # notification will happen too soon and the test will 
     1122                    # probably begin to fail with ConnectionDone instead of 
     1123                    # ConnectionLost (in any case, it will no longer be 
     1124                    # entirely correct). 
     1125                    producer = Infinite(self.transport) 
     1126                    msg('UnreadingWriter.connectionMade write created producer') 
     1127                    self.transport.registerProducer(producer, False) 
     1128                    msg('UnreadingWriter.connectionMade write registered producer') 
     1129                serverLost.addCallback(write) 
     1130 
     1131        # Create the client and initiate the connection 
     1132        client = MyClientFactory() 
     1133        client.protocolFactory = UnreadingWriter 
     1134        clientConnectionLost = client.deferred 
     1135        def cbClientLost(ignored): 
     1136            msg('cbClientLost') 
     1137            return client.lostReason 
     1138        clientConnectionLost.addCallback(cbClientLost) 
     1139        msg('Connecting to %s:%s' % (addr.host, addr.port)) 
     1140        reactor.connectTCP(addr.host, addr.port, client) 
     1141 
     1142        # By the end of the test, the client should have received notification 
     1143        # of unclean disconnection. 
     1144        msg('Returning Deferred') 
     1145        return self.assertFailure(clientConnectionLost, error.ConnectionLost) 
     1146 
     1147 
     1148 
     1149class ConnectionLosingProtocol(protocol.Protocol): 
     1150    def connectionMade(self): 
     1151        self.transport.write("1") 
     1152        self.transport.loseConnection() 
     1153        self.master._connectionMade() 
     1154        self.master.ports.append(self.transport) 
     1155 
     1156 
     1157 
     1158class NoopProtocol(protocol.Protocol): 
     1159    def connectionMade(self): 
     1160        self.d = defer.Deferred() 
     1161        self.master.serverConns.append(self.d) 
     1162 
     1163    def connectionLost(self, reason): 
     1164        self.d.callback(True) 
     1165 
     1166 
     1167 
     1168class ConnectionLostNotifyingProtocol(protocol.Protocol): 
     1169    """ 
     1170    Protocol which fires a Deferred which was previously passed to 
     1171    its initializer when the connection is lost. 
     1172 
     1173    @ivar onConnectionLost: The L{Deferred} which will be fired in 
     1174        C{connectionLost}. 
     1175 
     1176    @ivar lostConnectionReason: C{None} until the connection is lost, then a 
     1177        reference to the reason passed to C{connectionLost}. 
     1178    """ 
     1179    def __init__(self, onConnectionLost): 
     1180        self.lostConnectionReason = None 
     1181        self.onConnectionLost = onConnectionLost 
     1182 
     1183 
     1184    def connectionLost(self, reason): 
     1185        self.lostConnectionReason = reason 
     1186        self.onConnectionLost.callback(self) 
     1187 
     1188 
     1189 
     1190class HandleSavingProtocol(ConnectionLostNotifyingProtocol): 
     1191    """ 
     1192    Protocol which grabs the platform-specific socket handle and 
     1193    saves it as an attribute on itself when the connection is 
     1194    established. 
     1195    """ 
     1196    def makeConnection(self, transport): 
     1197        """ 
     1198        Save the platform-specific socket handle for future 
     1199        introspection. 
     1200        """ 
     1201        self.handle = transport.getHandle() 
     1202        return protocol.Protocol.makeConnection(self, transport) 
     1203 
     1204 
     1205 
     1206class ProperlyCloseFilesMixin: 
     1207    """ 
     1208    Tests for platform resources properly being cleaned up. 
     1209    """ 
     1210    def createServer(self, address, portNumber, factory): 
     1211        """ 
     1212        Bind a server port to which connections will be made.  The server 
     1213        should use the given protocol factory. 
     1214 
     1215        @return: The L{IListeningPort} for the server created. 
     1216        """ 
     1217        raise NotImplementedError() 
     1218 
     1219 
     1220    def connectClient(self, address, portNumber, clientCreator): 
     1221        """ 
     1222        Establish a connection to the given address using the given 
     1223        L{ClientCreator} instance. 
     1224 
     1225        @return: A Deferred which will fire with the connected protocol instance. 
     1226        """ 
     1227        raise NotImplementedError() 
     1228 
     1229 
     1230    def getHandleExceptionType(self): 
     1231        """ 
     1232        Return the exception class which will be raised when an operation is 
     1233        attempted on a closed platform handle. 
     1234        """ 
     1235        raise NotImplementedError() 
     1236 
     1237 
     1238    def getHandleErrorCode(self): 
     1239        """ 
     1240        Return the errno expected to result from writing to a closed 
     1241        platform socket handle. 
     1242        """ 
     1243        # These platforms have been seen to give EBADF: 
     1244        # 
     1245        #  Linux 2.4.26, Linux 2.6.15, OS X 10.4, FreeBSD 5.4 
     1246        #  Windows 2000 SP 4, Windows XP SP 2 
     1247        return errno.EBADF 
     1248 
     1249 
     1250    def test_properlyCloseFiles(self): 
     1251        """ 
     1252        Test that lost connections properly have their underlying socket 
     1253        resources cleaned up. 
     1254        """ 
     1255        onServerConnectionLost = defer.Deferred() 
     1256        serverFactory = protocol.ServerFactory() 
     1257        serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol( 
     1258            onServerConnectionLost) 
     1259        serverPort = self.createServer('::1', 0, serverFactory) 
     1260 
     1261        onClientConnectionLost = defer.Deferred() 
     1262        serverAddr = serverPort.getHost() 
     1263        clientCreator = protocol.ClientCreator( 
     1264            reactor, lambda: HandleSavingProtocol(onClientConnectionLost)) 
     1265        clientDeferred = self.connectClient( 
     1266            serverAddr.host, serverAddr.port, clientCreator) 
     1267 
     1268        def clientConnected(client): 
     1269            """ 
     1270            Disconnect the client.  Return a Deferred which fires when both 
     1271            the client and the server have received disconnect notification. 
     1272            """ 
     1273            client.transport.write( 
     1274                'some bytes to make sure the connection is set up') 
     1275            client.transport.loseConnection() 
     1276            return defer.gatherResults([ 
     1277                onClientConnectionLost, onServerConnectionLost]) 
     1278        clientDeferred.addCallback(clientConnected) 
     1279 
     1280        def clientDisconnected((client, server)): 
     1281            """ 
     1282            Verify that the underlying platform socket handle has been 
     1283            cleaned up. 
     1284            """ 
     1285            client.lostConnectionReason.trap(error.ConnectionClosed) 
     1286            server.lostConnectionReason.trap(error.ConnectionClosed) 
     1287            expectedErrorCode = self.getHandleErrorCode() 
     1288            err = self.assertRaises( 
     1289                self.getHandleExceptionType(), client.handle.send, 'bytes') 
     1290            self.assertEqual(err.args[0], expectedErrorCode) 
     1291        clientDeferred.addCallback(clientDisconnected) 
     1292 
     1293        def cleanup(passthrough): 
     1294            """ 
     1295            Shut down the server port.  Return a Deferred which fires when 
     1296            this has completed. 
     1297            """ 
     1298            result = defer.maybeDeferred(serverPort.stopListening) 
     1299            result.addCallback(lambda ign: passthrough) 
     1300            return result 
     1301        clientDeferred.addBoth(cleanup) 
     1302 
     1303        return clientDeferred 
     1304 
     1305 
     1306 
     1307class ProperlyCloseFilesTestCase(unittest.TestCase, ProperlyCloseFilesMixin): 
     1308    """ 
     1309    Test that the sockets created by L{IReactorTCP.connectTCP} are cleaned up 
     1310    when the connection they are associated with is closed. 
     1311    """ 
     1312    def createServer(self, address, portNumber, factory): 
     1313        """ 
     1314        Create a TCP server using L{IReactorTCP.listenTCP}. 
     1315        """ 
     1316        return reactor.listenTCP(portNumber, factory, interface=address) 
     1317 
     1318 
     1319    def connectClient(self, address, portNumber, clientCreator): 
     1320        """ 
     1321        Create a TCP client using L{IReactorTCP.connectTCP}. 
     1322        """ 
     1323        return clientCreator.connectTCP(address, portNumber) 
     1324 
     1325 
     1326    def getHandleExceptionType(self): 
     1327        """ 
     1328        Return L{socket.error} as the expected error type which will be 
     1329        raised by a write to the low-level socket object after it has been 
     1330        closed. 
     1331        """ 
     1332        return socket.error 
     1333 
     1334 
     1335 
     1336class WiredForDeferreds(policies.ProtocolWrapper): 
     1337    def __init__(self, factory, wrappedProtocol): 
     1338        policies.ProtocolWrapper.__init__(self, factory, wrappedProtocol) 
     1339 
     1340    def connectionMade(self): 
     1341        policies.ProtocolWrapper.connectionMade(self) 
     1342        self.factory.onConnect.callback(None) 
     1343 
     1344    def connectionLost(self, reason): 
     1345        policies.ProtocolWrapper.connectionLost(self, reason) 
     1346        self.factory.onDisconnect.callback(None) 
     1347 
     1348 
     1349 
     1350class WiredFactory(policies.WrappingFactory): 
     1351    protocol = WiredForDeferreds 
     1352 
     1353    def __init__(self, wrappedFactory): 
     1354        policies.WrappingFactory.__init__(self, wrappedFactory) 
     1355        self.onConnect = defer.Deferred() 
     1356        self.onDisconnect = defer.Deferred() 
     1357 
     1358 
     1359 
     1360class AddressTestCase(unittest.TestCase): 
     1361    """ 
     1362    Tests for address-related interactions with client and server protocols. 
     1363    """ 
     1364    def setUp(self): 
     1365        """ 
     1366        Create a port and connected client/server pair which can be used 
     1367        to test factory behavior related to addresses. 
     1368 
     1369        @return: A L{defer.Deferred} which will be called back when both the 
     1370            client and server protocols have received their connection made 
     1371            callback. 
     1372        """ 
     1373        class RememberingWrapper(protocol.ClientFactory): 
     1374            """ 
     1375            Simple wrapper factory which records the addresses which are 
     1376            passed to its L{buildProtocol} method and delegates actual 
     1377            protocol creation to another factory. 
     1378 
     1379            @ivar addresses: A list of the objects passed to buildProtocol. 
     1380            @ivar factory: The wrapped factory to which protocol creation is 
     1381                delegated. 
     1382            """ 
     1383            def __init__(self, factory): 
     1384                self.addresses = [] 
     1385                self.factory = factory 
     1386 
     1387            # Only bother to pass on buildProtocol calls to the wrapped 
     1388            # factory - doStart, doStop, etc aren't necessary for this test 
     1389            # to pass. 
     1390            def buildProtocol(self, addr): 
     1391                """ 
     1392                Append the given address to C{self.addresses} and forward 
     1393                the call to C{self.factory}. 
     1394                """ 
     1395                self.addresses.append(addr) 
     1396                return self.factory.buildProtocol(addr) 
     1397 
     1398        # Make a server which we can receive connection and disconnection 
     1399        # notification for, and which will record the address passed to its 
     1400        # buildProtocol. 
     1401        self.server = MyServerFactory() 
     1402        self.serverConnMade = self.server.protocolConnectionMade = defer.Deferred() 
     1403        self.serverConnLost = self.server.protocolConnectionLost = defer.Deferred() 
     1404        # RememberingWrapper is a ClientFactory, but ClientFactory is-a 
     1405        # ServerFactory, so this is okay. 
     1406        self.serverWrapper = RememberingWrapper(self.server) 
     1407 
     1408        # Do something similar for a client. 
     1409        self.client = MyClientFactory() 
     1410        self.clientConnMade = self.client.protocolConnectionMade = defer.Deferred() 
     1411        self.clientConnLost = self.client.protocolConnectionLost = defer.Deferred() 
     1412        self.clientWrapper = RememberingWrapper(self.client) 
     1413 
     1414        self.port = reactor.listenTCP(0, self.serverWrapper, interface='::1') 
     1415        self.connector = reactor.connectTCP( 
     1416            self.port.getHost().host, self.port.getHost().port, self.clientWrapper) 
     1417 
     1418        return defer.gatherResults([self.serverConnMade, self.clientConnMade]) 
     1419 
     1420 
     1421    def tearDown(self): 
     1422        """ 
     1423        Disconnect the client/server pair and shutdown the port created in 
     1424        L{setUp}. 
     1425        """ 
     1426        self.connector.disconnect() 
     1427        return defer.gatherResults([ 
     1428            self.serverConnLost, self.clientConnLost, 
     1429            defer.maybeDeferred(self.port.stopListening)]) 
     1430 
     1431 
     1432    def test_buildProtocolClient(self): 
     1433        """ 
     1434        L{ClientFactory.buildProtocol} should be invoked with the address of 
     1435        the server to which a connection has been established, which should 
     1436        be the same as the address reported by the C{getHost} method of the 
     1437        transport of the server protocol and as the C{getPeer} method of the 
     1438        transport of the client protocol. 
     1439        """ 
     1440        serverHost = self.server.protocol.transport.getHost() 
     1441        clientPeer = self.client.protocol.transport.getPeer() 
     1442 
     1443        self.assertEqual( 
     1444            self.clientWrapper.addresses, 
     1445            [IPv6Address('TCP', serverHost.host, serverHost.port)]) 
     1446        self.assertEqual( 
     1447            self.clientWrapper.addresses, 
     1448            [IPv6Address('TCP', clientPeer.host, clientPeer.port)]) 
     1449 
     1450 
     1451 
     1452class LargeBufferWriterProtocol(protocol.Protocol): 
     1453 
     1454    # Win32 sockets cannot handle single huge chunks of bytes.  Write one 
     1455    # massive string to make sure Twisted deals with this fact. 
     1456 
     1457    def connectionMade(self): 
     1458        # write 60MB 
     1459        self.transport.write('X'*self.factory.len) 
     1460        self.factory.done = 1 
     1461        self.transport.loseConnection() 
     1462 
     1463class LargeBufferReaderProtocol(protocol.Protocol): 
     1464    def dataReceived(self, data): 
     1465        self.factory.len += len(data) 
     1466    def connectionLost(self, reason): 
     1467        self.factory.done = 1 
     1468 
     1469class LargeBufferReaderClientFactory(protocol.ClientFactory): 
     1470    def __init__(self): 
     1471        self.done = 0 
     1472        self.len = 0 
     1473    def buildProtocol(self, addr): 
     1474        p = LargeBufferReaderProtocol() 
     1475        p.factory = self 
     1476        self.protocol = p 
     1477        return p 
     1478 
     1479 
     1480class FireOnClose(policies.ProtocolWrapper): 
     1481    """A wrapper around a protocol that makes it fire a deferred when 
     1482    connectionLost is called. 
     1483    """ 
     1484    def connectionLost(self, reason): 
     1485        policies.ProtocolWrapper.connectionLost(self, reason) 
     1486        self.factory.deferred.callback(None) 
     1487 
     1488 
     1489class FireOnCloseFactory(policies.WrappingFactory): 
     1490    protocol = FireOnClose 
     1491 
     1492    def __init__(self, wrappedFactory): 
     1493        policies.WrappingFactory.__init__(self, wrappedFactory) 
     1494        self.deferred = defer.Deferred() 
     1495 
     1496 
     1497class LargeBufferTestCase(unittest.TestCase): 
     1498    """Test that buffering large amounts of data works. 
     1499    """ 
     1500 
     1501    datalen = 60*1024*1024 
     1502    def testWriter(self): 
     1503        f = protocol.Factory() 
     1504        f.protocol = LargeBufferWriterProtocol 
     1505        f.done = 0 
     1506        f.problem = 0 
     1507        f.len = self.datalen 
     1508        wrappedF = FireOnCloseFactory(f) 
     1509        p = reactor.listenTCP(0, wrappedF, interface="::1") 
     1510        self.addCleanup(p.stopListening) 
     1511        n = p.getHost().port 
     1512        clientF = LargeBufferReaderClientFactory() 
     1513        wrappedClientF = FireOnCloseFactory(clientF) 
     1514        reactor.connectTCP("::1", n, wrappedClientF) 
     1515 
     1516        d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred]) 
     1517        def check(ignored): 
     1518            self.failUnless(f.done, "writer didn't finish, it probably died") 
     1519            self.failUnless(clientF.len == self.datalen, 
     1520                            "client didn't receive all the data it expected " 
     1521                            "(%d != %d)" % (clientF.len, self.datalen)) 
     1522            self.failUnless(clientF.done, 
     1523                            "client didn't see connection dropped") 
     1524        return d.addCallback(check) 
     1525 
     1526 
     1527class MyHCProtocol(AccumulatingProtocol): 
     1528 
     1529    implements(IHalfCloseableProtocol) 
     1530 
     1531    readHalfClosed = False 
     1532    writeHalfClosed = False 
     1533 
     1534    def readConnectionLost(self): 
     1535        self.readHalfClosed = True 
     1536        # Invoke notification logic from the base class to simplify testing. 
     1537        if self.writeHalfClosed: 
     1538            self.connectionLost(None) 
     1539 
     1540    def writeConnectionLost(self): 
     1541        self.writeHalfClosed = True 
     1542        # Invoke notification logic from the base class to simplify testing. 
     1543        if self.readHalfClosed: 
     1544            self.connectionLost(None) 
     1545 
     1546 
     1547class MyHCFactory(protocol.ServerFactory): 
     1548 
     1549    called = 0 
     1550    protocolConnectionMade = None 
     1551 
     1552    def buildProtocol(self, addr): 
     1553        self.called += 1 
     1554        p = MyHCProtocol() 
     1555        p.factory = self 
     1556        self.protocol = p 
     1557        return p 
     1558 
     1559 
     1560class HalfCloseTestCase(unittest.TestCase): 
     1561    """Test half-closing connections.""" 
     1562 
     1563    def setUp(self): 
     1564        self.f = f = MyHCFactory() 
     1565        self.p = p = reactor.listenTCP(0, f, interface="::1") 
     1566        self.addCleanup(p.stopListening) 
     1567        d = loopUntil(lambda :p.connected) 
     1568 
     1569        self.cf = protocol.ClientCreator(reactor, MyHCProtocol) 
     1570 
     1571        d.addCallback(lambda _: self.cf.connectTCP(p.getHost().host, 
     1572                                                   p.getHost().port)) 
     1573        d.addCallback(self._setUp) 
     1574        return d 
     1575 
     1576    def _setUp(self, client): 
     1577        self.client = client 
     1578        self.clientProtoConnectionLost = self.client.closedDeferred = defer.Deferred() 
     1579        self.assertEquals(self.client.transport.connected, 1) 
     1580        # Wait for the server to notice there is a connection, too. 
     1581        return loopUntil(lambda: getattr(self.f, 'protocol', None) is not None) 
     1582 
     1583    def tearDown(self): 
     1584        self.assertEquals(self.client.closed, 0) 
     1585        self.client.transport.loseConnection() 
     1586        d = defer.maybeDeferred(self.p.stopListening) 
     1587        d.addCallback(lambda ign: self.clientProtoConnectionLost) 
     1588        d.addCallback(self._tearDown) 
     1589        return d 
     1590 
     1591    def _tearDown(self, ignored): 
     1592        self.assertEquals(self.client.closed, 1) 
     1593        # because we did half-close, the server also needs to 
     1594        # closed explicitly. 
     1595        self.assertEquals(self.f.protocol.closed, 0) 
     1596        d = defer.Deferred() 
     1597        def _connectionLost(reason): 
     1598            self.f.protocol.closed = 1 
     1599            d.callback(None) 
     1600        self.f.protocol.connectionLost = _connectionLost 
     1601        self.f.protocol.transport.loseConnection() 
     1602        d.addCallback(lambda x:self.assertEquals(self.f.protocol.closed, 1)) 
     1603        return d 
     1604 
     1605    def testCloseWriteCloser(self): 
     1606        client = self.client 
     1607        f = self.f 
     1608        t = client.transport 
     1609 
     1610        t.write("hello") 
     1611        d = loopUntil(lambda :len(t._tempDataBuffer) == 0) 
     1612        def loseWrite(ignored): 
     1613            t.loseWriteConnection() 
     1614            return loopUntil(lambda :t._writeDisconnected) 
     1615        def check(ignored): 
     1616            self.assertEquals(client.closed, False) 
     1617            self.assertEquals(client.writeHalfClosed, True) 
     1618            self.assertEquals(client.readHalfClosed, False) 
     1619            return loopUntil(lambda :f.protocol.readHalfClosed) 
     1620        def write(ignored): 
     1621            w = client.transport.write 
     1622            w(" world") 
     1623            w("lalala fooled you") 
     1624            self.assertEquals(0, len(client.transport._tempDataBuffer)) 
     1625            self.assertEquals(f.protocol.data, "hello") 
     1626            self.assertEquals(f.protocol.closed, False) 
     1627            self.assertEquals(f.protocol.readHalfClosed, True) 
     1628        return d.addCallback(loseWrite).addCallback(check).addCallback(write) 
     1629 
     1630    def testWriteCloseNotification(self): 
     1631        f = self.f 
     1632        f.protocol.transport.loseWriteConnection() 
     1633 
     1634        d = defer.gatherResults([ 
     1635            loopUntil(lambda :f.protocol.writeHalfClosed), 
     1636            loopUntil(lambda :self.client.readHalfClosed)]) 
     1637        d.addCallback(lambda _: self.assertEquals( 
     1638            f.protocol.readHalfClosed, False)) 
     1639        return d 
     1640 
     1641 
     1642class HalfClose2TestCase(unittest.TestCase): 
     1643 
     1644    def setUp(self): 
     1645        self.f = f = MyServerFactory() 
     1646        self.f.protocolConnectionMade = defer.Deferred() 
     1647        self.p = p = reactor.listenTCP(0, f, interface="::1") 
     1648 
     1649        # XXX we don't test server side yet since we don't do it yet 
     1650        d = protocol.ClientCreator(reactor, AccumulatingProtocol).connectTCP( 
     1651            p.getHost().host, p.getHost().port) 
     1652        d.addCallback(self._gotClient) 
     1653        return d 
     1654 
     1655    def _gotClient(self, client): 
     1656        self.client = client 
     1657        # Now wait for the server to catch up - it doesn't matter if this 
     1658        # Deferred has already fired and gone away, in that case we'll 
     1659        # return None and not wait at all, which is precisely correct. 
     1660        return self.f.protocolConnectionMade 
     1661 
     1662    def tearDown(self): 
     1663        self.client.transport.loseConnection() 
     1664        return self.p.stopListening() 
     1665 
     1666    def testNoNotification(self): 
     1667        """ 
     1668        TCP protocols support half-close connections, but not all of them 
     1669        support being notified of write closes.  In this case, test that 
     1670        half-closing the connection causes the peer's connection to be 
     1671        closed. 
     1672        """ 
     1673        self.client.transport.write("hello") 
     1674        self.client.transport.loseWriteConnection() 
     1675        self.f.protocol.closedDeferred = d = defer.Deferred() 
     1676        self.client.closedDeferred = d2 = defer.Deferred() 
     1677        d.addCallback(lambda x: 
     1678                      self.assertEqual(self.f.protocol.data, 'hello')) 
     1679        d.addCallback(lambda x: self.assertEqual(self.f.protocol.closed, True)) 
     1680        return defer.gatherResults([d, d2]) 
     1681 
     1682    def testShutdownException(self): 
     1683        """ 
     1684        If the other side has already closed its connection, 
     1685        loseWriteConnection should pass silently. 
     1686        """ 
     1687        self.f.protocol.transport.loseConnection() 
     1688        self.client.transport.write("X") 
     1689        self.client.transport.loseWriteConnection() 
     1690        self.f.protocol.closedDeferred = d = defer.Deferred() 
     1691        self.client.closedDeferred = d2 = defer.Deferred() 
     1692        d.addCallback(lambda x: 
     1693                      self.failUnlessEqual(self.f.protocol.closed, True)) 
     1694        return defer.gatherResults([d, d2]) 
     1695 
     1696 
     1697class HalfCloseBuggyApplicationTests(unittest.TestCase): 
     1698    """ 
     1699    Test half-closing connections where notification code has bugs. 
     1700    """ 
     1701 
     1702    def setUp(self): 
     1703        """ 
     1704        Set up a server and connect a client to it.  Return a Deferred which 
     1705        only fires once this is done. 
     1706        """ 
     1707        self.serverFactory = MyHCFactory() 
     1708        self.serverFactory.protocolConnectionMade = defer.Deferred() 
     1709        self.port = reactor.listenTCP( 
     1710            0, self.serverFactory, interface="::1") 
     1711        self.addCleanup(self.port.stopListening) 
     1712        addr = self.port.getHost() 
     1713        creator = protocol.ClientCreator(reactor, MyHCProtocol) 
     1714        clientDeferred = creator.connectTCP(addr.host, addr.port) 
     1715        def setClient(clientProtocol): 
     1716            self.clientProtocol = clientProtocol 
     1717        clientDeferred.addCallback(setClient) 
     1718        return defer.gatherResults([ 
     1719            self.serverFactory.protocolConnectionMade, 
     1720            clientDeferred]) 
     1721 
     1722 
     1723    def aBug(self, *args): 
     1724        """ 
     1725        Fake implementation of a callback which illegally raises an 
     1726        exception. 
     1727        """ 
     1728        raise RuntimeError("ONO I AM BUGGY CODE") 
     1729 
     1730 
     1731    def _notificationRaisesTest(self): 
     1732        """ 
     1733        Helper for testing that an exception is logged by the time the 
     1734        client protocol loses its connection. 
     1735        """ 
     1736        closed = self.clientProtocol.closedDeferred = defer.Deferred() 
     1737        self.clientProtocol.transport.loseWriteConnection() 
     1738        def check(ignored): 
     1739            errors = self.flushLoggedErrors(RuntimeError) 
     1740            self.assertEqual(len(errors), 1) 
     1741        closed.addCallback(check) 
     1742        return closed 
     1743 
     1744 
     1745    def test_readNotificationRaises(self): 
     1746        """ 
     1747        If C{readConnectionLost} raises an exception when the transport 
     1748        calls it to notify the protocol of that event, the exception should 
     1749        be logged and the protocol should be disconnected completely. 
     1750        """ 
     1751        self.serverFactory.protocol.readConnectionLost = self.aBug 
     1752        return self._notificationRaisesTest() 
     1753 
     1754 
     1755    def test_writeNotificationRaises(self): 
     1756        """ 
     1757        If C{writeConnectionLost} raises an exception when the transport 
     1758        calls it to notify the protocol of that event, the exception should 
     1759        be logged and the protocol should be disconnected completely. 
     1760        """ 
     1761        self.clientProtocol.writeConnectionLost = self.aBug 
     1762        return self._notificationRaisesTest() 
     1763 
     1764 
     1765 
     1766class LogTestCase(unittest.TestCase): 
     1767    """ 
     1768    Test logging facility of TCP base classes. 
     1769    """ 
     1770 
     1771    def test_logstrClientSetup(self): 
     1772        """ 
     1773        Check that the log customization of the client transport happens 
     1774        once the client is connected. 
     1775        """ 
     1776        server = MyServerFactory() 
     1777 
     1778        client = MyClientFactory() 
     1779        client.protocolConnectionMade = defer.Deferred() 
     1780 
     1781        port = reactor.listenTCP(0, server, interface='::1') 
     1782        self.addCleanup(port.stopListening) 
     1783 
     1784        connector = reactor.connectTCP( 
     1785            port.getHost().host, port.getHost().port, client) 
     1786        self.addCleanup(connector.disconnect) 
     1787 
     1788        # It should still have the default value 
     1789        self.assertEquals(connector.transport.logstr, 
     1790                          "Uninitialized") 
     1791 
     1792        def cb(ign): 
     1793            self.assertEquals(connector.transport.logstr, 
     1794                              "AccumulatingProtocol,client") 
     1795        client.protocolConnectionMade.addCallback(cb) 
     1796        return client.protocolConnectionMade 
     1797 
     1798 
     1799 
     1800class PauseProducingTestCase(unittest.TestCase): 
     1801    """ 
     1802    Test some behaviors of pausing the production of a transport. 
     1803    """ 
     1804 
     1805    def test_pauseProducingInConnectionMade(self): 
     1806        """ 
     1807        In C{connectionMade} of a client protocol, C{pauseProducing} used to be 
     1808        ignored: this test is here to ensure it's not ignored. 
     1809        """ 
     1810        server = MyServerFactory() 
     1811 
     1812        client = MyClientFactory() 
     1813        client.protocolConnectionMade = defer.Deferred() 
     1814 
     1815        port = reactor.listenTCP(0, server, interface='::1') 
     1816        self.addCleanup(port.stopListening) 
     1817 
     1818        connector = reactor.connectTCP( 
     1819            port.getHost().host, port.getHost().port, client) 
     1820        self.addCleanup(connector.disconnect) 
     1821 
     1822        def checkInConnectionMade(proto): 
     1823            tr = proto.transport 
     1824            # The transport should already be monitored 
     1825            self.assertIn(tr, reactor.getReaders() + 
     1826                              reactor.getWriters()) 
     1827            proto.transport.pauseProducing() 
     1828            self.assertNotIn(tr, reactor.getReaders() + 
     1829                                 reactor.getWriters()) 
     1830            d = defer.Deferred() 
     1831            d.addCallback(checkAfterConnectionMade) 
     1832            reactor.callLater(0, d.callback, proto) 
     1833            return d 
     1834        def checkAfterConnectionMade(proto): 
     1835            tr = proto.transport 
     1836            # The transport should still not be monitored 
     1837            self.assertNotIn(tr, reactor.getReaders() + 
     1838                                 reactor.getWriters()) 
     1839        client.protocolConnectionMade.addCallback(checkInConnectionMade) 
     1840        return client.protocolConnectionMade 
     1841 
     1842    if not interfaces.IReactorFDSet.providedBy(reactor): 
     1843        test_pauseProducingInConnectionMade.skip = "Reactor not providing IReactorFDSet" 
     1844 
     1845 
     1846 
     1847class CallBackOrderTestCase(unittest.TestCase): 
     1848    """ 
     1849    Test the order of reactor callbacks 
     1850    """ 
     1851 
     1852    def test_loseOrder(self): 
     1853        """ 
     1854        Check that Protocol.connectionLost is called before factory's 
     1855        clientConnectionLost 
     1856        """ 
     1857        server = MyServerFactory() 
     1858        server.protocolConnectionMade = (defer.Deferred() 
     1859                .addCallback(lambda proto: self.addCleanup( 
     1860                             proto.transport.loseConnection))) 
     1861 
     1862        client = MyClientFactory() 
     1863        client.protocolConnectionLost = defer.Deferred() 
     1864        client.protocolConnectionMade = defer.Deferred() 
     1865 
     1866        def _cbCM(res): 
     1867            """ 
     1868            protocol.connectionMade callback 
     1869            """ 
     1870            reactor.callLater(0, client.protocol.transport.loseConnection) 
     1871 
     1872        client.protocolConnectionMade.addCallback(_cbCM) 
     1873 
     1874        port = reactor.listenTCP(0, server, interface='::1') 
     1875        self.addCleanup(port.stopListening) 
     1876 
     1877        connector = reactor.connectTCP( 
     1878            port.getHost().host, port.getHost().port, client) 
     1879        self.addCleanup(connector.disconnect) 
     1880 
     1881        def _cbCCL(res): 
     1882            """ 
     1883            factory.clientConnectionLost callback 
     1884            """ 
     1885            return 'CCL' 
     1886 
     1887        def _cbCL(res): 
     1888            """ 
     1889            protocol.connectionLost callback 
     1890            """ 
     1891            return 'CL' 
     1892 
     1893        def _cbGather(res): 
     1894            self.assertEquals(res, ['CL', 'CCL']) 
     1895 
     1896        d = defer.gatherResults([ 
     1897                client.protocolConnectionLost.addCallback(_cbCL), 
     1898                client.deferred.addCallback(_cbCCL)]) 
     1899        return d.addCallback(_cbGather) 
     1900 
     1901 
     1902 
     1903try: 
     1904    import resource 
     1905except ImportError: 
     1906    pass 
     1907else: 
     1908    numRounds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + 10 
     1909    ProperlyCloseFilesTestCase.numberRounds = numRounds