Ticket #1918: kqueue.5.patch

File kqueue.5.patch, 21.2 KB (added by oberstet, 3 years ago)

Revised patch: unit tests

  • twisted/test/test_twistd.py

     
    2323from zope.interface.verify import verifyObject 
    2424 
    2525from twisted.trial import unittest 
     26from twisted.test.test_process import MockOS 
    2627 
    2728from twisted import plugin 
    2829from twisted.application.service import IServiceMaker 
     
    3435from twisted.python.versions import Version 
    3536from twisted.python.components import Componentized 
    3637from twisted.internet.defer import Deferred 
     38from twisted.internet.interfaces import IReactorDaemonize 
    3739from twisted.python.fakepwd import UserDatabase 
    3840 
    3941try: 
     
    616618        self.runner = UnixApplicationRunner({}) 
    617619 
    618620 
    619     def daemonize(self): 
     621    def daemonize(self, daemonize_reactor, daemonize_os): 
    620622        """ 
    621623        Indicate that daemonization has happened and change the PID so that the 
    622624        value written to the pidfile can be tested in the daemonization case. 
     
    806808 
    807809 
    808810 
     811class FakeNonDaemonizingReactor: 
     812    """ 
     813    A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 
     814    but not announcing this, and logging whether the methods have been called. 
     815 
     816    @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. 
     817    @type _beforeDaemonizeCalled: C{bool} 
     818    @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. 
     819    @type _afterDaemonizeCalled: C{bool} 
     820    """ 
     821 
     822    def __init__(self): 
     823        self._beforeDaemonizeCalled = False 
     824        self._afterDaemonizeCalled = False 
     825 
     826    def beforeDaemonize(self): 
     827        self._beforeDaemonizeCalled = True 
     828 
     829    def afterDaemonize(self): 
     830        self._afterDaemonizeCalled = True 
     831 
     832 
     833 
     834class FakeDaemonizingReactor(FakeNonDaemonizingReactor): 
     835    """ 
     836    A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 
     837    announcing this, and logging whether the methods have been called. 
     838 
     839    @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. 
     840    @type _beforeDaemonizeCalled: C{bool} 
     841    @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. 
     842    @type _afterDaemonizeCalled: C{bool} 
     843    """ 
     844 
     845    implements(IReactorDaemonize) 
     846 
     847 
     848 
     849class ReactorDaemonizationTests(unittest.TestCase): 
     850    """ 
     851    Tests for L{_twistd_unix.daemonize} and L{IReactorDaemonize}. 
     852    """ 
     853    if _twistd_unix is None: 
     854        skip = "twistd unix not available" 
     855 
     856 
     857    def test_DaemonizationHooksCalled(self): 
     858        """ 
     859        Check that L{_twistd_unix.daemonize} indeed calls 
     860        L{IReactorDaemonize.beforeDaemonize} and 
     861        L{IReactorDaemonize.afterDaemonize} if the reactor implements 
     862        L{IReactorDaemonize}. 
     863        """ 
     864        test_reactor = FakeDaemonizingReactor() 
     865        test_os = MockOS() 
     866        _twistd_unix.daemonize(test_reactor, test_os) 
     867        self.assertTrue(test_reactor._beforeDaemonizeCalled and \ 
     868                        test_reactor._afterDaemonizeCalled) 
     869 
     870 
     871    def test_DaemonizationHooksNotCalled(self): 
     872        """ 
     873        Check that L{_twistd_unix.daemonize} does NOT call 
     874        L{IReactorDaemonize.beforeDaemonize} or 
     875        L{IReactorDaemonize.afterDaemonize} if the reactor does NOT 
     876        implement L{IReactorDaemonize}. 
     877        """ 
     878        test_reactor = FakeNonDaemonizingReactor() 
     879        test_os = MockOS() 
     880        _twistd_unix.daemonize(test_reactor, test_os) 
     881        self.assertTrue(not test_reactor._beforeDaemonizeCalled and \ 
     882                        not test_reactor._afterDaemonizeCalled) 
     883 
     884 
     885 
    809886class DummyReactor(object): 
    810887    """ 
    811888    A dummy reactor, only providing a C{run} method and checking that it 
  • twisted/topfiles/1918.feature

     
     1The kqueue reactor has been revived. 
  • twisted/scripts/_twistd_unix.py

     
    77from twisted.python import log, syslog, logfile, usage 
    88from twisted.python.util import switchUID, uidFromString, gidFromString 
    99from twisted.application import app, service 
     10from twisted.internet.interfaces import IReactorDaemonize 
    1011from twisted import copyright 
    1112 
    1213 
     
    155156 
    156157 
    157158 
    158 def daemonize(): 
    159     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 
    160     if os.fork():   # launch child and... 
    161         os._exit(0) # kill off parent 
    162     os.setsid() 
    163     if os.fork():   # launch child and... 
    164         os._exit(0) # kill off parent again. 
    165     null = os.open('/dev/null', os.O_RDWR) 
     159def daemonize(daemonize_reactor, daemonize_os): 
     160    """ 
     161    Daemonizes the application on Unix. This is done by the usual double 
     162    forking approach. 
     163 
     164    See: 
     165      [1] http://code.activestate.com/recipes/278731/ 
     166      [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", 
     167          1992, Addison-Wesley, ISBN 0-201-56317-7 
     168    """ 
     169 
     170    ## If the reactor requires hooks to be called for daemonization, call them. 
     171    ## Currently the only reactor which provides/needs that is KQueueReactor. 
     172    if IReactorDaemonize.providedBy(daemonize_reactor): 
     173        daemonize_reactor.beforeDaemonize() 
     174 
     175    if daemonize_os.fork():   # launch child and... 
     176        daemonize_os._exit(0) # kill off parent 
     177    daemonize_os.setsid() 
     178    if daemonize_os.fork():   # launch child and... 
     179        daemonize_os._exit(0) # kill off parent again. 
     180    null = daemonize_os.open('/dev/null', daemonize_os.O_RDWR) 
    166181    for i in range(3): 
    167182        try: 
    168             os.dup2(null, i) 
     183            daemonize_os.dup2(null, i) 
    169184        except OSError, e: 
    170185            if e.errno != errno.EBADF: 
    171186                raise 
    172     os.close(null) 
     187    daemonize_os.close(null) 
    173188 
     189    if IReactorDaemonize.providedBy(daemonize_reactor): 
     190        daemonize_reactor.afterDaemonize() 
    174191 
    175192 
     193 
    176194def launchWithName(name): 
    177195    if name and name != sys.argv[0]: 
    178196        exe = os.path.realpath(sys.executable) 
     
    265283        if umask is not None: 
    266284            os.umask(umask) 
    267285        if daemon: 
    268             daemonize() 
     286            from twisted.internet import reactor 
     287            daemonize(reactor, os) 
    269288        if pidfile: 
    270289            f = open(pidfile,'wb') 
    271290            f.write(str(os.getpid())) 
  • twisted/internet/interfaces.py

     
    870870        """ 
    871871 
    872872 
     873class IReactorDaemonize(Interface): 
     874    """ 
     875    A reactor which provides hooks that need to be called before and after 
     876    daemonization. 
     877    """ 
     878 
     879    def beforeDaemonize(): 
     880        """ 
     881        Hook to be called immediately before daemonization. No reactor methods 
     882        must be called until L{afterDaemonize} is called. 
     883 
     884        @return: C{None}. 
     885        """ 
     886 
     887    def afterDaemonize(): 
     888        """ 
     889        Hook to be called immediately after daemonization. This must only be 
     890        called after L{beforeDaemonize} had been called previously. 
     891 
     892        @return: C{None}. 
     893        """ 
     894 
     895 
    873896class IReactorFDSet(Interface): 
    874897    """ 
    875898    Implement me to be able to use L{IFileDescriptor} type resources. 
     
    18991922        @return: a client endpoint 
    19001923        @rtype: L{IStreamClientEndpoint} 
    19011924        """ 
    1902  
  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor() 
    287287 
    288288        name = reactor.__class__.__name__ 
    289         if name in ('EPollReactor', 'CFReactor'): 
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 
    290290            # Closing a file descriptor immediately removes it from the epoll 
    291291            # set without generating a notification.  That means epollreactor 
    292292            # will not call any methods on Victim after the close, so there's 
  • twisted/internet/kqreactor.py

     
    44""" 
    55A kqueue()/kevent() based implementation of the Twisted main loop. 
    66 
    7 To install the event loop (and you should do this before any connections, 
    8 listeners or connectors are added):: 
     7To use this reactor, start your application specifying the kqueue reactor: 
    98 
    10     | from twisted.internet import kqreactor 
    11     | kqreactor.install() 
     9   twistd ... --reactor kqueue 
    1210 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     11To install the event loop from code (and you should do this before any 
     12connections, listeners or connectors are added):: 
    1513 
     14   from twisted.internet import kqreactor 
     15   kqreactor.install() 
    1616 
     17This implementation depends on Python 2.6 or higher which has kqueue support 
     18built in the select module. 
    1719 
    18 You're going to need to patch PyKqueue:: 
     20Note, that you should use Python 2.6.5 or higher, since previous implementations 
     21of select.kqueue had 
    1922 
    20     ===================================================== 
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    23     @@ -137,7 +137,7 @@ 
    24      } 
    25       
    26      statichere PyTypeObject KQEvent_Type = { 
    27     -  PyObject_HEAD_INIT(NULL) 
    28     +  PyObject_HEAD_INIT(&PyType_Type) 
    29        0,                             // ob_size 
    30        "KQEvent",                     // tp_name 
    31        sizeof(KQEventObject),         // tp_basicsize 
    32     @@ -291,13 +291,14 @@ 
    33       
    34        /* Build timespec for timeout */ 
    35        totimespec.tv_sec = timeout / 1000; 
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    38       
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    40       
    41        /* Make the call */ 
    42     - 
    43     +  Py_BEGIN_ALLOW_THREADS 
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    45     +  Py_END_ALLOW_THREADS 
    46       
    47        /* Don't need the input event list anymore, so get rid of it */ 
    48        free (changelist); 
    49     @@ -361,7 +362,7 @@ 
    50      statichere PyTypeObject KQueue_Type = { 
    51             /* The ob_type field must be initialized in the module init function 
    52              * to be portable to Windows without using C++. */ 
    53     -   PyObject_HEAD_INIT(NULL) 
    54     +   PyObject_HEAD_INIT(&PyType_Type) 
    55             0,                  /*ob_size*/ 
    56             "KQueue",                   /*tp_name*/ 
    57             sizeof(KQueueObject),       /*tp_basicsize*/ 
     23   http://bugs.python.org/issue5910 
    5824 
     25not yet fixed. 
    5926""" 
    6027 
    61 import errno, sys 
     28import errno 
    6229 
    6330from zope.interface import implements 
    6431 
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 
    66 from kqsyscall import kqueue, kevent 
     32from select import kqueue, kevent 
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 
    6735 
    68 from twisted.internet.interfaces import IReactorFDSet 
     36from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize 
    6937 
    7038from twisted.python import log, failure 
    7139from twisted.internet import main, posixbase 
     
    7341 
    7442class KQueueReactor(posixbase.PosixReactorBase): 
    7543    """ 
    76     A reactor that uses kqueue(2)/kevent(2). 
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 
     45    which has built in support for kqueue in the select module. 
    7746 
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 
    7948 
     
    9564        dispatched to the corresponding L{FileDescriptor} instances in 
    9665        C{_selectables}. 
    9766    """ 
    98     implements(IReactorFDSet) 
     67    implements(IReactorFDSet, IReactorDaemonize) 
    9968 
     69 
    10070    def __init__(self): 
    10171        """ 
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the 
    10373        base class. 
     74 
     75        See: 
     76            - http://docs.python.org/library/select.html 
     77            - www.freebsd.org/cgi/man.cgi?query=kqueue 
     78            - people.freebsd.org/~jlemon/papers/kqueue.pdf 
    10479        """ 
    10580        self._kq = kqueue() 
    10681        self._reads = {} 
     
    10984        posixbase.PosixReactorBase.__init__(self) 
    11085 
    11186 
    112     def _updateRegistration(self, *args): 
    113         self._kq.kevent([kevent(*args)], 0, 0) 
     87    def _updateRegistration(self, fd, filter, op): 
     88        """ 
     89        Private method for changing kqueue registration on a given FD 
     90        filtering for events given filter/op. This will never block and 
     91        returns nothing. 
     92        """ 
     93        self._kq.control([kevent(fd, filter, op)], 0, 0) 
    11494 
     95 
     96    def beforeDaemonize(self): 
     97        """ 
     98        Implement L{IReactorDaemonize.beforeDaemonize}. 
     99        """ 
     100        # Twisted-internal method called during daemonization (when application 
     101        # is started via twistd). This is called right before the magic double 
     102        # forking done for daemonization. We cleanly close the kqueue() and later 
     103        # recreate it. This is needed since a) kqueue() are not inherited across 
     104        # forks and b) twistd will create the reactor already before daemonization 
     105        # (and will also add at least 1 reader to the reactor, an instance of 
     106        # twisted.internet.posixbase._UnixWaker). 
     107        # 
     108        # See: twisted.scripts._twistd_unix.daemonize() 
     109        self._kq.close() 
     110        self._kq = None 
     111 
     112 
     113    def afterDaemonize(self): 
     114        """ 
     115        Implement L{IReactorDaemonize.afterDaemonize}. 
     116        """ 
     117        # Twisted-internal method called during daemonization. This is called right 
     118        # after daemonization and recreates the kqueue() and any readers/writers 
     119        # that were added before. Note that you MUST NOT call any reactor methods 
     120        # in between beforeDaemonize() and afterDaemonize()! 
     121        self._kq = kqueue() 
     122        for fd in self._reads: 
     123            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     124        for fd in self._writes: 
     125            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     126 
     127 
    115128    def addReader(self, reader): 
    116         """Add a FileDescriptor for notification of data available to read. 
    117129        """ 
     130        Implement L{IReactorFDSet.addReader}. 
     131        """ 
    118132        fd = reader.fileno() 
    119133        if fd not in self._reads: 
    120             self._selectables[fd] = reader 
    121             self._reads[fd] = 1 
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     134            try: 
     135                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     136            except OSError: 
     137                pass 
     138            finally: 
     139                self._selectables[fd] = reader 
     140                self._reads[fd] = 1 
    123141 
     142 
    124143    def addWriter(self, writer): 
    125         """Add a FileDescriptor for notification of data available to write. 
    126144        """ 
     145        Implement L{IReactorFDSet.addWriter}. 
     146        """ 
    127147        fd = writer.fileno() 
    128148        if fd not in self._writes: 
    129             self._selectables[fd] = writer 
    130             self._writes[fd] = 1 
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     149            try: 
     150                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     151            except OSError: 
     152                pass 
     153            finally: 
     154                self._selectables[fd] = writer 
     155                self._writes[fd] = 1 
    132156 
     157 
    133158    def removeReader(self, reader): 
    134         """Remove a Selectable for notification of data available to read. 
    135159        """ 
    136         fd = reader.fileno() 
     160        Implement L{IReactorFDSet.removeReader}. 
     161        """ 
     162        wasLost = False 
     163        try: 
     164            fd = reader.fileno() 
     165        except: 
     166            fd = -1 
     167        if fd == -1: 
     168            for fd, fdes in self._selectables.items(): 
     169                if reader is fdes: 
     170                    wasLost = True 
     171                    break 
     172            else: 
     173                return 
    137174        if fd in self._reads: 
    138175            del self._reads[fd] 
    139176            if fd not in self._writes: 
    140177                del self._selectables[fd] 
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     178            if not wasLost: 
     179                try: 
     180                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 
     181                except OSError: 
     182                    pass 
    142183 
     184 
    143185    def removeWriter(self, writer): 
    144         """Remove a Selectable for notification of data available to write. 
    145186        """ 
    146         fd = writer.fileno() 
     187        Implement L{IReactorFDSet.removeWriter}. 
     188        """ 
     189        wasLost = False 
     190        try: 
     191            fd = writer.fileno() 
     192        except: 
     193            fd = -1 
     194        if fd == -1: 
     195            for fd, fdes in self._selectables.items(): 
     196                if writer is fdes: 
     197                    wasLost = True 
     198                    break 
     199            else: 
     200                return 
    147201        if fd in self._writes: 
    148202            del self._writes[fd] 
    149203            if fd not in self._reads: 
    150204                del self._selectables[fd] 
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     205            if not wasLost: 
     206                try: 
     207                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 
     208                except OSError: 
     209                    pass 
    152210 
     211 
    153212    def removeAll(self): 
    154213        """ 
    155         Remove all selectables, and return a list of them. 
     214        Implement L{IReactorFDSet.removeAll}. 
    156215        """ 
    157216        return self._removeAll( 
    158217            [self._selectables[fd] for fd in self._reads], 
     
    160219 
    161220 
    162221    def getReaders(self): 
     222        """ 
     223        Implement L{IReactorFDSet.getReaders}. 
     224        """ 
    163225        return [self._selectables[fd] for fd in self._reads] 
    164226 
    165227 
    166228    def getWriters(self): 
     229        """ 
     230        Implement L{IReactorFDSet.getWriters}. 
     231        """ 
    167232        return [self._selectables[fd] for fd in self._writes] 
    168233 
    169234 
    170235    def doKEvent(self, timeout): 
    171         """Poll the kqueue for new events.""" 
     236        """ 
     237        Poll the kqueue for new events. 
     238        """ 
    172239        if timeout is None: 
    173             timeout = 1000 
    174         else: 
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     240            timeout = 1 
    176241 
    177242        try: 
    178             l = self._kq.kevent([], len(self._selectables), timeout) 
     243            l = self._kq.control([], len(self._selectables), timeout) 
    179244        except OSError, e: 
    180245            if e[0] == errno.EINTR: 
    181246                return 
    182247            else: 
    183248                raise 
     249 
    184250        _drdw = self._doWriteOrRead 
    185251        for event in l: 
    186             why = None 
    187             fd, filter = event.ident, event.filter 
     252            fd = event.ident 
    188253            try: 
    189254                selectable = self._selectables[fd] 
    190255            except KeyError: 
    191256                # Handles the infrequent case where one selectable's 
    192257                # handler disconnects another. 
    193258                continue 
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     259            else: 
     260                log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    195261 
    196     def _doWriteOrRead(self, selectable, fd, filter): 
    197         try: 
    198             if filter == EVFILT_READ: 
    199                 why = selectable.doRead() 
    200             if filter == EVFILT_WRITE: 
    201                 why = selectable.doWrite() 
    202             if not selectable.fileno() == fd: 
    203                 why = main.CONNECTION_LOST 
    204         except: 
    205             why = sys.exc_info()[1] 
    206             log.deferr() 
    207262 
     263    def _doWriteOrRead(self, selectable, fd, event): 
     264        """ 
     265        Private method called when a FD is ready for reading, writing or was 
     266        lost. Do the work and raise errors where necessary. 
     267        """ 
     268        why = None 
     269        inRead = False 
     270        (filter, flags, data, fflags) = ( 
     271            event.filter, event.flags, event.data, event.fflags) 
     272 
     273        if flags & KQ_EV_EOF and data and fflags: 
     274            why = main.CONNECTION_LOST 
     275        else: 
     276            try: 
     277                if selectable.fileno() == -1: 
     278                    inRead = False 
     279                    why = posixbase._NO_FILEDESC 
     280                else: 
     281                   if filter == KQ_FILTER_READ: 
     282                       inRead = True 
     283                       why = selectable.doRead() 
     284                   if filter == KQ_FILTER_WRITE: 
     285                       inRead = False 
     286                       why = selectable.doWrite() 
     287            except: 
     288                # Any exception from application code gets logged and will 
     289                # cause us to disconnect the selectable. 
     290                why = failure.Failure() 
     291                log.err(why) 
     292 
    208293        if why: 
    209             self.removeReader(selectable) 
    210             self.removeWriter(selectable) 
    211             selectable.connectionLost(failure.Failure(why)) 
     294            self._disconnectSelectable(selectable, why, inRead) 
    212295 
    213296    doIteration = doKEvent 
    214297 
    215298 
    216299def install(): 
    217     k = KQueueReactor() 
    218     main.installReactor(k) 
     300    """ 
     301    Install the kqueue() reactor. 
     302    """ 
     303    p = KQueueReactor() 
     304    from twisted.internet.main import installReactor 
     305    installReactor(p) 
    219306 
    220307 
    221308__all__ = ["KQueueReactor", "install"]