root / trunk / twisted / internet / _threadedselect.py

Revision 24441, 12.2 kB (checked in by thijs, 1 year ago)

Merge maintainer-email-2438: Get rid of references to maintainer email addresses from code.

Author: thijs
Reviewer: exarkun
Fixes: #2438

  • Property svn:eol-style set to native
Line 
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
3 #
4 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
5 # See LICENSE for details.
6
7 from __future__ import generators
8
9 """
10 Threaded select reactor
11
12 Maintainer: Bob Ippolito
13
14
15 The threadedselectreactor is a specialized reactor for integrating with
16 arbitrary foreign event loops, such as those you find in GUI toolkits.
17
18 There are three things you'll need to do to use this reactor.
19
20 Install the reactor at the beginning of your program, before importing
21 the rest of Twisted::
22
23     | from twisted.internet import _threadedselect
24     | _threadedselect.install()
25
26 Interleave this reactor with your foreign event loop, at some point after
27 your event loop is initialized::
28
29     | from twisted.internet import reactor
30     | reactor.interleave(foreignEventLoopWakerFunction)
31     | reactor.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
32
33 Instead of shutting down the foreign event loop directly, shut down the
34 reactor::
35
36     | from twisted.internet import reactor
37     | reactor.stop()
38
39 In order for Twisted to do its work in the main thread (the thread that
40 interleave is called from), a waker function is necessary.  The waker function
41 will be called from a "background" thread with one argument: func.
42 The waker function's purpose is to call func() from the main thread.
43 Many GUI toolkits ship with appropriate waker functions.
44 Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
45 older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
46 These would be used in place of "foreignEventLoopWakerFunction" in the above
47 example.
48
49 The other integration point at which the foreign event loop and this reactor
50 must integrate is shutdown.  In order to ensure clean shutdown of Twisted,
51 you must allow for Twisted to come to a complete stop before quitting the
52 application.  Typically, you will do this by setting up an after shutdown
53 trigger to stop your foreign event loop, and call reactor.stop() where you
54 would normally have initiated the shutdown procedure for the foreign event
55 loop.  Shutdown functions that could be used in place of
56 "foreignEventLoopStop" would be the ExitMainLoop method of the wxApp instance
57 with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
58 """
59
60 from threading import Thread
61 from Queue import Queue, Empty
62 from time import sleep
63 import sys
64
65 from zope.interface import implements
66
67 from twisted.internet.interfaces import IReactorFDSet
68 from twisted.internet import error
69 from twisted.internet import posixbase
70 from twisted.python import log, failure, threadable
71 from twisted.persisted import styles
72 from twisted.python.runtime import platformType
73
74 import select
75 from errno import EINTR, EBADF
76
77 from twisted.internet.selectreactor import _select
78
# Preallocated "reason" instances, built once at import time.
# ThreadedSelectReactor._doReadOrWrite passes one of them to
# _disconnectSelectable when a selectable has no fileno attribute
# (_NO_FILENO) or its fileno() returns -1 (_NO_FILEDESC).
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
82
def dictRemove(dct, value):
    """
    Remove the entry keyed by C{value} from C{dct}, if there is one.

    Despite its name, C{value} is used as a dictionary I{key}.  A missing
    key is silently ignored, so the call is safe to repeat.

    @param dct: the dictionary to modify in place.
    @param value: the key to discard.
    @return: C{None}
    """
    # pop() with a default never raises KeyError, which gives the same
    # "delete if present" behaviour in a single dictionary operation.
    dct.pop(value, None)
88
def raiseException(e):
    """
    Raise C{e}.

    This wraps the C{raise} statement in a callable so that raising an
    exception can be queued as an ordinary C{(function, args)} work item.

    @param e: the exception (instance or class) to raise.
    """
    raise e
