Ticket #1918: kqueue.3.patch

File kqueue.3.patch, 14.3 KB (added by oberstet, 2 years ago)

Revised patch : daemonization, correct news file

  • twisted/scripts/_twistd_unix.py

     
    156156 
    157157 
    158158def daemonize(): 
    159     # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 
     159    # See: 
     160    # http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 
     161    # http://code.activestate.com/recipes/278731/ 
     162    from twisted.internet import reactor 
     163    if hasattr(reactor, '_beforeFork') and callable(reactor._beforeFork): 
     164        reactor._beforeFork() 
    160165    if os.fork():   # launch child and... 
    161166        os._exit(0) # kill off parent 
    162167    os.setsid() 
     
    170175            if e.errno != errno.EBADF: 
    171176                raise 
    172177    os.close(null) 
     178    if hasattr(reactor, '_afterFork') and callable(reactor._afterFork): 
     179        reactor._afterFork() 
    173180 
    174181 
    175182 
  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor() 
    287287 
    288288        name = reactor.__class__.__name__ 
    289         if name in ('EPollReactor', 'CFReactor'): 
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 
    290290            # Closing a file descriptor immediately removes it from the epoll 
    291291            # set without generating a notification.  That means epollreactor 
    292292            # will not call any methods on Victim after the close, so there's 
  • twisted/internet/kqreactor.py

     
    44""" 
    55A kqueue()/kevent() based implementation of the Twisted main loop. 
    66 
    7 To install the event loop (and you should do this before any connections, 
    8 listeners or connectors are added):: 
     7To use this reactor, start your application specifying the kqueue reactor: 
    98 
    10     | from twisted.internet import kqreactor 
    11     | kqreactor.install() 
     9   twistd ... --reactor kqueue 
    1210 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     11To install the event loop from code (and you should do this before any 
     12connections, listeners or connectors are added):: 
    1513 
     14   from twisted.internet import kqreactor 
     15   kqreactor.install() 
    1616 
     17This implementation depends on Python 2.6 or higher which has kqueue support 
     18built in the select module. 
    1719 
    18 You're going to need to patch PyKqueue:: 
     20Note, that you should use Python 2.6.5 or higher, since previous implementations 
     21of select.kqueue had 
    1922 
    20     ===================================================== 
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    23     @@ -137,7 +137,7 @@ 
    24      } 
    25       
    26      statichere PyTypeObject KQEvent_Type = { 
    27     -  PyObject_HEAD_INIT(NULL) 
    28     +  PyObject_HEAD_INIT(&PyType_Type) 
    29        0,                             // ob_size 
    30        "KQEvent",                     // tp_name 
    31        sizeof(KQEventObject),         // tp_basicsize 
    32     @@ -291,13 +291,14 @@ 
    33       
    34        /* Build timespec for timeout */ 
    35        totimespec.tv_sec = timeout / 1000; 
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    38       
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    40       
    41        /* Make the call */ 
    42     - 
    43     +  Py_BEGIN_ALLOW_THREADS 
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    45     +  Py_END_ALLOW_THREADS 
    46       
    47        /* Don't need the input event list anymore, so get rid of it */ 
    48        free (changelist); 
    49     @@ -361,7 +362,7 @@ 
    50      statichere PyTypeObject KQueue_Type = { 
    51             /* The ob_type field must be initialized in the module init function 
    52              * to be portable to Windows without using C++. */ 
    53     -   PyObject_HEAD_INIT(NULL) 
    54     +   PyObject_HEAD_INIT(&PyType_Type) 
    55             0,                  /*ob_size*/ 
    56             "KQueue",                   /*tp_name*/ 
    57             sizeof(KQueueObject),       /*tp_basicsize*/ 
     23   http://bugs.python.org/issue5910 
    5824 
     25not yet fixed. 
    5926""" 
    6027 
    61 import errno, sys 
     28import errno 
    6229 
    6330from zope.interface import implements 
    6431 
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 
    66 from kqsyscall import kqueue, kevent 
     32from select import kqueue, kevent 
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 
    6735 
    6836from twisted.internet.interfaces import IReactorFDSet 
    6937 
     
    7341 
    7442class KQueueReactor(posixbase.PosixReactorBase): 
    7543    """ 
    76     A reactor that uses kqueue(2)/kevent(2). 
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 
     45    which has built in support for kqueue in the select module. 
    7746 
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 
    7948 
     
    9766    """ 
    9867    implements(IReactorFDSet) 
    9968 
     69 
    10070    def __init__(self): 
    10171        """ 
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the 
    10373        base class. 
     74 
     75        See: 
     76            - http://docs.python.org/library/select.html 
     77            - www.freebsd.org/cgi/man.cgi?query=kqueue 
     78            - people.freebsd.org/~jlemon/papers/kqueue.pdf 
    10479        """ 
    10580        self._kq = kqueue() 
    10681        self._reads = {} 
     
    10984        posixbase.PosixReactorBase.__init__(self) 
    11085 
    11186 
    112     def _updateRegistration(self, *args): 
    113         self._kq.kevent([kevent(*args)], 0, 0) 
     87    def _updateRegistration(self, fd, filter, op): 
     88        """ 
     89        Private method for changing kqueue registration on a given FD 
     90        filtering for events given filter/op. This will never block and 
     91        returns nothing. 
     92        """ 
     93        self._kq.control([kevent(fd, filter, op)], 0, 0) 
    11494 
     95 
     96    def _beforeFork(self): 
     97        """ 
     98        Twisted-internal method called during daemonization (when application 
     99        is started via twistd). This is called right before the magic double 
     100        forking done for daemonization. We cleanly close the kqueue() and later 
     101        recreate it. This is needed since a) kqueue() are not inherited across 
     102        forks and b) twistd will create the reactor already before daemonization 
     103        (and will also add at least 1 reader to the reactor, an instance of 
     104        twisted.internet.posixbase._UnixWaker). 
     105 
     106        See: twisted.scripts._twistd_unix.daemonize() 
     107        """ 
     108        self._kq.close() 
     109        self._kq = None 
     110 
     111 
     112    def _afterFork(self): 
     113        """ 
     114        Twisted-internal method called during daemonization. This is called right 
     115        after daemonization and recreates the kqueue() and any readers/writers 
     116        that were added before. Note that you MUST NOT call any reactor methods 
     117        in between _beforeFork and _afterFork! 
     118        """ 
     119        self._kq = kqueue() 
     120        for fd in self._reads: 
     121            self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     122        for fd in self._writes: 
     123            self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     124 
     125 
    115126    def addReader(self, reader): 
    116         """Add a FileDescriptor for notification of data available to read. 
    117127        """ 
     128        Implement L{IReactorFDSet.addReader}. 
     129        """ 
    118130        fd = reader.fileno() 
    119131        if fd not in self._reads: 
    120             self._selectables[fd] = reader 
    121             self._reads[fd] = 1 
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     132            try: 
     133                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     134            except OSError: 
     135                pass 
     136            finally: 
     137                self._selectables[fd] = reader 
     138                self._reads[fd] = 1 
    123139 
     140 
    124141    def addWriter(self, writer): 
    125         """Add a FileDescriptor for notification of data available to write. 
    126142        """ 
     143        Implement L{IReactorFDSet.addWriter}. 
     144        """ 
    127145        fd = writer.fileno() 
    128146        if fd not in self._writes: 
    129             self._selectables[fd] = writer 
    130             self._writes[fd] = 1 
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     147            try: 
     148                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     149            except OSError: 
     150                pass 
     151            finally: 
     152                self._selectables[fd] = writer 
     153                self._writes[fd] = 1 
    132154 
     155 
    133156    def removeReader(self, reader): 
    134         """Remove a Selectable for notification of data available to read. 
    135157        """ 
    136         fd = reader.fileno() 
     158        Implement L{IReactorFDSet.removeReader}. 
     159        """ 
     160        wasLost = False 
     161        try: 
     162            fd = reader.fileno() 
     163        except: 
     164            fd = -1 
     165        if fd == -1: 
     166            for fd, fdes in self._selectables.items(): 
     167                if reader is fdes: 
     168                    wasLost = True 
     169                    break 
     170            else: 
     171                return 
    137172        if fd in self._reads: 
    138173            del self._reads[fd] 
    139174            if fd not in self._writes: 
    140175                del self._selectables[fd] 
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     176            if not wasLost: 
     177                try: 
     178                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 
     179                except OSError: 
     180                    pass 
    142181 
     182 
    143183    def removeWriter(self, writer): 
    144         """Remove a Selectable for notification of data available to write. 
    145184        """ 
    146         fd = writer.fileno() 
     185        Implement L{IReactorFDSet.removeWriter}. 
     186        """ 
     187        wasLost = False 
     188        try: 
     189            fd = writer.fileno() 
     190        except: 
     191            fd = -1 
     192        if fd == -1: 
     193            for fd, fdes in self._selectables.items(): 
     194                if writer is fdes: 
     195                    wasLost = True 
     196                    break 
     197            else: 
     198                return 
    147199        if fd in self._writes: 
    148200            del self._writes[fd] 
    149201            if fd not in self._reads: 
    150202                del self._selectables[fd] 
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     203            if not wasLost: 
     204                try: 
     205                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 
     206                except OSError: 
     207                    pass 
    152208 
     209 
    153210    def removeAll(self): 
    154211        """ 
    155         Remove all selectables, and return a list of them. 
     212        Implement L{IReactorFDSet.removeAll}. 
    156213        """ 
    157214        return self._removeAll( 
    158215            [self._selectables[fd] for fd in self._reads], 
     
    160217 
    161218 
    162219    def getReaders(self): 
     220        """ 
     221        Implement L{IReactorFDSet.getReaders}. 
     222        """ 
    163223        return [self._selectables[fd] for fd in self._reads] 
    164224 
    165225 
    166226    def getWriters(self): 
     227        """ 
     228        Implement L{IReactorFDSet.getWriters}. 
     229        """ 
    167230        return [self._selectables[fd] for fd in self._writes] 
    168231 
    169232 
    170233    def doKEvent(self, timeout): 
    171         """Poll the kqueue for new events.""" 
     234        """ 
     235        Poll the kqueue for new events. 
     236        """ 
    172237        if timeout is None: 
    173             timeout = 1000 
    174         else: 
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     238            timeout = 1 
    176239 
    177240        try: 
    178             l = self._kq.kevent([], len(self._selectables), timeout) 
     241            l = self._kq.control([], len(self._selectables), timeout) 
    179242        except OSError, e: 
    180243            if e[0] == errno.EINTR: 
    181244                return 
    182245            else: 
    183246                raise 
     247 
    184248        _drdw = self._doWriteOrRead 
    185249        for event in l: 
    186             why = None 
    187             fd, filter = event.ident, event.filter 
     250            fd = event.ident 
    188251            try: 
    189252                selectable = self._selectables[fd] 
    190253            except KeyError: 
    191254                # Handles the infrequent case where one selectable's 
    192255                # handler disconnects another. 
    193256                continue 
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     257            else: 
     258                log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    195259 
    196     def _doWriteOrRead(self, selectable, fd, filter): 
    197         try: 
    198             if filter == EVFILT_READ: 
    199                 why = selectable.doRead() 
    200             if filter == EVFILT_WRITE: 
    201                 why = selectable.doWrite() 
    202             if not selectable.fileno() == fd: 
    203                 why = main.CONNECTION_LOST 
    204         except: 
    205             why = sys.exc_info()[1] 
    206             log.deferr() 
    207260 
     261    def _doWriteOrRead(self, selectable, fd, event): 
     262        """ 
     263        Private method called when a FD is ready for reading, writing or was 
     264        lost. Do the work and raise errors where necessary. 
     265        """ 
     266        why = None 
     267        inRead = False 
     268        (filter, flags, data, fflags) = ( 
     269            event.filter, event.flags, event.data, event.fflags) 
     270 
     271        if flags & KQ_EV_EOF and data and fflags: 
     272            why = main.CONNECTION_LOST 
     273        else: 
     274            try: 
     275                if selectable.fileno() == -1: 
     276                    inRead = False 
     277                    why = posixbase._NO_FILEDESC 
     278                else: 
     279                   if filter == KQ_FILTER_READ: 
     280                       inRead = True 
     281                       why = selectable.doRead() 
     282                   if filter == KQ_FILTER_WRITE: 
     283                       inRead = False 
     284                       why = selectable.doWrite() 
     285            except: 
     286                # Any exception from application code gets logged and will 
     287                # cause us to disconnect the selectable. 
     288                why = failure.Failure() 
     289                log.err(why) 
     290 
    208291        if why: 
    209             self.removeReader(selectable) 
    210             self.removeWriter(selectable) 
    211             selectable.connectionLost(failure.Failure(why)) 
     292            self._disconnectSelectable(selectable, why, inRead) 
    212293 
    213294    doIteration = doKEvent 
    214295 
    215296 
    216297def install(): 
    217     k = KQueueReactor() 
    218     main.installReactor(k) 
     298    """ 
     299    Install the kqueue() reactor. 
     300    """ 
     301    p = KQueueReactor() 
     302    from twisted.internet.main import installReactor 
     303    installReactor(p) 
    219304 
    220305 
    221306__all__ = ["KQueueReactor", "install"] 
  • 1918.feature

     
     1The kqueue reactor has been revived. (#1918)