Ticket #1918: kqueue.2.patch

File kqueue.2.patch, 12.1 KB (added by oberstet, 3 years ago)

Revised patch after review

  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor() 
    287287 
    288288        name = reactor.__class__.__name__ 
    289         if name in ('EPollReactor', 'CFReactor'): 
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 
    290290            # Closing a file descriptor immediately removes it from the epoll 
    291291            # set without generating a notification.  That means epollreactor 
    292292            # will not call any methods on Victim after the close, so there's 
  • twisted/internet/kqreactor.py

     
    44""" 
    55A kqueue()/kevent() based implementation of the Twisted main loop. 
    66 
    7 To install the event loop (and you should do this before any connections, 
    8 listeners or connectors are added):: 
     7To use this reactor, start your application specifying the kqueue reactor: 
    98 
    10     | from twisted.internet import kqreactor 
    11     | kqreactor.install() 
     9   twistd ... --reactor kqueue 
    1210 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     11To install the event loop from code (and you should do this before any 
     12connections, listeners or connectors are added):: 
    1513 
     14   from twisted.internet import kqreactor 
     15   kqreactor.install() 
    1616 
     17This implementation depends on Python 2.6 or higher which has kqueue support 
     18built in the select module. 
    1719 
    18 You're going to need to patch PyKqueue:: 
     20Note, that you should use Python 2.6.5 or higher, since previous implementations 
     21of select.kqueue had 
    1922 
    20     ===================================================== 
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    23     @@ -137,7 +137,7 @@ 
    24      } 
    25       
    26      statichere PyTypeObject KQEvent_Type = { 
    27     -  PyObject_HEAD_INIT(NULL) 
    28     +  PyObject_HEAD_INIT(&PyType_Type) 
    29        0,                             // ob_size 
    30        "KQEvent",                     // tp_name 
    31        sizeof(KQEventObject),         // tp_basicsize 
    32     @@ -291,13 +291,14 @@ 
    33       
    34        /* Build timespec for timeout */ 
    35        totimespec.tv_sec = timeout / 1000; 
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    38       
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    40       
    41        /* Make the call */ 
    42     - 
    43     +  Py_BEGIN_ALLOW_THREADS 
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    45     +  Py_END_ALLOW_THREADS 
    46       
    47        /* Don't need the input event list anymore, so get rid of it */ 
    48        free (changelist); 
    49     @@ -361,7 +362,7 @@ 
    50      statichere PyTypeObject KQueue_Type = { 
    51             /* The ob_type field must be initialized in the module init function 
    52              * to be portable to Windows without using C++. */ 
    53     -   PyObject_HEAD_INIT(NULL) 
    54     +   PyObject_HEAD_INIT(&PyType_Type) 
    55             0,                  /*ob_size*/ 
    56             "KQueue",                   /*tp_name*/ 
    57             sizeof(KQueueObject),       /*tp_basicsize*/ 
     23   http://bugs.python.org/issue5910 
    5824 
     25not yet fixed. 
    5926""" 
    6027 
    61 import errno, sys 
     28import errno 
    6229 
    6330from zope.interface import implements 
    6431 
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 
    66 from kqsyscall import kqueue, kevent 
     32from select import kqueue, kevent 
     33from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 
     34                   KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 
    6735 
    6836from twisted.internet.interfaces import IReactorFDSet 
    6937 
     
    7341 
    7442class KQueueReactor(posixbase.PosixReactorBase): 
    7543    """ 
    76     A reactor that uses kqueue(2)/kevent(2). 
     44    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 
     45    which has built in support for kqueue in the select module. 
    7746 
    7847    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 
    7948 
     
    9766    """ 
    9867    implements(IReactorFDSet) 
    9968 
     69 
    10070    def __init__(self): 
    10171        """ 
    10272        Initialize kqueue object, file descriptor tracking dictionaries, and the 
     
    10979        posixbase.PosixReactorBase.__init__(self) 
    11080 
    11181 
    112     def _updateRegistration(self, *args): 
    113         self._kq.kevent([kevent(*args)], 0, 0) 
     82    def _updateRegistration(self, fd, filter, op): 
     83        """ 
     84        Private method for changing kqueue registration on a given FD 
     85        filtering for events given filter/op. This will never block and 
     86        returns nothing. 
     87        """ 
     88        self._kq.control([kevent(fd, filter, op)], 0, 0) 
    11489 
     90 
    11591    def addReader(self, reader): 
    116         """Add a FileDescriptor for notification of data available to read. 
    11792        """ 
     93        Implement L{IReactorFDSet.addReader}. 
     94        """ 
    11895        fd = reader.fileno() 
    11996        if fd not in self._reads: 
    120             self._selectables[fd] = reader 
    121             self._reads[fd] = 1 
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     97            try: 
     98                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     99            except OSError: 
     100                pass 
     101            finally: 
     102                self._selectables[fd] = reader 
     103                self._reads[fd] = 1 
    123104 
     105 
    124106    def addWriter(self, writer): 
    125         """Add a FileDescriptor for notification of data available to write. 
    126107        """ 
     108        Implement L{IReactorFDSet.addWriter}. 
     109        """ 
    127110        fd = writer.fileno() 
    128111        if fd not in self._writes: 
    129             self._selectables[fd] = writer 
    130             self._writes[fd] = 1 
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     112            try: 
     113                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     114            except OSError: 
     115                pass 
     116            finally: 
     117                self._selectables[fd] = writer 
     118                self._writes[fd] = 1 
    132119 
     120 
    133121    def removeReader(self, reader): 
    134         """Remove a Selectable for notification of data available to read. 
    135122        """ 
    136         fd = reader.fileno() 
     123        Implement L{IReactorFDSet.removeReader}. 
     124        """ 
     125        wasLost = False 
     126        try: 
     127            fd = reader.fileno() 
     128        except: 
     129            fd = -1 
     130        if fd == -1: 
     131            for fd, fdes in self._selectables.items(): 
     132                if reader is fdes: 
     133                    wasLost = True 
     134                    break 
     135            else: 
     136                return 
    137137        if fd in self._reads: 
    138138            del self._reads[fd] 
    139139            if fd not in self._writes: 
    140140                del self._selectables[fd] 
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     141            if not wasLost: 
     142                try: 
     143                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 
     144                except OSError: 
     145                    pass 
    142146 
     147 
    143148    def removeWriter(self, writer): 
    144         """Remove a Selectable for notification of data available to write. 
    145149        """ 
    146         fd = writer.fileno() 
     150        Implement L{IReactorFDSet.removeWriter}. 
     151        """ 
     152        wasLost = False 
     153        try: 
     154            fd = writer.fileno() 
     155        except: 
     156            fd = -1 
     157        if fd == -1: 
     158            for fd, fdes in self._selectables.items(): 
     159                if writer is fdes: 
     160                    wasLost = True 
     161                    break 
     162            else: 
     163                return 
    147164        if fd in self._writes: 
    148165            del self._writes[fd] 
    149166            if fd not in self._reads: 
    150167                del self._selectables[fd] 
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     168            if not wasLost: 
     169                try: 
     170                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 
     171                except OSError: 
     172                    pass 
    152173 
     174 
    153175    def removeAll(self): 
    154176        """ 
    155         Remove all selectables, and return a list of them. 
     177        Implement L{IReactorFDSet.removeAll}. 
    156178        """ 
    157179        return self._removeAll( 
    158180            [self._selectables[fd] for fd in self._reads], 
     
    160182 
    161183 
    162184    def getReaders(self): 
     185        """ 
     186        Implement L{IReactorFDSet.getReaders}. 
     187        """ 
    163188        return [self._selectables[fd] for fd in self._reads] 
    164189 
    165190 
    166191    def getWriters(self): 
     192        """ 
     193        Implement L{IReactorFDSet.getWriters}. 
     194        """ 
    167195        return [self._selectables[fd] for fd in self._writes] 
    168196 
    169197 
    170198    def doKEvent(self, timeout): 
    171         """Poll the kqueue for new events.""" 
     199        """ 
     200        Poll the kqueue for new events. 
     201        """ 
    172202        if timeout is None: 
    173             timeout = 1000 
    174         else: 
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     203            timeout = 1 
    176204 
    177205        try: 
    178             l = self._kq.kevent([], len(self._selectables), timeout) 
     206            l = self._kq.control([], len(self._selectables), timeout) 
    179207        except OSError, e: 
    180208            if e[0] == errno.EINTR: 
    181209                return 
    182210            else: 
    183211                raise 
     212 
    184213        _drdw = self._doWriteOrRead 
    185214        for event in l: 
    186             why = None 
    187             fd, filter = event.ident, event.filter 
     215            fd = event.ident 
    188216            try: 
    189217                selectable = self._selectables[fd] 
    190218            except KeyError: 
    191219                # Handles the infrequent case where one selectable's 
    192220                # handler disconnects another. 
    193221                continue 
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     222            else: 
     223                log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    195224 
    196     def _doWriteOrRead(self, selectable, fd, filter): 
    197         try: 
    198             if filter == EVFILT_READ: 
    199                 why = selectable.doRead() 
    200             if filter == EVFILT_WRITE: 
    201                 why = selectable.doWrite() 
    202             if not selectable.fileno() == fd: 
    203                 why = main.CONNECTION_LOST 
    204         except: 
    205             why = sys.exc_info()[1] 
    206             log.deferr() 
    207225 
     226    def _doWriteOrRead(self, selectable, fd, event): 
     227        """ 
     228        Private method called when a FD is ready for reading, writing or was 
     229        lost. Do the work and raise errors where necessary. 
     230        """ 
     231        why = None 
     232        inRead = False 
     233        (filter, flags, data, fflags) = ( 
     234            event.filter, event.flags, event.data, event.fflags) 
     235 
     236        if flags & KQ_EV_EOF and data and fflags: 
     237            why = main.CONNECTION_LOST 
     238        else: 
     239            try: 
     240                if selectable.fileno() == -1: 
     241                    inRead = False 
     242                    why = posixbase._NO_FILEDESC 
     243                else: 
     244                   if filter == KQ_FILTER_READ: 
     245                       inRead = True 
     246                       why = selectable.doRead() 
     247                   if filter == KQ_FILTER_WRITE: 
     248                       inRead = False 
     249                       why = selectable.doWrite() 
     250            except: 
     251                # Any exception from application code gets logged and will 
     252                # cause us to disconnect the selectable. 
     253                why = failure.Failure() 
     254                log.err(why) 
     255 
    208256        if why: 
    209             self.removeReader(selectable) 
    210             self.removeWriter(selectable) 
    211             selectable.connectionLost(failure.Failure(why)) 
     257            self._disconnectSelectable(selectable, why, inRead) 
    212258 
    213259    doIteration = doKEvent 
    214260 
    215261 
    216262def install(): 
    217     k = KQueueReactor() 
    218     main.installReactor(k) 
     263    """ 
     264    Install the kqueue() reactor. 
     265    """ 
     266    p = KQueueReactor() 
     267    from twisted.internet.main import installReactor 
     268    installReactor(p) 
    219269 
    220270 
    221271__all__ = ["KQueueReactor", "install"] 
  • NEWS

     
    2323 - twisted.protocols.ftp.FTP.ftp_STOR now catches `FTPCmdError`s 
    2424   raised by the file writer, and returns the error back to the 
    2525   client. (#4909) 
     26 - The kqueue reactor has been revived. (#1918) 
    2627 
    2728Bugfixes 
    2829--------