Ticket #5413: _pollingfile-use-filedescriptor.patch

File _pollingfile-use-filedescriptor.patch, 19.9 KB (added by John Popplewell, 9 years ago)
  • twisted/internet/_pollingfile.py

     
    44
    55"""
    66Implements a simple polling interface for file descriptors that don't work with
    7 select() - this is pretty much only useful on Windows.
     7C{select()} - this is pretty much only useful on Windows.
    88"""
    99
     10from zope.interface import Interface
    1011from zope.interface import implements
    1112
    1213from twisted.internet.interfaces import IConsumer, IPushProducer
     14from twisted.internet.abstract import FileDescriptor
     15from twisted.internet import main
     16from twisted.python.compat import lazyByteSlice
     17from twisted.python import failure
    1318
     19import win32pipe
     20import win32file
     21import win32api
     22import pywintypes
    1423
     24
     25
    1526MIN_TIMEOUT = 0.000000001
    1627MAX_TIMEOUT = 0.1
    1728
    1829
    1930
    20 class _PollableResource:
     31class _PollableResource(object):
     32    """
     33    A 'resource' that requires polling periodically derives from this (a mixin
     34    class).
     35
     36    @ivar active: determines whether a resource's C{checkWork()} is called
     37    from L{_PollingTimer._pollEvent()}
     38    @type active: C{bool} or C{int}
     39    """
    2140    active = True
    2241
    2342    def activate(self):
     43        """
     44        Resource is eligible for polling.
     45        """
    2446        self.active = True
    2547
    2648
    2749    def deactivate(self):
     50        """
     51        Resource polling is disabled.
     52        """
    2853        self.active = False
    2954
    3055
    3156
    32 class _PollingTimer:
    33     # Everything is private here because it is really an implementation detail.
     57class _PollingTimer(object):
     58    """
     59    Manages pollable 'resources'.
     60   
     61    Uses a single L{IReactorTime.callLater()} timer to poll multiple
     62    'resources'. If no resources require polling the timer is disabled until a
     63    new resource is added or L{_unpause()} or L{_startPolling()} is called.
    3464
     65    It looks like a 'resource' calling L{activate()} should probably restart
     66    polling, but doesn't.
     67
     68    @ivar reactor: See L{__init__}
     69
     70    @ivar _resources: List of 'resources' added by calling
     71    L{_addPollableResource()}.
     72    @type _resources: C{list}
     73
     74    @ivar _pollTimer: None when polling is paused/stopped or an object which
     75    can be used to cancel the scheduled call.
     76    @type _pollTimer: L{IDelayedCall} or C{None}
     77
     78    @ivar _currentTimeout: Calculated current polling interval.
     79    @type _currentTimeout: C{float}
     80
     81    @ivar _paused: True if polling is paused, False if polling is occurring.
     82    @type _paused: C{bool} or C{int}
     83    """
    3584    def __init__(self, reactor):
     85        """
     86        @param reactor:
     87        @type reactor: L{IReactorTime}
     88        """
    3689        self.reactor = reactor
    3790        self._resources = []
    3891        self._pollTimer = None
    3992        self._currentTimeout = MAX_TIMEOUT
    4093        self._paused = False
    4194
     95
    4296    def _addPollableResource(self, res):
     97        """
     98        Adds a 'resource' to be polled.
     99
     100        @param res:
     101        @type res: Object derived from L{_PollableResource}.
     102        """
    43103        self._resources.append(res)
    44104        self._checkPollingState()
    45105
     106
    46107    def _checkPollingState(self):
     108        """
     109        Scans for active 'resources'. If one is found, polling is restarted,
     110        otherwise polling is stopped.
     111        """
    47112        for resource in self._resources:
    48113            if resource.active:
    49114                self._startPolling()
     
    51116        else:
    52117            self._stopPolling()
    53118
     119
    54120    def _startPolling(self):
     121        """
     122        Starts polling if there is no outstanding timer.
     123        """
    55124        if self._pollTimer is None:
    56125            self._pollTimer = self._reschedule()
    57126
     127
    58128    def _stopPolling(self):
     129        """
     130        If there is an outstanding timer event, cancel it.
     131        """
    59132        if self._pollTimer is not None:
    60133            self._pollTimer.cancel()
    61134            self._pollTimer = None
    62135
     136
    63137    def _pause(self):
     138        """
     139        Pauses polling.
     140        """
    64141        self._paused = True
    65142
     143
    66144    def _unpause(self):
     145        """
     146        Resumes polling if there are any active resources.
     147        """
    67148        self._paused = False
    68149        self._checkPollingState()
    69150
     151
    70152    def _reschedule(self):
     153        """
     154        If not paused, calls L{IReactorTime.callLater()} with the current
     155        timeout.
     156
     157        @return: L{IDelayedCall}
     158        """
    71159        if not self._paused:
    72             return self.reactor.callLater(self._currentTimeout, self._pollEvent)
     160            return self.reactor.callLater(self._currentTimeout,
     161                                          self._pollEvent)
    73162
     163
    74164    def _pollEvent(self):
     165        """
     166        Event callback for L{IReactorTime.callLater()}.
     167
     168        Iterates through list of resources calling C{checkWork()} making a
     169        note of whether they are still active.
     170       
     171        A calculation is performed using the sum of the returned values from
     172        C{checkWork()} that determines a new polling interval. The value is
     173        clamped between MIN_TIMEOUT and MAX_TIMEOUT. The more work done the
     174        shorter the time interval and vice-versa.
     175
     176        If any 'resources' are still active a new callback is scheduled.
     177        """
     178        self._pollTimer = None
    75179        workUnits = 0.
    76180        anyActive = []
    77181        for resource in self._resources:
     
    95199            self._pollTimer = self._reschedule()
    96200
    97201
    98 # If we ever (let's hope not) need the above functionality on UNIX, this could
    99 # be factored into a different module.
    100202
    101 import win32pipe
    102 import win32file
    103 import win32api
    104 import pywintypes
     203class _PollableReader(_PollableResource, FileDescriptor):
     204    """
     205    A generic pollable reader implementing C{IPushProducer}.
    105206
    106 class _PollableReadPipe(_PollableResource):
     207    @ivar handle: See L{__init__}.
     208    @ivar receivedCallback: See L{__init__}.
     209    @ivar lostCallback: See L{__init__}.
     210    @ivar connected: True if pipe is connected or file open.
     211    @type connected: C{bool} or C{int}.
     212    """
    107213
    108214    implements(IPushProducer)
    109215
    110     def __init__(self, pipe, receivedCallback, lostCallback):
    111         # security attributes for pipes
    112         self.pipe = pipe
     216    def __init__(self, handle, receivedCallback, lostCallback):
     217        """
     218        @param handle: Handle to a file or pipe.
     219        @type  handle: pywin32.PyHandle
     220
     221        @param receivedCallback: receives read data
     222        @type  receivedCallback: C{function} callback(data)
     223
     224        @param lostCallback: connection lost
     225        @type  lostCallback: C{function} callback()
     226        """
     227        FileDescriptor.__init__(self)
     228        self.handle = handle
     229        self.connected = True
    113230        self.receivedCallback = receivedCallback
    114231        self.lostCallback = lostCallback
    115232
    116     def checkWork(self):
    117         finished = 0
    118         fullDataRead = []
    119233
    120         while 1:
    121             try:
    122                 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
    123                 # finished = (result == -1)
    124                 if not bytesToRead:
    125                     break
    126                 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
    127                 fullDataRead.append(data)
    128             except win32api.error:
    129                 finished = 1
    130                 break
     234    def _read(self):
     235        """
     236        Attempts to read from the pipe (non-blocking).
    131237
    132         dataBuf = ''.join(fullDataRead)
    133         if dataBuf:
    134             self.receivedCallback(dataBuf)
    135         if finished:
    136             self.cleanup()
    137         return len(dataBuf)
     238        @return: None if there is an error, an empty string if there is no
     239        data to read, or a string containing the data.
     240        """
     241        try:
     242            _, bytesToRead, _ = win32pipe.PeekNamedPipe(self.handle, 1)
     243        except pywintypes.error:
     244            return None
     245        if not bytesToRead:
     246            return ''
     247        _, data = win32file.ReadFile(self.handle, bytesToRead, None)
     248        return data
    138249
    139     def cleanup(self):
    140         self.deactivate()
    141         self.lostCallback()
    142250
    143     def close(self):
     251    def _closeRead(self):
     252        """
     253        Closes the pipe, ignoring any errors.
     254        """
    144255        try:
    145             win32api.CloseHandle(self.pipe)
     256            win32api.CloseHandle(self.handle)
    146257        except pywintypes.error:
    147             # You can't close std handles...?
    148258            pass
    149259
    150     def stopProducing(self):
    151         self.close()
    152260
    153     def pauseProducing(self):
     261    def checkWork(self):
     262        """
     263        Called by C{_PollingTimer()} to try reading. An error causes the
     264        connection to be lost.
     265
     266        @return: number of bytes read, may be zero.
     267        """
     268        data = self._read()
     269        if data is None:
     270            self.readConnectionLost(main.CONNECTION_DONE)
     271            return 0
     272        if data:
     273            self.receivedCallback(data)
     274        return len(data)
     275
     276
     277    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
     278        """
     279        This is a reader not a writer so we set C{_writeDisconnected} True
     280        so that the disconnect is immediate and then call up into
     281        C{FileDescriptor}.
     282        """
     283        self._writeDisconnected = True
     284        FileDescriptor.loseConnection(self, _connDone)
     285
     286
     287    def connectionLost(self, reason):
     288        """
     289        Here we clean-up, call up into the C{FileDescriptor} and then call our
     290        C{lostCallback()}.
     291        """
     292        self._closeRead()
     293        FileDescriptor.connectionLost(self, reason)
     294        self.lostCallback()
     295
     296
     297    def stopReading(self):
     298        """
     299        Prevents C{checkWork()} from being called.
     300        """
    154301        self.deactivate()
    155302
    156     def resumeProducing(self):
     303
     304    def startReading(self):
     305        """
     306        C{checkWork()} will now be called again.
     307        """
    157308        self.activate()
    158309
    159310
    160 FULL_BUFFER_SIZE = 64 * 1024
     311    def stopWriting(self):
     312        """
     313        Prevents this reader from being removed from the reactor.
     314        """
     315        pass
    161316
    162 class _PollableWritePipe(_PollableResource):
    163317
     318    def startWriting(self):
     319        """
     320        Prevents this reader from being added to the reactor.
     321        """
     322        pass
     323
     324
     325
     326class _PollableWriter(_PollableResource, FileDescriptor):
     327    """
     328    A generic pollable writer implementing C{IConsumer}.
     329
     330    @ivar handle: See L{__init__}.
     331    @ivar lostCallback: See L{__init__}.
     332    @ivar connected: True if pipe is connected or file open.
     333    @type connected: C{bool} or C{int}.
     334    """
     335
    164336    implements(IConsumer)
    165337
    166     def __init__(self, writePipe, lostCallback):
    167         self.disconnecting = False
    168         self.producer = None
    169         self.producerPaused = 0
    170         self.streamingProducer = 0
    171         self.outQueue = []
    172         self.writePipe = writePipe
     338    def __init__(self, handle, lostCallback):
     339        """
     340        @param handle: Handle to a file or pipe.
     341        @type  handle: pywin32.PyHandle
     342
     343        @param lostCallback: connection lost
     344        @type  lostCallback: C{function} callback()
     345        """
     346        FileDescriptor.__init__(self)
     347        self.handle = handle
     348        self.connected = True
    173349        self.lostCallback = lostCallback
    174350        try:
    175             win32pipe.SetNamedPipeHandleState(writePipe,
     351            win32pipe.SetNamedPipeHandleState(self.handle,
    176352                                              win32pipe.PIPE_NOWAIT,
    177353                                              None,
    178354                                              None)
    179355        except pywintypes.error:
    180             # Maybe it's an invalid handle.  Who knows.
     356            # fails if not a pipe handle, e.g. it's a file handle
    181357            pass
    182358
    183     def close(self):
    184         self.disconnecting = True
    185359
    186     def bufferFull(self):
    187         if self.producer is not None:
    188             self.producerPaused = 1
    189             self.producer.pauseProducing()
     360    def _isWriteClosed(self):
     361        """
     362        Tests to see if the pipe is closed.
    190363
    191     def bufferEmpty(self):
    192         if self.producer is not None and ((not self.streamingProducer) or
    193                                           self.producerPaused):
    194             self.producer.producerPaused = 0
    195             self.producer.resumeProducing()
     364        @return: True if the pipe has been closed, False otherwise.
     365        """
     366        try:
     367            win32file.WriteFile(self.handle, '', None)
     368            return False
     369        except pywintypes.error:
    196370            return True
    197         return False
    198371
    199     # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
    200372
    201     def registerProducer(self, producer, streaming):
    202         """Register to receive data from a producer.
     373    def _bufferSize(self):
     374        """
     375        Queries the current pipe write buffer size.
    203376
    204         This sets this selectable to be a consumer for a producer.  When this
    205         selectable runs out of data on a write() call, it will ask the producer
    206         to resumeProducing(). A producer should implement the IProducer
    207         interface.
     377        @return: Current size of the pipe buffer or None if there is an error.
     378        """
     379        try:
     380            _, writeBufferSize, _, _ = win32pipe.GetNamedPipeInfo(self.handle)
     381            return writeBufferSize
     382        except pywintypes.error:
     383            return None
    208384
    209         FileDescriptor provides some infrastructure for producer methods.
     385
     386    def _write(self, data):
    210387        """
    211         if self.producer is not None:
    212             raise RuntimeError(
    213                 "Cannot register producer %s, because producer %s was never "
    214                 "unregistered." % (producer, self.producer))
    215         if not self.active:
    216             producer.stopProducing()
    217         else:
    218             self.producer = producer
    219             self.streamingProducer = streaming
    220             if not streaming:
    221                 producer.resumeProducing()
     388        Writes data to the pipe, handling errors.
    222389
    223     def unregisterProducer(self):
    224         """Stop consuming data from a producer, without disconnecting.
     390        @param data: data to write to the pipe.
     391        @type  data: C{string}
     392
     393        @return: Number of bytes written or None if error.
    225394        """
    226         self.producer = None
     395        try:
     396            _, bytesWritten = win32file.WriteFile(self.handle, data, None)
     397            return bytesWritten
     398        except pywintypes.error:
     399            return None
    227400
    228     def writeConnectionLost(self):
    229         self.deactivate()
     401
     402    def _closeWrite(self):
     403        """
     404        Closes the pipe, ignoring any errors.
     405        """
    230406        try:
    231             win32api.CloseHandle(self.writePipe)
     407            win32api.CloseHandle(self.handle)
    232408        except pywintypes.error:
    233             # OMG what
    234409            pass
    235         self.lostCallback()
    236410
    237411
    238     def writeSequence(self, seq):
     412    def checkWork(self):
    239413        """
    240         Append a C{list} or C{tuple} of bytes to the output buffer.
     414        Called by C{_PollingTimer()} to process C{FileDescriptor} write
     415        buffers and detect closure of the pipe.
    241416
    242         @param seq: C{list} or C{tuple} of C{str} instances to be appended to
    243             the output buffer.
     417        Leverages C{FileDescriptor} to do the heavy lifting. An error causes
     418        the connection to be lost.
    244419
    245         @raise TypeError: If C{seq} contains C{unicode}.
     420        @return: Zero if connection lost, one if still active.
    246421        """
    247         if unicode in map(type, seq):
    248             raise TypeError("Unicode not allowed in output buffer.")
    249         self.outQueue.extend(seq)
     422        self.doWrite()
     423        self.startWriting()
     424        if self._isWriteClosed():
     425            self.connectionLost(main.CONNECTION_LOST)
     426            if self.lostCallback:
     427                self.lostCallback()
     428                self.lostCallback = None
     429            return 0
     430        return 1
    250431
    251432
    252     def write(self, data):
     433    def writeSomeData(self, data):
    253434        """
    254         Append some bytes to the output buffer.
     435        Called from C{FileDescriptor} to do the actual writing.
    255436
    256         @param data: C{str} to be appended to the output buffer.
    257         @type data: C{str}.
     437        Avoids writing more data than the pipe can handle. Excess data is put
     438        back into the write buffers by C{FileDescriptor}.
    258439
    259         @raise TypeError: If C{data} is C{unicode} instead of C{str}.
     440        @return: Number of bytes written or main.CONNECTION_LOST on error.
    260441        """
    261         if isinstance(data, unicode):
    262             raise TypeError("Unicode not allowed in output buffer.")
    263         if self.disconnecting:
    264             return
    265         self.outQueue.append(data)
    266         if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
    267             self.bufferFull()
     442        size = self._bufferSize()
     443        if size is not None:
     444            data = lazyByteSlice(data, 0, size)
     445        bytesWritten = self._write(data)
     446        if bytesWritten is None:
     447            return main.CONNECTION_LOST
     448        return bytesWritten
    268449
    269450
    270     def checkWork(self):
    271         numBytesWritten = 0
    272         if not self.outQueue:
    273             if self.disconnecting:
    274                 self.writeConnectionLost()
    275                 return 0
    276             try:
    277                 win32file.WriteFile(self.writePipe, '', None)
    278             except pywintypes.error:
    279                 self.writeConnectionLost()
    280                 return numBytesWritten
    281         while self.outQueue:
    282             data = self.outQueue.pop(0)
    283             errCode = 0
    284             try:
    285                 errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
    286                                                              data, None)
    287             except win32api.error:
    288                 self.writeConnectionLost()
    289                 break
    290             else:
    291                 # assert not errCode, "wtf an error code???"
    292                 numBytesWritten += nBytesWritten
    293                 if len(data) > nBytesWritten:
    294                     self.outQueue.insert(0, data[nBytesWritten:])
    295                     break
    296         else:
    297             resumed = self.bufferEmpty()
    298             if not resumed and self.disconnecting:
    299                 self.writeConnectionLost()
    300         return numBytesWritten
     451    def _postLoseConnection(self):
     452        """
     453        Under normal circumstances, clean-up is performed here and the
     454        C{lostCallback()} triggered.
     455        """
     456        self._closeWrite()
     457        if self.lostCallback:
     458            self.lostCallback()
     459            self.lostCallback = None
     460
     461
     462    def stopWriting(self):
     463        """
     464        Prevents C{checkWork()} from being called.
     465        """
     466        self.deactivate()
     467
     468
     469    def startWriting(self):
     470        """
     471        C{checkWork()} will now be called again.
     472        """
     473        self.activate()
     474
     475
     476    def stopReading(self):
     477        """
     478        Prevents this writer from being removed from the reactor.
     479        """
     480        pass
     481
     482
     483    def startReading(self):
     484        """
     485        Prevents this writer from being added to the reactor.
     486        """
     487        pass
     488
     489
     490
     491class _PollableReadPipe(_PollableReader):
     492    """
     493    Implements a pollable read-pipe derived from C{_PollableReader}.
     494    """
     495    def __init__(self, handle, receivedCallback, lostCallback):
     496        """
     497        See L{_PollableReader.__init__()}.
     498        """
     499        _PollableReader.__init__(self, handle, receivedCallback, lostCallback)
     500
     501
     502    def close(self):
     503        """
     504        Provided for backwards compatibility with the old _PollableReadPipe().
     505        Calls loseConnection().
     506        """
     507        self.loseConnection()
     508
     509
     510
     511class _PollableWritePipe(_PollableWriter):
     512    """
     513    Implements a pollable write-pipe derived from C{_PollableWriter}.
     514    """
     515    def __init__(self, handle, lostCallback):
     516        """
     517        See L{_PollableWriter.__init__()}.
     518        """
     519        _PollableWriter.__init__(self, handle, lostCallback)
     520
     521
     522    def close(self):
     523        """
     524        Provided for backwards compatibility with the old _PollableWritePipe().
     525        Calls C{loseConnection()}.
     526        """
     527        self.loseConnection()
     528
     529
     530
     531all = [_PollableReadPipe, _PollableWritePipe]
     532