Ticket #5413: _pollingfile-using-filedescriptor-with-improved-tests.patch

File _pollingfile-using-filedescriptor-with-improved-tests.patch, 29.9 KB (added by John Popplewell, 9 years ago)

Improved tests, smaller patch to _pollingfile

  • twisted/internet/test/test_pollingfile.py

     
    55Tests for L{twisted.internet._pollingfile}.
    66"""
    77
     8from zope.interface import implements
    89from twisted.python.runtime import platform
    910from twisted.trial.unittest import TestCase
     11from twisted.internet import interfaces, defer
     12from twisted.protocols import basic
     13from twisted.internet.test.reactormixins import ReactorBuilder
    1014
    1115if platform.isWindows():
     16    import win32pipe
     17    import win32security
     18
    1219    from twisted.internet import _pollingfile
     20
     21    from twisted.internet._pollingfile import _PollingTimer
     22    _skipNotWindows = False
     23    _oldPollingFile = hasattr(_pollingfile, "FULL_BUFFER_SIZE")
    1324else:
    14     _pollingfile = None
     25    _skipNotWindows = True
     26    _PollingTimer = object
     27 
     28 
     29 
     30class TestProducer(object):
     31    """
     32    Test producer based on the Produce() class in:
     33    doc/core/examples/streaming.py except that it produces a series of
     34    integer messages.
     35   
     36    It is configured as a push-producer if streaming is True otherwise it is
     37    configured as a pull-producer.
     38   
     39    If blockSize isn't None the push-producer uses writeSequence() to write
     40    blockSize length sequences of integer messages.
     41    """
    1542
     43    implements(interfaces.IPushProducer)
    1644
     45    PULL, PUSH = range(2)
    1746
     47    def __init__(self, streaming, count, msgFormat, blockSize=None):
     48        self._goal = count
     49        self._produced = 0
     50        self._paused = False
     51        self._transport = None
     52        self.pauseCount = 0
     53        self.resumeCount = 0
     54        self.stopCount = 0
     55        self.msgFormat = msgFormat
     56        self.blockSize = blockSize
     57        self.streaming = streaming
     58        if streaming:
     59            self.resumeProducing = self._resumeProducingPush
     60        else:
     61            self.resumeProducing = self._resumeProducingPull
     62
     63    def pauseProducing(self):
     64        self._paused = True
     65        self.pauseCount += 1
     66
     67
     68    def _resumeProducingPush(self):
     69        self._paused = False
     70        self.resumeCount += 1
     71
     72        while not self._paused and self._produced < self._goal:
     73            if self.blockSize is None:
     74                self._transport.writer.write(self.msgFormat % self._produced)
     75                self._produced += 1
     76            else:
     77                block = []
     78                nBlocks = min(self.blockSize, self._goal - self._produced)
     79                for i in range(nBlocks):
     80                    block.append(self.msgFormat % (self._produced + i,))
     81                self._transport.writer.writeSequence(block)
     82                self._produced += self.blockSize
     83
     84        if self._produced == self._goal:
     85            self._transport.writer.unregisterProducer()
     86            self._transport.writer.close()
     87
     88
     89    def _resumeProducingPull(self):
     90        self.resumeCount += 1
     91
     92        if self._produced < self._goal:
     93            self._transport.writer.write(self.msgFormat % self._produced)
     94            self._produced += 1
     95
     96        if self._produced == self._goal:
     97            self._transport.writer.unregisterProducer()
     98            self._transport.writer.close()
     99
     100
     101    def stopProducing(self):
     102        self._produced = self._goal
     103        self.stopCount += 1
     104
     105
     106
     107class PipeRunner(_PollingTimer):
     108    """
     109    Builds, initializes and runs a pair of read/write pipes.
     110    """
     111    def __init__(self, pipeSize, doneReadCB, doneWriteCB, receivedCB, reactor):
     112        _PollingTimer.__init__(self, reactor)
     113        sAttrs = win32security.SECURITY_ATTRIBUTES()
     114        sAttrs.bInheritHandle = 1
     115        hRead, hWrite = win32pipe.CreatePipe(sAttrs, pipeSize)
     116        self.reader = _pollingfile._PollableReadPipe(hRead, receivedCB, doneReadCB)
     117        self.writer = _pollingfile._PollableWritePipe(hWrite, doneWriteCB)
     118        self._addPollableResource(self.reader)
     119        self._addPollableResource(self.writer)
     120
     121
     122
     123class TestProtocol(basic.LineReceiver):
     124    """
     125    Expects a series of ascending integer messages, possibly including
     126    a large number of leading zeros.
     127
     128    Counts the number of messages, records any that aren't in sequence.
     129
     130    When the count reaches threshold, close() is called on the reader
     131    or the writer.
     132    """
     133    from os import linesep as delimiter
     134
     135    READ, WRITE = range(2)
     136    counter = 0
     137    errors = []
     138
     139    def __init__(self, threshold=0, closer=0):
     140        self.threshold = threshold
     141        self.closer = closer
     142        self._transport = None
     143
     144
     145    def lineReceived(self, line):
     146        if int(line) != self.counter:
     147            self.errors.append((int(line), self.counter))
     148        self.counter += 1
     149        if self.counter == self.threshold:
     150            op = (self._transport.reader.close, self._transport.writer.close)
     151            op[self.closer]()
     152
     153
     154
     155class TestPollablePipes(ReactorBuilder):
     156    """
     157    Tests for L{_pollingfile._PollableWritePipe} and
     158    L{_pollingfile._PollableReadPipe}.
     159    """
     160    def setUp(self):
     161        if _oldPollingFile:
     162            self._oldBufferSize = _pollingfile.FULL_BUFFER_SIZE
     163
     164
     165    def tearDown(self):
     166        if _oldPollingFile:
     167            _pollingfile.FULL_BUFFER_SIZE = self._oldBufferSize
     168
     169
     170    def runTest(self, pipeSize, protocol, producer, bufferSize=None, waitWriter=False):
     171        reactor = self.buildReactor()
     172        r, w = lambda: reactor.stop(), lambda: None
     173        if waitWriter:
     174            r, w = w, r
     175        transport = PipeRunner(pipeSize, r, w, protocol.dataReceived, reactor)
     176        protocol._transport = producer._transport = transport
     177        if bufferSize is not None:
     178            if _oldPollingFile:
     179                _pollingfile.FULL_BUFFER_SIZE = bufferSize
     180            else:
     181                transport.writer.bufferSize = bufferSize
     182        transport.writer.registerProducer(producer, producer.streaming)
     183        producer.resumeProducing()
     184        self.runReactor(reactor)
     185
     186
     187    def test_pullProducerCheckPauseProducing(self):
     188        """
     189        Test write pipe as a consumer using a pull producer.
     190
     191        The data received at the other end of the pipe is checked and counted
     192        as are the number of producer pause/resume/stop calls.
     193
     194        When calling registerProducer() with a pull-producer, the streaming
     195        parameter is False. The consumer should *not* be calling the producers
     196        pauseProducing() method when it's buffer fills as that method is not
     197        normally present, causing an error, and is redundant.
     198
     199        See Ticket #2835: pauseProducing() called erroneously.
     200        """
     201        pipeSize = 4096
     202        totalMessages = 1000
     203        msgFormat = '%02048d\r\n'   # long messages trigger buffer full.
     204        totalResumeCounts = totalMessages
     205        protocol = TestProtocol()
     206        producer = TestProducer(TestProducer.PULL, totalMessages, msgFormat)
     207        self.runTest(pipeSize, protocol, producer, bufferSize=1024)
     208        self.assertEqual(producer.stopCount, 0)
     209        self.assertEqual(protocol.counter, totalMessages)
     210        self.assertEqual(protocol.errors, [])
     211        self.assertEqual(producer.resumeCount, totalResumeCounts)
     212        self.assertEqual(producer.pauseCount, 0)
     213
     214
     215    def test_pushProducerDisconnected(self):
     216        """
     217        Test write pipe as a consumer using a push producer when the
     218        connection is forcibly disconnected.
     219       
     220        When a consumer is forcibly disconnected it should call the
     221        stopProducing() method on the producer.
     222
     223        See Ticket #2839: stopProducing() not called.
     224        """
     225        pipeSize = 4096
     226        totalMessages = 50000
     227        msgFormat = '%d\r\n'
     228        protocol = TestProtocol(threshold=1000, closer=TestProtocol.READ)
     229        producer = TestProducer(TestProducer.PUSH, totalMessages, msgFormat)
     230        self.runTest(pipeSize, protocol, producer, waitWriter=True)
     231        self.assertEqual(producer.stopCount, 1)
     232        self.assertEqual(protocol.errors, [])
     233
     234
     235    def test_pullProducerPipeBufferSize(self):
     236        """
     237        This test times-out with the existing _pollingfile implementation.
     238
     239        See Ticket #5365: _pollingfile assumes that (win32 API) WriteFile()
     240        will allow a partial write, but WriteFile() writes either all or
     241        nothing. Which means if you try to write too much at once in a pipe's
     242        buffer, the _PollableWritePipe sits there doing nothing.
     243        """
     244        pipeSize = 512              # set a small pipe buffer size
     245        totalMessages = 1000
     246        msgFormat  = '%02048d\r\n'  # send long messages
     247        totalResumeCounts = totalMessages
     248        protocol = TestProtocol()
     249        producer = TestProducer(TestProducer.PULL, totalMessages, msgFormat)
     250        self.runTest(pipeSize, protocol, producer, bufferSize=1024)
     251        self.assertEqual(producer.stopCount, 0)
     252        self.assertEqual(protocol.counter, totalMessages)
     253        self.assertEqual(protocol.errors, [])
     254        self.assertEqual(producer.resumeCount, totalResumeCounts)
     255
     256
     257    def test_pushProducerDataDroppedAtLoseConnection(self):
     258        """
     259        Test write pipe as a consumer using a push producer.
     260
     261        If a producer has sent a consumer data, the consumer should
     262        honor a disconnect request made using loseConnection() when the data
     263        sent by the producer has been written.
     264
     265        See Ticket #6491: data written gets dropped while the pipe is still
     266        disconnecting.
     267        """
     268        pipeSize = 4096
     269        totalMessages = 10000
     270        msgFormat = '%08d\r\n'
     271        totalPauseCounts = 97
     272        protocol = TestProtocol(threshold=1000, closer=TestProtocol.WRITE)
     273        producer = TestProducer(TestProducer.PUSH, totalMessages, msgFormat)
     274        self.runTest(pipeSize, protocol, producer, bufferSize=1024)
     275        self.assertEqual(producer.stopCount, 0)
     276        self.assertEqual(protocol.counter, totalMessages)
     277        self.assertEqual(protocol.errors, [])
     278        self.assertEqual(producer.pauseCount, totalPauseCounts)
     279        self.assertEqual(producer.resumeCount, totalPauseCounts+1)
     280
     281
     282    def test_pushProducerCheckResumeProducing(self):
     283        """
     284        Test write pipe as a consumer using a push producer.
     285
     286        See Ticket #6492: bufferEmpty sets producerPaused attribute on the
     287        producer instead of on itself.
     288        """
     289        pipeSize = 4096
     290        totalMessages = 1000
     291        msgFormat  = '%08d\r\n'
     292        totalResumeCounts = 19
     293        protocol = TestProtocol()
     294        producer = TestProducer(TestProducer.PUSH, totalMessages, msgFormat)
     295        pre_producer_attribs = set(dir(producer))
     296        self.runTest(pipeSize, protocol, producer, bufferSize=512)
     297        self.assertEqual(producer.stopCount, 0)
     298        self.assertEqual(protocol.counter, totalMessages)
     299        self.assertEqual(protocol.errors, [])
     300        self.assertEqual(producer.pauseCount, totalResumeCounts)
     301        self.assertEqual(producer.resumeCount, totalResumeCounts+1)
     302        diff_producer_attribs = set(dir(producer)).difference(pre_producer_attribs)
     303        self.assertEqual(diff_producer_attribs, set())
     304
     305
     306    def test_pullProducerUsingWriteSequence(self):
     307        """
     308        Test write pipe as a consumer with a pull producer using
     309        writeSequence().
     310
     311        Calling writeSequence() should behave the same as calling write()
     312        with each item, including flow control and disconnect handling.
     313
     314        See Ticket #6493: writeSequence() doesn't pause the producer when
     315        outgoing buffer is full.
     316        """
     317        pipeSize = 4096
     318        totalMessages = 10000
     319        msgFormat = '%08d\r\n'
     320        protocol = TestProtocol()
     321        producer = TestProducer(TestProducer.PUSH, totalMessages, msgFormat, 500)
     322        self.runTest(pipeSize, protocol, producer, bufferSize=1024)
     323        self.assertEqual(producer.stopCount, 0)
     324        self.assertEqual(protocol.counter, totalMessages)
     325        self.assertEqual(protocol.errors, [])
     326        self.assertEqual(producer.pauseCount, 20)
     327        self.assertEqual(producer.resumeCount, 20)
     328
     329globals().update(TestPollablePipes.makeTestCaseClasses())
     330
     331
    18332class TestPollableWritePipe(TestCase):
    19333    """
    20334    Tests for L{_pollingfile._PollableWritePipe}.
     
    41355
    42356
    43357
     358if _skipNotWindows:
     359    skipMessage = "Test will run only on Windows."
     360    TestPollablePipes.skip = skipMessage
     361    TestPollableWritePipeUnicode.skip = skipMessage
    44362
    45 if _pollingfile is None:
    46     TestPollableWritePipe.skip = "Test will run only on Windows."
  • twisted/internet/_pollingfile.py

     
    44
    55"""
    66Implements a simple polling interface for file descriptors that don't work with
    7 select() - this is pretty much only useful on Windows.
     7C{select()} - this is pretty much only useful on Windows.
    88"""
    99
     10from zope.interface import Interface
    1011from zope.interface import implements
    1112
    1213from twisted.internet.interfaces import IConsumer, IPushProducer
     14from twisted.internet.abstract import FileDescriptor
     15from twisted.internet import main
     16from twisted.python.compat import lazyByteSlice
     17from twisted.python import failure
    1318
     19import win32pipe
     20import win32file
     21import win32api
     22import pywintypes
    1423
     24
     25
    1526MIN_TIMEOUT = 0.000000001
    1627MAX_TIMEOUT = 0.1
    1728
    1829
    1930
    20 class _PollableResource:
     31class _PollableResource(object):
     32    """
     33    A 'resource' that requires polling periodically derives from this (a mixin
     34    class).
     35
     36    @ivar active: determines whether a resource's C{checkWork()} is called
     37    from L{_PollingTimer._pollEvent()}
     38    @type active: C{bool} or C{int}
     39    """
    2140    active = True
    2241
    2342    def activate(self):
     43        """
     44        Resource is eligible for polling.
     45        """
    2446        self.active = True
    2547
    2648
    2749    def deactivate(self):
     50        """
     51        Resource polling is disabled.
     52        """
    2853        self.active = False
    2954
    3055
    3156
    32 class _PollingTimer:
    33     # Everything is private here because it is really an implementation detail.
     57class _PollingTimer(object):
     58    """
     59    Manages pollable 'resources'.
     60   
     61    Uses a single L{IReactorTime.callLater()} timer to poll multiple
     62    'resources'. If no resources require polling the timer is disabled until a
     63    new resource is added or L{_unpause()} or L{_startPolling()} is called.
    3464
     65    It looks like a 'resource' calling L{activate()} should probably restart
     66    polling, but doesn't.
     67
     68    @ivar reactor: See L{__init__}
     69
     70    @ivar _resources: List of 'resources' added by calling
     71    L{_addPollableResource()}.
     72    @type _resources: C{list}
     73
     74    @ivar _pollTimer: None when polling is paused/stopped or an object which
     75    can be used to cancel the scheduled call.
     76    @type _pollTimer: L{IDelayedCall} or C{None}
     77
     78    @ivar _currentTimeout: Calculated current polling interval.
     79    @type _currentTimeout: C{float}
     80
     81    @ivar _paused: True if polling is paused, False if polling is occurring.
     82    @type _paused: C{bool} or C{int}
     83    """
    3584    def __init__(self, reactor):
     85        """
     86        @param reactor:
     87        @type reactor: L{IReactorTime}
     88        """
    3689        self.reactor = reactor
    3790        self._resources = []
    3891        self._pollTimer = None
    3992        self._currentTimeout = MAX_TIMEOUT
    4093        self._paused = False
    4194
     95
    4296    def _addPollableResource(self, res):
     97        """
     98        Adds a 'resource' to be polled.
     99
     100        @param res:
     101        @type res: Object derived from L{_PollableResource}.
     102        """
    43103        self._resources.append(res)
    44104        self._checkPollingState()
    45105
     106
    46107    def _checkPollingState(self):
     108        """
     109        Scans for active 'resources'. If one is found, polling is restarted,
     110        otherwise polling is stopped.
     111        """
    47112        for resource in self._resources:
    48113            if resource.active:
    49114                self._startPolling()
     
    51116        else:
    52117            self._stopPolling()
    53118
     119
    54120    def _startPolling(self):
     121        """
     122        Starts polling if there is no outstanding timer.
     123        """
    55124        if self._pollTimer is None:
    56125            self._pollTimer = self._reschedule()
    57126
     127
    58128    def _stopPolling(self):
     129        """
     130        If there is an outstanding timer event, cancel it.
     131        """
    59132        if self._pollTimer is not None:
    60133            self._pollTimer.cancel()
    61134            self._pollTimer = None
    62135
     136
    63137    def _pause(self):
     138        """
     139        Pauses polling.
     140        """
    64141        self._paused = True
    65142
     143
    66144    def _unpause(self):
     145        """
     146        Resumes polling if there are any active resources.
     147        """
    67148        self._paused = False
    68149        self._checkPollingState()
    69150
     151
    70152    def _reschedule(self):
     153        """
     154        If not paused, calls L{IReactorTime.callLater()} with the current
     155        timeout.
     156
     157        @return: L{IDelayedCall}
     158        """
    71159        if not self._paused:
    72             return self.reactor.callLater(self._currentTimeout, self._pollEvent)
     160            return self.reactor.callLater(self._currentTimeout,
     161                                          self._pollEvent)
    73162
     163
    74164    def _pollEvent(self):
     165        """
     166        Event callback for L{IReactorTime.callLater()}.
     167
     168        Iterates through list of resources calling C{checkWork()} making a
     169        note of whether they are still active.
     170       
     171        A calculation is performed using the sum of the returned values from
     172        C{checkWork()} that determines a new polling interval. The value is
     173        clamped between MIN_TIMEOUT and MAX_TIMEOUT. The more work done the
     174        shorter the time interval and vice-versa.
     175
     176        If any 'resources' are still active a new callback is scheduled.
     177        """
     178        self._pollTimer = None
    75179        workUnits = 0.
    76180        anyActive = []
    77181        for resource in self._resources:
     
    95199            self._pollTimer = self._reschedule()
    96200
    97201
    98 # If we ever (let's hope not) need the above functionality on UNIX, this could
    99 # be factored into a different module.
    100202
    101 import win32pipe
    102 import win32file
    103 import win32api
    104 import pywintypes
     203class _PollableReadPipe(_PollableResource, FileDescriptor):
     204    """
     205    A generic pollable reader implementing C{IPushProducer}.
    105206
    106 class _PollableReadPipe(_PollableResource):
     207    @ivar pipe: See L{__init__}.
     208    @ivar receivedCallback: See L{__init__}.
     209    @ivar lostCallback: See L{__init__}.
     210    @ivar connected: True if pipe is connected or file open.
     211    @type connected: C{bool} or C{int}.
     212    """
    107213
    108214    implements(IPushProducer)
    109215
    110216    def __init__(self, pipe, receivedCallback, lostCallback):
    111         # security attributes for pipes
     217        """
     218        @param pipe: Handle to a pipe.
     219        @type  pipe: pywin32.PyHandle
     220
     221        @param receivedCallback: receives read data
     222        @type  receivedCallback: C{function} callback(data)
     223
     224        @param lostCallback: connection lost
     225        @type  lostCallback: C{function} callback()
     226        """
     227        FileDescriptor.__init__(self)
     228        self.connected = True
    112229        self.pipe = pipe
    113230        self.receivedCallback = receivedCallback
    114231        self.lostCallback = lostCallback
    115232
     233
    116234    def checkWork(self):
    117         finished = 0
    118         fullDataRead = []
     235        """
     236        Called by C{_PollingTimer()} to try reading. An error causes the
     237        connection to be lost.
    119238
    120         while 1:
    121             try:
    122                 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
    123                 # finished = (result == -1)
    124                 if not bytesToRead:
    125                     break
    126                 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
    127                 fullDataRead.append(data)
    128             except win32api.error:
    129                 finished = 1
    130                 break
     239        @return: number of bytes read, may be zero.
     240        """
     241        try:
     242            _, bytesToRead, _ = win32pipe.PeekNamedPipe(self.pipe, 1)
     243        except pywintypes.error:
     244            self.readConnectionLost(main.CONNECTION_DONE)
     245            return 0
     246        if not bytesToRead:
     247            return 0
     248        _, data = win32file.ReadFile(self.pipe, bytesToRead, None)
     249        if data:
     250            self.receivedCallback(data)
     251        return len(data)
    131252
    132         dataBuf = ''.join(fullDataRead)
    133         if dataBuf:
    134             self.receivedCallback(dataBuf)
    135         if finished:
    136             self.cleanup()
    137         return len(dataBuf)
    138253
    139     def cleanup(self):
    140         self.deactivate()
    141         self.lostCallback()
     254    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
     255        """
     256        This is a reader not a writer so we set C{_writeDisconnected} True
     257        so that the disconnect is immediate and then call up into
     258        C{FileDescriptor}.
     259        """
     260        self._writeDisconnected = True
     261        FileDescriptor.loseConnection(self, _connDone)
    142262
    143     def close(self):
     263
     264    def connectionLost(self, reason):
     265        """
     266        Here we clean-up, call up into the C{FileDescriptor} and then call our
     267        C{lostCallback()}.
     268        """
    144269        try:
    145270            win32api.CloseHandle(self.pipe)
    146271        except pywintypes.error:
    147             # You can't close std handles...?
    148272            pass
     273        FileDescriptor.connectionLost(self, reason)
     274        self.lostCallback()
    149275
    150     def stopProducing(self):
    151         self.close()
    152276
    153     def pauseProducing(self):
     277    def stopReading(self):
     278        """
     279        Prevents C{checkWork()} from being called.
     280        """
    154281        self.deactivate()
    155282
    156     def resumeProducing(self):
     283
     284    def startReading(self):
     285        """
     286        C{checkWork()} will now be called again.
     287        """
    157288        self.activate()
    158289
    159290
    160 FULL_BUFFER_SIZE = 64 * 1024
     291    def stopWriting(self):
     292        """
     293        Prevents this reader from being removed from the reactor.
     294        """
     295        pass
    161296
    162 class _PollableWritePipe(_PollableResource):
    163297
     298    def startWriting(self):
     299        """
     300        Prevents this reader from being added to the reactor.
     301        """
     302        pass
     303
     304
     305    def close(self):
     306        """
     307        Provided for backwards compatibility with the old _PollableReadPipe().
     308        Calls loseConnection().
     309        """
     310        self.loseConnection()
     311
     312
     313
     314class _PollableWritePipe(_PollableResource, FileDescriptor):
     315    """
     316    A generic pollable writer implementing C{IConsumer}.
     317
     318    @ivar writePipe: See L{__init__}.
     319    @ivar lostCallback: See L{__init__}.
     320    @ivar connected: True if pipe is connected or file open.
     321    @type connected: C{bool} or C{int}.
     322    """
     323
    164324    implements(IConsumer)
    165325
    166326    def __init__(self, writePipe, lostCallback):
    167         self.disconnecting = False
    168         self.producer = None
    169         self.producerPaused = 0
    170         self.streamingProducer = 0
    171         self.outQueue = []
     327        """
     328        @param writePipe: Handle to a pipe.
     329        @type  writePipe: pywin32.PyHandle
     330
     331        @param lostCallback: connection lost
     332        @type  lostCallback: C{function} callback()
     333        """
     334        FileDescriptor.__init__(self)
     335        self.connected = True
    172336        self.writePipe = writePipe
    173337        self.lostCallback = lostCallback
    174338        try:
    175             win32pipe.SetNamedPipeHandleState(writePipe,
     339            win32pipe.SetNamedPipeHandleState(self.writePipe,
    176340                                              win32pipe.PIPE_NOWAIT,
    177341                                              None,
    178342                                              None)
    179343        except pywintypes.error:
    180             # Maybe it's an invalid handle.  Who knows.
     344            # fails if not a valid pipe handle
    181345            pass
    182346
    183     def close(self):
    184         self.disconnecting = True
    185347
    186     def bufferFull(self):
    187         if self.producer is not None:
    188             self.producerPaused = 1
    189             self.producer.pauseProducing()
     348    def checkWork(self):
     349        """
     350        Called by C{_PollingTimer()} to process C{FileDescriptor} write
     351        buffers and detect closure of the pipe.
    190352
    191     def bufferEmpty(self):
    192         if self.producer is not None and ((not self.streamingProducer) or
    193                                           self.producerPaused):
    194             self.producer.producerPaused = 0
    195             self.producer.resumeProducing()
    196             return True
    197         return False
     353        Leverages C{FileDescriptor} to do the heavy lifting. An error causes
     354        the connection to be lost.
    198355
    199     # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
     356        @return: Zero if connection lost, one if still active.
     357        """
     358        self.doWrite()
     359        self.startWriting()
     360        try:
     361            win32file.WriteFile(self.writePipe, '', None)
     362            return 1
     363        except pywintypes.error:
     364            self.connectionLost(main.CONNECTION_LOST)
     365            if self.lostCallback:
     366                self.lostCallback()
     367                self.lostCallback = None
     368            return 0
    200369
    201     def registerProducer(self, producer, streaming):
    202         """Register to receive data from a producer.
    203370
    204         This sets this selectable to be a consumer for a producer.  When this
    205         selectable runs out of data on a write() call, it will ask the producer
    206         to resumeProducing(). A producer should implement the IProducer
    207         interface.
     371    def writeSomeData(self, data):
     372        """
     373        Called from C{FileDescriptor} to do the actual writing.
    208374
    209         FileDescriptor provides some infrastructure for producer methods.
     375        Avoids writing more data than the pipe can handle. Excess data is put
     376        back into the write buffers by C{FileDescriptor}.
     377
     378        @return: Number of bytes written or main.CONNECTION_LOST on error.
    210379        """
    211         if self.producer is not None:
    212             raise RuntimeError(
    213                 "Cannot register producer %s, because producer %s was never "
    214                 "unregistered." % (producer, self.producer))
    215         if not self.active:
    216             producer.stopProducing()
    217         else:
    218             self.producer = producer
    219             self.streamingProducer = streaming
    220             if not streaming:
    221                 producer.resumeProducing()
     380        try:
     381            _, writeBufferSize, _, _ = win32pipe.GetNamedPipeInfo(self.writePipe)
     382            data = lazyByteSlice(data, 0, writeBufferSize)
     383        except pywintypes.error:
     384            pass
     385        try:
     386            _, bytesWritten = win32file.WriteFile(self.writePipe, data, None)
     387        except pywintypes.error:
     388            return main.CONNECTION_LOST
     389        return bytesWritten
    222390
    223     def unregisterProducer(self):
    224         """Stop consuming data from a producer, without disconnecting.
     391
     392    def _postLoseConnection(self):
    225393        """
    226         self.producer = None
    227 
    228     def writeConnectionLost(self):
    229         self.deactivate()
     394        Under normal circumstances, clean-up is performed here and the
     395        C{lostCallback()} triggered.
     396        """
    230397        try:
    231398            win32api.CloseHandle(self.writePipe)
    232399        except pywintypes.error:
    233             # OMG what
    234400            pass
    235         self.lostCallback()
     401        if self.lostCallback:
     402            self.lostCallback()
     403            self.lostCallback = None
    236404
    237405
    238     def writeSequence(self, seq):
     406    def stopWriting(self):
    239407        """
    240         Append a C{list} or C{tuple} of bytes to the output buffer.
     408        Prevents C{checkWork()} from being called.
     409        """
     410        self.deactivate()
    241411
    242         @param seq: C{list} or C{tuple} of C{str} instances to be appended to
    243             the output buffer.
    244412
    245         @raise TypeError: If C{seq} contains C{unicode}.
     413    def startWriting(self):
    246414        """
    247         if unicode in map(type, seq):
    248             raise TypeError("Unicode not allowed in output buffer.")
    249         self.outQueue.extend(seq)
     415        C{checkWork()} will now be called again.
     416        """
     417        self.activate()
    250418
    251419
    252     def write(self, data):
     420    def stopReading(self):
    253421        """
    254         Append some bytes to the output buffer.
     422        Prevents this writer from being removed from the reactor.
     423        """
     424        pass
    255425
    256         @param data: C{str} to be appended to the output buffer.
    257         @type data: C{str}.
    258426
    259         @raise TypeError: If C{data} is C{unicode} instead of C{str}.
     427    def startReading(self):
    260428        """
    261         if isinstance(data, unicode):
    262             raise TypeError("Unicode not allowed in output buffer.")
    263         if self.disconnecting:
    264             return
    265         self.outQueue.append(data)
    266         if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
    267             self.bufferFull()
     429        Prevents this writer from being added to the reactor.
     430        """
     431        pass
    268432
    269433
    270     def checkWork(self):
    271         numBytesWritten = 0
    272         if not self.outQueue:
    273             if self.disconnecting:
    274                 self.writeConnectionLost()
    275                 return 0
    276             try:
    277                 win32file.WriteFile(self.writePipe, '', None)
    278             except pywintypes.error:
    279                 self.writeConnectionLost()
    280                 return numBytesWritten
    281         while self.outQueue:
    282             data = self.outQueue.pop(0)
    283             errCode = 0
    284             try:
    285                 errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
    286                                                              data, None)
    287             except win32api.error:
    288                 self.writeConnectionLost()
    289                 break
    290             else:
    291                 # assert not errCode, "wtf an error code???"
    292                 numBytesWritten += nBytesWritten
    293                 if len(data) > nBytesWritten:
    294                     self.outQueue.insert(0, data[nBytesWritten:])
    295                     break
    296         else:
    297             resumed = self.bufferEmpty()
    298             if not resumed and self.disconnecting:
    299                 self.writeConnectionLost()
    300         return numBytesWritten
     434    def close(self):
     435        """
     436        Provided for backwards compatibility with the old _PollableWritePipe().
     437        Calls C{loseConnection()}.
     438        """
     439        self.loseConnection()
     440
     441
     442
     443all = [_PollableReadPipe, _PollableWritePipe]
     444