Ticket #6491: data-dropped-at-loseconnection-#6491.patch

File data-dropped-at-loseconnection-#6491.patch, 5.4 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/_pollingfile.py

     
    165165
    166166    def __init__(self, writePipe, lostCallback):
    167167        self.disconnecting = False
     168        self.disconnected = False
    168169        self.producer = None
    169170        self.producerPaused = 0
    170171        self.streamingProducer = 0
     
    227228
    228229    def writeConnectionLost(self):
    229230        self.deactivate()
     231        self.disconnected = True
    230232        try:
    231233            win32api.CloseHandle(self.writePipe)
    232234        except pywintypes.error:
     
    246248        """
    247249        if unicode in map(type, seq):
    248250            raise TypeError("Unicode not allowed in output buffer.")
     251        if self.disconnected:
     252            return
    249253        self.outQueue.extend(seq)
    250254
    251255
     
    260264        """
    261265        if isinstance(data, unicode):
    262266            raise TypeError("Unicode not allowed in output buffer.")
    263         if self.disconnecting:
     267        if self.disconnected:
    264268            return
    265269        self.outQueue.append(data)
    266270        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
  • twisted/internet/test/test_pollingfile.py

     
    44"""
    55Tests for L{twisted.internet._pollingfile}.
    66"""
     7import os
    78
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.protocols import basic
     12from twisted.internet.test.reactormixins import ReactorBuilder
    1013
     14
     15
    1116if platform.isWindows():
     17    import win32pipe
     18    import win32security
     19
    1220    from twisted.internet import _pollingfile
     21
     22    from twisted.internet._pollingfile import _PollingTimer
     23    _skipNotWindows = None
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = "Test will run only on Windows."
     26    _PollingTimer = object
     27 
    1528
     29 
     30class PipeRunner(_PollingTimer):
     31    """
     32    Builds, initializes and runs a pair of read/write pipes.
     33    """
     34    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor):
     35        _PollingTimer.__init__(self, reactor)
     36        sAttrs = win32security.SECURITY_ATTRIBUTES()
     37        sAttrs.bInheritHandle = 1
     38        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     39        self.reader = _pollingfile._PollableReadPipe(hRead, receivedCB, doneReadCB)
     40        self.writer = _pollingfile._PollableWritePipe(hWrite, doneWriteCB)
     41        self._addPollableResource(self.reader)
     42        self._addPollableResource(self.writer)
    1643
    1744
     45
     46class TestPollablePipes(ReactorBuilder):
     47    """
     48    Tests for L{_pollingfile._PollableWritePipe} and
     49    L{_pollingfile._PollableReadPipe}.
     50    """
     51    def setUp(self):
     52        self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     53
     54
     55    def tearDown(self):
     56        _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     57
     58
     59    def test_pullProducerDataDroppedAtLoseConnection(self):
     60        """
     61        Test write pipe as a consumer using a push producer.
     62
     63        If a producer has sent a consumer data, the consumer should
     64        honor a disconnect request made using loseConnection() when the data
     65        sent by the producer has been written.
     66
     67        This test depends on the tickets: #2835, #2839, #5365, #6492 and #6493
     68        being resolved first.
     69
     70        See Ticket #6491: data written gets dropped while the pipe is still
     71        disconnecting.
     72        """
     73        pipeSize = 4096  # avoid Ticket #5365 issues
     74        _pollingfile.FULL_BUFFER_SIZE = 1024
     75        testMessage = '0' * 1024
     76
     77        class TestProtocol(basic.LineReceiver):
     78            lineCount = 0
     79            def lineReceived(self, line):
     80                if self.lineCount == 10:
     81                    transport.writer.close()
     82                self.lineCount += 1
     83                self.testResponse = line
     84
     85        class TestProducer(object):
     86            count = 0
     87            paused = False
     88            totalMessages = 100
     89            def resumeProducing(self):
     90                self.paused = False
     91                while not self.paused and self.count < self.totalMessages:
     92                    transport.writer.write(testMessage+"\r\n")
     93                    self.count += 1
     94
     95            def pauseProducing(self):
     96                self.paused = True
     97
     98            def stopProducing(self):
     99                self.count = self.totalMessages
     100
     101        reactor = self.buildReactor()
     102        r, w = lambda: reactor.stop(), lambda: None
     103        protocol, producer = TestProtocol(), TestProducer()
     104        transport = PipeRunner(pipeSize, r, w, protocol.dataReceived, reactor)
     105        transport.writer.registerProducer(producer, True)
     106        producer.resumeProducing()
     107        self.runReactor(reactor)
     108        self.assertEqual(testMessage, protocol.testResponse)
     109        self.assertEqual(100, protocol.lineCount)
     110
     111
     112globals().update(TestPollablePipes.makeTestCaseClasses())
     113
     114
    18115class TestPollableWritePipe(TestCase):
    19116    """
    20117    Tests for L{_pollingfile._PollableWritePipe}.
     
    41138
    42139
    43140
     141if _skipNotWindows:
     142    TestPollablePipes.skip = skipMessage
     143    TestPollableWritePipeUnicode.skip = skipMessage
    44144
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."