Ticket #1918: kqueue.3.patch

File kqueue.3.patch, 14.3 KB (added by oberstet, 5 years ago)

Revised patch : daemonization, correct news file

  • twisted/scripts/_twistd_unix.py

     
    156156
    157157
    158158def daemonize():
    159     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
     159    # See:
     160    # http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
     161    # http://code.activestate.com/recipes/278731/
     162    from twisted.internet import reactor
     163    if hasattr(reactor, '_beforeFork') and callable(reactor._beforeFork):
     164        reactor._beforeFork()
    160165    if os.fork():   # launch child and...
    161166        os._exit(0) # kill off parent
    162167    os.setsid()
     
    170175            if e.errno != errno.EBADF:
    171176                raise
    172177    os.close(null)
     178    if hasattr(reactor, '_afterFork') and callable(reactor._afterFork):
     179        reactor._afterFork()
    173180
    174181
    175182
  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor()
    287287
    288288        name = reactor.__class__.__name__
    289         if name in ('EPollReactor', 'CFReactor'):
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'):
    290290            # Closing a file descriptor immediately removes it from the epoll
    291291            # set without generating a notification.  That means epollreactor
    292292            # will not call any methods on Victim after the close, so there's
  • twisted/internet/kqreactor.py

     
    44"""
    55A kqueue()/kevent() based implementation of the Twisted main loop.
    66
    7 To install the event loop (and you should do this before any connections,
    8 listeners or connectors are added)::
     7To use this reactor, start your application specifying the kqueue reactor:
    98
    10     | from twisted.internet import kqreactor
    11     | kqreactor.install()
     9   twistd ... --reactor kqueue
    1210
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
     11To install the event loop from code (and you should do this before any
     12connections, listeners or connectors are added)::
    1513
     14   from twisted.internet import kqreactor
     15   kqreactor.install()
    1616
     17This implementation depends on Python 2.6 or higher which has kqueue support
     18built in the select module.
    1719
    18 You're going to need to patch PyKqueue::
     20Note, that you should use Python 2.6.5 or higher, since previous implementations
     21of select.kqueue had
    1922
    20     =====================================================
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    23     @@ -137,7 +137,7 @@
    24      }
    25      
    26      statichere PyTypeObject KQEvent_Type = {
    27     -  PyObject_HEAD_INIT(NULL)
    28     +  PyObject_HEAD_INIT(&PyType_Type)
    29        0,                             // ob_size
    30        "KQEvent",                     // tp_name
    31        sizeof(KQEventObject),         // tp_basicsize
    32     @@ -291,13 +291,14 @@
    33      
    34        /* Build timespec for timeout */
    35        totimespec.tv_sec = timeout / 1000;
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    38      
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    40      
    41        /* Make the call */
    42     -
    43     +  Py_BEGIN_ALLOW_THREADS
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    45     +  Py_END_ALLOW_THREADS
    46      
    47        /* Don't need the input event list anymore, so get rid of it */
    48        free (changelist);
    49     @@ -361,7 +362,7 @@
    50      statichere PyTypeObject KQueue_Type = {
    51             /* The ob_type field must be initialized in the module init function
    52              * to be portable to Windows without using C++. */
    53     -   PyObject_HEAD_INIT(NULL)
    54     +   PyObject_HEAD_INIT(&PyType_Type)
    55             0,                  /*ob_size*/
    56             "KQueue",                   /*tp_name*/
    57             sizeof(KQueueObject),       /*tp_basicsize*/
     23   http://bugs.python.org/issue5910
    5824
     25not yet fixed.
    5926"""
    6027
    61 import errno, sys
     28import errno
    6229
    6330from zope.interface import implements
    6431
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD
    66 from kqsyscall import kqueue, kevent
     32from select import kqueue, kevent
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
    6735
    6836from twisted.internet.interfaces import IReactorFDSet
    6937
     
    7341
    7442class KQueueReactor(posixbase.PosixReactorBase):
    7543    """
    76     A reactor that uses kqueue(2)/kevent(2).
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
     45    which has built in support for kqueue in the select module.
    7746
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
    7948
     
    9766    """
    9867    implements(IReactorFDSet)
    9968
     69
    10070    def __init__(self):
    10171        """
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the
    10373        base class.
     74
     75        See:
     76            - http://docs.python.org/library/select.html
     77            - www.freebsd.org/cgi/man.cgi?query=kqueue
     78            - people.freebsd.org/~jlemon/papers/kqueue.pdf
    10479        """
    10580        self._kq = kqueue()
    10681        self._reads = {}
     
    10984        posixbase.PosixReactorBase.__init__(self)
    11085
    11186
    112     def _updateRegistration(self, *args):
    113         self._kq.kevent([kevent(*args)], 0, 0)
     87    def _updateRegistration(self, fd, filter, op):
     88        """
     89        Private method for changing kqueue registration on a given FD
     90        filtering for events given filter/op. This will never block and
     91        returns nothing.
     92        """
     93        self._kq.control([kevent(fd, filter, op)], 0, 0)
    11494
     95
     96    def _beforeFork(self):
     97        """
     98        Twisted-internal method called during daemonization (when application
     99        is started via twistd). This is called right before the magic double
     100        forking done for daemonization. We cleanly close the kqueue() and later
     101        recreate it. This is needed since a) kqueue() are not inherited across
     102        forks and b) twistd will create the reactor already before daemonization
     103        (and will also add at least 1 reader to the reactor, an instance of
     104        twisted.internet.posixbase._UnixWaker).
     105
     106        See: twisted.scripts._twistd_unix.daemonize()
     107        """
     108        self._kq.close()
     109        self._kq = None
     110
     111
     112    def _afterFork(self):
     113        """
     114        Twisted-internal method called during daemonization. This is called right
     115        after daemonization and recreates the kqueue() and any readers/writers
     116        that were added before. Note that you MUST NOT call any reactor methods
     117        in between _beforeFork and _afterFork!
     118        """
     119        self._kq = kqueue()
     120        for fd in self._reads:
     121            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     122        for fd in self._writes:
     123            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     124
     125
    115126    def addReader(self, reader):
    116         """Add a FileDescriptor for notification of data available to read.
    117127        """
     128        Implement L{IReactorFDSet.addReader}.
     129        """
    118130        fd = reader.fileno()
    119131        if fd not in self._reads:
    120             self._selectables[fd] = reader
    121             self._reads[fd] = 1
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     132            try:
     133                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     134            except OSError:
     135                pass
     136            finally:
     137                self._selectables[fd] = reader
     138                self._reads[fd] = 1
    123139
     140
    124141    def addWriter(self, writer):
    125         """Add a FileDescriptor for notification of data available to write.
    126142        """
     143        Implement L{IReactorFDSet.addWriter}.
     144        """
    127145        fd = writer.fileno()
    128146        if fd not in self._writes:
    129             self._selectables[fd] = writer
    130             self._writes[fd] = 1
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     147            try:
     148                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     149            except OSError:
     150                pass
     151            finally:
     152                self._selectables[fd] = writer
     153                self._writes[fd] = 1
    132154
     155
    133156    def removeReader(self, reader):
    134         """Remove a Selectable for notification of data available to read.
    135157        """
    136         fd = reader.fileno()
     158        Implement L{IReactorFDSet.removeReader}.
     159        """
     160        wasLost = False
     161        try:
     162            fd = reader.fileno()
     163        except:
     164            fd = -1
     165        if fd == -1:
     166            for fd, fdes in self._selectables.items():
     167                if reader is fdes:
     168                    wasLost = True
     169                    break
     170            else:
     171                return
    137172        if fd in self._reads:
    138173            del self._reads[fd]
    139174            if fd not in self._writes:
    140175                del self._selectables[fd]
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     176            if not wasLost:
     177                try:
     178                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
     179                except OSError:
     180                    pass
    142181
     182
    143183    def removeWriter(self, writer):
    144         """Remove a Selectable for notification of data available to write.
    145184        """
    146         fd = writer.fileno()
     185        Implement L{IReactorFDSet.removeWriter}.
     186        """
     187        wasLost = False
     188        try:
     189            fd = writer.fileno()
     190        except:
     191            fd = -1
     192        if fd == -1:
     193            for fd, fdes in self._selectables.items():
     194                if writer is fdes:
     195                    wasLost = True
     196                    break
     197            else:
     198                return
    147199        if fd in self._writes:
    148200            del self._writes[fd]
    149201            if fd not in self._reads:
    150202                del self._selectables[fd]
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     203            if not wasLost:
     204                try:
     205                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
     206                except OSError:
     207                    pass
    152208
     209
    153210    def removeAll(self):
    154211        """
    155         Remove all selectables, and return a list of them.
     212        Implement L{IReactorFDSet.removeAll}.
    156213        """
    157214        return self._removeAll(
    158215            [self._selectables[fd] for fd in self._reads],
     
    160217
    161218
    162219    def getReaders(self):
     220        """
     221        Implement L{IReactorFDSet.getReaders}.
     222        """
    163223        return [self._selectables[fd] for fd in self._reads]
    164224
    165225
    166226    def getWriters(self):
     227        """
     228        Implement L{IReactorFDSet.getWriters}.
     229        """
    167230        return [self._selectables[fd] for fd in self._writes]
    168231
    169232
    170233    def doKEvent(self, timeout):
    171         """Poll the kqueue for new events."""
     234        """
     235        Poll the kqueue for new events.
     236        """
    172237        if timeout is None:
    173             timeout = 1000
    174         else:
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds
     238            timeout = 1
    176239
    177240        try:
    178             l = self._kq.kevent([], len(self._selectables), timeout)
     241            l = self._kq.control([], len(self._selectables), timeout)
    179242        except OSError, e:
    180243            if e[0] == errno.EINTR:
    181244                return
    182245            else:
    183246                raise
     247
    184248        _drdw = self._doWriteOrRead
    185249        for event in l:
    186             why = None
    187             fd, filter = event.ident, event.filter
     250            fd = event.ident
    188251            try:
    189252                selectable = self._selectables[fd]
    190253            except KeyError:
    191254                # Handles the infrequent case where one selectable's
    192255                # handler disconnects another.
    193256                continue
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     257            else:
     258                log.callWithLogger(selectable, _drdw, selectable, fd, event)
    195259
    196     def _doWriteOrRead(self, selectable, fd, filter):
    197         try:
    198             if filter == EVFILT_READ:
    199                 why = selectable.doRead()
    200             if filter == EVFILT_WRITE:
    201                 why = selectable.doWrite()
    202             if not selectable.fileno() == fd:
    203                 why = main.CONNECTION_LOST
    204         except:
    205             why = sys.exc_info()[1]
    206             log.deferr()
    207260
     261    def _doWriteOrRead(self, selectable, fd, event):
     262        """
     263        Private method called when a FD is ready for reading, writing or was
     264        lost. Do the work and raise errors where necessary.
     265        """
     266        why = None
     267        inRead = False
     268        (filter, flags, data, fflags) = (
     269            event.filter, event.flags, event.data, event.fflags)
     270
     271        if flags & KQ_EV_EOF and data and fflags:
     272            why = main.CONNECTION_LOST
     273        else:
     274            try:
     275                if selectable.fileno() == -1:
     276                    inRead = False
     277                    why = posixbase._NO_FILEDESC
     278                else:
     279                   if filter == KQ_FILTER_READ:
     280                       inRead = True
     281                       why = selectable.doRead()
     282                   if filter == KQ_FILTER_WRITE:
     283                       inRead = False
     284                       why = selectable.doWrite()
     285            except:
     286                # Any exception from application code gets logged and will
     287                # cause us to disconnect the selectable.
     288                why = failure.Failure()
     289                log.err(why)
     290
    208291        if why:
    209             self.removeReader(selectable)
    210             self.removeWriter(selectable)
    211             selectable.connectionLost(failure.Failure(why))
     292            self._disconnectSelectable(selectable, why, inRead)
    212293
    213294    doIteration = doKEvent
    214295
    215296
    216297def install():
    217     k = KQueueReactor()
    218     main.installReactor(k)
     298    """
     299    Install the kqueue() reactor.
     300    """
     301    p = KQueueReactor()
     302    from twisted.internet.main import installReactor
     303    installReactor(p)
    219304
    220305
    221306__all__ = ["KQueueReactor", "install"]
  • 1918.feature

     
     1The kqueue reactor has been revived. (#1918)