Ticket #1918: kqueue_1.1.diff
| File kqueue_1.1.diff, 12.9 KB (added by dialtone, 7 years ago) |
|---|
-
twisted/conch/test/test_recvline.py
11 11 from twisted.conch import recvline 12 12 13 13 from twisted.python import log, reflect, components 14 from twisted.internet import defer, error, task 14 from twisted.internet import defer, error, task, reactor 15 15 from twisted.trial import unittest 16 16 from twisted.cred import portal 17 17 from twisted.test.proto_helpers import StringTransport … … 664 664 class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin): 665 665 if stdio is None: 666 666 skip = "Terminal requirements missing, can't run historic recvline tests over stdio" 667 668 if reactor.__class__.__name__ == 'KQueueReactor': 669 _StdioMixin.skip = "KQueue doesn't support this yet" 670 -
twisted/test/test_process.py
29 29 from twisted.python import util, runtime 30 30 from twisted.python import procutils 31 31 32 kqreactor = False 33 kqosx = False 34 if reactor.__class__.__name__ == 'KQueueReactor': 35 kqueueskipmessage = "KQueue doesn't support this yet" 36 kqreactor = True 37 if runtime.platform.isMacOSX(): 38 kqosx = True 39 32 40 class TrivialProcessProtocol(protocol.ProcessProtocol): 33 41 def __init__(self, d): 34 42 self.deferred = d … … 371 379 reactor.callLater(1, self.close, 0) 372 380 reactor.callLater(2, self.close, 1) 373 381 return self._onClose() 382 if kqosx: 383 testClosePty.skip = kqueueskipmessage + " on Mac OSX" 374 384 375 385 def testKillPty(self): 376 386 if self.verbose: print "starting processes" … … 378 388 reactor.callLater(1, self.kill, 0) 379 389 reactor.callLater(2, self.kill, 1) 380 390 return self._onClose() 391 if kqreactor: 392 testKillPty.skip = kqueueskipmessage 381 393 382 394 class FDChecker(protocol.ProcessProtocol): 383 395 state = 0 … … 554 566 self.assertEquals(p.reason.value.signal, None) 555 567 d.addCallback(check) 556 568 return d 569 if kqreactor: 570 testNormalTermination.skip = kqueueskipmessage 557 571 558 572 def testAbnormalTermination(self): 559 573 if os.path.exists('/bin/false'): cmd = '/bin/false' … … 571 585 self.assertEquals(p.reason.value.signal, None) 572 586 d.addCallback(check) 573 587 return d 588 if kqreactor: 589 testAbnormalTermination.skip = kqueueskipmessage 574 590 575 591 def _testSignal(self, sig): 576 592 exe = sys.executable … … 665 681 "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) 666 682 return d.addCallback(processEnded) 667 683 668 669 684 def testBadArgs(self): 670 685 pyExe = sys.executable 671 686 pyArgs = [pyExe, "-u", "-c", "print 'hello'"] … … 870 885 if not interfaces.IReactorProcess(reactor, None): 871 886 ProcessTestCase.skip = skipMessage 872 887 ClosingPipes.skip = skipMessage 888 889 if kqreactor: 890 PosixProcessTestCasePTY.skip = kqueueskipmessage -
twisted/internet/kqreactor.py
10 10 | from twisted.internet import kqreactor 11 11 | kqreactor.install() 12 12 13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://p eople.freebsd.org/~dwhite/PyKQueue/}13 This reactor only works on FreeBSD and requires PyKQueue 2, which is 14 available at: U{http://python-hpio.net/trac/wiki/PyKQueue} 15 15 16 16 API Stability: stable 17 17 18 18 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} 19 20 21 22 You're going to need to patch PyKqueue::23 24 =====================================================25 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 200126 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 200227 @@ -137,7 +137,7 @@28 }29 30 statichere PyTypeObject KQEvent_Type = {31 - PyObject_HEAD_INIT(NULL)32 + PyObject_HEAD_INIT(&PyType_Type)33 0, // ob_size34 "KQEvent", // tp_name35 sizeof(KQEventObject), // tp_basicsize36 @@ -291,13 +291,14 @@37 38 /* Build timespec for timeout */39 totimespec.tv_sec = timeout / 1000;40 - totimespec.tv_nsec = (timeout % 1000) * 100000;41 + totimespec.tv_nsec = (timeout % 1000) * 1000000;42 43 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);44 45 /* Make the call */46 -47 + Py_BEGIN_ALLOW_THREADS48 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);49 + Py_END_ALLOW_THREADS50 51 /* Don't need the input event list anymore, so get rid of it */52 free (changelist);53 @@ -361,7 +362,7 @@54 statichere PyTypeObject KQueue_Type = {55 /* The ob_type field must be initialized in the module init function56 * to be portable to Windows without using C++. */57 - PyObject_HEAD_INIT(NULL)58 + PyObject_HEAD_INIT(&PyType_Type)59 0, /*ob_size*/60 "KQueue", /*tp_name*/61 sizeof(KQueueObject), /*tp_basicsize*/62 63 19 """ 64 20 65 21 # System imports 66 22 import errno, sys 67 23 68 24 # PyKQueue imports 69 from kq syscallimport *25 from kqueue import * 70 26 27 from zope.interface import implements 28 71 29 # Twisted imports 72 30 from twisted.python import log, failure 73 31 32 from twisted.internet.interfaces import IReactorFDSet 33 74 34 # Sibling imports 75 35 import main 76 36 import posixbase 77 37 try: 38 set() 39 except NameError: 40 from set import Set as set 41 78 42 # globals 79 reads = {}80 writes = {}43 reads = set() 44 writes = set() 81 45 selectables = {} 46 reverse = {} 82 47 kq = kqueue() 83 48 84 49 85 50 class KQueueReactor(posixbase.PosixReactorBase): 86 51 """A reactor that uses kqueue(2)/kevent(2).""" 52 implements(IReactorFDSet) 87 53 88 def _updateRegistration(self, *args): 89 kq.kevent([kevent(*args)], 0, 0) 54 def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq): 55 try: 56 kevent(kq, [Event(fd, filter, flags)], 0, 0) 57 except OSError, e: 58 if e[0] == errno.EBADF: 59 return 60 else: 61 raise 90 62 91 63 def addReader(self, reader): 92 64 """Add a FileDescriptor for notification of data available to read. 93 65 """ 94 66 fd = reader.fileno() 95 if not reads.has_key(fd): 67 reads.add(fd) 68 if not selectables.has_key(fd): 96 69 selectables[fd] = reader 97 reads[fd] = 1 98 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 70 reverse[reader] = fd 71 self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE) 72 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE) 73 else: 74 self._updateRegistration(fd, EVFILT_READ, EV_ENABLE) 99 75 100 def addWriter(self, writer , writes=writes, selectables=selectables):76 def addWriter(self, writer): 101 77 """Add a FileDescriptor for notification of data available to write. 102 78 """ 103 79 fd = writer.fileno() 104 if not writes.has_key(fd): 80 writes.add(fd) 81 if not selectables.has_key(fd): 105 82 selectables[fd] = writer 106 writes[fd] = 1 107 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 83 reverse[writer] = fd 84 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE) 85 self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE) 86 else: 87 self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE) 108 88 109 89 def removeReader(self, reader): 110 90 """Remove a Selectable for notification of data available to read. 111 91 """ 112 92 fd = reader.fileno() 113 if reads.has_key(fd): 114 del reads[fd] 115 if not writes.has_key(fd): del selectables[fd] 116 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 93 if fd == -1: 94 try: 95 fd = reverse[reader] 96 except KeyError: 97 return 98 if fd in reads: 99 reads.discard(fd) 100 if fd not in writes: 101 del selectables[fd] 102 del reverse[reader] 103 self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 104 else: 105 self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) 117 106 118 def removeWriter(self, writer , writes=writes):107 def removeWriter(self, writer): 119 108 """Remove a Selectable for notification of data available to write. 120 109 """ 121 110 fd = writer.fileno() 122 if writes.has_key(fd): 123 del writes[fd] 124 if not reads.has_key(fd): del selectables[fd] 125 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 111 if fd == -1: 112 try: 113 fd = reverse[writer] 114 except KeyError: 115 return 116 if fd in writes: 117 writes.discard(fd) 118 if fd not in reads: 119 del selectables[fd] 120 del reverse[writer] 121 self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 122 else: 123 self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) 126 124 127 125 def removeAll(self): 128 126 """Remove all selectables, and return a list of them.""" 129 127 if self.waker is not None: 130 128 self.removeReader(self.waker) 131 129 result = selectables.values() 132 for fd in reads .keys():130 for fd in reads: 133 131 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 134 for fd in writes .keys():132 for fd in writes: 135 133 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 136 134 reads.clear() 137 135 writes.clear() 138 136 selectables.clear() 137 reverse.clear() 139 138 if self.waker is not None: 140 139 self.addReader(self.waker) 141 140 return result 142 141 143 142 def doKEvent(self, timeout, 144 reads=reads,145 writes=writes,146 143 selectables=selectables, 147 144 kq=kq, 148 145 log=log, 149 OSError=OSError, 150 EVFILT_READ=EVFILT_READ, 151 EVFILT_WRITE=EVFILT_WRITE): 146 OSError=OSError): 152 147 """Poll the kqueue for new events.""" 153 148 if timeout is None: 154 timeout = 1000 149 timeout = 1000000 155 150 else: 156 timeout = int(timeout * 1000 ) # convert seconds to milliseconds151 timeout = int(timeout * 1000000) # convert seconds to nanoseconds 157 152 158 153 try: 159 l = k q.kevent([], len(selectables), timeout)154 l = kevent(kq, [], len(selectables), timeout) 160 155 except OSError, e: 161 156 if e[0] == errno.EINTR: 162 157 return 163 158 else: 164 159 raise 165 160 _drdw = self._doWriteOrRead 161 166 162 for event in l: 167 why = None 168 fd, filter = event.ident, event.filter 169 selectable = selectables[fd] 170 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 163 fd = event.ident 164 try: 165 selectable = selectables[fd] 166 except KeyError: 167 continue 168 log.callWithLogger(selectable, _drdw, selectable, fd, event) 171 169 172 def _doWriteOrRead(self, selectable, fd, filter): 173 try: 174 if filter == EVFILT_READ: 175 why = selectable.doRead() 176 if filter == EVFILT_WRITE: 177 why = selectable.doWrite() 178 if not selectable.fileno() == fd: 179 why = main.CONNECTION_LOST 180 except: 181 why = sys.exc_info()[1] 182 log.deferr() 170 def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE): 171 why = None 172 inRead = False 173 filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags 174 if flags & EV_EOF and data and fflags: 175 why = CONNECTION_LOST 176 else: 177 try: 178 if filter == EVFILT_READ: 179 inRead = True 180 why = selectable.doRead() 181 if filter == EVFILT_WRITE: 182 inRead = False 183 why = selectable.doWrite() 184 if not selectable.fileno() == fd: 185 why = CONNECTION_LOST 186 inRead = False 187 except: 188 why = sys.exc_info()[1] 189 log.deferr() 183 190 184 191 if why: 185 self.removeReader(selectable) 186 self.removeWriter(selectable) 187 selectable.connectionLost(failure.Failure(why)) 192 self._disconnectSelectable(selectable, why, inRead) 188 193 189 194 doIteration = doKEvent 190 195 … … 195 200 196 201 197 202 __all__ = ["KQueueReactor", "install"] 203
