Ticket #1918: kqueue.7.patch
| File kqueue.7.patch, 20.9 KB (added by oberstet, 16 months ago) |
|---|
-
twisted/test/test_twistd.py
23 23 from zope.interface.verify import verifyObject 24 24 25 25 from twisted.trial import unittest 26 from twisted.test.test_process import MockOS 26 27 27 28 from twisted import plugin 28 29 from twisted.application.service import IServiceMaker … … 34 35 from twisted.python.versions import Version 35 36 from twisted.python.components import Componentized 36 37 from twisted.internet.defer import Deferred 38 from twisted.internet.interfaces import IReactorDaemonize 37 39 from twisted.python.fakepwd import UserDatabase 38 40 39 41 try: … … 616 618 self.runner = UnixApplicationRunner({}) 617 619 618 620 619 def daemonize(self ):621 def daemonize(self, daemonize_reactor, daemonize_os): 620 622 """ 621 623 Indicate that daemonization has happened and change the PID so that the 622 624 value written to the pidfile can be tested in the daemonization case. … … 806 808 807 809 808 810 811 class FakeNonDaemonizingReactor(object): 812 """ 813 A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 814 but not announcing this, and logging whether the methods have been called. 815 816 @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. 817 @type _beforeDaemonizeCalled: C{bool} 818 @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. 819 @type _afterDaemonizeCalled: C{bool} 820 """ 821 822 def __init__(self): 823 self._beforeDaemonizeCalled = False 824 self._afterDaemonizeCalled = False 825 826 def beforeDaemonize(self): 827 self._beforeDaemonizeCalled = True 828 829 def afterDaemonize(self): 830 self._afterDaemonizeCalled = True 831 832 833 834 class FakeDaemonizingReactor(FakeNonDaemonizingReactor): 835 """ 836 A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, 837 announcing this, and logging whether the methods have been called. 838 """ 839 840 implements(IReactorDaemonize) 841 842 843 844 class ReactorDaemonizationTests(unittest.TestCase): 845 """ 846 Tests for L{_twistd_unix.daemonize} and L{IReactorDaemonize}. 847 """ 848 if _twistd_unix is None: 849 skip = "twistd unix not available" 850 851 852 def test_daemonizationHooksCalled(self): 853 """ 854 L{_twistd_unix.daemonize} indeed calls 855 L{IReactorDaemonize.beforeDaemonize} and 856 L{IReactorDaemonize.afterDaemonize} if the reactor implements 857 L{IReactorDaemonize}. 858 """ 859 reactor = FakeDaemonizingReactor() 860 os = MockOS() 861 _twistd_unix.daemonize(reactor, os) 862 msg = "At least one reactor daemonization hook WAS NOT called" 863 self.assertTrue(reactor._beforeDaemonizeCalled and \ 864 reactor._afterDaemonizeCalled, msg = msg) 865 866 867 def test_daemonizationHooksNotCalled(self): 868 """ 869 L{_twistd_unix.daemonize} does NOT call 870 L{IReactorDaemonize.beforeDaemonize} or 871 L{IReactorDaemonize.afterDaemonize} if the reactor does NOT 872 implement L{IReactorDaemonize}. 873 """ 874 reactor = FakeNonDaemonizingReactor() 875 os = MockOS() 876 _twistd_unix.daemonize(reactor, os) 877 msg = "At least one reactor daemonization hook WAS called" 878 self.assertTrue(not reactor._beforeDaemonizeCalled and \ 879 not reactor._afterDaemonizeCalled, msg = msg) 880 881 882 809 883 class DummyReactor(object): 810 884 """ 811 885 A dummy reactor, only providing a C{run} method and checking that it -
twisted/topfiles/1918.feature
1 The kqueue reactor has been revived. -
twisted/scripts/_twistd_unix.py
7 7 from twisted.python import log, syslog, logfile, usage 8 8 from twisted.python.util import switchUID, uidFromString, gidFromString 9 9 from twisted.application import app, service 10 from twisted.internet.interfaces import IReactorDaemonize 10 11 from twisted import copyright 11 12 12 13 … … 155 156 156 157 157 158 158 def daemonize(): 159 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 159 def daemonize(reactor, os): 160 """ 161 Daemonizes the application on Unix. This is done by the usual double 162 forking approach. 163 164 See: 165 [1] http://code.activestate.com/recipes/278731/ 166 [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", 167 1992, Addison-Wesley, ISBN 0-201-56317-7 168 169 @type reactor: C{reactor} 170 @param reactor: The reactor in use. 171 @type os: C{os} 172 @param os: The 'os' module to use for daemonization. 173 """ 174 175 ## If the reactor requires hooks to be called for daemonization, call them. 176 ## Currently the only reactor which provides/needs that is KQueueReactor. 177 if IReactorDaemonize.providedBy(reactor): 178 reactor.beforeDaemonize() 179 160 180 if os.fork(): # launch child and... 161 181 os._exit(0) # kill off parent 162 182 os.setsid() … … 171 191 raise 172 192 os.close(null) 173 193 194 if IReactorDaemonize.providedBy(reactor): 195 reactor.afterDaemonize() 174 196 175 197 198 176 199 def launchWithName(name): 177 200 if name and name != sys.argv[0]: 178 201 exe = os.path.realpath(sys.executable) … … 265 288 if umask is not None: 266 289 os.umask(umask) 267 290 if daemon: 268 daemonize() 291 from twisted.internet import reactor 292 daemonize(reactor, os) 269 293 if pidfile: 270 294 f = open(pidfile,'wb') 271 295 f.write(str(os.getpid())) -
twisted/internet/interfaces.py
872 872 """ 873 873 874 874 875 class IReactorDaemonize(Interface): 876 """ 877 A reactor which provides hooks that need to be called before and after 878 daemonization. 879 880 Notes: 881 - This interface SHOULD NOT be called by applications. 882 - This interface should only be implemented by reactors as a workaround 883 (in particular, it's implemented currently only by kqueue()). 884 For details please see the comments on ticket #1918. 885 """ 886 887 def beforeDaemonize(): 888 """ 889 Hook to be called immediately before daemonization. No reactor methods 890 must be called until L{afterDaemonize} is called. 891 892 @return: C{None}. 893 """ 894 895 def afterDaemonize(): 896 """ 897 Hook to be called immediately after daemonization. This must only be 898 called after L{beforeDaemonize} had been called previously. 899 900 @return: C{None}. 901 """ 902 903 875 904 class IReactorFDSet(Interface): 876 905 """ 877 906 Implement me to be able to use L{IFileDescriptor} type resources. … … 1901 1930 @return: a client endpoint 1902 1931 @rtype: L{IStreamClientEndpoint} 1903 1932 """ 1904 -
twisted/internet/test/test_fdset.py
286 286 reactor = self.buildReactor() 287 287 288 288 name = reactor.__class__.__name__ 289 if name in ('EPollReactor', ' CFReactor'):289 if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): 290 290 # Closing a file descriptor immediately removes it from the epoll 291 291 # set without generating a notification. That means epollreactor 292 292 # will not call any methods on Victim after the close, so there's -
twisted/internet/kqreactor.py
4 4 """ 5 5 A kqueue()/kevent() based implementation of the Twisted main loop. 6 6 7 To install the event loop (and you should do this before any connections, 8 listeners or connectors are added):: 7 To use this reactor, start your application specifying the kqueue reactor: 9 8 10 | from twisted.internet import kqreactor 11 | kqreactor.install() 9 twistd ... --reactor kqueue 12 10 13 T his reactor only works on FreeBSD and requires PyKQueue 1.3, which is14 available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} 11 To install the event loop from code (and you should do this before any 12 connections, listeners or connectors are added):: 15 13 14 from twisted.internet import kqreactor 15 kqreactor.install() 16 16 17 This implementation depends on Python 2.6 or higher which has kqueue support 18 built in the select module. 17 19 18 You're going to need to patch PyKqueue:: 20 Note, that you should use Python 2.6.5 or higher, since previous implementations 21 of select.kqueue had 19 22 20 ===================================================== 21 --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 22 +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 23 @@ -137,7 +137,7 @@ 24 } 25 26 statichere PyTypeObject KQEvent_Type = { 27 - PyObject_HEAD_INIT(NULL) 28 + PyObject_HEAD_INIT(&PyType_Type) 29 0, // ob_size 30 "KQEvent", // tp_name 31 sizeof(KQEventObject), // tp_basicsize 32 @@ -291,13 +291,14 @@ 33 34 /* Build timespec for timeout */ 35 totimespec.tv_sec = timeout / 1000; 36 - totimespec.tv_nsec = (timeout % 1000) * 100000; 37 + totimespec.tv_nsec = (timeout % 1000) * 1000000; 38 39 // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); 40 41 /* Make the call */ 42 - 43 + Py_BEGIN_ALLOW_THREADS 44 gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); 45 + Py_END_ALLOW_THREADS 46 47 /* Don't need the input event list anymore, so get rid of it */ 48 free (changelist); 49 @@ -361,7 +362,7 @@ 50 statichere PyTypeObject KQueue_Type = { 51 /* The ob_type field must be initialized in the module init function 52 * to be portable to Windows without using C++. */ 53 - PyObject_HEAD_INIT(NULL) 54 + PyObject_HEAD_INIT(&PyType_Type) 55 0, /*ob_size*/ 56 "KQueue", /*tp_name*/ 57 sizeof(KQueueObject), /*tp_basicsize*/ 23 http://bugs.python.org/issue5910 58 24 25 not yet fixed. 59 26 """ 60 27 61 import errno , sys28 import errno 62 29 63 30 from zope.interface import implements 64 31 65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD 66 from kqsyscall import kqueue, kevent 32 from select import kqueue, kevent 33 from select import KQ_FILTER_READ, KQ_FILTER_WRITE 34 from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF 67 35 68 from twisted.internet.interfaces import IReactorFDSet 36 from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize 69 37 70 38 from twisted.python import log, failure 71 39 from twisted.internet import main, posixbase … … 73 41 74 42 class KQueueReactor(posixbase.PosixReactorBase): 75 43 """ 76 A reactor that uses kqueue(2)/kevent(2). 44 A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher 45 which has built in support for kqueue in the select module. 77 46 78 47 @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. 79 48 … … 95 64 dispatched to the corresponding L{FileDescriptor} instances in 96 65 C{_selectables}. 97 66 """ 98 implements(IReactorFDSet )67 implements(IReactorFDSet, IReactorDaemonize) 99 68 69 100 70 def __init__(self): 101 71 """ 102 72 Initialize kqueue object, file descriptor tracking dictionaries, and the 103 73 base class. 74 75 See: 76 - http://docs.python.org/library/select.html 77 - www.freebsd.org/cgi/man.cgi?query=kqueue 78 - people.freebsd.org/~jlemon/papers/kqueue.pdf 104 79 """ 105 80 self._kq = kqueue() 106 81 self._reads = {} … … 109 84 posixbase.PosixReactorBase.__init__(self) 110 85 111 86 112 def _updateRegistration(self, *args): 113 self._kq.kevent([kevent(*args)], 0, 0) 87 def _updateRegistration(self, fd, filter, op): 88 """ 89 Private method for changing kqueue registration on a given FD 90 filtering for events given filter/op. This will never block and 91 returns nothing. 92 """ 93 self._kq.control([kevent(fd, filter, op)], 0, 0) 114 94 95 96 def beforeDaemonize(self): 97 """ 98 Implement L{IReactorDaemonize.beforeDaemonize}. 99 """ 100 # Twisted-internal method called during daemonization (when application 101 # is started via twistd). This is called right before the magic double 102 # forking done for daemonization. We cleanly close the kqueue() and later 103 # recreate it. This is needed since a) kqueue() are not inherited across 104 # forks and b) twistd will create the reactor already before daemonization 105 # (and will also add at least 1 reader to the reactor, an instance of 106 # twisted.internet.posixbase._UnixWaker). 107 # 108 # See: twisted.scripts._twistd_unix.daemonize() 109 self._kq.close() 110 self._kq = None 111 112 113 def afterDaemonize(self): 114 """ 115 Implement L{IReactorDaemonize.afterDaemonize}. 116 """ 117 # Twisted-internal method called during daemonization. This is called right 118 # after daemonization and recreates the kqueue() and any readers/writers 119 # that were added before. Note that you MUST NOT call any reactor methods 120 # in between beforeDaemonize() and afterDaemonize()! 121 self._kq = kqueue() 122 for fd in self._reads: 123 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 124 for fd in self._writes: 125 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 126 127 115 128 def addReader(self, reader): 116 """Add a FileDescriptor for notification of data available to read.117 129 """ 130 Implement L{IReactorFDSet.addReader}. 131 """ 118 132 fd = reader.fileno() 119 133 if fd not in self._reads: 120 self._selectables[fd] = reader 121 self._reads[fd] = 1 122 self._updateRegistration(fd, EVFILT_READ, EV_ADD) 134 try: 135 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) 136 except OSError: 137 pass 138 finally: 139 self._selectables[fd] = reader 140 self._reads[fd] = 1 123 141 142 124 143 def addWriter(self, writer): 125 """Add a FileDescriptor for notification of data available to write.126 144 """ 145 Implement L{IReactorFDSet.addWriter}. 146 """ 127 147 fd = writer.fileno() 128 148 if fd not in self._writes: 129 self._selectables[fd] = writer 130 self._writes[fd] = 1 131 self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) 149 try: 150 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) 151 except OSError: 152 pass 153 finally: 154 self._selectables[fd] = writer 155 self._writes[fd] = 1 132 156 157 133 158 def removeReader(self, reader): 134 """Remove a Selectable for notification of data available to read.135 159 """ 136 fd = reader.fileno() 160 Implement L{IReactorFDSet.removeReader}. 161 """ 162 wasLost = False 163 try: 164 fd = reader.fileno() 165 except: 166 fd = -1 167 if fd == -1: 168 for fd, fdes in self._selectables.items(): 169 if reader is fdes: 170 wasLost = True 171 break 172 else: 173 return 137 174 if fd in self._reads: 138 175 del self._reads[fd] 139 176 if fd not in self._writes: 140 177 del self._selectables[fd] 141 self._updateRegistration(fd, EVFILT_READ, EV_DELETE) 178 if not wasLost: 179 try: 180 self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) 181 except OSError: 182 pass 142 183 184 143 185 def removeWriter(self, writer): 144 """Remove a Selectable for notification of data available to write.145 186 """ 146 fd = writer.fileno() 187 Implement L{IReactorFDSet.removeWriter}. 188 """ 189 wasLost = False 190 try: 191 fd = writer.fileno() 192 except: 193 fd = -1 194 if fd == -1: 195 for fd, fdes in self._selectables.items(): 196 if writer is fdes: 197 wasLost = True 198 break 199 else: 200 return 147 201 if fd in self._writes: 148 202 del self._writes[fd] 149 203 if fd not in self._reads: 150 204 del self._selectables[fd] 151 self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) 205 if not wasLost: 206 try: 207 self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) 208 except OSError: 209 pass 152 210 211 153 212 def removeAll(self): 154 213 """ 155 Remove all selectables, and return a list of them.214 Implement L{IReactorFDSet.removeAll}. 156 215 """ 157 216 return self._removeAll( 158 217 [self._selectables[fd] for fd in self._reads], … … 160 219 161 220 162 221 def getReaders(self): 222 """ 223 Implement L{IReactorFDSet.getReaders}. 224 """ 163 225 return [self._selectables[fd] for fd in self._reads] 164 226 165 227 166 228 def getWriters(self): 229 """ 230 Implement L{IReactorFDSet.getWriters}. 231 """ 167 232 return [self._selectables[fd] for fd in self._writes] 168 233 169 234 170 235 def doKEvent(self, timeout): 171 """Poll the kqueue for new events.""" 236 """ 237 Poll the kqueue for new events. 238 """ 172 239 if timeout is None: 173 timeout = 1000 174 else: 175 timeout = int(timeout * 1000) # convert seconds to milliseconds 240 timeout = 1 176 241 177 242 try: 178 l = self._kq. kevent([], len(self._selectables), timeout)243 l = self._kq.control([], len(self._selectables), timeout) 179 244 except OSError, e: 180 245 if e[0] == errno.EINTR: 181 246 return 182 247 else: 183 248 raise 249 184 250 _drdw = self._doWriteOrRead 185 251 for event in l: 186 why = None 187 fd, filter = event.ident, event.filter 252 fd = event.ident 188 253 try: 189 254 selectable = self._selectables[fd] 190 255 except KeyError: 191 256 # Handles the infrequent case where one selectable's 192 257 # handler disconnects another. 193 258 continue 194 log.callWithLogger(selectable, _drdw, selectable, fd, filter) 259 else: 260 log.callWithLogger(selectable, _drdw, selectable, fd, event) 195 261 196 def _doWriteOrRead(self, selectable, fd, filter):197 try:198 if filter == EVFILT_READ:199 why = selectable.doRead()200 if filter == EVFILT_WRITE:201 why = selectable.doWrite()202 if not selectable.fileno() == fd:203 why = main.CONNECTION_LOST204 except:205 why = sys.exc_info()[1]206 log.deferr()207 262 263 def _doWriteOrRead(self, selectable, fd, event): 264 """ 265 Private method called when a FD is ready for reading, writing or was 266 lost. Do the work and raise errors where necessary. 267 """ 268 why = None 269 inRead = False 270 (filter, flags, data, fflags) = ( 271 event.filter, event.flags, event.data, event.fflags) 272 273 if flags & KQ_EV_EOF and data and fflags: 274 why = main.CONNECTION_LOST 275 else: 276 try: 277 if selectable.fileno() == -1: 278 inRead = False 279 why = posixbase._NO_FILEDESC 280 else: 281 if filter == KQ_FILTER_READ: 282 inRead = True 283 why = selectable.doRead() 284 if filter == KQ_FILTER_WRITE: 285 inRead = False 286 why = selectable.doWrite() 287 except: 288 # Any exception from application code gets logged and will 289 # cause us to disconnect the selectable. 290 why = failure.Failure() 291 log.err(why, "An exception was raised from application code" \ 292 " while processing a reactor selectable") 293 208 294 if why: 209 self.removeReader(selectable) 210 self.removeWriter(selectable) 211 selectable.connectionLost(failure.Failure(why)) 295 self._disconnectSelectable(selectable, why, inRead) 212 296 213 297 doIteration = doKEvent 214 298 215 299 216 300 def install(): 217 k = KQueueReactor() 218 main.installReactor(k) 301 """ 302 Install the kqueue() reactor. 303 """ 304 p = KQueueReactor() 305 from twisted.internet.main import installReactor 306 installReactor(p) 219 307 220 308 221 309 __all__ = ["KQueueReactor", "install"]
