Ticket #2835: test_pollingfile-2835.patch

File test_pollingfile-2835.patch, 7.5 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = True
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
    1527
    1628
    1729
    18 class TestPollableWritePipe(TestCase):
     30class _TestProducer(object):
    1931    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38    """
     39
     40    implements(interfaces.IPushProducer)
     41
     42    def __init__(self, transport, streaming, count, msgFormat):
     43        self._transport = transport
     44        self._goal = count
     45        self._produced = 0
     46        self._paused = False
     47        self.pauseCount = 0
     48        self.resumeCount = 0
     49        self.stopCount = 0
     50        self.msgFormat = msgFormat
     51        if streaming:
     52            self.resumeProducing = self._resumeProducingPush
     53        else:
     54            self.resumeProducing = self._resumeProducingPull
     55
     56    def pauseProducing(self):
     57        self._paused = True
     58        self.pauseCount += 1
     59
     60
     61    def _resumeProducingPush(self):
     62        self._paused = False
     63        self.resumeCount += 1
     64
     65        while not self._paused and self._produced < self._goal:
     66            self._transport.write(self.msgFormat % self._produced)
     67            self._produced += 1
     68
     69        if self._produced == self._goal:
     70            self._transport.unregisterProducer()
     71            self._transport.writer.close()
     72
     73
     74    def _resumeProducingPull(self):
     75        self.resumeCount += 1
     76
     77        if self._produced < self._goal:
     78            self._transport.write(self.msgFormat % self._produced)
     79            self._produced += 1
     80
     81        if self._produced == self._goal:
     82            self._transport.unregisterProducer()
     83            self._transport.writer.close()
     84
     85
     86    def stopProducing(self):
     87        self._produced = self._goal
     88        self.stopCount += 1
     89
     90
     91
     92class _PipeRunner(_PollingTimer):
     93    """
     94    Builds, initializes and runs a pair of read/write pipes.
     95    """
     96    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     97        _PollingTimer.__init__(self, reactor)
     98        sAttrs = win32security.SECURITY_ATTRIBUTES()
     99        sAttrs.bInheritHandle = 1
     100        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     101        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     102        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     103        self._addPollableResource(self.reader)
     104        self._addPollableResource(self.writer)
     105
     106    def write(self, data):
     107        self.writer.write(data)
     108
     109    def writeSequence(self, seq):
     110        self.writer.writeSequence(seq)
     111
     112    def registerProducer(self, producer, streaming):
     113        self.writer.registerProducer(producer, streaming)
     114
     115    def unregisterProducer(self):
     116        self.writer.unregisterProducer()
     117
     118
     119
     120class TestPollablePipes(TestCase):
     121    """
     122    Tests for L{_pollingfile._PollableWritePipe} and
     123    L{_pollingfile._PollableReadPipe}.
     124    """
     125    def setUp(self):
     126        if _oldPollingFile:
     127            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     128
     129
     130    def tearDown(self):
     131        if _oldPollingFile:
     132            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     133
     134
     135
     136    class TestConsumerProtocol(basic.LineReceiver):
     137        """
     138        Expects a series of ascending integer messages, possibly including
     139        a large number of leading zeros.
     140
     141        Counts the number of messages, records any that aren't in sequence.
     142        """
     143        from os import linesep as delimiter
     144        counter = 0
     145        errors = []
     146
     147        def lineReceived(self, line):
     148            if int(line) != self.counter:
     149                self.errors.append((int(line), self.counter))
     150            self.counter += 1
     151
     152
     153
     154    def test_pullProducerCheckPauseProducing(self):
     155        """
     156        Test write pipe as a consumer using a pull producer.
     157
     158        The data received at the other end of the pipe is checked and counted
     159        as are the number of producer pause/resume/stop calls.
     160
     161        When calling registerProducer() with a pull-producer, the streaming
     162        parameter is False. The consumer should *not* be calling the producers
     163        pauseProducing() method when it's buffer fills as that method is not
     164        normally present, causing an error, and is redundant.
     165
     166        See Ticket #2835: pauseProducing() called erroneously.
     167        """
     168
     169        if _oldPollingFile:
     170            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     171
     172        pipeSize = 4096
     173        totalMessages = 10000
     174        msgFormat = '%02048d\r\n'   # long messages trigger buffer full.
     175        totalResumeCounts = totalMessages
     176
     177        d = defer.Deferred()
     178
     179        def doneReadCB():
     180            d.callback(None)
     181            self.assertEqual(producer.stopCount, 0)
     182            self.assertEqual(protocol.counter, totalMessages)
     183            self.assertEqual(protocol.errors, [])
     184            self.assertEqual(producer.resumeCount, totalResumeCounts)
     185            self.assertEqual(producer.pauseCount, 0)
     186
     187
     188        protocol = self.TestConsumerProtocol()
     189        transport = _PipeRunner(pipeSize,
     190                                doneReadCB,     # stop when reader is closed.
     191                                lambda: None,
     192                                protocol.dataReceived)
     193        if not _oldPollingFile:
     194            transport.writer.bufferSize = 1024
     195        producer = _TestProducer(transport, False, totalMessages, msgFormat)
     196        transport.registerProducer(producer, False)
     197        producer.resumeProducing()
     198        return d
     199
     200
     201
     202class TestPollableWritePipeUnicode(TestCase):
     203    """
    20204    Tests for L{_pollingfile._PollableWritePipe}.
    21205    """
    22206
     
    25209        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26210        attempt is made to append unicode data to the output buffer.
    27211        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     212        p = _PollableWritePipe(1, lambda: None)
    29213        self.assertRaises(TypeError, p.write, u"test")
    30214
    31215
     
    35219        if unicode data is part of the data sequence to be appended to the
    36220        output buffer.
    37221        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     222        p = _PollableWritePipe(1, lambda: None)
    39223        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40224        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41225
    42226
    43227
     228if _skipNotWindows:
     229    skipMessage = "Test will run only on Windows."
     230    TestPollablePipes.skip = skipMessage
     231    TestPollableWritePipeUnicode.skip = skipMessage
    44232
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."