Ticket #6491: test_pollingfile-5413-data-dropped.patch

File test_pollingfile-5413-data-dropped.patch, 7.7 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = True
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
    1527
    1628
    1729
    18 class TestPollableWritePipe(TestCase):
     30class _TestProducer(object):
    1931    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38    """
     39
     40    implements(interfaces.IPushProducer)
     41
     42    def __init__(self, transport, streaming, count, msgFormat):
     43        self._transport = transport
     44        self._goal = count
     45        self._produced = 0
     46        self._paused = False
     47        self.pauseCount = 0
     48        self.resumeCount = 0
     49        self.stopCount = 0
     50        self.msgFormat = msgFormat
     51        if streaming:
     52            self.resumeProducing = self._resumeProducingPush
     53        else:
     54            self.resumeProducing = self._resumeProducingPull
     55
     56    def pauseProducing(self):
     57        self._paused = True
     58        self.pauseCount += 1
     59
     60
     61    def _resumeProducingPush(self):
     62        self._paused = False
     63        self.resumeCount += 1
     64
     65        while not self._paused and self._produced < self._goal:
     66            self._transport.write(self.msgFormat % self._produced)
     67            self._produced += 1
     68
     69        if self._produced == self._goal:
     70            self._transport.unregisterProducer()
     71            self._transport.writer.close()
     72
     73
     74    def _resumeProducingPull(self):
     75        self.resumeCount += 1
     76
     77        if self._produced < self._goal:
     78            self._transport.write(self.msgFormat % self._produced)
     79            self._produced += 1
     80
     81        if self._produced == self._goal:
     82            self._transport.unregisterProducer()
     83            self._transport.writer.close()
     84
     85
     86    def stopProducing(self):
     87        self._produced = self._goal
     88        self.stopCount += 1
     89
     90
     91
     92class _PipeRunner(_PollingTimer):
     93    """
     94    Builds, initializes and runs a pair of read/write pipes.
     95    """
     96    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     97        _PollingTimer.__init__(self, reactor)
     98        sAttrs = win32security.SECURITY_ATTRIBUTES()
     99        sAttrs.bInheritHandle = 1
     100        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     101        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     102        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     103        self._addPollableResource(self.reader)
     104        self._addPollableResource(self.writer)
     105
     106    def write(self, data):
     107        self.writer.write(data)
     108
     109    def writeSequence(self, seq):
     110        self.writer.writeSequence(seq)
     111
     112    def registerProducer(self, producer, streaming):
     113        self.writer.registerProducer(producer, streaming)
     114
     115    def unregisterProducer(self):
     116        self.writer.unregisterProducer()
     117
     118
     119
     120class TestPollablePipes(TestCase):
     121    """
     122    Tests for L{_pollingfile._PollableWritePipe} and
     123    L{_pollingfile._PollableReadPipe}.
     124    """
     125    def setUp(self):
     126        if _oldPollingFile:
     127            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     128
     129
     130    def tearDown(self):
     131        if _oldPollingFile:
     132            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     133
     134
     135
     136    class TestConsumerLostConnectionProtocol(basic.LineReceiver):
     137        """
     138        Expects a series of ascending integer messages, possibly including
     139        a large number of leading zeros.
     140
     141        Counts the number of messages, records any that aren't in sequence.
     142
     143        When the count reaches threshold, the write pipe is requested to
     144        disconnect by calling close(), once, on the write pipe.
     145        """
     146        from os import linesep as delimiter
     147
     148        counter = 0
     149        errors = []
     150
     151        def __init__(self, threshold):
     152            self.threshold = threshold
     153
     154
     155        def lineReceived(self, line):
     156            if int(line) != self.counter:
     157                self.errors.append((int(line), self.counter))
     158            self.counter += 1
     159            if self.counter == self.threshold:
     160                self._transport.writer.close()
     161
     162
     163
     164    def test_pushProducerDataDroppedAtLoseConnection(self):
     165        """
     166        Test write pipe as a consumer using a push producer.
     167
     168        If a producer has sent a consumer data, the consumer should
     169        honor a disconnect request made using loseConnection() when the data
     170        sent by the producer has been written.
     171
     172                    vvvv--- to be changed
     173        See Ticket #5413: data written gets dropped while the pipe is still
     174        disconnecting.
     175        """
     176
     177        if _oldPollingFile:
     178            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     179
     180        pipeSize = 4096
     181        totalMessages = 10000
     182        msgFormat = '%08d\r\n'
     183        totalPauseCounts = 97
     184
     185        d = defer.Deferred()
     186
     187        def doneCB():
     188            d.callback(None)
     189            self.assertEqual(producer.stopCount, 0)
     190            self.assertEqual(protocol.counter, totalMessages)
     191            self.assertEqual(protocol.errors, [])
     192            self.assertEqual(producer.pauseCount, totalPauseCounts)
     193            self.assertEqual(producer.resumeCount, totalPauseCounts+1)
     194
     195
     196        protocol = self.TestConsumerLostConnectionProtocol(1000)
     197        transport = _PipeRunner(pipeSize,
     198                                doneCB,     # stop when reader is closed
     199                                lambda: None,
     200                                protocol.dataReceived)
     201        protocol._transport = transport
     202        if not _oldPollingFile:
     203            transport.writer.bufferSize = 1024
     204        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     205        transport.registerProducer(producer, True)
     206        producer.resumeProducing()
     207        return d
     208
     209
     210
     211class TestPollableWritePipeUnicode(TestCase):
     212    """
    20213    Tests for L{_pollingfile._PollableWritePipe}.
    21214    """
    22215
     
    25218        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26219        attempt is made to append unicode data to the output buffer.
    27220        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     221        p = _PollableWritePipe(1, lambda: None)
    29222        self.assertRaises(TypeError, p.write, u"test")
    30223
    31224
     
    35228        if unicode data is part of the data sequence to be appended to the
    36229        output buffer.
    37230        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     231        p = _PollableWritePipe(1, lambda: None)
    39232        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40233        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41234
    42235
    43236
     237if _skipNotWindows:
     238    skipMessage = "Test will run only on Windows."
     239    TestPollablePipes.skip = skipMessage
     240    TestPollableWritePipeUnicode.skip = skipMessage
    44241
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."