Ticket #1918: kqueue_1.1.diff

File kqueue_1.1.diff, 12.9 KB (added by dialtone, 10 years ago)

New version

  • twisted/conch/test/test_recvline.py

     
    1111from twisted.conch import recvline
    1212
    1313from twisted.python import log, reflect, components
    14 from twisted.internet import defer, error, task
     14from twisted.internet import defer, error, task, reactor
    1515from twisted.trial import unittest
    1616from twisted.cred import portal
    1717from twisted.test.proto_helpers import StringTransport
     
    664664class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
    665665    if stdio is None:
    666666        skip = "Terminal requirements missing, can't run historic recvline tests over stdio"
     667
     668if reactor.__class__.__name__ == 'KQueueReactor':
     669    _StdioMixin.skip = "KQueue doesn't support this yet"
     670
  • twisted/test/test_process.py

     
    2929from twisted.python import util, runtime
    3030from twisted.python import procutils
    3131
     32kqreactor = False
     33kqosx = False
     34if reactor.__class__.__name__ == 'KQueueReactor':
     35    kqueueskipmessage = "KQueue doesn't support this yet"
     36    kqreactor = True
     37    if runtime.platform.isMacOSX():
     38        kqosx = True
     39
    3240class TrivialProcessProtocol(protocol.ProcessProtocol):
    3341    def __init__(self, d):
    3442        self.deferred = d
     
    371379        reactor.callLater(1, self.close, 0)
    372380        reactor.callLater(2, self.close, 1)
    373381        return self._onClose()
     382    if kqosx:
     383        testClosePty.skip = kqueueskipmessage + " on Mac OSX"
    374384
    375385    def testKillPty(self):
    376386        if self.verbose: print "starting processes"
     
    378388        reactor.callLater(1, self.kill, 0)
    379389        reactor.callLater(2, self.kill, 1)
    380390        return self._onClose()
     391    if kqreactor:
     392        testKillPty.skip = kqueueskipmessage
    381393
    382394class FDChecker(protocol.ProcessProtocol):
    383395    state = 0
     
    554566            self.assertEquals(p.reason.value.signal, None)
    555567        d.addCallback(check)
    556568        return d
     569    if kqreactor:
     570        testNormalTermination.skip = kqueueskipmessage
    557571
    558572    def testAbnormalTermination(self):
    559573        if os.path.exists('/bin/false'): cmd = '/bin/false'
     
    571585            self.assertEquals(p.reason.value.signal, None)
    572586        d.addCallback(check)
    573587        return d
     588    if kqreactor:
     589        testAbnormalTermination.skip = kqueueskipmessage
    574590
    575591    def _testSignal(self, sig):
    576592        exe = sys.executable
     
    665681                "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
    666682        return d.addCallback(processEnded)
    667683
    668 
    669684    def testBadArgs(self):
    670685        pyExe = sys.executable
    671686        pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
     
    870885if not interfaces.IReactorProcess(reactor, None):
    871886    ProcessTestCase.skip = skipMessage
    872887    ClosingPipes.skip = skipMessage
     888
     889if kqreactor:
     890    PosixProcessTestCasePTY.skip = kqueueskipmessage
  • twisted/internet/kqreactor.py

     
    1010    | from twisted.internet import kqreactor
    1111    | kqreactor.install()
    1212
    13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
    14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
     13This reactor only works on FreeBSD and requires PyKQueue 2, which is
     14available at:  U{http://python-hpio.net/trac/wiki/PyKQueue}
    1515
    1616API Stability: stable
    1717
    1818Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
    19 
    20 
    21 
    22 You're going to need to patch PyKqueue::
    23 
    24     =====================================================
    25     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
    26     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
    27     @@ -137,7 +137,7 @@
    28      }
    29      
    30      statichere PyTypeObject KQEvent_Type = {
    31     -  PyObject_HEAD_INIT(NULL)
    32     +  PyObject_HEAD_INIT(&PyType_Type)
    33        0,                             // ob_size
    34        "KQEvent",                     // tp_name
    35        sizeof(KQEventObject),         // tp_basicsize
    36     @@ -291,13 +291,14 @@
    37      
    38        /* Build timespec for timeout */
    39        totimespec.tv_sec = timeout / 1000;
    40     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
    41     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
    42      
    43        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
    44      
    45        /* Make the call */
    46     -
    47     +  Py_BEGIN_ALLOW_THREADS
    48        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
    49     +  Py_END_ALLOW_THREADS
    50      
    51        /* Don't need the input event list anymore, so get rid of it */
    52        free (changelist);
    53     @@ -361,7 +362,7 @@
    54      statichere PyTypeObject KQueue_Type = {
    55             /* The ob_type field must be initialized in the module init function
    56              * to be portable to Windows without using C++. */
    57     -   PyObject_HEAD_INIT(NULL)
    58     +   PyObject_HEAD_INIT(&PyType_Type)
    59             0,                  /*ob_size*/
    60             "KQueue",                   /*tp_name*/
    61             sizeof(KQueueObject),       /*tp_basicsize*/
    62 
    6319"""
    6420
    6521# System imports
    6622import errno, sys
    6723
    6824# PyKQueue imports
    69 from kqsyscall import *
     25from kqueue import *
    7026
     27from zope.interface import implements
     28
    7129# Twisted imports
    7230from twisted.python import log, failure
    7331
     32from twisted.internet.interfaces import IReactorFDSet
     33
    7434# Sibling imports
    7535import main
    7636import posixbase
    77 
     37try:
     38    set()
     39except NameError:
     40    from set import Set as set
     41                               
    7842# globals
    79 reads = {}
    80 writes = {}
     43reads = set()
     44writes = set()
    8145selectables = {}
     46reverse = {}
    8247kq = kqueue()
    8348
    8449
    8550class KQueueReactor(posixbase.PosixReactorBase):
    8651    """A reactor that uses kqueue(2)/kevent(2)."""
     52    implements(IReactorFDSet)
    8753
    88     def _updateRegistration(self, *args):
    89         kq.kevent([kevent(*args)], 0, 0)
     54    def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq):
     55        try:
     56            kevent(kq, [Event(fd, filter, flags)], 0, 0)
     57        except OSError, e:
     58            if e[0] == errno.EBADF:
     59                return
     60            else:
     61                raise
    9062
    9163    def addReader(self, reader):
    9264        """Add a FileDescriptor for notification of data available to read.
    9365        """
    9466        fd = reader.fileno()
    95         if not reads.has_key(fd):
     67        reads.add(fd)
     68        if not selectables.has_key(fd):
    9669            selectables[fd] = reader
    97             reads[fd] = 1
    98             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
     70            reverse[reader] = fd
     71            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE)
     72            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE)
     73        else:
     74            self._updateRegistration(fd, EVFILT_READ, EV_ENABLE)
    9975
    100     def addWriter(self, writer, writes=writes, selectables=selectables):
     76    def addWriter(self, writer):
    10177        """Add a FileDescriptor for notification of data available to write.
    10278        """
    10379        fd = writer.fileno()
    104         if not writes.has_key(fd):
     80        writes.add(fd)
     81        if not selectables.has_key(fd):
    10582            selectables[fd] = writer
    106             writes[fd] = 1
    107             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
     83            reverse[writer] = fd
     84            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE)
     85            self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE)
     86        else:
     87            self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE)
    10888
    10989    def removeReader(self, reader):
    11090        """Remove a Selectable for notification of data available to read.
    11191        """
    11292        fd = reader.fileno()
    113         if reads.has_key(fd):
    114             del reads[fd]
    115             if not writes.has_key(fd): del selectables[fd]
    116             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
     93        if fd == -1:
     94            try:
     95                fd = reverse[reader]
     96            except KeyError:
     97                return
     98        if fd in reads:
     99            reads.discard(fd)
     100            if fd not in writes:
     101                del selectables[fd]
     102                del reverse[reader]
     103                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE)
     104            else:
     105                self._updateRegistration(fd, EVFILT_READ, EV_DISABLE)
    117106
    118     def removeWriter(self, writer, writes=writes):
     107    def removeWriter(self, writer):
    119108        """Remove a Selectable for notification of data available to write.
    120109        """
    121110        fd = writer.fileno()
    122         if writes.has_key(fd):
    123             del writes[fd]
    124             if not reads.has_key(fd): del selectables[fd]
    125             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
     111        if fd == -1:
     112            try:
     113                fd = reverse[writer]
     114            except KeyError:
     115                return
     116        if fd in writes:
     117            writes.discard(fd)
     118            if fd not in reads:
     119                del selectables[fd]
     120                del reverse[writer]
     121                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE)
     122            else:
     123                self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE)
    126124
    127125    def removeAll(self):
    128126        """Remove all selectables, and return a list of them."""
    129127        if self.waker is not None:
    130128            self.removeReader(self.waker)
    131129        result = selectables.values()
    132         for fd in reads.keys():
     130        for fd in reads:
    133131            self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
    134         for fd in writes.keys():
     132        for fd in writes:
    135133            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
    136134        reads.clear()
    137135        writes.clear()
    138136        selectables.clear()
     137        reverse.clear()
    139138        if self.waker is not None:
    140139            self.addReader(self.waker)
    141140        return result
    142141
    143142    def doKEvent(self, timeout,
    144                  reads=reads,
    145                  writes=writes,
    146143                 selectables=selectables,
    147144                 kq=kq,
    148145                 log=log,
    149                  OSError=OSError,
    150                  EVFILT_READ=EVFILT_READ,
    151                  EVFILT_WRITE=EVFILT_WRITE):
     146                 OSError=OSError):
    152147        """Poll the kqueue for new events."""
    153148        if timeout is None:
    154             timeout = 1000
     149            timeout = 1000000
    155150        else:
    156             timeout = int(timeout * 1000) # convert seconds to milliseconds
     151            timeout = int(timeout * 1000000) # convert seconds to nanoseconds
    157152
    158153        try:
    159             l = kq.kevent([], len(selectables), timeout)
     154            l = kevent(kq, [], len(selectables), timeout)
    160155        except OSError, e:
    161156            if e[0] == errno.EINTR:
    162157                return
    163158            else:
    164159                raise
    165160        _drdw = self._doWriteOrRead
     161
    166162        for event in l:
    167             why = None
    168             fd, filter = event.ident, event.filter
    169             selectable = selectables[fd]
    170             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
     163            fd = event.ident
     164            try:
     165                selectable = selectables[fd]
     166            except KeyError:
     167                continue
     168            log.callWithLogger(selectable, _drdw, selectable, fd, event)
    171169
    172     def _doWriteOrRead(self, selectable, fd, filter):
    173         try:
    174             if filter == EVFILT_READ:
    175                 why = selectable.doRead()
    176             if filter == EVFILT_WRITE:
    177                 why = selectable.doWrite()
    178             if not selectable.fileno() == fd:
    179                 why = main.CONNECTION_LOST
    180         except:
    181             why = sys.exc_info()[1]
    182             log.deferr()
     170    def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE):
     171        why = None
     172        inRead = False
     173        filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags
     174        if flags & EV_EOF and data and fflags:
     175            why = CONNECTION_LOST
     176        else:
     177            try:
     178                if filter == EVFILT_READ:
     179                    inRead = True
     180                    why = selectable.doRead()
     181                if filter == EVFILT_WRITE:
     182                    inRead = False
     183                    why = selectable.doWrite()
     184                if not selectable.fileno() == fd:
     185                    why = CONNECTION_LOST
     186                    inRead = False
     187            except:
     188                why = sys.exc_info()[1]
     189                log.deferr()
    183190
    184191        if why:
    185             self.removeReader(selectable)
    186             self.removeWriter(selectable)
    187             selectable.connectionLost(failure.Failure(why))
     192            self._disconnectSelectable(selectable, why, inRead)
    188193
    189194    doIteration = doKEvent
    190195
     
    195200
    196201
    197202__all__ = ["KQueueReactor", "install"]
     203