Ticket #6492: test_pollingfile-5413-producer-paused.patch

File test_pollingfile-5413-producer-paused.patch, 7.3 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = True
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
    1527
    1628
    1729
    18 class TestPollableWritePipe(TestCase):
     30class _TestProducer(object):
    1931    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38    """
     39
     40    implements(interfaces.IPushProducer)
     41
     42    def __init__(self, transport, streaming, count, msgFormat):
     43        self._transport = transport
     44        self._goal = count
     45        self._produced = 0
     46        self._paused = False
     47        self.pauseCount = 0
     48        self.resumeCount = 0
     49        self.stopCount = 0
     50        self.msgFormat = msgFormat
     51        if streaming:
     52            self.resumeProducing = self._resumeProducingPush
     53        else:
     54            self.resumeProducing = self._resumeProducingPull
     55
     56    def pauseProducing(self):
     57        self._paused = True
     58        self.pauseCount += 1
     59
     60
     61    def _resumeProducingPush(self):
     62        self._paused = False
     63        self.resumeCount += 1
     64
     65        while not self._paused and self._produced < self._goal:
     66            self._transport.write(self.msgFormat % self._produced)
     67            self._produced += 1
     68
     69        if self._produced == self._goal:
     70            self._transport.unregisterProducer()
     71            self._transport.writer.close()
     72
     73
     74    def _resumeProducingPull(self):
     75        self.resumeCount += 1
     76
     77        if self._produced < self._goal:
     78            self._transport.write(self.msgFormat % self._produced)
     79            self._produced += 1
     80
     81        if self._produced == self._goal:
     82            self._transport.unregisterProducer()
     83            self._transport.writer.close()
     84
     85
     86    def stopProducing(self):
     87        self._produced = self._goal
     88        self.stopCount += 1
     89
     90
     91
     92class _PipeRunner(_PollingTimer):
     93    """
     94    Builds, initializes and runs a pair of read/write pipes.
     95    """
     96    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     97        _PollingTimer.__init__(self, reactor)
     98        sAttrs = win32security.SECURITY_ATTRIBUTES()
     99        sAttrs.bInheritHandle = 1
     100        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     101        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     102        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     103        self._addPollableResource(self.reader)
     104        self._addPollableResource(self.writer)
     105
     106    def write(self, data):
     107        self.writer.write(data)
     108
     109    def writeSequence(self, seq):
     110        self.writer.writeSequence(seq)
     111
     112    def registerProducer(self, producer, streaming):
     113        self.writer.registerProducer(producer, streaming)
     114
     115    def unregisterProducer(self):
     116        self.writer.unregisterProducer()
     117
     118
     119
     120class TestPollablePipes(TestCase):
     121    """
     122    Tests for L{_pollingfile._PollableWritePipe} and
     123    L{_pollingfile._PollableReadPipe}.
     124    """
     125    def setUp(self):
     126        if _oldPollingFile:
     127            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     128
     129
     130    def tearDown(self):
     131        if _oldPollingFile:
     132            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     133
     134
     135
     136    class TestConsumerProtocol(basic.LineReceiver):
     137        """
     138        Expects a series of ascending integer messages, possibly including
     139        a large number of leading zeros.
     140
     141        Counts the number of messages, records any that aren't in sequence.
     142        """
     143        from os import linesep as delimiter
     144        counter = 0
     145        errors = []
     146
     147        def lineReceived(self, line):
     148            if int(line) != self.counter:
     149                self.errors.append((int(line), self.counter))
     150            self.counter += 1
     151
     152
     153
     154    def test_pushProducerCheckResumeProducing(self):
     155        """
     156        Test write pipe as a consumer using a push producer.
     157
     158                    vvvv--- to be changed
     159        See Ticket #5413: bufferEmpty sets producerPaused attribute on the
     160        producer instead of on itself.
     161        """
     162        if _oldPollingFile:
     163            _pollingfile.FULL_BUFFER_SIZE = 512 # restrict write buffer size
     164
     165        pipeSize = 4096
     166        totalMessages = 1000
     167        msgFormat  = '%08d\r\n'
     168        totalResumeCounts = 19
     169
     170        d = defer.Deferred()
     171
     172        def doneReadCB():
     173            d.callback(None)
     174            self.assertEqual(producer.stopCount, 0)
     175            self.assertEqual(protocol.counter, totalMessages)
     176            self.assertEqual(protocol.errors, [])
     177            self.assertEqual(producer.pauseCount, totalResumeCounts)
     178            self.assertEqual(producer.resumeCount, totalResumeCounts+1)
     179            diff_producer_attribs = set(dir(producer)).difference(pre_producer_attribs)
     180            self.assertEqual(diff_producer_attribs, set())
     181
     182        protocol = self.TestConsumerProtocol()
     183        transport = _PipeRunner(pipeSize,
     184                                doneReadCB,
     185                                lambda: None,
     186                                protocol.dataReceived)
     187        protocol._transport = transport
     188        transport.writer.bufferSize = 512
     189        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     190        pre_producer_attribs = set(dir(producer))
     191        transport.registerProducer(producer, True)
     192        producer.resumeProducing()
     193        return d
     194
     195
     196
     197class TestPollableWritePipeUnicode(TestCase):
     198    """
    20199    Tests for L{_pollingfile._PollableWritePipe}.
    21200    """
    22201
     
    25204        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26205        attempt is made to append unicode data to the output buffer.
    27206        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     207        p = _PollableWritePipe(1, lambda: None)
    29208        self.assertRaises(TypeError, p.write, u"test")
    30209
    31210
     
    35214        if unicode data is part of the data sequence to be appended to the
    36215        output buffer.
    37216        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     217        p = _PollableWritePipe(1, lambda: None)
    39218        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40219        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41220
    42221
    43222
     223if _skipNotWindows:
     224    skipMessage = "Test will run only on Windows."
     225    TestPollablePipes.skip = skipMessage
     226    TestPollableWritePipeUnicode.skip = skipMessage
    44227
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."