Ticket #3420: webclient.diff

File webclient.diff, 12.5 KB (added by yasusii, 6 years ago)

patch to add the persistent connection feature to twisted.web.client.Agent

  • twisted/web/_newclient.py

    diff -ru Twisted-10.1.0.orig/twisted/web/_newclient.py Twisted-10.1.0/twisted/web/_newclient.py
    old new  
    524524    @ivar bodyProducer: C{None} or an L{IBodyProducer} provider which
    525525        produces the content body to send to the remote HTTP server.
    526526    """
    527     def __init__(self, method, uri, headers, bodyProducer):
     527    def __init__(self, method, uri, headers, bodyProducer, persistent=False):
    528528        self.method = method
    529529        self.uri = uri
    530530        self.headers = headers
    531531        self.bodyProducer = bodyProducer
     532        self.persistent = persistent
    532533
    533534
    534535    def _writeHeaders(self, transport, TEorCL):
     
    542543        requestLines = []
    543544        requestLines.append(
    544545            '%s %s HTTP/1.1\r\n' % (self.method, self.uri))
    545         requestLines.append('Connection: close\r\n')
     546        if not self.persistent:
     547            requestLines.append('Connection: close\r\n')
    546548        if TEorCL is not None:
    547549            requestLines.append(TEorCL)
    548550        for name, values in self.headers.getAllRawHeaders():
     
    12391241    """
    12401242    _state = 'QUIESCENT'
    12411243    _parser = None
     1244    persistent = False
     1245
     1246    @property
     1247    def state(self):
     1248        return self._state
    12421249
    12431250    def request(self, request):
    12441251        """
     
    12591266            may errback with L{RequestNotSent} if it is not possible to send
    12601267            any more requests using this L{HTTP11ClientProtocol}.
    12611268        """
     1269        self.persistent = request.persistent
    12621270        if self._state != 'QUIESCENT':
    12631271            return fail(RequestNotSent())
    12641272
     
    12781286        def cbRequestWrotten(ignored):
    12791287            if self._state == 'TRANSMITTING':
    12801288                self._state = 'WAITING'
    1281                 # XXX We're stuck in WAITING until we lose the connection now.
    1282                 # This will be wrong when persistent connections are supported.
    1283                 # See #3420 for persistent connections.
    1284 
    12851289                self._responseDeferred.chainDeferred(self._finishedRequest)
    12861290
    12871291        def ebRequestWriting(err):
     
    13071311            the L{HTTPClientParser} which were not part of the response it
    13081312            was parsing.
    13091313        """
    1310         # XXX this is because Connection: close is hard-coded above, probably
    1311         # will want to change that at some point.  Either the client or the
    1312         # server can control this.
    1313 
    1314         # XXX If the connection isn't being closed at this point, it's
    1315         # important to make sure the transport isn't paused (after _giveUp,
    1316         # or inside it, or something - after the parser can no longer touch
    1317         # the transport)
     1314        assert self._state in ('WAITING', 'TRANSMITTING')
    13181315
    1319         # For both of the above, see #3420 for persistent connections.
    1320 
    1321         if self._state == 'TRANSMITTING':
     1316        if self._state == 'WAITING':
     1317            self._state = 'QUIESCENT'
     1318        else:
    13221319            # The server sent the entire response before we could send the
    13231320            # whole request.  That sucks.  Oh well.  Fire the request()
    13241321            # Deferred with the response.  But first, make sure that if the
     
    13271324            self._state = 'TRANSMITTING_AFTER_RECEIVING_RESPONSE'
    13281325            self._responseDeferred.chainDeferred(self._finishedRequest)
    13291326
    1330         self._giveUp(Failure(ConnectionDone("synthetic!")))
     1327        reason = ConnectionDone("synthetic!")
     1328        self._giveUp(Failure(reason))
    13311329
    13321330
    13331331    def _disconnectParser(self, reason):
  • twisted/web/client.py

    diff -ru Twisted-10.1.0.orig/twisted/web/client.py Twisted-10.1.0/twisted/web/client.py
    old new  
    628628    @since: 9.0
    629629    """
    630630    _protocol = HTTP11ClientProtocol
     631    maxConnections = 2 # RFC 2616: A single-user client SHOULD NOT
     632                       # maintain more than 2 connections with any
     633                       # server or proxy.
    631634
    632     def __init__(self, reactor, contextFactory=WebClientContextFactory()):
     635    def __init__(self, reactor, contextFactory=WebClientContextFactory(),
     636                 persistent=False):
    633637        self._reactor = reactor
    634638        self._contextFactory = contextFactory
     639        self.persistent = persistent
     640        self._semaphores = {}
     641        self._protocolCache = {}
    635642
    636643
    637644    def _wrapContextFactory(self, host, port):
     
    704711        @rtype: L{Deferred}
    705712        """
    706713        scheme, host, port, path = _parse(uri)
    707         d = self._connect(scheme, host, port)
    708714        if headers is None:
    709715            headers = Headers()
    710716        if not headers.hasHeader('host'):
     
    713719            headers = Headers(dict(headers.getAllRawHeaders()))
    714720            headers.addRawHeader(
    715721                'host', self._computeHostValue(scheme, host, port))
     722        if self.persistent:
     723            sem = self._semaphores.get((scheme, host, port))
     724            if sem is None:
     725                sem = defer.DeferredSemaphore(self.maxConnections)
     726                self._semaphores[scheme, host, port] = sem
     727            return sem.run(self._request, method, scheme, host, port, path,
     728                           headers, bodyProducer)
     729        else:
     730            return self._request(
     731                method, scheme, host, port, path, headers, bodyProducer)
     732
     733
     734    def _request(self, method, scheme, host, port, path, headers, bodyProducer):
     735        """
     736        Issue a new request.
     737
     738        @param method: The request method to send.
     739        @type method: C{str}
     740
     741        @param uri: The request URI to send.
     742        @type uri: C{str}
     743
     744        @param headers: The request headers to send.  If no I{Host} header is
     745            included, one will be added based on the request URI.
     746        @type headers: L{Headers}
     747
     748        @param bodyProducer: An object which will produce the request body or,
     749            if the request body is to be empty, L{None}.
     750        @type bodyProducer: L{IBodyProducer} provider
     751
     752        @return: A L{Deferred} which fires with the result of the request (a
     753            L{Response} instance), or fails if there is a problem setting up a
     754            connection over which to issue the request.  It may also fail with
     755            L{SchemeNotSupported} if the scheme of the given URI is not
     756            supported.
     757        @rtype: L{Deferred}
     758        """
     759        protos = self._protocolCache.setdefault((scheme, host, port), [])
     760        while protos:
     761            # connection exists
     762            p = protos.pop(0)
     763            if p.state == 'QUIESCENT':
     764                d = defer.succeed(p)
     765                break
     766        else:
     767            # new connection
     768            d = self._connect(scheme, host, port)
    716769        def cbConnected(proto):
    717             return proto.request(Request(method, path, headers, bodyProducer))
     770            def cbRequest(response):
     771                if self.persistent:
     772                    protos.append(proto)
     773                return response
     774            req = Request(method, path, headers, bodyProducer,
     775                          persistent=self.persistent)
     776            rd = proto.request(req)
     777            rd.addCallback(cbRequest)
     778            return rd
    718779        d.addCallback(cbConnected)
    719780        return d
    720781
  • twisted/web/test/test_newclient.py

    diff -ru Twisted-10.1.0.orig/twisted/web/test/test_newclient.py Twisted-10.1.0/twisted/web/test/test_newclient.py
    old new  
    775775    """
    776776    method = 'GET'
    777777    stopped = False
     778    persistent = False
    778779
    779780    def writeTo(self, transport):
    780781        self.finished = Deferred()
     
    793794    returns a succeeded L{Deferred}.  This vaguely emulates the behavior of a
    794795    L{Request} with no body producer.
    795796    """
     797    persistent = False
     798
    796799    def writeTo(self, transport):
    797800        transport.write('SOME BYTES')
    798801        return succeed(None)
     
    861864        L{RequestGenerationFailed} wrapping the underlying failure.
    862865        """
    863866        class BrokenRequest:
     867            persistent = False
    864868            def writeTo(self, transport):
    865869                return fail(ArbitraryException())
    866870
     
    883887        a L{Failure} of L{RequestGenerationFailed} wrapping that exception.
    884888        """
    885889        class BrokenRequest:
     890            persistent = False
    886891            def writeTo(self, transport):
    887892                raise ArbitraryException()
    888893
     
    952957            self.assertEqual(response.code, 200)
    953958            self.assertEqual(response.headers, Headers())
    954959            self.assertTrue(self.transport.disconnecting)
     960            self.assertEqual(self.protocol.state, 'QUIESCENT')
    955961        d.addCallback(cbRequest)
    956962        self.protocol.dataReceived(
    957963            "HTTP/1.1 200 OK\r\n"
     
    9971003            p = AccumulatingProtocol()
    9981004            whenFinished = p.closedDeferred = Deferred()
    9991005            response.deliverBody(p)
     1006            self.assertEqual(
     1007                self.protocol.state, 'TRANSMITTING_AFTER_RECEIVING_RESPONSE')
    10001008            return whenFinished.addCallback(
    10011009                lambda ign: (response, p.data))
    10021010        d.addCallback(cbResponse)
     
    12931301            "\r\n")
    12941302
    12951303
     1304    def test_sendSimplestPersistentRequest(self):
     1305        """
     1306        A persistent request does not send 'Connection: close' header.
     1307        """
     1308        req = Request('GET', '/', _boringHeaders, None, persistent=True)
     1309        req.writeTo(self.transport)
     1310        self.assertEqual(
     1311            self.transport.value(),
     1312            "GET / HTTP/1.1\r\n"
     1313            "Host: example.com\r\n"
     1314            "\r\n")
     1315
     1316
    12961317    def test_sendRequestHeaders(self):
    12971318        """
    12981319        L{Request.writeTo} formats header data and writes it to the given
  • twisted/web/test/test_webclient.py

    diff -ru Twisted-10.1.0.orig/twisted/web/test/test_webclient.py Twisted-10.1.0/twisted/web/test/test_webclient.py
    old new  
    857857    """
    858858    def __init__(self):
    859859        self.requests = []
     860        self.state = 'QUIESCENT'
    860861
    861862
    862863    def request(self, request):
     
    916917        the TCP connection attempt fails.
    917918        """
    918919        result = self.agent.request('GET', 'http://foo/')
    919 
    920920        # Cause the connection to be refused
    921921        host, port, factory = self.reactor.tcpClients.pop()[:3]
    922922        factory.clientConnectionFailed(None, Failure(ConnectionRefusedError()))
    923923        self.completeConnection()
    924 
    925924        return self.assertFailure(result, ConnectionRefusedError)
    926925
    927926
     
    10511050        self.assertIdentical(req.bodyProducer, body)
    10521051
    10531052
     1053    def test_persistentRequest(self):
     1054        """
     1055        Test L{Agent.request} with persistent=True option.
     1056        """
     1057        self.agent.persistent = True
     1058        self.agent._connect = self._dummyConnect
     1059
     1060        # first request
     1061        d1 = self.agent.request(
     1062            'GET', 'http://example.com:1234/foo', None, None)
     1063        p1 = self.protocol
     1064        self.assertEquals(len(p1.requests), 1)
     1065        req, res = p1.requests.pop()
     1066        self.assertEquals(req.uri, '/foo')
     1067        # second request
     1068        d2 = self.agent.request(
     1069            'GET', 'http://example.com:1234/bar', None, None)
     1070        self.assertTrue(self.protocol is not p1)
     1071        p2 = self.protocol
     1072        self.assertEquals(len(p2.requests), 1)
     1073        req, res = p2.requests.pop()
     1074        self.assertEquals(req.uri, '/bar')
     1075        # third request
     1076        d3 = self.agent.request(
     1077            'GET', 'http://example.com:1234/baz', None, None)
     1078        # third request does not make a new protocol instance
     1079        self.assertTrue(self.protocol is p2)
     1080        sem = self.agent._semaphores['http', 'example.com', 1234]
     1081        self.assertIsInstance(sem, defer.DeferredSemaphore)
     1082        # third request is in waiting
     1083        self.assertEquals(len(sem.waiting), 1)
     1084        d1.result.result.callback('First request done')
     1085        # third request starts
     1086        self.assertEquals(len(sem.waiting), 0)
     1087        cache = self.agent._protocolCache['http', 'example.com', 1234]
     1088        self.assertEquals(len(cache), 0)
     1089        d2.result.result.callback('Second request done')
     1090        self.assertEquals(len(cache), 1)
     1091
     1092
    10541093    def test_hostProvided(self):
    10551094        """
    10561095        If C{None} is passed to L{Agent.request} for the C{headers}