Ticket #1918: kqueue.4.patch

File kqueue.4.patch, 16.5 KB (added by oberstet, 2 years ago)

Revised patch: docstring, IReactorDaemonize

  • twisted/scripts/_twistd_unix.py

     
    77from twisted.python import log, syslog, logfile, usage 
    88from twisted.python.util import switchUID, uidFromString, gidFromString 
    99from twisted.application import app, service 
     10from twisted.internet.interfaces import IReactorDaemonize 
    1011from twisted import copyright 
    1112 
    1213 
     
    156157 
    157158 
    158159def daemonize(): 
    159     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 
     160    """ 
     161    Daemonizes the application on Unix. This is done by the usual double 
     162    forking approach. 
     163 
     164    See: 
     165      [1] http://code.activestate.com/recipes/278731/ 
     166      [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", 
     167          1992, Addison-Wesley, ISBN 0-201-56317-7 
     168    """ 
     169 
     170    ## If the reactor requires hooks to be called for daemonization, call them. 
     171    ## Currently the only reactor which provides/needs that is KQueueReactor. 
     172    from twisted.internet import reactor 
     173    if IReactorDaemonize.providedBy(reactor): 
     174        reactor.beforeDaemonize() 
     175 
    160176    if os.fork():   # launch child and... 
    161177        os._exit(0) # kill off parent 
    162178    os.setsid() 
     
    171187                raise 
    172188    os.close(null) 
    173189 
     190    if IReactorDaemonize.providedBy(reactor): 
     191        reactor.afterDaemonize() 
    174192 
    175193 
     194 
    176195def launchWithName(name): 
    177196    if name and name != sys.argv[0]: 
    178197        exe = os.path.realpath(sys.executable) 
  • twisted/internet/interfaces.py

     
    870870        """ 
    871871 
    872872 
     873class IReactorDaemonize(Interface): 
     874    """ 
     875    A reactor which provides hooks that need to be called before and after 
     876    daemonization. 
     877    """ 
     878 
     879    def beforeDaemonize(): 
     880        """ 
     881        Hook to be called immediately before daemonization. No reactor methods 
     882        must be called until L{afterDaemonize} is called. 
     883 
     884        @return: C{None}. 
     885        """ 
     886 
     887    def afterDaemonize(): 
     888        """ 
     889        Hook to be called immediately after daemonization. This must only be 
     890        called after L{beforeDaemonize} had been called previously. 
     891 
     892        @return: C{None}. 
     893        """ 
     894 
     895 
    873896class IReactorFDSet(Interface): 
    874897    """ 
    875898    Implement me to be able to use L{IFileDescriptor} type resources. 
     
    18991922        @return: a client endpoint 
    19001923        @rtype: L{IStreamClientEndpoint} 
    19011924        """ 
    1902  
  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor() 
    287287 
    288288        name = reactor.__class__.__name__ 
    289         if name in ('EPollReactor', 'CFReactor'): 
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 
    290290            # Closing a file descriptor immediately removes it from the epoll 
    291291            # set without generating a notification.  That means epollreactor 
    292292            # will not call any methods on Victim after the close, so there's 
  • twisted/internet/kqreactor.py

     
    44""" 
    55A kqueue()/kevent() based implementation of the Twisted main loop. 
    66 
    7 To install the event loop (and you should do this before any connections, 
    8 listeners or connectors are added):: 
     7To use this reactor, start your application specifying the kqueue reactor: 
    98 
    10     | from twisted.internet import kqreactor 
    11     | kqreactor.install() 
     9   twistd ... --reactor kqueue 
    1210 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     11To install the event loop from code (and you should do this before any 
     12connections, listeners or connectors are added):: 
    1513 
     14   from twisted.internet import kqreactor 
     15   kqreactor.install() 
    1616 
     17This implementation depends on Python 2.6 or higher which has kqueue support 
     18built in the select module. 
    1719 
    18 You're going to need to patch PyKqueue:: 
     20Note, that you should use Python 2.6.5 or higher, since previous implementations 
     21of select.kqueue had 
    1922 
    20     ===================================================== 
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    23     @@ -137,7 +137,7 @@ 
    24      } 
    25       
    26      statichere PyTypeObject KQEvent_Type = { 
    27     -  PyObject_HEAD_INIT(NULL) 
    28     +  PyObject_HEAD_INIT(&PyType_Type) 
    29        0,                             // ob_size 
    30        "KQEvent",                     // tp_name 
    31        sizeof(KQEventObject),         // tp_basicsize 
    32     @@ -291,13 +291,14 @@ 
    33       
    34        /* Build timespec for timeout */ 
    35        totimespec.tv_sec = timeout / 1000; 
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    38       
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    40       
    41        /* Make the call */ 
    42     - 
    43     +  Py_BEGIN_ALLOW_THREADS 
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    45     +  Py_END_ALLOW_THREADS 
    46       
    47        /* Don't need the input event list anymore, so get rid of it */ 
    48        free (changelist); 
    49     @@ -361,7 +362,7 @@ 
    50      statichere PyTypeObject KQueue_Type = { 
    51             /* The ob_type field must be initialized in the module init function 
    52              * to be portable to Windows without using C++. */ 
    53     -   PyObject_HEAD_INIT(NULL) 
    54     +   PyObject_HEAD_INIT(&PyType_Type) 
    55             0,                  /*ob_size*/ 
    56             "KQueue",                   /*tp_name*/ 
    57             sizeof(KQueueObject),       /*tp_basicsize*/ 
     23   http://bugs.python.org/issue5910 
    5824 
     25not yet fixed. 
    5926""" 
    6027 
    61 import errno, sys 
     28import errno 
    6229 
    6330from zope.interface import implements 
    6431 
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 
    66 from kqsyscall import kqueue, kevent 
     32from select import kqueue, kevent 
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 
    6735 
    68 from twisted.internet.interfaces import IReactorFDSet 
     36from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize 
    6937 
    7038from twisted.python import log, failure 
    7139from twisted.internet import main, posixbase 
     
    7341 
    7442class KQueueReactor(posixbase.PosixReactorBase): 
    7543    """ 
    76     A reactor that uses kqueue(2)/kevent(2). 
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 
     45    which has built in support for kqueue in the select module. 
    7746 
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 
    7948 
     
    9564        dispatched to the corresponding L{FileDescriptor} instances in 
    9665        C{_selectables}. 
    9766    """ 
    98     implements(IReactorFDSet) 
     67    implements(IReactorFDSet, IReactorDaemonize) 
    9968 
     69 
    10070    def __init__(self): 
    10171        """ 
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the 
    10373        base class. 
     74 
     75        See: 
     76            - http://docs.python.org/library/select.html 
     77            - www.freebsd.org/cgi/man.cgi?query=kqueue 
     78            - people.freebsd.org/~jlemon/papers/kqueue.pdf 
    10479        """ 
    10580        self._kq = kqueue() 
    10681        self._reads = {} 
     
    10984        posixbase.PosixReactorBase.__init__(self) 
    11085 
    11186 
    112     def _updateRegistration(self, *args): 
    113         self._kq.kevent([kevent(*args)], 0, 0) 
     87    def _updateRegistration(self, fd, filter, op): 
     88        """ 
     89        Private method for changing kqueue registration on a given FD 
     90        filtering for events given filter/op. This will never block and 
     91        returns nothing. 
     92        """ 
     93        self._kq.control([kevent(fd, filter, op)], 0, 0) 
    11494 
     95 
     96    def beforeDaemonize(self): 
     97        """ 
     98        Implement L{IReactorDaemonize.beforeDaemonize}. 
     99        """ 
     100        # Twisted-internal method called during daemonization (when application 
     101        # is started via twistd). This is called right before the magic double 
     102        # forking done for daemonization. We cleanly close the kqueue() and later 
     103        # recreate it. This is needed since a) kqueue() are not inherited across 
     104        # forks and b) twistd will create the reactor already before daemonization 
     105        # (and will also add at least 1 reader to the reactor, an instance of 
     106        # twisted.internet.posixbase._UnixWaker). 
     107        # 
     108        # See: twisted.scripts._twistd_unix.daemonize() 
     109        self._kq.close() 
     110        self._kq = None 
     111 
     112 
     113    def afterDaemonize(self): 
     114        """ 
     115        Implement L{IReactorDaemonize.afterDaemonize}. 
     116        """ 
     117        # Twisted-internal method called during daemonization. This is called right 
     118        # after daemonization and recreates the kqueue() and any readers/writers 
     119        # that were added before. Note that you MUST NOT call any reactor methods 
     120        # in between beforeDaemonize() and afterDaemonize()! 
     121        self._kq = kqueue() 
     122        for fd in self._reads: 
     123            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     124        for fd in self._writes: 
     125            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     126 
     127 
    115128    def addReader(self, reader): 
    116         """Add a FileDescriptor for notification of data available to read. 
    117129        """ 
     130        Implement L{IReactorFDSet.addReader}. 
     131        """ 
    118132        fd = reader.fileno() 
    119133        if fd not in self._reads: 
    120             self._selectables[fd] = reader 
    121             self._reads[fd] = 1 
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     134            try: 
     135                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     136            except OSError: 
     137                pass 
     138            finally: 
     139                self._selectables[fd] = reader 
     140                self._reads[fd] = 1 
    123141 
     142 
    124143    def addWriter(self, writer): 
    125         """Add a FileDescriptor for notification of data available to write. 
    126144        """ 
     145        Implement L{IReactorFDSet.addWriter}. 
     146        """ 
    127147        fd = writer.fileno() 
    128148        if fd not in self._writes: 
    129             self._selectables[fd] = writer 
    130             self._writes[fd] = 1 
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     149            try: 
     150                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     151            except OSError: 
     152                pass 
     153            finally: 
     154                self._selectables[fd] = writer 
     155                self._writes[fd] = 1 
    132156 
     157 
    133158    def removeReader(self, reader): 
    134         """Remove a Selectable for notification of data available to read. 
    135159        """ 
    136         fd = reader.fileno() 
     160        Implement L{IReactorFDSet.removeReader}. 
     161        """ 
     162        wasLost = False 
     163        try: 
     164            fd = reader.fileno() 
     165        except: 
     166            fd = -1 
     167        if fd == -1: 
     168            for fd, fdes in self._selectables.items(): 
     169                if reader is fdes: 
     170                    wasLost = True 
     171                    break 
     172            else: 
     173                return 
    137174        if fd in self._reads: 
    138175            del self._reads[fd] 
    139176            if fd not in self._writes: 
    140177                del self._selectables[fd] 
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     178            if not wasLost: 
     179                try: 
     180                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 
     181                except OSError: 
     182                    pass 
    142183 
     184 
    143185    def removeWriter(self, writer): 
    144         """Remove a Selectable for notification of data available to write. 
    145186        """ 
    146         fd = writer.fileno() 
     187        Implement L{IReactorFDSet.removeWriter}. 
     188        """ 
     189        wasLost = False 
     190        try: 
     191            fd = writer.fileno() 
     192        except: 
     193            fd = -1 
     194        if fd == -1: 
     195            for fd, fdes in self._selectables.items(): 
     196                if writer is fdes: 
     197                    wasLost = True 
     198                    break 
     199            else: 
     200                return 
    147201        if fd in self._writes: 
    148202            del self._writes[fd] 
    149203            if fd not in self._reads: 
    150204                del self._selectables[fd] 
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     205            if not wasLost: 
     206                try: 
     207                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 
     208                except OSError: 
     209                    pass 
    152210 
     211 
    153212    def removeAll(self): 
    154213        """ 
    155         Remove all selectables, and return a list of them. 
     214        Implement L{IReactorFDSet.removeAll}. 
    156215        """ 
    157216        return self._removeAll( 
    158217            [self._selectables[fd] for fd in self._reads], 
     
    160219 
    161220 
    162221    def getReaders(self): 
     222        """ 
     223        Implement L{IReactorFDSet.getReaders}. 
     224        """ 
    163225        return [self._selectables[fd] for fd in self._reads] 
    164226 
    165227 
    166228    def getWriters(self): 
     229        """ 
     230        Implement L{IReactorFDSet.getWriters}. 
     231        """ 
    167232        return [self._selectables[fd] for fd in self._writes] 
    168233 
    169234 
    170235    def doKEvent(self, timeout): 
    171         """Poll the kqueue for new events.""" 
     236        """ 
     237        Poll the kqueue for new events. 
     238        """ 
    172239        if timeout is None: 
    173             timeout = 1000 
    174         else: 
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     240            timeout = 1 
    176241 
    177242        try: 
    178             l = self._kq.kevent([], len(self._selectables), timeout) 
     243            l = self._kq.control([], len(self._selectables), timeout) 
    179244        except OSError, e: 
    180245            if e[0] == errno.EINTR: 
    181246                return 
    182247            else: 
    183248                raise 
     249 
    184250        _drdw = self._doWriteOrRead 
    185251        for event in l: 
    186             why = None 
    187             fd, filter = event.ident, event.filter 
     252            fd = event.ident 
    188253            try: 
    189254                selectable = self._selectables[fd] 
    190255            except KeyError: 
    191256                # Handles the infrequent case where one selectable's 
    192257                # handler disconnects another. 
    193258                continue 
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     259            else: 
     260                log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    195261 
    196     def _doWriteOrRead(self, selectable, fd, filter): 
    197         try: 
    198             if filter == EVFILT_READ: 
    199                 why = selectable.doRead() 
    200             if filter == EVFILT_WRITE: 
    201                 why = selectable.doWrite() 
    202             if not selectable.fileno() == fd: 
    203                 why = main.CONNECTION_LOST 
    204         except: 
    205             why = sys.exc_info()[1] 
    206             log.deferr() 
    207262 
     263    def _doWriteOrRead(self, selectable, fd, event): 
     264        """ 
     265        Private method called when a FD is ready for reading, writing or was 
     266        lost. Do the work and raise errors where necessary. 
     267        """ 
     268        why = None 
     269        inRead = False 
     270        (filter, flags, data, fflags) = ( 
     271            event.filter, event.flags, event.data, event.fflags) 
     272 
     273        if flags & KQ_EV_EOF and data and fflags: 
     274            why = main.CONNECTION_LOST 
     275        else: 
     276            try: 
     277                if selectable.fileno() == -1: 
     278                    inRead = False 
     279                    why = posixbase._NO_FILEDESC 
     280                else: 
     281                   if filter == KQ_FILTER_READ: 
     282                       inRead = True 
     283                       why = selectable.doRead() 
     284                   if filter == KQ_FILTER_WRITE: 
     285                       inRead = False 
     286                       why = selectable.doWrite() 
     287            except: 
     288                # Any exception from application code gets logged and will 
     289                # cause us to disconnect the selectable. 
     290                why = failure.Failure() 
     291                log.err(why) 
     292 
    208293        if why: 
    209             self.removeReader(selectable) 
    210             self.removeWriter(selectable) 
    211             selectable.connectionLost(failure.Failure(why)) 
     294            self._disconnectSelectable(selectable, why, inRead) 
    212295 
    213296    doIteration = doKEvent 
    214297 
    215298 
    216299def install(): 
    217     k = KQueueReactor() 
    218     main.installReactor(k) 
     300    """ 
     301    Install the kqueue() reactor. 
     302    """ 
     303    p = KQueueReactor() 
     304    from twisted.internet.main import installReactor 
     305    installReactor(p) 
    219306 
    220307 
    221308__all__ = ["KQueueReactor", "install"] 
  • 1918.feature

     
     1The kqueue reactor has been revived. (#1918)