root / trunk / twisted / internet / cfreactor.py

Revision 24441, 10.4 kB (checked in by thijs, 1 year ago)

Merge maintainer-email-2438: Get rid of references to maintainer email addresses from code.

Author: thijs
Reviewer: exarkun
Fixes: #2438

Line 
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 This module provides support for Twisted to interact with CoreFoundation
7 CFRunLoops.  This includes Cocoa's NSRunLoop.
8
9 In order to use this support, simply do the following::
10
11     |  from twisted.internet import cfreactor
12     |  cfreactor.install()
13
14 Then use the twisted.internet APIs as usual.  The other methods here are not
15 intended to be called directly under normal use.  However, install can take
16 a runLoop kwarg, and run will take a withRunLoop arg if you need to explicitly
17 pass a CFRunLoop for some reason.  Otherwise it will make a pretty good guess
18 as to which runLoop you want (the current NSRunLoop if PyObjC is imported,
19 otherwise the current CFRunLoop.  Either way, if one doesn't exist, it will
20 be created).
21
22 Maintainer: Bob Ippolito
23 """
24
25 __all__ = ['install']
26
27 import sys
28
29 # hints for py2app
30 import Carbon.CF
31 import traceback
32
33 import cfsupport as cf
34
35 from zope.interface import implements
36
37 from twisted.python import log, threadable, failure
38 from twisted.internet.interfaces import IReactorFDSet
39 from twisted.internet import posixbase, error
40 from weakref import WeakKeyDictionary
41 from Foundation import NSRunLoop
42 from AppKit import NSApp
43
# Pre-built Failure objects for the two disconnect reasons seen most often.
# They are constructed once, outside of any exception handler, and reused by
# SelectableSocketWrapper._finishReadOrWrite instead of building a new Failure
# for every closed connection.
_faildict = {}
_faildict[error.ConnectionDone] = failure.Failure(error.ConnectionDone())
_faildict[error.ConnectionLost] = failure.Failure(error.ConnectionLost())
49
class SelectableSocketWrapper(object):
    """
    Glue between one selectable object and a C{cf.PyCFSocket}.

    On construction the wrapper creates a PyCFSocket for the object's file
    descriptor, adds it to the reactor's run loop, and substitutes
    L{objConnectionLost} for the object's own C{connectionLost} so that the
    wrapper can tear itself down when the connection goes away.  Read and write
    callbacks from the socket are forwarded to the object's C{doRead} and
    C{doWrite} while the corresponding direction is enabled.

    Instances are normally obtained through
    L{socketWrapperForReactorAndObject}, which caches one wrapper per object.
    """

    # wrapped object -> wrapper, weakly keyed on the wrapped object
    _objCache = WeakKeyDictionary()

    # The PyCFSocket for this wrapper.  None on the class (i.e. before
    # __init__ has run) and reset to None by objConnectionLost.
    cf = None

    def socketWrapperForReactorAndObject(klass, reactor, obj):
        """
        Return the cached wrapper for C{obj}, creating one if necessary.

        The cache is keyed on C{obj} alone; C{reactor} is only used when a new
        wrapper has to be constructed.
        """
        cache = klass._objCache
        if obj in cache:
            return cache[obj]
        wrapper = cache[obj] = klass(reactor, obj)
        return wrapper
    socketWrapperForReactorAndObject = classmethod(socketWrapperForReactorAndObject)

    def __init__(self, reactor, obj):
        """
        Wrap C{obj} and register its file descriptor with C{reactor}'s run loop.

        @param reactor: the reactor; must provide C{getRunLoop()}.
        @param obj: the selectable to wrap; must provide C{fileno()} and
            C{connectionLost}.
        @raise ValueError: if this instance already has a socket attached.
        """
        if self.cf:
            raise ValueError("This socket wrapper is already initialized")
        self.reactor = reactor
        self.obj = obj
        # Interpose on connectionLost, remembering the original on the object
        # itself so objConnectionLost can put it back.
        obj._orig_ssw_connectionLost = obj.connectionLost
        obj.connectionLost = self.objConnectionLost
        self.fd = obj.fileno()
        # reading/writing: whether the reactor currently wants events in that
        # direction.  wouldRead/wouldWrite: an event arrived while the
        # direction was disabled and should be replayed once it is enabled.
        self.writing = False
        self.reading = False
        self.wouldRead = False
        self.wouldWrite = False
        self.cf = cf.PyCFSocket(self.fd, self.doRead, self.doWrite, self.doConnect)
        # Write notifications are switched off until startWriting is called.
        self.cf.stopWriting()
        reactor.getRunLoop().addSocket(self.cf)

    def __repr__(self):
        return 'SSW(fd=%r r=%r w=%r x=%08x o=%08x)' % (
            self.fd, int(self.reading), int(self.writing),
            id(self), id(self.obj))

    def objConnectionLost(self, *args, **kwargs):
        """
        Stand-in for the wrapped object's C{connectionLost}.

        Removes the object from the reactor, restores and invokes the original
        C{connectionLost} with the same arguments, then drops this wrapper's
        references to the object and to the socket.  Calling it again after
        teardown is a no-op.
        """
        obj = self.obj
        if obj is None:
            # Already torn down; there is no original connectionLost left to
            # restore or call.
            return
        self.reactor.removeReader(obj)
        self.reactor.removeWriter(obj)
        obj.connectionLost = obj._orig_ssw_connectionLost
        obj.connectionLost(*args, **kwargs)
        try:
            del self._objCache[obj]
        except KeyError:
            # The wrapper was built directly rather than through the cache,
            # or the entry is already gone.
            pass
        self.obj = None
        self.cf = None

    def doConnect(self, why):
        """
        Connect callback handed to the PyCFSocket; intentionally does nothing.
        """
        pass

    def startReading(self):
        """
        Enable read notifications and replay a read that arrived while
        reading was disabled.

        @return: C{self}.
        """
        self.cf.startReading()
        self.reading = True
        if self.wouldRead:
            if not self.reactor.running:
                # Reactor not running yet: defer the replay to a delayed call.
                self.reactor.callLater(0, self.doRead)
            else:
                self.doRead()
            self.wouldRead = False
        return self

    def stopReading(self):
        """
        Disable read notifications and forget any pending replay.

        @return: C{self}.
        """
        self.cf.stopReading()
        self.reading = False
        self.wouldRead = False
        return self

    def startWriting(self):
        """
        Enable write notifications and replay a write event that arrived
        while writing was disabled.

        @return: C{self}.
        """
        self.cf.startWriting()
        self.writing = True
        if self.wouldWrite:
            if not self.reactor.running:
                # Reactor not running yet: defer the replay to a delayed call.
                self.reactor.callLater(0, self.doWrite)
            else:
                self.doWrite()
            self.wouldWrite = False
        return self

    def stopWriting(self):
        """
        Disable write notifications and forget any pending replay.

        @return: C{self}, matching the other start/stop methods.
        """
        self.cf.stopWriting()
        self.writing = False
        self.wouldWrite = False
        return self

    def _finishReadOrWrite(self, fn, faildict=_faildict):
        """
        Run C{fn} (the wrapped object's C{doRead} or C{doWrite}) and handle
        its outcome.

        A truthy return value, or any exception raised by C{fn}, is treated as
        the reason the connection was lost and is passed, wrapped in a Failure,
        to L{objConnectionLost}.  Afterwards the reactor is given a chance to
        run via C{simulate()} if it is running.

        @param fn: zero-argument callable to invoke.
        @param faildict: mapping of exception class to a pre-built Failure,
            consulted before constructing a new one.
        """
        try:
            why = fn()
        except:
            # Deliberately broad: this is the boundary between the run loop
            # callback and application code, so everything is logged and
            # turned into a disconnect reason instead of propagating.
            why = sys.exc_info()[1]
            log.err()
        if why:
            try:
                f = faildict.get(why.__class__) or failure.Failure(why)
                self.objConnectionLost(f)
            except:
                # Errors raised while reporting the loss are logged, not raised.
                log.err()
        if self.reactor.running:
            self.reactor.simulate()

    def doRead(self):
        """
        Read callback handed to the PyCFSocket.

        Does nothing once the wrapper has been torn down.  If reading is
        currently disabled the event is only recorded in C{wouldRead}.
        """
        obj = self.obj
        if not obj:
            return
        if not self.reading:
            self.wouldRead = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doRead)

    def doWrite(self):
        """
        Write callback handed to the PyCFSocket.

        Does nothing once the wrapper has been torn down.  If writing is
        currently disabled the event is only recorded in C{wouldWrite}.
        """
        obj = self.obj
        if not obj:
            return
        if not self.writing:
            self.wouldWrite = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doWrite)

    def __hash__(self):
        # Hash on the file descriptor captured at construction time.
        return hash(self.fd)
169
class CFReactor(posixbase.PosixReactorBase):
    """
    Reactor driven by a CoreFoundation run loop.

    File descriptors are watched through L{SelectableSocketWrapper} objects,
    and delayed calls are serviced from a C{cf.PyCFRunLoopTimer} whose
    callback is L{simulate}.
    """
    implements(IReactorFDSet)

    # how long to poll if we don't care about signals
    longIntervalOfTime = 60.0

    # how long we should poll if we do care about signals
    shortIntervalOfTime = 1.0

    # don't set this
    pollInterval = longIntervalOfTime

    def __init__(self, runLoop=None):
        """
        @param runLoop: optional run loop object to use; when omitted the run
            loop is looked up lazily by L{getRunLoop}.
        """
        # selectable -> SelectableSocketWrapper, per direction
        self.readers = {}
        self.writers = {}
        self.running = 0
        # True between crash() and the return of a run loop started by run()
        self.crashing = False
        # False only while doIteration is driving a single pass
        self._doRunUntilCurrent = True
        self.timer = None
        self.runLoop = None
        self.nsRunLoop = None
        self.didStartRunLoop = False
        if runLoop is not None:
            self.getRunLoop(runLoop)
        posixbase.PosixReactorBase.__init__(self)

    def getRunLoop(self, runLoop=None):
        """
        Return the C{cf.PyCFRunLoop} used by this reactor, resolving it on
        first use.

        @param runLoop: run loop to adopt if none has been resolved yet;
            defaults to C{NSRunLoop.currentRunLoop()}.  Ignored once a run
            loop has been set.
        """
        if self.runLoop is None:
            self.nsRunLoop = runLoop or NSRunLoop.currentRunLoop()
            self.runLoop = cf.PyCFRunLoop(self.nsRunLoop.getCFRunLoop())
        return self.runLoop

    def addReader(self, reader):
        """
        Start watching C{reader} for input.
        """
        wrapped = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, reader)
        self.readers[reader] = wrapped.startReading()

    def addWriter(self, writer):
        """
        Start watching C{writer} for writability.
        """
        wrapped = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, writer)
        self.writers[writer] = wrapped.startWriting()

    def removeReader(self, reader):
        """
        Stop watching C{reader} for input; unknown readers are ignored.
        """
        wrapped = self.readers.get(reader, None)
        if wrapped is not None:
            del self.readers[reader]
            wrapped.stopReading()

    def removeWriter(self, writer):
        """
        Stop watching C{writer} for writability; unknown writers are ignored.
        """
        wrapped = self.writers.get(writer, None)
        if wrapped is not None:
            del self.writers[writer]
            wrapped.stopWriting()

    def getReaders(self):
        """
        Return a list of the selectables currently registered for reading.
        """
        return list(self.readers)

    def getWriters(self):
        """
        Return a list of the selectables currently registered for writing.
        """
        return list(self.writers)

    def removeAll(self):
        """
        Stop watching every registered reader and writer.

        @return: a list of all selectables that were removed.  Each selectable
            appears once even if it was registered in both directions.
        """
        # Collect writers as well as readers so that write-only descriptors
        # are reported to the caller too.
        removed = list(self.readers)
        for writer in self.writers:
            if writer not in self.readers:
                removed.append(writer)
        for wrapped in self.readers.values():
            wrapped.stopReading()
        for wrapped in self.writers.values():
            wrapped.stopWriting()
        self.readers.clear()
        self.writers.clear()
        return removed

    def run(self, installSignalHandlers=1, withRunLoop=None):
        """
        Start the reactor.

        Installs the polling timer on the run loop and, if no NSApp exists and
        the NSRunLoop has no current mode, runs the run loop itself.

        @param installSignalHandlers: passed through to C{startRunning}; when
            true the short poll interval is used.
        @param withRunLoop: run loop to adopt if none has been resolved yet.
        @raise ValueError: if the reactor is already running.
        """
        if self.running:
            raise ValueError("Reactor already running")
        if installSignalHandlers:
            # NOTE(review): this also overwrites the interval doIteration has
            # just set, because doIteration calls run() with the default
            # installSignalHandlers=1 -- confirm whether that is intended.
            self.pollInterval = self.shortIntervalOfTime
        runLoop = self.getRunLoop(withRunLoop)
        self._startup()

        self.startRunning(installSignalHandlers=installSignalHandlers)

        self.running = True
        if NSApp() is None and self.nsRunLoop.currentMode() is None:
            # Most of the time the NSRunLoop will have already started,
            # but in this case it wasn't.
            runLoop.run()
            self.crashing = False
            # NOTE(review): set only after runLoop.run() has returned, so a
            # crash() during the first run does not see it -- confirm.
            self.didStartRunLoop = True

    def callLater(self, howlong, *args, **kwargs):
        """
        Schedule a delayed call and, if the polling timer exists, pull its
        next fire date forward when the new call is due sooner.

        @return: whatever C{PosixReactorBase.callLater} returns.
        """
        rval = posixbase.PosixReactorBase.callLater(self, howlong, *args, **kwargs)
        if self.timer:
            timeout = self.timeout()
            if timeout is None:
                timeout = howlong
            sleepUntil = cf.now() + min(timeout, howlong)
            if sleepUntil < self.timer.getNextFireDate():
                self.timer.setNextFireDate(sleepUntil)
        return rval

    def iterate(self, howlong=0.0):
        """
        Run pending delayed calls and then perform one iteration.

        @raise ValueError: if the reactor is running.
        """
        if self.running:
            raise ValueError("Can't iterate a running reactor")
        self.runUntilCurrent()
        self.doIteration(howlong)

    def doIteration(self, howlong):
        """
        Perform a single pass by temporarily running the reactor.

        @param howlong: poll interval to use for the pass; a falsy value is
            replaced by 0.01.
        @raise ValueError: if the reactor is running.
        """
        if self.running:
            raise ValueError("Can't iterate a running reactor")
        howlong = howlong or 0.01
        savedInterval = self.pollInterval
        self.pollInterval = howlong
        self._doRunUntilCurrent = False
        try:
            self.run()
        finally:
            # Restore the normal configuration even if run() raised, so a
            # failed iteration does not leave the reactor in one-shot mode.
            self._doRunUntilCurrent = True
            self.pollInterval = savedInterval

    def simulate(self):
        """
        Timer callback: run due delayed calls and reschedule the timer.

        In one-shot mode (C{_doRunUntilCurrent} false) delayed calls are not
        run here and the reactor is crashed after rescheduling.

        @raise ValueError: if the reactor is neither running nor crashing.
        """
        if self.crashing:
            return
        if not self.running:
            raise ValueError("You can't simulate a stopped reactor")
        if self._doRunUntilCurrent:
            self.runUntilCurrent()
        # runUntilCurrent may have crashed the reactor or removed the timer.
        if self.crashing:
            return
        if self.timer is None:
            return
        nap = self.timeout()
        if nap is None:
            nap = self.pollInterval
        else:
            nap = min(self.pollInterval, nap)
        if self.running:
            self.timer.setNextFireDate(cf.now() + nap)
        if not self._doRunUntilCurrent:
            self.crash()

    def _startup(self):
        """
        Create the polling timer and add it to the run loop.

        @raise ValueError: if the reactor is already running.
        """
        if self.running:
            raise ValueError("Can't bootstrap a running reactor")
        self.timer = cf.PyCFRunLoopTimer(cf.now(), self.pollInterval, self.simulate)
        self.runLoop.addTimer(self.timer)

    def cleanup(self):
        """
        Shutdown hook registered by L{install}; currently does nothing.
        """
        pass

    def sigInt(self, *args):
        """
        Request a stop via a zero-delay delayed call.
        """
        self.callLater(0.0, self.stop)

    def crash(self):
        """
        Stop the reactor immediately: remove the polling timer and, if this
        reactor started the run loop, stop the run loop.

        @raise ValueError: if the reactor is not running.
        """
        if not self.running:
            raise ValueError("Can't crash a stopped reactor")
        posixbase.PosixReactorBase.crash(self)
        self.crashing = True
        if self.timer is not None:
            self.runLoop.removeTimer(self.timer)
            self.timer = None
        if self.didStartRunLoop:
            self.runLoop.stop()

    def stop(self):
        """
        Begin an orderly shutdown.

        @raise ValueError: if the reactor is not running.
        """
        if not self.running:
            raise ValueError("Can't stop a stopped reactor")
        posixbase.PosixReactorBase.stop(self)
334
def install(runLoop=None):
    """
    Create a L{CFReactor} and install it as the Twisted reactor.

    The new reactor's C{cleanup} method is registered to run after shutdown.

    @param runLoop: optional run loop handed to the L{CFReactor} constructor.
    @return: the newly installed reactor.
    """
    cfReactor = CFReactor(runLoop=runLoop)
    cfReactor.addSystemEventTrigger('after', 'shutdown', cfReactor.cleanup)
    # Imported here rather than at module level, as in the original.
    from twisted.internet.main import installReactor
    installReactor(cfReactor)
    return cfReactor
Note: See TracBrowser for help on using the browser.