Ticket #1918: kqueue.5.patch
| File kqueue.5.patch, 21.2 KB (added by oberstet, 18 months ago) |
|---|
-
twisted/test/test_twistd.py
23 23 from zope.interface.verify import verifyObject 24 24 25 25 from twisted.trial import unittest 26 from twisted.test.test_process import MockOS 26 27 27 28 from twisted import plugin 28 29 from twisted.application.service import IServiceMaker … … 34 35 from twisted.python.versions import Version 35 36 from twisted.python.components import Componentized 36 37 from twisted.internet.defer import Deferred 38 from twisted.internet.interfaces import IReactorDaemonize 37 39 from twisted.python.fakepwd import UserDatabase 38 40 39 41 try: … … 616 618 self.runner = UnixApplicationRunner({}) 617 619 618 620 619 def daemonize(self ):621 def daemonize(self, daemonize_reactor, daemonize_os): 620 622 """ 621 623 Indicate that daemonization has happened and change the PID so that the 622 624 value written to the pidfile can be tested in the daemonization case. … … 806 808 807 809 808 810 811 class FakeNonDaemonizingReactor: 812 """ 813 A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 814 but not announcing this, and logging whether the methods have been called. 815 816 @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. 817 @type _beforeDaemonizeCalled: C{bool} 818 @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. 819 @type _afterDaemonizeCalled: C{bool} 820 """ 821 822 def __init__(self): 823 self._beforeDaemonizeCalled = False 824 self._afterDaemonizeCalled = False 825 826 def beforeDaemonize(self): 827 self._beforeDaemonizeCalled = True 828 829 def afterDaemonize(self): 830 self._afterDaemonizeCalled = True 831 832 833 834 class FakeDaemonizingReactor(FakeNonDaemonizingReactor): 835 """ 836 A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 837 announcing this, and logging whether the methods have been called. 838 839 @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. 840 @type _beforeDaemonizeCalled: C{bool} 841 @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. 842 @type _afterDaemonizeCalled: C{bool} 843 """ 844 845 implements(IReactorDaemonize) 846 847 848 849 class ReactorDaemonizationTests(unittest.TestCase): 850 """ 851 Tests for L{_twistd_unix.daemonize} and L{IReactorDaemonize}. 852 """ 853 if _twistd_unix is None: 854 skip = "twistd unix not available" 855 856 857 def test_DaemonizationHooksCalled(self): 858 """ 859 Check that L{_twistd_unix.daemonize} indeed calls 860 L{IReactorDaemonize.beforeDaemonize} and 861 L{IReactorDaemonize.afterDaemonize} if the reactor implements 862 L{IReactorDaemonize}. 863 """ 864 test_reactor = FakeDaemonizingReactor() 865 test_os = MockOS() 866 _twistd_unix.daemonize(test_reactor, test_os) 867 self.assertTrue(test_reactor._beforeDaemonizeCalled and \ 868 test_reactor._afterDaemonizeCalled) 869 870 871 def test_DaemonizationHooksNotCalled(self): 872 """ 873 Check that L{_twistd_unix.daemonize} does NOT call 874 L{IReactorDaemonize.beforeDaemonize} or 875 L{IReactorDaemonize.afterDaemonize} if the reactor does NOT 876 implement L{IReactorDaemonize}. 877 """ 878 test_reactor = FakeNonDaemonizingReactor() 879 test_os = MockOS() 880 _twistd_unix.daemonize(test_reactor, test_os) 881 self.assertTrue(not test_reactor._beforeDaemonizeCalled and \ 882 not test_reactor._afterDaemonizeCalled) 883 884 885 809 886 class DummyReactor(object): 810 887 """ 811 888 A dummy reactor, only providing a C{run} method and checking that it -
twisted/topfiles/1918.feature
1 The kqueue reactor has been revived. -
twisted/scripts/_twistd_unix.py
7 7 from twisted.python import log, syslog, logfile, usage 8 8 from twisted.python.util import switchUID, uidFromString, gidFromString 9 9 from twisted.application import app, service 10 from twisted.internet.interfaces import IReactorDaemonize 10 11 from twisted import copyright 11 12 12 13 … … 155 156 156 157 157 158 158 def daemonize(): 159 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 160 if os.fork(): # launch child and... 161 os._exit(0) # kill off parent 162 os.setsid() 163 if os.fork(): # launch child and... 164 os._exit(0) # kill off parent again. 165 null = os.open('/dev/null', os.O_RDWR) 159 def daemonize(daemonize_reactor, daemonize_os): 160 """ 161 Daemonizes the application on Unix. This is done by the usual double 162 forking approach. 163 164 See: 165 [1] http://code.activestate.com/recipes/278731/ 166 [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", 167 1992, Addison-Wesley, ISBN 0-201-56317-7 168 """ 169 170 ## If the reactor requires hooks to be called for daemonization, call them. 171 ## Currently the only reactor which provides/needs that is KQueueReactor. 172 if IReactorDaemonize.providedBy(daemonize_reactor): 173 daemonize_reactor.beforeDaemonize() 174 175 if daemonize_os.fork(): # launch child and... 176 daemonize_os._exit(0) # kill off parent 177 daemonize_os.setsid() 178 if daemonize_os.fork(): # launch child and... 179 daemonize_os._exit(0) # kill off parent again. 180 null = daemonize_os.open('/dev/null', daemonize_os.O_RDWR) 166 181 for i in range(3): 167 182 try: 168 os.dup2(null, i)183 daemonize_os.dup2(null, i) 169 184 except OSError, e: 170 185 if e.errno != errno.EBADF: 171 186 raise 172 os.close(null)187 daemonize_os.close(null) 173 188 189 if IReactorDaemonize.providedBy(daemonize_reactor): 190 daemonize_reactor.afterDaemonize() 174 191 175 192 193 176 194 def launchWithName(name): 177 195 if name and name != sys.argv[0]: 178 196 exe = os.path.realpath(sys.executable) … … 265 283 if umask is not None: 266 284 os.umask(umask) 267 285 if daemon: 268 daemonize() 286 from twisted.internet import reactor 287 daemonize(reactor, os) 269 288 if pidfile: 270 289 f = open(pidfile,'wb') 271 290 f.write(str(os.getpid())) -
twisted/internet/interfaces.py
870 870 """ 871 871 872 872 873 class IReactorDaemonize(Interface): 874 """ 875 A reactor which provides hooks that need to be called before and after 876 daemonization. 877 """ 878 879 def beforeDaemonize(): 880 """ 881 Hook to be called immediately before daemonization. No reactor methods 882 must be called until L{afterDaemonize} is called. 883 884 @return: C{None}. 885 """ 886 887 def afterDaemonize(): 888 """ 889 Hook to be called immediately after daemonization. This must only be 890 called after L{beforeDaemonize} had been called previously. 891 892 @return: C{None}. 893 """ 894 895 873 896 class IReactorFDSet(Interface): 874 897 """ 875 898 Implement me to be able to use L{IFileDescriptor} type resources. … … 1899 1922 @return: a client endpoint 1900 1923 @rtype: L{IStreamClientEndpoint} 1901 1924 """ 1902 -
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
4 4 """ 5 5 A kqueue()/kevent() based implementation of the Twisted main loop. 6 6 7 To install the event loop (and you should do this before any connections, 8 listeners or connectors are added):: 7 To use this reactor, start your application specifying the kqueue reactor: 9 8 10 | from twisted.internet import kqreactor 11 | kqreactor.install() 9 twistd ... --reactor kqueue 12 10 13 T his reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 11 To install the event loop from code (and you should do this before any 12 connections, listeners or connectors are added):: 15 13 14 from twisted.internet import kqreactor 15 kqreactor.install() 16 16 17 This implementation depends on Python 2.6 or higher which has kqueue support 18 built in the select module. 17 19 18 You're going to need to patch PyKqueue:: 20 Note, that you should use Python 2.6.5 or higher, since previous implementations 21 of select.kqueue had 19 22 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 23 http://bugs.python.org/issue5910 58 24 25 not yet fixed. 59 26 """ 60 27 61 import errno , sys28 import errno 62 29 63 30 from zope.interface import implements 64 31 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 66 from kqsyscall import kqueue, kevent 32 from select import kqueue, kevent 33 from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ 34 KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 35 68 from twisted.internet.interfaces import IReactorFDSet 36 from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize 69 37 70 38 from twisted.python import log, failure 71 39 from twisted.internet import main, posixbase … … 73 41 74 42 class KQueueReactor(posixbase.PosixReactorBase): 75 43 """ 76 A reactor that uses kqueue(2)/kevent(2). 44 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 45 which has built in support for kqueue in the select module. 77 46 78 47 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 48 … … 95 64 dispatched to the corresponding L{FileDescriptor} instances in 96 65 C{_selectables}. 97 66 """ 98 implements(IReactorFDSet )67 implements(IReactorFDSet, IReactorDaemonize) 99 68 69 100 70 def __init__(self): 101 71 """ 102 72 Initialize kqueue object, file descriptor tracking dictionaries, and the 103 73 base class. 74 75 See: 76 - http://docs.python.org/library/select.html 77 - www.freebsd.org/cgi/man.cgi?query=kqueue 78 - people.freebsd.org/~jlemon/papers/kqueue.pdf 104 79 """ 105 80 self._kq = kqueue() 106 81 self._reads = {} … … 109 84 posixbase.PosixReactorBase.__init__(self) 110 85 111 86 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 87 def _updateRegistration(self, fd, filter, op): 88 """ 89 Private method for changing kqueue registration on a given FD 90 filtering for events given filter/op. This will never block and 91 returns nothing. 92 """ 93 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 94 95 96 def beforeDaemonize(self): 97 """ 98 Implement L{IReactorDaemonize.beforeDaemonize}. 99 """ 100 # Twisted-internal method called during daemonization (when application 101 # is started via twistd). This is called right before the magic double 102 # forking done for daemonization. We cleanly close the kqueue() and later 103 # recreate it. This is needed since a) kqueue() are not inherited across 104 # forks and b) twistd will create the reactor already before daemonization 105 # (and will also add at least 1 reader to the reactor, an instance of 106 # twisted.internet.posixbase._UnixWaker). 107 # 108 # See: twisted.scripts._twistd_unix.daemonize() 109 self._kq.close() 110 self._kq = None 111 112 113 def afterDaemonize(self): 114 """ 115 Implement L{IReactorDaemonize.afterDaemonize}. 116 """ 117 # Twisted-internal method called during daemonization. This is called right 118 # after daemonization and recreates the kqueue() and any readers/writers 119 # that were added before. Note that you MUST NOT call any reactor methods 120 # in between beforeDaemonize() and afterDaemonize()! 121 self._kq = kqueue() 122 for fd in self._reads: 123 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 124 for fd in self._writes: 125 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 126 127 115 128 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 129 """ 130 Implement L{IReactorFDSet.addReader}. 131 """ 118 132 fd = reader.fileno() 119 133 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 134 try: 135 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 136 except OSError: 137 pass 138 finally: 139 self._selectables[fd] = reader 140 self._reads[fd] = 1 123 141 142 124 143 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 144 """ 145 Implement L{IReactorFDSet.addWriter}. 146 """ 127 147 fd = writer.fileno() 128 148 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 149 try: 150 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 151 except OSError: 152 pass 153 finally: 154 self._selectables[fd] = writer 155 self._writes[fd] = 1 132 156 157 133 158 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 159 """ 136 fd = reader.fileno() 160 Implement L{IReactorFDSet.removeReader}. 161 """ 162 wasLost = False 163 try: 164 fd = reader.fileno() 165 except: 166 fd = -1 167 if fd == -1: 168 for fd, fdes in self._selectables.items(): 169 if reader is fdes: 170 wasLost = True 171 break 172 else: 173 return 137 174 if fd in self._reads: 138 175 del self._reads[fd] 139 176 if fd not in self._writes: 140 177 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 178 if not wasLost: 179 try: 180 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 181 except OSError: 182 pass 142 183 184 143 185 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 186 """ 146 fd = writer.fileno() 187 Implement L{IReactorFDSet.removeWriter}. 188 """ 189 wasLost = False 190 try: 191 fd = writer.fileno() 192 except: 193 fd = -1 194 if fd == -1: 195 for fd, fdes in self._selectables.items(): 196 if writer is fdes: 197 wasLost = True 198 break 199 else: 200 return 147 201 if fd in self._writes: 148 202 del self._writes[fd] 149 203 if fd not in self._reads: 150 204 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 205 if not wasLost: 206 try: 207 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 208 except OSError: 209 pass 152 210 211 153 212 def removeAll(self): 154 213 """ 155 Remove all selectables, and return a list of them.214 Implement L{IReactorFDSet.removeAll}. 156 215 """ 157 216 return self._removeAll( 158 217 [self._selectables[fd] for fd in self._reads], … … 160 219 161 220 162 221 def getReaders(self): 222 """ 223 Implement L{IReactorFDSet.getReaders}. 224 """ 163 225 return [self._selectables[fd] for fd in self._reads] 164 226 165 227 166 228 def getWriters(self): 229 """ 230 Implement L{IReactorFDSet.getWriters}. 231 """ 167 232 return [self._selectables[fd] for fd in self._writes] 168 233 169 234 170 235 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 236 """ 237 Poll the kqueue for new events. 238 """ 172 239 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 240 timeout = 1 176 241 177 242 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)243 l = self._kq.control([], len(self._selectables), timeout) 179 244 except OSError, e: 180 245 if e[0] == errno.EINTR: 181 246 return 182 247 else: 183 248 raise 249 184 250 _drdw = self._doWriteOrRead 185 251 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 252 fd = event.ident 188 253 try: 189 254 selectable = self._selectables[fd] 190 255 except KeyError: 191 256 # Handles the infrequent case where one selectable's 192 257 # handler disconnects another. 193 258 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 259 else: 260 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 261 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 262 263 def _doWriteOrRead(self, selectable, fd, event): 264 """ 265 Private method called when a FD is ready for reading, writing or was 266 lost. Do the work and raise errors where necessary. 267 """ 268 why = None 269 inRead = False 270 (filter, flags, data, fflags) = ( 271 event.filter, event.flags, event.data, event.fflags) 272 273 if flags & KQ_EV_EOF and data and fflags: 274 why = main.CONNECTION_LOST 275 else: 276 try: 277 if selectable.fileno() == -1: 278 inRead = False 279 why = posixbase._NO_FILEDESC 280 else: 281 if filter == KQ_FILTER_READ: 282 inRead = True 283 why = selectable.doRead() 284 if filter == KQ_FILTER_WRITE: 285 inRead = False 286 why = selectable.doWrite() 287 except: 288 # Any exception from application code gets logged and will 289 # cause us to disconnect the selectable. 290 why = failure.Failure() 291 log.err(why) 292 208 293 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 294 self._disconnectSelectable(selectable, why, inRead) 212 295 213 296 doIteration = doKEvent 214 297 215 298 216 299 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 300 """ 301 Install the kqueue() reactor. 302 """ 303 p = KQueueReactor() 304 from twisted.internet.main import installReactor 305 installReactor(p) 219 306 220 307 221 308 __all__ = ["KQueueReactor", "install"]