91
class ThreadedSelectReactor(posixbase.PosixReactorBase):
    """
    A threaded select() based reactor - runs on all POSIX platforms and on
    Win32.

    The blocking select() call is executed in a worker thread while timed
    calls and doRead/doWrite dispatch are performed by whichever thread
    drives the reactor (see L{interleave} and L{mainLoop}).  The two sides
    talk through two queues:

      - C{toThreadQueue} carries C{(function, args)} work items that the
        worker thread executes in order.  All changes to C{reads} and
        C{writes} made by addReader/addWriter/removeReader/removeWriter are
        queued this way.
      - C{toMainThread} carries C{(message, args)} results; a message named
        C{X} is handled by the method C{_process_X}.
    """
    implements(IReactorFDSet)

    def __init__(self):
        threadable.init(1)
        # Selectables being watched, stored as dictionary keys.
        self.reads = {}
        self.writes = {}
        self.toThreadQueue = Queue()
        self.toMainThread = Queue()
        self.workerThread = None
        # Zero-argument callable installed by interleave(); _sendToMain
        # calls it after queueing a message.
        self.mainWaker = None
        posixbase.PosixReactorBase.__init__(self)
        self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)

    def wakeUp(self):
        """
        Wake the reactor up unconditionally.
        """
        # we want to wake up from any thread
        self.waker.wakeUp()

    def callLater(self, *args, **kw):
        """
        Schedule a call through the base class, then wake the reactor up.

        @return: whatever C{PosixReactorBase.callLater} returns.
        """
        tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
        self.wakeUp()
        return tple

    def _sendToMain(self, msg, *args):
        """
        Queue C{(msg, args)} on C{toMainThread} and, if a main waker is
        installed, call it so the queued message gets processed.
        """
        self.toMainThread.put((msg, args))
        if self.mainWaker is not None:
            self.mainWaker()

    def _sendToThread(self, fn, *args):
        """
        Queue C{fn(*args)} for execution by the worker thread.
        """
        self.toThreadQueue.put((fn, args))

    def _dispatchMessage(self, msg, args):
        """
        Handle one C{(msg, args)} pair taken from C{toMainThread} by calling
        the method named C{'_process_' + msg} with C{args}.
        """
        getattr(self, '_process_' + msg)(*args)

    def _preenDescriptorsInThread(self):
        """
        Rebuild C{reads} and C{writes}, keeping only the selectables that
        select() accepts.
        """
        log.msg("Malformed file descriptor found.  Preening lists.")
        # Snapshot the current selectables before emptying the dictionaries.
        readers = list(self.reads)
        writers = list(self.writes)
        self.reads.clear()
        self.writes.clear()
        for selDict, selList in ((self.reads, readers), (self.writes, writers)):
            for selectable in selList:
                try:
                    select.select([selectable], [selectable], [selectable], 0)
                except:
                    # Deliberately broad: whatever goes wrong, the
                    # selectable is reported and left out.
                    log.msg("bad descriptor %s" % selectable)
                else:
                    selDict[selectable] = 1

    def _workerInThread(self):
        """
        Body of the worker thread: run queued work items until one raises.

        C{SystemExit} ends the thread quietly (see L{_mainLoopShutdown}).
        Any other exception also ends the thread, after being wrapped in a
        C{Failure} and sent to the main side as a C{'Failure'} message.
        """
        try:
            while 1:
                fn, args = self.toThreadQueue.get()
                fn(*args)
        except SystemExit:
            pass # exception indicates this thread should exit
        except:
            f = failure.Failure()
            self._sendToMain('Failure', f)

    def _doSelectInThread(self, timeout):
        """
        Run one select() call and report the ready selectables to the main
        side as a C{'Notify'} message.

        @param timeout: timeout passed through to C{_select}.
        """
        reads = self.reads
        writes = self.writes
        while 1:
            try:
                r, w, ignored = _select(list(reads), list(writes),
                                        [], timeout)
                break
            except (ValueError, TypeError):
                # ValueError: possibly a file descriptor has gone negative.
                # TypeError: something *totally* invalid (object w/o fileno,
                # non-integral result) was passed.
                log.err()
                self._preenDescriptorsInThread()
            except (select.error, IOError):
                # select(2) encountered an error
                se = sys.exc_info()[1]
                code = se.args[0]
                if code in (0, 2):
                    # windows does this if it got an empty list
                    if (not reads) and (not writes):
                        # NOTE(review): nothing is sent to the main side
                        # here, so the caller keeps waiting for a message.
                        # Left as in the original - confirm whether this
                        # branch is reachable.
                        return
                    else:
                        raise
                elif code == EINTR:
                    # Interrupted before anything became ready.  Report an
                    # empty result instead of returning silently: the main
                    # side blocks (doThreadIteration) or is only resumed by
                    # _sendToMain (_interleave), so saying nothing would
                    # stall the reactor.
                    r, w = [], []
                    break
                elif code == EBADF:
                    self._preenDescriptorsInThread()
                else:
                    # OK, I really don't know what's going on.  Blow up.
                    raise
        self._sendToMain('Notify', r, w)

    def _process_Notify(self, r, w):
        """
        Handle a C{'Notify'} message: call doRead on each selectable in
        C{r} and doWrite on each selectable in C{w}.
        """
        reads = self.reads
        writes = self.writes

        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
            for selectable in selectables:
                # Skip selectables that are no longer registered (removed
                # after select() reported them).
                if selectable not in dct:
                    continue
                _logrun(selectable, _drdw, selectable, method, dct)

    def _process_Failure(self, f):
        """
        Handle a C{'Failure'} message by re-raising the failure's exception
        in the calling thread.
        """
        f.raiseException()

    _doIterationInThread = _doSelectInThread

    def ensureWorkerThread(self):
        """
        Start a worker thread unless one is already alive.
        """
        if self.workerThread is None or not self.workerThread.isAlive():
            self.workerThread = Thread(target=self._workerInThread)
            self.workerThread.start()

    def doThreadIteration(self, timeout):
        """
        Queue one select() call for the worker thread, block until a message
        comes back, and process it.
        """
        self._sendToThread(self._doIterationInThread, timeout)
        self.ensureWorkerThread()
        msg, args = self.toMainThread.get()
        self._dispatchMessage(msg, args)

    doIteration = doThreadIteration

    def _interleave(self):
        """
        Generator driving the reactor one step at a time.

        Each step runs the pending timed calls, queues a select() call for
        the worker thread and yields.  When resumed, it processes one
        message from C{toMainThread}; C{get_nowait} is used, so a message is
        expected to be queued already.  The generator finishes once
        C{self.running} is false.
        """
        while self.running:
            self.runUntilCurrent()
            t2 = self.timeout()
            # runUntilCurrent() may have stopped the reactor; in that case
            # the (false) value of self.running is passed as the timeout.
            t = self.running and t2
            self._sendToThread(self._doIterationInThread, t)
            yield None
            msg, args = self.toMainThread.get_nowait()
            self._dispatchMessage(msg, args)

    def interleave(self, waker, *args, **kw):
        """
        interleave(waker) interleaves this reactor with the
        current application by moving the blocking parts of
        the reactor (select() in this case) to a separate
        thread.  This is typically useful for integration with
        GUI applications which have their own event loop
        already running.

        See the module docstring for more information.

        @param waker: callable taking one argument, a function to be called
            in the main thread.
        @param args: passed on to C{startRunning}.
        @param kw: passed on to C{startRunning}.
        """
        self.startRunning(*args, **kw)
        loop = self._interleave()
        def mainWaker(waker=waker, loop=loop):
            # Hand the generator's "resume" method to the foreign loop.
            waker(loop.next)
        self.mainWaker = mainWaker
        # Run the first step, which queues the first select() call.
        loop.next()
        self.ensureWorkerThread()

    def _mainLoopShutdown(self):
        """
        'after shutdown' trigger: stop the worker thread and empty both
        queues.
        """
        self.mainWaker = None
        if self.workerThread is not None:
            # Ask the worker to exit, and wake the reactor so that a
            # select() call in progress returns.
            self._sendToThread(raiseException, SystemExit)
            self.wakeUp()
            try:
                # Discard whatever is already queued for the main side.
                while 1:
                    self.toMainThread.get_nowait()
            except Empty:
                pass
            self.workerThread.join()
            self.workerThread = None
        try:
            # Run the work items the worker never got to, except select()
            # calls and exit requests.
            while 1:
                fn, args = self.toThreadQueue.get_nowait()
                if fn is self._doIterationInThread:
                    log.msg('Iteration is still in the thread queue!')
                elif fn is raiseException and args[0] is SystemExit:
                    pass
                else:
                    fn(*args)
        except Empty:
            pass

    def _doReadOrWrite(self, selectable, method, dict):
        """
        Call C{method} ("doRead" or "doWrite") on C{selectable} and
        disconnect the selectable if the call returns a true value or
        raises, or if the selectable has no usable file descriptor
        afterwards.

        @param dict: unused here.
        """
        try:
            why = getattr(selectable, method)()
            handfn = getattr(selectable, 'fileno', None)
            if not handfn:
                why = _NO_FILENO
            elif handfn() == -1:
                why = _NO_FILEDESC
        except:
            # Deliberately broad: any exception is logged and becomes the
            # reason for disconnecting.
            why = sys.exc_info()[1]
            log.err()
        if why:
            self._disconnectSelectable(selectable, why, method == "doRead")

    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read.
        """
        self._sendToThread(self.reads.__setitem__, reader, 1)
        self.wakeUp()

    def addWriter(self, writer):
        """Add a FileDescriptor for notification of data available to write.
        """
        self._sendToThread(self.writes.__setitem__, writer, 1)
        self.wakeUp()

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read.
        """
        self._sendToThread(dictRemove, self.reads, reader)

    def removeWriter(self, writer):
        """Remove a Selectable for notification of data available to write.
        """
        self._sendToThread(dictRemove, self.writes, writer)

    def removeAll(self):
        """
        Delegate to the base class C{_removeAll} with the read and write
        dictionaries and return its result.
        """
        return self._removeAll(self.reads, self.writes)


    def getReaders(self):
        """
        Return a list of the selectables currently in C{reads}.
        """
        return list(self.reads)


    def getWriters(self):
        """
        Return a list of the selectables currently in C{writes}.
        """
        return list(self.writes)


    def run(self, installSignalHandlers=1):
        """
        Start the reactor and run L{mainLoop}.
        """
        # NOTE(review): mainLoop() -> interleave() calls startRunning() a
        # second time; confirm the base class tolerates this.
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()

    def mainLoop(self):
        """
        Drive the reactor from the calling thread, using a queue as the
        "foreign event loop": the waker is the queue's C{put}, and this loop
        calls each function taken from the queue.
        """
        q = Queue()
        self.interleave(q.put)
        while self.running:
            try:
                q.get()()
            except StopIteration:
                # The _interleave generator has finished.
                break
351
352
353
def install():
    """
    Create a L{ThreadedSelectReactor} and install it as the Twisted reactor.

    @return: the newly created reactor instance.
    """
    from twisted.internet.main import installReactor
    threadedReactor = ThreadedSelectReactor()
    installReactor(threadedReactor)
    return threadedReactor
361
# Only install() is exported by "from ... import *".
__all__ = ['install']
Note: See TracBrowser for help on using the browser.