| | 1 | # Copyright (c) Twisted Matrix Laboratories. |
| | 2 | # See LICENSE for details. |
| | 3 | |
| | 4 | """ |
| | 5 | Tests for implementations of L{IReactorTCP} using IPv6. |
| | 6 | """ |
| | 7 | |
| | 8 | import socket, random, errno |
| | 9 | |
| | 10 | from zope.interface import implements |
| | 11 | |
| | 12 | from twisted.trial import unittest |
| | 13 | |
| | 14 | from twisted.python.log import msg |
| | 15 | from twisted.internet import protocol, reactor, defer, interfaces |
| | 16 | from twisted.internet import error |
| | 17 | from twisted.internet.address import IPv6Address |
| | 18 | from twisted.internet.interfaces import IHalfCloseableProtocol, IPullProducer |
| | 19 | from twisted.protocols import policies |
| | 20 | from twisted.test.proto_helpers import AccumulatingProtocol |
| | 21 | |
| | 22 | |
| | 23 | def loopUntil(predicate, interval=0): |
| | 24 | """ |
| | 25 | Poor excuse for an event notification helper. This polls a condition and |
| | 26 | calls back a Deferred when it is seen to be true. |
| | 27 | |
| | 28 | Do not use this function. |
| | 29 | """ |
| | 30 | from twisted.internet import task |
| | 31 | d = defer.Deferred() |
| | 32 | def check(): |
| | 33 | res = predicate() |
| | 34 | if res: |
| | 35 | d.callback(res) |
| | 36 | call = task.LoopingCall(check) |
| | 37 | def stop(result): |
| | 38 | call.stop() |
| | 39 | return result |
| | 40 | d.addCallback(stop) |
| | 41 | d2 = call.start(interval) |
| | 42 | d2.addErrback(d.errback) |
| | 43 | return d |
| | 44 | |
| | 45 | |
| | 46 | |
| | 47 | class ClosingProtocol(protocol.Protocol): |
| | 48 | |
| | 49 | def connectionMade(self): |
| | 50 | msg("ClosingProtocol.connectionMade") |
| | 51 | self.transport.loseConnection() |
| | 52 | |
| | 53 | def connectionLost(self, reason): |
| | 54 | msg("ClosingProtocol.connectionLost") |
| | 55 | reason.trap(error.ConnectionDone) |
| | 56 | |
| | 57 | |
| | 58 | |
| | 59 | class ClosingFactory(protocol.ServerFactory): |
| | 60 | """ |
| | 61 | Factory that closes port immediately. |
| | 62 | """ |
| | 63 | |
| | 64 | _cleanerUpper = None |
| | 65 | |
| | 66 | def buildProtocol(self, conn): |
| | 67 | self._cleanerUpper = self.port.stopListening() |
| | 68 | return ClosingProtocol() |
| | 69 | |
| | 70 | |
| | 71 | def cleanUp(self): |
| | 72 | """ |
| | 73 | Clean-up for tests to wait for the port to stop listening. |
| | 74 | """ |
| | 75 | if self._cleanerUpper is None: |
| | 76 | return self.port.stopListening() |
| | 77 | return self._cleanerUpper |
| | 78 | |
| | 79 | |
| | 80 | |
| | 81 | class MyProtocolFactoryMixin(object): |
| | 82 | """ |
| | 83 | Mixin for factories which create L{AccumulatingProtocol} instances. |
| | 84 | |
| | 85 | @type protocolFactory: no-argument callable |
| | 86 | @ivar protocolFactory: Factory for protocols - takes the place of the |
| | 87 | typical C{protocol} attribute of factories (but that name is used by |
| | 88 | this class for something else). |
| | 89 | |
| | 90 | @type protocolConnectionMade: L{NoneType} or L{defer.Deferred} |
| | 91 | @ivar protocolConnectionMade: When an instance of L{AccumulatingProtocol} |
| | 92 | is connected, if this is not C{None}, the L{Deferred} will be called |
| | 93 | back with the protocol instance and the attribute set to C{None}. |
| | 94 | |
| | 95 | @type protocolConnectionLost: L{NoneType} or L{defer.Deferred} |
| | 96 | @ivar protocolConnectionLost: When an instance of L{AccumulatingProtocol} |
| | 97 | is created, this will be set as its C{closedDeferred} attribute and |
| | 98 | then this attribute will be set to C{None} so the L{defer.Deferred} is |
| | 99 | not used by more than one protocol. |
| | 100 | |
| | 101 | @ivar protocol: The most recently created L{AccumulatingProtocol} instance |
| | 102 | which was returned from C{buildProtocol}. |
| | 103 | |
| | 104 | @type called: C{int} |
| | 105 | @ivar called: A counter which is incremented each time C{buildProtocol} |
| | 106 | is called. |
| | 107 | |
| | 108 | @ivar peerAddresses: A C{list} of the addresses passed to C{buildProtocol}. |
| | 109 | """ |
| | 110 | protocolFactory = AccumulatingProtocol |
| | 111 | |
| | 112 | protocolConnectionMade = None |
| | 113 | protocolConnectionLost = None |
| | 114 | protocol = None |
| | 115 | called = 0 |
| | 116 | |
| | 117 | def __init__(self): |
| | 118 | self.peerAddresses = [] |
| | 119 | |
| | 120 | |
| | 121 | def buildProtocol(self, addr): |
| | 122 | """ |
| | 123 | Create a L{AccumulatingProtocol} and set it up to be able to perform |
| | 124 | callbacks. |
| | 125 | """ |
| | 126 | self.peerAddresses.append(addr) |
| | 127 | self.called += 1 |
| | 128 | p = self.protocolFactory() |
| | 129 | p.factory = self |
| | 130 | p.closedDeferred = self.protocolConnectionLost |
| | 131 | self.protocolConnectionLost = None |
| | 132 | self.protocol = p |
| | 133 | return p |
| | 134 | |
| | 135 | |
| | 136 | |
| | 137 | class MyServerFactory(MyProtocolFactoryMixin, protocol.ServerFactory): |
| | 138 | """ |
| | 139 | Server factory which creates L{AccumulatingProtocol} instances. |
| | 140 | """ |
| | 141 | |
| | 142 | |
| | 143 | |
| | 144 | class MyClientFactory(MyProtocolFactoryMixin, protocol.ClientFactory): |
| | 145 | """ |
| | 146 | Client factory which creates L{AccumulatingProtocol} instances. |
| | 147 | """ |
| | 148 | failed = 0 |
| | 149 | stopped = 0 |
| | 150 | |
| | 151 | def __init__(self): |
| | 152 | MyProtocolFactoryMixin.__init__(self) |
| | 153 | self.deferred = defer.Deferred() |
| | 154 | self.failDeferred = defer.Deferred() |
| | 155 | |
| | 156 | def clientConnectionFailed(self, connector, reason): |
| | 157 | self.failed = 1 |
| | 158 | self.reason = reason |
| | 159 | self.failDeferred.callback(None) |
| | 160 | |
| | 161 | def clientConnectionLost(self, connector, reason): |
| | 162 | self.lostReason = reason |
| | 163 | self.deferred.callback(None) |
| | 164 | |
| | 165 | def stopFactory(self): |
| | 166 | self.stopped = 1 |
| | 167 | |
| | 168 | |
| | 169 | |
| | 170 | class ListeningTestCase(unittest.TestCase): |
| | 171 | |
| | 172 | def test_listen(self): |
| | 173 | """ |
| | 174 | L{IReactorTCP.listenTCP} returns an object which provides |
| | 175 | L{IListeningPort}. |
| | 176 | """ |
| | 177 | f = MyServerFactory() |
| | 178 | p1 = reactor.listenTCP(0, f, interface="::1") |
| | 179 | self.addCleanup(p1.stopListening) |
| | 180 | self.failUnless(interfaces.IListeningPort.providedBy(p1)) |
| | 181 | |
| | 182 | |
| | 183 | def testStopListening(self): |
| | 184 | """ |
| | 185 | The L{IListeningPort} returned by L{IReactorTCP.listenTCP} can be |
| | 186 | stopped with its C{stopListening} method. After the L{Deferred} it |
| | 187 | (optionally) returns has been called back, the port number can be bound |
| | 188 | to a new server. |
| | 189 | """ |
| | 190 | f = MyServerFactory() |
| | 191 | port = reactor.listenTCP(0, f, interface="::1") |
| | 192 | n = port.getHost().port |
| | 193 | |
| | 194 | def cbStopListening(ignored): |
| | 195 | # Make sure we can rebind the port right away |
| | 196 | port = reactor.listenTCP(n, f, interface="::1") |
| | 197 | return port.stopListening() |
| | 198 | |
| | 199 | d = defer.maybeDeferred(port.stopListening) |
| | 200 | d.addCallback(cbStopListening) |
| | 201 | return d |
| | 202 | |
| | 203 | |
| | 204 | def testNumberedInterface(self): |
| | 205 | f = MyServerFactory() |
| | 206 | # listen only on the loopback interface |
| | 207 | p1 = reactor.listenTCP(0, f, interface='::1') |
| | 208 | return p1.stopListening() |
| | 209 | |
| | 210 | def testPortRepr(self): |
| | 211 | f = MyServerFactory() |
| | 212 | p = reactor.listenTCP(0, f) |
| | 213 | portNo = str(p.getHost().port) |
| | 214 | self.failIf(repr(p).find(portNo) == -1) |
| | 215 | def stoppedListening(ign): |
| | 216 | self.failIf(repr(p).find(portNo) != -1) |
| | 217 | d = defer.maybeDeferred(p.stopListening) |
| | 218 | return d.addCallback(stoppedListening) |
| | 219 | |
| | 220 | |
| | 221 | def test_serverRepr(self): |
| | 222 | """ |
| | 223 | Check that the repr string of the server transport get the good port |
| | 224 | number if the server listens on 0. |
| | 225 | """ |
| | 226 | server = MyServerFactory() |
| | 227 | serverConnMade = server.protocolConnectionMade = defer.Deferred() |
| | 228 | port = reactor.listenTCP(0, server, 50, interface="::1") |
| | 229 | self.addCleanup(port.stopListening) |
| | 230 | |
| | 231 | client = MyClientFactory() |
| | 232 | clientConnMade = client.protocolConnectionMade = defer.Deferred() |
| | 233 | connector = reactor.connectTCP("::1", |
| | 234 | port.getHost().port, client) |
| | 235 | self.addCleanup(connector.disconnect) |
| | 236 | def check((serverProto, clientProto)): |
| | 237 | portNumber = port.getHost().port |
| | 238 | self.assertEquals( |
| | 239 | repr(serverProto.transport), |
| | 240 | "<AccumulatingProtocol #0 on %s>" % (portNumber,)) |
| | 241 | serverProto.transport.loseConnection() |
| | 242 | clientProto.transport.loseConnection() |
| | 243 | |
| | 244 | return defer.gatherResults([serverConnMade, clientConnMade] |
| | 245 | ).addCallback(check) |
| | 246 | |
| | 247 | |
| | 248 | def test_restartListening(self): |
| | 249 | """ |
| | 250 | Stop and then try to restart a L{tcp.Port}: after a restart, the |
| | 251 | server should be able to handle client connections. |
| | 252 | """ |
| | 253 | serverFactory = MyServerFactory() |
| | 254 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 255 | self.addCleanup(port.stopListening) |
| | 256 | |
| | 257 | def cbStopListening(ignored): |
| | 258 | port.startListening() |
| | 259 | |
| | 260 | client = MyClientFactory() |
| | 261 | serverFactory.protocolConnectionMade = defer.Deferred() |
| | 262 | client.protocolConnectionMade = defer.Deferred() |
| | 263 | connector = reactor.connectTCP("::1", |
| | 264 | port.getHost().port, client) |
| | 265 | self.addCleanup(connector.disconnect) |
| | 266 | return defer.gatherResults([serverFactory.protocolConnectionMade, |
| | 267 | client.protocolConnectionMade] |
| | 268 | ).addCallback(close) |
| | 269 | |
| | 270 | def close((serverProto, clientProto)): |
| | 271 | clientProto.transport.loseConnection() |
| | 272 | serverProto.transport.loseConnection() |
| | 273 | |
| | 274 | d = defer.maybeDeferred(port.stopListening) |
| | 275 | d.addCallback(cbStopListening) |
| | 276 | return d |
| | 277 | |
| | 278 | |
| | 279 | def test_exceptInStop(self): |
| | 280 | """ |
| | 281 | If the server factory raises an exception in C{stopFactory}, the |
| | 282 | deferred returned by L{tcp.Port.stopListening} should fail with the |
| | 283 | corresponding error. |
| | 284 | """ |
| | 285 | serverFactory = MyServerFactory() |
| | 286 | def raiseException(): |
| | 287 | raise RuntimeError("An error") |
| | 288 | serverFactory.stopFactory = raiseException |
| | 289 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 290 | |
| | 291 | return self.assertFailure(port.stopListening(), RuntimeError) |
| | 292 | |
| | 293 | |
| | 294 | def test_restartAfterExcept(self): |
| | 295 | """ |
| | 296 | Even if the server factory raise an exception in C{stopFactory}, the |
| | 297 | corresponding C{tcp.Port} instance should be in a sane state and can |
| | 298 | be restarted. |
| | 299 | """ |
| | 300 | serverFactory = MyServerFactory() |
| | 301 | def raiseException(): |
| | 302 | raise RuntimeError("An error") |
| | 303 | serverFactory.stopFactory = raiseException |
| | 304 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 305 | self.addCleanup(port.stopListening) |
| | 306 | |
| | 307 | def cbStopListening(ignored): |
| | 308 | del serverFactory.stopFactory |
| | 309 | port.startListening() |
| | 310 | |
| | 311 | client = MyClientFactory() |
| | 312 | serverFactory.protocolConnectionMade = defer.Deferred() |
| | 313 | client.protocolConnectionMade = defer.Deferred() |
| | 314 | connector = reactor.connectTCP("::1", |
| | 315 | port.getHost().port, client) |
| | 316 | self.addCleanup(connector.disconnect) |
| | 317 | return defer.gatherResults([serverFactory.protocolConnectionMade, |
| | 318 | client.protocolConnectionMade] |
| | 319 | ).addCallback(close) |
| | 320 | |
| | 321 | def close((serverProto, clientProto)): |
| | 322 | clientProto.transport.loseConnection() |
| | 323 | serverProto.transport.loseConnection() |
| | 324 | |
| | 325 | return self.assertFailure(port.stopListening(), RuntimeError |
| | 326 | ).addCallback(cbStopListening) |
| | 327 | |
| | 328 | |
| | 329 | def test_directConnectionLostCall(self): |
| | 330 | """ |
| | 331 | If C{connectionLost} is called directly on a port object, it succeeds |
| | 332 | (and doesn't expect the presence of a C{deferred} attribute). |
| | 333 | |
| | 334 | C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. |
| | 335 | """ |
| | 336 | serverFactory = MyServerFactory() |
| | 337 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 338 | portNumber = port.getHost().port |
| | 339 | port.connectionLost(None) |
| | 340 | |
| | 341 | client = MyClientFactory() |
| | 342 | serverFactory.protocolConnectionMade = defer.Deferred() |
| | 343 | client.protocolConnectionMade = defer.Deferred() |
| | 344 | reactor.connectTCP("::1", portNumber, client) |
| | 345 | def check(ign): |
| | 346 | client.reason.trap(error.ConnectionRefusedError) |
| | 347 | return client.failDeferred.addCallback(check) |
| | 348 | |
| | 349 | |
| | 350 | def test_exceptInConnectionLostCall(self): |
| | 351 | """ |
| | 352 | If C{connectionLost} is called directory on a port object and that the |
| | 353 | server factory raises an exception in C{stopFactory}, the exception is |
| | 354 | passed through to the caller. |
| | 355 | |
| | 356 | C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. |
| | 357 | """ |
| | 358 | serverFactory = MyServerFactory() |
| | 359 | def raiseException(): |
| | 360 | raise RuntimeError("An error") |
| | 361 | serverFactory.stopFactory = raiseException |
| | 362 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 363 | self.assertRaises(RuntimeError, port.connectionLost, None) |
| | 364 | |
| | 365 | |
| | 366 | |
| | 367 | def callWithSpew(f): |
| | 368 | from twisted.python.util import spewerWithLinenums as spewer |
| | 369 | import sys |
| | 370 | sys.settrace(spewer) |
| | 371 | try: |
| | 372 | f() |
| | 373 | finally: |
| | 374 | sys.settrace(None) |
| | 375 | |
| | 376 | class LoopbackTestCase(unittest.TestCase): |
| | 377 | """ |
| | 378 | Test loopback connections. |
| | 379 | """ |
| | 380 | def test_closePortInProtocolFactory(self): |
| | 381 | """ |
| | 382 | A port created with L{IReactorTCP.listenTCP} can be connected to with |
| | 383 | L{IReactorTCP.connectTCP}. |
| | 384 | """ |
| | 385 | f = ClosingFactory() |
| | 386 | port = reactor.listenTCP(0, f, interface="::1") |
| | 387 | f.port = port |
| | 388 | self.addCleanup(f.cleanUp) |
| | 389 | portNumber = port.getHost().port |
| | 390 | clientF = MyClientFactory() |
| | 391 | reactor.connectTCP("::1", portNumber, clientF) |
| | 392 | def check(x): |
| | 393 | self.assertTrue(clientF.protocol.made) |
| | 394 | self.assertTrue(port.disconnected) |
| | 395 | clientF.lostReason.trap(error.ConnectionDone) |
| | 396 | return clientF.deferred.addCallback(check) |
| | 397 | |
| | 398 | def _trapCnxDone(self, obj): |
| | 399 | getattr(obj, 'trap', lambda x: None)(error.ConnectionDone) |
| | 400 | |
| | 401 | |
| | 402 | def _connectedClientAndServerTest(self, callback): |
| | 403 | """ |
| | 404 | Invoke the given callback with a client protocol and a server protocol |
| | 405 | which have been connected to each other. |
| | 406 | """ |
| | 407 | serverFactory = MyServerFactory() |
| | 408 | serverConnMade = defer.Deferred() |
| | 409 | serverFactory.protocolConnectionMade = serverConnMade |
| | 410 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 411 | self.addCleanup(port.stopListening) |
| | 412 | |
| | 413 | portNumber = port.getHost().port |
| | 414 | clientF = MyClientFactory() |
| | 415 | clientConnMade = defer.Deferred() |
| | 416 | clientF.protocolConnectionMade = clientConnMade |
| | 417 | reactor.connectTCP("::1", portNumber, clientF) |
| | 418 | |
| | 419 | connsMade = defer.gatherResults([serverConnMade, clientConnMade]) |
| | 420 | def connected((serverProtocol, clientProtocol)): |
| | 421 | callback(serverProtocol, clientProtocol) |
| | 422 | serverProtocol.transport.loseConnection() |
| | 423 | clientProtocol.transport.loseConnection() |
| | 424 | connsMade.addCallback(connected) |
| | 425 | return connsMade |
| | 426 | |
| | 427 | |
| | 428 | def test_tcpNoDelay(self): |
| | 429 | """ |
| | 430 | The transport of a protocol connected with L{IReactorTCP.connectTCP} or |
| | 431 | L{IReactor.TCP.listenTCP} can have its I{TCP_NODELAY} state inspected |
| | 432 | and manipulated with L{ITCPTransport.getTcpNoDelay} and |
| | 433 | L{ITCPTransport.setTcpNoDelay}. |
| | 434 | """ |
| | 435 | def check(serverProtocol, clientProtocol): |
| | 436 | for p in [serverProtocol, clientProtocol]: |
| | 437 | transport = p.transport |
| | 438 | self.assertEquals(transport.getTcpNoDelay(), 0) |
| | 439 | transport.setTcpNoDelay(1) |
| | 440 | self.assertEquals(transport.getTcpNoDelay(), 1) |
| | 441 | transport.setTcpNoDelay(0) |
| | 442 | self.assertEquals(transport.getTcpNoDelay(), 0) |
| | 443 | return self._connectedClientAndServerTest(check) |
| | 444 | |
| | 445 | |
| | 446 | def test_tcpKeepAlive(self): |
| | 447 | """ |
| | 448 | The transport of a protocol connected with L{IReactorTCP.connectTCP} or |
| | 449 | L{IReactor.TCP.listenTCP} can have its I{SO_KEEPALIVE} state inspected |
| | 450 | and manipulated with L{ITCPTransport.getTcpKeepAlive} and |
| | 451 | L{ITCPTransport.setTcpKeepAlive}. |
| | 452 | """ |
| | 453 | def check(serverProtocol, clientProtocol): |
| | 454 | for p in [serverProtocol, clientProtocol]: |
| | 455 | transport = p.transport |
| | 456 | self.assertEquals(transport.getTcpKeepAlive(), 0) |
| | 457 | transport.setTcpKeepAlive(1) |
| | 458 | self.assertEquals(transport.getTcpKeepAlive(), 1) |
| | 459 | transport.setTcpKeepAlive(0) |
| | 460 | self.assertEquals(transport.getTcpKeepAlive(), 0) |
| | 461 | return self._connectedClientAndServerTest(check) |
| | 462 | |
| | 463 | |
| | 464 | def testFailing(self): |
| | 465 | clientF = MyClientFactory() |
| | 466 | # XXX we assume no one is listening on TCP port 69 |
| | 467 | reactor.connectTCP("::1", 69, clientF, timeout=5) |
| | 468 | def check(ignored): |
| | 469 | clientF.reason.trap(error.ConnectionRefusedError) |
| | 470 | return clientF.failDeferred.addCallback(check) |
| | 471 | |
| | 472 | |
| | 473 | def test_connectionRefusedErrorNumber(self): |
| | 474 | """ |
| | 475 | Assert that the error number of the ConnectionRefusedError is |
| | 476 | ECONNREFUSED, and not some other socket related error. |
| | 477 | """ |
| | 478 | |
| | 479 | # Bind a number of ports in the operating system. We will attempt |
| | 480 | # to connect to these in turn immediately after closing them, in the |
| | 481 | # hopes that no one else has bound them in the mean time. Any |
| | 482 | # connection which succeeds is ignored and causes us to move on to |
| | 483 | # the next port. As soon as a connection attempt fails, we move on |
| | 484 | # to making an assertion about how it failed. If they all succeed, |
| | 485 | # the test will fail. |
| | 486 | |
| | 487 | # It would be nice to have a simpler, reliable way to cause a |
| | 488 | # connection failure from the platform. |
| | 489 | # |
| | 490 | # On Linux (2.6.15), connecting to port 0 always fails. FreeBSD |
| | 491 | # (5.4) rejects the connection attempt with EADDRNOTAVAIL. |
| | 492 | # |
| | 493 | # On FreeBSD (5.4), listening on a port and then repeatedly |
| | 494 | # connecting to it without ever accepting any connections eventually |
| | 495 | # leads to an ECONNREFUSED. On Linux (2.6.15), a seemingly |
| | 496 | # unbounded number of connections succeed. |
| | 497 | |
| | 498 | serverSockets = [] |
| | 499 | for i in xrange(10): |
| | 500 | serverSocket = socket.socket(socket.AF_INET6) |
| | 501 | serverSocket.bind(('::1', 0)) |
| | 502 | serverSocket.listen(1) |
| | 503 | serverSockets.append(serverSocket) |
| | 504 | random.shuffle(serverSockets) |
| | 505 | |
| | 506 | clientCreator = protocol.ClientCreator(reactor, protocol.Protocol) |
| | 507 | |
| | 508 | def tryConnectFailure(): |
| | 509 | def connected(proto): |
| | 510 | """ |
| | 511 | Darn. Kill it and try again, if there are any tries left. |
| | 512 | """ |
| | 513 | proto.transport.loseConnection() |
| | 514 | if serverSockets: |
| | 515 | return tryConnectFailure() |
| | 516 | self.fail("Could not fail to connect - could not test errno for that case.") |
| | 517 | |
| | 518 | serverSocket = serverSockets.pop() |
| | 519 | serverHost, serverPort = serverSocket.getsockname()[:2] |
| | 520 | serverSocket.close() |
| | 521 | |
| | 522 | connectDeferred = clientCreator.connectTCP(serverHost, serverPort) |
| | 523 | connectDeferred.addCallback(connected) |
| | 524 | return connectDeferred |
| | 525 | |
| | 526 | refusedDeferred = tryConnectFailure() |
| | 527 | self.assertFailure(refusedDeferred, error.ConnectionRefusedError) |
| | 528 | def connRefused(exc): |
| | 529 | self.assertEqual(exc.osError, errno.ECONNREFUSED) |
| | 530 | refusedDeferred.addCallback(connRefused) |
| | 531 | def cleanup(passthrough): |
| | 532 | while serverSockets: |
| | 533 | serverSockets.pop().close() |
| | 534 | return passthrough |
| | 535 | refusedDeferred.addBoth(cleanup) |
| | 536 | return refusedDeferred |
| | 537 | |
| | 538 | |
| | 539 | def test_connectByServiceFail(self): |
| | 540 | """ |
| | 541 | Connecting to a named service which does not exist raises |
| | 542 | L{error.ServiceNameUnknownError}. |
| | 543 | """ |
| | 544 | self.assertRaises( |
| | 545 | error.ServiceNameUnknownError, |
| | 546 | reactor.connectTCP, |
| | 547 | "::1", "thisbetternotexist", MyClientFactory()) |
| | 548 | |
| | 549 | |
| | 550 | def test_connectByService(self): |
| | 551 | """ |
| | 552 | L{IReactorTCP.connectTCP} accepts the name of a service instead of a |
| | 553 | port number and connects to the port number associated with that |
| | 554 | service, as defined by L{socket.getservbyname}. |
| | 555 | """ |
| | 556 | serverFactory = MyServerFactory() |
| | 557 | serverConnMade = defer.Deferred() |
| | 558 | serverFactory.protocolConnectionMade = serverConnMade |
| | 559 | port = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 560 | self.addCleanup(port.stopListening) |
| | 561 | portNumber = port.getHost().port |
| | 562 | clientFactory = MyClientFactory() |
| | 563 | clientConnMade = defer.Deferred() |
| | 564 | clientFactory.protocolConnectionMade = clientConnMade |
| | 565 | |
| | 566 | def fakeGetServicePortByName(serviceName, protocolName): |
| | 567 | if serviceName == 'http' and protocolName == 'tcp': |
| | 568 | return portNumber |
| | 569 | return 10 |
| | 570 | self.patch(socket, 'getservbyname', fakeGetServicePortByName) |
| | 571 | |
| | 572 | reactor.connectTCP('::1', 'http', clientFactory) |
| | 573 | |
| | 574 | connMade = defer.gatherResults([serverConnMade, clientConnMade]) |
| | 575 | def connected((serverProtocol, clientProtocol)): |
| | 576 | self.assertTrue( |
| | 577 | serverFactory.called, |
| | 578 | "Server factory was not called upon to build a protocol.") |
| | 579 | serverProtocol.transport.loseConnection() |
| | 580 | clientProtocol.transport.loseConnection() |
| | 581 | connMade.addCallback(connected) |
| | 582 | return connMade |
| | 583 | |
| | 584 | |
| | 585 | class StartStopFactory(protocol.Factory): |
| | 586 | |
| | 587 | started = 0 |
| | 588 | stopped = 0 |
| | 589 | |
| | 590 | def startFactory(self): |
| | 591 | if self.started or self.stopped: |
| | 592 | raise RuntimeError |
| | 593 | self.started = 1 |
| | 594 | |
| | 595 | def stopFactory(self): |
| | 596 | if not self.started or self.stopped: |
| | 597 | raise RuntimeError |
| | 598 | self.stopped = 1 |
| | 599 | |
| | 600 | |
| | 601 | class ClientStartStopFactory(MyClientFactory): |
| | 602 | |
| | 603 | started = 0 |
| | 604 | stopped = 0 |
| | 605 | |
| | 606 | def startFactory(self): |
| | 607 | if self.started or self.stopped: |
| | 608 | raise RuntimeError |
| | 609 | self.started = 1 |
| | 610 | |
| | 611 | def stopFactory(self): |
| | 612 | if not self.started or self.stopped: |
| | 613 | raise RuntimeError |
| | 614 | self.stopped = 1 |
| | 615 | |
| | 616 | |
| | 617 | class FactoryTestCase(unittest.TestCase): |
| | 618 | """Tests for factories.""" |
| | 619 | |
| | 620 | def test_serverStartStop(self): |
| | 621 | """ |
| | 622 | The factory passed to L{IReactorTCP.listenTCP} should be started only |
| | 623 | when it transitions from being used on no ports to being used on one |
| | 624 | port and should be stopped only when it transitions from being used on |
| | 625 | one port to being used on no ports. |
| | 626 | """ |
| | 627 | # Note - this test doesn't need to use listenTCP. It is exercising |
| | 628 | # logic implemented in Factory.doStart and Factory.doStop, so it could |
| | 629 | # just call that directly. Some other test can make sure that |
| | 630 | # listenTCP and stopListening correctly call doStart and |
| | 631 | # doStop. -exarkun |
| | 632 | |
| | 633 | f = StartStopFactory() |
| | 634 | |
| | 635 | # listen on port |
| | 636 | p1 = reactor.listenTCP(0, f, interface='::1') |
| | 637 | self.addCleanup(p1.stopListening) |
| | 638 | |
| | 639 | self.assertEqual((f.started, f.stopped), (1, 0)) |
| | 640 | |
| | 641 | # listen on two more ports |
| | 642 | p2 = reactor.listenTCP(0, f, interface='::1') |
| | 643 | p3 = reactor.listenTCP(0, f, interface='::1') |
| | 644 | |
| | 645 | self.assertEqual((f.started, f.stopped), (1, 0)) |
| | 646 | |
| | 647 | # close two ports |
| | 648 | d1 = defer.maybeDeferred(p1.stopListening) |
| | 649 | d2 = defer.maybeDeferred(p2.stopListening) |
| | 650 | closedDeferred = defer.gatherResults([d1, d2]) |
| | 651 | def cbClosed(ignored): |
| | 652 | self.assertEqual((f.started, f.stopped), (1, 0)) |
| | 653 | # Close the last port |
| | 654 | return p3.stopListening() |
| | 655 | closedDeferred.addCallback(cbClosed) |
| | 656 | |
| | 657 | def cbClosedAll(ignored): |
| | 658 | self.assertEquals((f.started, f.stopped), (1, 1)) |
| | 659 | closedDeferred.addCallback(cbClosedAll) |
| | 660 | return closedDeferred |
| | 661 | |
| | 662 | |
| | 663 | def test_clientStartStop(self): |
| | 664 | """ |
| | 665 | The factory passed to L{IReactorTCP.connectTCP} should be started when |
| | 666 | the connection attempt starts and stopped when it is over. |
| | 667 | """ |
| | 668 | f = ClosingFactory() |
| | 669 | p = reactor.listenTCP(0, f, interface="::1") |
| | 670 | f.port = p |
| | 671 | self.addCleanup(f.cleanUp) |
| | 672 | portNumber = p.getHost().port |
| | 673 | |
| | 674 | factory = ClientStartStopFactory() |
| | 675 | reactor.connectTCP("::1", portNumber, factory) |
| | 676 | self.assertTrue(factory.started) |
| | 677 | return loopUntil(lambda: factory.stopped) |
| | 678 | |
| | 679 | |
| | 680 | |
| | 681 | class ConnectorTestCase(unittest.TestCase): |
| | 682 | |
| | 683 | def test_connectorIdentity(self): |
| | 684 | """ |
| | 685 | L{IReactorTCP.connectTCP} returns an object which provides |
| | 686 | L{IConnector}. The destination of the connector is the address which |
| | 687 | was passed to C{connectTCP}. The same connector object is passed to |
| | 688 | the factory's C{startedConnecting} method as to the factory's |
| | 689 | C{clientConnectionLost} method. |
| | 690 | """ |
| | 691 | serverFactory = ClosingFactory() |
| | 692 | tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 693 | serverFactory.port = tcpPort |
| | 694 | self.addCleanup(serverFactory.cleanUp) |
| | 695 | portNumber = tcpPort.getHost().port |
| | 696 | |
| | 697 | seenConnectors = [] |
| | 698 | seenFailures = [] |
| | 699 | |
| | 700 | clientFactory = ClientStartStopFactory() |
| | 701 | clientFactory.clientConnectionLost = ( |
| | 702 | lambda connector, reason: (seenConnectors.append(connector), |
| | 703 | seenFailures.append(reason))) |
| | 704 | clientFactory.startedConnecting = seenConnectors.append |
| | 705 | |
| | 706 | connector = reactor.connectTCP("::1", portNumber, clientFactory) |
| | 707 | self.assertTrue(interfaces.IConnector.providedBy(connector)) |
| | 708 | dest = connector.getDestination() |
| | 709 | self.assertEquals(dest.type, "TCP") |
| | 710 | self.assertEquals(dest.host, "::1") |
| | 711 | self.assertEquals(dest.port, portNumber) |
| | 712 | |
| | 713 | d = loopUntil(lambda: clientFactory.stopped) |
| | 714 | def clientFactoryStopped(ignored): |
| | 715 | seenFailures[0].trap(error.ConnectionDone) |
| | 716 | self.assertEqual(seenConnectors, [connector, connector]) |
| | 717 | d.addCallback(clientFactoryStopped) |
| | 718 | return d |
| | 719 | |
| | 720 | |
| | 721 | def test_userFail(self): |
| | 722 | """ |
| | 723 | Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting} |
| | 724 | results in C{Factory.clientConnectionFailed} being called with |
| | 725 | L{error.UserError} as the reason. |
| | 726 | """ |
| | 727 | serverFactory = MyServerFactory() |
| | 728 | tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 729 | self.addCleanup(tcpPort.stopListening) |
| | 730 | portNumber = tcpPort.getHost().port |
| | 731 | |
| | 732 | def startedConnecting(connector): |
| | 733 | connector.stopConnecting() |
| | 734 | |
| | 735 | clientFactory = ClientStartStopFactory() |
| | 736 | clientFactory.startedConnecting = startedConnecting |
| | 737 | reactor.connectTCP("::1", portNumber, clientFactory) |
| | 738 | |
| | 739 | d = loopUntil(lambda: clientFactory.stopped) |
| | 740 | def check(ignored): |
| | 741 | self.assertEquals(clientFactory.failed, 1) |
| | 742 | clientFactory.reason.trap(error.UserError) |
| | 743 | return d.addCallback(check) |
| | 744 | |
| | 745 | |
| | 746 | def test_reconnect(self): |
| | 747 | """ |
| | 748 | Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes |
| | 749 | a new connection attempt to be made. |
| | 750 | """ |
| | 751 | serverFactory = ClosingFactory() |
| | 752 | tcpPort = reactor.listenTCP(0, serverFactory, interface="::1") |
| | 753 | serverFactory.port = tcpPort |
| | 754 | self.addCleanup(serverFactory.cleanUp) |
| | 755 | portNumber = tcpPort.getHost().port |
| | 756 | |
| | 757 | clientFactory = MyClientFactory() |
| | 758 | |
| | 759 | def clientConnectionLost(connector, reason): |
| | 760 | connector.connect() |
| | 761 | clientFactory.clientConnectionLost = clientConnectionLost |
| | 762 | reactor.connectTCP("::1", portNumber, clientFactory) |
| | 763 | |
| | 764 | d = loopUntil(lambda: clientFactory.failed) |
| | 765 | def reconnectFailed(ignored): |
| | 766 | p = clientFactory.protocol |
| | 767 | self.assertEqual((p.made, p.closed), (1, 1)) |
| | 768 | clientFactory.reason.trap(error.ConnectionRefusedError) |
| | 769 | self.assertEqual(clientFactory.stopped, 1) |
| | 770 | return d.addCallback(reconnectFailed) |
| | 771 | |
| | 772 | |
| | 773 | |
| | 774 | class CannotBindTestCase(unittest.TestCase): |
| | 775 | """ |
| | 776 | Tests for correct behavior when a reactor cannot bind to the required TCP |
| | 777 | port. |
| | 778 | """ |
| | 779 | |
| | 780 | def test_cannotBind(self): |
| | 781 | """ |
| | 782 | L{IReactorTCP.listenTCP} raises L{error.CannotListenError} if the |
| | 783 | address to listen on is already in use. |
| | 784 | """ |
| | 785 | f = MyServerFactory() |
| | 786 | |
| | 787 | p1 = reactor.listenTCP(0, f, interface='::1') |
| | 788 | self.addCleanup(p1.stopListening) |
| | 789 | n = p1.getHost().port |
| | 790 | dest = p1.getHost() |
| | 791 | self.assertEquals(dest.type, "TCP") |
| | 792 | self.assertEquals(dest.host, "::1") |
| | 793 | self.assertEquals(dest.port, n) |
| | 794 | |
| | 795 | # make sure new listen raises error |
| | 796 | self.assertRaises(error.CannotListenError, |
| | 797 | reactor.listenTCP, n, f, interface='::1') |
| | 798 | |
| | 799 | |
| | 800 | |
| | 801 | def _fireWhenDoneFunc(self, d, f): |
| | 802 | """Returns closure that when called calls f and then callbacks d. |
| | 803 | """ |
| | 804 | from twisted.python import util as tputil |
| | 805 | def newf(*args, **kw): |
| | 806 | rtn = f(*args, **kw) |
| | 807 | d.callback('') |
| | 808 | return rtn |
| | 809 | return tputil.mergeFunctionMetadata(f, newf) |
| | 810 | |
| | 811 | |
| | 812 | def test_clientBind(self): |
| | 813 | """ |
| | 814 | L{IReactorTCP.connectTCP} calls C{Factory.clientConnectionFailed} with |
| | 815 | L{error.ConnectBindError} if the bind address specified is already in |
| | 816 | use. |
| | 817 | """ |
| | 818 | theDeferred = defer.Deferred() |
| | 819 | sf = MyServerFactory() |
| | 820 | sf.startFactory = self._fireWhenDoneFunc(theDeferred, sf.startFactory) |
| | 821 | p = reactor.listenTCP(0, sf, interface="::1") |
| | 822 | self.addCleanup(p.stopListening) |
| | 823 | |
| | 824 | def _connect1(results): |
| | 825 | d = defer.Deferred() |
| | 826 | cf1 = MyClientFactory() |
| | 827 | cf1.buildProtocol = self._fireWhenDoneFunc(d, cf1.buildProtocol) |
| | 828 | reactor.connectTCP("::1", p.getHost().port, cf1, |
| | 829 | bindAddress=("::1", 0)) |
| | 830 | d.addCallback(_conmade, cf1) |
| | 831 | return d |
| | 832 | |
| | 833 | def _conmade(results, cf1): |
| | 834 | d = defer.Deferred() |
| | 835 | cf1.protocol.connectionMade = self._fireWhenDoneFunc( |
| | 836 | d, cf1.protocol.connectionMade) |
| | 837 | d.addCallback(_check1connect2, cf1) |
| | 838 | return d |
| | 839 | |
| | 840 | def _check1connect2(results, cf1): |
| | 841 | self.assertEquals(cf1.protocol.made, 1) |
| | 842 | |
| | 843 | d1 = defer.Deferred() |
| | 844 | d2 = defer.Deferred() |
| | 845 | port = cf1.protocol.transport.getHost().port |
| | 846 | cf2 = MyClientFactory() |
| | 847 | cf2.clientConnectionFailed = self._fireWhenDoneFunc( |
| | 848 | d1, cf2.clientConnectionFailed) |
| | 849 | cf2.stopFactory = self._fireWhenDoneFunc(d2, cf2.stopFactory) |
| | 850 | reactor.connectTCP("::1", p.getHost().port, cf2, |
| | 851 | bindAddress=("::1", port)) |
| | 852 | d1.addCallback(_check2failed, cf1, cf2) |
| | 853 | d2.addCallback(_check2stopped, cf1, cf2) |
| | 854 | dl = defer.DeferredList([d1, d2]) |
| | 855 | dl.addCallback(_stop, cf1, cf2) |
| | 856 | return dl |
| | 857 | |
| | 858 | def _check2failed(results, cf1, cf2): |
| | 859 | self.assertEquals(cf2.failed, 1) |
| | 860 | cf2.reason.trap(error.ConnectBindError) |
| | 861 | self.assertTrue(cf2.reason.check(error.ConnectBindError)) |
| | 862 | return results |
| | 863 | |
| | 864 | def _check2stopped(results, cf1, cf2): |
| | 865 | self.assertEquals(cf2.stopped, 1) |
| | 866 | return results |
| | 867 | |
| | 868 | def _stop(results, cf1, cf2): |
| | 869 | d = defer.Deferred() |
| | 870 | d.addCallback(_check1cleanup, cf1) |
| | 871 | cf1.stopFactory = self._fireWhenDoneFunc(d, cf1.stopFactory) |
| | 872 | cf1.protocol.transport.loseConnection() |
| | 873 | return d |
| | 874 | |
| | 875 | def _check1cleanup(results, cf1): |
| | 876 | self.assertEquals(cf1.stopped, 1) |
| | 877 | |
| | 878 | theDeferred.addCallback(_connect1) |
| | 879 | return theDeferred |
| | 880 | |
| | 881 | |
| | 882 | |
| | 883 | class MyOtherClientFactory(protocol.ClientFactory): |
| | 884 | def buildProtocol(self, address): |
| | 885 | self.address = address |
| | 886 | self.protocol = AccumulatingProtocol() |
| | 887 | return self.protocol |
| | 888 | |
| | 889 | |
| | 890 | |
| | 891 | class LocalRemoteAddressTestCase(unittest.TestCase): |
| | 892 | """ |
| | 893 | Tests for correct getHost/getPeer values and that the correct address is |
| | 894 | passed to buildProtocol. |
| | 895 | """ |
| | 896 | def test_hostAddress(self): |
| | 897 | """ |
| | 898 | L{IListeningPort.getHost} returns the same address as a client |
| | 899 | connection's L{ITCPTransport.getPeer}. |
| | 900 | """ |
| | 901 | serverFactory = MyServerFactory() |
| | 902 | serverFactory.protocolConnectionLost = defer.Deferred() |
| | 903 | serverConnectionLost = serverFactory.protocolConnectionLost |
| | 904 | port = reactor.listenTCP(0, serverFactory, interface='::1') |
| | 905 | self.addCleanup(port.stopListening) |
| | 906 | n = port.getHost().port |
| | 907 | |
| | 908 | clientFactory = MyClientFactory() |
| | 909 | onConnection = clientFactory.protocolConnectionMade = defer.Deferred() |
| | 910 | connector = reactor.connectTCP('::1', n, clientFactory) |
| | 911 | |
| | 912 | def check(ignored): |
| | 913 | self.assertEquals([port.getHost()], clientFactory.peerAddresses) |
| | 914 | self.assertEquals( |
| | 915 | port.getHost(), clientFactory.protocol.transport.getPeer()) |
| | 916 | onConnection.addCallback(check) |
| | 917 | |
| | 918 | def cleanup(ignored): |
| | 919 | # Clean up the client explicitly here so that tear down of |
| | 920 | # the server side of the connection begins, then wait for |
| | 921 | # the server side to actually disconnect. |
| | 922 | connector.disconnect() |
| | 923 | return serverConnectionLost |
| | 924 | onConnection.addCallback(cleanup) |
| | 925 | |
| | 926 | return onConnection |
| | 927 | |
| | 928 | |
| | 929 | |
| | 930 | class WriterProtocol(protocol.Protocol): |
| | 931 | def connectionMade(self): |
| | 932 | # use everything ITransport claims to provide. If something here |
| | 933 | # fails, the exception will be written to the log, but it will not |
| | 934 | # directly flunk the test. The test will fail when maximum number of |
| | 935 | # iterations have passed and the writer's factory.done has not yet |
| | 936 | # been set. |
| | 937 | self.transport.write("Hello Cleveland!\n") |
| | 938 | seq = ["Goodbye", " cruel", " world", "\n"] |
| | 939 | self.transport.writeSequence(seq) |
| | 940 | peer = self.transport.getPeer() |
| | 941 | if peer.type != "TCP": |
| | 942 | print "getPeer returned non-TCP socket:", peer |
| | 943 | self.factory.problem = 1 |
| | 944 | us = self.transport.getHost() |
| | 945 | if us.type != "TCP": |
| | 946 | print "getHost returned non-TCP socket:", us |
| | 947 | self.factory.problem = 1 |
| | 948 | self.factory.done = 1 |
| | 949 | |
| | 950 | self.transport.loseConnection() |
| | 951 | |
| | 952 | class ReaderProtocol(protocol.Protocol): |
| | 953 | def dataReceived(self, data): |
| | 954 | self.factory.data += data |
| | 955 | def connectionLost(self, reason): |
| | 956 | self.factory.done = 1 |
| | 957 | |
| | 958 | class WriterClientFactory(protocol.ClientFactory): |
| | 959 | def __init__(self): |
| | 960 | self.done = 0 |
| | 961 | self.data = "" |
| | 962 | def buildProtocol(self, addr): |
| | 963 | p = ReaderProtocol() |
| | 964 | p.factory = self |
| | 965 | self.protocol = p |
| | 966 | return p |
| | 967 | |
| | 968 | class WriteDataTestCase(unittest.TestCase): |
| | 969 | """ |
| | 970 | Test that connected TCP sockets can actually write data. Try to exercise |
| | 971 | the entire ITransport interface. |
| | 972 | """ |
| | 973 | |
| | 974 | def test_writer(self): |
| | 975 | """ |
| | 976 | L{ITCPTransport.write} and L{ITCPTransport.writeSequence} send bytes to |
| | 977 | the other end of the connection. |
| | 978 | """ |
| | 979 | f = protocol.Factory() |
| | 980 | f.protocol = WriterProtocol |
| | 981 | f.done = 0 |
| | 982 | f.problem = 0 |
| | 983 | wrappedF = WiredFactory(f) |
| | 984 | p = reactor.listenTCP(0, wrappedF, interface="::1") |
| | 985 | self.addCleanup(p.stopListening) |
| | 986 | n = p.getHost().port |
| | 987 | clientF = WriterClientFactory() |
| | 988 | wrappedClientF = WiredFactory(clientF) |
| | 989 | reactor.connectTCP("::1", n, wrappedClientF) |
| | 990 | |
| | 991 | def check(ignored): |
| | 992 | self.failUnless(f.done, "writer didn't finish, it probably died") |
| | 993 | self.failUnless(f.problem == 0, "writer indicated an error") |
| | 994 | self.failUnless(clientF.done, |
| | 995 | "client didn't see connection dropped") |
| | 996 | expected = "".join(["Hello Cleveland!\n", |
| | 997 | "Goodbye", " cruel", " world", "\n"]) |
| | 998 | self.failUnless(clientF.data == expected, |
| | 999 | "client didn't receive all the data it expected") |
| | 1000 | d = defer.gatherResults([wrappedF.onDisconnect, |
| | 1001 | wrappedClientF.onDisconnect]) |
| | 1002 | return d.addCallback(check) |
| | 1003 | |
| | 1004 | |
| | 1005 | def test_writeAfterShutdownWithoutReading(self): |
| | 1006 | """ |
| | 1007 | A TCP transport which is written to after the connection has been shut |
| | 1008 | down should notify its protocol that the connection has been lost, even |
| | 1009 | if the TCP transport is not actively being monitored for read events |
| | 1010 | (ie, pauseProducing was called on it). |
| | 1011 | """ |
| | 1012 | # This is an unpleasant thing. Generally tests shouldn't skip or |
| | 1013 | # run based on the name of the reactor being used (most tests |
| | 1014 | # shouldn't care _at all_ what reactor is being used, in fact). The |
| | 1015 | # Gtk reactor cannot pass this test, though, because it fails to |
| | 1016 | # implement IReactorTCP entirely correctly. Gtk is quite old at |
| | 1017 | # this point, so it's more likely that gtkreactor will be deprecated |
| | 1018 | # and removed rather than fixed to handle this case correctly. |
| | 1019 | # Since this is a pre-existing (and very long-standing) issue with |
| | 1020 | # the Gtk reactor, there's no reason for it to prevent this test |
| | 1021 | # being added to exercise the other reactors, for which the behavior |
| | 1022 | # was also untested but at least works correctly (now). See #2833 |
| | 1023 | # for information on the status of gtkreactor. |
| | 1024 | if reactor.__class__.__name__ == 'IOCPReactor': |
| | 1025 | raise unittest.SkipTest( |
| | 1026 | "iocpreactor does not, in fact, stop reading immediately after " |
| | 1027 | "pauseProducing is called. This results in a bonus disconnection " |
| | 1028 | "notification. Under some circumstances, it might be possible to " |
| | 1029 | "not receive this notifications (specifically, pauseProducing, " |
| | 1030 | "deliver some data, proceed with this test).") |
| | 1031 | if reactor.__class__.__name__ == 'GtkReactor': |
| | 1032 | raise unittest.SkipTest( |
| | 1033 | "gtkreactor does not implement unclean disconnection " |
| | 1034 | "notification correctly. This might more properly be " |
| | 1035 | "a todo, but due to technical limitations it cannot be.") |
| | 1036 | |
| | 1037 | # Called back after the protocol for the client side of the connection |
| | 1038 | # has paused its transport, preventing it from reading, therefore |
| | 1039 | # preventing it from noticing the disconnection before the rest of the |
| | 1040 | # actions which are necessary to trigger the case this test is for have |
| | 1041 | # been taken. |
| | 1042 | clientPaused = defer.Deferred() |
| | 1043 | |
| | 1044 | # Called back when the protocol for the server side of the connection |
| | 1045 | # has received connection lost notification. |
| | 1046 | serverLost = defer.Deferred() |
| | 1047 | |
| | 1048 | class Disconnecter(protocol.Protocol): |
| | 1049 | """ |
| | 1050 | Protocol for the server side of the connection which disconnects |
| | 1051 | itself in a callback on clientPaused and publishes notification |
| | 1052 | when its connection is actually lost. |
| | 1053 | """ |
| | 1054 | def connectionMade(self): |
| | 1055 | """ |
| | 1056 | Set up a callback on clientPaused to lose the connection. |
| | 1057 | """ |
| | 1058 | msg('Disconnector.connectionMade') |
| | 1059 | def disconnect(ignored): |
| | 1060 | msg('Disconnector.connectionMade disconnect') |
| | 1061 | self.transport.loseConnection() |
| | 1062 | msg('loseConnection called') |
| | 1063 | clientPaused.addCallback(disconnect) |
| | 1064 | |
| | 1065 | def connectionLost(self, reason): |
| | 1066 | """ |
| | 1067 | Notify observers that the server side of the connection has |
| | 1068 | ended. |
| | 1069 | """ |
| | 1070 | msg('Disconnecter.connectionLost') |
| | 1071 | serverLost.callback(None) |
| | 1072 | msg('serverLost called back') |
| | 1073 | |
| | 1074 | # Create the server port to which a connection will be made. |
| | 1075 | server = protocol.ServerFactory() |
| | 1076 | server.protocol = Disconnecter |
| | 1077 | port = reactor.listenTCP(0, server, interface='::1') |
| | 1078 | self.addCleanup(port.stopListening) |
| | 1079 | addr = port.getHost() |
| | 1080 | |
| | 1081 | class Infinite(object): |
| | 1082 | """ |
| | 1083 | A producer which will write to its consumer as long as |
| | 1084 | resumeProducing is called. |
| | 1085 | |
| | 1086 | @ivar consumer: The L{IConsumer} which will be written to. |
| | 1087 | """ |
| | 1088 | implements(IPullProducer) |
| | 1089 | |
| | 1090 | def __init__(self, consumer): |
| | 1091 | self.consumer = consumer |
| | 1092 | |
| | 1093 | def resumeProducing(self): |
| | 1094 | msg('Infinite.resumeProducing') |
| | 1095 | self.consumer.write('x') |
| | 1096 | msg('Infinite.resumeProducing wrote to consumer') |
| | 1097 | |
| | 1098 | def stopProducing(self): |
| | 1099 | msg('Infinite.stopProducing') |
| | 1100 | |
| | 1101 | |
| | 1102 | class UnreadingWriter(protocol.Protocol): |
| | 1103 | """ |
| | 1104 | Trivial protocol which pauses its transport immediately and then |
| | 1105 | writes some bytes to it. |
| | 1106 | """ |
| | 1107 | def connectionMade(self): |
| | 1108 | msg('UnreadingWriter.connectionMade') |
| | 1109 | self.transport.pauseProducing() |
| | 1110 | clientPaused.callback(None) |
| | 1111 | msg('clientPaused called back') |
| | 1112 | def write(ignored): |
| | 1113 | msg('UnreadingWriter.connectionMade write') |
| | 1114 | # This needs to be enough bytes to spill over into the |
| | 1115 | # userspace Twisted send buffer - if it all fits into |
| | 1116 | # the kernel, Twisted won't even poll for OUT events, |
| | 1117 | # which means it won't poll for any events at all, so |
| | 1118 | # the disconnection is never noticed. This is due to |
| | 1119 | # #1662. When #1662 is fixed, this test will likely |
| | 1120 | # need to be adjusted, otherwise connection lost |
| | 1121 | # notification will happen too soon and the test will |
| | 1122 | # probably begin to fail with ConnectionDone instead of |
| | 1123 | # ConnectionLost (in any case, it will no longer be |
| | 1124 | # entirely correct). |
| | 1125 | producer = Infinite(self.transport) |
| | 1126 | msg('UnreadingWriter.connectionMade write created producer') |
| | 1127 | self.transport.registerProducer(producer, False) |
| | 1128 | msg('UnreadingWriter.connectionMade write registered producer') |
| | 1129 | serverLost.addCallback(write) |
| | 1130 | |
| | 1131 | # Create the client and initiate the connection |
| | 1132 | client = MyClientFactory() |
| | 1133 | client.protocolFactory = UnreadingWriter |
| | 1134 | clientConnectionLost = client.deferred |
| | 1135 | def cbClientLost(ignored): |
| | 1136 | msg('cbClientLost') |
| | 1137 | return client.lostReason |
| | 1138 | clientConnectionLost.addCallback(cbClientLost) |
| | 1139 | msg('Connecting to %s:%s' % (addr.host, addr.port)) |
| | 1140 | reactor.connectTCP(addr.host, addr.port, client) |
| | 1141 | |
| | 1142 | # By the end of the test, the client should have received notification |
| | 1143 | # of unclean disconnection. |
| | 1144 | msg('Returning Deferred') |
| | 1145 | return self.assertFailure(clientConnectionLost, error.ConnectionLost) |
| | 1146 | |
| | 1147 | |
| | 1148 | |
| | 1149 | class ConnectionLosingProtocol(protocol.Protocol): |
| | 1150 | def connectionMade(self): |
| | 1151 | self.transport.write("1") |
| | 1152 | self.transport.loseConnection() |
| | 1153 | self.master._connectionMade() |
| | 1154 | self.master.ports.append(self.transport) |
| | 1155 | |
| | 1156 | |
| | 1157 | |
| | 1158 | class NoopProtocol(protocol.Protocol): |
| | 1159 | def connectionMade(self): |
| | 1160 | self.d = defer.Deferred() |
| | 1161 | self.master.serverConns.append(self.d) |
| | 1162 | |
| | 1163 | def connectionLost(self, reason): |
| | 1164 | self.d.callback(True) |
| | 1165 | |
| | 1166 | |
| | 1167 | |
| | 1168 | class ConnectionLostNotifyingProtocol(protocol.Protocol): |
| | 1169 | """ |
| | 1170 | Protocol which fires a Deferred which was previously passed to |
| | 1171 | its initializer when the connection is lost. |
| | 1172 | |
| | 1173 | @ivar onConnectionLost: The L{Deferred} which will be fired in |
| | 1174 | C{connectionLost}. |
| | 1175 | |
| | 1176 | @ivar lostConnectionReason: C{None} until the connection is lost, then a |
| | 1177 | reference to the reason passed to C{connectionLost}. |
| | 1178 | """ |
| | 1179 | def __init__(self, onConnectionLost): |
| | 1180 | self.lostConnectionReason = None |
| | 1181 | self.onConnectionLost = onConnectionLost |
| | 1182 | |
| | 1183 | |
| | 1184 | def connectionLost(self, reason): |
| | 1185 | self.lostConnectionReason = reason |
| | 1186 | self.onConnectionLost.callback(self) |
| | 1187 | |
| | 1188 | |
| | 1189 | |
| | 1190 | class HandleSavingProtocol(ConnectionLostNotifyingProtocol): |
| | 1191 | """ |
| | 1192 | Protocol which grabs the platform-specific socket handle and |
| | 1193 | saves it as an attribute on itself when the connection is |
| | 1194 | established. |
| | 1195 | """ |
| | 1196 | def makeConnection(self, transport): |
| | 1197 | """ |
| | 1198 | Save the platform-specific socket handle for future |
| | 1199 | introspection. |
| | 1200 | """ |
| | 1201 | self.handle = transport.getHandle() |
| | 1202 | return protocol.Protocol.makeConnection(self, transport) |
| | 1203 | |
| | 1204 | |
| | 1205 | |
| | 1206 | class ProperlyCloseFilesMixin: |
| | 1207 | """ |
| | 1208 | Tests for platform resources properly being cleaned up. |
| | 1209 | """ |
| | 1210 | def createServer(self, address, portNumber, factory): |
| | 1211 | """ |
| | 1212 | Bind a server port to which connections will be made. The server |
| | 1213 | should use the given protocol factory. |
| | 1214 | |
| | 1215 | @return: The L{IListeningPort} for the server created. |
| | 1216 | """ |
| | 1217 | raise NotImplementedError() |
| | 1218 | |
| | 1219 | |
| | 1220 | def connectClient(self, address, portNumber, clientCreator): |
| | 1221 | """ |
| | 1222 | Establish a connection to the given address using the given |
| | 1223 | L{ClientCreator} instance. |
| | 1224 | |
| | 1225 | @return: A Deferred which will fire with the connected protocol instance. |
| | 1226 | """ |
| | 1227 | raise NotImplementedError() |
| | 1228 | |
| | 1229 | |
| | 1230 | def getHandleExceptionType(self): |
| | 1231 | """ |
| | 1232 | Return the exception class which will be raised when an operation is |
| | 1233 | attempted on a closed platform handle. |
| | 1234 | """ |
| | 1235 | raise NotImplementedError() |
| | 1236 | |
| | 1237 | |
| | 1238 | def getHandleErrorCode(self): |
| | 1239 | """ |
| | 1240 | Return the errno expected to result from writing to a closed |
| | 1241 | platform socket handle. |
| | 1242 | """ |
| | 1243 | # These platforms have been seen to give EBADF: |
| | 1244 | # |
| | 1245 | # Linux 2.4.26, Linux 2.6.15, OS X 10.4, FreeBSD 5.4 |
| | 1246 | # Windows 2000 SP 4, Windows XP SP 2 |
| | 1247 | return errno.EBADF |
| | 1248 | |
| | 1249 | |
| | 1250 | def test_properlyCloseFiles(self): |
| | 1251 | """ |
| | 1252 | Test that lost connections properly have their underlying socket |
| | 1253 | resources cleaned up. |
| | 1254 | """ |
| | 1255 | onServerConnectionLost = defer.Deferred() |
| | 1256 | serverFactory = protocol.ServerFactory() |
| | 1257 | serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol( |
| | 1258 | onServerConnectionLost) |
| | 1259 | serverPort = self.createServer('::1', 0, serverFactory) |
| | 1260 | |
| | 1261 | onClientConnectionLost = defer.Deferred() |
| | 1262 | serverAddr = serverPort.getHost() |
| | 1263 | clientCreator = protocol.ClientCreator( |
| | 1264 | reactor, lambda: HandleSavingProtocol(onClientConnectionLost)) |
| | 1265 | clientDeferred = self.connectClient( |
| | 1266 | serverAddr.host, serverAddr.port, clientCreator) |
| | 1267 | |
| | 1268 | def clientConnected(client): |
| | 1269 | """ |
| | 1270 | Disconnect the client. Return a Deferred which fires when both |
| | 1271 | the client and the server have received disconnect notification. |
| | 1272 | """ |
| | 1273 | client.transport.write( |
| | 1274 | 'some bytes to make sure the connection is set up') |
| | 1275 | client.transport.loseConnection() |
| | 1276 | return defer.gatherResults([ |
| | 1277 | onClientConnectionLost, onServerConnectionLost]) |
| | 1278 | clientDeferred.addCallback(clientConnected) |
| | 1279 | |
| | 1280 | def clientDisconnected((client, server)): |
| | 1281 | """ |
| | 1282 | Verify that the underlying platform socket handle has been |
| | 1283 | cleaned up. |
| | 1284 | """ |
| | 1285 | client.lostConnectionReason.trap(error.ConnectionClosed) |
| | 1286 | server.lostConnectionReason.trap(error.ConnectionClosed) |
| | 1287 | expectedErrorCode = self.getHandleErrorCode() |
| | 1288 | err = self.assertRaises( |
| | 1289 | self.getHandleExceptionType(), client.handle.send, 'bytes') |
| | 1290 | self.assertEqual(err.args[0], expectedErrorCode) |
| | 1291 | clientDeferred.addCallback(clientDisconnected) |
| | 1292 | |
| | 1293 | def cleanup(passthrough): |
| | 1294 | """ |
| | 1295 | Shut down the server port. Return a Deferred which fires when |
| | 1296 | this has completed. |
| | 1297 | """ |
| | 1298 | result = defer.maybeDeferred(serverPort.stopListening) |
| | 1299 | result.addCallback(lambda ign: passthrough) |
| | 1300 | return result |
| | 1301 | clientDeferred.addBoth(cleanup) |
| | 1302 | |
| | 1303 | return clientDeferred |
| | 1304 | |
| | 1305 | |
| | 1306 | |
| | 1307 | class ProperlyCloseFilesTestCase(unittest.TestCase, ProperlyCloseFilesMixin): |
| | 1308 | """ |
| | 1309 | Test that the sockets created by L{IReactorTCP.connectTCP} are cleaned up |
| | 1310 | when the connection they are associated with is closed. |
| | 1311 | """ |
| | 1312 | def createServer(self, address, portNumber, factory): |
| | 1313 | """ |
| | 1314 | Create a TCP server using L{IReactorTCP.listenTCP}. |
| | 1315 | """ |
| | 1316 | return reactor.listenTCP(portNumber, factory, interface=address) |
| | 1317 | |
| | 1318 | |
| | 1319 | def connectClient(self, address, portNumber, clientCreator): |
| | 1320 | """ |
| | 1321 | Create a TCP client using L{IReactorTCP.connectTCP}. |
| | 1322 | """ |
| | 1323 | return clientCreator.connectTCP(address, portNumber) |
| | 1324 | |
| | 1325 | |
| | 1326 | def getHandleExceptionType(self): |
| | 1327 | """ |
| | 1328 | Return L{socket.error} as the expected error type which will be |
| | 1329 | raised by a write to the low-level socket object after it has been |
| | 1330 | closed. |
| | 1331 | """ |
| | 1332 | return socket.error |
| | 1333 | |
| | 1334 | |
| | 1335 | |
| | 1336 | class WiredForDeferreds(policies.ProtocolWrapper): |
| | 1337 | def __init__(self, factory, wrappedProtocol): |
| | 1338 | policies.ProtocolWrapper.__init__(self, factory, wrappedProtocol) |
| | 1339 | |
| | 1340 | def connectionMade(self): |
| | 1341 | policies.ProtocolWrapper.connectionMade(self) |
| | 1342 | self.factory.onConnect.callback(None) |
| | 1343 | |
| | 1344 | def connectionLost(self, reason): |
| | 1345 | policies.ProtocolWrapper.connectionLost(self, reason) |
| | 1346 | self.factory.onDisconnect.callback(None) |
| | 1347 | |
| | 1348 | |
| | 1349 | |
| | 1350 | class WiredFactory(policies.WrappingFactory): |
| | 1351 | protocol = WiredForDeferreds |
| | 1352 | |
| | 1353 | def __init__(self, wrappedFactory): |
| | 1354 | policies.WrappingFactory.__init__(self, wrappedFactory) |
| | 1355 | self.onConnect = defer.Deferred() |
| | 1356 | self.onDisconnect = defer.Deferred() |
| | 1357 | |
| | 1358 | |
| | 1359 | |
| | 1360 | class AddressTestCase(unittest.TestCase): |
| | 1361 | """ |
| | 1362 | Tests for address-related interactions with client and server protocols. |
| | 1363 | """ |
| | 1364 | def setUp(self): |
| | 1365 | """ |
| | 1366 | Create a port and connected client/server pair which can be used |
| | 1367 | to test factory behavior related to addresses. |
| | 1368 | |
| | 1369 | @return: A L{defer.Deferred} which will be called back when both the |
| | 1370 | client and server protocols have received their connection made |
| | 1371 | callback. |
| | 1372 | """ |
| | 1373 | class RememberingWrapper(protocol.ClientFactory): |
| | 1374 | """ |
| | 1375 | Simple wrapper factory which records the addresses which are |
| | 1376 | passed to its L{buildProtocol} method and delegates actual |
| | 1377 | protocol creation to another factory. |
| | 1378 | |
| | 1379 | @ivar addresses: A list of the objects passed to buildProtocol. |
| | 1380 | @ivar factory: The wrapped factory to which protocol creation is |
| | 1381 | delegated. |
| | 1382 | """ |
| | 1383 | def __init__(self, factory): |
| | 1384 | self.addresses = [] |
| | 1385 | self.factory = factory |
| | 1386 | |
| | 1387 | # Only bother to pass on buildProtocol calls to the wrapped |
| | 1388 | # factory - doStart, doStop, etc aren't necessary for this test |
| | 1389 | # to pass. |
| | 1390 | def buildProtocol(self, addr): |
| | 1391 | """ |
| | 1392 | Append the given address to C{self.addresses} and forward |
| | 1393 | the call to C{self.factory}. |
| | 1394 | """ |
| | 1395 | self.addresses.append(addr) |
| | 1396 | return self.factory.buildProtocol(addr) |
| | 1397 | |
| | 1398 | # Make a server which we can receive connection and disconnection |
| | 1399 | # notification for, and which will record the address passed to its |
| | 1400 | # buildProtocol. |
| | 1401 | self.server = MyServerFactory() |
| | 1402 | self.serverConnMade = self.server.protocolConnectionMade = defer.Deferred() |
| | 1403 | self.serverConnLost = self.server.protocolConnectionLost = defer.Deferred() |
| | 1404 | # RememberingWrapper is a ClientFactory, but ClientFactory is-a |
| | 1405 | # ServerFactory, so this is okay. |
| | 1406 | self.serverWrapper = RememberingWrapper(self.server) |
| | 1407 | |
| | 1408 | # Do something similar for a client. |
| | 1409 | self.client = MyClientFactory() |
| | 1410 | self.clientConnMade = self.client.protocolConnectionMade = defer.Deferred() |
| | 1411 | self.clientConnLost = self.client.protocolConnectionLost = defer.Deferred() |
| | 1412 | self.clientWrapper = RememberingWrapper(self.client) |
| | 1413 | |
| | 1414 | self.port = reactor.listenTCP(0, self.serverWrapper, interface='::1') |
| | 1415 | self.connector = reactor.connectTCP( |
| | 1416 | self.port.getHost().host, self.port.getHost().port, self.clientWrapper) |
| | 1417 | |
| | 1418 | return defer.gatherResults([self.serverConnMade, self.clientConnMade]) |
| | 1419 | |
| | 1420 | |
| | 1421 | def tearDown(self): |
| | 1422 | """ |
| | 1423 | Disconnect the client/server pair and shutdown the port created in |
| | 1424 | L{setUp}. |
| | 1425 | """ |
| | 1426 | self.connector.disconnect() |
| | 1427 | return defer.gatherResults([ |
| | 1428 | self.serverConnLost, self.clientConnLost, |
| | 1429 | defer.maybeDeferred(self.port.stopListening)]) |
| | 1430 | |
| | 1431 | |
| | 1432 | def test_buildProtocolClient(self): |
| | 1433 | """ |
| | 1434 | L{ClientFactory.buildProtocol} should be invoked with the address of |
| | 1435 | the server to which a connection has been established, which should |
| | 1436 | be the same as the address reported by the C{getHost} method of the |
| | 1437 | transport of the server protocol and as the C{getPeer} method of the |
| | 1438 | transport of the client protocol. |
| | 1439 | """ |
| | 1440 | serverHost = self.server.protocol.transport.getHost() |
| | 1441 | clientPeer = self.client.protocol.transport.getPeer() |
| | 1442 | |
| | 1443 | self.assertEqual( |
| | 1444 | self.clientWrapper.addresses, |
| | 1445 | [IPv6Address('TCP', serverHost.host, serverHost.port)]) |
| | 1446 | self.assertEqual( |
| | 1447 | self.clientWrapper.addresses, |
| | 1448 | [IPv6Address('TCP', clientPeer.host, clientPeer.port)]) |
| | 1449 | |
| | 1450 | |
| | 1451 | |
| | 1452 | class LargeBufferWriterProtocol(protocol.Protocol): |
| | 1453 | |
| | 1454 | # Win32 sockets cannot handle single huge chunks of bytes. Write one |
| | 1455 | # massive string to make sure Twisted deals with this fact. |
| | 1456 | |
| | 1457 | def connectionMade(self): |
| | 1458 | # write 60MB |
| | 1459 | self.transport.write('X'*self.factory.len) |
| | 1460 | self.factory.done = 1 |
| | 1461 | self.transport.loseConnection() |
| | 1462 | |
| | 1463 | class LargeBufferReaderProtocol(protocol.Protocol): |
| | 1464 | def dataReceived(self, data): |
| | 1465 | self.factory.len += len(data) |
| | 1466 | def connectionLost(self, reason): |
| | 1467 | self.factory.done = 1 |
| | 1468 | |
| | 1469 | class LargeBufferReaderClientFactory(protocol.ClientFactory): |
| | 1470 | def __init__(self): |
| | 1471 | self.done = 0 |
| | 1472 | self.len = 0 |
| | 1473 | def buildProtocol(self, addr): |
| | 1474 | p = LargeBufferReaderProtocol() |
| | 1475 | p.factory = self |
| | 1476 | self.protocol = p |
| | 1477 | return p |
| | 1478 | |
| | 1479 | |
| | 1480 | class FireOnClose(policies.ProtocolWrapper): |
| | 1481 | """A wrapper around a protocol that makes it fire a deferred when |
| | 1482 | connectionLost is called. |
| | 1483 | """ |
| | 1484 | def connectionLost(self, reason): |
| | 1485 | policies.ProtocolWrapper.connectionLost(self, reason) |
| | 1486 | self.factory.deferred.callback(None) |
| | 1487 | |
| | 1488 | |
| | 1489 | class FireOnCloseFactory(policies.WrappingFactory): |
| | 1490 | protocol = FireOnClose |
| | 1491 | |
| | 1492 | def __init__(self, wrappedFactory): |
| | 1493 | policies.WrappingFactory.__init__(self, wrappedFactory) |
| | 1494 | self.deferred = defer.Deferred() |
| | 1495 | |
| | 1496 | |
| | 1497 | class LargeBufferTestCase(unittest.TestCase): |
| | 1498 | """Test that buffering large amounts of data works. |
| | 1499 | """ |
| | 1500 | |
| | 1501 | datalen = 60*1024*1024 |
| | 1502 | def testWriter(self): |
| | 1503 | f = protocol.Factory() |
| | 1504 | f.protocol = LargeBufferWriterProtocol |
| | 1505 | f.done = 0 |
| | 1506 | f.problem = 0 |
| | 1507 | f.len = self.datalen |
| | 1508 | wrappedF = FireOnCloseFactory(f) |
| | 1509 | p = reactor.listenTCP(0, wrappedF, interface="::1") |
| | 1510 | self.addCleanup(p.stopListening) |
| | 1511 | n = p.getHost().port |
| | 1512 | clientF = LargeBufferReaderClientFactory() |
| | 1513 | wrappedClientF = FireOnCloseFactory(clientF) |
| | 1514 | reactor.connectTCP("::1", n, wrappedClientF) |
| | 1515 | |
| | 1516 | d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred]) |
| | 1517 | def check(ignored): |
| | 1518 | self.failUnless(f.done, "writer didn't finish, it probably died") |
| | 1519 | self.failUnless(clientF.len == self.datalen, |
| | 1520 | "client didn't receive all the data it expected " |
| | 1521 | "(%d != %d)" % (clientF.len, self.datalen)) |
| | 1522 | self.failUnless(clientF.done, |
| | 1523 | "client didn't see connection dropped") |
| | 1524 | return d.addCallback(check) |
| | 1525 | |
| | 1526 | |
| | 1527 | class MyHCProtocol(AccumulatingProtocol): |
| | 1528 | |
| | 1529 | implements(IHalfCloseableProtocol) |
| | 1530 | |
| | 1531 | readHalfClosed = False |
| | 1532 | writeHalfClosed = False |
| | 1533 | |
| | 1534 | def readConnectionLost(self): |
| | 1535 | self.readHalfClosed = True |
| | 1536 | # Invoke notification logic from the base class to simplify testing. |
| | 1537 | if self.writeHalfClosed: |
| | 1538 | self.connectionLost(None) |
| | 1539 | |
| | 1540 | def writeConnectionLost(self): |
| | 1541 | self.writeHalfClosed = True |
| | 1542 | # Invoke notification logic from the base class to simplify testing. |
| | 1543 | if self.readHalfClosed: |
| | 1544 | self.connectionLost(None) |
| | 1545 | |
| | 1546 | |
| | 1547 | class MyHCFactory(protocol.ServerFactory): |
| | 1548 | |
| | 1549 | called = 0 |
| | 1550 | protocolConnectionMade = None |
| | 1551 | |
| | 1552 | def buildProtocol(self, addr): |
| | 1553 | self.called += 1 |
| | 1554 | p = MyHCProtocol() |
| | 1555 | p.factory = self |
| | 1556 | self.protocol = p |
| | 1557 | return p |
| | 1558 | |
| | 1559 | |
| | 1560 | class HalfCloseTestCase(unittest.TestCase): |
| | 1561 | """Test half-closing connections.""" |
| | 1562 | |
| | 1563 | def setUp(self): |
| | 1564 | self.f = f = MyHCFactory() |
| | 1565 | self.p = p = reactor.listenTCP(0, f, interface="::1") |
| | 1566 | self.addCleanup(p.stopListening) |
| | 1567 | d = loopUntil(lambda :p.connected) |
| | 1568 | |
| | 1569 | self.cf = protocol.ClientCreator(reactor, MyHCProtocol) |
| | 1570 | |
| | 1571 | d.addCallback(lambda _: self.cf.connectTCP(p.getHost().host, |
| | 1572 | p.getHost().port)) |
| | 1573 | d.addCallback(self._setUp) |
| | 1574 | return d |
| | 1575 | |
| | 1576 | def _setUp(self, client): |
| | 1577 | self.client = client |
| | 1578 | self.clientProtoConnectionLost = self.client.closedDeferred = defer.Deferred() |
| | 1579 | self.assertEquals(self.client.transport.connected, 1) |
| | 1580 | # Wait for the server to notice there is a connection, too. |
| | 1581 | return loopUntil(lambda: getattr(self.f, 'protocol', None) is not None) |
| | 1582 | |
| | 1583 | def tearDown(self): |
| | 1584 | self.assertEquals(self.client.closed, 0) |
| | 1585 | self.client.transport.loseConnection() |
| | 1586 | d = defer.maybeDeferred(self.p.stopListening) |
| | 1587 | d.addCallback(lambda ign: self.clientProtoConnectionLost) |
| | 1588 | d.addCallback(self._tearDown) |
| | 1589 | return d |
| | 1590 | |
| | 1591 | def _tearDown(self, ignored): |
| | 1592 | self.assertEquals(self.client.closed, 1) |
| | 1593 | # because we did half-close, the server also needs to |
| | 1594 | # closed explicitly. |
| | 1595 | self.assertEquals(self.f.protocol.closed, 0) |
| | 1596 | d = defer.Deferred() |
| | 1597 | def _connectionLost(reason): |
| | 1598 | self.f.protocol.closed = 1 |
| | 1599 | d.callback(None) |
| | 1600 | self.f.protocol.connectionLost = _connectionLost |
| | 1601 | self.f.protocol.transport.loseConnection() |
| | 1602 | d.addCallback(lambda x:self.assertEquals(self.f.protocol.closed, 1)) |
| | 1603 | return d |
| | 1604 | |
| | 1605 | def testCloseWriteCloser(self): |
| | 1606 | client = self.client |
| | 1607 | f = self.f |
| | 1608 | t = client.transport |
| | 1609 | |
| | 1610 | t.write("hello") |
| | 1611 | d = loopUntil(lambda :len(t._tempDataBuffer) == 0) |
| | 1612 | def loseWrite(ignored): |
| | 1613 | t.loseWriteConnection() |
| | 1614 | return loopUntil(lambda :t._writeDisconnected) |
| | 1615 | def check(ignored): |
| | 1616 | self.assertEquals(client.closed, False) |
| | 1617 | self.assertEquals(client.writeHalfClosed, True) |
| | 1618 | self.assertEquals(client.readHalfClosed, False) |
| | 1619 | return loopUntil(lambda :f.protocol.readHalfClosed) |
| | 1620 | def write(ignored): |
| | 1621 | w = client.transport.write |
| | 1622 | w(" world") |
| | 1623 | w("lalala fooled you") |
| | 1624 | self.assertEquals(0, len(client.transport._tempDataBuffer)) |
| | 1625 | self.assertEquals(f.protocol.data, "hello") |
| | 1626 | self.assertEquals(f.protocol.closed, False) |
| | 1627 | self.assertEquals(f.protocol.readHalfClosed, True) |
| | 1628 | return d.addCallback(loseWrite).addCallback(check).addCallback(write) |
| | 1629 | |
| | 1630 | def testWriteCloseNotification(self): |
| | 1631 | f = self.f |
| | 1632 | f.protocol.transport.loseWriteConnection() |
| | 1633 | |
| | 1634 | d = defer.gatherResults([ |
| | 1635 | loopUntil(lambda :f.protocol.writeHalfClosed), |
| | 1636 | loopUntil(lambda :self.client.readHalfClosed)]) |
| | 1637 | d.addCallback(lambda _: self.assertEquals( |
| | 1638 | f.protocol.readHalfClosed, False)) |
| | 1639 | return d |
| | 1640 | |
| | 1641 | |
| | 1642 | class HalfClose2TestCase(unittest.TestCase): |
| | 1643 | |
| | 1644 | def setUp(self): |
| | 1645 | self.f = f = MyServerFactory() |
| | 1646 | self.f.protocolConnectionMade = defer.Deferred() |
| | 1647 | self.p = p = reactor.listenTCP(0, f, interface="::1") |
| | 1648 | |
| | 1649 | # XXX we don't test server side yet since we don't do it yet |
| | 1650 | d = protocol.ClientCreator(reactor, AccumulatingProtocol).connectTCP( |
| | 1651 | p.getHost().host, p.getHost().port) |
| | 1652 | d.addCallback(self._gotClient) |
| | 1653 | return d |
| | 1654 | |
| | 1655 | def _gotClient(self, client): |
| | 1656 | self.client = client |
| | 1657 | # Now wait for the server to catch up - it doesn't matter if this |
| | 1658 | # Deferred has already fired and gone away, in that case we'll |
| | 1659 | # return None and not wait at all, which is precisely correct. |
| | 1660 | return self.f.protocolConnectionMade |
| | 1661 | |
| | 1662 | def tearDown(self): |
| | 1663 | self.client.transport.loseConnection() |
| | 1664 | return self.p.stopListening() |
| | 1665 | |
| | 1666 | def testNoNotification(self): |
| | 1667 | """ |
| | 1668 | TCP protocols support half-close connections, but not all of them |
| | 1669 | support being notified of write closes. In this case, test that |
| | 1670 | half-closing the connection causes the peer's connection to be |
| | 1671 | closed. |
| | 1672 | """ |
| | 1673 | self.client.transport.write("hello") |
| | 1674 | self.client.transport.loseWriteConnection() |
| | 1675 | self.f.protocol.closedDeferred = d = defer.Deferred() |
| | 1676 | self.client.closedDeferred = d2 = defer.Deferred() |
| | 1677 | d.addCallback(lambda x: |
| | 1678 | self.assertEqual(self.f.protocol.data, 'hello')) |
| | 1679 | d.addCallback(lambda x: self.assertEqual(self.f.protocol.closed, True)) |
| | 1680 | return defer.gatherResults([d, d2]) |
| | 1681 | |
| | 1682 | def testShutdownException(self): |
| | 1683 | """ |
| | 1684 | If the other side has already closed its connection, |
| | 1685 | loseWriteConnection should pass silently. |
| | 1686 | """ |
| | 1687 | self.f.protocol.transport.loseConnection() |
| | 1688 | self.client.transport.write("X") |
| | 1689 | self.client.transport.loseWriteConnection() |
| | 1690 | self.f.protocol.closedDeferred = d = defer.Deferred() |
| | 1691 | self.client.closedDeferred = d2 = defer.Deferred() |
| | 1692 | d.addCallback(lambda x: |
| | 1693 | self.failUnlessEqual(self.f.protocol.closed, True)) |
| | 1694 | return defer.gatherResults([d, d2]) |
| | 1695 | |
| | 1696 | |
| | 1697 | class HalfCloseBuggyApplicationTests(unittest.TestCase): |
| | 1698 | """ |
| | 1699 | Test half-closing connections where notification code has bugs. |
| | 1700 | """ |
| | 1701 | |
| | 1702 | def setUp(self): |
| | 1703 | """ |
| | 1704 | Set up a server and connect a client to it. Return a Deferred which |
| | 1705 | only fires once this is done. |
| | 1706 | """ |
| | 1707 | self.serverFactory = MyHCFactory() |
| | 1708 | self.serverFactory.protocolConnectionMade = defer.Deferred() |
| | 1709 | self.port = reactor.listenTCP( |
| | 1710 | 0, self.serverFactory, interface="::1") |
| | 1711 | self.addCleanup(self.port.stopListening) |
| | 1712 | addr = self.port.getHost() |
| | 1713 | creator = protocol.ClientCreator(reactor, MyHCProtocol) |
| | 1714 | clientDeferred = creator.connectTCP(addr.host, addr.port) |
| | 1715 | def setClient(clientProtocol): |
| | 1716 | self.clientProtocol = clientProtocol |
| | 1717 | clientDeferred.addCallback(setClient) |
| | 1718 | return defer.gatherResults([ |
| | 1719 | self.serverFactory.protocolConnectionMade, |
| | 1720 | clientDeferred]) |
| | 1721 | |
| | 1722 | |
| | 1723 | def aBug(self, *args): |
| | 1724 | """ |
| | 1725 | Fake implementation of a callback which illegally raises an |
| | 1726 | exception. |
| | 1727 | """ |
| | 1728 | raise RuntimeError("ONO I AM BUGGY CODE") |
| | 1729 | |
| | 1730 | |
| | 1731 | def _notificationRaisesTest(self): |
| | 1732 | """ |
| | 1733 | Helper for testing that an exception is logged by the time the |
| | 1734 | client protocol loses its connection. |
| | 1735 | """ |
| | 1736 | closed = self.clientProtocol.closedDeferred = defer.Deferred() |
| | 1737 | self.clientProtocol.transport.loseWriteConnection() |
| | 1738 | def check(ignored): |
| | 1739 | errors = self.flushLoggedErrors(RuntimeError) |
| | 1740 | self.assertEqual(len(errors), 1) |
| | 1741 | closed.addCallback(check) |
| | 1742 | return closed |
| | 1743 | |
| | 1744 | |
| | 1745 | def test_readNotificationRaises(self): |
| | 1746 | """ |
| | 1747 | If C{readConnectionLost} raises an exception when the transport |
| | 1748 | calls it to notify the protocol of that event, the exception should |
| | 1749 | be logged and the protocol should be disconnected completely. |
| | 1750 | """ |
| | 1751 | self.serverFactory.protocol.readConnectionLost = self.aBug |
| | 1752 | return self._notificationRaisesTest() |
| | 1753 | |
| | 1754 | |
| | 1755 | def test_writeNotificationRaises(self): |
| | 1756 | """ |
| | 1757 | If C{writeConnectionLost} raises an exception when the transport |
| | 1758 | calls it to notify the protocol of that event, the exception should |
| | 1759 | be logged and the protocol should be disconnected completely. |
| | 1760 | """ |
| | 1761 | self.clientProtocol.writeConnectionLost = self.aBug |
| | 1762 | return self._notificationRaisesTest() |
| | 1763 | |
| | 1764 | |
| | 1765 | |
| | 1766 | class LogTestCase(unittest.TestCase): |
| | 1767 | """ |
| | 1768 | Test logging facility of TCP base classes. |
| | 1769 | """ |
| | 1770 | |
| | 1771 | def test_logstrClientSetup(self): |
| | 1772 | """ |
| | 1773 | Check that the log customization of the client transport happens |
| | 1774 | once the client is connected. |
| | 1775 | """ |
| | 1776 | server = MyServerFactory() |
| | 1777 | |
| | 1778 | client = MyClientFactory() |
| | 1779 | client.protocolConnectionMade = defer.Deferred() |
| | 1780 | |
| | 1781 | port = reactor.listenTCP(0, server, interface='::1') |
| | 1782 | self.addCleanup(port.stopListening) |
| | 1783 | |
| | 1784 | connector = reactor.connectTCP( |
| | 1785 | port.getHost().host, port.getHost().port, client) |
| | 1786 | self.addCleanup(connector.disconnect) |
| | 1787 | |
| | 1788 | # It should still have the default value |
| | 1789 | self.assertEquals(connector.transport.logstr, |
| | 1790 | "Uninitialized") |
| | 1791 | |
| | 1792 | def cb(ign): |
| | 1793 | self.assertEquals(connector.transport.logstr, |
| | 1794 | "AccumulatingProtocol,client") |
| | 1795 | client.protocolConnectionMade.addCallback(cb) |
| | 1796 | return client.protocolConnectionMade |
| | 1797 | |
| | 1798 | |
| | 1799 | |
| | 1800 | class PauseProducingTestCase(unittest.TestCase): |
| | 1801 | """ |
| | 1802 | Test some behaviors of pausing the production of a transport. |
| | 1803 | """ |
| | 1804 | |
| | 1805 | def test_pauseProducingInConnectionMade(self): |
| | 1806 | """ |
| | 1807 | In C{connectionMade} of a client protocol, C{pauseProducing} used to be |
| | 1808 | ignored: this test is here to ensure it's not ignored. |
| | 1809 | """ |
| | 1810 | server = MyServerFactory() |
| | 1811 | |
| | 1812 | client = MyClientFactory() |
| | 1813 | client.protocolConnectionMade = defer.Deferred() |
| | 1814 | |
| | 1815 | port = reactor.listenTCP(0, server, interface='::1') |
| | 1816 | self.addCleanup(port.stopListening) |
| | 1817 | |
| | 1818 | connector = reactor.connectTCP( |
| | 1819 | port.getHost().host, port.getHost().port, client) |
| | 1820 | self.addCleanup(connector.disconnect) |
| | 1821 | |
| | 1822 | def checkInConnectionMade(proto): |
| | 1823 | tr = proto.transport |
| | 1824 | # The transport should already be monitored |
| | 1825 | self.assertIn(tr, reactor.getReaders() + |
| | 1826 | reactor.getWriters()) |
| | 1827 | proto.transport.pauseProducing() |
| | 1828 | self.assertNotIn(tr, reactor.getReaders() + |
| | 1829 | reactor.getWriters()) |
| | 1830 | d = defer.Deferred() |
| | 1831 | d.addCallback(checkAfterConnectionMade) |
| | 1832 | reactor.callLater(0, d.callback, proto) |
| | 1833 | return d |
| | 1834 | def checkAfterConnectionMade(proto): |
| | 1835 | tr = proto.transport |
| | 1836 | # The transport should still not be monitored |
| | 1837 | self.assertNotIn(tr, reactor.getReaders() + |
| | 1838 | reactor.getWriters()) |
| | 1839 | client.protocolConnectionMade.addCallback(checkInConnectionMade) |
| | 1840 | return client.protocolConnectionMade |
| | 1841 | |
| | 1842 | if not interfaces.IReactorFDSet.providedBy(reactor): |
| | 1843 | test_pauseProducingInConnectionMade.skip = "Reactor not providing IReactorFDSet" |
| | 1844 | |
| | 1845 | |
| | 1846 | |
| | 1847 | class CallBackOrderTestCase(unittest.TestCase): |
| | 1848 | """ |
| | 1849 | Test the order of reactor callbacks |
| | 1850 | """ |
| | 1851 | |
| | 1852 | def test_loseOrder(self): |
| | 1853 | """ |
| | 1854 | Check that Protocol.connectionLost is called before factory's |
| | 1855 | clientConnectionLost |
| | 1856 | """ |
| | 1857 | server = MyServerFactory() |
| | 1858 | server.protocolConnectionMade = (defer.Deferred() |
| | 1859 | .addCallback(lambda proto: self.addCleanup( |
| | 1860 | proto.transport.loseConnection))) |
| | 1861 | |
| | 1862 | client = MyClientFactory() |
| | 1863 | client.protocolConnectionLost = defer.Deferred() |
| | 1864 | client.protocolConnectionMade = defer.Deferred() |
| | 1865 | |
| | 1866 | def _cbCM(res): |
| | 1867 | """ |
| | 1868 | protocol.connectionMade callback |
| | 1869 | """ |
| | 1870 | reactor.callLater(0, client.protocol.transport.loseConnection) |
| | 1871 | |
| | 1872 | client.protocolConnectionMade.addCallback(_cbCM) |
| | 1873 | |
| | 1874 | port = reactor.listenTCP(0, server, interface='::1') |
| | 1875 | self.addCleanup(port.stopListening) |
| | 1876 | |
| | 1877 | connector = reactor.connectTCP( |
| | 1878 | port.getHost().host, port.getHost().port, client) |
| | 1879 | self.addCleanup(connector.disconnect) |
| | 1880 | |
| | 1881 | def _cbCCL(res): |
| | 1882 | """ |
| | 1883 | factory.clientConnectionLost callback |
| | 1884 | """ |
| | 1885 | return 'CCL' |
| | 1886 | |
| | 1887 | def _cbCL(res): |
| | 1888 | """ |
| | 1889 | protocol.connectionLost callback |
| | 1890 | """ |
| | 1891 | return 'CL' |
| | 1892 | |
| | 1893 | def _cbGather(res): |
| | 1894 | self.assertEquals(res, ['CL', 'CCL']) |
| | 1895 | |
| | 1896 | d = defer.gatherResults([ |
| | 1897 | client.protocolConnectionLost.addCallback(_cbCL), |
| | 1898 | client.deferred.addCallback(_cbCCL)]) |
| | 1899 | return d.addCallback(_cbGather) |
| | 1900 | |
| | 1901 | |
| | 1902 | |
| | 1903 | try: |
| | 1904 | import resource |
| | 1905 | except ImportError: |
| | 1906 | pass |
| | 1907 | else: |
| | 1908 | numRounds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + 10 |
| | 1909 | ProperlyCloseFilesTestCase.numberRounds = numRounds |