Ticket #1918: kqueue.diff

File kqueue.diff, 15.1 KB (added by dialtone, 10 years ago)

Working KQueue reactor patch

  • twisted/python/runtime.py

     
    7676            return imp.find_module('thread')[0] is None
    7777        except ImportError:
    7878            return False
     79   
     80    def usingKQueue(self):
     81        """Are we using KQeueueReactor?
     82        """
     83        try:
     84            from twisted.internet import kqreactor, reactor
     85            if isinstance(reactor, kqreactor.KQueueReactor):
     86                return True
     87        except ImportError:
     88            return False
    7989
    8090platform = Platform()
    8191platformType = platform.getType()
  • twisted/conch/test/test_manhole.py

     
    1010from twisted.internet import error, defer
    1111from twisted.conch.test.test_recvline import _TelnetMixin, _SSHMixin, _StdioMixin, stdio, ssh
    1212from twisted.conch import manhole
     13from twisted.python import runtime
    1314
    1415def determineDefaultFunctionName():
    1516    """
     
    2223        return traceback.extract_stack()[0][2]
    2324defaultFunctionName = determineDefaultFunctionName()
    2425
     26kqreactor = False
     27kqosx = False
     28if runtime.platform.usingKQueue():
     29    kqueueskipmessage = "KQueue doesn't support this yet"
     30    kqreactor = True
     31    if runtime.platform.isMacOSX():
     32        kqosx = True
    2533
    2634class WriterTestCase(unittest.TestCase):
    2735    def testInteger(self):
     
    272280        skip = "Terminal requirements missing, can't run manhole tests over stdio"
    273281    else:
    274282        serverProtocol = stdio.ConsoleManhole
     283if kqosx:
     284    ManholeLoopbackMixin.skip = kqueueskipmessage
     285 No newline at end of file
  • twisted/conch/test/test_recvline.py

     
    1010from twisted.conch.insults import insults
    1111from twisted.conch import recvline
    1212
    13 from twisted.python import log, reflect, components
     13from twisted.python import log, reflect, components, runtime
    1414from twisted.internet import defer, error, task
    1515from twisted.trial import unittest
    1616from twisted.cred import portal
    1717from twisted.test.proto_helpers import StringTransport
    1818
     19kqreactor = False
     20kqosx = False
     21if runtime.platform.usingKQueue():
     22    kqueueskipmessage = "KQueue doesn't support this yet"
     23    kqreactor = True
     24    if runtime.platform.isMacOSX():
     25        kqosx = True
     26
    1927class Arrows(unittest.TestCase):
    2028    def setUp(self):
    2129        self.underlyingTransport = StringTransport()
     
    664672class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
    665673    if stdio is None:
    666674        skip = "Terminal requirements missing, can't run historic recvline tests over stdio"
     675if kqosx:
     676    _StdioMixin.skip = kqueueskipmessage + " on OSX"
     677 No newline at end of file
  • twisted/test/test_process.py

     
    2929from twisted.python import util, runtime
    3030from twisted.python import procutils
    3131
     32kqreactor = False
     33kqosx = False
     34if runtime.platform.usingKQueue():
     35    kqueueskipmessage = "KQueue doesn't support this yet"
     36    kqreactor = True
     37    if runtime.platform.isMacOSX():
     38        kqosx = True
     39
    3240class TrivialProcessProtocol(protocol.ProcessProtocol):
    3341    def __init__(self, d):
    3442        self.deferred = d
     
    371379        reactor.callLater(1, self.close, 0)
    372380        reactor.callLater(2, self.close, 1)
    373381        return self._onClose()
     382    if kqosx:
     383        testClosePty.skip = kqueueskipmessage + " on Mac OSX"
    374384
    375385    def testKillPty(self):
    376386        if self.verbose: print "starting processes"
     
    378388        reactor.callLater(1, self.kill, 0)
    379389        reactor.callLater(2, self.kill, 1)
    380390        return self._onClose()
     391    if kqreactor:
     392        testKillPty.skip = kqueueskipmessage
    381393
    382394class FDChecker(protocol.ProcessProtocol):
    383395    state = 0
     
    554566            self.assertEquals(p.reason.value.signal, None)
    555567        d.addCallback(check)
    556568        return d
     569    if kqreactor:
     570        testNormalTermination.skip = kqueueskipmessage
    557571
    558572    def testAbnormalTermination(self):
    559573        if os.path.exists('/bin/false'): cmd = '/bin/false'
     
    571585            self.assertEquals(p.reason.value.signal, None)
    572586        d.addCallback(check)
    573587        return d
     588    if kqreactor:
     589        testAbnormalTermination.skip = kqueueskipmessage
    574590
    575591    def _testSignal(self, sig):
    576592        exe = sys.executable
     
    665681                "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
    666682        return d.addCallback(processEnded)
    667683
    668 
    669684    def testBadArgs(self):
    670685        pyExe = sys.executable
    671686        pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
     
    870885if not interfaces.IReactorProcess(reactor, None):
    871886    ProcessTestCase.skip = skipMessage
    872887    ClosingPipes.skip = skipMessage
     888
     889if kqreactor:
     890    PosixProcessTestCasePTY.skip = kqueueskipmessage
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor
    1111    | kqreactor.install()
    1212
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
     13This reactor only works on FreeBSD and requires PyKQueue 2, which is
     14available at:  U{http://python-hpio.net/trac/wiki/PyKQueue}
    1515
    1616API Stability: stable
    1717
    1818Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
    19 
    20 
    21 
    22 You're going to need to patch PyKqueue::
    23 
    24     =====================================================
    25     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    26     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    27     @@ -137,7 +137,7 @@
    28      }
    29      
    30      statichere PyTypeObject KQEvent_Type = {
    31     -  PyObject_HEAD_INIT(NULL)
    32     +  PyObject_HEAD_INIT(&PyType_Type)
    33        0,                             // ob_size
    34        "KQEvent",                     // tp_name
    35        sizeof(KQEventObject),         // tp_basicsize
    36     @@ -291,13 +291,14 @@
    37      
    38        /* Build timespec for timeout */
    39        totimespec.tv_sec = timeout / 1000;
    40     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    41     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    42      
    43        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    44      
    45        /* Make the call */
    46     -
    47     +  Py_BEGIN_ALLOW_THREADS
    48        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    49     +  Py_END_ALLOW_THREADS
    50      
    51        /* Don't need the input event list anymore, so get rid of it */
    52        free (changelist);
    53     @@ -361,7 +362,7 @@
    54      statichere PyTypeObject KQueue_Type = {
    55             /* The ob_type field must be initialized in the module init function
    56              * to be portable to Windows without using C++. */
    57     -   PyObject_HEAD_INIT(NULL)
    58     +   PyObject_HEAD_INIT(&PyType_Type)
    59             0,                  /*ob_size*/
    60             "KQueue",                   /*tp_name*/
    61             sizeof(KQueueObject),       /*tp_basicsize*/
    62 
    6319"""
    6420
    6521# System imports
    6622import errno, sys
    6723
    6824# PyKQueue imports
    69 from kqsyscall import *
     25from kqueue import *
    7026
     27from zope.interface import implements
     28
    7129# Twisted imports
    7230from twisted.python import log, failure
    7331
     32from twisted.internet.interfaces import IReactorFDSet
     33
    7434# Sibling imports
    7535import main
    7636import posixbase
    77 
     37try:
     38    set()
     39except NameError:
     40    from set import Set as set
     41                               
    7842# globals
    79 reads = {}
    80 writes = {}
     43reads = set()
     44writes = set()
    8145selectables = {}
     46reverse = {}
    8247kq = kqueue()
    8348
    8449
    8550class KQueueReactor(posixbase.PosixReactorBase):
    8651    """A reactor that uses kqueue(2)/kevent(2)."""
     52    implements(IReactorFDSet)
    8753
    88     def _updateRegistration(self, *args):
    89         kq.kevent([kevent(*args)], 0, 0)
     54    def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq):
     55        try:
     56            kevent(kq, [Event(fd, filter, flags)], 0, 0)
     57        except OSError, e:
     58            if e[0] == errno.EBADF:
     59                return
     60            else:
     61                raise
    9062
    9163    def addReader(self, reader):
    9264        """Add a FileDescriptor for notification of data available to read.
    9365        """
    9466        fd = reader.fileno()
    95         if not reads.has_key(fd):
     67        reads.add(fd)
     68        if not selectables.has_key(fd):
    9669            selectables[fd] = reader
    97             reads[fd] = 1
    98             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     70            reverse[reader] = fd
     71            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE)
     72            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE)
     73        else:
     74            self._updateRegistration(fd, EVFILT_READ, EV_ENABLE)
    9975
    100     def addWriter(self, writer, writes=writes, selectables=selectables):
     76    def addWriter(self, writer):
    10177        """Add a FileDescriptor for notification of data available to write.
    10278        """
    10379        fd = writer.fileno()
    104         if not writes.has_key(fd):
     80        writes.add(fd)
     81        if not selectables.has_key(fd):
    10582            selectables[fd] = writer
    106             writes[fd] = 1
    107             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     83            reverse[writer] = fd
     84            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE)
     85            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE)
     86        else:
     87            self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE)
    10888
    10989    def removeReader(self, reader):
    11090        """Remove a Selectable for notification of data available to read.
    11191        """
    11292        fd = reader.fileno()
    113         if reads.has_key(fd):
    114             del reads[fd]
    115             if not writes.has_key(fd): del selectables[fd]
    116             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     93        if fd == -1:
     94            try:
     95                fd = reverse[reader]
     96            except KeyError:
     97                return
     98        if fd in reads:
     99            reads.discard(fd)
     100            if fd not in writes:
     101                del selectables[fd]
     102                del reverse[reader]
     103                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE)
     104            else:
     105                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE)
    117106
    118     def removeWriter(self, writer, writes=writes):
     107    def removeWriter(self, writer):
    119108        """Remove a Selectable for notification of data available to write.
    120109        """
    121110        fd = writer.fileno()
    122         if writes.has_key(fd):
    123             del writes[fd]
    124             if not reads.has_key(fd): del selectables[fd]
    125             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     111        if fd == -1:
     112            try:
     113                fd = reverse[writer]
     114            except KeyError:
     115                return
     116        if fd in writes:
     117            writes.discard(fd)
     118            if fd not in reads:
     119                del selectables[fd]
     120                del reverse[writer]
     121                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE)
     122            else:
     123                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE)
    126124
    127125    def removeAll(self):
    128126        """Remove all selectables, and return a list of them."""
    129127        if self.waker is not None:
    130128            self.removeReader(self.waker)
    131129        result = selectables.values()
    132         for fd in reads.keys():
     130        for fd in reads:
    133131            self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
    134         for fd in writes.keys():
     132        for fd in writes:
    135133            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
    136134        reads.clear()
    137135        writes.clear()
    138136        selectables.clear()
     137        reverse.clear()
    139138        if self.waker is not None:
    140139            self.addReader(self.waker)
    141140        return result
    142141
    143142    def doKEvent(self, timeout,
    144                  reads=reads,
    145                  writes=writes,
    146143                 selectables=selectables,
    147144                 kq=kq,
    148145                 log=log,
    149                  OSError=OSError,
    150                  EVFILT_READ=EVFILT_READ,
    151                  EVFILT_WRITE=EVFILT_WRITE):
     146                 OSError=OSError):
    152147        """Poll the kqueue for new events."""
    153148        if timeout is None:
    154             timeout = 1000
     149            timeout = 1000000
    155150        else:
    156             timeout = int(timeout * 1000) # convert seconds to milliseconds
     151            timeout = int(timeout * 1000000) # convert seconds to nanoseconds
    157152
    158153        try:
    159             l = kq.kevent([], len(selectables), timeout)
     154            l = kevent(kq, [], len(selectables), timeout)
    160155        except OSError, e:
    161156            if e[0] == errno.EINTR:
    162157                return
    163158            else:
    164159                raise
    165160        _drdw = self._doWriteOrRead
     161
    166162        for event in l:
    167             why = None
    168             fd, filter = event.ident, event.filter
    169             selectable = selectables[fd]
    170             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     163            fd = event.ident
     164            try:
     165                selectable = selectables[fd]
     166            except KeyError:
     167                continue
     168            log.callWithLogger(selectable, _drdw, selectable, fd, event)
    171169
    172     def _doWriteOrRead(self, selectable, fd, filter):
    173         try:
    174             if filter == EVFILT_READ:
    175                 why = selectable.doRead()
    176             if filter == EVFILT_WRITE:
    177                 why = selectable.doWrite()
    178             if not selectable.fileno() == fd:
    179                 why = main.CONNECTION_LOST
    180         except:
    181             why = sys.exc_info()[1]
    182             log.deferr()
     170    def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE):
     171        why = None
     172        inRead = False
     173        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags
     174        if flags & EV_EOF and data and fflags:
     175            why = CONNECTION_LOST
     176        else:
     177            try:
     178                if filter == EVFILT_READ:
     179                    inRead = True
     180                    why = selectable.doRead()
     181                if filter == EVFILT_WRITE:
     182                    inRead = False
     183                    why = selectable.doWrite()
     184                if not selectable.fileno() == fd:
     185                    why = CONNECTION_LOST
     186                    inRead = False
     187            except:
     188                why = sys.exc_info()[1]
     189                log.deferr()
    183190
    184191        if why:
    185             self.removeReader(selectable)
    186             self.removeWriter(selectable)
    187             selectable.connectionLost(failure.Failure(why))
     192            self._disconnectSelectable(selectable, why, inRead)
    188193
    189194    doIteration = doKEvent
    190195
     
    195200
    196201
    197202__all__ = ["KQueueReactor", "install"]
     203