Ticket #1918: kqueue.2.patch
| File kqueue.2.patch, 12.1 KB (added by oberstet, 20 months ago) |
|---|
-
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
4 4 """ 5 5 A kqueue()/kevent() based implementation of the Twisted main loop. 6 6 7 To install the event loop (and you should do this before any connections, 8 listeners or connectors are added):: 7 To use this reactor, start your application specifying the kqueue reactor: 9 8 10 | from twisted.internet import kqreactor 11 | kqreactor.install() 9 twistd ... --reactor kqueue 12 10 13 T his reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 11 To install the event loop from code (and you should do this before any 12 connections, listeners or connectors are added):: 15 13 14 from twisted.internet import kqreactor 15 kqreactor.install() 16 16 17 This implementation depends on Python 2.6 or higher which has kqueue support 18 built in the select module. 17 19 18 You're going to need to patch PyKqueue:: 20 Note, that you should use Python 2.6.5 or higher, since previous implementations 21 of select.kqueue had 19 22 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 23 http://bugs.python.org/issue5910 58 24 25 not yet fixed. 59 26 """ 60 27 61 import errno , sys28 import errno 62 29 63 30 from zope.interface import implements 64 31 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 66 from kqsyscall import kqueue, kevent 32 from select import kqueue, kevent 33 from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 34 KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 35 68 36 from twisted.internet.interfaces import IReactorFDSet 69 37 … … 73 41 74 42 class KQueueReactor(posixbase.PosixReactorBase): 75 43 """ 76 A reactor that uses kqueue(2)/kevent(2). 44 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 45 which has built in support for kqueue in the select module. 77 46 78 47 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 48 … … 97 66 """ 98 67 implements(IReactorFDSet) 99 68 69 100 70 def __init__(self): 101 71 """ 102 72 Initialize kqueue object, file descriptor tracking dictionaries, and the … … 109 79 posixbase.PosixReactorBase.__init__(self) 110 80 111 81 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 82 def _updateRegistration(self, fd, filter, op): 83 """ 84 Private method for changing kqueue registration on a given FD 85 filtering for events given filter/op. This will never block and 86 returns nothing. 87 """ 88 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 89 90 115 91 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 92 """ 93 Implement L{IReactorFDSet.addReader}. 94 """ 118 95 fd = reader.fileno() 119 96 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 97 try: 98 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 99 except OSError: 100 pass 101 finally: 102 self._selectables[fd] = reader 103 self._reads[fd] = 1 123 104 105 124 106 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 107 """ 108 Implement L{IReactorFDSet.addWriter}. 109 """ 127 110 fd = writer.fileno() 128 111 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 112 try: 113 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 114 except OSError: 115 pass 116 finally: 117 self._selectables[fd] = writer 118 self._writes[fd] = 1 132 119 120 133 121 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 122 """ 136 fd = reader.fileno() 123 Implement L{IReactorFDSet.removeReader}. 124 """ 125 wasLost = False 126 try: 127 fd = reader.fileno() 128 except: 129 fd = -1 130 if fd == -1: 131 for fd, fdes in self._selectables.items(): 132 if reader is fdes: 133 wasLost = True 134 break 135 else: 136 return 137 137 if fd in self._reads: 138 138 del self._reads[fd] 139 139 if fd not in self._writes: 140 140 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 141 if not wasLost: 142 try: 143 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 144 except OSError: 145 pass 142 146 147 143 148 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 149 """ 146 fd = writer.fileno() 150 Implement L{IReactorFDSet.removeWriter}. 151 """ 152 wasLost = False 153 try: 154 fd = writer.fileno() 155 except: 156 fd = -1 157 if fd == -1: 158 for fd, fdes in self._selectables.items(): 159 if writer is fdes: 160 wasLost = True 161 break 162 else: 163 return 147 164 if fd in self._writes: 148 165 del self._writes[fd] 149 166 if fd not in self._reads: 150 167 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 168 if not wasLost: 169 try: 170 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 171 except OSError: 172 pass 152 173 174 153 175 def removeAll(self): 154 176 """ 155 Remove all selectables, and return a list of them.177 Implement L{IReactorFDSet.removeAll}. 156 178 """ 157 179 return self._removeAll( 158 180 [self._selectables[fd] for fd in self._reads], … … 160 182 161 183 162 184 def getReaders(self): 185 """ 186 Implement L{IReactorFDSet.getReaders}. 187 """ 163 188 return [self._selectables[fd] for fd in self._reads] 164 189 165 190 166 191 def getWriters(self): 192 """ 193 Implement L{IReactorFDSet.getWriters}. 194 """ 167 195 return [self._selectables[fd] for fd in self._writes] 168 196 169 197 170 198 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 199 """ 200 Poll the kqueue for new events. 201 """ 172 202 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 203 timeout = 1 176 204 177 205 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)206 l = self._kq.control([], len(self._selectables), timeout) 179 207 except OSError, e: 180 208 if e[0] == errno.EINTR: 181 209 return 182 210 else: 183 211 raise 212 184 213 _drdw = self._doWriteOrRead 185 214 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 215 fd = event.ident 188 216 try: 189 217 selectable = self._selectables[fd] 190 218 except KeyError: 191 219 # Handles the infrequent case where one selectable's 192 220 # handler disconnects another. 193 221 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 222 else: 223 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 224 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 225 226 def _doWriteOrRead(self, selectable, fd, event): 227 """ 228 Private method called when a FD is ready for reading, writing or was 229 lost. Do the work and raise errors where necessary. 230 """ 231 why = None 232 inRead = False 233 (filter, flags, data, fflags) = ( 234 event.filter, event.flags, event.data, event.fflags) 235 236 if flags & KQ_EV_EOF and data and fflags: 237 why = main.CONNECTION_LOST 238 else: 239 try: 240 if selectable.fileno() == -1: 241 inRead = False 242 why = posixbase._NO_FILEDESC 243 else: 244 if filter == KQ_FILTER_READ: 245 inRead = True 246 why = selectable.doRead() 247 if filter == KQ_FILTER_WRITE: 248 inRead = False 249 why = selectable.doWrite() 250 except: 251 # Any exception from application code gets logged and will 252 # cause us to disconnect the selectable. 253 why = failure.Failure() 254 log.err(why) 255 208 256 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 257 self._disconnectSelectable(selectable, why, inRead) 212 258 213 259 doIteration = doKEvent 214 260 215 261 216 262 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 263 """ 264 Install the kqueue() reactor. 265 """ 266 p = KQueueReactor() 267 from twisted.internet.main import installReactor 268 installReactor(p) 219 269 220 270 221 271 __all__ = ["KQueueReactor", "install"] -
NEWS
23 23 - twisted.protocols.ftp.FTP.ftp_STOR now catches `FTPCmdError`s 24 24 raised by the file writer, and returns the error back to the 25 25 client. (#4909) 26 - The kqueue reactor has been revived. (#1918) 26 27 27 28 Bugfixes 28 29 --------
