Ticket #1918: kqueue_1.1.diff

File kqueue_1.1.diff, 12.9 KB (added by dialtone, 8 years ago)

New version

  • twisted/conch/test/test_recvline.py

     
    1111from twisted.conch import recvline 
    1212 
    1313from twisted.python import log, reflect, components 
    14 from twisted.internet import defer, error, task 
     14from twisted.internet import defer, error, task, reactor 
    1515from twisted.trial import unittest 
    1616from twisted.cred import portal 
    1717from twisted.test.proto_helpers import StringTransport 
     
    664664class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin): 
    665665    if stdio is None: 
    666666        skip = "Terminal requirements missing, can't run historic recvline tests over stdio" 
     667 
     668if reactor.__class__.__name__ == 'KQueueReactor': 
     669    _StdioMixin.skip = "KQueue doesn't support this yet" 
     670 
  • twisted/test/test_process.py

     
    2929from twisted.python import util, runtime 
    3030from twisted.python import procutils 
    3131 
     32kqreactor = False 
     33kqosx = False 
     34if reactor.__class__.__name__ == 'KQueueReactor': 
     35    kqueueskipmessage = "KQueue doesn't support this yet" 
     36    kqreactor = True 
     37    if runtime.platform.isMacOSX(): 
     38        kqosx = True 
     39 
    3240class TrivialProcessProtocol(protocol.ProcessProtocol): 
    3341    def __init__(self, d): 
    3442        self.deferred = d 
     
    371379        reactor.callLater(1, self.close, 0) 
    372380        reactor.callLater(2, self.close, 1) 
    373381        return self._onClose() 
     382    if kqosx: 
     383        testClosePty.skip = kqueueskipmessage + " on Mac OSX" 
    374384 
    375385    def testKillPty(self): 
    376386        if self.verbose: print "starting processes" 
     
    378388        reactor.callLater(1, self.kill, 0) 
    379389        reactor.callLater(2, self.kill, 1) 
    380390        return self._onClose() 
     391    if kqreactor: 
     392        testKillPty.skip = kqueueskipmessage 
    381393 
    382394class FDChecker(protocol.ProcessProtocol): 
    383395    state = 0 
     
    554566            self.assertEquals(p.reason.value.signal, None) 
    555567        d.addCallback(check) 
    556568        return d 
     569    if kqreactor: 
     570        testNormalTermination.skip = kqueueskipmessage 
    557571 
    558572    def testAbnormalTermination(self): 
    559573        if os.path.exists('/bin/false'): cmd = '/bin/false' 
     
    571585            self.assertEquals(p.reason.value.signal, None) 
    572586        d.addCallback(check) 
    573587        return d 
     588    if kqreactor: 
     589        testAbnormalTermination.skip = kqueueskipmessage 
    574590 
    575591    def _testSignal(self, sig): 
    576592        exe = sys.executable 
     
    665681                "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) 
    666682        return d.addCallback(processEnded) 
    667683 
    668  
    669684    def testBadArgs(self): 
    670685        pyExe = sys.executable 
    671686        pyArgs = [pyExe, "-u", "-c", "print 'hello'"] 
     
    870885if not interfaces.IReactorProcess(reactor, None): 
    871886    ProcessTestCase.skip = skipMessage 
    872887    ClosingPipes.skip = skipMessage 
     888 
     889if kqreactor: 
     890    PosixProcessTestCasePTY.skip = kqueueskipmessage 
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor 
    1111    | kqreactor.install() 
    1212 
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/} 
     13This reactor only works on FreeBSD and requires PyKQueue 2, which is 
     14available at:  U{http://python-hpio.net/trac/wiki/PyKQueue} 
    1515 
    1616API Stability: stable 
    1717 
    1818Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} 
    19  
    20  
    21  
    22 You're going to need to patch PyKqueue:: 
    23  
    24     ===================================================== 
    25     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001 
    26     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002 
    27     @@ -137,7 +137,7 @@ 
    28      } 
    29       
    30      statichere PyTypeObject KQEvent_Type = { 
    31     -  PyObject_HEAD_INIT(NULL) 
    32     +  PyObject_HEAD_INIT(&PyType_Type) 
    33        0,                             // ob_size 
    34        "KQEvent",                     // tp_name 
    35        sizeof(KQEventObject),         // tp_basicsize 
    36     @@ -291,13 +291,14 @@ 
    37       
    38        /* Build timespec for timeout */ 
    39        totimespec.tv_sec = timeout / 1000; 
    40     -  totimespec.tv_nsec = (timeout % 1000) * 100000; 
    41     +  totimespec.tv_nsec = (timeout % 1000) * 1000000; 
    42       
    43        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 
    44       
    45        /* Make the call */ 
    46     - 
    47     +  Py_BEGIN_ALLOW_THREADS 
    48        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 
    49     +  Py_END_ALLOW_THREADS 
    50       
    51        /* Don't need the input event list anymore, so get rid of it */ 
    52        free (changelist); 
    53     @@ -361,7 +362,7 @@ 
    54      statichere PyTypeObject KQueue_Type = { 
    55             /* The ob_type field must be initialized in the module init function 
    56              * to be portable to Windows without using C++. */ 
    57     -   PyObject_HEAD_INIT(NULL) 
    58     +   PyObject_HEAD_INIT(&PyType_Type) 
    59             0,                  /*ob_size*/ 
    60             "KQueue",                   /*tp_name*/ 
    61             sizeof(KQueueObject),       /*tp_basicsize*/ 
    62  
    6319""" 
    6420 
    6521# System imports 
    6622import errno, sys 
    6723 
    6824# PyKQueue imports 
    69 from kqsyscall import * 
     25from kqueue import * 
    7026 
     27from zope.interface import implements 
     28 
    7129# Twisted imports 
    7230from twisted.python import log, failure 
    7331 
     32from twisted.internet.interfaces import IReactorFDSet 
     33 
    7434# Sibling imports 
    7535import main 
    7636import posixbase 
    77  
     37try: 
     38    set() 
     39except NameError: 
     40    from set import Set as set 
     41                                 
    7842# globals 
    79 reads = {} 
    80 writes = {} 
     43reads = set() 
     44writes = set() 
    8145selectables = {} 
     46reverse = {} 
    8247kq = kqueue() 
    8348 
    8449 
    8550class KQueueReactor(posixbase.PosixReactorBase): 
    8651    """A reactor that uses kqueue(2)/kevent(2).""" 
     52    implements(IReactorFDSet) 
    8753 
    88     def _updateRegistration(self, *args): 
    89         kq.kevent([kevent(*args)], 0, 0) 
     54    def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq): 
     55        try: 
     56            kevent(kq, [Event(fd, filter, flags)], 0, 0) 
     57        except OSError, e: 
     58            if e[0] == errno.EBADF: 
     59                return 
     60            else: 
     61                raise 
    9062 
    9163    def addReader(self, reader): 
    9264        """Add a FileDescriptor for notification of data available to read. 
    9365        """ 
    9466        fd = reader.fileno() 
    95         if not reads.has_key(fd): 
     67        reads.add(fd) 
     68        if not selectables.has_key(fd): 
    9669            selectables[fd] = reader 
    97             reads[fd] = 1 
    98             self._updateRegistration(fd, EVFILT_READ, EV_ADD) 
     70            reverse[reader] = fd 
     71            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE) 
     72            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE) 
     73        else: 
     74            self._updateRegistration(fd, EVFILT_READ, EV_ENABLE) 
    9975 
    100     def addWriter(self, writer, writes=writes, selectables=selectables): 
     76    def addWriter(self, writer): 
    10177        """Add a FileDescriptor for notification of data available to write. 
    10278        """ 
    10379        fd = writer.fileno() 
    104         if not writes.has_key(fd): 
     80        writes.add(fd) 
     81        if not selectables.has_key(fd): 
    10582            selectables[fd] = writer 
    106             writes[fd] = 1 
    107             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 
     83            reverse[writer] = fd 
     84            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE) 
     85            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE) 
     86        else: 
     87            self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE) 
    10888 
    10989    def removeReader(self, reader): 
    11090        """Remove a Selectable for notification of data available to read. 
    11191        """ 
    11292        fd = reader.fileno() 
    113         if reads.has_key(fd): 
    114             del reads[fd] 
    115             if not writes.has_key(fd): del selectables[fd] 
    116             self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
     93        if fd == -1: 
     94            try: 
     95                fd = reverse[reader] 
     96            except KeyError: 
     97                return 
     98        if fd in reads: 
     99            reads.discard(fd) 
     100            if fd not in writes: 
     101                del selectables[fd] 
     102                del reverse[reader] 
     103                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 
     104            else: 
     105                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 
    117106 
    118     def removeWriter(self, writer, writes=writes): 
     107    def removeWriter(self, writer): 
    119108        """Remove a Selectable for notification of data available to write. 
    120109        """ 
    121110        fd = writer.fileno() 
    122         if writes.has_key(fd): 
    123             del writes[fd] 
    124             if not reads.has_key(fd): del selectables[fd] 
    125             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
     111        if fd == -1: 
     112            try: 
     113                fd = reverse[writer] 
     114            except KeyError: 
     115                return 
     116        if fd in writes: 
     117            writes.discard(fd) 
     118            if fd not in reads: 
     119                del selectables[fd] 
     120                del reverse[writer] 
     121                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 
     122            else: 
     123                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 
    126124 
    127125    def removeAll(self): 
    128126        """Remove all selectables, and return a list of them.""" 
    129127        if self.waker is not None: 
    130128            self.removeReader(self.waker) 
    131129        result = selectables.values() 
    132         for fd in reads.keys(): 
     130        for fd in reads: 
    133131            self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 
    134         for fd in writes.keys(): 
     132        for fd in writes: 
    135133            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 
    136134        reads.clear() 
    137135        writes.clear() 
    138136        selectables.clear() 
     137        reverse.clear() 
    139138        if self.waker is not None: 
    140139            self.addReader(self.waker) 
    141140        return result 
    142141 
    143142    def doKEvent(self, timeout, 
    144                  reads=reads, 
    145                  writes=writes, 
    146143                 selectables=selectables, 
    147144                 kq=kq, 
    148145                 log=log, 
    149                  OSError=OSError, 
    150                  EVFILT_READ=EVFILT_READ, 
    151                  EVFILT_WRITE=EVFILT_WRITE): 
     146                 OSError=OSError): 
    152147        """Poll the kqueue for new events.""" 
    153148        if timeout is None: 
    154             timeout = 1000 
     149            timeout = 1000000 
    155150        else: 
    156             timeout = int(timeout * 1000) # convert seconds to milliseconds 
     151            timeout = int(timeout * 1000000) # convert seconds to nanoseconds 
    157152 
    158153        try: 
    159             l = kq.kevent([], len(selectables), timeout) 
     154            l = kevent(kq, [], len(selectables), timeout) 
    160155        except OSError, e: 
    161156            if e[0] == errno.EINTR: 
    162157                return 
    163158            else: 
    164159                raise 
    165160        _drdw = self._doWriteOrRead 
     161 
    166162        for event in l: 
    167             why = None 
    168             fd, filter = event.ident, event.filter 
    169             selectable = selectables[fd] 
    170             log.callWithLogger(selectable, _drdw, selectable, fd, filter) 
     163            fd = event.ident 
     164            try: 
     165                selectable = selectables[fd] 
     166            except KeyError: 
     167                continue 
     168            log.callWithLogger(selectable, _drdw, selectable, fd, event) 
    171169 
    172     def _doWriteOrRead(self, selectable, fd, filter): 
    173         try: 
    174             if filter == EVFILT_READ: 
    175                 why = selectable.doRead() 
    176             if filter == EVFILT_WRITE: 
    177                 why = selectable.doWrite() 
    178             if not selectable.fileno() == fd: 
    179                 why = main.CONNECTION_LOST 
    180         except: 
    181             why = sys.exc_info()[1] 
    182             log.deferr() 
     170    def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE): 
     171        why = None 
     172        inRead = False 
     173        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags 
     174        if flags & EV_EOF and data and fflags: 
     175            why = CONNECTION_LOST 
     176        else: 
     177            try: 
     178                if filter == EVFILT_READ: 
     179                    inRead = True 
     180                    why = selectable.doRead() 
     181                if filter == EVFILT_WRITE: 
     182                    inRead = False 
     183                    why = selectable.doWrite() 
     184                if not selectable.fileno() == fd: 
     185                    why = CONNECTION_LOST 
     186                    inRead = False 
     187            except: 
     188                why = sys.exc_info()[1] 
     189                log.deferr() 
    183190 
    184191        if why: 
    185             self.removeReader(selectable) 
    186             self.removeWriter(selectable) 
    187             selectable.connectionLost(failure.Failure(why)) 
     192            self._disconnectSelectable(selectable, why, inRead) 
    188193 
    189194    doIteration = doKEvent 
    190195 
     
    195200 
    196201 
    197202__all__ = ["KQueueReactor", "install"] 
     203