root / trunk / twisted / internet / epollreactor.py

Revision 26118, 7.7 kB (checked in by exarkun, 5 months ago)

Merge internal-fd-tracking-3602

Author: exarkun, itamarst
Reviewer: therve
Fixes: #3602

Refactor the reactor's internal tracking of file descriptors, particularly
with respect to "internal" file descriptors which are created and used by
the reactor itself, not by applications.

Line 
1 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 An epoll() based implementation of the twisted main loop.
6
7 To install the event loop (and you should do this before any connections,
8 listeners or connectors are added)::
9
10     from twisted.internet import epollreactor
11     epollreactor.install()
12 """
13
14 import sys, errno
15
16 from zope.interface import implements
17
18 from twisted.internet.interfaces import IReactorFDSet
19
20 from twisted.python import _epoll
21 from twisted.python import log
22 from twisted.internet import posixbase, error
23 from twisted.internet.main import CONNECTION_LOST
24
25
# Mask of epoll event bits (hang-up or error) which, when reported without
# pending input, cause a descriptor to be treated as disconnected.
_POLL_DISCONNECTED = _epoll.HUP | _epoll.ERR
27
class EPollReactor(posixbase.PosixReactorBase):
    """
    A reactor that uses epoll(4).

    @ivar _poller: An L{_epoll.epoll} which will be used to check for I/O
        readiness.

    @ivar _selectables: A dictionary mapping integer file descriptors to
        instances of L{FileDescriptor} which have been registered with the
        reactor.  All L{FileDescriptors} which are currently receiving read or
        write readiness notifications will be present as values in this
        dictionary.

    @ivar _reads: A dictionary mapping integer file descriptors to arbitrary
        values (this is essentially a set).  Keys in this dictionary will be
        registered with C{_poller} for read readiness notifications which will
        be dispatched to the corresponding L{FileDescriptor} instances in
        C{_selectables}.

    @ivar _writes: A dictionary mapping integer file descriptors to arbitrary
        values (this is essentially a set).  Keys in this dictionary will be
        registered with C{_poller} for write readiness notifications which will
        be dispatched to the corresponding L{FileDescriptor} instances in
        C{_selectables}.
    """
    implements(IReactorFDSet)

    def __init__(self):
        """
        Initialize epoll object, file descriptor tracking dictionaries, and the
        base class.
        """
        # Create the poller we're going to use.  The 1024 here is just a hint
        # to the kernel, it is not a hard maximum.
        self._poller = _epoll.epoll(1024)
        self._reads = {}
        self._writes = {}
        self._selectables = {}
        # The tracking dictionaries are created before the base class is
        # initialized so that they already exist while it runs.
        posixbase.PosixReactorBase.__init__(self)


    def _add(self, xer, primary, other, selectables, event, antievent):
        """
        Private method for adding a descriptor to the event loop.

        It takes care of adding it if new or modifying it if already added
        for another state (read -> read/write for example).  Nothing happens
        if the descriptor is already present in C{primary}.

        @param xer: The object to add; its C{fileno()} gives the descriptor.
        @param primary: The tracking dictionary for the kind of event being
            added (C{_reads} or C{_writes}).
        @param other: The tracking dictionary for the opposite kind of event.
        @param selectables: The dictionary mapping descriptors to objects.
        @param event: The epoll flag for the kind of event being added.
        @param antievent: The epoll flag for the opposite kind of event.

        @raise IOError: Whatever C{self._poller._control} raises is allowed
            to propagate; in that case no tracking state is modified.
        """
        fd = xer.fileno()
        if fd not in primary:
            cmd = _epoll.CTL_ADD
            flags = event
            if fd in other:
                # Already watched for the opposite event: keep that flag and
                # modify the existing registration instead of adding one.
                flags |= antievent
                cmd = _epoll.CTL_MOD
            # epoll_ctl can raise all kinds of IOErrors, and every one
            # indicates a bug either in the reactor or application-code.
            # Let them all through so someone sees a traceback and fixes
            # something.  We'll do the same thing for every other call to
            # this method in this file.
            self._poller._control(cmd, fd, flags)
            # Update our own tracking state only after the epoll call has
            # succeeded.  Otherwise a failed call would leave these
            # dictionaries claiming a registration the poller does not have.
            primary[fd] = 1
            selectables[fd] = xer


    def addReader(self, reader):
        """
        Add a FileDescriptor for notification of data available to read.
        """
        self._add(reader, self._reads, self._writes, self._selectables, _epoll.IN, _epoll.OUT)


    def addWriter(self, writer):
        """
        Add a FileDescriptor for notification of readiness to be written to.
        """
        self._add(writer, self._writes, self._reads, self._selectables, _epoll.OUT, _epoll.IN)


    def _remove(self, xer, primary, other, selectables, event, antievent):
        """
        Private method for removing a descriptor from the event loop.

        It does the inverse job of _add, and also adds a check in case the fd
        has gone away.  Nothing happens if the descriptor is not present in
        C{primary}.

        @param xer: The object to remove.
        @param primary: The tracking dictionary for the kind of event being
            removed (C{_reads} or C{_writes}).
        @param other: The tracking dictionary for the opposite kind of event.
        @param selectables: The dictionary mapping descriptors to objects.
        @param event: The epoll flag for the kind of event being removed.
        @param antievent: The epoll flag for the opposite kind of event.
        """
        fd = xer.fileno()
        if fd == -1:
            # The object no longer knows its descriptor; recover it by
            # looking for the object itself among the tracked selectables.
            for fd, fdes in selectables.items():
                if xer is fdes:
                    break
            else:
                # Not tracked at all, so there is nothing to remove.
                return
        if fd in primary:
            cmd = _epoll.CTL_DEL
            flags = event
            if fd in other:
                # Still wanted for the opposite event: keep the registration
                # and narrow it down to that event only.
                flags = antievent
                cmd = _epoll.CTL_MOD
            else:
                del selectables[fd]
            del primary[fd]
            # See comment above _control call in _add.
            self._poller._control(cmd, fd, flags)


    def removeReader(self, reader):
        """
        Remove a Selectable for notification of data available to read.
        """
        self._remove(reader, self._reads, self._writes, self._selectables, _epoll.IN, _epoll.OUT)


    def removeWriter(self, writer):
        """
        Remove a Selectable for notification of readiness to be written to.
        """
        self._remove(writer, self._writes, self._reads, self._selectables, _epoll.OUT, _epoll.IN)

    def removeAll(self):
        """
        Remove all selectables, and return a list of them.

        The lists of current readers and writers are handed to
        C{self._removeAll}, whose result is returned.
        """
        return self._removeAll(
            [self._selectables[fd] for fd in self._reads],
            [self._selectables[fd] for fd in self._writes])


    def getReaders(self):
        """
        Return a list of the objects currently registered for read
        notifications.
        """
        return [self._selectables[fd] for fd in self._reads]


    def getWriters(self):
        """
        Return a list of the objects currently registered for write
        notifications.
        """
        return [self._selectables[fd] for fd in self._writes]


    def doPoll(self, timeout):
        """
        Poll the poller for new events.

        @param timeout: The maximum time to block, in seconds.  C{None} is
            treated as one second.
        """
        if timeout is None:
            timeout = 1
        timeout = int(timeout * 1000) # convert seconds to milliseconds

        try:
            # Limit the number of events to the number of io objects we're
            # currently tracking (because that's maybe a good heuristic) and
            # the amount of time we block to the value specified by our
            # caller.
            l = self._poller.wait(len(self._selectables), timeout)
        except IOError:
            # Fetch the exception through sys.exc_info(), as _doReadOrWrite
            # does, so this parses the same on every Python version.
            err = sys.exc_info()[1]
            if err.errno == errno.EINTR:
                # Interrupted by a signal; not an error, just poll again on
                # the next iteration.
                return
            # See epoll_wait(2) for documentation on the other conditions
            # under which this can fail.  They can only be due to a serious
            # programming error on our part, so let's just announce them
            # loudly.
            raise

        _drdw = self._doReadOrWrite
        for fd, event in l:
            try:
                selectable = self._selectables[fd]
            except KeyError:
                # No longer tracked (for example, removed while an earlier
                # event from this same batch was being handled); skip it.
                pass
            else:
                log.callWithLogger(selectable, _drdw, selectable, fd, event)

    doIteration = doPoll

    def _doReadOrWrite(self, selectable, fd, event):
        """
        fd is available for read or write, do the work and raise errors
        if necessary.

        @param selectable: The object registered for C{fd}.
        @param fd: The file descriptor the event was reported for.
        @param event: The epoll event flags reported for C{fd}.
        """
        why = None
        inRead = False
        if event & _POLL_DISCONNECTED and not (event & _epoll.IN):
            # Hang-up or error with nothing left to read.  When input is
            # also reported, the other branch runs so doRead gets called.
            why = CONNECTION_LOST
        else:
            try:
                if event & _epoll.IN:
                    why = selectable.doRead()
                    inRead = True
                if not why and event & _epoll.OUT:
                    why = selectable.doWrite()
                    inRead = False
                if selectable.fileno() != fd:
                    # The object no longer reports the descriptor this event
                    # was delivered for.
                    why = error.ConnectionFdescWentAway(
                          'Filedescriptor went away')
                    inRead = False
            except:
                # Log whatever went wrong, then disconnect the selectable
                # with the exception as the reason.
                log.err()
                why = sys.exc_info()[1]
        if why:
            self._disconnectSelectable(selectable, why, inRead)
224
def install():
    """
    Create an L{EPollReactor} and install it as the reactor.
    """
    reactor = EPollReactor()
    from twisted.internet import main
    main.installReactor(reactor)
232
233
# Names exported by this module.
__all__ = [
    "EPollReactor",
    "install",
]
235
Note: See TracBrowser for help on using the browser.