Ticket #6493: writesequence-calls-pauseproducer.patch

File writesequence-calls-pauseproducer.patch, 4.6 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/_pollingfile.py

     
    246246        """
    247247        if unicode in map(type, seq):
    248248            raise TypeError("Unicode not allowed in output buffer.")
     249        if self.disconnecting:
     250            return
    249251        self.outQueue.extend(seq)
     252        # check for output buffer full
     253        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
     254            self.bufferFull()
    250255
    251256
    252257    def write(self, data):
     
    263268        if self.disconnecting:
    264269            return
    265270        self.outQueue.append(data)
     271        # check for output buffer full
    266272        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
    267273            self.bufferFull()
    268274
  • twisted/internet/test/test_pollingfile.py

     
    44"""
    55Tests for L{twisted.internet._pollingfile}.
    66"""
     7import os
    78
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.protocols import basic
     12from twisted.internet.test.reactormixins import ReactorBuilder
    1013
     14
     15
    1116if platform.isWindows():
     17    import win32pipe
     18    import win32security
     19
    1220    from twisted.internet import _pollingfile
     21
     22    from twisted.internet._pollingfile import _PollingTimer
     23    _skipNotWindows = None
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = "Test will run only on Windows."
     26    _PollingTimer = object
     27 
    1528
     29 
     30class PipeRunner(_PollingTimer):
     31    """
     32    Builds, initializes and runs a pair of read/write pipes.
     33    """
     34    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor):
     35        _PollingTimer.__init__(self, reactor)
     36        sAttrs = win32security.SECURITY_ATTRIBUTES()
     37        sAttrs.bInheritHandle = 1
     38        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     39        self.reader = _pollingfile._PollableReadPipe(hRead, receivedCB, doneReadCB)
     40        self.writer = _pollingfile._PollableWritePipe(hWrite, doneWriteCB)
     41        self._addPollableResource(self.reader)
     42        self._addPollableResource(self.writer)
    1643
    1744
     45
     46class TestPollablePipes(ReactorBuilder):
     47    """
     48    Tests for L{_pollingfile._PollableWritePipe} and
     49    L{_pollingfile._PollableReadPipe}.
     50    """
     51    def setUp(self):
     52        self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     53
     54
     55    def tearDown(self):
     56        _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     57
     58
     59    def test_pushProducerUsingWriteSequence(self):
     60        """
     61        Test write pipe as a consumer with a push producer using
     62        writeSequence().
     63
     64        Calling writeSequence() should behave the same as calling write()
     65        with each item, including flow control and disconnect handling.
     66
     67        Note: test depends on Ticket #2835 resolution.
     68
     69        See Ticket #6493: writeSequence() doesn't pause the producer when
     70        outgoing buffer is full.
     71        """
     72        pipeSize = 4096  # avoid Ticket #5365 issues
     73        _pollingfile.FULL_BUFFER_SIZE = 1024
     74        testMessage = '0' * 1024
     75
     76        class TestProtocol(basic.LineReceiver):
     77            def lineReceived(self, line):
     78                self.testResponse = line
     79                transport.writer.close()
     80
     81        class TestProducer(object):
     82            pauseCount = 0
     83            def resumeProducing(self):
     84                transport.writer.writeSequence((testMessage+"\r\n",))
     85
     86            def pauseProducing(self):
     87                self.pauseCount += 1
     88
     89            def stopProducing(self):
     90                pass
     91
     92        reactor = self.buildReactor()
     93        r, w = lambda: reactor.stop(), lambda: None
     94        protocol, producer = TestProtocol(), TestProducer()
     95        transport = PipeRunner(pipeSize, r, w, protocol.dataReceived, reactor)
     96        transport.writer.registerProducer(producer, True)
     97        producer.resumeProducing()
     98        self.runReactor(reactor)
     99        self.assertEqual(testMessage, protocol.testResponse)
     100        self.assertEqual(1, producer.pauseCount)
     101
     102
     103globals().update(TestPollablePipes.makeTestCaseClasses())
     104
     105
    18106class TestPollableWritePipe(TestCase):
    19107    """
    20108    Tests for L{_pollingfile._PollableWritePipe}.
     
    41129
    42130
    43131
     132if _skipNotWindows:
     133    TestPollablePipes.skip = skipMessage
     134    TestPollableWritePipeUnicode.skip = skipMessage
    44135
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."