Ticket #6493: test_pollingfile-5413-write-sequence.patch

File test_pollingfile-5413-write-sequence.patch, 6.9 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = True
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
    1527
    1628
    1729
    18 class TestPollableWritePipe(TestCase):
     30class _TestProducerWriteSequence(object):
    1931    """
     32    Test push producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages in blocks using writeSequence().
     35    """
     36
     37    implements(interfaces.IPushProducer)
     38
     39    def __init__(self, transport, count, blockSize, msgFormat):
     40        self._transport = transport
     41        self._goal = count
     42        self._produced = 0
     43        self._paused = False
     44        self.pauseCount = 0
     45        self.resumeCount = 0
     46        self.stopCount = 0
     47        self.msgFormat = msgFormat
     48        self.blockSize = blockSize
     49
     50
     51    def pauseProducing(self):
     52        self._paused = True
     53        self.pauseCount += 1
     54
     55
     56    def resumeProducing(self):
     57        self._paused = False
     58        self.resumeCount += 1
     59
     60        while not self._paused and self._produced < self._goal:
     61            block = []
     62            nBlocks = min(self.blockSize, self._goal - self._produced)
     63            for i in range(nBlocks):
     64                block.append(self.msgFormat % (self._produced + i,))
     65            self._transport.writeSequence(block)
     66            self._produced += self.blockSize
     67
     68        if self._produced == self._goal:
     69            self._transport.unregisterProducer()
     70            self._transport.writer.close()
     71
     72
     73    def stopProducing(self):
     74        self._produced = self._goal
     75        self.stopCount += 1
     76
     77
     78
     79class _PipeRunner(_PollingTimer):
     80    """
     81    Builds, initializes and runs a pair of read/write pipes.
     82    """
     83    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     84        _PollingTimer.__init__(self, reactor)
     85        sAttrs = win32security.SECURITY_ATTRIBUTES()
     86        sAttrs.bInheritHandle = 1
     87        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     88        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     89        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     90        self._addPollableResource(self.reader)
     91        self._addPollableResource(self.writer)
     92
     93    def write(self, data):
     94        self.writer.write(data)
     95
     96    def writeSequence(self, seq):
     97        self.writer.writeSequence(seq)
     98
     99    def registerProducer(self, producer, streaming):
     100        self.writer.registerProducer(producer, streaming)
     101
     102    def unregisterProducer(self):
     103        self.writer.unregisterProducer()
     104
     105
     106
     107class TestPollablePipes(TestCase):
     108    """
     109    Tests for L{_pollingfile._PollableWritePipe} and
     110    L{_pollingfile._PollableReadPipe}.
     111    """
     112    def setUp(self):
     113        if _oldPollingFile:
     114            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     115
     116
     117    def tearDown(self):
     118        if _oldPollingFile:
     119            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     120
     121
     122
     123    class TestConsumerProtocol(basic.LineReceiver):
     124        """
     125        Expects a series of ascending integer messages, possibly including
     126        a large number of leading zeros.
     127
     128        Counts the number of messages, records any that aren't in sequence.
     129        """
     130        from os import linesep as delimiter
     131        counter = 0
     132        errors = []
     133
     134        def lineReceived(self, line):
     135            if int(line) != self.counter:
     136                self.errors.append((int(line), self.counter))
     137            self.counter += 1
     138
     139
     140
     141    def test_pullProducerUsingWriteSequence(self):
     142        """
     143        Test write pipe as a consumer with a pull producer using
     144        writeSequence().
     145
     146        Calling writeSequence() should behave the same as calling write()
     147        with each item, including flow control and disconnect handling.
     148
     149                    vvvv--- to be changed
     150        See Ticket #5413: writeSequence() doesn't pause the producer when
     151        outgoing buffer is full.
     152        """
     153
     154        if _oldPollingFile:
     155            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     156
     157        pipeSize = 4096
     158        totalMessages = 10000
     159        msgFormat = '%08d\r\n'
     160
     161        d = defer.Deferred()
     162
     163        def doneReadCB():
     164            d.callback(None)
     165            self.assertEqual(producer.stopCount, 0)
     166            self.assertEqual(protocol.counter, totalMessages)
     167            self.assertEqual(protocol.errors, [])
     168            self.assertEqual(producer.pauseCount, 20)
     169            self.assertEqual(producer.resumeCount, 20)
     170
     171
     172        protocol = self.TestConsumerProtocol()
     173        transport = _PipeRunner(pipeSize,
     174                                doneReadCB,     # stop when reader is closed.
     175                                lambda: None,
     176                                protocol.dataReceived)
     177        if not _oldPollingFile:
     178            transport.writer.bufferSize = 1024
     179        producer = _TestProducerWriteSequence(transport, totalMessages, 500, msgFormat)
     180        transport.registerProducer(producer, True)
     181        producer.resumeProducing()
     182        return d
     183
     184
     185
     186class TestPollableWritePipeUnicode(TestCase):
     187    """
    20188    Tests for L{_pollingfile._PollableWritePipe}.
    21189    """
    22190
     
    25193        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26194        attempt is made to append unicode data to the output buffer.
    27195        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     196        p = _PollableWritePipe(1, lambda: None)
    29197        self.assertRaises(TypeError, p.write, u"test")
    30198
    31199
     
    35203        if unicode data is part of the data sequence to be appended to the
    36204        output buffer.
    37205        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     206        p = _PollableWritePipe(1, lambda: None)
    39207        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40208        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41209
    42210
    43211
     212if _skipNotWindows:
     213    skipMessage = "Test will run only on Windows."
     214    TestPollablePipes.skip = skipMessage
     215    TestPollableWritePipeUnicode.skip = skipMessage
    44216
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."