Ticket #1918: kqueue.diff

File kqueue.diff, 15.1 KB (added by dialtone, 8 years ago)

Working KQueue reactor patch

  • twisted/python/runtime.py

     
    7676            return imp.find_module('thread')[0] is None 
    7777        except ImportError: 
    7878            return False 
     79     
     80    def usingKQueue(self): 
     81        """Are we using KQeueueReactor? 
     82        """ 
     83        try: 
     84            from twisted.internet import kqreactor, reactor 
     85            if isinstance(reactor, kqreactor.KQueueReactor): 
     86                return True 
     87        except ImportError: 
     88            return False 
    7989 
    8090platform = Platform() 
    8191platformType = platform.getType() 
  • twisted/conch/test/test_manhole.py

     
    1010from twisted.internet import error, defer 
    1111from twisted.conch.test.test_recvline import _TelnetMixin, _SSHMixin, _StdioMixin, stdio, ssh 
    1212from twisted.conch import manhole 
     13from twisted.python import runtime 
    1314 
    1415def determineDefaultFunctionName(): 
    1516    """ 
     
    2223        return traceback.extract_stack()[0][2] 
    2324defaultFunctionName = determineDefaultFunctionName() 
    2425 
     26kqreactor = False 
     27kqosx = False 
     28if runtime.platform.usingKQueue(): 
     29    kqueueskipmessage = "KQueue doesn't support this yet" 
     30    kqreactor = True 
     31    if runtime.platform.isMacOSX(): 
     32        kqosx = True 
    2533 
    2634class WriterTestCase(unittest.TestCase): 
    2735    def testInteger(self): 
     
    272280        skip = "Terminal requirements missing, can't run manhole tests over stdio" 
    273281    else: 
    274282        serverProtocol = stdio.ConsoleManhole 
     283if kqosx: 
     284    ManholeLoopbackMixin.skip = kqueueskipmessage 
     285 No newline at end of file 
  • twisted/conch/test/test_recvline.py

     
    1010from twisted.conch.insults import insults 
    1111from twisted.conch import recvline 
    1212 
    13 from twisted.python import log, reflect, components 
     13from twisted.python import log, reflect, components, runtime 
    1414from twisted.internet import defer, error, task 
    1515from twisted.trial import unittest 
    1616from twisted.cred import portal 
    1717from twisted.test.proto_helpers import StringTransport 
    1818 
     19kqreactor = False 
     20kqosx = False 
     21if runtime.platform.usingKQueue(): 
     22    kqueueskipmessage = "KQueue doesn't support this yet" 
     23    kqreactor = True 
     24    if runtime.platform.isMacOSX(): 
     25        kqosx = True 
     26 
    1927class Arrows(unittest.TestCase): 
    2028    def setUp(self): 
    2129        self.underlyingTransport = StringTransport() 
     
    664672class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin): 
    665673    if stdio is None: 
    666674        skip = "Terminal requirements missing, can't run historic recvline tests over stdio" 
     675if kqosx: 
     676    _StdioMixin.skip = kqueueskipmessage + " on OSX" 
     677 No newline at end of file 
  • twisted/test/test_process.py

     
    2929from twisted.python import util, runtime 
    3030from twisted.python import procutils 
    3131 
     32kqreactor = False 
     33kqosx = False 
     34if runtime.platform.usingKQueue(): 
     35    kqueueskipmessage = "KQueue doesn't support this yet" 
     36    kqreactor = True 
     37    if runtime.platform.isMacOSX(): 
     38        kqosx = True 
     39 
    3240class TrivialProcessProtocol(protocol.ProcessProtocol): 
    3341    def __init__(self, d): 
    3442        self.deferred = d 
     
    371379        reactor.callLater(1, self.close, 0) 
    372380        reactor.callLater(2, self.close, 1) 
    373381        return self._onClose() 
     382    if kqosx: 
     383        testClosePty.skip = kqueueskipmessage + " on Mac OSX" 
    374384 
    375385    def testKillPty(self): 
    376386        if self.verbose: print "starting processes" 
     
    378388        reactor.callLater(1, self.kill, 0) 
    379389        reactor.callLater(2, self.kill, 1) 
    380390        return self._onClose() 
     391    if kqreactor: 
     392        testKillPty.skip = kqueueskipmessage 
    381393 
    382394class FDChecker(protocol.ProcessProtocol): 
    383395    state = 0 
     
    554566            self.assertEquals(p.reason.value.signal, None) 
    555567        d.addCallback(check) 
    556568        return d 
     569    if kqreactor: 
     570        testNormalTermination.skip = kqueueskipmessage 
    557571 
    558572    def testAbnormalTermination(self): 
    559573        if os.path.exists('/bin/false'): cmd = '/bin/false' 
     
    571585            self.assertEquals(p.reason.value.signal, None) 
    572586        d.addCallback(check) 
    573587        return d 
     588    if kqreactor: 
     589        testAbnormalTermination.skip = kqueueskipmessage 
    574590 
    575591    def _testSignal(self, sig): 
    576592        exe = sys.executable 
     
    665681                "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) 
    666682        return d.addCallback(processEnded) 
    667683 
    668  
    669684    def testBadArgs(self): 
    670685        pyExe = sys.executable 
    671686        pyArgs = [pyExe, "-u", "-c", "print 'hello'"] 
     
    870885if not interfaces.IReactorProcess(reactor, None): 
    871886    ProcessTestCase.skip = skipMessage 
    872887    ClosingPipes.skip = skipMessage 
     888 
     889if kqreactor: 
     890    PosixProcessTestCasePTY.skip = kqueueskipmessage 
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor 
    1111    | kqreactor.install() 
    1212 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     13This reactor only works on FreeBSD and requires PyKQueue 2, which is 
     14available at:  U{http://python-hpio.net/trac/wiki/PyKQueue} 
    1515 
    1616API Stability: stable 
    1717 
    1818Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} 
    19  
    20  
    21  
    22 You're going to need to patch PyKqueue:: 
    23  
    24     ===================================================== 
    25     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    26     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    27     @@ -137,7 +137,7 @@ 
    28      } 
    29       
    30      statichere PyTypeObject KQEvent_Type = { 
    31     -  PyObject_HEAD_INIT(NULL) 
    32     +  PyObject_HEAD_INIT(&PyType_Type) 
    33        0,                             // ob_size 
    34        "KQEvent",                     // tp_name 
    35        sizeof(KQEventObject),         // tp_basicsize 
    36     @@ -291,13 +291,14 @@ 
    37       
    38        /* Build timespec for timeout */ 
    39        totimespec.tv_sec = timeout / 1000; 
    40     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    41     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    42       
    43        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    44       
    45        /* Make the call */ 
    46     - 
    47     +  Py_BEGIN_ALLOW_THREADS 
    48        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    49     +  Py_END_ALLOW_THREADS 
    50       
    51        /* Don't need the input event list anymore, so get rid of it */ 
    52        free (changelist); 
    53     @@ -361,7 +362,7 @@ 
    54      statichere PyTypeObject KQueue_Type = { 
    55             /* The ob_type field must be initialized in the module init function 
    56              * to be portable to Windows without using C++. */ 
    57     -   PyObject_HEAD_INIT(NULL) 
    58     +   PyObject_HEAD_INIT(&PyType_Type) 
    59             0,                  /*ob_size*/ 
    60             "KQueue",                   /*tp_name*/ 
    61             sizeof(KQueueObject),       /*tp_basicsize*/ 
    62  
    6319""" 
    6420 
    6521# System imports 
    6622import errno, sys 
    6723 
    6824# PyKQueue imports 
    69 from kqsyscall import * 
     25from kqueue import * 
    7026 
     27from zope.interface import implements 
     28 
    7129# Twisted imports 
    7230from twisted.python import log, failure 
    7331 
     32from twisted.internet.interfaces import IReactorFDSet 
     33 
    7434# Sibling imports 
    7535import main 
    7636import posixbase 
    77  
     37try: 
     38    set() 
     39except NameError: 
     40    from set import Set as set 
     41                                 
    7842# globals 
    79 reads = {} 
    80 writes = {} 
     43reads = set() 
     44writes = set() 
    8145selectables = {} 
     46reverse = {} 
    8247kq = kqueue() 
    8348 
    8449 
    8550class KQueueReactor(posixbase.PosixReactorBase): 
    8651    """A reactor that uses kqueue(2)/kevent(2).""" 
     52    implements(IReactorFDSet) 
    8753 
    88     def _updateRegistration(self, *args): 
    89         kq.kevent([kevent(*args)], 0, 0) 
     54    def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq): 
     55        try: 
     56            kevent(kq, [Event(fd, filter, flags)], 0, 0) 
     57        except OSError, e: 
     58            if e[0] == errno.EBADF: 
     59                return 
     60            else: 
     61                raise 
    9062 
    9163    def addReader(self, reader): 
    9264        """Add a FileDescriptor for notification of data available to read. 
    9365        """ 
    9466        fd = reader.fileno() 
    95         if not reads.has_key(fd): 
     67        reads.add(fd) 
     68        if not selectables.has_key(fd): 
    9669            selectables[fd] = reader 
    97             reads[fd] = 1 
    98             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     70            reverse[reader] = fd 
     71            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE) 
     72            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE) 
     73        else: 
     74            self._updateRegistration(fd, EVFILT_READ, EV_ENABLE) 
    9975 
    100     def addWriter(self, writer, writes=writes, selectables=selectables): 
     76    def addWriter(self, writer): 
    10177        """Add a FileDescriptor for notification of data available to write. 
    10278        """ 
    10379        fd = writer.fileno() 
    104         if not writes.has_key(fd): 
     80        writes.add(fd) 
     81        if not selectables.has_key(fd): 
    10582            selectables[fd] = writer 
    106             writes[fd] = 1 
    107             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     83            reverse[writer] = fd 
     84            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE) 
     85            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE) 
     86        else: 
     87            self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE) 
    10888 
    10989    def removeReader(self, reader): 
    11090        """Remove a Selectable for notification of data available to read. 
    11191        """ 
    11292        fd = reader.fileno() 
    113         if reads.has_key(fd): 
    114             del reads[fd] 
    115             if not writes.has_key(fd): del selectables[fd] 
    116             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     93        if fd == -1: 
     94            try: 
     95                fd = reverse[reader] 
     96            except KeyError: 
     97                return 
     98        if fd in reads: 
     99            reads.discard(fd) 
     100            if fd not in writes: 
     101                del selectables[fd] 
     102                del reverse[reader] 
     103                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 
     104            else: 
     105                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 
    117106 
    118     def removeWriter(self, writer, writes=writes): 
     107    def removeWriter(self, writer): 
    119108        """Remove a Selectable for notification of data available to write. 
    120109        """ 
    121110        fd = writer.fileno() 
    122         if writes.has_key(fd): 
    123             del writes[fd] 
    124             if not reads.has_key(fd): del selectables[fd] 
    125             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     111        if fd == -1: 
     112            try: 
     113                fd = reverse[writer] 
     114            except KeyError: 
     115                return 
     116        if fd in writes: 
     117            writes.discard(fd) 
     118            if fd not in reads: 
     119                del selectables[fd] 
     120                del reverse[writer] 
     121                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 
     122            else: 
     123                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 
    126124 
    127125    def removeAll(self): 
    128126        """Remove all selectables, and return a list of them.""" 
    129127        if self.waker is not None: 
    130128            self.removeReader(self.waker) 
    131129        result = selectables.values() 
    132         for fd in reads.keys(): 
     130        for fd in reads: 
    133131            self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
    134         for fd in writes.keys(): 
     132        for fd in writes: 
    135133            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
    136134        reads.clear() 
    137135        writes.clear() 
    138136        selectables.clear() 
     137        reverse.clear() 
    139138        if self.waker is not None: 
    140139            self.addReader(self.waker) 
    141140        return result 
    142141 
    143142    def doKEvent(self, timeout, 
    144                  reads=reads, 
    145                  writes=writes, 
    146143                 selectables=selectables, 
    147144                 kq=kq, 
    148145                 log=log, 
    149                  OSError=OSError, 
    150                  EVFILT_READ=EVFILT_READ, 
    151                  EVFILT_WRITE=EVFILT_WRITE): 
     146                 OSError=OSError): 
    152147        """Poll the kqueue for new events.""" 
    153148        if timeout is None: 
    154             timeout = 1000 
     149            timeout = 1000000 
    155150        else: 
    156             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     151            timeout = int(timeout * 1000000) # convert seconds to nanoseconds 
    157152 
    158153        try: 
    159             l = kq.kevent([], len(selectables), timeout) 
     154            l = kevent(kq, [], len(selectables), timeout) 
    160155        except OSError, e: 
    161156            if e[0] == errno.EINTR: 
    162157                return 
    163158            else: 
    164159                raise 
    165160        _drdw = self._doWriteOrRead 
     161 
    166162        for event in l: 
    167             why = None 
    168             fd, filter = event.ident, event.filter 
    169             selectable = selectables[fd] 
    170             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     163            fd = event.ident 
     164            try: 
     165                selectable = selectables[fd] 
     166            except KeyError: 
     167                continue 
     168            log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    171169 
    172     def _doWriteOrRead(self, selectable, fd, filter): 
    173         try: 
    174             if filter == EVFILT_READ: 
    175                 why = selectable.doRead() 
    176             if filter == EVFILT_WRITE: 
    177                 why = selectable.doWrite() 
    178             if not selectable.fileno() == fd: 
    179                 why = main.CONNECTION_LOST 
    180         except: 
    181             why = sys.exc_info()[1] 
    182             log.deferr() 
     170    def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE): 
     171        why = None 
     172        inRead = False 
     173        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags 
     174        if flags & EV_EOF and data and fflags: 
     175            why = CONNECTION_LOST 
     176        else: 
     177            try: 
     178                if filter == EVFILT_READ: 
     179                    inRead = True 
     180                    why = selectable.doRead() 
     181                if filter == EVFILT_WRITE: 
     182                    inRead = False 
     183                    why = selectable.doWrite() 
     184                if not selectable.fileno() == fd: 
     185                    why = CONNECTION_LOST 
     186                    inRead = False 
     187            except: 
     188                why = sys.exc_info()[1] 
     189                log.deferr() 
    183190 
    184191        if why: 
    185             self.removeReader(selectable) 
    186             self.removeWriter(selectable) 
    187             selectable.connectionLost(failure.Failure(why)) 
     192            self._disconnectSelectable(selectable, why, inRead) 
    188193 
    189194    doIteration = doKEvent 
    190195 
     
    195200 
    196201 
    197202__all__ = ["KQueueReactor", "install"] 
     203