Ticket #5413: test_pollingfile-all-6-new-tests.patch

File test_pollingfile-all-6-new-tests.patch, 18.3 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer, reactor
     12from twisted.protocols import basic
    1013
    1114if platform.isWindows():
     15    import win32pipe
     16    import win32security
     17
    1218    from twisted.internet import _pollingfile
     19
     20    from twisted.internet._pollingfile import _PollingTimer, _PollableReadPipe
     21    from twisted.internet._pollingfile import _PollableWritePipe
     22    _skipNotWindows = False
     23    _oldPollingFile = hasattr(_pollingfile, "FULL_BUFFER_SIZE")
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
     27 
     28 
     29 
     30class _TestProducer(object):
     31    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38    """
    1539
     40    implements(interfaces.IPushProducer)
    1641
     42    def __init__(self, transport, streaming, count, msgFormat):
     43        self._transport = transport
     44        self._goal = count
     45        self._produced = 0
     46        self._paused = False
     47        self.pauseCount = 0
     48        self.resumeCount = 0
     49        self.stopCount = 0
     50        self.msgFormat = msgFormat
     51        if streaming:
     52            self.resumeProducing = self._resumeProducingPush
     53        else:
     54            self.resumeProducing = self._resumeProducingPull
    1755
     56    def pauseProducing(self):
     57        self._paused = True
     58        self.pauseCount += 1
     59
     60
     61    def _resumeProducingPush(self):
     62        self._paused = False
     63        self.resumeCount += 1
     64
     65        while not self._paused and self._produced < self._goal:
     66            self._transport.write(self.msgFormat % self._produced)
     67            self._produced += 1
     68
     69        if self._produced == self._goal:
     70            self._transport.unregisterProducer()
     71            self._transport.writer.close()
     72
     73
     74    def _resumeProducingPull(self):
     75        self.resumeCount += 1
     76
     77        if self._produced < self._goal:
     78            self._transport.write(self.msgFormat % self._produced)
     79            self._produced += 1
     80
     81        if self._produced == self._goal:
     82            self._transport.unregisterProducer()
     83            self._transport.writer.close()
     84
     85
     86    def stopProducing(self):
     87        self._produced = self._goal
     88        self.stopCount += 1
     89
     90
     91
     92class _TestProducerWriteSequence(object):
     93    """
     94    Test push producer based on the Produce() class in:
     95    doc/core/examples/streaming.py except that it produces a series of
     96    integer messages in blocks using writeSequence().
     97    """
     98
     99    implements(interfaces.IPushProducer)
     100
     101    def __init__(self, transport, count, blockSize, msgFormat):
     102        self._transport = transport
     103        self._goal = count
     104        self._produced = 0
     105        self._paused = False
     106        self.pauseCount = 0
     107        self.resumeCount = 0
     108        self.stopCount = 0
     109        self.msgFormat = msgFormat
     110        self.blockSize = blockSize
     111
     112
     113    def pauseProducing(self):
     114        self._paused = True
     115        self.pauseCount += 1
     116
     117
     118    def resumeProducing(self):
     119        self._paused = False
     120        self.resumeCount += 1
     121
     122        while not self._paused and self._produced < self._goal:
     123            block = []
     124            nBlocks = min(self.blockSize, self._goal - self._produced)
     125            for i in range(nBlocks):
     126                block.append(self.msgFormat % (self._produced + i,))
     127            self._transport.writeSequence(block)
     128            self._produced += self.blockSize
     129
     130        if self._produced == self._goal:
     131            self._transport.unregisterProducer()
     132            self._transport.writer.close()
     133
     134
     135    def stopProducing(self):
     136        self._produced = self._goal
     137        self.stopCount += 1
     138
     139
     140
     141class _PipeRunner(_PollingTimer):
     142    """
     143    Builds, initializes and runs a pair of read/write pipes.
     144    """
     145    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor=reactor):
     146        _PollingTimer.__init__(self, reactor)
     147        sAttrs = win32security.SECURITY_ATTRIBUTES()
     148        sAttrs.bInheritHandle = 1
     149        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     150        self.reader = _PollableReadPipe(hRead, receivedCB, doneReadCB)
     151        self.writer = _PollableWritePipe(hWrite, doneWriteCB)
     152        self._addPollableResource(self.reader)
     153        self._addPollableResource(self.writer)
     154
     155    def write(self, data):
     156        self.writer.write(data)
     157
     158    def writeSequence(self, seq):
     159        self.writer.writeSequence(seq)
     160
     161    def registerProducer(self, producer, streaming):
     162        self.writer.registerProducer(producer, streaming)
     163
     164    def unregisterProducer(self):
     165        self.writer.unregisterProducer()
     166
     167
     168
     169class TestPollablePipes(TestCase):
     170    """
     171    Tests for L{_pollingfile._PollableWritePipe} and
     172    L{_pollingfile._PollableReadPipe}.
     173    """
     174    def setUp(self):
     175        if _oldPollingFile:
     176            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     177
     178
     179    def tearDown(self):
     180        if _oldPollingFile:
     181            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     182
     183
     184
     185    class TestConsumerProtocol(basic.LineReceiver):
     186        """
     187        Expects a series of ascending integer messages, possibly including
     188        a large number of leading zeros.
     189
     190        Counts the number of messages, records any that aren't in sequence.
     191        """
     192        from os import linesep as delimiter
     193        counter = 0
     194        errors = []
     195
     196        def lineReceived(self, line):
     197            if int(line) != self.counter:
     198                self.errors.append((int(line), self.counter))
     199            self.counter += 1
     200
     201
     202
     203    class TestConsumerDisconnectedProtocol(basic.LineReceiver):
     204        """
     205        Expects a series of ascending integer messages, possibly including
     206        a large number of leading zeros.
     207
     208        Counts the number of messages, records any that aren't in sequence.
     209
     210        When the count reaches threshold, the write pipe is forced to
     211        disconnect by calling close(), once, on the read pipe.
     212        """
     213        from os import linesep as delimiter
     214
     215        counter = 0
     216        errors = []
     217
     218        def __init__(self, threshold):
     219            self.threshold = threshold
     220
     221
     222        def lineReceived(self, line):
     223            if int(line) != self.counter:
     224                self.errors.append((int(line), self.counter))
     225            self.counter += 1
     226            if self.counter == self.threshold:
     227                self._transport.reader.close()
     228
     229
     230
     231    class TestConsumerLostConnectionProtocol(basic.LineReceiver):
     232        """
     233        Expects a series of ascending integer messages, possibly including
     234        a large number of leading zeros.
     235
     236        Counts the number of messages, records any that aren't in sequence.
     237
     238        When the count reaches threshold, the write pipe is requested to
     239        disconnect by calling close(), once, on the write pipe.
     240        """
     241        from os import linesep as delimiter
     242
     243        counter = 0
     244        errors = []
     245
     246        def __init__(self, threshold):
     247            self.threshold = threshold
     248
     249
     250        def lineReceived(self, line):
     251            if int(line) != self.counter:
     252                self.errors.append((int(line), self.counter))
     253            self.counter += 1
     254            if self.counter == self.threshold:
     255                self._transport.writer.close()
     256
     257
     258
     259    def test_pullProducerCheckPauseProducing(self):
     260        """
     261        Test write pipe as a consumer using a pull producer.
     262
     263        The data received at the other end of the pipe is checked and counted
     264        as are the number of producer pause/resume/stop calls.
     265
     266        When calling registerProducer() with a pull-producer, the streaming
     267        parameter is False. The consumer should *not* be calling the producers
     268        pauseProducing() method when it's buffer fills as that method is not
     269        normally present, causing an error, and is redundant.
     270
     271        See Ticket #2835: pauseProducing() called erroneously.
     272        """
     273
     274        if _oldPollingFile:
     275            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     276
     277        pipeSize = 4096
     278        totalMessages = 10000
     279        msgFormat = '%02048d\r\n'   # long messages trigger buffer full.
     280        totalResumeCounts = totalMessages
     281
     282        d = defer.Deferred()
     283
     284        def doneReadCB():
     285            d.callback(None)
     286            self.assertEqual(producer.stopCount, 0)
     287            self.assertEqual(protocol.counter, totalMessages)
     288            self.assertEqual(protocol.errors, [])
     289            self.assertEqual(producer.resumeCount, totalResumeCounts)
     290            self.assertEqual(producer.pauseCount, 0)
     291
     292
     293        protocol = self.TestConsumerProtocol()
     294        transport = _PipeRunner(pipeSize,
     295                                doneReadCB,     # stop when reader is closed.
     296                                lambda: None,
     297                                protocol.dataReceived)
     298        if not _oldPollingFile:
     299            transport.writer.bufferSize = 1024
     300        producer = _TestProducer(transport, False, totalMessages, msgFormat)
     301        transport.registerProducer(producer, False)
     302        producer.resumeProducing()
     303        return d
     304
     305
     306    def test_pushProducerDisconnected(self):
     307        """
     308        Test write pipe as a consumer using a push producer when the
     309        connection is forcibly disconnected.
     310       
     311        When a consumer is forcibly disconnected it should call the
     312        stopProducing() method on the producer.
     313
     314        See Ticket #2839: stopProducing() not called.
     315        """
     316        pipeSize = 4096
     317        totalMessages = 50000
     318        msgFormat = '%d\r\n'
     319        totalPauseCounts = 14
     320
     321        d = defer.Deferred()
     322
     323        def doneWriteCB():
     324            d.callback(None)
     325            self.assertEqual(producer.stopCount, 1)
     326            self.assertEqual(protocol.errors, [])
     327
     328        protocol = self.TestConsumerDisconnectedProtocol(1000)
     329        transport = _PipeRunner(pipeSize,
     330                                lambda: None,
     331                                doneWriteCB,    # stop when writer is closed.
     332                                protocol.dataReceived)
     333        protocol._transport = transport
     334        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     335        transport.registerProducer(producer, True)
     336        producer.resumeProducing()
     337        return d
     338
     339
     340    def test_pullProducerPipeBufferSize(self):
     341        """
     342        This test times-out with the existing _pollingfile implementation.
     343
     344        See Ticket #5365: _pollingfile assumes that (win32 API) WriteFile()
     345        will allow a partial write, but WriteFile() writes either all or
     346        nothing. Which means if you try to write too much at once in a pipe's
     347        buffer, the _PollableWritePipe sits there doing nothing.
     348        """
     349
     350        if _oldPollingFile:
     351            _pollingfile.FULL_BUFFER_SIZE = 1024 # restrict write buffer size
     352
     353        pipeSize = 512              # set a small pipe buffer size
     354        totalMessages = 1000
     355        msgFormat  = '%02048d\r\n'  # send long messages
     356        totalResumeCounts = totalMessages
     357
     358        d = defer.Deferred()
     359
     360        def doneReadCB():
     361            d.callback(None)
     362            self.assertEqual(producer.stopCount, 0)
     363            self.assertEqual(protocol.counter, totalMessages)
     364            self.assertEqual(protocol.errors, [])
     365            self.assertEqual(producer.resumeCount, totalResumeCounts)
     366
     367        protocol = self.TestConsumerProtocol()
     368        transport = _PipeRunner(pipeSize,
     369                                doneReadCB,     # stop when reader is closed.
     370                                lambda: None,
     371                                protocol.dataReceived)
     372        protocol._transport = transport
     373        if not _oldPollingFile:
     374            transport.writer.bufferSize = 1024
     375        producer = _TestProducer(transport, False, totalMessages, msgFormat)
     376        transport.registerProducer(producer, False)
     377        producer.resumeProducing()
     378        return d
     379
     380
     381    def test_pushProducerDataDroppedAtLoseConnection(self):
     382        """
     383        Test write pipe as a consumer using a push producer.
     384
     385        If a producer has sent a consumer data, the consumer should
     386        honor a disconnect request made using loseConnection() when the data
     387        sent by the producer has been written.
     388
     389                    vvvv--- to be changed
     390        See Ticket #5413: data written gets dropped while the pipe is still
     391        disconnecting.
     392        """
     393
     394        if _oldPollingFile:
     395            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     396
     397        pipeSize = 4096
     398        totalMessages = 10000
     399        msgFormat = '%08d\r\n'
     400        totalPauseCounts = 97
     401
     402        d = defer.Deferred()
     403
     404        def doneCB():
     405            d.callback(None)
     406            self.assertEqual(producer.stopCount, 0)
     407            self.assertEqual(protocol.counter, totalMessages)
     408            self.assertEqual(protocol.errors, [])
     409            self.assertEqual(producer.pauseCount, totalPauseCounts)
     410            self.assertEqual(producer.resumeCount, totalPauseCounts+1)
     411
     412
     413        protocol = self.TestConsumerLostConnectionProtocol(1000)
     414        transport = _PipeRunner(pipeSize,
     415                                doneCB,     # stop when reader is closed
     416                                lambda: None,
     417                                protocol.dataReceived)
     418        protocol._transport = transport
     419        if not _oldPollingFile:
     420            transport.writer.bufferSize = 1024
     421        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     422        transport.registerProducer(producer, True)
     423        producer.resumeProducing()
     424        return d
     425
     426
     427    def test_pushProducerCheckResumeProducing(self):
     428        """
     429        Test write pipe as a consumer using a push producer.
     430
     431                    vvvv--- to be changed
     432        See Ticket #5413: bufferEmpty sets producerPaused attribute on the
     433        producer instead of on itself.
     434        """
     435        if _oldPollingFile:
     436            _pollingfile.FULL_BUFFER_SIZE = 512 # restrict write buffer size
     437
     438        pipeSize = 4096
     439        totalMessages = 1000
     440        msgFormat  = '%08d\r\n'
     441        totalResumeCounts = 19
     442
     443        d = defer.Deferred()
     444
     445        def doneReadCB():
     446            d.callback(None)
     447            self.assertEqual(producer.stopCount, 0)
     448            self.assertEqual(protocol.counter, totalMessages)
     449            self.assertEqual(protocol.errors, [])
     450            self.assertEqual(producer.pauseCount, totalResumeCounts)
     451            self.assertEqual(producer.resumeCount, totalResumeCounts+1)
     452            diff_producer_attribs = set(dir(producer)).difference(pre_producer_attribs)
     453            self.assertEqual(diff_producer_attribs, set())
     454
     455        protocol = self.TestConsumerProtocol()
     456        transport = _PipeRunner(pipeSize,
     457                                doneReadCB,
     458                                lambda: None,
     459                                protocol.dataReceived)
     460        protocol._transport = transport
     461        transport.writer.bufferSize = 512
     462        producer = _TestProducer(transport, True, totalMessages, msgFormat)
     463        pre_producer_attribs = set(dir(producer))
     464        transport.registerProducer(producer, True)
     465        producer.resumeProducing()
     466        return d
     467
     468
     469    def test_pullProducerUsingWriteSequence(self):
     470        """
     471        Test write pipe as a consumer with a pull producer using
     472        writeSequence().
     473
     474        Calling writeSequence() should behave the same as calling write()
     475        with each item, including flow control and disconnect handling.
     476
     477                    vvvv--- to be changed
     478        See Ticket #5413: writeSequence() doesn't pause the producer when
     479        outgoing buffer is full.
     480        """
     481
     482        if _oldPollingFile:
     483            _pollingfile.FULL_BUFFER_SIZE = 1024    # restrict write buffer size
     484
     485        pipeSize = 4096
     486        totalMessages = 10000
     487        msgFormat = '%08d\r\n'
     488
     489        d = defer.Deferred()
     490
     491        def doneReadCB():
     492            d.callback(None)
     493            self.assertEqual(producer.stopCount, 0)
     494            self.assertEqual(protocol.counter, totalMessages)
     495            self.assertEqual(protocol.errors, [])
     496            self.assertEqual(producer.pauseCount, 20)
     497            self.assertEqual(producer.resumeCount, 20)
     498
     499
     500        protocol = self.TestConsumerProtocol()
     501        transport = _PipeRunner(pipeSize,
     502                                doneReadCB,     # stop when reader is closed.
     503                                lambda: None,
     504                                protocol.dataReceived)
     505        if not _oldPollingFile:
     506            transport.writer.bufferSize = 1024
     507        producer = _TestProducerWriteSequence(transport, totalMessages, 500, msgFormat)
     508        transport.registerProducer(producer, True)
     509        producer.resumeProducing()
     510        return d
     511
     512
     513
    18514class TestPollableWritePipe(TestCase):
    19515    """
    20516    Tests for L{_pollingfile._PollableWritePipe}.
     
    25521        L{_pollingfile._PollableWritePipe.write} raises a C{TypeError} if an
    26522        attempt is made to append unicode data to the output buffer.
    27523        """
    28         p = _pollingfile._PollableWritePipe(1, lambda: None)
     524        p = _PollableWritePipe(1, lambda: None)
    29525        self.assertRaises(TypeError, p.write, u"test")
    30526
    31527
     
    35531        if unicode data is part of the data sequence to be appended to the
    36532        output buffer.
    37533        """
    38         p = _pollingfile._PollableWritePipe(1, lambda: None)
     534        p = _PollableWritePipe(1, lambda: None)
    39535        self.assertRaises(TypeError, p.writeSequence, [u"test"])
    40536        self.assertRaises(TypeError, p.writeSequence, (u"test", ))
    41537
    42538
    43539
     540if _skipNotWindows:
     541    skipMessage = "Test will run only on Windows."
     542    TestPollablePipes.skip = skipMessage
     543    TestPollableWritePipeUnicode.skip = skipMessage
    44544
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."