Ticket #1918: kqueue.2.patch

File kqueue.2.patch, 12.1 KB (added by oberstet, 5 years ago)

Revised patch after review

  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor()
    287287
    288288        name = reactor.__class__.__name__
    289         if name in ('EPollReactor', 'CFReactor'):
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'):
    290290            # Closing a file descriptor immediately removes it from the epoll
    291291            # set without generating a notification.  That means epollreactor
    292292            # will not call any methods on Victim after the close, so there's
  • twisted/internet/kqreactor.py

     
    44"""
    55A kqueue()/kevent() based implementation of the Twisted main loop.
    66
    7 To install the event loop (and you should do this before any connections,
    8 listeners or connectors are added)::
     7To use this reactor, start your application specifying the kqueue reactor:
    98
    10     | from twisted.internet import kqreactor
    11     | kqreactor.install()
     9   twistd ... --reactor kqueue
    1210
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
     11To install the event loop from code (and you should do this before any
     12connections, listeners or connectors are added)::
    1513
     14   from twisted.internet import kqreactor
     15   kqreactor.install()
    1616
     17This implementation depends on Python 2.6 or higher which has kqueue support
     18built in the select module.
    1719
    18 You're going to need to patch PyKqueue::
     20Note, that you should use Python 2.6.5 or higher, since previous implementations
     21of select.kqueue had
    1922
    20     =====================================================
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    23     @@ -137,7 +137,7 @@
    24      }
    25      
    26      statichere PyTypeObject KQEvent_Type = {
    27     -  PyObject_HEAD_INIT(NULL)
    28     +  PyObject_HEAD_INIT(&PyType_Type)
    29        0,                             // ob_size
    30        "KQEvent",                     // tp_name
    31        sizeof(KQEventObject),         // tp_basicsize
    32     @@ -291,13 +291,14 @@
    33      
    34        /* Build timespec for timeout */
    35        totimespec.tv_sec = timeout / 1000;
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    38      
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    40      
    41        /* Make the call */
    42     -
    43     +  Py_BEGIN_ALLOW_THREADS
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    45     +  Py_END_ALLOW_THREADS
    46      
    47        /* Don't need the input event list anymore, so get rid of it */
    48        free (changelist);
    49     @@ -361,7 +362,7 @@
    50      statichere PyTypeObject KQueue_Type = {
    51             /* The ob_type field must be initialized in the module init function
    52              * to be portable to Windows without using C++. */
    53     -   PyObject_HEAD_INIT(NULL)
    54     +   PyObject_HEAD_INIT(&PyType_Type)
    55             0,                  /*ob_size*/
    56             "KQueue",                   /*tp_name*/
    57             sizeof(KQueueObject),       /*tp_basicsize*/
     23   http://bugs.python.org/issue5910
    5824
     25not yet fixed.
    5926"""
    6027
    61 import errno, sys
     28import errno
    6229
    6330from zope.interface import implements
    6431
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD
    66 from kqsyscall import kqueue, kevent
     32from select import kqueue, kevent
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
    6735
    6836from twisted.internet.interfaces import IReactorFDSet
    6937
     
    7341
    7442class KQueueReactor(posixbase.PosixReactorBase):
    7543    """
    76     A reactor that uses kqueue(2)/kevent(2).
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
     45    which has built in support for kqueue in the select module.
    7746
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
    7948
     
    9766    """
    9867    implements(IReactorFDSet)
    9968
     69
    10070    def __init__(self):
    10171        """
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the
     
    10979        posixbase.PosixReactorBase.__init__(self)
    11080
    11181
    112     def _updateRegistration(self, *args):
    113         self._kq.kevent([kevent(*args)], 0, 0)
     82    def _updateRegistration(self, fd, filter, op):
     83        """
     84        Private method for changing kqueue registration on a given FD
     85        filtering for events given filter/op. This will never block and
     86        returns nothing.
     87        """
     88        self._kq.control([kevent(fd, filter, op)], 0, 0)
    11489
     90
    11591    def addReader(self, reader):
    116         """Add a FileDescriptor for notification of data available to read.
    11792        """
     93        Implement L{IReactorFDSet.addReader}.
     94        """
    11895        fd = reader.fileno()
    11996        if fd not in self._reads:
    120             self._selectables[fd] = reader
    121             self._reads[fd] = 1
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     97            try:
     98                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     99            except OSError:
     100                pass
     101            finally:
     102                self._selectables[fd] = reader
     103                self._reads[fd] = 1
    123104
     105
    124106    def addWriter(self, writer):
    125         """Add a FileDescriptor for notification of data available to write.
    126107        """
     108        Implement L{IReactorFDSet.addWriter}.
     109        """
    127110        fd = writer.fileno()
    128111        if fd not in self._writes:
    129             self._selectables[fd] = writer
    130             self._writes[fd] = 1
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     112            try:
     113                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     114            except OSError:
     115                pass
     116            finally:
     117                self._selectables[fd] = writer
     118                self._writes[fd] = 1
    132119
     120
    133121    def removeReader(self, reader):
    134         """Remove a Selectable for notification of data available to read.
    135122        """
    136         fd = reader.fileno()
     123        Implement L{IReactorFDSet.removeReader}.
     124        """
     125        wasLost = False
     126        try:
     127            fd = reader.fileno()
     128        except:
     129            fd = -1
     130        if fd == -1:
     131            for fd, fdes in self._selectables.items():
     132                if reader is fdes:
     133                    wasLost = True
     134                    break
     135            else:
     136                return
    137137        if fd in self._reads:
    138138            del self._reads[fd]
    139139            if fd not in self._writes:
    140140                del self._selectables[fd]
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     141            if not wasLost:
     142                try:
     143                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
     144                except OSError:
     145                    pass
    142146
     147
    143148    def removeWriter(self, writer):
    144         """Remove a Selectable for notification of data available to write.
    145149        """
    146         fd = writer.fileno()
     150        Implement L{IReactorFDSet.removeWriter}.
     151        """
     152        wasLost = False
     153        try:
     154            fd = writer.fileno()
     155        except:
     156            fd = -1
     157        if fd == -1:
     158            for fd, fdes in self._selectables.items():
     159                if writer is fdes:
     160                    wasLost = True
     161                    break
     162            else:
     163                return
    147164        if fd in self._writes:
    148165            del self._writes[fd]
    149166            if fd not in self._reads:
    150167                del self._selectables[fd]
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     168            if not wasLost:
     169                try:
     170                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
     171                except OSError:
     172                    pass
    152173
     174
    153175    def removeAll(self):
    154176        """
    155         Remove all selectables, and return a list of them.
     177        Implement L{IReactorFDSet.removeAll}.
    156178        """
    157179        return self._removeAll(
    158180            [self._selectables[fd] for fd in self._reads],
     
    160182
    161183
    162184    def getReaders(self):
     185        """
     186        Implement L{IReactorFDSet.getReaders}.
     187        """
    163188        return [self._selectables[fd] for fd in self._reads]
    164189
    165190
    166191    def getWriters(self):
     192        """
     193        Implement L{IReactorFDSet.getWriters}.
     194        """
    167195        return [self._selectables[fd] for fd in self._writes]
    168196
    169197
    170198    def doKEvent(self, timeout):
    171         """Poll the kqueue for new events."""
     199        """
     200        Poll the kqueue for new events.
     201        """
    172202        if timeout is None:
    173             timeout = 1000
    174         else:
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds
     203            timeout = 1
    176204
    177205        try:
    178             l = self._kq.kevent([], len(self._selectables), timeout)
     206            l = self._kq.control([], len(self._selectables), timeout)
    179207        except OSError, e:
    180208            if e[0] == errno.EINTR:
    181209                return
    182210            else:
    183211                raise
     212
    184213        _drdw = self._doWriteOrRead
    185214        for event in l:
    186             why = None
    187             fd, filter = event.ident, event.filter
     215            fd = event.ident
    188216            try:
    189217                selectable = self._selectables[fd]
    190218            except KeyError:
    191219                # Handles the infrequent case where one selectable's
    192220                # handler disconnects another.
    193221                continue
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     222            else:
     223                log.callWithLogger(selectable, _drdw, selectable, fd, event)
    195224
    196     def _doWriteOrRead(self, selectable, fd, filter):
    197         try:
    198             if filter == EVFILT_READ:
    199                 why = selectable.doRead()
    200             if filter == EVFILT_WRITE:
    201                 why = selectable.doWrite()
    202             if not selectable.fileno() == fd:
    203                 why = main.CONNECTION_LOST
    204         except:
    205             why = sys.exc_info()[1]
    206             log.deferr()
    207225
     226    def _doWriteOrRead(self, selectable, fd, event):
     227        """
     228        Private method called when a FD is ready for reading, writing or was
     229        lost. Do the work and raise errors where necessary.
     230        """
     231        why = None
     232        inRead = False
     233        (filter, flags, data, fflags) = (
     234            event.filter, event.flags, event.data, event.fflags)
     235
     236        if flags & KQ_EV_EOF and data and fflags:
     237            why = main.CONNECTION_LOST
     238        else:
     239            try:
     240                if selectable.fileno() == -1:
     241                    inRead = False
     242                    why = posixbase._NO_FILEDESC
     243                else:
     244                   if filter == KQ_FILTER_READ:
     245                       inRead = True
     246                       why = selectable.doRead()
     247                   if filter == KQ_FILTER_WRITE:
     248                       inRead = False
     249                       why = selectable.doWrite()
     250            except:
     251                # Any exception from application code gets logged and will
     252                # cause us to disconnect the selectable.
     253                why = failure.Failure()
     254                log.err(why)
     255
    208256        if why:
    209             self.removeReader(selectable)
    210             self.removeWriter(selectable)
    211             selectable.connectionLost(failure.Failure(why))
     257            self._disconnectSelectable(selectable, why, inRead)
    212258
    213259    doIteration = doKEvent
    214260
    215261
    216262def install():
    217     k = KQueueReactor()
    218     main.installReactor(k)
     263    """
     264    Install the kqueue() reactor.
     265    """
     266    p = KQueueReactor()
     267    from twisted.internet.main import installReactor
     268    installReactor(p)
    219269
    220270
    221271__all__ = ["KQueueReactor", "install"]
  • NEWS

     
    2323 - twisted.protocols.ftp.FTP.ftp_STOR now catches `FTPCmdError`s
    2424   raised by the file writer, and returns the error back to the
    2525   client. (#4909)
     26 - The kqueue reactor has been revived. (#1918)
    2627
    2728Bugfixes
    2829--------