Ticket #1918: kqueue.patch

File kqueue.patch, 10.4 KB (added by oberstet, 2 years ago)

select.kqueue based reactor

  • twisted/internet/test/test_fdset.py

     
    286286        reactor = self.buildReactor() 
    287287 
    288288        name = reactor.__class__.__name__ 
    289         if name in ('EPollReactor', 'CFReactor'): 
     289        if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 
    290290            # Closing a file descriptor immediately removes it from the epoll 
    291291            # set without generating a notification.  That means epollreactor 
    292292            # will not call any methods on Victim after the close, so there's 
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor 
    1111    | kqreactor.install() 
    1212 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
    15  
    16  
    17  
    18 You're going to need to patch PyKqueue:: 
    19  
    20     ===================================================== 
    21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    23     @@ -137,7 +137,7 @@ 
    24      } 
    25       
    26      statichere PyTypeObject KQEvent_Type = { 
    27     -  PyObject_HEAD_INIT(NULL) 
    28     +  PyObject_HEAD_INIT(&PyType_Type) 
    29        0,                             // ob_size 
    30        "KQEvent",                     // tp_name 
    31        sizeof(KQEventObject),         // tp_basicsize 
    32     @@ -291,13 +291,14 @@ 
    33       
    34        /* Build timespec for timeout */ 
    35        totimespec.tv_sec = timeout / 1000; 
    36     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    38       
    39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    40       
    41        /* Make the call */ 
    42     - 
    43     +  Py_BEGIN_ALLOW_THREADS 
    44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    45     +  Py_END_ALLOW_THREADS 
    46       
    47        /* Don't need the input event list anymore, so get rid of it */ 
    48        free (changelist); 
    49     @@ -361,7 +362,7 @@ 
    50      statichere PyTypeObject KQueue_Type = { 
    51             /* The ob_type field must be initialized in the module init function 
    52              * to be portable to Windows without using C++. */ 
    53     -   PyObject_HEAD_INIT(NULL) 
    54     +   PyObject_HEAD_INIT(&PyType_Type) 
    55             0,                  /*ob_size*/ 
    56             "KQueue",                   /*tp_name*/ 
    57             sizeof(KQueueObject),       /*tp_basicsize*/ 
    58  
     13This implementation depends on Python 2.6 or higher which has kqueue support 
     14built in the select module. 
    5915""" 
    6016 
    6117import errno, sys 
    6218 
    6319from zope.interface import implements 
    6420 
    65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 
    66 from kqsyscall import kqueue, kevent 
     21from select import kqueue, kevent 
     22from select import KQ_FILTER_READ, KQ_FILTER_WRITE, KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 
    6723 
    6824from twisted.internet.interfaces import IReactorFDSet 
    6925 
     
    7329 
    7430class KQueueReactor(posixbase.PosixReactorBase): 
    7531    """ 
    76     A reactor that uses kqueue(2)/kevent(2). 
     32    A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 
     33    which has built in support for kqueue in the select module. 
    7734 
    7835    @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 
    7936 
     
    9754    """ 
    9855    implements(IReactorFDSet) 
    9956 
     57 
    10058    def __init__(self): 
    10159        """ 
    10260        Initialize kqueue object, file descriptor tracking dictionaries, and the 
     
    10967        posixbase.PosixReactorBase.__init__(self) 
    11068 
    11169 
    112     def _updateRegistration(self, *args): 
    113         self._kq.kevent([kevent(*args)], 0, 0) 
     70    def _updateRegistration(self, fd, filter, op): 
     71        """ 
     72        Private method for changing kqueue registration. 
     73        """ 
     74        self._kq.control([kevent(fd, filter, op)], 0, 0) 
    11475 
     76 
    11577    def addReader(self, reader): 
    116         """Add a FileDescriptor for notification of data available to read. 
    11778        """ 
     79        Add a FileDescriptor for notification of data available to read. 
     80        """ 
    11881        fd = reader.fileno() 
    11982        if fd not in self._reads: 
    120             self._selectables[fd] = reader 
    121             self._reads[fd] = 1 
    122             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     83            try: 
     84                self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 
     85            except OSError, e: 
     86                pass 
     87            finally: 
     88                self._selectables[fd] = reader 
     89                self._reads[fd] = 1 
    12390 
     91 
    12492    def addWriter(self, writer): 
    125         """Add a FileDescriptor for notification of data available to write. 
    12693        """ 
     94        Add a FileDescriptor for notification of data available to write. 
     95        """ 
    12796        fd = writer.fileno() 
    12897        if fd not in self._writes: 
    129             self._selectables[fd] = writer 
    130             self._writes[fd] = 1 
    131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     98            try: 
     99                self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 
     100            except OSError, e: 
     101                pass 
     102            finally: 
     103                self._selectables[fd] = writer 
     104                self._writes[fd] = 1 
    132105 
     106 
    133107    def removeReader(self, reader): 
    134         """Remove a Selectable for notification of data available to read. 
    135108        """ 
    136         fd = reader.fileno() 
     109        Remove a Selectable for notification of data available to read. 
     110        """ 
     111        wasLost = False 
     112        try: 
     113            fd = reader.fileno() 
     114        except: 
     115            fd = -1 
     116        if fd == -1: 
     117            for fd, fdes in self._selectables.items(): 
     118                if reader is fdes: 
     119                    wasLost = True 
     120                    break 
     121            else: 
     122                return 
    137123        if fd in self._reads: 
    138124            del self._reads[fd] 
    139125            if fd not in self._writes: 
    140126                del self._selectables[fd] 
    141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     127            if not wasLost: 
     128                try: 
     129                    self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 
     130                except OSError, e: 
     131                    pass 
    142132 
     133 
    143134    def removeWriter(self, writer): 
    144         """Remove a Selectable for notification of data available to write. 
    145135        """ 
    146         fd = writer.fileno() 
     136        Remove a Selectable for notification of data available to write. 
     137        """ 
     138        wasLost = False 
     139        try: 
     140            fd = writer.fileno() 
     141        except: 
     142            fd = -1 
     143        if fd == -1: 
     144            for fd, fdes in self._selectables.items(): 
     145                if writer is fdes: 
     146                    wasLost = True 
     147                    break 
     148            else: 
     149                return 
    147150        if fd in self._writes: 
    148151            del self._writes[fd] 
    149152            if fd not in self._reads: 
    150153                del self._selectables[fd] 
    151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     154            if not wasLost: 
     155                try: 
     156                    self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 
     157                except OSError, e: 
     158                    pass 
    152159 
     160 
    153161    def removeAll(self): 
    154162        """ 
    155163        Remove all selectables, and return a list of them. 
     
    168176 
    169177 
    170178    def doKEvent(self, timeout): 
    171         """Poll the kqueue for new events.""" 
     179        """ 
     180        Poll the kqueue for new events. 
     181        """ 
    172182        if timeout is None: 
    173             timeout = 1000 
    174         else: 
    175             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     183            timeout = 1 
    176184 
    177185        try: 
    178             l = self._kq.kevent([], len(self._selectables), timeout) 
     186            l = self._kq.control([], len(self._selectables), timeout) 
    179187        except OSError, e: 
    180188            if e[0] == errno.EINTR: 
    181189                return 
    182190            else: 
    183191                raise 
     192 
    184193        _drdw = self._doWriteOrRead 
    185194        for event in l: 
    186             why = None 
    187             fd, filter = event.ident, event.filter 
     195            fd = event.ident 
    188196            try: 
    189197                selectable = self._selectables[fd] 
    190198            except KeyError: 
    191199                # Handles the infrequent case where one selectable's 
    192200                # handler disconnects another. 
    193201                continue 
    194             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     202            else: 
     203                log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    195204 
    196     def _doWriteOrRead(self, selectable, fd, filter): 
    197         try: 
    198             if filter == EVFILT_READ: 
    199                 why = selectable.doRead() 
    200             if filter == EVFILT_WRITE: 
    201                 why = selectable.doWrite() 
    202             if not selectable.fileno() == fd: 
    203                 why = main.CONNECTION_LOST 
    204         except: 
    205             why = sys.exc_info()[1] 
    206             log.deferr() 
    207205 
     206    def _doWriteOrRead(self, selectable, fd, event): 
     207        why = None 
     208        inRead = False 
     209        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags 
     210 
     211        if flags & KQ_EV_EOF and data and fflags: 
     212            why = main.CONNECTION_LOST 
     213        else: 
     214            try: 
     215                if selectable.fileno() == -1: 
     216                    inRead = False 
     217                    why = posixbase._NO_FILEDESC 
     218                else: 
     219                   if filter == KQ_FILTER_READ: 
     220                       inRead = True 
     221                       why = selectable.doRead() 
     222                   if filter == KQ_FILTER_WRITE: 
     223                       inRead = False 
     224                       why = selectable.doWrite() 
     225            except: 
     226                # Any exception from application code gets logged and will 
     227                # cause us to disconnect the selectable. 
     228                why = sys.exc_info()[1] 
     229                log.err() 
     230 
    208231        if why: 
    209             self.removeReader(selectable) 
    210             self.removeWriter(selectable) 
    211             selectable.connectionLost(failure.Failure(why)) 
     232            self._disconnectSelectable(selectable, why, inRead) 
    212233 
    213234    doIteration = doKEvent 
    214235 
    215236 
    216237def install(): 
    217     k = KQueueReactor() 
    218     main.installReactor(k) 
     238    """ 
     239    Install the kqueue() reactor. 
     240    """ 
     241    p = KQueueReactor() 
     242    from twisted.internet.main import installReactor 
     243    installReactor(p) 
    219244 
    220245 
    221246__all__ = ["KQueueReactor", "install"]