Ticket #1918: kqueue.6.patch

File kqueue.6.patch, 20.9 KB (added by oberstet, 5 years ago)

revised for review comments of exarkun

  • twisted/test/test_twistd.py

     
    2323from zope.interface.verify import verifyObject
    2424
    2525from twisted.trial import unittest
     26from twisted.test.test_process import MockOS
    2627
    2728from twisted import plugin
    2829from twisted.application.service import IServiceMaker
     
    3435from twisted.python.versions import Version
    3536from twisted.python.components import Componentized
    3637from twisted.internet.defer import Deferred
     38from twisted.internet.interfaces import IReactorDaemonize
    3739from twisted.python.fakepwd import UserDatabase
    3840
    3941try:
     
    616618        self.runner = UnixApplicationRunner({})
    617619
    618620
    619     def daemonize(self):
     621    def daemonize(self, daemonize_reactor, daemonize_os):
    620622        """
    621623        Indicate that daemonization has happened and change the PID so that the
    622624        value written to the pidfile can be tested in the daemonization case.
     
    806808
    807809
    808810
     811class FakeNonDaemonizingReactor:
     812    """
     813    A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods,
     814    but not announcing this, and logging whether the methods have been called.
     815
     816    @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not.
     817    @type _beforeDaemonizeCalled: C{bool}
     818    @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not.
     819    @type _afterDaemonizeCalled: C{bool}
     820    """
     821
     822    def __init__(self):
     823        self._beforeDaemonizeCalled = False
     824        self._afterDaemonizeCalled = False
     825
     826    def beforeDaemonize(self):
     827        self._beforeDaemonizeCalled = True
     828
     829    def afterDaemonize(self):
     830        self._afterDaemonizeCalled = True
     831
     832
     833
     834class FakeDaemonizingReactor(FakeNonDaemonizingReactor):
     835    """
     836    A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods,
     837    announcing this, and logging whether the methods have been called.
     838    """
     839
     840    implements(IReactorDaemonize)
     841
     842
     843
     844class ReactorDaemonizationTests(unittest.TestCase):
     845    """
     846    Tests for L{_twistd_unix.daemonize} and L{IReactorDaemonize}.
     847    """
     848    if _twistd_unix is None:
     849        skip = "twistd unix not available"
     850
     851
     852    def test_daemonizationHooksCalled(self):
     853        """
     854        L{_twistd_unix.daemonize} indeed calls
     855        L{IReactorDaemonize.beforeDaemonize} and
     856        L{IReactorDaemonize.afterDaemonize} if the reactor implements
     857        L{IReactorDaemonize}.
     858        """
     859        reactor = FakeDaemonizingReactor()
     860        os = MockOS()
     861        _twistd_unix.daemonize(reactor, os)
     862        msg = "At least one reactor daemonization hook WAS NOT called"
     863        self.assertTrue(reactor._beforeDaemonizeCalled and \
     864                        reactor._afterDaemonizeCalled, msg = msg)
     865
     866
     867    def test_daemonizationHooksNotCalled(self):
     868        """
     869        L{_twistd_unix.daemonize} does NOT call
     870        L{IReactorDaemonize.beforeDaemonize} or
     871        L{IReactorDaemonize.afterDaemonize} if the reactor does NOT
     872        implement L{IReactorDaemonize}.
     873        """
     874        reactor = FakeNonDaemonizingReactor()
     875        os = MockOS()
     876        _twistd_unix.daemonize(reactor, os)
     877        msg = "At least one reactor daemonization hook WAS called"
     878        self.assertTrue(not reactor._beforeDaemonizeCalled and \
     879                        not reactor._afterDaemonizeCalled, msg = msg)
     880
     881
     882
    809883class DummyReactor(object):
    810884    """
    811885    A dummy reactor, only providing a C{run} method and checking that it
  • twisted/topfiles/1918.feature

     
     1The kqueue reactor has been revived.
  • twisted/scripts/_twistd_unix.py

     
    77from twisted.python import log, syslog, logfile, usage
    88from twisted.python.util import switchUID, uidFromString, gidFromString
    99from twisted.application import app, service
     10from twisted.internet.interfaces import IReactorDaemonize
    1011from twisted import copyright
    1112
    1213
     
    155156
    156157
    157158
    158 def daemonize():
    159     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
     159def daemonize(reactor, os):
     160    """
     161    Daemonizes the application on Unix. This is done by the usual double
     162    forking approach.
     163
     164    See:
     165      [1] http://code.activestate.com/recipes/278731/
     166      [2] W. Richard Stevens, "Advanced Programming in the Unix Environment",
     167          1992, Addison-Wesley, ISBN 0-201-56317-7
     168
     169    @type reactor: C{reactor}
     170    @param reactor: The reactor in use.
     171    @type os: C{os}
     172    @param os: The 'os' module to use for daemonization.
     173    """
     174
     175    ## If the reactor requires hooks to be called for daemonization, call them.
     176    ## Currently the only reactor which provides/needs that is KQueueReactor.
     177    if IReactorDaemonize.providedBy(reactor):
     178        reactor.beforeDaemonize()
     179
    160180    if os.fork():   # launch child and...
    161181        os._exit(0) # kill off parent
    162182    os.setsid()
     
    171191                raise
    172192    os.close(null)
    173193
     194    if IReactorDaemonize.providedBy(reactor):
     195        reactor.afterDaemonize()
    174196
    175197
     198
    176199def launchWithName(name):
    177200    if name and name != sys.argv[0]:
    178201        exe = os.path.realpath(sys.executable)
     
    265288        if umask is not None:
    266289            os.umask(umask)
    267290        if daemon:
    268             daemonize()
     291            from twisted.internet import reactor
     292            daemonize(reactor, os)
    269293        if pidfile:
    270294            f = open(pidfile,'wb')
    271295            f.write(str(os.getpid()))
  • twisted/internet/interfaces.py

     
    872872        """
    873873
    874874
     875class IReactorDaemonize(Interface):
     876    """
     877    A reactor which provides hooks that need to be called before and after
     878    daemonization.
     879
     880    Notes:
     881       - This interface SHOULD NOT be called by applications.
     882       - This interface should only be implemented by reactors as a workaround
     883         (in particular, it's implemented currently only by kqueue()).
     884         For details please see the comments on ticket #1918.
     885    """
     886
     887    def beforeDaemonize():
     888        """
     889        Hook to be called immediately before daemonization. No reactor methods
     890        must be called until L{afterDaemonize} is called.
     891
     892        @return: C{None}.
     893        """
     894
     895    def afterDaemonize():
     896        """
     897        Hook to be called immediately after daemonization. This must only be
     898        called after L{beforeDaemonize} had been called previously.
     899
     900        @return: C{None}.
     901        """
     902
     903
    875904class IReactorFDSet(Interface):
    876905    """
    877906    Implement me to be able to use L{IFileDescriptor} type resources.
     
    19011930        @return: a client endpoint
    19021931        @rtype: L{IStreamClientEndpoint}
    19031932        """
    1904 
  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor()
    287287
    288288        name = reactor.__class__.__name__
    289         if name in ('EPollReactor', 'CFReactor'):
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'):
    290290            # Closing a file descriptor immediately removes it from the epoll
    291291            # set without generating a notification.  That means epollreactor
    292292            # will not call any methods on Victim after the close, so there's
  • twisted/internet/kqreactor.py

     
    44"""
    55A kqueue()/kevent() based implementation of the Twisted main loop.
    66
    7 To install the event loop (and you should do this before any connections,
    8 listeners or connectors are added)::
     7To use this reactor, start your application specifying the kqueue reactor:
    98
    10     | from twisted.internet import kqreactor
    11     | kqreactor.install()
     9   twistd ... --reactor kqueue
    1210
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
     11To install the event loop from code (and you should do this before any
     12connections, listeners or connectors are added)::
    1513
     14   from twisted.internet import kqreactor
     15   kqreactor.install()
    1616
     17This implementation depends on Python 2.6 or higher which has kqueue support
     18built in the select module.
    1719
    18 You're going to need to patch PyKqueue::
     20Note, that you should use Python 2.6.5 or higher, since previous implementations
     21of select.kqueue had
    1922
    20     =====================================================
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    23     @@ -137,7 +137,7 @@
    24      }
    25      
    26      statichere PyTypeObject KQEvent_Type = {
    27     -  PyObject_HEAD_INIT(NULL)
    28     +  PyObject_HEAD_INIT(&PyType_Type)
    29        0,                             // ob_size
    30        "KQEvent",                     // tp_name
    31        sizeof(KQEventObject),         // tp_basicsize
    32     @@ -291,13 +291,14 @@
    33      
    34        /* Build timespec for timeout */
    35        totimespec.tv_sec = timeout / 1000;
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    38      
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    40      
    41        /* Make the call */
    42     -
    43     +  Py_BEGIN_ALLOW_THREADS
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    45     +  Py_END_ALLOW_THREADS
    46      
    47        /* Don't need the input event list anymore, so get rid of it */
    48        free (changelist);
    49     @@ -361,7 +362,7 @@
    50      statichere PyTypeObject KQueue_Type = {
    51             /* The ob_type field must be initialized in the module init function
    52              * to be portable to Windows without using C++. */
    53     -   PyObject_HEAD_INIT(NULL)
    54     +   PyObject_HEAD_INIT(&PyType_Type)
    55             0,                  /*ob_size*/
    56             "KQueue",                   /*tp_name*/
    57             sizeof(KQueueObject),       /*tp_basicsize*/
     23   http://bugs.python.org/issue5910
    5824
     25not yet fixed.
    5926"""
    6027
    61 import errno, sys
     28import errno
    6229
    6330from zope.interface import implements
    6431
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD
    66 from kqsyscall import kqueue, kevent
     32from select import kqueue, kevent
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE
     34from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
    6735
    68 from twisted.internet.interfaces import IReactorFDSet
     36from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize
    6937
    7038from twisted.python import log, failure
    7139from twisted.internet import main, posixbase
     
    7341
    7442class KQueueReactor(posixbase.PosixReactorBase):
    7543    """
    76     A reactor that uses kqueue(2)/kevent(2).
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
     45    which has built in support for kqueue in the select module.
    7746
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
    7948
     
    9564        dispatched to the corresponding L{FileDescriptor} instances in
    9665        C{_selectables}.
    9766    """
    98     implements(IReactorFDSet)
     67    implements(IReactorFDSet, IReactorDaemonize)
    9968
     69
    10070    def __init__(self):
    10171        """
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the
    10373        base class.
     74
     75        See:
     76            - http://docs.python.org/library/select.html
     77            - www.freebsd.org/cgi/man.cgi?query=kqueue
     78            - people.freebsd.org/~jlemon/papers/kqueue.pdf
    10479        """
    10580        self._kq = kqueue()
    10681        self._reads = {}
     
    10984        posixbase.PosixReactorBase.__init__(self)
    11085
    11186
    112     def _updateRegistration(self, *args):
    113         self._kq.kevent([kevent(*args)], 0, 0)
     87    def _updateRegistration(self, fd, filter, op):
     88        """
     89        Private method for changing kqueue registration on a given FD
     90        filtering for events given filter/op. This will never block and
     91        returns nothing.
     92        """
     93        self._kq.control([kevent(fd, filter, op)], 0, 0)
    11494
     95
     96    def beforeDaemonize(self):
     97        """
     98        Implement L{IReactorDaemonize.beforeDaemonize}.
     99        """
     100        # Twisted-internal method called during daemonization (when application
     101        # is started via twistd). This is called right before the magic double
     102        # forking done for daemonization. We cleanly close the kqueue() and later
     103        # recreate it. This is needed since a) kqueue() are not inherited across
     104        # forks and b) twistd will create the reactor already before daemonization
     105        # (and will also add at least 1 reader to the reactor, an instance of
     106        # twisted.internet.posixbase._UnixWaker).
     107        #
     108        # See: twisted.scripts._twistd_unix.daemonize()
     109        self._kq.close()
     110        self._kq = None
     111
     112
     113    def afterDaemonize(self):
     114        """
     115        Implement L{IReactorDaemonize.afterDaemonize}.
     116        """
     117        # Twisted-internal method called during daemonization. This is called right
     118        # after daemonization and recreates the kqueue() and any readers/writers
     119        # that were added before. Note that you MUST NOT call any reactor methods
     120        # in between beforeDaemonize() and afterDaemonize()!
     121        self._kq = kqueue()
     122        for fd in self._reads:
     123            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     124        for fd in self._writes:
     125            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     126
     127
    115128    def addReader(self, reader):
    116         """Add a FileDescriptor for notification of data available to read.
    117129        """
     130        Implement L{IReactorFDSet.addReader}.
     131        """
    118132        fd = reader.fileno()
    119133        if fd not in self._reads:
    120             self._selectables[fd] = reader
    121             self._reads[fd] = 1
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     134            try:
     135                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
     136            except OSError:
     137                pass
     138            finally:
     139                self._selectables[fd] = reader
     140                self._reads[fd] = 1
    123141
     142
    124143    def addWriter(self, writer):
    125         """Add a FileDescriptor for notification of data available to write.
    126144        """
     145        Implement L{IReactorFDSet.addWriter}.
     146        """
    127147        fd = writer.fileno()
    128148        if fd not in self._writes:
    129             self._selectables[fd] = writer
    130             self._writes[fd] = 1
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     149            try:
     150                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
     151            except OSError:
     152                pass
     153            finally:
     154                self._selectables[fd] = writer
     155                self._writes[fd] = 1
    132156
     157
    133158    def removeReader(self, reader):
    134         """Remove a Selectable for notification of data available to read.
    135159        """
    136         fd = reader.fileno()
     160        Implement L{IReactorFDSet.removeReader}.
     161        """
     162        wasLost = False
     163        try:
     164            fd = reader.fileno()
     165        except:
     166            fd = -1
     167        if fd == -1:
     168            for fd, fdes in self._selectables.items():
     169                if reader is fdes:
     170                    wasLost = True
     171                    break
     172            else:
     173                return
    137174        if fd in self._reads:
    138175            del self._reads[fd]
    139176            if fd not in self._writes:
    140177                del self._selectables[fd]
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     178            if not wasLost:
     179                try:
     180                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
     181                except OSError:
     182                    pass
    142183
     184
    143185    def removeWriter(self, writer):
    144         """Remove a Selectable for notification of data available to write.
    145186        """
    146         fd = writer.fileno()
     187        Implement L{IReactorFDSet.removeWriter}.
     188        """
     189        wasLost = False
     190        try:
     191            fd = writer.fileno()
     192        except:
     193            fd = -1
     194        if fd == -1:
     195            for fd, fdes in self._selectables.items():
     196                if writer is fdes:
     197                    wasLost = True
     198                    break
     199            else:
     200                return
    147201        if fd in self._writes:
    148202            del self._writes[fd]
    149203            if fd not in self._reads:
    150204                del self._selectables[fd]
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     205            if not wasLost:
     206                try:
     207                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
     208                except OSError:
     209                    pass
    152210
     211
    153212    def removeAll(self):
    154213        """
    155         Remove all selectables, and return a list of them.
     214        Implement L{IReactorFDSet.removeAll}.
    156215        """
    157216        return self._removeAll(
    158217            [self._selectables[fd] for fd in self._reads],
     
    160219
    161220
    162221    def getReaders(self):
     222        """
     223        Implement L{IReactorFDSet.getReaders}.
     224        """
    163225        return [self._selectables[fd] for fd in self._reads]
    164226
    165227
    166228    def getWriters(self):
     229        """
     230        Implement L{IReactorFDSet.getWriters}.
     231        """
    167232        return [self._selectables[fd] for fd in self._writes]
    168233
    169234
    170235    def doKEvent(self, timeout):
    171         """Poll the kqueue for new events."""
     236        """
     237        Poll the kqueue for new events.
     238        """
    172239        if timeout is None:
    173             timeout = 1000
    174         else:
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds
     240            timeout = 1
    176241
    177242        try:
    178             l = self._kq.kevent([], len(self._selectables), timeout)
     243            l = self._kq.control([], len(self._selectables), timeout)
    179244        except OSError, e:
    180245            if e[0] == errno.EINTR:
    181246                return
    182247            else:
    183248                raise
     249
    184250        _drdw = self._doWriteOrRead
    185251        for event in l:
    186             why = None
    187             fd, filter = event.ident, event.filter
     252            fd = event.ident
    188253            try:
    189254                selectable = self._selectables[fd]
    190255            except KeyError:
    191256                # Handles the infrequent case where one selectable's
    192257                # handler disconnects another.
    193258                continue
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     259            else:
     260                log.callWithLogger(selectable, _drdw, selectable, fd, event)
    195261
    196     def _doWriteOrRead(self, selectable, fd, filter):
    197         try:
    198             if filter == EVFILT_READ:
    199                 why = selectable.doRead()
    200             if filter == EVFILT_WRITE:
    201                 why = selectable.doWrite()
    202             if not selectable.fileno() == fd:
    203                 why = main.CONNECTION_LOST
    204         except:
    205             why = sys.exc_info()[1]
    206             log.deferr()
    207262
     263    def _doWriteOrRead(self, selectable, fd, event):
     264        """
     265        Private method called when a FD is ready for reading, writing or was
     266        lost. Do the work and raise errors where necessary.
     267        """
     268        why = None
     269        inRead = False
     270        (filter, flags, data, fflags) = (
     271            event.filter, event.flags, event.data, event.fflags)
     272
     273        if flags & KQ_EV_EOF and data and fflags:
     274            why = main.CONNECTION_LOST
     275        else:
     276            try:
     277                if selectable.fileno() == -1:
     278                    inRead = False
     279                    why = posixbase._NO_FILEDESC
     280                else:
     281                   if filter == KQ_FILTER_READ:
     282                       inRead = True
     283                       why = selectable.doRead()
     284                   if filter == KQ_FILTER_WRITE:
     285                       inRead = False
     286                       why = selectable.doWrite()
     287            except:
     288                # Any exception from application code gets logged and will
     289                # cause us to disconnect the selectable.
     290                why = failure.Failure()
     291                log.err(why, "An exception was raised from application code" \
     292                             " while processing a reactor selectable")
     293
    208294        if why:
    209             self.removeReader(selectable)
    210             self.removeWriter(selectable)
    211             selectable.connectionLost(failure.Failure(why))
     295            self._disconnectSelectable(selectable, why, inRead)
    212296
    213297    doIteration = doKEvent
    214298
    215299
    216300def install():
    217     k = KQueueReactor()
    218     main.installReactor(k)
     301    """
     302    Install the kqueue() reactor.
     303    """
     304    p = KQueueReactor()
     305    from twisted.internet.main import installReactor
     306    installReactor(p)
    219307
    220308
    221309__all__ = ["KQueueReactor", "install"]