Ticket #1918: kqueue.4.patch
| File kqueue.4.patch, 16.5 KB (added by oberstet, 18 months ago) |
|---|
-
twisted/scripts/_twistd_unix.py
7 7 from twisted.python import log, syslog, logfile, usage 8 8 from twisted.python.util import switchUID, uidFromString, gidFromString 9 9 from twisted.application import app, service 10 from twisted.internet.interfaces import IReactorDaemonize 10 11 from twisted import copyright 11 12 12 13 … … 156 157 157 158 158 159 def daemonize(): 159 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 160 """ 161 Daemonizes the application on Unix. This is done by the usual double 162 forking approach. 163 164 See: 165 [1] http://code.activestate.com/recipes/278731/ 166 [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", 167 1992, Addison-Wesley, ISBN 0-201-56317-7 168 """ 169 170 ## If the reactor requires hooks to be called for daemonization, call them. 171 ## Currently the only reactor which provides/needs that is KQueueReactor. 172 from twisted.internet import reactor 173 if IReactorDaemonize.providedBy(reactor): 174 reactor.beforeDaemonize() 175 160 176 if os.fork(): # launch child and... 161 177 os._exit(0) # kill off parent 162 178 os.setsid() … … 171 187 raise 172 188 os.close(null) 173 189 190 if IReactorDaemonize.providedBy(reactor): 191 reactor.afterDaemonize() 174 192 175 193 194 176 195 def launchWithName(name): 177 196 if name and name != sys.argv[0]: 178 197 exe = os.path.realpath(sys.executable) -
twisted/internet/interfaces.py
870 870 """ 871 871 872 872 873 class IReactorDaemonize(Interface): 874 """ 875 A reactor which provides hooks that need to be called before and after 876 daemonization. 877 """ 878 879 def beforeDaemonize(): 880 """ 881 Hook to be called immediately before daemonization. No reactor methods 882 must be called until L{afterDaemonize} is called. 883 884 @return: C{None}. 885 """ 886 887 def afterDaemonize(): 888 """ 889 Hook to be called immediately after daemonization. This must only be 890 called after L{beforeDaemonize} had been called previously. 891 892 @return: C{None}. 893 """ 894 895 873 896 class IReactorFDSet(Interface): 874 897 """ 875 898 Implement me to be able to use L{IFileDescriptor} type resources. … … 1899 1922 @return: a client endpoint 1900 1923 @rtype: L{IStreamClientEndpoint} 1901 1924 """ 1902 -
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
4 4 """ 5 5 A kqueue()/kevent() based implementation of the Twisted main loop. 6 6 7 To install the event loop (and you should do this before any connections, 8 listeners or connectors are added):: 7 To use this reactor, start your application specifying the kqueue reactor: 9 8 10 | from twisted.internet import kqreactor 11 | kqreactor.install() 9 twistd ... --reactor kqueue 12 10 13 T his reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 11 To install the event loop from code (and you should do this before any 12 connections, listeners or connectors are added):: 15 13 14 from twisted.internet import kqreactor 15 kqreactor.install() 16 16 17 This implementation depends on Python 2.6 or higher which has kqueue support 18 built in the select module. 17 19 18 You're going to need to patch PyKqueue:: 20 Note, that you should use Python 2.6.5 or higher, since previous implementations 21 of select.kqueue had 19 22 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 23 http://bugs.python.org/issue5910 58 24 25 not yet fixed. 59 26 """ 60 27 61 import errno , sys28 import errno 62 29 63 30 from zope.interface import implements 64 31 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 66 from kqsyscall import kqueue, kevent 32 from select import kqueue, kevent 33 from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 34 KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 35 68 from twisted.internet.interfaces import IReactorFDSet 36 from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize 69 37 70 38 from twisted.python import log, failure 71 39 from twisted.internet import main, posixbase … … 73 41 74 42 class KQueueReactor(posixbase.PosixReactorBase): 75 43 """ 76 A reactor that uses kqueue(2)/kevent(2). 44 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 45 which has built in support for kqueue in the select module. 77 46 78 47 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 48 … … 95 64 dispatched to the corresponding L{FileDescriptor} instances in 96 65 C{_selectables}. 97 66 """ 98 implements(IReactorFDSet )67 implements(IReactorFDSet, IReactorDaemonize) 99 68 69 100 70 def __init__(self): 101 71 """ 102 72 Initialize kqueue object, file descriptor tracking dictionaries, and the 103 73 base class. 74 75 See: 76 - http://docs.python.org/library/select.html 77 - www.freebsd.org/cgi/man.cgi?query=kqueue 78 - people.freebsd.org/~jlemon/papers/kqueue.pdf 104 79 """ 105 80 self._kq = kqueue() 106 81 self._reads = {} … … 109 84 posixbase.PosixReactorBase.__init__(self) 110 85 111 86 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 87 def _updateRegistration(self, fd, filter, op): 88 """ 89 Private method for changing kqueue registration on a given FD 90 filtering for events given filter/op. This will never block and 91 returns nothing. 92 """ 93 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 94 95 96 def beforeDaemonize(self): 97 """ 98 Implement L{IReactorDaemonize.beforeDaemonize}. 99 """ 100 # Twisted-internal method called during daemonization (when application 101 # is started via twistd). This is called right before the magic double 102 # forking done for daemonization. We cleanly close the kqueue() and later 103 # recreate it. This is needed since a) kqueue() are not inherited across 104 # forks and b) twistd will create the reactor already before daemonization 105 # (and will also add at least 1 reader to the reactor, an instance of 106 # twisted.internet.posixbase._UnixWaker). 107 # 108 # See: twisted.scripts._twistd_unix.daemonize() 109 self._kq.close() 110 self._kq = None 111 112 113 def afterDaemonize(self): 114 """ 115 Implement L{IReactorDaemonize.afterDaemonize}. 116 """ 117 # Twisted-internal method called during daemonization. This is called right 118 # after daemonization and recreates the kqueue() and any readers/writers 119 # that were added before. Note that you MUST NOT call any reactor methods 120 # in between beforeDaemonize() and afterDaemonize()! 121 self._kq = kqueue() 122 for fd in self._reads: 123 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 124 for fd in self._writes: 125 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 126 127 115 128 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 129 """ 130 Implement L{IReactorFDSet.addReader}. 131 """ 118 132 fd = reader.fileno() 119 133 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 134 try: 135 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 136 except OSError: 137 pass 138 finally: 139 self._selectables[fd] = reader 140 self._reads[fd] = 1 123 141 142 124 143 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 144 """ 145 Implement L{IReactorFDSet.addWriter}. 146 """ 127 147 fd = writer.fileno() 128 148 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 149 try: 150 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 151 except OSError: 152 pass 153 finally: 154 self._selectables[fd] = writer 155 self._writes[fd] = 1 132 156 157 133 158 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 159 """ 136 fd = reader.fileno() 160 Implement L{IReactorFDSet.removeReader}. 161 """ 162 wasLost = False 163 try: 164 fd = reader.fileno() 165 except: 166 fd = -1 167 if fd == -1: 168 for fd, fdes in self._selectables.items(): 169 if reader is fdes: 170 wasLost = True 171 break 172 else: 173 return 137 174 if fd in self._reads: 138 175 del self._reads[fd] 139 176 if fd not in self._writes: 140 177 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 178 if not wasLost: 179 try: 180 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 181 except OSError: 182 pass 142 183 184 143 185 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 186 """ 146 fd = writer.fileno() 187 Implement L{IReactorFDSet.removeWriter}. 188 """ 189 wasLost = False 190 try: 191 fd = writer.fileno() 192 except: 193 fd = -1 194 if fd == -1: 195 for fd, fdes in self._selectables.items(): 196 if writer is fdes: 197 wasLost = True 198 break 199 else: 200 return 147 201 if fd in self._writes: 148 202 del self._writes[fd] 149 203 if fd not in self._reads: 150 204 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 205 if not wasLost: 206 try: 207 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 208 except OSError: 209 pass 152 210 211 153 212 def removeAll(self): 154 213 """ 155 Remove all selectables, and return a list of them.214 Implement L{IReactorFDSet.removeAll}. 156 215 """ 157 216 return self._removeAll( 158 217 [self._selectables[fd] for fd in self._reads], … … 160 219 161 220 162 221 def getReaders(self): 222 """ 223 Implement L{IReactorFDSet.getReaders}. 224 """ 163 225 return [self._selectables[fd] for fd in self._reads] 164 226 165 227 166 228 def getWriters(self): 229 """ 230 Implement L{IReactorFDSet.getWriters}. 231 """ 167 232 return [self._selectables[fd] for fd in self._writes] 168 233 169 234 170 235 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 236 """ 237 Poll the kqueue for new events. 238 """ 172 239 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 240 timeout = 1 176 241 177 242 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)243 l = self._kq.control([], len(self._selectables), timeout) 179 244 except OSError, e: 180 245 if e[0] == errno.EINTR: 181 246 return 182 247 else: 183 248 raise 249 184 250 _drdw = self._doWriteOrRead 185 251 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 252 fd = event.ident 188 253 try: 189 254 selectable = self._selectables[fd] 190 255 except KeyError: 191 256 # Handles the infrequent case where one selectable's 192 257 # handler disconnects another. 193 258 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 259 else: 260 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 261 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 262 263 def _doWriteOrRead(self, selectable, fd, event): 264 """ 265 Private method called when a FD is ready for reading, writing or was 266 lost. Do the work and raise errors where necessary. 267 """ 268 why = None 269 inRead = False 270 (filter, flags, data, fflags) = ( 271 event.filter, event.flags, event.data, event.fflags) 272 273 if flags & KQ_EV_EOF and data and fflags: 274 why = main.CONNECTION_LOST 275 else: 276 try: 277 if selectable.fileno() == -1: 278 inRead = False 279 why = posixbase._NO_FILEDESC 280 else: 281 if filter == KQ_FILTER_READ: 282 inRead = True 283 why = selectable.doRead() 284 if filter == KQ_FILTER_WRITE: 285 inRead = False 286 why = selectable.doWrite() 287 except: 288 # Any exception from application code gets logged and will 289 # cause us to disconnect the selectable. 290 why = failure.Failure() 291 log.err(why) 292 208 293 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 294 self._disconnectSelectable(selectable, why, inRead) 212 295 213 296 doIteration = doKEvent 214 297 215 298 216 299 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 300 """ 301 Install the kqueue() reactor. 302 """ 303 p = KQueueReactor() 304 from twisted.internet.main import installReactor 305 installReactor(p) 219 306 220 307 221 308 __all__ = ["KQueueReactor", "install"] -
1918.feature
1 The kqueue reactor has been revived. (#1918)
