Ticket #1918: kqueue.3.patch
| File kqueue.3.patch, 14.3 KB (added by oberstet, 19 months ago) |
|---|
-
twisted/scripts/_twistd_unix.py
156 156 157 157 158 158 def daemonize(): 159 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 159 # See: 160 # http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 161 # http://code.activestate.com/recipes/278731/ 162 from twisted.internet import reactor 163 if hasattr(reactor, '_beforeFork') and callable(reactor._beforeFork): 164 reactor._beforeFork() 160 165 if os.fork(): # launch child and... 161 166 os._exit(0) # kill off parent 162 167 os.setsid() … … 170 175 if e.errno != errno.EBADF: 171 176 raise 172 177 os.close(null) 178 if hasattr(reactor, '_afterFork') and callable(reactor._afterFork): 179 reactor._afterFork() 173 180 174 181 175 182 -
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
4 4 """ 5 5 A kqueue()/kevent() based implementation of the Twisted main loop. 6 6 7 To install the event loop (and you should do this before any connections, 8 listeners or connectors are added):: 7 To use this reactor, start your application specifying the kqueue reactor: 9 8 10 | from twisted.internet import kqreactor 11 | kqreactor.install() 9 twistd ... --reactor kqueue 12 10 13 T his reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 11 To install the event loop from code (and you should do this before any 12 connections, listeners or connectors are added):: 15 13 14 from twisted.internet import kqreactor 15 kqreactor.install() 16 16 17 This implementation depends on Python 2.6 or higher which has kqueue support 18 built in the select module. 17 19 18 You're going to need to patch PyKqueue:: 20 Note, that you should use Python 2.6.5 or higher, since previous implementations 21 of select.kqueue had 19 22 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 23 http://bugs.python.org/issue5910 58 24 25 not yet fixed. 59 26 """ 60 27 61 import errno , sys28 import errno 62 29 63 30 from zope.interface import implements 64 31 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 66 from kqsyscall import kqueue, kevent 32 from select import kqueue, kevent 33 from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 34 KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 35 68 36 from twisted.internet.interfaces import IReactorFDSet 69 37 … … 73 41 74 42 class KQueueReactor(posixbase.PosixReactorBase): 75 43 """ 76 A reactor that uses kqueue(2)/kevent(2). 44 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 45 which has built in support for kqueue in the select module. 77 46 78 47 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 48 … … 97 66 """ 98 67 implements(IReactorFDSet) 99 68 69 100 70 def __init__(self): 101 71 """ 102 72 Initialize kqueue object, file descriptor tracking dictionaries, and the 103 73 base class. 74 75 See: 76 - http://docs.python.org/library/select.html 77 - www.freebsd.org/cgi/man.cgi?query=kqueue 78 - people.freebsd.org/~jlemon/papers/kqueue.pdf 104 79 """ 105 80 self._kq = kqueue() 106 81 self._reads = {} … … 109 84 posixbase.PosixReactorBase.__init__(self) 110 85 111 86 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 87 def _updateRegistration(self, fd, filter, op): 88 """ 89 Private method for changing kqueue registration on a given FD 90 filtering for events given filter/op. This will never block and 91 returns nothing. 92 """ 93 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 94 95 96 def _beforeFork(self): 97 """ 98 Twisted-internal method called during daemonization (when application 99 is started via twistd). This is called right before the magic double 100 forking done for daemonization. We cleanly close the kqueue() and later 101 recreate it. This is needed since a) kqueue() are not inherited across 102 forks and b) twistd will create the reactor already before daemonization 103 (and will also add at least 1 reader to the reactor, an instance of 104 twisted.internet.posixbase._UnixWaker). 105 106 See: twisted.scripts._twistd_unix.daemonize() 107 """ 108 self._kq.close() 109 self._kq = None 110 111 112 def _afterFork(self): 113 """ 114 Twisted-internal method called during daemonization. This is called right 115 after daemonization and recreates the kqueue() and any readers/writers 116 that were added before. Note that you MUST NOT call any reactor methods 117 in between _beforeFork and _afterFork! 118 """ 119 self._kq = kqueue() 120 for fd in self._reads: 121 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 122 for fd in self._writes: 123 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 124 125 115 126 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 127 """ 128 Implement L{IReactorFDSet.addReader}. 129 """ 118 130 fd = reader.fileno() 119 131 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 132 try: 133 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 134 except OSError: 135 pass 136 finally: 137 self._selectables[fd] = reader 138 self._reads[fd] = 1 123 139 140 124 141 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 142 """ 143 Implement L{IReactorFDSet.addWriter}. 144 """ 127 145 fd = writer.fileno() 128 146 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 147 try: 148 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 149 except OSError: 150 pass 151 finally: 152 self._selectables[fd] = writer 153 self._writes[fd] = 1 132 154 155 133 156 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 157 """ 136 fd = reader.fileno() 158 Implement L{IReactorFDSet.removeReader}. 159 """ 160 wasLost = False 161 try: 162 fd = reader.fileno() 163 except: 164 fd = -1 165 if fd == -1: 166 for fd, fdes in self._selectables.items(): 167 if reader is fdes: 168 wasLost = True 169 break 170 else: 171 return 137 172 if fd in self._reads: 138 173 del self._reads[fd] 139 174 if fd not in self._writes: 140 175 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 176 if not wasLost: 177 try: 178 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 179 except OSError: 180 pass 142 181 182 143 183 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 184 """ 146 fd = writer.fileno() 185 Implement L{IReactorFDSet.removeWriter}. 186 """ 187 wasLost = False 188 try: 189 fd = writer.fileno() 190 except: 191 fd = -1 192 if fd == -1: 193 for fd, fdes in self._selectables.items(): 194 if writer is fdes: 195 wasLost = True 196 break 197 else: 198 return 147 199 if fd in self._writes: 148 200 del self._writes[fd] 149 201 if fd not in self._reads: 150 202 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 203 if not wasLost: 204 try: 205 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 206 except OSError: 207 pass 152 208 209 153 210 def removeAll(self): 154 211 """ 155 Remove all selectables, and return a list of them.212 Implement L{IReactorFDSet.removeAll}. 156 213 """ 157 214 return self._removeAll( 158 215 [self._selectables[fd] for fd in self._reads], … … 160 217 161 218 162 219 def getReaders(self): 220 """ 221 Implement L{IReactorFDSet.getReaders}. 222 """ 163 223 return [self._selectables[fd] for fd in self._reads] 164 224 165 225 166 226 def getWriters(self): 227 """ 228 Implement L{IReactorFDSet.getWriters}. 229 """ 167 230 return [self._selectables[fd] for fd in self._writes] 168 231 169 232 170 233 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 234 """ 235 Poll the kqueue for new events. 236 """ 172 237 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 238 timeout = 1 176 239 177 240 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)241 l = self._kq.control([], len(self._selectables), timeout) 179 242 except OSError, e: 180 243 if e[0] == errno.EINTR: 181 244 return 182 245 else: 183 246 raise 247 184 248 _drdw = self._doWriteOrRead 185 249 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 250 fd = event.ident 188 251 try: 189 252 selectable = self._selectables[fd] 190 253 except KeyError: 191 254 # Handles the infrequent case where one selectable's 192 255 # handler disconnects another. 193 256 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 257 else: 258 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 259 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 260 261 def _doWriteOrRead(self, selectable, fd, event): 262 """ 263 Private method called when a FD is ready for reading, writing or was 264 lost. Do the work and raise errors where necessary. 265 """ 266 why = None 267 inRead = False 268 (filter, flags, data, fflags) = ( 269 event.filter, event.flags, event.data, event.fflags) 270 271 if flags & KQ_EV_EOF and data and fflags: 272 why = main.CONNECTION_LOST 273 else: 274 try: 275 if selectable.fileno() == -1: 276 inRead = False 277 why = posixbase._NO_FILEDESC 278 else: 279 if filter == KQ_FILTER_READ: 280 inRead = True 281 why = selectable.doRead() 282 if filter == KQ_FILTER_WRITE: 283 inRead = False 284 why = selectable.doWrite() 285 except: 286 # Any exception from application code gets logged and will 287 # cause us to disconnect the selectable. 288 why = failure.Failure() 289 log.err(why) 290 208 291 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 292 self._disconnectSelectable(selectable, why, inRead) 212 293 213 294 doIteration = doKEvent 214 295 215 296 216 297 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 298 """ 299 Install the kqueue() reactor. 300 """ 301 p = KQueueReactor() 302 from twisted.internet.main import installReactor 303 installReactor(p) 219 304 220 305 221 306 __all__ = ["KQueueReactor", "install"] -
1918.feature
1 The kqueue reactor has been revived. (#1918)
