Ticket #1918: kqueue.patch

File kqueue.patch, 10.4 KB (added by oberstet, 5 years ago)

select.kqueue based reactor

  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor()
    287287
    288288        name = reactor.__class__.__name__
    289         if name in ('EPollReactor', 'CFReactor'):
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'):
    290290            # Closing a file descriptor immediately removes it from the epoll
    291291            # set without generating a notification.  That means epollreactor
    292292            # will not call any methods on Victim after the close, so there's
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor
    1111    | kqreactor.install()
    1212
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
    15 
    16 
    17 
    18 You're going to need to patch PyKqueue::
    19 
    20     =====================================================
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    23     @@ -137,7 +137,7 @@
    24      }
    25      
    26      statichere PyTypeObject KQEvent_Type = {
    27     -  PyObject_HEAD_INIT(NULL)
    28     +  PyObject_HEAD_INIT(&PyType_Type)
    29        0,                             // ob_size
    30        "KQEvent",                     // tp_name
    31        sizeof(KQEventObject),         // tp_basicsize
    32     @@ -291,13 +291,14 @@
    33      
    34        /* Build timespec for timeout */
    35        totimespec.tv_sec = timeout / 1000;
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    38      
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    40      
    41        /* Make the call */
    42     -
    43     +  Py_BEGIN_ALLOW_THREADS
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    45     +  Py_END_ALLOW_THREADS
    46      
    47        /* Don't need the input event list anymore, so get rid of it */
    48        free (changelist);
    49     @@ -361,7 +362,7 @@
    50      statichere PyTypeObject KQueue_Type = {
    51             /* The ob_type field must be initialized in the module init function
    52              * to be portable to Windows without using C++. */
    53     -   PyObject_HEAD_INIT(NULL)
    54     +   PyObject_HEAD_INIT(&PyType_Type)
    55             0,                  /*ob_size*/
    56             "KQueue",                   /*tp_name*/
    57             sizeof(KQueueObject),       /*tp_basicsize*/
    58 
     13This implementation depends on Python 2.6 or higher which has kqueue support
     14built in the select module.
    5915"""
    6016
    6117import errno, sys
    6218
    6319from zope.interface import implements
    6420
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD
    66 from kqsyscall import kqueue, kevent
     21from select import kqueue, kevent
     22from select import KQ_FILTER_READ, KQ_FILTER_WRITE, KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
    6723
    6824from twisted.internet.interfaces import IReactorFDSet
    6925
     
    7329
    7430class KQueueReactor(posixbase.PosixReactorBase):
    7531    """
    76     A reactor that uses kqueue(2)/kevent(2).
     32    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
     33    which has built in support for kqueue in the select module.
    7734
    7835    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
    7936
     
    9754    """
    9855    implements(IReactorFDSet)
    9956
     57
    10058    def __init__(self):
    10159        """
    10260        Initialize kqueue object, file descriptor tracking dictionaries, and the
     
    10967        posixbase.PosixReactorBase.__init__(self)
    11068
    11169
    112     def _updateRegistration(self, *args):
    113         self._kq.kevent([kevent(*args)], 0, 0)
     70    def _updateRegistration(self, fd, filter, op):
     71        """
     72        Private method for changing kqueue registration.
     73        """
     74        self._kq.control([kevent(fd, filter, op)], 0, 0)
    11475
     76
    11577    def addReader(self, reader):
    116         """Add a FileDescriptor for notification of data available to read.
    11778        """
     79        Add a FileDescriptor for notification of data available to read.
     80        """
    11881        fd = reader.fileno()
    11982        if fd not in self._reads:
    120             self._selectables[fd] = reader
    121             self._reads[fd] = 1
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     83            try:
     84                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     85            except OSError, e:
     86                pass
     87            finally:
     88                self._selectables[fd] = reader
     89                self._reads[fd] = 1
    12390
     91
    12492    def addWriter(self, writer):
    125         """Add a FileDescriptor for notification of data available to write.
    12693        """
     94        Add a FileDescriptor for notification of data available to write.
     95        """
    12796        fd = writer.fileno()
    12897        if fd not in self._writes:
    129             self._selectables[fd] = writer
    130             self._writes[fd] = 1
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     98            try:
     99                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     100            except OSError, e:
     101                pass
     102            finally:
     103                self._selectables[fd] = writer
     104                self._writes[fd] = 1
    132105
     106
    133107    def removeReader(self, reader):
    134         """Remove a Selectable for notification of data available to read.
    135108        """
    136         fd = reader.fileno()
     109        Remove a Selectable for notification of data available to read.
     110        """
     111        wasLost = False
     112        try:
     113            fd = reader.fileno()
     114        except:
     115            fd = -1
     116        if fd == -1:
     117            for fd, fdes in self._selectables.items():
     118                if reader is fdes:
     119                    wasLost = True
     120                    break
     121            else:
     122                return
    137123        if fd in self._reads:
    138124            del self._reads[fd]
    139125            if fd not in self._writes:
    140126                del self._selectables[fd]
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     127            if not wasLost:
     128                try:
     129                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
     130                except OSError, e:
     131                    pass
    142132
     133
    143134    def removeWriter(self, writer):
    144         """Remove a Selectable for notification of data available to write.
    145135        """
    146         fd = writer.fileno()
     136        Remove a Selectable for notification of data available to write.
     137        """
     138        wasLost = False
     139        try:
     140            fd = writer.fileno()
     141        except:
     142            fd = -1
     143        if fd == -1:
     144            for fd, fdes in self._selectables.items():
     145                if writer is fdes:
     146                    wasLost = True
     147                    break
     148            else:
     149                return
    147150        if fd in self._writes:
    148151            del self._writes[fd]
    149152            if fd not in self._reads:
    150153                del self._selectables[fd]
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     154            if not wasLost:
     155                try:
     156                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
     157                except OSError, e:
     158                    pass
    152159
     160
    153161    def removeAll(self):
    154162        """
    155163        Remove all selectables, and return a list of them.
     
    168176
    169177
    170178    def doKEvent(self, timeout):
    171         """Poll the kqueue for new events."""
     179        """
     180        Poll the kqueue for new events.
     181        """
    172182        if timeout is None:
    173             timeout = 1000
    174         else:
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds
     183            timeout = 1
    176184
    177185        try:
    178             l = self._kq.kevent([], len(self._selectables), timeout)
     186            l = self._kq.control([], len(self._selectables), timeout)
    179187        except OSError, e:
    180188            if e[0] == errno.EINTR:
    181189                return
    182190            else:
    183191                raise
     192
    184193        _drdw = self._doWriteOrRead
    185194        for event in l:
    186             why = None
    187             fd, filter = event.ident, event.filter
     195            fd = event.ident
    188196            try:
    189197                selectable = self._selectables[fd]
    190198            except KeyError:
    191199                # Handles the infrequent case where one selectable's
    192200                # handler disconnects another.
    193201                continue
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     202            else:
     203                log.callWithLogger(selectable, _drdw, selectable, fd, event)
    195204
    196     def _doWriteOrRead(self, selectable, fd, filter):
    197         try:
    198             if filter == EVFILT_READ:
    199                 why = selectable.doRead()
    200             if filter == EVFILT_WRITE:
    201                 why = selectable.doWrite()
    202             if not selectable.fileno() == fd:
    203                 why = main.CONNECTION_LOST
    204         except:
    205             why = sys.exc_info()[1]
    206             log.deferr()
    207205
     206    def _doWriteOrRead(self, selectable, fd, event):
     207        why = None
     208        inRead = False
     209        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags
     210
     211        if flags & KQ_EV_EOF and data and fflags:
     212            why = main.CONNECTION_LOST
     213        else:
     214            try:
     215                if selectable.fileno() == -1:
     216                    inRead = False
     217                    why = posixbase._NO_FILEDESC
     218                else:
     219                   if filter == KQ_FILTER_READ:
     220                       inRead = True
     221                       why = selectable.doRead()
     222                   if filter == KQ_FILTER_WRITE:
     223                       inRead = False
     224                       why = selectable.doWrite()
     225            except:
     226                # Any exception from application code gets logged and will
     227                # cause us to disconnect the selectable.
     228                why = sys.exc_info()[1]
     229                log.err()
     230
    208231        if why:
    209             self.removeReader(selectable)
    210             self.removeWriter(selectable)
    211             selectable.connectionLost(failure.Failure(why))
     232            self._disconnectSelectable(selectable, why, inRead)
    212233
    213234    doIteration = doKEvent
    214235
    215236
    216237def install():
    217     k = KQueueReactor()
    218     main.installReactor(k)
     238    """
     239    Install the kqueue() reactor.
     240    """
     241    p = KQueueReactor()
     242    from twisted.internet.main import installReactor
     243    installReactor(p)
    219244
    220245
    221246__all__ = ["KQueueReactor", "install"]