root/trunk/twisted/internet/base.py

Revision 29351, 40.2 KB (checked in by cyli, 2 months ago)

Merge remove-cancelcalllater-4076 : remove t.i.base.ReactorBase.cancelCallLater

Author: cyli
Reviewer: jesstess
Fixes: #4076

Remove deprecated twisted.internet.base.ReactorBase.cancelCallLater. It does not seem to be in Python in a Nutshell 2.

Line 
1# -*- test-case-name: twisted.test.test_internet -*-
2# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Very basic functionality for a Reactor implementation.
7"""
8
9import socket # needed only for sync-dns
10from zope.interface import implements, classImplements
11
12import sys
13import warnings
14from heapq import heappush, heappop, heapify
15
16import traceback
17
18from twisted.python.compat import set
19from twisted.python.util import unsignedID
20from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
21from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
22from twisted.internet.interfaces import IConnector, IDelayedCall
23from twisted.internet import fdesc, main, error, abstract, defer, threads
24from twisted.python import log, failure, reflect
25from twisted.python.runtime import seconds as runtimeSeconds, platform
26from twisted.internet.defer import Deferred, DeferredList
27from twisted.persisted import styles
28
29# This import is for side-effects!  Even if you don't see any code using it
30# in this module, don't delete it.
31from twisted.python import threadable
32
33
34class DelayedCall(styles.Ephemeral):
35
36    implements(IDelayedCall)
37    # enable .debug to record creator call stack, and it will be logged if
38    # an exception occurs while the function is being run
39    debug = False
40    _str = None
41
42    def __init__(self, time, func, args, kw, cancel, reset,
43                 seconds=runtimeSeconds):
44        """
45        @param time: Seconds from the epoch at which to call C{func}.
46        @param func: The callable to call.
47        @param args: The positional arguments to pass to the callable.
48        @param kw: The keyword arguments to pass to the callable.
49        @param cancel: A callable which will be called with this
50            DelayedCall before cancellation.
51        @param reset: A callable which will be called with this
52            DelayedCall after changing this DelayedCall's scheduled
53            execution time. The callable should adjust any necessary
54            scheduling details to ensure this DelayedCall is invoked
55            at the new appropriate time.
56        @param seconds: If provided, a no-argument callable which will be
57            used to determine the current time any time that information is
58            needed.
59        """
60        self.time, self.func, self.args, self.kw = time, func, args, kw
61        self.resetter = reset
62        self.canceller = cancel
63        self.seconds = seconds
64        self.cancelled = self.called = 0
65        self.delayed_time = 0
66        if self.debug:
67            self.creator = traceback.format_stack()[:-2]
68
69    def getTime(self):
70        """Return the time at which this call will fire
71
72        @rtype: C{float}
73        @return: The number of seconds after the epoch at which this call is
74        scheduled to be made.
75        """
76        return self.time + self.delayed_time
77
78    def cancel(self):
79        """Unschedule this call
80
81        @raise AlreadyCancelled: Raised if this call has already been
82        unscheduled.
83
84        @raise AlreadyCalled: Raised if this call has already been made.
85        """
86        if self.cancelled:
87            raise error.AlreadyCancelled
88        elif self.called:
89            raise error.AlreadyCalled
90        else:
91            self.canceller(self)
92            self.cancelled = 1
93            if self.debug:
94                self._str = str(self)
95            del self.func, self.args, self.kw
96
97    def reset(self, secondsFromNow):
98        """Reschedule this call for a different time
99
100        @type secondsFromNow: C{float}
101        @param secondsFromNow: The number of seconds from the time of the
102        C{reset} call at which this call will be scheduled.
103
104        @raise AlreadyCancelled: Raised if this call has been cancelled.
105        @raise AlreadyCalled: Raised if this call has already been made.
106        """
107        if self.cancelled:
108            raise error.AlreadyCancelled
109        elif self.called:
110            raise error.AlreadyCalled
111        else:
112            newTime = self.seconds() + secondsFromNow
113            if newTime < self.time:
114                self.delayed_time = 0
115                self.time = newTime
116                self.resetter(self)
117            else:
118                self.delayed_time = newTime - self.time
119
120    def delay(self, secondsLater):
121        """Reschedule this call for a later time
122
123        @type secondsLater: C{float}
124        @param secondsLater: The number of seconds after the originally
125        scheduled time for which to reschedule this call.
126
127        @raise AlreadyCancelled: Raised if this call has been cancelled.
128        @raise AlreadyCalled: Raised if this call has already been made.
129        """
130        if self.cancelled:
131            raise error.AlreadyCancelled
132        elif self.called:
133            raise error.AlreadyCalled
134        else:
135            self.delayed_time += secondsLater
136            if self.delayed_time < 0:
137                self.activate_delay()
138                self.resetter(self)
139
140    def activate_delay(self):
141        self.time += self.delayed_time
142        self.delayed_time = 0
143
144    def active(self):
145        """Determine whether this call is still pending
146
147        @rtype: C{bool}
148        @return: True if this call has not yet been made or cancelled,
149        False otherwise.
150        """
151        return not (self.cancelled or self.called)
152
153
154    def __le__(self, other):
155        """
156        Implement C{<=} operator between two L{DelayedCall} instances.
157
158        Comparison is based on the C{time} attribute (unadjusted by the
159        delayed time).
160        """
161        return self.time <= other.time
162
163
164    def __lt__(self, other):
165        """
166        Implement C{<} operator between two L{DelayedCall} instances.
167
168        Comparison is based on the C{time} attribute (unadjusted by the
169        delayed time).
170        """
171        return self.time < other.time
172
173
174    def __str__(self):
175        if self._str is not None:
176            return self._str
177        if hasattr(self, 'func'):
178            if hasattr(self.func, 'func_name'):
179                func = self.func.func_name
180                if hasattr(self.func, 'im_class'):
181                    func = self.func.im_class.__name__ + '.' + func
182            else:
183                func = reflect.safe_repr(self.func)
184        else:
185            func = None
186
187        now = self.seconds()
188        L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
189                unsignedID(self), self.time - now, self.called,
190                self.cancelled)]
191        if func is not None:
192            L.extend((" ", func, "("))
193            if self.args:
194                L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
195                if self.kw:
196                    L.append(", ")
197            if self.kw:
198                L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
199            L.append(")")
200
201        if self.debug:
202            L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
203        L.append('>')
204
205        return "".join(L)
206
207
208
209class ThreadedResolver(object):
210    """
211    L{ThreadedResolver} uses a reactor, a threadpool, and
212    L{socket.gethostbyname} to perform name lookups without blocking the
213    reactor thread.  It also supports timeouts indepedently from whatever
214    timeout logic L{socket.gethostbyname} might have.
215
216    @ivar reactor: The reactor the threadpool of which will be used to call
217        L{socket.gethostbyname} and the I/O thread of which the result will be
218        delivered.
219    """
220    implements(IResolverSimple)
221
222    def __init__(self, reactor):
223        self.reactor = reactor
224        self._runningQueries = {}
225
226
227    def _fail(self, name, err):
228        err = error.DNSLookupError("address %r not found: %s" % (name, err))
229        return failure.Failure(err)
230
231
232    def _cleanup(self, name, lookupDeferred):
233        userDeferred, cancelCall = self._runningQueries[lookupDeferred]
234        del self._runningQueries[lookupDeferred]
235        userDeferred.errback(self._fail(name, "timeout error"))
236
237
238    def _checkTimeout(self, result, name, lookupDeferred):
239        try:
240            userDeferred, cancelCall = self._runningQueries[lookupDeferred]
241        except KeyError:
242            pass
243        else:
244            del self._runningQueries[lookupDeferred]
245            cancelCall.cancel()
246
247            if isinstance(result, failure.Failure):
248                userDeferred.errback(self._fail(name, result.getErrorMessage()))
249            else:
250                userDeferred.callback(result)
251
252
253    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
254        """
255        See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
256
257        Note that the elements of C{timeout} are summed and the result is used
258        as a timeout for the lookup.  Any intermediate timeout or retry logic
259        is left up to the platform via L{socket.gethostbyname}.
260        """
261        if timeout:
262            timeoutDelay = sum(timeout)
263        else:
264            timeoutDelay = 60
265        userDeferred = defer.Deferred()
266        lookupDeferred = threads.deferToThreadPool(
267            self.reactor, self.reactor.getThreadPool(),
268            socket.gethostbyname, name)
269        cancelCall = self.reactor.callLater(
270            timeoutDelay, self._cleanup, name, lookupDeferred)
271        self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
272        lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
273        return userDeferred
274
275
276
277class BlockingResolver:
278    implements(IResolverSimple)
279
280    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
281        try:
282            address = socket.gethostbyname(name)
283        except socket.error:
284            msg = "address %r not found" % (name,)
285            err = error.DNSLookupError(msg)
286            return defer.fail(err)
287        else:
288            return defer.succeed(address)
289
290
291class _ThreePhaseEvent(object):
292    """
293    Collection of callables (with arguments) which can be invoked as a group in
294    a particular order.
295
296    This provides the underlying implementation for the reactor's system event
297    triggers.  An instance of this class tracks triggers for all phases of a
298    single type of event.
299
300    @ivar before: A list of the before-phase triggers containing three-tuples
301        of a callable, a tuple of positional arguments, and a dict of keyword
302        arguments
303
304    @ivar finishedBefore: A list of the before-phase triggers which have
305        already been executed.  This is only populated in the C{'BEFORE'} state.
306
307    @ivar during: A list of the during-phase triggers containing three-tuples
308        of a callable, a tuple of positional arguments, and a dict of keyword
309        arguments
310
311    @ivar after: A list of the after-phase triggers containing three-tuples
312        of a callable, a tuple of positional arguments, and a dict of keyword
313        arguments
314
315    @ivar state: A string indicating what is currently going on with this
316        object.  One of C{'BASE'} (for when nothing in particular is happening;
317        this is the initial value), C{'BEFORE'} (when the before-phase triggers
318        are in the process of being executed).
319    """
320    def __init__(self):
321        self.before = []
322        self.during = []
323        self.after = []
324        self.state = 'BASE'
325
326
327    def addTrigger(self, phase, callable, *args, **kwargs):
328        """
329        Add a trigger to the indicate phase.
330
331        @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
332
333        @param callable: An object to be called when this event is triggered.
334        @param *args: Positional arguments to pass to C{callable}.
335        @param **kwargs: Keyword arguments to pass to C{callable}.
336
337        @return: An opaque handle which may be passed to L{removeTrigger} to
338            reverse the effects of calling this method.
339        """
340        if phase not in ('before', 'during', 'after'):
341            raise KeyError("invalid phase")
342        getattr(self, phase).append((callable, args, kwargs))
343        return phase, callable, args, kwargs
344
345
346    def removeTrigger(self, handle):
347        """
348        Remove a previously added trigger callable.
349
350        @param handle: An object previously returned by L{addTrigger}.  The
351            trigger added by that call will be removed.
352
353        @raise ValueError: If the trigger associated with C{handle} has already
354            been removed or if C{handle} is not a valid handle.
355        """
356        return getattr(self, 'removeTrigger_' + self.state)(handle)
357
358
359    def removeTrigger_BASE(self, handle):
360        """
361        Just try to remove the trigger.
362
363        @see: removeTrigger
364        """
365        try:
366            phase, callable, args, kwargs = handle
367        except (TypeError, ValueError), e:
368            raise ValueError("invalid trigger handle")
369        else:
370            if phase not in ('before', 'during', 'after'):
371                raise KeyError("invalid phase")
372            getattr(self, phase).remove((callable, args, kwargs))
373
374
375    def removeTrigger_BEFORE(self, handle):
376        """
377        Remove the trigger if it has yet to be executed, otherwise emit a
378        warning that in the future an exception will be raised when removing an
379        already-executed trigger.
380
381        @see: removeTrigger
382        """
383        phase, callable, args, kwargs = handle
384        if phase != 'before':
385            return self.removeTrigger_BASE(handle)
386        if (callable, args, kwargs) in self.finishedBefore:
387            warnings.warn(
388                "Removing already-fired system event triggers will raise an "
389                "exception in a future version of Twisted.",
390                category=DeprecationWarning,
391                stacklevel=3)
392        else:
393            self.removeTrigger_BASE(handle)
394
395
396    def fireEvent(self):
397        """
398        Call the triggers added to this event.
399        """
400        self.state = 'BEFORE'
401        self.finishedBefore = []
402        beforeResults = []
403        while self.before:
404            callable, args, kwargs = self.before.pop(0)
405            self.finishedBefore.append((callable, args, kwargs))
406            try:
407                result = callable(*args, **kwargs)
408            except:
409                log.err()
410            else:
411                if isinstance(result, Deferred):
412                    beforeResults.append(result)
413        DeferredList(beforeResults).addCallback(self._continueFiring)
414
415
416    def _continueFiring(self, ignored):
417        """
418        Call the during and after phase triggers for this event.
419        """
420        self.state = 'BASE'
421        self.finishedBefore = []
422        for phase in self.during, self.after:
423            while phase:
424                callable, args, kwargs = phase.pop(0)
425                try:
426                    callable(*args, **kwargs)
427                except:
428                    log.err()
429
430
431
432class ReactorBase(object):
433    """
434    Default base class for Reactors.
435
436    @type _stopped: C{bool}
437    @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
438        and C{reactor.stop}.  This should be replaced with an explicit state
439        machine.
440
441    @type _justStopped: C{bool}
442    @ivar _justStopped: A flag which is true between the time C{reactor.stop}
443        is called and the time the shutdown system event is fired.  This is
444        used to determine whether that event should be fired after each
445        iteration through the mainloop.  This should be replaced with an
446        explicit state machine.
447
448    @type _started: C{bool}
449    @ivar _started: A flag which is true from the time C{reactor.run} is called
450        until the time C{reactor.run} returns.  This is used to prevent calls
451        to C{reactor.run} on a running reactor.  This should be replaced with
452        an explicit state machine.
453
454    @ivar running: See L{IReactorCore.running}
455    """
456    implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
457
458    _stopped = True
459    installed = False
460    usingThreads = False
461    resolver = BlockingResolver()
462
463    __name__ = "twisted.internet.reactor"
464
465    def __init__(self):
466        self.threadCallQueue = []
467        self._eventTriggers = {}
468        self._pendingTimedCalls = []
469        self._newTimedCalls = []
470        self._cancellations = 0
471        self.running = False
472        self._started = False
473        self._justStopped = False
474        # reactor internal readers, e.g. the waker.
475        self._internalReaders = set()
476        self.waker = None
477
478        # Arrange for the running attribute to change to True at the right time
479        # and let a subclass possibly do other things at that time (eg install
480        # signal handlers).
481        self.addSystemEventTrigger(
482            'during', 'startup', self._reallyStartRunning)
483        self.addSystemEventTrigger('during', 'shutdown', self.crash)
484        self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
485
486        if platform.supportsThreads():
487            self._initThreads()
488        self.installWaker()
489
490    # override in subclasses
491
492    _lock = None
493
494    def installWaker(self):
495        raise NotImplementedError(
496            reflect.qual(self.__class__) + " did not implement installWaker")
497
498    def installResolver(self, resolver):
499        assert IResolverSimple.providedBy(resolver)
500        oldResolver = self.resolver
501        self.resolver = resolver
502        return oldResolver
503
504    def wakeUp(self):
505        """
506        Wake up the event loop.
507        """
508        if self.waker:
509            self.waker.wakeUp()
510        # if the waker isn't installed, the reactor isn't running, and
511        # therefore doesn't need to be woken up
512
513    def doIteration(self, delay):
514        """
515        Do one iteration over the readers and writers which have been added.
516        """
517        raise NotImplementedError(
518            reflect.qual(self.__class__) + " did not implement doIteration")
519
520    def addReader(self, reader):
521        raise NotImplementedError(
522            reflect.qual(self.__class__) + " did not implement addReader")
523
524    def addWriter(self, writer):
525        raise NotImplementedError(
526            reflect.qual(self.__class__) + " did not implement addWriter")
527
528    def removeReader(self, reader):
529        raise NotImplementedError(
530            reflect.qual(self.__class__) + " did not implement removeReader")
531
532    def removeWriter(self, writer):
533        raise NotImplementedError(
534            reflect.qual(self.__class__) + " did not implement removeWriter")
535
536    def removeAll(self):
537        raise NotImplementedError(
538            reflect.qual(self.__class__) + " did not implement removeAll")
539
540
541    def getReaders(self):
542        raise NotImplementedError(
543            reflect.qual(self.__class__) + " did not implement getReaders")
544
545
546    def getWriters(self):
547        raise NotImplementedError(
548            reflect.qual(self.__class__) + " did not implement getWriters")
549
550
551    def resolve(self, name, timeout = (1, 3, 11, 45)):
552        """Return a Deferred that will resolve a hostname.
553        """
554        if not name:
555            # XXX - This is *less than* '::', and will screw up IPv6 servers
556            return defer.succeed('0.0.0.0')
557        if abstract.isIPAddress(name):
558            return defer.succeed(name)
559        return self.resolver.getHostByName(name, timeout)
560
561    # Installation.
562
563    # IReactorCore
564    def stop(self):
565        """
566        See twisted.internet.interfaces.IReactorCore.stop.
567        """
568        if self._stopped:
569            raise error.ReactorNotRunning(
570                "Can't stop reactor that isn't running.")
571        self._stopped = True
572        self._justStopped = True
573
574
575    def crash(self):
576        """
577        See twisted.internet.interfaces.IReactorCore.crash.
578
579        Reset reactor state tracking attributes and re-initialize certain
580        state-transition helpers which were set up in C{__init__} but later
581        destroyed (through use).
582        """
583        self._started = False
584        self.running = False
585        self.addSystemEventTrigger(
586            'during', 'startup', self._reallyStartRunning)
587
588    def sigInt(self, *args):
589        """Handle a SIGINT interrupt.
590        """
591        log.msg("Received SIGINT, shutting down.")
592        self.callFromThread(self.stop)
593
594    def sigBreak(self, *args):
595        """Handle a SIGBREAK interrupt.
596        """
597        log.msg("Received SIGBREAK, shutting down.")
598        self.callFromThread(self.stop)
599
600    def sigTerm(self, *args):
601        """Handle a SIGTERM interrupt.
602        """
603        log.msg("Received SIGTERM, shutting down.")
604        self.callFromThread(self.stop)
605
606    def disconnectAll(self):
607        """Disconnect every reader, and writer in the system.
608        """
609        selectables = self.removeAll()
610        for reader in selectables:
611            log.callWithLogger(reader,
612                               reader.connectionLost,
613                               failure.Failure(main.CONNECTION_LOST))
614
615
616    def iterate(self, delay=0):
617        """See twisted.internet.interfaces.IReactorCore.iterate.
618        """
619        self.runUntilCurrent()
620        self.doIteration(delay)
621
622
623    def fireSystemEvent(self, eventType):
624        """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
625        """
626        event = self._eventTriggers.get(eventType)
627        if event is not None:
628            event.fireEvent()
629
630
631    def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
632        """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
633        """
634        assert callable(_f), "%s is not callable" % _f
635        if _eventType not in self._eventTriggers:
636            self._eventTriggers[_eventType] = _ThreePhaseEvent()
637        return (_eventType, self._eventTriggers[_eventType].addTrigger(
638            _phase, _f, *args, **kw))
639
640
641    def removeSystemEventTrigger(self, triggerID):
642        """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
643        """
644        eventType, handle = triggerID
645        self._eventTriggers[eventType].removeTrigger(handle)
646
647
648    def callWhenRunning(self, _callable, *args, **kw):
649        """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
650        """
651        if self.running:
652            _callable(*args, **kw)
653        else:
654            return self.addSystemEventTrigger('after', 'startup',
655                                              _callable, *args, **kw)
656
657    def startRunning(self):
658        """
659        Method called when reactor starts: do some initialization and fire
660        startup events.
661
662        Don't call this directly, call reactor.run() instead: it should take
663        care of calling this.
664
665        This method is somewhat misnamed.  The reactor will not necessarily be
666        in the running state by the time this method returns.  The only
667        guarantee is that it will be on its way to the running state.
668        """
669        if self._started:
670            raise error.ReactorAlreadyRunning()
671        self._started = True
672        self._stopped = False
673        threadable.registerAsIOThread()
674        self.fireSystemEvent('startup')
675
676
677    def _reallyStartRunning(self):
678        """
679        Method called to transition to the running state.  This should happen
680        in the I{during startup} event trigger phase.
681        """
682        self.running = True
683
684    # IReactorTime
685
686    seconds = staticmethod(runtimeSeconds)
687
688    def callLater(self, _seconds, _f, *args, **kw):
689        """See twisted.internet.interfaces.IReactorTime.callLater.
690        """
691        assert callable(_f), "%s is not callable" % _f
692        assert sys.maxint >= _seconds >= 0, \
693               "%s is not greater than or equal to 0 seconds" % (_seconds,)
694        tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
695                           self._cancelCallLater,
696                           self._moveCallLaterSooner,
697                           seconds=self.seconds)
698        self._newTimedCalls.append(tple)
699        return tple
700
701    def _moveCallLaterSooner(self, tple):
702        # Linear time find: slow.
703        heap = self._pendingTimedCalls
704        try:
705            pos = heap.index(tple)
706
707            # Move elt up the heap until it rests at the right place.
708            elt = heap[pos]
709            while pos != 0:
710                parent = (pos-1) // 2
711                if heap[parent] <= elt:
712                    break
713                # move parent down
714                heap[pos] = heap[parent]
715                pos = parent
716            heap[pos] = elt
717        except ValueError:
718            # element was not found in heap - oh well...
719            pass
720
721    def _cancelCallLater(self, tple):
722        self._cancellations+=1
723
724
725    def getDelayedCalls(self):
726        """Return all the outstanding delayed calls in the system.
727        They are returned in no particular order.
728        This method is not efficient -- it is really only meant for
729        test cases."""
730        return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
731
732    def _insertNewDelayedCalls(self):
733        for call in self._newTimedCalls:
734            if call.cancelled:
735                self._cancellations-=1
736            else:
737                call.activate_delay()
738                heappush(self._pendingTimedCalls, call)
739        self._newTimedCalls = []
740
741    def timeout(self):
742        # insert new delayed calls to make sure to include them in timeout value
743        self._insertNewDelayedCalls()
744
745        if not self._pendingTimedCalls:
746            return None
747
748        return max(0, self._pendingTimedCalls[0].time - self.seconds())
749
750
751    def runUntilCurrent(self):
752        """Run all pending timed calls.
753        """
754        if self.threadCallQueue:
755            # Keep track of how many calls we actually make, as we're
756            # making them, in case another call is added to the queue
757            # while we're in this loop.
758            count = 0
759            total = len(self.threadCallQueue)
760            for (f, a, kw) in self.threadCallQueue:
761                try:
762                    f(*a, **kw)
763                except:
764                    log.err()
765                count += 1
766                if count == total:
767                    break
768            del self.threadCallQueue[:count]
769            if self.threadCallQueue:
770                self.wakeUp()
771
772        # insert new delayed calls now
773        self._insertNewDelayedCalls()
774
775        now = self.seconds()
776        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
777            call = heappop(self._pendingTimedCalls)
778            if call.cancelled:
779                self._cancellations-=1
780                continue
781
782            if call.delayed_time > 0:
783                call.activate_delay()
784                heappush(self._pendingTimedCalls, call)
785                continue
786
787            try:
788                call.called = 1
789                call.func(*call.args, **call.kw)
790            except:
791                log.deferr()
792                if hasattr(call, "creator"):
793                    e = "\n"
794                    e += " C: previous exception occurred in " + \
795                         "a DelayedCall created here:\n"
796                    e += " C:"
797                    e += "".join(call.creator).rstrip().replace("\n","\n C:")
798                    e += "\n"
799                    log.msg(e)
800
801
802        if (self._cancellations > 50 and
803             self._cancellations > len(self._pendingTimedCalls) >> 1):
804            self._cancellations = 0
805            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
806                                       if not x.cancelled]
807            heapify(self._pendingTimedCalls)
808
809        if self._justStopped:
810            self._justStopped = False
811            self.fireSystemEvent("shutdown")
812
813    # IReactorProcess
814
815    def _checkProcessArgs(self, args, env):
816        """
817        Check for valid arguments and environment to spawnProcess.
818
819        @return: A two element tuple giving values to use when creating the
820        process.  The first element of the tuple is a C{list} of C{str}
821        giving the values for argv of the child process.  The second element
822        of the tuple is either C{None} if C{env} was C{None} or a C{dict}
823        mapping C{str} environment keys to C{str} environment values.
824        """
825        # Any unicode string which Python would successfully implicitly
826        # encode to a byte string would have worked before these explicit
827        # checks were added.  Anything which would have failed with a
828        # UnicodeEncodeError during that implicit encoding step would have
829        # raised an exception in the child process and that would have been
830        # a pain in the butt to debug.
831        #
832        # So, we will explicitly attempt the same encoding which Python
833        # would implicitly do later.  If it fails, we will report an error
834        # without ever spawning a child process.  If it succeeds, we'll save
835        # the result so that Python doesn't need to do it implicitly later.
836        #
837        # For any unicode which we can actually encode, we'll also issue a
838        # deprecation warning, because no one should be passing unicode here
839        # anyway.
840        #
841        # -exarkun
842        defaultEncoding = sys.getdefaultencoding()
843
844        # Common check function
845        def argChecker(arg):
846            """
847            Return either a str or None.  If the given value is not
848            allowable for some reason, None is returned.  Otherwise, a
849            possibly different object which should be used in place of arg
850            is returned.  This forces unicode encoding to happen now, rather
851            than implicitly later.
852            """
853            if isinstance(arg, unicode):
854                try:
855                    arg = arg.encode(defaultEncoding)
856                except UnicodeEncodeError:
857                    return None
858                warnings.warn(
859                    "Argument strings and environment keys/values passed to "
860                    "reactor.spawnProcess should be str, not unicode.",
861                    category=DeprecationWarning,
862                    stacklevel=4)
863            if isinstance(arg, str) and '\0' not in arg:
864                return arg
865            return None
866
867        # Make a few tests to check input validity
868        if not isinstance(args, (tuple, list)):
869            raise TypeError("Arguments must be a tuple or list")
870
871        outputArgs = []
872        for arg in args:
873            arg = argChecker(arg)
874            if arg is None:
875                raise TypeError("Arguments contain a non-string value")
876            else:
877                outputArgs.append(arg)
878
879        outputEnv = None
880        if env is not None:
881            outputEnv = {}
882            for key, val in env.iteritems():
883                key = argChecker(key)
884                if key is None:
885                    raise TypeError("Environment contains a non-string key")
886                val = argChecker(val)
887                if val is None:
888                    raise TypeError("Environment contains a non-string value")
889                outputEnv[key] = val
890        return outputArgs, outputEnv
891
892    # IReactorThreads
893    if platform.supportsThreads():
894        threadpool = None
895        # ID of the trigger starting the threadpool
896        _threadpoolStartupID = None
897        # ID of the trigger stopping the threadpool
898        threadpoolShutdownID = None
899
900        def _initThreads(self):
901            self.usingThreads = True
902            self.resolver = ThreadedResolver(self)
903
904        def callFromThread(self, f, *args, **kw):
905            """
906            See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
907            """
908            assert callable(f), "%s is not callable" % (f,)
909            # lists are thread-safe in CPython, but not in Jython
910            # this is probably a bug in Jython, but until fixed this code
911            # won't work in Jython.
912            self.threadCallQueue.append((f, args, kw))
913            self.wakeUp()
914
915        def _initThreadPool(self):
916            """
917            Create the threadpool accessible with callFromThread.
918            """
919            from twisted.python import threadpool
920            self.threadpool = threadpool.ThreadPool(
921                0, 10, 'twisted.internet.reactor')
922            self._threadpoolStartupID = self.callWhenRunning(
923                self.threadpool.start)
924            self.threadpoolShutdownID = self.addSystemEventTrigger(
925                'during', 'shutdown', self._stopThreadPool)
926
927        def _uninstallHandler(self):
928            pass
929
930        def _stopThreadPool(self):
931            """
932            Stop the reactor threadpool.  This method is only valid if there
933            is currently a threadpool (created by L{_initThreadPool}).  It
934            is not intended to be called directly; instead, it will be
935            called by a shutdown trigger created in L{_initThreadPool}.
936            """
937            triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
938            for trigger in filter(None, triggers):
939                try:
940                    self.removeSystemEventTrigger(trigger)
941                except ValueError:
942                    pass
943            self._threadpoolStartupID = None
944            self.threadpoolShutdownID = None
945            self.threadpool.stop()
946            self.threadpool = None
947
948
949        def getThreadPool(self):
950            """
951            See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
952            """
953            if self.threadpool is None:
954                self._initThreadPool()
955            return self.threadpool
956
957
958        def callInThread(self, _callable, *args, **kwargs):
959            """
960            See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
961            """
962            self.getThreadPool().callInThread(_callable, *args, **kwargs)
963
964        def suggestThreadPoolSize(self, size):
965            """
966            See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
967            """
968            self.getThreadPool().adjustPoolsize(maxthreads=size)
969    else:
970        # This is for signal handlers.
971        def callFromThread(self, f, *args, **kw):
972            assert callable(f), "%s is not callable" % (f,)
973            # See comment in the other callFromThread implementation.
974            self.threadCallQueue.append((f, args, kw))
975
976if platform.supportsThreads():
977    classImplements(ReactorBase, IReactorThreads)
978
979
980class BaseConnector(styles.Ephemeral):
981    """Basic implementation of connector.
982
983    State can be: "connecting", "connected", "disconnected"
984    """
985
986    implements(IConnector)
987
988    timeoutID = None
989    factoryStarted = 0
990
991    def __init__(self, factory, timeout, reactor):
992        self.state = "disconnected"
993        self.reactor = reactor
994        self.factory = factory
995        self.timeout = timeout
996
997    def disconnect(self):
998        """Disconnect whatever our state is."""
999        if self.state == 'connecting':
1000            self.stopConnecting()
1001        elif self.state == 'connected':
1002            self.transport.loseConnection()
1003
1004    def connect(self):
1005        """Start connection to remote server."""
1006        if self.state != "disconnected":
1007            raise RuntimeError, "can't connect in this state"
1008
1009        self.state = "connecting"
1010        if not self.factoryStarted:
1011            self.factory.doStart()
1012            self.factoryStarted = 1
1013        self.transport = transport = self._makeTransport()
1014        if self.timeout is not None:
1015            self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
1016        self.factory.startedConnecting(self)
1017
1018    def stopConnecting(self):
1019        """Stop attempting to connect."""
1020        if self.state != "connecting":
1021            raise error.NotConnectingError, "we're not trying to connect"
1022
1023        self.state = "disconnected"
1024        self.transport.failIfNotConnected(error.UserError())
1025        del self.transport
1026
1027    def cancelTimeout(self):
1028        if self.timeoutID is not None:
1029            try:
1030                self.timeoutID.cancel()
1031            except ValueError:
1032                pass
1033            del self.timeoutID
1034
1035    def buildProtocol(self, addr):
1036        self.state = "connected"
1037        self.cancelTimeout()
1038        return self.factory.buildProtocol(addr)
1039
1040    def connectionFailed(self, reason):
1041        self.cancelTimeout()
1042        self.transport = None
1043        self.state = "disconnected"
1044        self.factory.clientConnectionFailed(self, reason)
1045        if self.state == "disconnected":
1046            # factory hasn't called our connect() method
1047            self.factory.doStop()
1048            self.factoryStarted = 0
1049
1050    def connectionLost(self, reason):
1051        self.state = "disconnected"
1052        self.factory.clientConnectionLost(self, reason)
1053        if self.state == "disconnected":
1054            # factory hasn't called our connect() method
1055            self.factory.doStop()
1056            self.factoryStarted = 0
1057
1058    def getDestination(self):
1059        raise NotImplementedError(
1060            reflect.qual(self.__class__) + " did not implement "
1061            "getDestination")
1062
1063
1064
1065class BasePort(abstract.FileDescriptor):
1066    """Basic implementation of a ListeningPort.
1067
1068    Note: This does not actually implement IListeningPort.
1069    """
1070
1071    addressFamily = None
1072    socketType = None
1073
1074    def createInternetSocket(self):
1075        s = socket.socket(self.addressFamily, self.socketType)
1076        s.setblocking(0)
1077        fdesc._setCloseOnExec(s.fileno())
1078        return s
1079
1080
1081    def doWrite(self):
1082        """Raises a RuntimeError"""
1083        raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
1084
1085
1086
1087class _SignalReactorMixin(object):
1088    """
1089    Private mixin to manage signals: it installs signal handlers at start time,
1090    and define run method.
1091
1092    It can only be used mixed in with L{ReactorBase}, and has to be defined
1093    first in the inheritance (so that method resolution order finds
1094    startRunning first).
1095
1096    @type _installSignalHandlers: C{bool}
1097    @ivar _installSignalHandlers: A flag which indicates whether any signal
1098        handlers will be installed during startup.  This includes handlers for
1099        SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
1100        to stop the reactor.
1101    """
1102
1103    _installSignalHandlers = False
1104
1105    def _handleSignals(self):
1106        """
1107        Install the signal handlers for the Twisted event loop.
1108        """
1109        try:
1110            import signal
1111        except ImportError:
1112            log.msg("Warning: signal module unavailable -- "
1113                    "not installing signal handlers.")
1114            return
1115
1116        if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1117            # only handle if there isn't already a handler, e.g. for Pdb.
1118            signal.signal(signal.SIGINT, self.sigInt)
1119        signal.signal(signal.SIGTERM, self.sigTerm)
1120
1121        # Catch Ctrl-Break in windows
1122        if hasattr(signal, "SIGBREAK"):
1123            signal.signal(signal.SIGBREAK, self.sigBreak)
1124
1125
1126    def startRunning(self, installSignalHandlers=True):
1127        """
1128        Extend the base implementation in order to remember whether signal
1129        handlers should be installed later.
1130
1131        @type installSignalHandlers: C{bool}
1132        @param installSignalHandlers: A flag which, if set, indicates that
1133            handlers for a number of (implementation-defined) signals should be
1134            installed during startup.
1135        """
1136        self._installSignalHandlers = installSignalHandlers
1137        ReactorBase.startRunning(self)
1138
1139
1140    def _reallyStartRunning(self):
1141        """
1142        Extend the base implementation by also installing signal handlers, if
1143        C{self._installSignalHandlers} is true.
1144        """
1145        ReactorBase._reallyStartRunning(self)
1146        if self._installSignalHandlers:
1147            # Make sure this happens before after-startup events, since the
1148            # expectation of after-startup is that the reactor is fully
1149            # initialized.  Don't do it right away for historical reasons
1150            # (perhaps some before-startup triggers don't want there to be a
1151            # custom SIGCHLD handler so that they can run child processes with
1152            # some blocking api).
1153            self._handleSignals()
1154
1155
1156    def run(self, installSignalHandlers=True):
1157        self.startRunning(installSignalHandlers=installSignalHandlers)
1158        self.mainLoop()
1159
1160
1161    def mainLoop(self):
1162        while self._started:
1163            try:
1164                while self._started:
1165                    # Advance simulation time in delayed event
1166                    # processors.
1167                    self.runUntilCurrent()
1168                    t2 = self.timeout()
1169                    t = self.running and t2
1170                    self.doIteration(t)
1171            except:
1172                log.msg("Unexpected error in main loop.")
1173                log.err()
1174            else:
1175                log.msg('Main loop terminated.')
1176
1177
1178
1179__all__ = []
Note: See TracBrowser for help on using the browser.