root / trunk / twisted / internet / base.py

Revision 26967, 40.8 kB (checked in by exarkun, 1 month ago)

Merge unsigned-id-delayed-call-3791

Author: drake, exarkun
Reviewer: exarkun, therve
Fixes: #3791

Add twisted.python.util.setIDFunction to allow the behavior of unsignedID
to be controlled. Then use this to test that unsignedID is being used in
DelayedCall and AMP to construct the repr string.

Line 
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Very basic functionality for a Reactor implementation.
7 """
8
9 import socket # needed only for sync-dns
10 from zope.interface import implements, classImplements
11
12 import sys
13 import warnings
14 import operator
15 from heapq import heappush, heappop, heapify
16
17 import traceback
18
19 from twisted.python.compat import set
20 from twisted.python.util import unsignedID
21 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
22 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
23 from twisted.internet.interfaces import IConnector, IDelayedCall
24 from twisted.internet import fdesc, main, error, abstract, defer, threads
25 from twisted.python import log, failure, reflect
26 from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType
27 from twisted.internet.defer import Deferred, DeferredList
28 from twisted.persisted import styles
29
30 # This import is for side-effects!  Even if you don't see any code using it
31 # in this module, don't delete it.
32 from twisted.python import threadable
33
34
35 class DelayedCall(styles.Ephemeral):
36
37     implements(IDelayedCall)
38     # enable .debug to record creator call stack, and it will be logged if
39     # an exception occurs while the function is being run
40     debug = False
41     _str = None
42
43     def __init__(self, time, func, args, kw, cancel, reset,
44                  seconds=runtimeSeconds):
45         """
46         @param time: Seconds from the epoch at which to call C{func}.
47         @param func: The callable to call.
48         @param args: The positional arguments to pass to the callable.
49         @param kw: The keyword arguments to pass to the callable.
50         @param cancel: A callable which will be called with this
51             DelayedCall before cancellation.
52         @param reset: A callable which will be called with this
53             DelayedCall after changing this DelayedCall's scheduled
54             execution time. The callable should adjust any necessary
55             scheduling details to ensure this DelayedCall is invoked
56             at the new appropriate time.
57         @param seconds: If provided, a no-argument callable which will be
58             used to determine the current time any time that information is
59             needed.
60         """
61         self.time, self.func, self.args, self.kw = time, func, args, kw
62         self.resetter = reset
63         self.canceller = cancel
64         self.seconds = seconds
65         self.cancelled = self.called = 0
66         self.delayed_time = 0
67         if self.debug:
68             self.creator = traceback.format_stack()[:-2]
69
70     def getTime(self):
71         """Return the time at which this call will fire
72
73         @rtype: C{float}
74         @return: The number of seconds after the epoch at which this call is
75         scheduled to be made.
76         """
77         return self.time + self.delayed_time
78
79     def cancel(self):
80         """Unschedule this call
81
82         @raise AlreadyCancelled: Raised if this call has already been
83         unscheduled.
84
85         @raise AlreadyCalled: Raised if this call has already been made.
86         """
87         if self.cancelled:
88             raise error.AlreadyCancelled
89         elif self.called:
90             raise error.AlreadyCalled
91         else:
92             self.canceller(self)
93             self.cancelled = 1
94             if self.debug:
95                 self._str = str(self)
96             del self.func, self.args, self.kw
97
98     def reset(self, secondsFromNow):
99         """Reschedule this call for a different time
100
101         @type secondsFromNow: C{float}
102         @param secondsFromNow: The number of seconds from the time of the
103         C{reset} call at which this call will be scheduled.
104
105         @raise AlreadyCancelled: Raised if this call has been cancelled.
106         @raise AlreadyCalled: Raised if this call has already been made.
107         """
108         if self.cancelled:
109             raise error.AlreadyCancelled
110         elif self.called:
111             raise error.AlreadyCalled
112         else:
113             newTime = self.seconds() + secondsFromNow
114             if newTime < self.time:
115                 self.delayed_time = 0
116                 self.time = newTime
117                 self.resetter(self)
118             else:
119                 self.delayed_time = newTime - self.time
120
121     def delay(self, secondsLater):
122         """Reschedule this call for a later time
123
124         @type secondsLater: C{float}
125         @param secondsLater: The number of seconds after the originally
126         scheduled time for which to reschedule this call.
127
128         @raise AlreadyCancelled: Raised if this call has been cancelled.
129         @raise AlreadyCalled: Raised if this call has already been made.
130         """
131         if self.cancelled:
132             raise error.AlreadyCancelled
133         elif self.called:
134             raise error.AlreadyCalled
135         else:
136             self.delayed_time += secondsLater
137             if self.delayed_time < 0:
138                 self.activate_delay()
139                 self.resetter(self)
140
141     def activate_delay(self):
142         self.time += self.delayed_time
143         self.delayed_time = 0
144
145     def active(self):
146         """Determine whether this call is still pending
147
148         @rtype: C{bool}
149         @return: True if this call has not yet been made or cancelled,
150         False otherwise.
151         """
152         return not (self.cancelled or self.called)
153
154     def __le__(self, other):
155         return self.time <= other.time
156
157
158     def __str__(self):
159         if self._str is not None:
160             return self._str
161         if hasattr(self, 'func'):
162             if hasattr(self.func, 'func_name'):
163                 func = self.func.func_name
164                 if hasattr(self.func, 'im_class'):
165                     func = self.func.im_class.__name__ + '.' + func
166             else:
167                 func = reflect.safe_repr(self.func)
168         else:
169             func = None
170
171         now = self.seconds()
172         L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
173                 unsignedID(self), self.time - now, self.called,
174                 self.cancelled)]
175         if func is not None:
176             L.extend((" ", func, "("))
177             if self.args:
178                 L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
179                 if self.kw:
180                     L.append(", ")
181             if self.kw:
182                 L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
183             L.append(")")
184
185         if self.debug:
186             L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
187         L.append('>')
188
189         return "".join(L)
190
191
192
193 class ThreadedResolver(object):
194     """
195     L{ThreadedResolver} uses a reactor, a threadpool, and
196     L{socket.gethostbyname} to perform name lookups without blocking the
197     reactor thread.  It also supports timeouts indepedently from whatever
198     timeout logic L{socket.gethostbyname} might have.
199
200     @ivar reactor: The reactor the threadpool of which will be used to call
201         L{socket.gethostbyname} and the I/O thread of which the result will be
202         delivered.
203     """
204     implements(IResolverSimple)
205
206     def __init__(self, reactor):
207         self.reactor = reactor
208         self._runningQueries = {}
209
210
211     def _fail(self, name, err):
212         err = error.DNSLookupError("address %r not found: %s" % (name, err))
213         return failure.Failure(err)
214
215
216     def _cleanup(self, name, lookupDeferred):
217         userDeferred, cancelCall = self._runningQueries[lookupDeferred]
218         del self._runningQueries[lookupDeferred]
219         userDeferred.errback(self._fail(name, "timeout error"))
220
221
222     def _checkTimeout(self, result, name, lookupDeferred):
223         try:
224             userDeferred, cancelCall = self._runningQueries[lookupDeferred]
225         except KeyError:
226             pass
227         else:
228             del self._runningQueries[lookupDeferred]
229             cancelCall.cancel()
230
231             if isinstance(result, failure.Failure):
232                 userDeferred.errback(self._fail(name, result.getErrorMessage()))
233             else:
234                 userDeferred.callback(result)
235
236
237     def getHostByName(self, name, timeout = (1, 3, 11, 45)):
238         """
239         See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
240
241         Note that the elements of C{timeout} are summed and the result is used
242         as a timeout for the lookup.  Any intermediate timeout or retry logic
243         is left up to the platform via L{socket.gethostbyname}.
244         """
245         if timeout:
246             timeoutDelay = reduce(operator.add, timeout)
247         else:
248             timeoutDelay = 60
249         userDeferred = defer.Deferred()
250         lookupDeferred = threads.deferToThreadPool(
251             self.reactor, self.reactor.getThreadPool(),
252             socket.gethostbyname, name)
253         cancelCall = self.reactor.callLater(
254             timeoutDelay, self._cleanup, name, lookupDeferred)
255         self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
256         lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
257         return userDeferred
258
259
260
261 class BlockingResolver:
262     implements(IResolverSimple)
263
264     def getHostByName(self, name, timeout = (1, 3, 11, 45)):
265         try:
266             address = socket.gethostbyname(name)
267         except socket.error:
268             msg = "address %r not found" % (name,)
269             err = error.DNSLookupError(msg)
270             return defer.fail(err)
271         else:
272             return defer.succeed(address)
273
274
275 class _ThreePhaseEvent(object):
276     """
277     Collection of callables (with arguments) which can be invoked as a group in
278     a particular order.
279
280     This provides the underlying implementation for the reactor's system event
281     triggers.  An instance of this class tracks triggers for all phases of a
282     single type of event.
283
284     @ivar before: A list of the before-phase triggers containing three-tuples
285         of a callable, a tuple of positional arguments, and a dict of keyword
286         arguments
287
288     @ivar finishedBefore: A list of the before-phase triggers which have
289         already been executed.  This is only populated in the C{'BEFORE'} state.
290
291     @ivar during: A list of the during-phase triggers containing three-tuples
292         of a callable, a tuple of positional arguments, and a dict of keyword
293         arguments
294
295     @ivar after: A list of the after-phase triggers containing three-tuples
296         of a callable, a tuple of positional arguments, and a dict of keyword
297         arguments
298
299     @ivar state: A string indicating what is currently going on with this
300         object.  One of C{'BASE'} (for when nothing in particular is happening;
301         this is the initial value), C{'BEFORE'} (when the before-phase triggers
302         are in the process of being executed).
303     """
304     def __init__(self):
305         self.before = []
306         self.during = []
307         self.after = []
308         self.state = 'BASE'
309
310
311     def addTrigger(self, phase, callable, *args, **kwargs):
312         """
313         Add a trigger to the indicate phase.
314
315         @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
316
317         @param callable: An object to be called when this event is triggered.
318         @param *args: Positional arguments to pass to C{callable}.
319         @param **kwargs: Keyword arguments to pass to C{callable}.
320
321         @return: An opaque handle which may be passed to L{removeTrigger} to
322             reverse the effects of calling this method.
323         """
324         if phase not in ('before', 'during', 'after'):
325             raise KeyError("invalid phase")
326         getattr(self, phase).append((callable, args, kwargs))
327         return phase, callable, args, kwargs
328
329
330     def removeTrigger(self, handle):
331         """
332         Remove a previously added trigger callable.
333
334         @param handle: An object previously returned by L{addTrigger}.  The
335             trigger added by that call will be removed.
336
337         @raise ValueError: If the trigger associated with C{handle} has already
338             been removed or if C{handle} is not a valid handle.
339         """
340         return getattr(self, 'removeTrigger_' + self.state)(handle)
341
342
343     def removeTrigger_BASE(self, handle):
344         """
345         Just try to remove the trigger.
346
347         @see: removeTrigger
348         """
349         try:
350             phase, callable, args, kwargs = handle
351         except (TypeError, ValueError), e:
352             raise ValueError("invalid trigger handle")
353         else:
354             if phase not in ('before', 'during', 'after'):
355                 raise KeyError("invalid phase")
356             getattr(self, phase).remove((callable, args, kwargs))
357
358
359     def removeTrigger_BEFORE(self, handle):
360         """
361         Remove the trigger if it has yet to be executed, otherwise emit a
362         warning that in the future an exception will be raised when removing an
363         already-executed trigger.
364
365         @see: removeTrigger
366         """
367         phase, callable, args, kwargs = handle
368         if phase != 'before':
369             return self.removeTrigger_BASE(handle)
370         if (callable, args, kwargs) in self.finishedBefore:
371             warnings.warn(
372                 "Removing already-fired system event triggers will raise an "
373                 "exception in a future version of Twisted.",
374                 category=DeprecationWarning,
375                 stacklevel=3)
376         else:
377             self.removeTrigger_BASE(handle)
378
379
380     def fireEvent(self):
381         """
382         Call the triggers added to this event.
383         """
384         self.state = 'BEFORE'
385         self.finishedBefore = []
386         beforeResults = []
387         while self.before:
388             callable, args, kwargs = self.before.pop(0)
389             self.finishedBefore.append((callable, args, kwargs))
390             try:
391                 result = callable(*args, **kwargs)
392             except:
393                 log.err()
394             else:
395                 if isinstance(result, Deferred):
396                     beforeResults.append(result)
397         DeferredList(beforeResults).addCallback(self._continueFiring)
398
399
400     def _continueFiring(self, ignored):
401         """
402         Call the during and after phase triggers for this event.
403         """
404         self.state = 'BASE'
405         self.finishedBefore = []
406         for phase in self.during, self.after:
407             while phase:
408                 callable, args, kwargs = phase.pop(0)
409                 try:
410                     callable(*args, **kwargs)
411                 except:
412                     log.err()
413
414
415
416 class ReactorBase(object):
417     """
418     Default base class for Reactors.
419
420     @type _stopped: C{bool}
421     @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
422         and C{reactor.stop}.  This should be replaced with an explicit state
423         machine.
424
425     @type _justStopped: C{bool}
426     @ivar _justStopped: A flag which is true between the time C{reactor.stop}
427         is called and the time the shutdown system event is fired.  This is
428         used to determine whether that event should be fired after each
429         iteration through the mainloop.  This should be replaced with an
430         explicit state machine.
431
432     @type _started: C{bool}
433     @ivar _started: A flag which is true from the time C{reactor.run} is called
434         until the time C{reactor.run} returns.  This is used to prevent calls
435         to C{reactor.run} on a running reactor.  This should be replaced with
436         an explicit state machine.
437
438     @ivar running: See L{IReactorCore.running}
439     """
440     implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
441
442     _stopped = True
443     installed = False
444     usingThreads = False
445     resolver = BlockingResolver()
446
447     __name__ = "twisted.internet.reactor"
448
449     def __init__(self):
450         self.threadCallQueue = []
451         self._eventTriggers = {}
452         self._pendingTimedCalls = []
453         self._newTimedCalls = []
454         self._cancellations = 0
455         self.running = False
456         self._started = False
457         self._justStopped = False
458         # reactor internal readers, e.g. the waker.
459         self._internalReaders = set()
460         self.waker = None
461
462         # Arrange for the running attribute to change to True at the right time
463         # and let a subclass possibly do other things at that time (eg install
464         # signal handlers).
465         self.addSystemEventTrigger(
466             'during', 'startup', self._reallyStartRunning)
467         self.addSystemEventTrigger('during', 'shutdown', self.crash)
468         self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
469
470         if platform.supportsThreads():
471             self._initThreads()
472
473     # override in subclasses
474
475     _lock = None
476
477     def installWaker(self):
478         raise NotImplementedError(
479             reflect.qual(self.__class__) + " did not implement installWaker")
480
481     def installResolver(self, resolver):
482         assert IResolverSimple.providedBy(resolver)
483         oldResolver = self.resolver
484         self.resolver = resolver
485         return oldResolver
486
487     def wakeUp(self):
488         """
489         Wake up the event loop.
490         """
491         if self.waker:
492             self.waker.wakeUp()
493         # if the waker isn't installed, the reactor isn't running, and
494         # therefore doesn't need to be woken up
495
496     def doIteration(self, delay):
497         """
498         Do one iteration over the readers and writers which have been added.
499         """
500         raise NotImplementedError(
501             reflect.qual(self.__class__) + " did not implement doIteration")
502
503     def addReader(self, reader):
504         raise NotImplementedError(
505             reflect.qual(self.__class__) + " did not implement addReader")
506
507     def addWriter(self, writer):
508         raise NotImplementedError(
509             reflect.qual(self.__class__) + " did not implement addWriter")
510
511     def removeReader(self, reader):
512         raise NotImplementedError(
513             reflect.qual(self.__class__) + " did not implement removeReader")
514
515     def removeWriter(self, writer):
516         raise NotImplementedError(
517             reflect.qual(self.__class__) + " did not implement removeWriter")
518
519     def removeAll(self):
520         raise NotImplementedError(
521             reflect.qual(self.__class__) + " did not implement removeAll")
522
523
524     def getReaders(self):
525         raise NotImplementedError(
526             reflect.qual(self.__class__) + " did not implement getReaders")
527
528
529     def getWriters(self):
530         raise NotImplementedError(
531             reflect.qual(self.__class__) + " did not implement getWriters")
532
533
534     def resolve(self, name, timeout = (1, 3, 11, 45)):
535         """Return a Deferred that will resolve a hostname.
536         """
537         if not name:
538             # XXX - This is *less than* '::', and will screw up IPv6 servers
539             return defer.succeed('0.0.0.0')
540         if abstract.isIPAddress(name):
541             return defer.succeed(name)
542         return self.resolver.getHostByName(name, timeout)
543
544     # Installation.
545
546     # IReactorCore
547     def stop(self):
548         """
549         See twisted.internet.interfaces.IReactorCore.stop.
550         """
551         if self._stopped:
552             raise error.ReactorNotRunning(
553                 "Can't stop reactor that isn't running.")
554         self._stopped = True
555         self._justStopped = True
556
557
558     def crash(self):
559         """
560         See twisted.internet.interfaces.IReactorCore.crash.
561
562         Reset reactor state tracking attributes and re-initialize certain
563         state-transition helpers which were set up in C{__init__} but later
564         destroyed (through use).
565         """
566         self._started = False
567         self.running = False
568         self.addSystemEventTrigger(
569             'during', 'startup', self._reallyStartRunning)
570
571     def sigInt(self, *args):
572         """Handle a SIGINT interrupt.
573         """
574         log.msg("Received SIGINT, shutting down.")
575         self.callFromThread(self.stop)
576
577     def sigBreak(self, *args):
578         """Handle a SIGBREAK interrupt.
579         """
580         log.msg("Received SIGBREAK, shutting down.")
581         self.callFromThread(self.stop)
582
583     def sigTerm(self, *args):
584         """Handle a SIGTERM interrupt.
585         """
586         log.msg("Received SIGTERM, shutting down.")
587         self.callFromThread(self.stop)
588
589     def disconnectAll(self):
590         """Disconnect every reader, and writer in the system.
591         """
592         selectables = self.removeAll()
593         for reader in selectables:
594             log.callWithLogger(reader,
595                                reader.connectionLost,
596                                failure.Failure(main.CONNECTION_LOST))
597
598
599     def iterate(self, delay=0):
600         """See twisted.internet.interfaces.IReactorCore.iterate.
601         """
602         self.runUntilCurrent()
603         self.doIteration(delay)
604
605
606     def fireSystemEvent(self, eventType):
607         """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
608         """
609         event = self._eventTriggers.get(eventType)
610         if event is not None:
611             event.fireEvent()
612
613
614     def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
615         """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
616         """
617         assert callable(_f), "%s is not callable" % _f
618         if _eventType not in self._eventTriggers:
619             self._eventTriggers[_eventType] = _ThreePhaseEvent()
620         return (_eventType, self._eventTriggers[_eventType].addTrigger(
621             _phase, _f, *args, **kw))
622
623
624     def removeSystemEventTrigger(self, triggerID):
625         """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
626         """
627         eventType, handle = triggerID
628         self._eventTriggers[eventType].removeTrigger(handle)
629
630
631     def callWhenRunning(self, _callable, *args, **kw):
632         """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
633         """
634         if self.running:
635             _callable(*args, **kw)
636         else:
637             return self.addSystemEventTrigger('after', 'startup',
638                                               _callable, *args, **kw)
639
640     def startRunning(self):
641         """
642         Method called when reactor starts: do some initialization and fire
643         startup events.
644
645         Don't call this directly, call reactor.run() instead: it should take
646         care of calling this.
647
648         This method is somewhat misnamed.  The reactor will not necessarily be
649         in the running state by the time this method returns.  The only
650         guarantee is that it will be on its way to the running state.
651         """
652         if self._started:
653             raise error.ReactorAlreadyRunning()
654         self._started = True
655         self._stopped = False
656         threadable.registerAsIOThread()
657         self.fireSystemEvent('startup')
658
659
660     def _reallyStartRunning(self):
661         """
662         Method called to transition to the running state.  This should happen
663         in the I{during startup} event trigger phase.
664         """
665         self.running = True
666
667     # IReactorTime
668
669     seconds = staticmethod(runtimeSeconds)
670
671     def callLater(self, _seconds, _f, *args, **kw):
672         """See twisted.internet.interfaces.IReactorTime.callLater.
673         """
674         assert callable(_f), "%s is not callable" % _f
675         assert sys.maxint >= _seconds >= 0, \
676                "%s is not greater than or equal to 0 seconds" % (_seconds,)
677         tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
678                            self._cancelCallLater,
679                            self._moveCallLaterSooner,
680                            seconds=self.seconds)
681         self._newTimedCalls.append(tple)
682         return tple
683
684     def _moveCallLaterSooner(self, tple):
685         # Linear time find: slow.
686         heap = self._pendingTimedCalls
687         try:
688             pos = heap.index(tple)
689
690             # Move elt up the heap until it rests at the right place.
691             elt = heap[pos]
692             while pos != 0:
693                 parent = (pos-1) // 2
694                 if heap[parent] <= elt:
695                     break
696                 # move parent down
697                 heap[pos] = heap[parent]
698                 pos = parent
699             heap[pos] = elt
700         except ValueError:
701             # element was not found in heap - oh well...
702             pass
703
704     def _cancelCallLater(self, tple):
705         self._cancellations+=1
706
707     def cancelCallLater(self, callID):
708         """See twisted.internet.interfaces.IReactorTime.cancelCallLater.
709         """
710         # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
711         # we can't get rid of it for a long time.
712         warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
713         callID.cancel()
714
715     def getDelayedCalls(self):
716         """Return all the outstanding delayed calls in the system.
717         They are returned in no particular order.
718         This method is not efficient -- it is really only meant for
719         test cases."""
720         return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
721
722     def _insertNewDelayedCalls(self):
723         for call in self._newTimedCalls:
724             if call.cancelled:
725                 self._cancellations-=1
726             else:
727                 call.activate_delay()
728                 heappush(self._pendingTimedCalls, call)
729         self._newTimedCalls = []
730
731     def timeout(self):
732         # insert new delayed calls to make sure to include them in timeout value
733         self._insertNewDelayedCalls()
734
735         if not self._pendingTimedCalls:
736             return None
737
738         return max(0, self._pendingTimedCalls[0].time - self.seconds())
739
740
741     def runUntilCurrent(self):
742         """Run all pending timed calls.
743         """
744         if self.threadCallQueue:
745             # Keep track of how many calls we actually make, as we're
746             # making them, in case another call is added to the queue
747             # while we're in this loop.
748             count = 0
749             total = len(self.threadCallQueue)
750             for (f, a, kw) in self.threadCallQueue:
751                 try:
752                     f(*a, **kw)
753                 except:
754                     log.err()
755                 count += 1
756                 if count == total:
757                     break
758             del self.threadCallQueue[:count]
759             if self.threadCallQueue:
760                 self.wakeUp()
761
762         # insert new delayed calls now
763         self._insertNewDelayedCalls()
764
765         now = self.seconds()
766         while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
767             call = heappop(self._pendingTimedCalls)
768             if call.cancelled:
769                 self._cancellations-=1
770                 continue
771
772             if call.delayed_time > 0:
773                 call.activate_delay()
774                 heappush(self._pendingTimedCalls, call)
775                 continue
776
777             try:
778                 call.called = 1
779                 call.func(*call.args, **call.kw)
780             except:
781                 log.deferr()
782                 if hasattr(call, "creator"):
783                     e = "\n"
784                     e += " C: previous exception occurred in " + \
785                          "a DelayedCall created here:\n"
786                     e += " C:"
787                     e += "".join(call.creator).rstrip().replace("\n","\n C:")
788                     e += "\n"
789                     log.msg(e)
790
791
792         if (self._cancellations > 50 and
793              self._cancellations > len(self._pendingTimedCalls) >> 1):
794             self._cancellations = 0
795             self._pendingTimedCalls = [x for x in self._pendingTimedCalls
796                                        if not x.cancelled]
797             heapify(self._pendingTimedCalls)
798
799         if self._justStopped:
800             self._justStopped = False
801             self.fireSystemEvent("shutdown")
802
803     # IReactorProcess
804
805     def _checkProcessArgs(self, args, env):
806         """
807         Check for valid arguments and environment to spawnProcess.
808
809         @return: A two element tuple giving values to use when creating the
810         process.  The first element of the tuple is a C{list} of C{str}
811         giving the values for argv of the child process.  The second element
812         of the tuple is either C{None} if C{env} was C{None} or a C{dict}
813         mapping C{str} environment keys to C{str} environment values.
814         """
815         # Any unicode string which Python would successfully implicitly
816         # encode to a byte string would have worked before these explicit
817         # checks were added.  Anything which would have failed with a
818         # UnicodeEncodeError during that implicit encoding step would have
819         # raised an exception in the child process and that would have been
820         # a pain in the butt to debug.
821         #
822         # So, we will explicitly attempt the same encoding which Python
823         # would implicitly do later.  If it fails, we will report an error
824         # without ever spawning a child process.  If it succeeds, we'll save
825         # the result so that Python doesn't need to do it implicitly later.
826         #
827         # For any unicode which we can actually encode, we'll also issue a
828         # deprecation warning, because no one should be passing unicode here
829         # anyway.
830         #
831         # -exarkun
832         defaultEncoding = sys.getdefaultencoding()
833
834         # Common check function
835         def argChecker(arg):
836             """
837             Return either a str or None.  If the given value is not
838             allowable for some reason, None is returned.  Otherwise, a
839             possibly different object which should be used in place of arg
840             is returned.  This forces unicode encoding to happen now, rather
841             than implicitly later.
842             """
843             if isinstance(arg, unicode):
844                 try:
845                     arg = arg.encode(defaultEncoding)
846                 except UnicodeEncodeError:
847                     return None
848                 warnings.warn(
849                     "Argument strings and environment keys/values passed to "
850                     "reactor.spawnProcess should be str, not unicode.",
851                     category=DeprecationWarning,
852                     stacklevel=4)
853             if isinstance(arg, str) and '\0' not in arg:
854                 return arg
855             return None
856
857         # Make a few tests to check input validity
858         if not isinstance(args, (tuple, list)):
859             raise TypeError("Arguments must be a tuple or list")
860
861         outputArgs = []
862         for arg in args:
863             arg = argChecker(arg)
864             if arg is None:
865                 raise TypeError("Arguments contain a non-string value")
866             else:
867                 outputArgs.append(arg)
868
869         outputEnv = None
870         if env is not None:
871             outputEnv = {}
872             for key, val in env.iteritems():
873                 key = argChecker(key)
874                 if key is None:
875                     raise TypeError("Environment contains a non-string key")
876                 val = argChecker(val)
877                 if val is None:
878                     raise TypeError("Environment contains a non-string value")
879                 outputEnv[key] = val
880         return outputArgs, outputEnv
881
882     # IReactorThreads
883     if platform.supportsThreads():
884         threadpool = None
885         # ID of the trigger starting the threadpool
886         _threadpoolStartupID = None
887         # ID of the trigger stopping the threadpool
888         threadpoolShutdownID = None
889
890         def _initThreads(self):
891             self.usingThreads = True
892             self.resolver = ThreadedResolver(self)
893             self.installWaker()
894
895         def callFromThread(self, f, *args, **kw):
896             """
897             See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
898             """
899             assert callable(f), "%s is not callable" % (f,)
900             # lists are thread-safe in CPython, but not in Jython
901             # this is probably a bug in Jython, but until fixed this code
902             # won't work in Jython.
903             self.threadCallQueue.append((f, args, kw))
904             self.wakeUp()
905
906         def _initThreadPool(self):
907             """
908             Create the threadpool accessible with callFromThread.
909             """
910             from twisted.python import threadpool
911             self.threadpool = threadpool.ThreadPool(
912                 0, 10, 'twisted.internet.reactor')
913             self._threadpoolStartupID = self.callWhenRunning(
914                 self.threadpool.start)
915             self.threadpoolShutdownID = self.addSystemEventTrigger(
916                 'during', 'shutdown', self._stopThreadPool)
917
918         def _stopThreadPool(self):
919             """
920             Stop the reactor threadpool.  This method is only valid if there
921             is currently a threadpool (created by L{_initThreadPool}).  It
922             is not intended to be called directly; instead, it will be
923             called by a shutdown trigger created in L{_initThreadPool}.
924             """
925             triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
926             for trigger in filter(None, triggers):
927                 try:
928                     self.removeSystemEventTrigger(trigger)
929                 except ValueError:
930                     pass
931             self._threadpoolStartupID = None
932             self.threadpoolShutdownID = None
933             self.threadpool.stop()
934             self.threadpool = None
935
936
937         def getThreadPool(self):
938             """
939             See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
940             """
941             if self.threadpool is None:
942                 self._initThreadPool()
943             return self.threadpool
944
945
946         def callInThread(self, _callable, *args, **kwargs):
947             """
948             See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
949             """
950             self.getThreadPool().callInThread(_callable, *args, **kwargs)
951
952         def suggestThreadPoolSize(self, size):
953             """
954             See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
955             """
956             self.getThreadPool().adjustPoolsize(maxthreads=size)
957     else:
958         # This is for signal handlers.
959         def callFromThread(self, f, *args, **kw):
960             assert callable(f), "%s is not callable" % (f,)
961             # See comment in the other callFromThread implementation.
962             self.threadCallQueue.append((f, args, kw))
963
964 if platform.supportsThreads():
965     classImplements(ReactorBase, IReactorThreads)
966
967
968 class BaseConnector(styles.Ephemeral):
969     """Basic implementation of connector.
970
971     State can be: "connecting", "connected", "disconnected"
972     """
973
974     implements(IConnector)
975
976     timeoutID = None
977     factoryStarted = 0
978
979     def __init__(self, factory, timeout, reactor):
980         self.state = "disconnected"
981         self.reactor = reactor
982         self.factory = factory
983         self.timeout = timeout
984
985     def disconnect(self):
986         """Disconnect whatever our state is."""
987         if self.state == 'connecting':
988             self.stopConnecting()
989         elif self.state == 'connected':
990             self.transport.loseConnection()
991
992     def connect(self):
993         """Start connection to remote server."""
994         if self.state != "disconnected":
995             raise RuntimeError, "can't connect in this state"
996
997         self.state = "connecting"
998         if not self.factoryStarted:
999             self.factory.doStart()
1000             self.factoryStarted = 1
1001         self.transport = transport = self._makeTransport()
1002         if self.timeout is not None:
1003             self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
1004         self.factory.startedConnecting(self)
1005
1006     def stopConnecting(self):
1007         """Stop attempting to connect."""
1008         if self.state != "connecting":
1009             raise error.NotConnectingError, "we're not trying to connect"
1010
1011         self.state = "disconnected"
1012         self.transport.failIfNotConnected(error.UserError())
1013         del self.transport
1014
1015     def cancelTimeout(self):
1016         if self.timeoutID is not None:
1017             try:
1018                 self.timeoutID.cancel()
1019             except ValueError:
1020                 pass
1021             del self.timeoutID
1022
1023     def buildProtocol(self, addr):
1024         self.state = "connected"
1025         self.cancelTimeout()
1026         return self.factory.buildProtocol(addr)
1027
1028     def connectionFailed(self, reason):
1029         self.cancelTimeout()
1030         self.transport = None
1031         self.state = "disconnected"
1032         self.factory.clientConnectionFailed(self, reason)
1033         if self.state == "disconnected":
1034             # factory hasn't called our connect() method
1035             self.factory.doStop()
1036             self.factoryStarted = 0
1037
1038     def connectionLost(self, reason):
1039         self.state = "disconnected"
1040         self.factory.clientConnectionLost(self, reason)
1041         if self.state == "disconnected":
1042             # factory hasn't called our connect() method
1043             self.factory.doStop()
1044             self.factoryStarted = 0
1045
1046     def getDestination(self):
1047         raise NotImplementedError(
1048             reflect.qual(self.__class__) + " did not implement "
1049             "getDestination")
1050
1051
1052
1053 class BasePort(abstract.FileDescriptor):
1054     """Basic implementation of a ListeningPort.
1055
1056     Note: This does not actually implement IListeningPort.
1057     """
1058
1059     addressFamily = None
1060     socketType = None
1061
1062     def createInternetSocket(self):
1063         s = socket.socket(self.addressFamily, self.socketType)
1064         s.setblocking(0)
1065         fdesc._setCloseOnExec(s.fileno())
1066         return s
1067
1068
1069     def doWrite(self):
1070         """Raises a RuntimeError"""
1071         raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
1072
1073
1074
1075 class _SignalReactorMixin:
1076     """
1077     Private mixin to manage signals: it installs signal handlers at start time,
1078     and define run method.
1079
1080     It can only be used mixed in with L{ReactorBase}, and has to be defined
1081     first in the inheritance (so that method resolution order finds
1082     startRunning first).
1083
1084     @type _installSignalHandlers: C{bool}
1085     @ivar _installSignalHandlers: A flag which indicates whether any signal
1086         handlers will be installed during startup.  This includes handlers for
1087         SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
1088         to stop the reactor.
1089     """
1090
1091     _installSignalHandlers = False
1092
1093     def _handleSignals(self):
1094         """
1095         Install the signal handlers for the Twisted event loop.
1096         """
1097         try:
1098             import signal
1099         except ImportError:
1100             log.msg("Warning: signal module unavailable -- "
1101                     "not installing signal handlers.")
1102             return
1103
1104         if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1105             # only handle if there isn't already a handler, e.g. for Pdb.
1106             signal.signal(signal.SIGINT, self.sigInt)
1107         signal.signal(signal.SIGTERM, self.sigTerm)
1108
1109         # Catch Ctrl-Break in windows
1110         if hasattr(signal, "SIGBREAK"):
1111             signal.signal(signal.SIGBREAK, self.sigBreak)
1112
1113         if platformType == 'posix':
1114             signal.signal(signal.SIGCHLD, self._handleSigchld)
1115
1116
1117     def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()):
1118         """
1119         Reap all processes on SIGCHLD.
1120
1121         This gets called on SIGCHLD. We do no processing inside a signal
1122         handler, as the calls we make here could occur between any two
1123         python bytecode instructions. Deferring processing to the next
1124         eventloop round prevents us from violating the state constraints
1125         of arbitrary classes.
1126         """
1127         from twisted.internet.process import reapAllProcesses
1128         if _threadSupport:
1129             self.callFromThread(reapAllProcesses)
1130         else:
1131             self.callLater(0, reapAllProcesses)
1132
1133
1134     def startRunning(self, installSignalHandlers=True):
1135         """
1136         Extend the base implementation in order to remember whether signal
1137         handlers should be installed later.
1138
1139         @type installSignalHandlers: C{bool}
1140         @param installSignalHandlers: A flag which, if set, indicates that
1141             handlers for a number of (implementation-defined) signals should be
1142             installed during startup.
1143         """
1144         self._installSignalHandlers = installSignalHandlers
1145         ReactorBase.startRunning(self)
1146
1147
1148     def _reallyStartRunning(self):
1149         """
1150         Extend the base implementation by also installing signal handlers, if
1151         C{self._installSignalHandlers} is true.
1152         """
1153         ReactorBase._reallyStartRunning(self)
1154         if self._installSignalHandlers:
1155             # Make sure this happens before after-startup events, since the
1156             # expectation of after-startup is that the reactor is fully
1157             # initialized.  Don't do it right away for historical reasons
1158             # (perhaps some before-startup triggers don't want there to be a
1159             # custom SIGCHLD handler so that they can run child processes with
1160             # some blocking api).
1161             self._handleSignals()
1162
1163
1164     def run(self, installSignalHandlers=True):
1165         self.startRunning(installSignalHandlers=installSignalHandlers)
1166         self.mainLoop()
1167
1168
1169     def mainLoop(self):
1170         while self._started:
1171             try:
1172                 while self._started:
1173                     # Advance simulation time in delayed event
1174                     # processors.
1175                     self.runUntilCurrent()
1176                     t2 = self.timeout()
1177                     t = self.running and t2
1178                     self.doIteration(t)
1179             except:
1180                 log.msg("Unexpected error in main loop.")
1181                 log.err()
1182             else:
1183                 log.msg('Main loop terminated.')
1184
1185
1186
1187 __all__ = []
Note: See TracBrowser for help on using the browser.