root / trunk / twisted / internet / kqreactor.py

Revision 26118, 7.2 kB (checked in by exarkun, 5 months ago)

Merge internal-fd-tracking-3602

Author: exarkun, itamarst
Reviewer: therve
Fixes: #3602

Refactor the reactor's internal tracking of file descriptors, particular
with respect to "internal" file descriptors which are created and used by
the reactor itself, not by applications.

Line 
1 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 A kqueue()/kevent() based implementation of the Twisted main loop.
6
7 To install the event loop (and you should do this before any connections,
8 listeners or connectors are added)::
9
10     | from twisted.internet import kqreactor
11     | kqreactor.install()
12
13 This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
14 available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
15
16
17
18 You're going to need to patch PyKqueue::
19
20     =====================================================
21     --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
22     +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
23     @@ -137,7 +137,7 @@
24      }
25     
26      statichere PyTypeObject KQEvent_Type = {
27     -  PyObject_HEAD_INIT(NULL)
28     +  PyObject_HEAD_INIT(&PyType_Type)
29        0,                             // ob_size
30        "KQEvent",                     // tp_name
31        sizeof(KQEventObject),         // tp_basicsize
32     @@ -291,13 +291,14 @@
33     
34        /* Build timespec for timeout */
35        totimespec.tv_sec = timeout / 1000;
36     -  totimespec.tv_nsec = (timeout % 1000) * 100000;
37     +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
38     
39        // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
40     
41        /* Make the call */
42     -
43     +  Py_BEGIN_ALLOW_THREADS
44        gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
45     +  Py_END_ALLOW_THREADS
46     
47        /* Don't need the input event list anymore, so get rid of it */
48        free (changelist);
49     @@ -361,7 +362,7 @@
50      statichere PyTypeObject KQueue_Type = {
51             /* The ob_type field must be initialized in the module init function
52              * to be portable to Windows without using C++. */
53     -   PyObject_HEAD_INIT(NULL)
54     +   PyObject_HEAD_INIT(&PyType_Type)
55             0,                  /*ob_size*/
56             "KQueue",                   /*tp_name*/
57             sizeof(KQueueObject),       /*tp_basicsize*/
58
59 """
60
61 import errno, sys
62
63 from zope.interface import implements
64
65 from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD
66 from kqsyscall import kqueue, kevent
67
68 from twisted.internet.interfaces import IReactorFDSet
69
70 from twisted.python import log, failure
71 from twisted.internet import main, posixbase
72
73
74 class KQueueReactor(posixbase.PosixReactorBase):
75     """
76     A reactor that uses kqueue(2)/kevent(2).
77
78     @ivar _kq: A L{kqueue} which will be used to check for I/O readiness.
79
80     @ivar _selectables: A dictionary mapping integer file descriptors to
81         instances of L{FileDescriptor} which have been registered with the
82         reactor.  All L{FileDescriptors} which are currently receiving read or
83         write readiness notifications will be present as values in this
84         dictionary.
85
86     @ivar _reads: A dictionary mapping integer file descriptors to arbitrary
87         values (this is essentially a set).  Keys in this dictionary will be
88         registered with C{_kq} for read readiness notifications which will be
89         dispatched to the corresponding L{FileDescriptor} instances in
90         C{_selectables}.
91
92     @ivar _writes: A dictionary mapping integer file descriptors to arbitrary
93         values (this is essentially a set).  Keys in this dictionary will be
94         registered with C{_kq} for write readiness notifications which will be
95         dispatched to the corresponding L{FileDescriptor} instances in
96         C{_selectables}.
97     """
98     implements(IReactorFDSet)
99
100     def __init__(self):
101         """
102         Initialize kqueue object, file descriptor tracking dictionaries, and the
103         base class.
104         """
105         self._kq = kqueue()
106         self._reads = {}
107         self._writes = {}
108         self._selectables = {}
109         posixbase.PosixReactorBase.__init__(self)
110
111
112     def _updateRegistration(self, *args):
113         self._kq.kevent([kevent(*args)], 0, 0)
114
115     def addReader(self, reader):
116         """Add a FileDescriptor for notification of data available to read.
117         """
118         fd = reader.fileno()
119         if fd not in self._reads:
120             self._selectables[fd] = reader
121             self._reads[fd] = 1
122             self._updateRegistration(fd, EVFILT_READ, EV_ADD)
123
124     def addWriter(self, writer):
125         """Add a FileDescriptor for notification of data available to write.
126         """
127         fd = writer.fileno()
128         if fd not in self._writes:
129             self._selectables[fd] = writer
130             self._writes[fd] = 1
131             self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
132
133     def removeReader(self, reader):
134         """Remove a Selectable for notification of data available to read.
135         """
136         fd = reader.fileno()
137         if fd in self._reads:
138             del self._reads[fd]
139             if fd not in self._writes:
140                 del self._selectables[fd]
141             self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
142
143     def removeWriter(self, writer):
144         """Remove a Selectable for notification of data available to write.
145         """
146         fd = writer.fileno()
147         if fd in self._writes:
148             del self._writes[fd]
149             if fd not in self._reads:
150                 del self._selectables[fd]
151             self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
152
153     def removeAll(self):
154         """
155         Remove all selectables, and return a list of them.
156         """
157         return self._removeAll(
158             [self._selectables[fd] for fd in self._reads],
159             [self._selectables[fd] for fd in self._writes])
160
161
162     def getReaders(self):
163         return [self._selectables[fd] for fd in self._reads]
164
165
166     def getWriters(self):
167         return [self._selectables[fd] for fd in self._writes]
168
169
170     def doKEvent(self, timeout):
171         """Poll the kqueue for new events."""
172         if timeout is None:
173             timeout = 1000
174         else:
175             timeout = int(timeout * 1000) # convert seconds to milliseconds
176
177         try:
178             l = self._kq.kevent([], len(self._selectables), timeout)
179         except OSError, e:
180             if e[0] == errno.EINTR:
181                 return
182             else:
183                 raise
184         _drdw = self._doWriteOrRead
185         for event in l:
186             why = None
187             fd, filter = event.ident, event.filter
188             try:
189                 selectable = self._selectables[fd]
190             except KeyError:
191                 # Handles the infrequent case where one selectable's
192                 # handler disconnects another.
193                 continue
194             log.callWithLogger(selectable, _drdw, selectable, fd, filter)
195
196     def _doWriteOrRead(self, selectable, fd, filter):
197         try:
198             if filter == EVFILT_READ:
199                 why = selectable.doRead()
200             if filter == EVFILT_WRITE:
201                 why = selectable.doWrite()
202             if not selectable.fileno() == fd:
203                 why = main.CONNECTION_LOST
204         except:
205             why = sys.exc_info()[1]
206             log.deferr()
207
208         if why:
209             self.removeReader(selectable)
210             self.removeWriter(selectable)
211             selectable.connectionLost(failure.Failure(why))
212
213     doIteration = doKEvent
214
215
216 def install():
217     k = KQueueReactor()
218     main.installReactor(k)
219
220
221 __all__ = ["KQueueReactor", "install"]
Note: See TracBrowser for help on using the browser.