Ticket #1918: kqueue.patch
| File kqueue.patch, 10.4 KB (added by oberstet, 19 months ago) |
|---|
-
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
10 10 | from twisted.internet import kqreactor 11 11 | kqreactor.install() 12 12 13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is 14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 15 16 17 18 You're going to need to patch PyKqueue:: 19 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 58 13 This implementation depends on Python 2.6 or higher which has kqueue support 14 built in the select module. 59 15 """ 60 16 61 17 import errno, sys 62 18 63 19 from zope.interface import implements 64 20 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD66 from kqsyscall import kqueue, kevent21 from select import kqueue, kevent 22 from select import KQ_FILTER_READ, KQ_FILTER_WRITE, KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 23 68 24 from twisted.internet.interfaces import IReactorFDSet 69 25 … … 73 29 74 30 class KQueueReactor(posixbase.PosixReactorBase): 75 31 """ 76 A reactor that uses kqueue(2)/kevent(2). 32 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 33 which has built in support for kqueue in the select module. 77 34 78 35 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 36 … … 97 54 """ 98 55 implements(IReactorFDSet) 99 56 57 100 58 def __init__(self): 101 59 """ 102 60 Initialize kqueue object, file descriptor tracking dictionaries, and the … … 109 67 posixbase.PosixReactorBase.__init__(self) 110 68 111 69 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 70 def _updateRegistration(self, fd, filter, op): 71 """ 72 Private method for changing kqueue registration. 73 """ 74 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 75 76 115 77 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 78 """ 79 Add a FileDescriptor for notification of data available to read. 80 """ 118 81 fd = reader.fileno() 119 82 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 83 try: 84 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 85 except OSError, e: 86 pass 87 finally: 88 self._selectables[fd] = reader 89 self._reads[fd] = 1 123 90 91 124 92 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 93 """ 94 Add a FileDescriptor for notification of data available to write. 95 """ 127 96 fd = writer.fileno() 128 97 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 98 try: 99 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 100 except OSError, e: 101 pass 102 finally: 103 self._selectables[fd] = writer 104 self._writes[fd] = 1 132 105 106 133 107 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 108 """ 136 fd = reader.fileno() 109 Remove a Selectable for notification of data available to read. 110 """ 111 wasLost = False 112 try: 113 fd = reader.fileno() 114 except: 115 fd = -1 116 if fd == -1: 117 for fd, fdes in self._selectables.items(): 118 if reader is fdes: 119 wasLost = True 120 break 121 else: 122 return 137 123 if fd in self._reads: 138 124 del self._reads[fd] 139 125 if fd not in self._writes: 140 126 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 127 if not wasLost: 128 try: 129 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 130 except OSError, e: 131 pass 142 132 133 143 134 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 135 """ 146 fd = writer.fileno() 136 Remove a Selectable for notification of data available to write. 137 """ 138 wasLost = False 139 try: 140 fd = writer.fileno() 141 except: 142 fd = -1 143 if fd == -1: 144 for fd, fdes in self._selectables.items(): 145 if writer is fdes: 146 wasLost = True 147 break 148 else: 149 return 147 150 if fd in self._writes: 148 151 del self._writes[fd] 149 152 if fd not in self._reads: 150 153 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 154 if not wasLost: 155 try: 156 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 157 except OSError, e: 158 pass 152 159 160 153 161 def removeAll(self): 154 162 """ 155 163 Remove all selectables, and return a list of them. … … 168 176 169 177 170 178 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 179 """ 180 Poll the kqueue for new events. 181 """ 172 182 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 183 timeout = 1 176 184 177 185 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)186 l = self._kq.control([], len(self._selectables), timeout) 179 187 except OSError, e: 180 188 if e[0] == errno.EINTR: 181 189 return 182 190 else: 183 191 raise 192 184 193 _drdw = self._doWriteOrRead 185 194 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 195 fd = event.ident 188 196 try: 189 197 selectable = self._selectables[fd] 190 198 except KeyError: 191 199 # Handles the infrequent case where one selectable's 192 200 # handler disconnects another. 193 201 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 202 else: 203 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 204 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 205 206 def _doWriteOrRead(self, selectable, fd, event): 207 why = None 208 inRead = False 209 filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags 210 211 if flags & KQ_EV_EOF and data and fflags: 212 why = main.CONNECTION_LOST 213 else: 214 try: 215 if selectable.fileno() == -1: 216 inRead = False 217 why = posixbase._NO_FILEDESC 218 else: 219 if filter == KQ_FILTER_READ: 220 inRead = True 221 why = selectable.doRead() 222 if filter == KQ_FILTER_WRITE: 223 inRead = False 224 why = selectable.doWrite() 225 except: 226 # Any exception from application code gets logged and will 227 # cause us to disconnect the selectable. 228 why = sys.exc_info()[1] 229 log.err() 230 208 231 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 232 self._disconnectSelectable(selectable, why, inRead) 212 233 213 234 doIteration = doKEvent 214 235 215 236 216 237 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 238 """ 239 Install the kqueue() reactor. 240 """ 241 p = KQueueReactor() 242 from twisted.internet.main import installReactor 243 installReactor(p) 219 244 220 245 221 246 __all__ = ["KQueueReactor", "install"]
