Ticket #5085: twisted-tcp6.patch

File twisted-tcp6.patch, 75.9 KB (added by chjurk, 5 years ago)
  • twisted/internet/address.py

    diff --git a/twisted/internet/address.py b/twisted/internet/address.py
    index b9192d4..ef3b63b 100644
    a b class IPv6Address(_IPAddress): 
    6060    """
    6161    Object representing an IPv6 socket endpoint.
    6262    """
     63    def __init__(self, type, host, port, flow_info=0, scope_id=0):
     64        self.flow_info = flow_info
     65        self.scope_id = scope_id
     66        _IPAddress.__init__(self, type, host, port)
     67
     68    def __hash__(self):
     69        return hash((self.type, self.host, self.port, self.flow_info, self.scope_id))
    6370
    6471
    6572
  • twisted/internet/tcp.py

    diff --git a/twisted/internet/base.py b/twisted/internet/base.py
    diff --git a/twisted/internet/tcp.py b/twisted/internet/tcp.py
    index 16a096c..682f03e 100644
    a b class BaseClient(_TLSClientMixin, Connection): 
    258258    A base class for client TCP (and similiar) sockets.
    259259    """
    260260    _base = Connection
     261    _addressType = address.IPv4Address
    261262
    262263    addressFamily = socket.AF_INET
    263264    socketType = socket.SOCK_STREAM
    class BaseClient(_TLSClientMixin, Connection): 
    270271            Connection.__init__(self, skt, None, reactor)
    271272            self.doWrite = self.doConnect
    272273            self.doRead = self.doConnect
    273             reactor.callLater(0, whenDone)
     274            self.doConnect()
    274275        else:
    275276            reactor.callLater(0, self.failIfNotConnected, error)
    276277
    class BaseClient(_TLSClientMixin, Connection): 
    312313        return s
    313314
    314315    def resolveAddress(self):
    315         if abstract.isIPAddress(self.addr[0]):
     316        if abstract.isIPAddress(self.addr[0]) or abstract.isIPv6Address(self.addr[0]):
    316317            self._setRealAddress(self.addr[0])
    317318        else:
    318319            d = self.reactor.resolve(self.addr[0])
    319320            d.addCallbacks(self._setRealAddress, self.failIfNotConnected)
    320321
    321     def _setRealAddress(self, address):
    322         self.realAddress = (address, self.addr[1])
    323         self.doConnect()
     322    def _setRealAddress(self, addr):
     323        """
     324        Set the real IP address for this client.
     325        Once the IP address is set, the socket is created using the correct
     326        address family.
     327        """
     328        if abstract.isIPv6Address(addr):
     329            self.addressFamily = socket.AF_INET6
     330            self._addressType = address.IPv6Address
     331        self.realAddress = (addr, self.addr[1])
     332
     333        # create the socket and wait finish init after that
     334        self.initConnection()
     335
     336    def initConnection(self):
     337        """
     338        Initialize connection by creating the appropriate socket.
     339        """
     340        err = None
     341        skt = None
     342        result = True
     343
     344        try:
     345            skt = self.createInternetSocket()
     346        except socket.error, se:
     347            err = error.ConnectBindError(se[0], se[1])
     348            result = None
     349        if result and self.bindAddress is not None:
     350            try:
     351                skt.bind(self.bindAddress)
     352            except socket.error, se:
     353                err = error.ConnectBindError(se[0], se[1])
     354                result = None
     355        self._finishInit(result, skt, err, self.reactor)
    324356
    325357    def doConnect(self):
    326358        """I connect the socket.
    class Client(BaseClient): 
    394426        # BaseClient.__init__ is invoked later
    395427        self.connector = connector
    396428        self.addr = (host, port)
     429        self.bindAddress = bindAddress
     430        self.reactor = reactor
    397431
    398         whenDone = self.resolveAddress
    399         err = None
    400         skt = None
    401 
    402         try:
    403             skt = self.createInternetSocket()
    404         except socket.error, se:
    405             err = error.ConnectBindError(se[0], se[1])
    406             whenDone = None
    407         if whenDone and bindAddress is not None:
    408             try:
    409                 skt.bind(bindAddress)
    410             except socket.error, se:
    411                 err = error.ConnectBindError(se[0], se[1])
    412                 whenDone = None
    413         self._finishInit(whenDone, skt, err, reactor)
     432        # Do outstanding initialization when real address is resolved
     433        self.resolveAddress()
    414434
    415435    def getHost(self):
    416         """Returns an IPv4Address.
     436        """
     437        Returns an L{IPv4Address} or L{IPv6Address}.
    417438
    418439        This indicates the address from which I am connecting.
    419440        """
    420         return address.IPv4Address('TCP', *self.socket.getsockname())
     441        return self._addressType('TCP', *self.socket.getsockname())
    421442
    422443    def getPeer(self):
    423         """Returns an IPv4Address.
     444        """
     445        Returns an L{IPv4Address} or L{IPv6Address}.
    424446
    425447        This indicates the address that I am connected to.
    426448        """
    427         return address.IPv4Address('TCP', *self.realAddress)
     449        return self._addressType('TCP', *self.realAddress)
    428450
    429451    def __repr__(self):
    430452        s = '<%s to %s at %x>' % (self.__class__, self.addr, unsignedID(self))
    class Port(base.BasePort, _SocketCloser): 
    739761
    740762
    741763class Connector(base.BaseConnector):
     764    _addressType = address.IPv4Address
     765
    742766    def __init__(self, host, port, factory, timeout, bindAddress, reactor=None):
    743         self.host = host
    744767        if isinstance(port, types.StringTypes):
    745768            try:
    746769                port = socket.getservbyname(port, 'tcp')
    747770            except socket.error, e:
    748771                raise error.ServiceNameUnknownError(string="%s (%r)" % (e, port))
    749         self.port = port
    750         self.bindAddress = bindAddress
     772
     773        # do a host lookup to make sure we have the correct address family
     774        try:
     775            addressInfo = socket.getaddrinfo(host, port)
     776        except socket.gaierror:
     777            raise error.DNSLookupError(host)
     778        else:
     779            assert len(addressInfo) > 0
     780
     781            if addressInfo[0][0] == socket.AF_INET6:
     782                self._addressType = address.IPv6Address
     783
     784            self.host, self.port = addressInfo[0][4][:2]
     785            self.bindAddress = bindAddress
    751786        base.BaseConnector.__init__(self, factory, timeout, reactor)
    752787
    753788    def _makeTransport(self):
    754789        return Client(self.host, self.port, self.bindAddress, self, self.reactor)
    755790
    756791    def getDestination(self):
    757         return address.IPv4Address('TCP', self.host, self.port)
     792        return self._addressType('TCP', self.host, self.port)
  • new file twisted/test/test_tcp6.py

    diff --git a/twisted/test/test_tcp6.py b/twisted/test/test_tcp6.py
    new file mode 100644
    index 0000000..4761d7d
    - +  
     1# Copyright (c) Twisted Matrix Laboratories.
     2# See LICENSE for details.
     3
     4"""
     5Tests for implementations of L{IReactorTCP} using IPv6.
     6"""
     7
     8import socket, random, errno
     9
     10from zope.interface import implements
     11
     12from twisted.trial import unittest
     13
     14from twisted.python.log import msg
     15from twisted.internet import protocol, reactor, defer, interfaces
     16from twisted.internet import error
     17from twisted.internet.address import IPv6Address
     18from twisted.internet.interfaces import IHalfCloseableProtocol, IPullProducer
     19from twisted.protocols import policies
     20from twisted.test.proto_helpers import AccumulatingProtocol
     21
     22
     23def loopUntil(predicate, interval=0):
     24    """
     25    Poor excuse for an event notification helper.  This polls a condition and
     26    calls back a Deferred when it is seen to be true.
     27
     28    Do not use this function.
     29    """
     30    from twisted.internet import task
     31    d = defer.Deferred()
     32    def check():
     33        res = predicate()
     34        if res:
     35            d.callback(res)
     36    call = task.LoopingCall(check)
     37    def stop(result):
     38        call.stop()
     39        return result
     40    d.addCallback(stop)
     41    d2 = call.start(interval)
     42    d2.addErrback(d.errback)
     43    return d
     44
     45
     46
     47class ClosingProtocol(protocol.Protocol):
     48
     49    def connectionMade(self):
     50        msg("ClosingProtocol.connectionMade")
     51        self.transport.loseConnection()
     52
     53    def connectionLost(self, reason):
     54        msg("ClosingProtocol.connectionLost")
     55        reason.trap(error.ConnectionDone)
     56
     57
     58
     59class ClosingFactory(protocol.ServerFactory):
     60    """
     61    Factory that closes port immediately.
     62    """
     63
     64    _cleanerUpper = None
     65
     66    def buildProtocol(self, conn):
     67        self._cleanerUpper = self.port.stopListening()
     68        return ClosingProtocol()
     69
     70
     71    def cleanUp(self):
     72        """
     73        Clean-up for tests to wait for the port to stop listening.
     74        """
     75        if self._cleanerUpper is None:
     76            return self.port.stopListening()
     77        return self._cleanerUpper
     78
     79
     80
     81class MyProtocolFactoryMixin(object):
     82    """
     83    Mixin for factories which create L{AccumulatingProtocol} instances.
     84
     85    @type protocolFactory: no-argument callable
     86    @ivar protocolFactory: Factory for protocols - takes the place of the
     87        typical C{protocol} attribute of factories (but that name is used by
     88        this class for something else).
     89
     90    @type protocolConnectionMade: L{NoneType} or L{defer.Deferred}
     91    @ivar protocolConnectionMade: When an instance of L{AccumulatingProtocol}
     92        is connected, if this is not C{None}, the L{Deferred} will be called
     93        back with the protocol instance and the attribute set to C{None}.
     94
     95    @type protocolConnectionLost: L{NoneType} or L{defer.Deferred}
     96    @ivar protocolConnectionLost: When an instance of L{AccumulatingProtocol}
     97        is created, this will be set as its C{closedDeferred} attribute and
     98        then this attribute will be set to C{None} so the L{defer.Deferred} is
     99        not used by more than one protocol.
     100
     101    @ivar protocol: The most recently created L{AccumulatingProtocol} instance
     102        which was returned from C{buildProtocol}.
     103
     104    @type called: C{int}
     105    @ivar called: A counter which is incremented each time C{buildProtocol}
     106        is called.
     107
     108    @ivar peerAddresses: A C{list} of the addresses passed to C{buildProtocol}.
     109    """
     110    protocolFactory = AccumulatingProtocol
     111
     112    protocolConnectionMade = None
     113    protocolConnectionLost = None
     114    protocol = None
     115    called = 0
     116
     117    def __init__(self):
     118        self.peerAddresses = []
     119
     120
     121    def buildProtocol(self, addr):
     122        """
     123        Create a L{AccumulatingProtocol} and set it up to be able to perform
     124        callbacks.
     125        """
     126        self.peerAddresses.append(addr)
     127        self.called += 1
     128        p = self.protocolFactory()
     129        p.factory = self
     130        p.closedDeferred = self.protocolConnectionLost
     131        self.protocolConnectionLost = None
     132        self.protocol = p
     133        return p
     134
     135
     136
     137class MyServerFactory(MyProtocolFactoryMixin, protocol.ServerFactory):
     138    """
     139    Server factory which creates L{AccumulatingProtocol} instances.
     140    """
     141
     142
     143
     144class MyClientFactory(MyProtocolFactoryMixin, protocol.ClientFactory):
     145    """
     146    Client factory which creates L{AccumulatingProtocol} instances.
     147    """
     148    failed = 0
     149    stopped = 0
     150
     151    def __init__(self):
     152        MyProtocolFactoryMixin.__init__(self)
     153        self.deferred = defer.Deferred()
     154        self.failDeferred = defer.Deferred()
     155
     156    def clientConnectionFailed(self, connector, reason):
     157        self.failed = 1
     158        self.reason = reason
     159        self.failDeferred.callback(None)
     160
     161    def clientConnectionLost(self, connector, reason):
     162        self.lostReason = reason
     163        self.deferred.callback(None)
     164
     165    def stopFactory(self):
     166        self.stopped = 1
     167
     168
     169
     170class ListeningTestCase(unittest.TestCase):
     171
     172    def test_listen(self):
     173        """
     174        L{IReactorTCP.listenTCP} returns an object which provides
     175        L{IListeningPort}.
     176        """
     177        f = MyServerFactory()
     178        p1 = reactor.listenTCP(0, f, interface="::1")
     179        self.addCleanup(p1.stopListening)
     180        self.failUnless(interfaces.IListeningPort.providedBy(p1))
     181
     182
     183    def testStopListening(self):
     184        """
     185        The L{IListeningPort} returned by L{IReactorTCP.listenTCP} can be
     186        stopped with its C{stopListening} method.  After the L{Deferred} it
     187        (optionally) returns has been called back, the port number can be bound
     188        to a new server.
     189        """
     190        f = MyServerFactory()
     191        port = reactor.listenTCP(0, f, interface="::1")
     192        n = port.getHost().port
     193
     194        def cbStopListening(ignored):
     195            # Make sure we can rebind the port right away
     196            port = reactor.listenTCP(n, f, interface="::1")
     197            return port.stopListening()
     198
     199        d = defer.maybeDeferred(port.stopListening)
     200        d.addCallback(cbStopListening)
     201        return d
     202
     203
     204    def testNumberedInterface(self):
     205        f = MyServerFactory()
     206        # listen only on the loopback interface
     207        p1 = reactor.listenTCP(0, f, interface='::1')
     208        return p1.stopListening()
     209
     210    def testPortRepr(self):
     211        f = MyServerFactory()
     212        p = reactor.listenTCP(0, f)
     213        portNo = str(p.getHost().port)
     214        self.failIf(repr(p).find(portNo) == -1)
     215        def stoppedListening(ign):
     216            self.failIf(repr(p).find(portNo) != -1)
     217        d = defer.maybeDeferred(p.stopListening)
     218        return d.addCallback(stoppedListening)
     219
     220
     221    def test_serverRepr(self):
     222        """
     223        Check that the repr string of the server transport get the good port
     224        number if the server listens on 0.
     225        """
     226        server = MyServerFactory()
     227        serverConnMade = server.protocolConnectionMade = defer.Deferred()
     228        port = reactor.listenTCP(0, server, 50, interface="::1")
     229        self.addCleanup(port.stopListening)
     230
     231        client = MyClientFactory()
     232        clientConnMade = client.protocolConnectionMade = defer.Deferred()
     233        connector = reactor.connectTCP("::1",
     234                                       port.getHost().port, client)
     235        self.addCleanup(connector.disconnect)
     236        def check((serverProto, clientProto)):
     237            portNumber = port.getHost().port
     238            self.assertEquals(
     239                repr(serverProto.transport),
     240                "<AccumulatingProtocol #0 on %s>" % (portNumber,))
     241            serverProto.transport.loseConnection()
     242            clientProto.transport.loseConnection()
     243
     244        return defer.gatherResults([serverConnMade, clientConnMade]
     245            ).addCallback(check)
     246
     247
     248    def test_restartListening(self):
     249        """
     250        Stop and then try to restart a L{tcp.Port}: after a restart, the
     251        server should be able to handle client connections.
     252        """
     253        serverFactory = MyServerFactory()
     254        port = reactor.listenTCP(0, serverFactory, interface="::1")
     255        self.addCleanup(port.stopListening)
     256
     257        def cbStopListening(ignored):
     258            port.startListening()
     259
     260            client = MyClientFactory()
     261            serverFactory.protocolConnectionMade = defer.Deferred()
     262            client.protocolConnectionMade = defer.Deferred()
     263            connector = reactor.connectTCP("::1",
     264                                           port.getHost().port, client)
     265            self.addCleanup(connector.disconnect)
     266            return defer.gatherResults([serverFactory.protocolConnectionMade,
     267                                        client.protocolConnectionMade]
     268                ).addCallback(close)
     269
     270        def close((serverProto, clientProto)):
     271            clientProto.transport.loseConnection()
     272            serverProto.transport.loseConnection()
     273
     274        d = defer.maybeDeferred(port.stopListening)
     275        d.addCallback(cbStopListening)
     276        return d
     277
     278
     279    def test_exceptInStop(self):
     280        """
     281        If the server factory raises an exception in C{stopFactory}, the
     282        deferred returned by L{tcp.Port.stopListening} should fail with the
     283        corresponding error.
     284        """
     285        serverFactory = MyServerFactory()
     286        def raiseException():
     287            raise RuntimeError("An error")
     288        serverFactory.stopFactory = raiseException
     289        port = reactor.listenTCP(0, serverFactory, interface="::1")
     290
     291        return self.assertFailure(port.stopListening(), RuntimeError)
     292
     293
     294    def test_restartAfterExcept(self):
     295        """
     296        Even if the server factory raise an exception in C{stopFactory}, the
     297        corresponding C{tcp.Port} instance should be in a sane state and can
     298        be restarted.
     299        """
     300        serverFactory = MyServerFactory()
     301        def raiseException():
     302            raise RuntimeError("An error")
     303        serverFactory.stopFactory = raiseException
     304        port = reactor.listenTCP(0, serverFactory, interface="::1")
     305        self.addCleanup(port.stopListening)
     306
     307        def cbStopListening(ignored):
     308            del serverFactory.stopFactory
     309            port.startListening()
     310
     311            client = MyClientFactory()
     312            serverFactory.protocolConnectionMade = defer.Deferred()
     313            client.protocolConnectionMade = defer.Deferred()
     314            connector = reactor.connectTCP("::1",
     315                                           port.getHost().port, client)
     316            self.addCleanup(connector.disconnect)
     317            return defer.gatherResults([serverFactory.protocolConnectionMade,
     318                                        client.protocolConnectionMade]
     319                ).addCallback(close)
     320
     321        def close((serverProto, clientProto)):
     322            clientProto.transport.loseConnection()
     323            serverProto.transport.loseConnection()
     324
     325        return self.assertFailure(port.stopListening(), RuntimeError
     326            ).addCallback(cbStopListening)
     327
     328
     329    def test_directConnectionLostCall(self):
     330        """
     331        If C{connectionLost} is called directly on a port object, it succeeds
     332        (and doesn't expect the presence of a C{deferred} attribute).
     333
     334        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown.
     335        """
     336        serverFactory = MyServerFactory()
     337        port = reactor.listenTCP(0, serverFactory, interface="::1")
     338        portNumber = port.getHost().port
     339        port.connectionLost(None)
     340
     341        client = MyClientFactory()
     342        serverFactory.protocolConnectionMade = defer.Deferred()
     343        client.protocolConnectionMade = defer.Deferred()
     344        reactor.connectTCP("::1", portNumber, client)
     345        def check(ign):
     346            client.reason.trap(error.ConnectionRefusedError)
     347        return client.failDeferred.addCallback(check)
     348
     349
     350    def test_exceptInConnectionLostCall(self):
     351        """
     352        If C{connectionLost} is called directory on a port object and that the
     353        server factory raises an exception in C{stopFactory}, the exception is
     354        passed through to the caller.
     355
     356        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown.
     357        """
     358        serverFactory = MyServerFactory()
     359        def raiseException():
     360            raise RuntimeError("An error")
     361        serverFactory.stopFactory = raiseException
     362        port = reactor.listenTCP(0, serverFactory, interface="::1")
     363        self.assertRaises(RuntimeError, port.connectionLost, None)
     364
     365
     366
     367def callWithSpew(f):
     368    from twisted.python.util import spewerWithLinenums as spewer
     369    import sys
     370    sys.settrace(spewer)
     371    try:
     372        f()
     373    finally:
     374        sys.settrace(None)
     375
     376class LoopbackTestCase(unittest.TestCase):
     377    """
     378    Test loopback connections.
     379    """
     380    def test_closePortInProtocolFactory(self):
     381        """
     382        A port created with L{IReactorTCP.listenTCP} can be connected to with
     383        L{IReactorTCP.connectTCP}.
     384        """
     385        f = ClosingFactory()
     386        port = reactor.listenTCP(0, f, interface="::1")
     387        f.port = port
     388        self.addCleanup(f.cleanUp)
     389        portNumber = port.getHost().port
     390        clientF = MyClientFactory()
     391        reactor.connectTCP("::1", portNumber, clientF)
     392        def check(x):
     393            self.assertTrue(clientF.protocol.made)
     394            self.assertTrue(port.disconnected)
     395            clientF.lostReason.trap(error.ConnectionDone)
     396        return clientF.deferred.addCallback(check)
     397
     398    def _trapCnxDone(self, obj):
     399        getattr(obj, 'trap', lambda x: None)(error.ConnectionDone)
     400
     401
     402    def _connectedClientAndServerTest(self, callback):
     403        """
     404        Invoke the given callback with a client protocol and a server protocol
     405        which have been connected to each other.
     406        """
     407        serverFactory = MyServerFactory()
     408        serverConnMade = defer.Deferred()
     409        serverFactory.protocolConnectionMade = serverConnMade
     410        port = reactor.listenTCP(0, serverFactory, interface="::1")
     411        self.addCleanup(port.stopListening)
     412
     413        portNumber = port.getHost().port
     414        clientF = MyClientFactory()
     415        clientConnMade = defer.Deferred()
     416        clientF.protocolConnectionMade = clientConnMade
     417        reactor.connectTCP("::1", portNumber, clientF)
     418
     419        connsMade = defer.gatherResults([serverConnMade, clientConnMade])
     420        def connected((serverProtocol, clientProtocol)):
     421            callback(serverProtocol, clientProtocol)
     422            serverProtocol.transport.loseConnection()
     423            clientProtocol.transport.loseConnection()
     424        connsMade.addCallback(connected)
     425        return connsMade
     426
     427
     428    def test_tcpNoDelay(self):
     429        """
     430        The transport of a protocol connected with L{IReactorTCP.connectTCP} or
     431        L{IReactor.TCP.listenTCP} can have its I{TCP_NODELAY} state inspected
     432        and manipulated with L{ITCPTransport.getTcpNoDelay} and
     433        L{ITCPTransport.setTcpNoDelay}.
     434        """
     435        def check(serverProtocol, clientProtocol):
     436            for p in [serverProtocol, clientProtocol]:
     437                transport = p.transport
     438                self.assertEquals(transport.getTcpNoDelay(), 0)
     439                transport.setTcpNoDelay(1)
     440                self.assertEquals(transport.getTcpNoDelay(), 1)
     441                transport.setTcpNoDelay(0)
     442                self.assertEquals(transport.getTcpNoDelay(), 0)
     443        return self._connectedClientAndServerTest(check)
     444
     445
     446    def test_tcpKeepAlive(self):
     447        """
     448        The transport of a protocol connected with L{IReactorTCP.connectTCP} or
     449        L{IReactor.TCP.listenTCP} can have its I{SO_KEEPALIVE} state inspected
     450        and manipulated with L{ITCPTransport.getTcpKeepAlive} and
     451        L{ITCPTransport.setTcpKeepAlive}.
     452        """
     453        def check(serverProtocol, clientProtocol):
     454            for p in [serverProtocol, clientProtocol]:
     455                transport = p.transport
     456                self.assertEquals(transport.getTcpKeepAlive(), 0)
     457                transport.setTcpKeepAlive(1)
     458                self.assertEquals(transport.getTcpKeepAlive(), 1)
     459                transport.setTcpKeepAlive(0)
     460                self.assertEquals(transport.getTcpKeepAlive(), 0)
     461        return self._connectedClientAndServerTest(check)
     462
     463
     464    def testFailing(self):
     465        clientF = MyClientFactory()
     466        # XXX we assume no one is listening on TCP port 69
     467        reactor.connectTCP("::1", 69, clientF, timeout=5)
     468        def check(ignored):
     469            clientF.reason.trap(error.ConnectionRefusedError)
     470        return clientF.failDeferred.addCallback(check)
     471
     472
     473    def test_connectionRefusedErrorNumber(self):
     474        """
     475        Assert that the error number of the ConnectionRefusedError is
     476        ECONNREFUSED, and not some other socket related error.
     477        """
     478
     479        # Bind a number of ports in the operating system.  We will attempt
     480        # to connect to these in turn immediately after closing them, in the
     481        # hopes that no one else has bound them in the mean time.  Any
     482        # connection which succeeds is ignored and causes us to move on to
     483        # the next port.  As soon as a connection attempt fails, we move on
     484        # to making an assertion about how it failed.  If they all succeed,
     485        # the test will fail.
     486
     487        # It would be nice to have a simpler, reliable way to cause a
     488        # connection failure from the platform.
     489        #
     490        # On Linux (2.6.15), connecting to port 0 always fails.  FreeBSD
     491        # (5.4) rejects the connection attempt with EADDRNOTAVAIL.
     492        #
     493        # On FreeBSD (5.4), listening on a port and then repeatedly
     494        # connecting to it without ever accepting any connections eventually
     495        # leads to an ECONNREFUSED.  On Linux (2.6.15), a seemingly
     496        # unbounded number of connections succeed.
     497
     498        serverSockets = []
     499        for i in xrange(10):
     500            serverSocket = socket.socket(socket.AF_INET6)
     501            serverSocket.bind(('::1', 0))
     502            serverSocket.listen(1)
     503            serverSockets.append(serverSocket)
     504        random.shuffle(serverSockets)
     505
     506        clientCreator = protocol.ClientCreator(reactor, protocol.Protocol)
     507
     508        def tryConnectFailure():
     509            def connected(proto):
     510                """
     511                Darn.  Kill it and try again, if there are any tries left.
     512                """
     513                proto.transport.loseConnection()
     514                if serverSockets:
     515                    return tryConnectFailure()
     516                self.fail("Could not fail to connect - could not test errno for that case.")
     517
     518            serverSocket = serverSockets.pop()
     519            serverHost, serverPort = serverSocket.getsockname()[:2]
     520            serverSocket.close()
     521
     522            connectDeferred = clientCreator.connectTCP(serverHost, serverPort)
     523            connectDeferred.addCallback(connected)
     524            return connectDeferred
     525
     526        refusedDeferred = tryConnectFailure()
     527        self.assertFailure(refusedDeferred, error.ConnectionRefusedError)
     528        def connRefused(exc):
     529            self.assertEqual(exc.osError, errno.ECONNREFUSED)
     530        refusedDeferred.addCallback(connRefused)
     531        def cleanup(passthrough):
     532            while serverSockets:
     533                serverSockets.pop().close()
     534            return passthrough
     535        refusedDeferred.addBoth(cleanup)
     536        return refusedDeferred
     537
     538
     539    def test_connectByServiceFail(self):
     540        """
     541        Connecting to a named service which does not exist raises
     542        L{error.ServiceNameUnknownError}.
     543        """
     544        self.assertRaises(
     545            error.ServiceNameUnknownError,
     546            reactor.connectTCP,
     547            "::1", "thisbetternotexist", MyClientFactory())
     548
     549
     550    def test_connectByService(self):
     551        """
     552        L{IReactorTCP.connectTCP} accepts the name of a service instead of a
     553        port number and connects to the port number associated with that
     554        service, as defined by L{socket.getservbyname}.
     555        """
     556        serverFactory = MyServerFactory()
     557        serverConnMade = defer.Deferred()
     558        serverFactory.protocolConnectionMade = serverConnMade
     559        port = reactor.listenTCP(0, serverFactory, interface="::1")
     560        self.addCleanup(port.stopListening)
     561        portNumber = port.getHost().port
     562        clientFactory = MyClientFactory()
     563        clientConnMade = defer.Deferred()
     564        clientFactory.protocolConnectionMade = clientConnMade
     565
     566        def fakeGetServicePortByName(serviceName, protocolName):
     567            if serviceName == 'http' and protocolName == 'tcp':
     568                return portNumber
     569            return 10
     570        self.patch(socket, 'getservbyname', fakeGetServicePortByName)
     571
     572        reactor.connectTCP('::1', 'http', clientFactory)
     573
     574        connMade = defer.gatherResults([serverConnMade, clientConnMade])
     575        def connected((serverProtocol, clientProtocol)):
     576            self.assertTrue(
     577                serverFactory.called,
     578                "Server factory was not called upon to build a protocol.")
     579            serverProtocol.transport.loseConnection()
     580            clientProtocol.transport.loseConnection()
     581        connMade.addCallback(connected)
     582        return connMade
     583
     584
     585class StartStopFactory(protocol.Factory):
     586
     587    started = 0
     588    stopped = 0
     589
     590    def startFactory(self):
     591        if self.started or self.stopped:
     592            raise RuntimeError
     593        self.started = 1
     594
     595    def stopFactory(self):
     596        if not self.started or self.stopped:
     597            raise RuntimeError
     598        self.stopped = 1
     599
     600
     601class ClientStartStopFactory(MyClientFactory):
     602
     603    started = 0
     604    stopped = 0
     605
     606    def startFactory(self):
     607        if self.started or self.stopped:
     608            raise RuntimeError
     609        self.started = 1
     610
     611    def stopFactory(self):
     612        if not self.started or self.stopped:
     613            raise RuntimeError
     614        self.stopped = 1
     615
     616
     617class FactoryTestCase(unittest.TestCase):
     618    """Tests for factories."""
     619
     620    def test_serverStartStop(self):
     621        """
     622        The factory passed to L{IReactorTCP.listenTCP} should be started only
     623        when it transitions from being used on no ports to being used on one
     624        port and should be stopped only when it transitions from being used on
     625        one port to being used on no ports.
     626        """
     627        # Note - this test doesn't need to use listenTCP.  It is exercising
     628        # logic implemented in Factory.doStart and Factory.doStop, so it could
     629        # just call that directly.  Some other test can make sure that
     630        # listenTCP and stopListening correctly call doStart and
     631        # doStop. -exarkun
     632
     633        f = StartStopFactory()
     634
     635        # listen on port
     636        p1 = reactor.listenTCP(0, f, interface='::1')
     637        self.addCleanup(p1.stopListening)
     638
     639        self.assertEqual((f.started, f.stopped), (1, 0))
     640
     641        # listen on two more ports
     642        p2 = reactor.listenTCP(0, f, interface='::1')
     643        p3 = reactor.listenTCP(0, f, interface='::1')
     644
     645        self.assertEqual((f.started, f.stopped), (1, 0))
     646
     647        # close two ports
     648        d1 = defer.maybeDeferred(p1.stopListening)
     649        d2 = defer.maybeDeferred(p2.stopListening)
     650        closedDeferred = defer.gatherResults([d1, d2])
     651        def cbClosed(ignored):
     652            self.assertEqual((f.started, f.stopped), (1, 0))
     653            # Close the last port
     654            return p3.stopListening()
     655        closedDeferred.addCallback(cbClosed)
     656
     657        def cbClosedAll(ignored):
     658            self.assertEquals((f.started, f.stopped), (1, 1))
     659        closedDeferred.addCallback(cbClosedAll)
     660        return closedDeferred
     661
     662
     663    def test_clientStartStop(self):
     664        """
     665        The factory passed to L{IReactorTCP.connectTCP} should be started when
     666        the connection attempt starts and stopped when it is over.
     667        """
     668        f = ClosingFactory()
     669        p = reactor.listenTCP(0, f, interface="::1")
     670        f.port = p
     671        self.addCleanup(f.cleanUp)
     672        portNumber = p.getHost().port
     673
     674        factory = ClientStartStopFactory()
     675        reactor.connectTCP("::1", portNumber, factory)
     676        self.assertTrue(factory.started)
     677        return loopUntil(lambda: factory.stopped)
     678
     679
     680
     681class ConnectorTestCase(unittest.TestCase):
     682
     683    def test_connectorIdentity(self):
     684        """
     685        L{IReactorTCP.connectTCP} returns an object which provides
     686        L{IConnector}.  The destination of the connector is the address which
     687        was passed to C{connectTCP}.  The same connector object is passed to
     688        the factory's C{startedConnecting} method as to the factory's
     689        C{clientConnectionLost} method.
     690        """
     691        serverFactory = ClosingFactory()
     692        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1")
     693        serverFactory.port = tcpPort
     694        self.addCleanup(serverFactory.cleanUp)
     695        portNumber = tcpPort.getHost().port
     696
     697        seenConnectors = []
     698        seenFailures = []
     699
     700        clientFactory = ClientStartStopFactory()
     701        clientFactory.clientConnectionLost = (
     702            lambda connector, reason: (seenConnectors.append(connector),
     703                                       seenFailures.append(reason)))
     704        clientFactory.startedConnecting = seenConnectors.append
     705
     706        connector = reactor.connectTCP("::1", portNumber, clientFactory)
     707        self.assertTrue(interfaces.IConnector.providedBy(connector))
     708        dest = connector.getDestination()
     709        self.assertEquals(dest.type, "TCP")
     710        self.assertEquals(dest.host, "::1")
     711        self.assertEquals(dest.port, portNumber)
     712
     713        d = loopUntil(lambda: clientFactory.stopped)
     714        def clientFactoryStopped(ignored):
     715            seenFailures[0].trap(error.ConnectionDone)
     716            self.assertEqual(seenConnectors, [connector, connector])
     717        d.addCallback(clientFactoryStopped)
     718        return d
     719
     720
     721    def test_userFail(self):
     722        """
     723        Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting}
     724        results in C{Factory.clientConnectionFailed} being called with
     725        L{error.UserError} as the reason.
     726        """
     727        serverFactory = MyServerFactory()
     728        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1")
     729        self.addCleanup(tcpPort.stopListening)
     730        portNumber = tcpPort.getHost().port
     731
     732        def startedConnecting(connector):
     733            connector.stopConnecting()
     734
     735        clientFactory = ClientStartStopFactory()
     736        clientFactory.startedConnecting = startedConnecting
     737        reactor.connectTCP("::1", portNumber, clientFactory)
     738
     739        d = loopUntil(lambda: clientFactory.stopped)
     740        def check(ignored):
     741            self.assertEquals(clientFactory.failed, 1)
     742            clientFactory.reason.trap(error.UserError)
     743        return d.addCallback(check)
     744
     745
     746    def test_reconnect(self):
     747        """
     748        Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes
     749        a new connection attempt to be made.
     750        """
     751        serverFactory = ClosingFactory()
     752        tcpPort = reactor.listenTCP(0, serverFactory, interface="::1")
     753        serverFactory.port = tcpPort
     754        self.addCleanup(serverFactory.cleanUp)
     755        portNumber = tcpPort.getHost().port
     756
     757        clientFactory = MyClientFactory()
     758
     759        def clientConnectionLost(connector, reason):
     760            connector.connect()
     761        clientFactory.clientConnectionLost = clientConnectionLost
     762        reactor.connectTCP("::1", portNumber, clientFactory)
     763
     764        d = loopUntil(lambda: clientFactory.failed)
     765        def reconnectFailed(ignored):
     766            p = clientFactory.protocol
     767            self.assertEqual((p.made, p.closed), (1, 1))
     768            clientFactory.reason.trap(error.ConnectionRefusedError)
     769            self.assertEqual(clientFactory.stopped, 1)
     770        return d.addCallback(reconnectFailed)
     771
     772
     773
     774class CannotBindTestCase(unittest.TestCase):
     775    """
     776    Tests for correct behavior when a reactor cannot bind to the required TCP
     777    port.
     778    """
     779
     780    def test_cannotBind(self):
     781        """
     782        L{IReactorTCP.listenTCP} raises L{error.CannotListenError} if the
     783        address to listen on is already in use.
     784        """
     785        f = MyServerFactory()
     786
     787        p1 = reactor.listenTCP(0, f, interface='::1')
     788        self.addCleanup(p1.stopListening)
     789        n = p1.getHost().port
     790        dest = p1.getHost()
     791        self.assertEquals(dest.type, "TCP")
     792        self.assertEquals(dest.host, "::1")
     793        self.assertEquals(dest.port, n)
     794
     795        # make sure new listen raises error
     796        self.assertRaises(error.CannotListenError,
     797                          reactor.listenTCP, n, f, interface='::1')
     798
     799
     800
     801    def _fireWhenDoneFunc(self, d, f):
     802        """Returns closure that when called calls f and then callbacks d.
     803        """
     804        from twisted.python import util as tputil
     805        def newf(*args, **kw):
     806            rtn = f(*args, **kw)
     807            d.callback('')
     808            return rtn
     809        return tputil.mergeFunctionMetadata(f, newf)
     810
     811
     812    def test_clientBind(self):
     813        """
     814        L{IReactorTCP.connectTCP} calls C{Factory.clientConnectionFailed} with
     815        L{error.ConnectBindError} if the bind address specified is already in
     816        use.
     817        """
     818        theDeferred = defer.Deferred()
     819        sf = MyServerFactory()
     820        sf.startFactory = self._fireWhenDoneFunc(theDeferred, sf.startFactory)
     821        p = reactor.listenTCP(0, sf, interface="::1")
     822        self.addCleanup(p.stopListening)
     823
     824        def _connect1(results):
     825            d = defer.Deferred()
     826            cf1 = MyClientFactory()
     827            cf1.buildProtocol = self._fireWhenDoneFunc(d, cf1.buildProtocol)
     828            reactor.connectTCP("::1", p.getHost().port, cf1,
     829                               bindAddress=("::1", 0))
     830            d.addCallback(_conmade, cf1)
     831            return d
     832
     833        def _conmade(results, cf1):
     834            d = defer.Deferred()
     835            cf1.protocol.connectionMade = self._fireWhenDoneFunc(
     836                d, cf1.protocol.connectionMade)
     837            d.addCallback(_check1connect2, cf1)
     838            return d
     839
     840        def _check1connect2(results, cf1):
     841            self.assertEquals(cf1.protocol.made, 1)
     842
     843            d1 = defer.Deferred()
     844            d2 = defer.Deferred()
     845            port = cf1.protocol.transport.getHost().port
     846            cf2 = MyClientFactory()
     847            cf2.clientConnectionFailed = self._fireWhenDoneFunc(
     848                d1, cf2.clientConnectionFailed)
     849            cf2.stopFactory = self._fireWhenDoneFunc(d2, cf2.stopFactory)
     850            reactor.connectTCP("::1", p.getHost().port, cf2,
     851                               bindAddress=("::1", port))
     852            d1.addCallback(_check2failed, cf1, cf2)
     853            d2.addCallback(_check2stopped, cf1, cf2)
     854            dl = defer.DeferredList([d1, d2])
     855            dl.addCallback(_stop, cf1, cf2)
     856            return dl
     857
     858        def _check2failed(results, cf1, cf2):
     859            self.assertEquals(cf2.failed, 1)
     860            cf2.reason.trap(error.ConnectBindError)
     861            self.assertTrue(cf2.reason.check(error.ConnectBindError))
     862            return results
     863
     864        def _check2stopped(results, cf1, cf2):
     865            self.assertEquals(cf2.stopped, 1)
     866            return results
     867
     868        def _stop(results, cf1, cf2):
     869            d = defer.Deferred()
     870            d.addCallback(_check1cleanup, cf1)
     871            cf1.stopFactory = self._fireWhenDoneFunc(d, cf1.stopFactory)
     872            cf1.protocol.transport.loseConnection()
     873            return d
     874
     875        def _check1cleanup(results, cf1):
     876            self.assertEquals(cf1.stopped, 1)
     877
     878        theDeferred.addCallback(_connect1)
     879        return theDeferred
     880
     881
     882
     883class MyOtherClientFactory(protocol.ClientFactory):
     884    def buildProtocol(self, address):
     885        self.address = address
     886        self.protocol = AccumulatingProtocol()
     887        return self.protocol
     888
     889
     890
     891class LocalRemoteAddressTestCase(unittest.TestCase):
     892    """
     893    Tests for correct getHost/getPeer values and that the correct address is
     894    passed to buildProtocol.
     895    """
     896    def test_hostAddress(self):
     897        """
     898        L{IListeningPort.getHost} returns the same address as a client
     899        connection's L{ITCPTransport.getPeer}.
     900        """
     901        serverFactory = MyServerFactory()
     902        serverFactory.protocolConnectionLost = defer.Deferred()
     903        serverConnectionLost = serverFactory.protocolConnectionLost
     904        port = reactor.listenTCP(0, serverFactory, interface='::1')
     905        self.addCleanup(port.stopListening)
     906        n = port.getHost().port
     907
     908        clientFactory = MyClientFactory()
     909        onConnection = clientFactory.protocolConnectionMade = defer.Deferred()
     910        connector = reactor.connectTCP('::1', n, clientFactory)
     911
     912        def check(ignored):
     913            self.assertEquals([port.getHost()], clientFactory.peerAddresses)
     914            self.assertEquals(
     915                port.getHost(), clientFactory.protocol.transport.getPeer())
     916        onConnection.addCallback(check)
     917
     918        def cleanup(ignored):
     919            # Clean up the client explicitly here so that tear down of
     920            # the server side of the connection begins, then wait for
     921            # the server side to actually disconnect.
     922            connector.disconnect()
     923            return serverConnectionLost
     924        onConnection.addCallback(cleanup)
     925
     926        return onConnection
     927
     928
     929
     930class WriterProtocol(protocol.Protocol):
     931    def connectionMade(self):
     932        # use everything ITransport claims to provide. If something here
     933        # fails, the exception will be written to the log, but it will not
     934        # directly flunk the test. The test will fail when maximum number of
     935        # iterations have passed and the writer's factory.done has not yet
     936        # been set.
     937        self.transport.write("Hello Cleveland!\n")
     938        seq = ["Goodbye", " cruel", " world", "\n"]
     939        self.transport.writeSequence(seq)
     940        peer = self.transport.getPeer()
     941        if peer.type != "TCP":
     942            print "getPeer returned non-TCP socket:", peer
     943            self.factory.problem = 1
     944        us = self.transport.getHost()
     945        if us.type != "TCP":
     946            print "getHost returned non-TCP socket:", us
     947            self.factory.problem = 1
     948        self.factory.done = 1
     949
     950        self.transport.loseConnection()
     951
     952class ReaderProtocol(protocol.Protocol):
     953    def dataReceived(self, data):
     954        self.factory.data += data
     955    def connectionLost(self, reason):
     956        self.factory.done = 1
     957
     958class WriterClientFactory(protocol.ClientFactory):
     959    def __init__(self):
     960        self.done = 0
     961        self.data = ""
     962    def buildProtocol(self, addr):
     963        p = ReaderProtocol()
     964        p.factory = self
     965        self.protocol = p
     966        return p
     967
     968class WriteDataTestCase(unittest.TestCase):
     969    """
     970    Test that connected TCP sockets can actually write data. Try to exercise
     971    the entire ITransport interface.
     972    """
     973
     974    def test_writer(self):
     975        """
     976        L{ITCPTransport.write} and L{ITCPTransport.writeSequence} send bytes to
     977        the other end of the connection.
     978        """
     979        f = protocol.Factory()
     980        f.protocol = WriterProtocol
     981        f.done = 0
     982        f.problem = 0
     983        wrappedF = WiredFactory(f)
     984        p = reactor.listenTCP(0, wrappedF, interface="::1")
     985        self.addCleanup(p.stopListening)
     986        n = p.getHost().port
     987        clientF = WriterClientFactory()
     988        wrappedClientF = WiredFactory(clientF)
     989        reactor.connectTCP("::1", n, wrappedClientF)
     990
     991        def check(ignored):
     992            self.failUnless(f.done, "writer didn't finish, it probably died")
     993            self.failUnless(f.problem == 0, "writer indicated an error")
     994            self.failUnless(clientF.done,
     995                            "client didn't see connection dropped")
     996            expected = "".join(["Hello Cleveland!\n",
     997                                "Goodbye", " cruel", " world", "\n"])
     998            self.failUnless(clientF.data == expected,
     999                            "client didn't receive all the data it expected")
     1000        d = defer.gatherResults([wrappedF.onDisconnect,
     1001                                 wrappedClientF.onDisconnect])
     1002        return d.addCallback(check)
     1003
     1004
     1005    def test_writeAfterShutdownWithoutReading(self):
     1006        """
     1007        A TCP transport which is written to after the connection has been shut
     1008        down should notify its protocol that the connection has been lost, even
     1009        if the TCP transport is not actively being monitored for read events
     1010        (ie, pauseProducing was called on it).
     1011        """
     1012        # This is an unpleasant thing.  Generally tests shouldn't skip or
     1013        # run based on the name of the reactor being used (most tests
     1014        # shouldn't care _at all_ what reactor is being used, in fact).  The
     1015        # Gtk reactor cannot pass this test, though, because it fails to
     1016        # implement IReactorTCP entirely correctly.  Gtk is quite old at
     1017        # this point, so it's more likely that gtkreactor will be deprecated
     1018        # and removed rather than fixed to handle this case correctly.
     1019        # Since this is a pre-existing (and very long-standing) issue with
     1020        # the Gtk reactor, there's no reason for it to prevent this test
     1021        # being added to exercise the other reactors, for which the behavior
     1022        # was also untested but at least works correctly (now).  See #2833
     1023        # for information on the status of gtkreactor.
     1024        if reactor.__class__.__name__ == 'IOCPReactor':
     1025            raise unittest.SkipTest(
     1026                "iocpreactor does not, in fact, stop reading immediately after "
     1027                "pauseProducing is called. This results in a bonus disconnection "
     1028                "notification. Under some circumstances, it might be possible to "
     1029                "not receive this notifications (specifically, pauseProducing, "
     1030                "deliver some data, proceed with this test).")
     1031        if reactor.__class__.__name__ == 'GtkReactor':
     1032            raise unittest.SkipTest(
     1033                "gtkreactor does not implement unclean disconnection "
     1034                "notification correctly.  This might more properly be "
     1035                "a todo, but due to technical limitations it cannot be.")
     1036
     1037        # Called back after the protocol for the client side of the connection
     1038        # has paused its transport, preventing it from reading, therefore
     1039        # preventing it from noticing the disconnection before the rest of the
     1040        # actions which are necessary to trigger the case this test is for have
     1041        # been taken.
     1042        clientPaused = defer.Deferred()
     1043
     1044        # Called back when the protocol for the server side of the connection
     1045        # has received connection lost notification.
     1046        serverLost = defer.Deferred()
     1047
     1048        class Disconnecter(protocol.Protocol):
     1049            """
     1050            Protocol for the server side of the connection which disconnects
     1051            itself in a callback on clientPaused and publishes notification
     1052            when its connection is actually lost.
     1053            """
     1054            def connectionMade(self):
     1055                """
     1056                Set up a callback on clientPaused to lose the connection.
     1057                """
     1058                msg('Disconnector.connectionMade')
     1059                def disconnect(ignored):
     1060                    msg('Disconnector.connectionMade disconnect')
     1061                    self.transport.loseConnection()
     1062                    msg('loseConnection called')
     1063                clientPaused.addCallback(disconnect)
     1064
     1065            def connectionLost(self, reason):
     1066                """
     1067                Notify observers that the server side of the connection has
     1068                ended.
     1069                """
     1070                msg('Disconnecter.connectionLost')
     1071                serverLost.callback(None)
     1072                msg('serverLost called back')
     1073
     1074        # Create the server port to which a connection will be made.
     1075        server = protocol.ServerFactory()
     1076        server.protocol = Disconnecter
     1077        port = reactor.listenTCP(0, server, interface='::1')
     1078        self.addCleanup(port.stopListening)
     1079        addr = port.getHost()
     1080
     1081        class Infinite(object):
     1082            """
     1083            A producer which will write to its consumer as long as
     1084            resumeProducing is called.
     1085
     1086            @ivar consumer: The L{IConsumer} which will be written to.
     1087            """
     1088            implements(IPullProducer)
     1089
     1090            def __init__(self, consumer):
     1091                self.consumer = consumer
     1092
     1093            def resumeProducing(self):
     1094                msg('Infinite.resumeProducing')
     1095                self.consumer.write('x')
     1096                msg('Infinite.resumeProducing wrote to consumer')
     1097
     1098            def stopProducing(self):
     1099                msg('Infinite.stopProducing')
     1100
     1101
     1102        class UnreadingWriter(protocol.Protocol):
     1103            """
     1104            Trivial protocol which pauses its transport immediately and then
     1105            writes some bytes to it.
     1106            """
     1107            def connectionMade(self):
     1108                msg('UnreadingWriter.connectionMade')
     1109                self.transport.pauseProducing()
     1110                clientPaused.callback(None)
     1111                msg('clientPaused called back')
     1112                def write(ignored):
     1113                    msg('UnreadingWriter.connectionMade write')
     1114                    # This needs to be enough bytes to spill over into the
     1115                    # userspace Twisted send buffer - if it all fits into
     1116                    # the kernel, Twisted won't even poll for OUT events,
     1117                    # which means it won't poll for any events at all, so
     1118                    # the disconnection is never noticed.  This is due to
     1119                    # #1662.  When #1662 is fixed, this test will likely
     1120                    # need to be adjusted, otherwise connection lost
     1121                    # notification will happen too soon and the test will
     1122                    # probably begin to fail with ConnectionDone instead of
     1123                    # ConnectionLost (in any case, it will no longer be
     1124                    # entirely correct).
     1125                    producer = Infinite(self.transport)
     1126                    msg('UnreadingWriter.connectionMade write created producer')
     1127                    self.transport.registerProducer(producer, False)
     1128                    msg('UnreadingWriter.connectionMade write registered producer')
     1129                serverLost.addCallback(write)
     1130
     1131        # Create the client and initiate the connection
     1132        client = MyClientFactory()
     1133        client.protocolFactory = UnreadingWriter
     1134        clientConnectionLost = client.deferred
     1135        def cbClientLost(ignored):
     1136            msg('cbClientLost')
     1137            return client.lostReason
     1138        clientConnectionLost.addCallback(cbClientLost)
     1139        msg('Connecting to %s:%s' % (addr.host, addr.port))
     1140        reactor.connectTCP(addr.host, addr.port, client)
     1141
     1142        # By the end of the test, the client should have received notification
     1143        # of unclean disconnection.
     1144        msg('Returning Deferred')
     1145        return self.assertFailure(clientConnectionLost, error.ConnectionLost)
     1146
     1147
     1148
     1149class ConnectionLosingProtocol(protocol.Protocol):
     1150    def connectionMade(self):
     1151        self.transport.write("1")
     1152        self.transport.loseConnection()
     1153        self.master._connectionMade()
     1154        self.master.ports.append(self.transport)
     1155
     1156
     1157
     1158class NoopProtocol(protocol.Protocol):
     1159    def connectionMade(self):
     1160        self.d = defer.Deferred()
     1161        self.master.serverConns.append(self.d)
     1162
     1163    def connectionLost(self, reason):
     1164        self.d.callback(True)
     1165
     1166
     1167
     1168class ConnectionLostNotifyingProtocol(protocol.Protocol):
     1169    """
     1170    Protocol which fires a Deferred which was previously passed to
     1171    its initializer when the connection is lost.
     1172
     1173    @ivar onConnectionLost: The L{Deferred} which will be fired in
     1174        C{connectionLost}.
     1175
     1176    @ivar lostConnectionReason: C{None} until the connection is lost, then a
     1177        reference to the reason passed to C{connectionLost}.
     1178    """
     1179    def __init__(self, onConnectionLost):
     1180        self.lostConnectionReason = None
     1181        self.onConnectionLost = onConnectionLost
     1182
     1183
     1184    def connectionLost(self, reason):
     1185        self.lostConnectionReason = reason
     1186        self.onConnectionLost.callback(self)
     1187
     1188
     1189
     1190class HandleSavingProtocol(ConnectionLostNotifyingProtocol):
     1191    """
     1192    Protocol which grabs the platform-specific socket handle and
     1193    saves it as an attribute on itself when the connection is
     1194    established.
     1195    """
     1196    def makeConnection(self, transport):
     1197        """
     1198        Save the platform-specific socket handle for future
     1199        introspection.
     1200        """
     1201        self.handle = transport.getHandle()
     1202        return protocol.Protocol.makeConnection(self, transport)
     1203
     1204
     1205
     1206class ProperlyCloseFilesMixin:
     1207    """
     1208    Tests for platform resources properly being cleaned up.
     1209    """
     1210    def createServer(self, address, portNumber, factory):
     1211        """
     1212        Bind a server port to which connections will be made.  The server
     1213        should use the given protocol factory.
     1214
     1215        @return: The L{IListeningPort} for the server created.
     1216        """
     1217        raise NotImplementedError()
     1218
     1219
     1220    def connectClient(self, address, portNumber, clientCreator):
     1221        """
     1222        Establish a connection to the given address using the given
     1223        L{ClientCreator} instance.
     1224
     1225        @return: A Deferred which will fire with the connected protocol instance.
     1226        """
     1227        raise NotImplementedError()
     1228
     1229
     1230    def getHandleExceptionType(self):
     1231        """
     1232        Return the exception class which will be raised when an operation is
     1233        attempted on a closed platform handle.
     1234        """
     1235        raise NotImplementedError()
     1236
     1237
     1238    def getHandleErrorCode(self):
     1239        """
     1240        Return the errno expected to result from writing to a closed
     1241        platform socket handle.
     1242        """
     1243        # These platforms have been seen to give EBADF:
     1244        #
     1245        #  Linux 2.4.26, Linux 2.6.15, OS X 10.4, FreeBSD 5.4
     1246        #  Windows 2000 SP 4, Windows XP SP 2
     1247        return errno.EBADF
     1248
     1249
     1250    def test_properlyCloseFiles(self):
     1251        """
     1252        Test that lost connections properly have their underlying socket
     1253        resources cleaned up.
     1254        """
     1255        onServerConnectionLost = defer.Deferred()
     1256        serverFactory = protocol.ServerFactory()
     1257        serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol(
     1258            onServerConnectionLost)
     1259        serverPort = self.createServer('::1', 0, serverFactory)
     1260
     1261        onClientConnectionLost = defer.Deferred()
     1262        serverAddr = serverPort.getHost()
     1263        clientCreator = protocol.ClientCreator(
     1264            reactor, lambda: HandleSavingProtocol(onClientConnectionLost))
     1265        clientDeferred = self.connectClient(
     1266            serverAddr.host, serverAddr.port, clientCreator)
     1267
     1268        def clientConnected(client):
     1269            """
     1270            Disconnect the client.  Return a Deferred which fires when both
     1271            the client and the server have received disconnect notification.
     1272            """
     1273            client.transport.write(
     1274                'some bytes to make sure the connection is set up')
     1275            client.transport.loseConnection()
     1276            return defer.gatherResults([
     1277                onClientConnectionLost, onServerConnectionLost])
     1278        clientDeferred.addCallback(clientConnected)
     1279
     1280        def clientDisconnected((client, server)):
     1281            """
     1282            Verify that the underlying platform socket handle has been
     1283            cleaned up.
     1284            """
     1285            client.lostConnectionReason.trap(error.ConnectionClosed)
     1286            server.lostConnectionReason.trap(error.ConnectionClosed)
     1287            expectedErrorCode = self.getHandleErrorCode()
     1288            err = self.assertRaises(
     1289                self.getHandleExceptionType(), client.handle.send, 'bytes')
     1290            self.assertEqual(err.args[0], expectedErrorCode)
     1291        clientDeferred.addCallback(clientDisconnected)
     1292
     1293        def cleanup(passthrough):
     1294            """
     1295            Shut down the server port.  Return a Deferred which fires when
     1296            this has completed.
     1297            """
     1298            result = defer.maybeDeferred(serverPort.stopListening)
     1299            result.addCallback(lambda ign: passthrough)
     1300            return result
     1301        clientDeferred.addBoth(cleanup)
     1302
     1303        return clientDeferred
     1304
     1305
     1306
     1307class ProperlyCloseFilesTestCase(unittest.TestCase, ProperlyCloseFilesMixin):
     1308    """
     1309    Test that the sockets created by L{IReactorTCP.connectTCP} are cleaned up
     1310    when the connection they are associated with is closed.
     1311    """
     1312    def createServer(self, address, portNumber, factory):
     1313        """
     1314        Create a TCP server using L{IReactorTCP.listenTCP}.
     1315        """
     1316        return reactor.listenTCP(portNumber, factory, interface=address)
     1317
     1318
     1319    def connectClient(self, address, portNumber, clientCreator):
     1320        """
     1321        Create a TCP client using L{IReactorTCP.connectTCP}.
     1322        """
     1323        return clientCreator.connectTCP(address, portNumber)
     1324
     1325
     1326    def getHandleExceptionType(self):
     1327        """
     1328        Return L{socket.error} as the expected error type which will be
     1329        raised by a write to the low-level socket object after it has been
     1330        closed.
     1331        """
     1332        return socket.error
     1333
     1334
     1335
     1336class WiredForDeferreds(policies.ProtocolWrapper):
     1337    def __init__(self, factory, wrappedProtocol):
     1338        policies.ProtocolWrapper.__init__(self, factory, wrappedProtocol)
     1339
     1340    def connectionMade(self):
     1341        policies.ProtocolWrapper.connectionMade(self)
     1342        self.factory.onConnect.callback(None)
     1343
     1344    def connectionLost(self, reason):
     1345        policies.ProtocolWrapper.connectionLost(self, reason)
     1346        self.factory.onDisconnect.callback(None)
     1347
     1348
     1349
     1350class WiredFactory(policies.WrappingFactory):
     1351    protocol = WiredForDeferreds
     1352
     1353    def __init__(self, wrappedFactory):
     1354        policies.WrappingFactory.__init__(self, wrappedFactory)
     1355        self.onConnect = defer.Deferred()
     1356        self.onDisconnect = defer.Deferred()
     1357
     1358
     1359
     1360class AddressTestCase(unittest.TestCase):
     1361    """
     1362    Tests for address-related interactions with client and server protocols.
     1363    """
     1364    def setUp(self):
     1365        """
     1366        Create a port and connected client/server pair which can be used
     1367        to test factory behavior related to addresses.
     1368
     1369        @return: A L{defer.Deferred} which will be called back when both the
     1370            client and server protocols have received their connection made
     1371            callback.
     1372        """
     1373        class RememberingWrapper(protocol.ClientFactory):
     1374            """
     1375            Simple wrapper factory which records the addresses which are
     1376            passed to its L{buildProtocol} method and delegates actual
     1377            protocol creation to another factory.
     1378
     1379            @ivar addresses: A list of the objects passed to buildProtocol.
     1380            @ivar factory: The wrapped factory to which protocol creation is
     1381                delegated.
     1382            """
     1383            def __init__(self, factory):
     1384                self.addresses = []
     1385                self.factory = factory
     1386
     1387            # Only bother to pass on buildProtocol calls to the wrapped
     1388            # factory - doStart, doStop, etc aren't necessary for this test
     1389            # to pass.
     1390            def buildProtocol(self, addr):
     1391                """
     1392                Append the given address to C{self.addresses} and forward
     1393                the call to C{self.factory}.
     1394                """
     1395                self.addresses.append(addr)
     1396                return self.factory.buildProtocol(addr)
     1397
     1398        # Make a server which we can receive connection and disconnection
     1399        # notification for, and which will record the address passed to its
     1400        # buildProtocol.
     1401        self.server = MyServerFactory()
     1402        self.serverConnMade = self.server.protocolConnectionMade = defer.Deferred()
     1403        self.serverConnLost = self.server.protocolConnectionLost = defer.Deferred()
     1404        # RememberingWrapper is a ClientFactory, but ClientFactory is-a
     1405        # ServerFactory, so this is okay.
     1406        self.serverWrapper = RememberingWrapper(self.server)
     1407
     1408        # Do something similar for a client.
     1409        self.client = MyClientFactory()
     1410        self.clientConnMade = self.client.protocolConnectionMade = defer.Deferred()
     1411        self.clientConnLost = self.client.protocolConnectionLost = defer.Deferred()
     1412        self.clientWrapper = RememberingWrapper(self.client)
     1413
     1414        self.port = reactor.listenTCP(0, self.serverWrapper, interface='::1')
     1415        self.connector = reactor.connectTCP(
     1416            self.port.getHost().host, self.port.getHost().port, self.clientWrapper)
     1417
     1418        return defer.gatherResults([self.serverConnMade, self.clientConnMade])
     1419
     1420
     1421    def tearDown(self):
     1422        """
     1423        Disconnect the client/server pair and shutdown the port created in
     1424        L{setUp}.
     1425        """
     1426        self.connector.disconnect()
     1427        return defer.gatherResults([
     1428            self.serverConnLost, self.clientConnLost,
     1429            defer.maybeDeferred(self.port.stopListening)])
     1430
     1431
     1432    def test_buildProtocolClient(self):
     1433        """
     1434        L{ClientFactory.buildProtocol} should be invoked with the address of
     1435        the server to which a connection has been established, which should
     1436        be the same as the address reported by the C{getHost} method of the
     1437        transport of the server protocol and as the C{getPeer} method of the
     1438        transport of the client protocol.
     1439        """
     1440        serverHost = self.server.protocol.transport.getHost()
     1441        clientPeer = self.client.protocol.transport.getPeer()
     1442
     1443        self.assertEqual(
     1444            self.clientWrapper.addresses,
     1445            [IPv6Address('TCP', serverHost.host, serverHost.port)])
     1446        self.assertEqual(
     1447            self.clientWrapper.addresses,
     1448            [IPv6Address('TCP', clientPeer.host, clientPeer.port)])
     1449
     1450
     1451
     1452class LargeBufferWriterProtocol(protocol.Protocol):
     1453
     1454    # Win32 sockets cannot handle single huge chunks of bytes.  Write one
     1455    # massive string to make sure Twisted deals with this fact.
     1456
     1457    def connectionMade(self):
     1458        # write 60MB
     1459        self.transport.write('X'*self.factory.len)
     1460        self.factory.done = 1
     1461        self.transport.loseConnection()
     1462
     1463class LargeBufferReaderProtocol(protocol.Protocol):
     1464    def dataReceived(self, data):
     1465        self.factory.len += len(data)
     1466    def connectionLost(self, reason):
     1467        self.factory.done = 1
     1468
     1469class LargeBufferReaderClientFactory(protocol.ClientFactory):
     1470    def __init__(self):
     1471        self.done = 0
     1472        self.len = 0
     1473    def buildProtocol(self, addr):
     1474        p = LargeBufferReaderProtocol()
     1475        p.factory = self
     1476        self.protocol = p
     1477        return p
     1478
     1479
     1480class FireOnClose(policies.ProtocolWrapper):
     1481    """A wrapper around a protocol that makes it fire a deferred when
     1482    connectionLost is called.
     1483    """
     1484    def connectionLost(self, reason):
     1485        policies.ProtocolWrapper.connectionLost(self, reason)
     1486        self.factory.deferred.callback(None)
     1487
     1488
     1489class FireOnCloseFactory(policies.WrappingFactory):
     1490    protocol = FireOnClose
     1491
     1492    def __init__(self, wrappedFactory):
     1493        policies.WrappingFactory.__init__(self, wrappedFactory)
     1494        self.deferred = defer.Deferred()
     1495
     1496
     1497class LargeBufferTestCase(unittest.TestCase):
     1498    """Test that buffering large amounts of data works.
     1499    """
     1500
     1501    datalen = 60*1024*1024
     1502    def testWriter(self):
     1503        f = protocol.Factory()
     1504        f.protocol = LargeBufferWriterProtocol
     1505        f.done = 0
     1506        f.problem = 0
     1507        f.len = self.datalen
     1508        wrappedF = FireOnCloseFactory(f)
     1509        p = reactor.listenTCP(0, wrappedF, interface="::1")
     1510        self.addCleanup(p.stopListening)
     1511        n = p.getHost().port
     1512        clientF = LargeBufferReaderClientFactory()
     1513        wrappedClientF = FireOnCloseFactory(clientF)
     1514        reactor.connectTCP("::1", n, wrappedClientF)
     1515
     1516        d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred])
     1517        def check(ignored):
     1518            self.failUnless(f.done, "writer didn't finish, it probably died")
     1519            self.failUnless(clientF.len == self.datalen,
     1520                            "client didn't receive all the data it expected "
     1521                            "(%d != %d)" % (clientF.len, self.datalen))
     1522            self.failUnless(clientF.done,
     1523                            "client didn't see connection dropped")
     1524        return d.addCallback(check)
     1525
     1526
     1527class MyHCProtocol(AccumulatingProtocol):
     1528
     1529    implements(IHalfCloseableProtocol)
     1530
     1531    readHalfClosed = False
     1532    writeHalfClosed = False
     1533
     1534    def readConnectionLost(self):
     1535        self.readHalfClosed = True
     1536        # Invoke notification logic from the base class to simplify testing.
     1537        if self.writeHalfClosed:
     1538            self.connectionLost(None)
     1539
     1540    def writeConnectionLost(self):
     1541        self.writeHalfClosed = True
     1542        # Invoke notification logic from the base class to simplify testing.
     1543        if self.readHalfClosed:
     1544            self.connectionLost(None)
     1545
     1546
     1547class MyHCFactory(protocol.ServerFactory):
     1548
     1549    called = 0
     1550    protocolConnectionMade = None
     1551
     1552    def buildProtocol(self, addr):
     1553        self.called += 1
     1554        p = MyHCProtocol()
     1555        p.factory = self
     1556        self.protocol = p
     1557        return p
     1558
     1559
     1560class HalfCloseTestCase(unittest.TestCase):
     1561    """Test half-closing connections."""
     1562
     1563    def setUp(self):
     1564        self.f = f = MyHCFactory()
     1565        self.p = p = reactor.listenTCP(0, f, interface="::1")
     1566        self.addCleanup(p.stopListening)
     1567        d = loopUntil(lambda :p.connected)
     1568
     1569        self.cf = protocol.ClientCreator(reactor, MyHCProtocol)
     1570
     1571        d.addCallback(lambda _: self.cf.connectTCP(p.getHost().host,
     1572                                                   p.getHost().port))
     1573        d.addCallback(self._setUp)
     1574        return d
     1575
     1576    def _setUp(self, client):
     1577        self.client = client
     1578        self.clientProtoConnectionLost = self.client.closedDeferred = defer.Deferred()
     1579        self.assertEquals(self.client.transport.connected, 1)
     1580        # Wait for the server to notice there is a connection, too.
     1581        return loopUntil(lambda: getattr(self.f, 'protocol', None) is not None)
     1582
     1583    def tearDown(self):
     1584        self.assertEquals(self.client.closed, 0)
     1585        self.client.transport.loseConnection()
     1586        d = defer.maybeDeferred(self.p.stopListening)
     1587        d.addCallback(lambda ign: self.clientProtoConnectionLost)
     1588        d.addCallback(self._tearDown)
     1589        return d
     1590
     1591    def _tearDown(self, ignored):
     1592        self.assertEquals(self.client.closed, 1)
     1593        # because we did half-close, the server also needs to
     1594        # closed explicitly.
     1595        self.assertEquals(self.f.protocol.closed, 0)
     1596        d = defer.Deferred()
     1597        def _connectionLost(reason):
     1598            self.f.protocol.closed = 1
     1599            d.callback(None)
     1600        self.f.protocol.connectionLost = _connectionLost
     1601        self.f.protocol.transport.loseConnection()
     1602        d.addCallback(lambda x:self.assertEquals(self.f.protocol.closed, 1))
     1603        return d
     1604
     1605    def testCloseWriteCloser(self):
     1606        client = self.client
     1607        f = self.f
     1608        t = client.transport
     1609
     1610        t.write("hello")
     1611        d = loopUntil(lambda :len(t._tempDataBuffer) == 0)
     1612        def loseWrite(ignored):
     1613            t.loseWriteConnection()
     1614            return loopUntil(lambda :t._writeDisconnected)
     1615        def check(ignored):
     1616            self.assertEquals(client.closed, False)
     1617            self.assertEquals(client.writeHalfClosed, True)
     1618            self.assertEquals(client.readHalfClosed, False)
     1619            return loopUntil(lambda :f.protocol.readHalfClosed)
     1620        def write(ignored):
     1621            w = client.transport.write
     1622            w(" world")
     1623            w("lalala fooled you")
     1624            self.assertEquals(0, len(client.transport._tempDataBuffer))
     1625            self.assertEquals(f.protocol.data, "hello")
     1626            self.assertEquals(f.protocol.closed, False)
     1627            self.assertEquals(f.protocol.readHalfClosed, True)
     1628        return d.addCallback(loseWrite).addCallback(check).addCallback(write)
     1629
     1630    def testWriteCloseNotification(self):
     1631        f = self.f
     1632        f.protocol.transport.loseWriteConnection()
     1633
     1634        d = defer.gatherResults([
     1635            loopUntil(lambda :f.protocol.writeHalfClosed),
     1636            loopUntil(lambda :self.client.readHalfClosed)])
     1637        d.addCallback(lambda _: self.assertEquals(
     1638            f.protocol.readHalfClosed, False))
     1639        return d
     1640
     1641
     1642class HalfClose2TestCase(unittest.TestCase):
     1643
     1644    def setUp(self):
     1645        self.f = f = MyServerFactory()
     1646        self.f.protocolConnectionMade = defer.Deferred()
     1647        self.p = p = reactor.listenTCP(0, f, interface="::1")
     1648
     1649        # XXX we don't test server side yet since we don't do it yet
     1650        d = protocol.ClientCreator(reactor, AccumulatingProtocol).connectTCP(
     1651            p.getHost().host, p.getHost().port)
     1652        d.addCallback(self._gotClient)
     1653        return d
     1654
     1655    def _gotClient(self, client):
     1656        self.client = client
     1657        # Now wait for the server to catch up - it doesn't matter if this
     1658        # Deferred has already fired and gone away, in that case we'll
     1659        # return None and not wait at all, which is precisely correct.
     1660        return self.f.protocolConnectionMade
     1661
     1662    def tearDown(self):
     1663        self.client.transport.loseConnection()
     1664        return self.p.stopListening()
     1665
     1666    def testNoNotification(self):
     1667        """
     1668        TCP protocols support half-close connections, but not all of them
     1669        support being notified of write closes.  In this case, test that
     1670        half-closing the connection causes the peer's connection to be
     1671        closed.
     1672        """
     1673        self.client.transport.write("hello")
     1674        self.client.transport.loseWriteConnection()
     1675        self.f.protocol.closedDeferred = d = defer.Deferred()
     1676        self.client.closedDeferred = d2 = defer.Deferred()
     1677        d.addCallback(lambda x:
     1678                      self.assertEqual(self.f.protocol.data, 'hello'))
     1679        d.addCallback(lambda x: self.assertEqual(self.f.protocol.closed, True))
     1680        return defer.gatherResults([d, d2])
     1681
     1682    def testShutdownException(self):
     1683        """
     1684        If the other side has already closed its connection,
     1685        loseWriteConnection should pass silently.
     1686        """
     1687        self.f.protocol.transport.loseConnection()
     1688        self.client.transport.write("X")
     1689        self.client.transport.loseWriteConnection()
     1690        self.f.protocol.closedDeferred = d = defer.Deferred()
     1691        self.client.closedDeferred = d2 = defer.Deferred()
     1692        d.addCallback(lambda x:
     1693                      self.failUnlessEqual(self.f.protocol.closed, True))
     1694        return defer.gatherResults([d, d2])
     1695
     1696
     1697class HalfCloseBuggyApplicationTests(unittest.TestCase):
     1698    """
     1699    Test half-closing connections where notification code has bugs.
     1700    """
     1701
     1702    def setUp(self):
     1703        """
     1704        Set up a server and connect a client to it.  Return a Deferred which
     1705        only fires once this is done.
     1706        """
     1707        self.serverFactory = MyHCFactory()
     1708        self.serverFactory.protocolConnectionMade = defer.Deferred()
     1709        self.port = reactor.listenTCP(
     1710            0, self.serverFactory, interface="::1")
     1711        self.addCleanup(self.port.stopListening)
     1712        addr = self.port.getHost()
     1713        creator = protocol.ClientCreator(reactor, MyHCProtocol)
     1714        clientDeferred = creator.connectTCP(addr.host, addr.port)
     1715        def setClient(clientProtocol):
     1716            self.clientProtocol = clientProtocol
     1717        clientDeferred.addCallback(setClient)
     1718        return defer.gatherResults([
     1719            self.serverFactory.protocolConnectionMade,
     1720            clientDeferred])
     1721
     1722
     1723    def aBug(self, *args):
     1724        """
     1725        Fake implementation of a callback which illegally raises an
     1726        exception.
     1727        """
     1728        raise RuntimeError("ONO I AM BUGGY CODE")
     1729
     1730
     1731    def _notificationRaisesTest(self):
     1732        """
     1733        Helper for testing that an exception is logged by the time the
     1734        client protocol loses its connection.
     1735        """
     1736        closed = self.clientProtocol.closedDeferred = defer.Deferred()
     1737        self.clientProtocol.transport.loseWriteConnection()
     1738        def check(ignored):
     1739            errors = self.flushLoggedErrors(RuntimeError)
     1740            self.assertEqual(len(errors), 1)
     1741        closed.addCallback(check)
     1742        return closed
     1743
     1744
     1745    def test_readNotificationRaises(self):
     1746        """
     1747        If C{readConnectionLost} raises an exception when the transport
     1748        calls it to notify the protocol of that event, the exception should
     1749        be logged and the protocol should be disconnected completely.
     1750        """
     1751        self.serverFactory.protocol.readConnectionLost = self.aBug
     1752        return self._notificationRaisesTest()
     1753
     1754
     1755    def test_writeNotificationRaises(self):
     1756        """
     1757        If C{writeConnectionLost} raises an exception when the transport
     1758        calls it to notify the protocol of that event, the exception should
     1759        be logged and the protocol should be disconnected completely.
     1760        """
     1761        self.clientProtocol.writeConnectionLost = self.aBug
     1762        return self._notificationRaisesTest()
     1763
     1764
     1765
     1766class LogTestCase(unittest.TestCase):
     1767    """
     1768    Test logging facility of TCP base classes.
     1769    """
     1770
     1771    def test_logstrClientSetup(self):
     1772        """
     1773        Check that the log customization of the client transport happens
     1774        once the client is connected.
     1775        """
     1776        server = MyServerFactory()
     1777
     1778        client = MyClientFactory()
     1779        client.protocolConnectionMade = defer.Deferred()
     1780
     1781        port = reactor.listenTCP(0, server, interface='::1')
     1782        self.addCleanup(port.stopListening)
     1783
     1784        connector = reactor.connectTCP(
     1785            port.getHost().host, port.getHost().port, client)
     1786        self.addCleanup(connector.disconnect)
     1787
     1788        # It should still have the default value
     1789        self.assertEquals(connector.transport.logstr,
     1790                          "Uninitialized")
     1791
     1792        def cb(ign):
     1793            self.assertEquals(connector.transport.logstr,
     1794                              "AccumulatingProtocol,client")
     1795        client.protocolConnectionMade.addCallback(cb)
     1796        return client.protocolConnectionMade
     1797
     1798
     1799
     1800class PauseProducingTestCase(unittest.TestCase):
     1801    """
     1802    Test some behaviors of pausing the production of a transport.
     1803    """
     1804
     1805    def test_pauseProducingInConnectionMade(self):
     1806        """
     1807        In C{connectionMade} of a client protocol, C{pauseProducing} used to be
     1808        ignored: this test is here to ensure it's not ignored.
     1809        """
     1810        server = MyServerFactory()
     1811
     1812        client = MyClientFactory()
     1813        client.protocolConnectionMade = defer.Deferred()
     1814
     1815        port = reactor.listenTCP(0, server, interface='::1')
     1816        self.addCleanup(port.stopListening)
     1817
     1818        connector = reactor.connectTCP(
     1819            port.getHost().host, port.getHost().port, client)
     1820        self.addCleanup(connector.disconnect)
     1821
     1822        def checkInConnectionMade(proto):
     1823            tr = proto.transport
     1824            # The transport should already be monitored
     1825            self.assertIn(tr, reactor.getReaders() +
     1826                              reactor.getWriters())
     1827            proto.transport.pauseProducing()
     1828            self.assertNotIn(tr, reactor.getReaders() +
     1829                                 reactor.getWriters())
     1830            d = defer.Deferred()
     1831            d.addCallback(checkAfterConnectionMade)
     1832            reactor.callLater(0, d.callback, proto)
     1833            return d
     1834        def checkAfterConnectionMade(proto):
     1835            tr = proto.transport
     1836            # The transport should still not be monitored
     1837            self.assertNotIn(tr, reactor.getReaders() +
     1838                                 reactor.getWriters())
     1839        client.protocolConnectionMade.addCallback(checkInConnectionMade)
     1840        return client.protocolConnectionMade
     1841
     1842    if not interfaces.IReactorFDSet.providedBy(reactor):
     1843        test_pauseProducingInConnectionMade.skip = "Reactor not providing IReactorFDSet"
     1844
     1845
     1846
     1847class CallBackOrderTestCase(unittest.TestCase):
     1848    """
     1849    Test the order of reactor callbacks
     1850    """
     1851
     1852    def test_loseOrder(self):
     1853        """
     1854        Check that Protocol.connectionLost is called before factory's
     1855        clientConnectionLost
     1856        """
     1857        server = MyServerFactory()
     1858        server.protocolConnectionMade = (defer.Deferred()
     1859                .addCallback(lambda proto: self.addCleanup(
     1860                             proto.transport.loseConnection)))
     1861
     1862        client = MyClientFactory()
     1863        client.protocolConnectionLost = defer.Deferred()
     1864        client.protocolConnectionMade = defer.Deferred()
     1865
     1866        def _cbCM(res):
     1867            """
     1868            protocol.connectionMade callback
     1869            """
     1870            reactor.callLater(0, client.protocol.transport.loseConnection)
     1871
     1872        client.protocolConnectionMade.addCallback(_cbCM)
     1873
     1874        port = reactor.listenTCP(0, server, interface='::1')
     1875        self.addCleanup(port.stopListening)
     1876
     1877        connector = reactor.connectTCP(
     1878            port.getHost().host, port.getHost().port, client)
     1879        self.addCleanup(connector.disconnect)
     1880
     1881        def _cbCCL(res):
     1882            """
     1883            factory.clientConnectionLost callback
     1884            """
     1885            return 'CCL'
     1886
     1887        def _cbCL(res):
     1888            """
     1889            protocol.connectionLost callback
     1890            """
     1891            return 'CL'
     1892
     1893        def _cbGather(res):
     1894            self.assertEquals(res, ['CL', 'CCL'])
     1895
     1896        d = defer.gatherResults([
     1897                client.protocolConnectionLost.addCallback(_cbCL),
     1898                client.deferred.addCallback(_cbCCL)])
     1899        return d.addCallback(_cbGather)
     1900
     1901
     1902
     1903try:
     1904    import resource
     1905except ImportError:
     1906    pass
     1907else:
     1908    numRounds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + 10
     1909    ProperlyCloseFilesTestCase.numberRounds = numRounds