Ticket #2839: test_pollingfile-2839.patch

File test_pollingfile-2839.patch, 7.2 KB (added by John Popplewell, 7 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = True
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
    1527
    1628
    1729
    18 class TestPollableWritePipe(TestCase):
     30class _TestProducer(object):
    1931    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38    """
     39
     40    implements(interfaces.IPushProducer)
     41
     42    def __init__(self, transport, streaming, count, msgFormat):
     43        self._transport = transport
     44        self._goal = count
     45        self._produced = 0
     46        self._paused = False
     47        self.pauseCount = 0
     48        self.resumeCount = 0
     49        self.stopCount = 0
     50        self.msgFormat = msgFormat
     51        if streaming:
     52            self.resumeProducing = self._resumeProducingPush
     53        else:
     54            self.resumeProducing = self._resumeProducingPull
     55
     56    def pauseProducing(self):
     57        self._paused = True
     58        self.pauseCount += 1
     59
     60
     61    def _resumeProducingPush(self):
     62        self._paused = False
     63        self.resumeCount += 1
     64
     65        while not self._paused and self._produced < self._goal:
     66            self._transport.write(self.msgFormat % self._produced)
     67            self._produced += 1
     68
     69        if self._produced == self._goal:
     70            self._transport.unregisterProducer()
     71            self._transport.writer.close()
     72
     73
     74    def _resumeProducingPull(self):
     75        self.resumeCount += 1
     76
     77        if self._produced < self._goal:
     78            self._transport.write(self.msgFormat % self._produced)
     79            self._produced += 1
     80
     81        if self._produced == self._goal:
     82            self._transport.unregisterProducer()
     83            self._transport.writer.close()
     84
     85
     86    def stopProducing(self):
     87        self._produced = self._goal
     88        self.stopCount += 1
     89
     90
     91
     92class _PipeRunner(_PollingTimer):
     93    """
     94    Builds, initializes and runs a pair of read/write pipes.
     95    """
     96    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     97        _PollingTimer.__init__(self, reactor)
     98        sAttrs = win32security.SECURITY_ATTRIBUTES()
     99        sAttrs.bInheritHandle = 1
     100        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     101        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     102        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     103        self._addPollableResource(self.reader)
     104        self._addPollableResource(self.writer)
     105
     106    def write(self, data):
     107        self.writer.write(data)
     108
     109    def writeSequence(self, seq):
     110        self.writer.writeSequence(seq)
     111
     112    def registerProducer(self, producer, streaming):
     113        self.writer.registerProducer(producer, streaming)
     114
     115    def unregisterProducer(self):
     116        self.writer.unregisterProducer()
     117
     118
     119
     120class TestPollablePipes(TestCase):
     121    """
     122    Tests for L{_pollingfile._PollableWritePipe} and
     123    L{_pollingfile._PollableReadPipe}.
     124    """
     125    def setUp(self):
     126        if _oldPollingFile:
     127            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     128
     129
     130    def tearDown(self):
     131        if _oldPollingFile:
     132            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     133
     134
     135
     136    class TestConsumerDisconnectedProtocol(basic.LineReceiver):
     137        """
     138        Expects a series of ascending integer messages, possibly including
     139        a large number of leading zeros.
     140
     141        Counts the number of messages, records any that aren't in sequence.
     142
     143        When the count reaches threshold, the write pipe is forced to
     144        disconnect by calling close(), once, on the read pipe.
     145        """
     146        from os import linesep as delimiter
     147
     148        counter = 0
     149        errors = []
     150
     151        def __init__(self, threshold):
     152            self.threshold = threshold
     153
     154
     155        def lineReceived(self, line):
     156            if int(line) != self.counter:
     157                self.errors.append((int(line), self.counter))
     158            self.counter += 1
     159            if self.counter == self.threshold:
     160                self._transport.reader.close()
     161
     162
     163
     164    def test_pushProducerDisconnected(self):
     165        """
     166        Test write pipe as a consumer using a push producer when the
     167        connection is forcibly disconnected.
     168       
     169        When a consumer is forcibly disconnected it should call the
     170        stopProducing() method on the producer.
     171
     172        See Ticket #2839: stopProducing() not called.
     173        """
     174        pipeSize = 4096
     175        totalMessages = 50000
     176        msgFormat = '%d\r\n'
     177        totalPauseCounts = 14
     178
     179        d = defer.Deferred()
     180
     181        def doneWriteCB():
     182            d.callback(None)
     183            self.assertEqual(producer.stopCount, 1)
     184            self.assertEqual(protocol.errors, [])
     185
     186        protocol = self.TestConsumerDisconnectedProtocol(1000)
     187        transport = _PipeRunner(pipeSize,
     188                                lambda: None,
     189                                doneWriteCB,    # stop when writer is closed.
     190                                protocol.dataReceived)
     191        protocol._transport = transport
     192        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     193        transport.registerProducer(producer, True)
     194        producer.resumeProducing()
     195        return d
     196
     197
     198
     199class TestPollableWritePipeUnicode(TestCase):
     200    """
    20201    Tests for L{_pollingfile._PollableWritePipe}.
    21202    """
    22203
     
    25206        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26207        attempt is made to append unicode data to the output buffer.
    27208        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     209        p = _PollableWritePipe(1, lambda: None)
    29210        self.assertRaises(TypeError, p.write, u"test")
    30211
    31212
     
    35216        if unicode data is part of the data sequence to be appended to the
    36217        output buffer.
    37218        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     219        p = _PollableWritePipe(1, lambda: None)
    39220        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40221        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41222
    42223
    43224
     225if _skipNotWindows:
     226    skipMessage = "Test will run only on Windows."
     227    TestPollablePipes.skip = skipMessage
     228    TestPollableWritePipeUnicode.skip = skipMessage
    44229
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."