--- web2/test/test_client.py	2007-10-19 01:16:52.000000000 -0700
+++ new/web2/test/test_client.py	2008-04-25 14:21:14.240015000 -0700
@@ -5,7 +5,7 @@
 Tests for HTTP client.
 """
 
-from twisted.internet import protocol, defer
+from twisted.internet import protocol, defer, reactor
 
 from twisted.web2.client import http
 from twisted.web2 import http_headers
@@ -35,11 +35,11 @@
 
 
 class ClientTests(HTTPTests):
-    def connect(self, logFile=None, maxPipeline=4,
+    def connect(self, mgr = None, logFile=None, maxPipeline=4,
                 inputTimeOut=60000, betweenRequestsTimeOut=600000):
         cxn = TestConnection()
 
-        cxn.client = http.HTTPClientProtocol()
+        cxn.client = http.HTTPClientProtocol(mgr)
         cxn.client.inputTimeOut = inputTimeOut
         cxn.server = TestServer()
 
@@ -492,3 +492,265 @@
         d.addCallback(cb)
         return d
 
+class TestHTTPClientManager(object):
+    """
+    An HTTPClientManager for testing purposes. It calls a deferred
+    whenever the client becomes Idle, Gone, or Pipelined.
+    """
+
+    def __init__(self):
+        self.df = defer.Deferred()
+
+    def clientBusy(self, proto):
+        pass
+
+    def clientIdle(self, proto):
+        df = self.df
+        self.df = defer.Deferred()
+        df.callback('idle')
+
+    def clientPipelining(self, proto):
+        df = self.df
+        self.df = defer.Deferred()
+        df.callback('pipeline')
+
+    def clientGone(self, proto):
+        df = self.df
+        self.df = defer.Deferred()
+        df.callback('gone')
+
+class TestHTTPClientPipelining(ClientTests):
+    """
+    Test that the http client works with pipelined requests.
+    """
+    
+    timeout = 2
+    
+    def test_pipelining(self):
+        """
+        Submitting multiple requests using the manager.
+        """
+        requests = 5
+        mgr = TestHTTPClientManager()
+        cxn = self.connect(mgr = mgr, inputTimeOut=None)
+        req = http.ClientRequest('GET', '/', None, None)
+
+        didIt = [0]
+
+        def sendData(resp, data):
+            reactor.callLater(0, self.writeToClient, cxn, data)
+            return resp
+
+        def sendNextResponse(resp, data):
+            reactor.callLater(0, self.writeLines, cxn, data)
+            return resp
+
+        def submitRequest(_):
+            didIt[0] += 1
+
+            if didIt[0] == requests - 1:
+                keepAlive='close'
+            else:
+                keepAlive='Keep-Alive'
+
+            cxn.server.data = ''
+            df = mgr.df
+
+            d = cxn.client.submitRequest(req, closeAfter=(didIt[0] == requests))
+            d.addCallback(sendData, 'foobar' + str(didIt[0]))
+            
+            if didIt[0] < requests:
+                d.addCallback(sendNextResponse, ('HTTP/1.1 200 OK',
+                                      'Connection: '+ keepAlive,
+                                      'Content-Length: 7',
+                                      '\r\n'))
+                d.addCallback(self.checkResponse, 200, [], 7, 'foobar' + str(didIt[0]))
+                df.addCallback(submitRequest)
+            
+            if didIt[0] == 1:
+                self.writeLines(cxn, ('HTTP/1.1 200 OK',
+                                      'Connection: '+ keepAlive,
+                                      'Content-Length: 7',
+                                      '\r\n'))
+
+            d = defer.DeferredList([d, df], fireOnOneErrback=True)
+            if didIt[0] == requests:
+                d.addCallback(lambda _: self.assertDone(cxn))
+            return d
+
+        d = submitRequest(None)
+
+        return d
+
+    def test_firstRequestPipeline(self):
+        """
+        Verify that the client manager gets a call before the first
+        request completes.
+        """
+        mgr = TestHTTPClientManager()
+        cxn = self.connect(mgr = mgr, inputTimeOut=None)
+        req = http.ClientRequest('GET', '/', None, None)
+
+        def gotData(data):
+            self.assertEquals(data, '1234567890')
+
+        def sendResp(type):
+            self.assertEquals(type, 'pipeline')
+
+        def gotResp(resp):
+            self.assertEquals(resp.code, 200)
+            self.assertHeaders(resp, [])
+            self.assertEquals(resp.stream.length, 10)
+
+            self.writeToClient(cxn, '1234567890')
+
+            return defer.maybeDeferred(resp.stream.read).addCallback(gotData)
+
+        df = mgr.df.addCallback(sendResp)
+        d = cxn.client.submitRequest(req, closeAfter = False).addCallback(gotResp)
+
+        self.assertReceived(cxn, 'GET / HTTP/1.1',
+                            ['Connection: Keep-Alive'])
+
+        self.writeLines(cxn, ('HTTP/1.1 200 OK',
+                              'Content-Length: 10',
+                              '\r\n'))
+
+        return df
+
+    def test_pipelineClosed(self):
+        """
+        Submitting multiple requests but the server closes the connection
+        while there are pipelined requests.
+        """
+        requests = 5
+        stopAfter = 2
+        mgr = TestHTTPClientManager()
+        cxn = self.connect(mgr = mgr, inputTimeOut=None)
+        req = http.ClientRequest('GET', '/', None, None)
+
+        didIt = [0]
+
+        def sendData(resp, data):
+            reactor.callLater(0, self.writeToClient, cxn, data)
+            return resp
+
+        def sendNextResponse(resp, data):
+            reactor.callLater(0, self.writeLines, cxn, data)
+            return resp
+
+        def submitRequest(_):
+            didIt[0] += 1
+
+            if didIt[0] == requests - 1 or didIt[0] == stopAfter:
+                keepAlive='close'
+            else:
+                keepAlive='Keep-Alive'
+
+            cxn.server.data = ''
+            df = mgr.df
+
+            d = cxn.client.submitRequest(req, closeAfter=(didIt[0] == requests))
+            d.addCallback(sendData, 'foobar' + str(didIt[0]))
+            
+            if didIt[0] < requests:
+                if didIt[0] <= stopAfter:
+                    d.addCallback(sendNextResponse, ('HTTP/1.1 200 OK',
+                                          'Connection: '+ keepAlive,
+                                          'Content-Length: 7',
+                                          '\r\n'))
+                    d.addCallback(self.checkResponse, 200, [], 7, 'foobar' + str(didIt[0]))
+                else:
+                    self.assertFailure(d, getattr(http, 'PipelineError', http.ProtocolError))
+                d = df.addCallback(submitRequest)
+            else:
+                self.assertFailure(d, getattr(http, 'PipelineError', http.ProtocolError))
+
+            if didIt[0] == 1:
+                self.writeLines(cxn, ('HTTP/1.1 200 OK',
+                                      'Connection: '+ keepAlive,
+                                      'Content-Length: 7',
+                                      '\r\n'))
+
+            d = defer.DeferredList([d, df], fireOnOneErrback=True)
+            if didIt[0] == requests:
+                d.addCallback(lambda _: self.assertDone(cxn))
+            return d
+
+        d = submitRequest(None)
+
+        return d
+
+    def test_pipelineStreamClosed(self):
+        """
+        Submitting multiple requests but the client closes the stream
+        of a request while there are other requests pipelined.
+        """
+        requests = 5
+        closeAfter = 3
+        mgr = TestHTTPClientManager()
+        cxn = self.connect(mgr = mgr, inputTimeOut=None)
+        req = http.ClientRequest('GET', '/', None, None)
+
+        didIt = [0]
+
+        def sendData(resp, data):
+            reactor.callLater(0, self.writeToClient, cxn, data)
+            return resp
+
+        def sendNextResponse(resp, data):
+            reactor.callLater(0, self.writeLines, cxn, data)
+            return resp
+
+        def closeStream(resp, code, headers, length, data):
+            def gotData(gotdata):
+                self.assertEquals(gotdata, data)
+            self.assertEquals(resp.code, code)
+            self.assertHeaders(resp, headers)
+            self.assertEquals(resp.stream.length, length)
+            resp.stream.close()
+
+        def submitRequest(_):
+            didIt[0] += 1
+
+            if didIt[0] == requests - 1:
+                keepAlive='close'
+            else:
+                keepAlive='Keep-Alive'
+
+            cxn.server.data = ''
+            df = mgr.df
+
+            d = cxn.client.submitRequest(req, closeAfter=(didIt[0] == requests))
+            d.addCallback(sendData, 'foobar' + str(didIt[0]))
+            
+            if didIt[0] < requests:
+                if didIt[0] <= closeAfter:
+                    d.addCallback(sendNextResponse, ('HTTP/1.1 200 OK',
+                                          'Connection: '+ keepAlive,
+                                          'Content-Length: 7',
+                                          '\r\n'))
+                    if didIt[0] == closeAfter:
+                        d.addCallback(closeStream, 200, [], 7, 'foobar' + str(didIt[0]))
+                    else:
+                        d.addCallback(self.checkResponse, 200, [], 7, 'foobar' + str(didIt[0]))
+                else:
+                    self.assertFailure(d, getattr(http, 'PipelineError', http.ProtocolError))
+                d = df.addCallback(submitRequest)
+            else:
+                self.assertFailure(d, getattr(http, 'PipelineError', http.ProtocolError))
+
+            if didIt[0] == 1:
+                self.writeLines(cxn, ('HTTP/1.1 200 OK',
+                                      'Connection: '+ keepAlive,
+                                      'Content-Length: 7',
+                                      '\r\n'))
+
+            d = defer.DeferredList([d, df], fireOnOneErrback=True)
+            if didIt[0] == requests:
+                d.addCallback(lambda _: self.assertDone(cxn))
+            return d
+
+        d = submitRequest(None)
+
+        return d
