root/trunk/twisted/internet/base.py

Revision 32109, 40.6 KB (checked in by exarkun, 11 months ago)

Merge win32-gtk-win32events-4862

Author: exarkun
Reviewer: itamarst
Fixes: #4862

Add IReactorWin32Events to the select, iocp, and Gtk reactors on Windows
(if the third-party dependencies are available) by having each of those
reactors run a Win32Reactor in a secondary thread and delegating event-
related methods to that reactor.

Significantly, this means all reactors on Windows now support serial ports.

Line 
1# -*- test-case-name: twisted.test.test_internet -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Very basic functionality for a Reactor implementation.
7"""
8
9import socket # needed only for sync-dns
10from zope.interface import implements, classImplements
11
12import sys
13import warnings
14from heapq import heappush, heappop, heapify
15
16import traceback
17
18from twisted.python.compat import set
19from twisted.python.util import unsignedID
20from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
21from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
22from twisted.internet.interfaces import IConnector, IDelayedCall
23from twisted.internet import fdesc, main, error, abstract, defer, threads
24from twisted.python import log, failure, reflect
25from twisted.python.runtime import seconds as runtimeSeconds, platform
26from twisted.internet.defer import Deferred, DeferredList
27from twisted.persisted import styles
28
29# This import is for side-effects!  Even if you don't see any code using it
30# in this module, don't delete it.
31from twisted.python import threadable
32
33
34class DelayedCall(styles.Ephemeral):
35
36    implements(IDelayedCall)
37    # enable .debug to record creator call stack, and it will be logged if
38    # an exception occurs while the function is being run
39    debug = False
40    _str = None
41
42    def __init__(self, time, func, args, kw, cancel, reset,
43                 seconds=runtimeSeconds):
44        """
45        @param time: Seconds from the epoch at which to call C{func}.
46        @param func: The callable to call.
47        @param args: The positional arguments to pass to the callable.
48        @param kw: The keyword arguments to pass to the callable.
49        @param cancel: A callable which will be called with this
50            DelayedCall before cancellation.
51        @param reset: A callable which will be called with this
52            DelayedCall after changing this DelayedCall's scheduled
53            execution time. The callable should adjust any necessary
54            scheduling details to ensure this DelayedCall is invoked
55            at the new appropriate time.
56        @param seconds: If provided, a no-argument callable which will be
57            used to determine the current time any time that information is
58            needed.
59        """
60        self.time, self.func, self.args, self.kw = time, func, args, kw
61        self.resetter = reset
62        self.canceller = cancel
63        self.seconds = seconds
64        self.cancelled = self.called = 0
65        self.delayed_time = 0
66        if self.debug:
67            self.creator = traceback.format_stack()[:-2]
68
69    def getTime(self):
70        """Return the time at which this call will fire
71
72        @rtype: C{float}
73        @return: The number of seconds after the epoch at which this call is
74        scheduled to be made.
75        """
76        return self.time + self.delayed_time
77
78    def cancel(self):
79        """Unschedule this call
80
81        @raise AlreadyCancelled: Raised if this call has already been
82        unscheduled.
83
84        @raise AlreadyCalled: Raised if this call has already been made.
85        """
86        if self.cancelled:
87            raise error.AlreadyCancelled
88        elif self.called:
89            raise error.AlreadyCalled
90        else:
91            self.canceller(self)
92            self.cancelled = 1
93            if self.debug:
94                self._str = str(self)
95            del self.func, self.args, self.kw
96
97    def reset(self, secondsFromNow):
98        """Reschedule this call for a different time
99
100        @type secondsFromNow: C{float}
101        @param secondsFromNow: The number of seconds from the time of the
102        C{reset} call at which this call will be scheduled.
103
104        @raise AlreadyCancelled: Raised if this call has been cancelled.
105        @raise AlreadyCalled: Raised if this call has already been made.
106        """
107        if self.cancelled:
108            raise error.AlreadyCancelled
109        elif self.called:
110            raise error.AlreadyCalled
111        else:
112            newTime = self.seconds() + secondsFromNow
113            if newTime < self.time:
114                self.delayed_time = 0
115                self.time = newTime
116                self.resetter(self)
117            else:
118                self.delayed_time = newTime - self.time
119
120    def delay(self, secondsLater):
121        """Reschedule this call for a later time
122
123        @type secondsLater: C{float}
124        @param secondsLater: The number of seconds after the originally
125        scheduled time for which to reschedule this call.
126
127        @raise AlreadyCancelled: Raised if this call has been cancelled.
128        @raise AlreadyCalled: Raised if this call has already been made.
129        """
130        if self.cancelled:
131            raise error.AlreadyCancelled
132        elif self.called:
133            raise error.AlreadyCalled
134        else:
135            self.delayed_time += secondsLater
136            if self.delayed_time < 0:
137                self.activate_delay()
138                self.resetter(self)
139
140    def activate_delay(self):
141        self.time += self.delayed_time
142        self.delayed_time = 0
143
144    def active(self):
145        """Determine whether this call is still pending
146
147        @rtype: C{bool}
148        @return: True if this call has not yet been made or cancelled,
149        False otherwise.
150        """
151        return not (self.cancelled or self.called)
152
153
154    def __le__(self, other):
155        """
156        Implement C{<=} operator between two L{DelayedCall} instances.
157
158        Comparison is based on the C{time} attribute (unadjusted by the
159        delayed time).
160        """
161        return self.time <= other.time
162
163
164    def __lt__(self, other):
165        """
166        Implement C{<} operator between two L{DelayedCall} instances.
167
168        Comparison is based on the C{time} attribute (unadjusted by the
169        delayed time).
170        """
171        return self.time < other.time
172
173
174    def __str__(self):
175        if self._str is not None:
176            return self._str
177        if hasattr(self, 'func'):
178            if hasattr(self.func, 'func_name'):
179                func = self.func.func_name
180                if hasattr(self.func, 'im_class'):
181                    func = self.func.im_class.__name__ + '.' + func
182            else:
183                func = reflect.safe_repr(self.func)
184        else:
185            func = None
186
187        now = self.seconds()
188        L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
189                unsignedID(self), self.time - now, self.called,
190                self.cancelled)]
191        if func is not None:
192            L.extend((" ", func, "("))
193            if self.args:
194                L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
195                if self.kw:
196                    L.append(", ")
197            if self.kw:
198                L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
199            L.append(")")
200
201        if self.debug:
202            L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
203        L.append('>')
204
205        return "".join(L)
206
207
208
209class ThreadedResolver(object):
210    """
211    L{ThreadedResolver} uses a reactor, a threadpool, and
212    L{socket.gethostbyname} to perform name lookups without blocking the
213    reactor thread.  It also supports timeouts indepedently from whatever
214    timeout logic L{socket.gethostbyname} might have.
215
216    @ivar reactor: The reactor the threadpool of which will be used to call
217        L{socket.gethostbyname} and the I/O thread of which the result will be
218        delivered.
219    """
220    implements(IResolverSimple)
221
222    def __init__(self, reactor):
223        self.reactor = reactor
224        self._runningQueries = {}
225
226
227    def _fail(self, name, err):
228        err = error.DNSLookupError("address %r not found: %s" % (name, err))
229        return failure.Failure(err)
230
231
232    def _cleanup(self, name, lookupDeferred):
233        userDeferred, cancelCall = self._runningQueries[lookupDeferred]
234        del self._runningQueries[lookupDeferred]
235        userDeferred.errback(self._fail(name, "timeout error"))
236
237
238    def _checkTimeout(self, result, name, lookupDeferred):
239        try:
240            userDeferred, cancelCall = self._runningQueries[lookupDeferred]
241        except KeyError:
242            pass
243        else:
244            del self._runningQueries[lookupDeferred]
245            cancelCall.cancel()
246
247            if isinstance(result, failure.Failure):
248                userDeferred.errback(self._fail(name, result.getErrorMessage()))
249            else:
250                userDeferred.callback(result)
251
252
253    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
254        """
255        See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
256
257        Note that the elements of C{timeout} are summed and the result is used
258        as a timeout for the lookup.  Any intermediate timeout or retry logic
259        is left up to the platform via L{socket.gethostbyname}.
260        """
261        if timeout:
262            timeoutDelay = sum(timeout)
263        else:
264            timeoutDelay = 60
265        userDeferred = defer.Deferred()
266        lookupDeferred = threads.deferToThreadPool(
267            self.reactor, self.reactor.getThreadPool(),
268            socket.gethostbyname, name)
269        cancelCall = self.reactor.callLater(
270            timeoutDelay, self._cleanup, name, lookupDeferred)
271        self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
272        lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
273        return userDeferred
274
275
276
277class BlockingResolver:
278    implements(IResolverSimple)
279
280    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
281        try:
282            address = socket.gethostbyname(name)
283        except socket.error:
284            msg = "address %r not found" % (name,)
285            err = error.DNSLookupError(msg)
286            return defer.fail(err)
287        else:
288            return defer.succeed(address)
289
290
291class _ThreePhaseEvent(object):
292    """
293    Collection of callables (with arguments) which can be invoked as a group in
294    a particular order.
295
296    This provides the underlying implementation for the reactor's system event
297    triggers.  An instance of this class tracks triggers for all phases of a
298    single type of event.
299
300    @ivar before: A list of the before-phase triggers containing three-tuples
301        of a callable, a tuple of positional arguments, and a dict of keyword
302        arguments
303
304    @ivar finishedBefore: A list of the before-phase triggers which have
305        already been executed.  This is only populated in the C{'BEFORE'} state.
306
307    @ivar during: A list of the during-phase triggers containing three-tuples
308        of a callable, a tuple of positional arguments, and a dict of keyword
309        arguments
310
311    @ivar after: A list of the after-phase triggers containing three-tuples
312        of a callable, a tuple of positional arguments, and a dict of keyword
313        arguments
314
315    @ivar state: A string indicating what is currently going on with this
316        object.  One of C{'BASE'} (for when nothing in particular is happening;
317        this is the initial value), C{'BEFORE'} (when the before-phase triggers
318        are in the process of being executed).
319    """
320    def __init__(self):
321        self.before = []
322        self.during = []
323        self.after = []
324        self.state = 'BASE'
325
326
327    def addTrigger(self, phase, callable, *args, **kwargs):
328        """
329        Add a trigger to the indicate phase.
330
331        @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
332
333        @param callable: An object to be called when this event is triggered.
334        @param *args: Positional arguments to pass to C{callable}.
335        @param **kwargs: Keyword arguments to pass to C{callable}.
336
337        @return: An opaque handle which may be passed to L{removeTrigger} to
338            reverse the effects of calling this method.
339        """
340        if phase not in ('before', 'during', 'after'):
341            raise KeyError("invalid phase")
342        getattr(self, phase).append((callable, args, kwargs))
343        return phase, callable, args, kwargs
344
345
346    def removeTrigger(self, handle):
347        """
348        Remove a previously added trigger callable.
349
350        @param handle: An object previously returned by L{addTrigger}.  The
351            trigger added by that call will be removed.
352
353        @raise ValueError: If the trigger associated with C{handle} has already
354            been removed or if C{handle} is not a valid handle.
355        """
356        return getattr(self, 'removeTrigger_' + self.state)(handle)
357
358
359    def removeTrigger_BASE(self, handle):
360        """
361        Just try to remove the trigger.
362
363        @see: removeTrigger
364        """
365        try:
366            phase, callable, args, kwargs = handle
367        except (TypeError, ValueError):
368            raise ValueError("invalid trigger handle")
369        else:
370            if phase not in ('before', 'during', 'after'):
371                raise KeyError("invalid phase")
372            getattr(self, phase).remove((callable, args, kwargs))
373
374
375    def removeTrigger_BEFORE(self, handle):
376        """
377        Remove the trigger if it has yet to be executed, otherwise emit a
378        warning that in the future an exception will be raised when removing an
379        already-executed trigger.
380
381        @see: removeTrigger
382        """
383        phase, callable, args, kwargs = handle
384        if phase != 'before':
385            return self.removeTrigger_BASE(handle)
386        if (callable, args, kwargs) in self.finishedBefore:
387            warnings.warn(
388                "Removing already-fired system event triggers will raise an "
389                "exception in a future version of Twisted.",
390                category=DeprecationWarning,
391                stacklevel=3)
392        else:
393            self.removeTrigger_BASE(handle)
394
395
396    def fireEvent(self):
397        """
398        Call the triggers added to this event.
399        """
400        self.state = 'BEFORE'
401        self.finishedBefore = []
402        beforeResults = []
403        while self.before:
404            callable, args, kwargs = self.before.pop(0)
405            self.finishedBefore.append((callable, args, kwargs))
406            try:
407                result = callable(*args, **kwargs)
408            except:
409                log.err()
410            else:
411                if isinstance(result, Deferred):
412                    beforeResults.append(result)
413        DeferredList(beforeResults).addCallback(self._continueFiring)
414
415
416    def _continueFiring(self, ignored):
417        """
418        Call the during and after phase triggers for this event.
419        """
420        self.state = 'BASE'
421        self.finishedBefore = []
422        for phase in self.during, self.after:
423            while phase:
424                callable, args, kwargs = phase.pop(0)
425                try:
426                    callable(*args, **kwargs)
427                except:
428                    log.err()
429
430
431
432class ReactorBase(object):
433    """
434    Default base class for Reactors.
435
436    @type _stopped: C{bool}
437    @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
438        and C{reactor.stop}.  This should be replaced with an explicit state
439        machine.
440
441    @type _justStopped: C{bool}
442    @ivar _justStopped: A flag which is true between the time C{reactor.stop}
443        is called and the time the shutdown system event is fired.  This is
444        used to determine whether that event should be fired after each
445        iteration through the mainloop.  This should be replaced with an
446        explicit state machine.
447
448    @type _started: C{bool}
449    @ivar _started: A flag which is true from the time C{reactor.run} is called
450        until the time C{reactor.run} returns.  This is used to prevent calls
451        to C{reactor.run} on a running reactor.  This should be replaced with
452        an explicit state machine.
453
454    @ivar running: See L{IReactorCore.running}
455
456    @ivar _registerAsIOThread: A flag controlling whether the reactor will
457        register the thread it is running in as the I/O thread when it starts.
458        If C{True}, registration will be done, otherwise it will not be.
459    """
460    implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
461
462    _registerAsIOThread = True
463
464    _stopped = True
465    installed = False
466    usingThreads = False
467    resolver = BlockingResolver()
468
469    __name__ = "twisted.internet.reactor"
470
471    def __init__(self):
472        self.threadCallQueue = []
473        self._eventTriggers = {}
474        self._pendingTimedCalls = []
475        self._newTimedCalls = []
476        self._cancellations = 0
477        self.running = False
478        self._started = False
479        self._justStopped = False
480        self._startedBefore = False
481        # reactor internal readers, e.g. the waker.
482        self._internalReaders = set()
483        self.waker = None
484
485        # Arrange for the running attribute to change to True at the right time
486        # and let a subclass possibly do other things at that time (eg install
487        # signal handlers).
488        self.addSystemEventTrigger(
489            'during', 'startup', self._reallyStartRunning)
490        self.addSystemEventTrigger('during', 'shutdown', self.crash)
491        self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
492
493        if platform.supportsThreads():
494            self._initThreads()
495        self.installWaker()
496
497    # override in subclasses
498
499    _lock = None
500
501    def installWaker(self):
502        raise NotImplementedError(
503            reflect.qual(self.__class__) + " did not implement installWaker")
504
505    def installResolver(self, resolver):
506        assert IResolverSimple.providedBy(resolver)
507        oldResolver = self.resolver
508        self.resolver = resolver
509        return oldResolver
510
511    def wakeUp(self):
512        """
513        Wake up the event loop.
514        """
515        if self.waker:
516            self.waker.wakeUp()
517        # if the waker isn't installed, the reactor isn't running, and
518        # therefore doesn't need to be woken up
519
520    def doIteration(self, delay):
521        """
522        Do one iteration over the readers and writers which have been added.
523        """
524        raise NotImplementedError(
525            reflect.qual(self.__class__) + " did not implement doIteration")
526
527    def addReader(self, reader):
528        raise NotImplementedError(
529            reflect.qual(self.__class__) + " did not implement addReader")
530
531    def addWriter(self, writer):
532        raise NotImplementedError(
533            reflect.qual(self.__class__) + " did not implement addWriter")
534
535    def removeReader(self, reader):
536        raise NotImplementedError(
537            reflect.qual(self.__class__) + " did not implement removeReader")
538
539    def removeWriter(self, writer):
540        raise NotImplementedError(
541            reflect.qual(self.__class__) + " did not implement removeWriter")
542
543    def removeAll(self):
544        raise NotImplementedError(
545            reflect.qual(self.__class__) + " did not implement removeAll")
546
547
548    def getReaders(self):
549        raise NotImplementedError(
550            reflect.qual(self.__class__) + " did not implement getReaders")
551
552
553    def getWriters(self):
554        raise NotImplementedError(
555            reflect.qual(self.__class__) + " did not implement getWriters")
556
557
558    def resolve(self, name, timeout = (1, 3, 11, 45)):
559        """Return a Deferred that will resolve a hostname.
560        """
561        if not name:
562            # XXX - This is *less than* '::', and will screw up IPv6 servers
563            return defer.succeed('0.0.0.0')
564        if abstract.isIPAddress(name):
565            return defer.succeed(name)
566        return self.resolver.getHostByName(name, timeout)
567
568    # Installation.
569
570    # IReactorCore
571    def stop(self):
572        """
573        See twisted.internet.interfaces.IReactorCore.stop.
574        """
575        if self._stopped:
576            raise error.ReactorNotRunning(
577                "Can't stop reactor that isn't running.")
578        self._stopped = True
579        self._justStopped = True
580        self._startedBefore = True
581
582
583    def crash(self):
584        """
585        See twisted.internet.interfaces.IReactorCore.crash.
586
587        Reset reactor state tracking attributes and re-initialize certain
588        state-transition helpers which were set up in C{__init__} but later
589        destroyed (through use).
590        """
591        self._started = False
592        self.running = False
593        self.addSystemEventTrigger(
594            'during', 'startup', self._reallyStartRunning)
595
596    def sigInt(self, *args):
597        """Handle a SIGINT interrupt.
598        """
599        log.msg("Received SIGINT, shutting down.")
600        self.callFromThread(self.stop)
601
602    def sigBreak(self, *args):
603        """Handle a SIGBREAK interrupt.
604        """
605        log.msg("Received SIGBREAK, shutting down.")
606        self.callFromThread(self.stop)
607
608    def sigTerm(self, *args):
609        """Handle a SIGTERM interrupt.
610        """
611        log.msg("Received SIGTERM, shutting down.")
612        self.callFromThread(self.stop)
613
614    def disconnectAll(self):
615        """Disconnect every reader, and writer in the system.
616        """
617        selectables = self.removeAll()
618        for reader in selectables:
619            log.callWithLogger(reader,
620                               reader.connectionLost,
621                               failure.Failure(main.CONNECTION_LOST))
622
623
624    def iterate(self, delay=0):
625        """See twisted.internet.interfaces.IReactorCore.iterate.
626        """
627        self.runUntilCurrent()
628        self.doIteration(delay)
629
630
631    def fireSystemEvent(self, eventType):
632        """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
633        """
634        event = self._eventTriggers.get(eventType)
635        if event is not None:
636            event.fireEvent()
637
638
639    def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
640        """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
641        """
642        assert callable(_f), "%s is not callable" % _f
643        if _eventType not in self._eventTriggers:
644            self._eventTriggers[_eventType] = _ThreePhaseEvent()
645        return (_eventType, self._eventTriggers[_eventType].addTrigger(
646            _phase, _f, *args, **kw))
647
648
649    def removeSystemEventTrigger(self, triggerID):
650        """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
651        """
652        eventType, handle = triggerID
653        self._eventTriggers[eventType].removeTrigger(handle)
654
655
656    def callWhenRunning(self, _callable, *args, **kw):
657        """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
658        """
659        if self.running:
660            _callable(*args, **kw)
661        else:
662            return self.addSystemEventTrigger('after', 'startup',
663                                              _callable, *args, **kw)
664
665    def startRunning(self):
666        """
667        Method called when reactor starts: do some initialization and fire
668        startup events.
669
670        Don't call this directly, call reactor.run() instead: it should take
671        care of calling this.
672
673        This method is somewhat misnamed.  The reactor will not necessarily be
674        in the running state by the time this method returns.  The only
675        guarantee is that it will be on its way to the running state.
676        """
677        if self._started:
678            raise error.ReactorAlreadyRunning()
679        if self._startedBefore:
680            raise error.ReactorNotRestartable()
681        self._started = True
682        self._stopped = False
683        if self._registerAsIOThread:
684            threadable.registerAsIOThread()
685        self.fireSystemEvent('startup')
686
687
688    def _reallyStartRunning(self):
689        """
690        Method called to transition to the running state.  This should happen
691        in the I{during startup} event trigger phase.
692        """
693        self.running = True
694
695    # IReactorTime
696
697    seconds = staticmethod(runtimeSeconds)
698
699    def callLater(self, _seconds, _f, *args, **kw):
700        """See twisted.internet.interfaces.IReactorTime.callLater.
701        """
702        assert callable(_f), "%s is not callable" % _f
703        assert sys.maxint >= _seconds >= 0, \
704               "%s is not greater than or equal to 0 seconds" % (_seconds,)
705        tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
706                           self._cancelCallLater,
707                           self._moveCallLaterSooner,
708                           seconds=self.seconds)
709        self._newTimedCalls.append(tple)
710        return tple
711
712    def _moveCallLaterSooner(self, tple):
713        # Linear time find: slow.
714        heap = self._pendingTimedCalls
715        try:
716            pos = heap.index(tple)
717
718            # Move elt up the heap until it rests at the right place.
719            elt = heap[pos]
720            while pos != 0:
721                parent = (pos-1) // 2
722                if heap[parent] <= elt:
723                    break
724                # move parent down
725                heap[pos] = heap[parent]
726                pos = parent
727            heap[pos] = elt
728        except ValueError:
729            # element was not found in heap - oh well...
730            pass
731
732    def _cancelCallLater(self, tple):
733        self._cancellations+=1
734
735
736    def getDelayedCalls(self):
737        """Return all the outstanding delayed calls in the system.
738        They are returned in no particular order.
739        This method is not efficient -- it is really only meant for
740        test cases."""
741        return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
742
743    def _insertNewDelayedCalls(self):
744        for call in self._newTimedCalls:
745            if call.cancelled:
746                self._cancellations-=1
747            else:
748                call.activate_delay()
749                heappush(self._pendingTimedCalls, call)
750        self._newTimedCalls = []
751
752    def timeout(self):
753        # insert new delayed calls to make sure to include them in timeout value
754        self._insertNewDelayedCalls()
755
756        if not self._pendingTimedCalls:
757            return None
758
759        return max(0, self._pendingTimedCalls[0].time - self.seconds())
760
761
762    def runUntilCurrent(self):
763        """Run all pending timed calls.
764        """
765        if self.threadCallQueue:
766            # Keep track of how many calls we actually make, as we're
767            # making them, in case another call is added to the queue
768            # while we're in this loop.
769            count = 0
770            total = len(self.threadCallQueue)
771            for (f, a, kw) in self.threadCallQueue:
772                try:
773                    f(*a, **kw)
774                except:
775                    log.err()
776                count += 1
777                if count == total:
778                    break
779            del self.threadCallQueue[:count]
780            if self.threadCallQueue:
781                self.wakeUp()
782
783        # insert new delayed calls now
784        self._insertNewDelayedCalls()
785
786        now = self.seconds()
787        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
788            call = heappop(self._pendingTimedCalls)
789            if call.cancelled:
790                self._cancellations-=1
791                continue
792
793            if call.delayed_time > 0:
794                call.activate_delay()
795                heappush(self._pendingTimedCalls, call)
796                continue
797
798            try:
799                call.called = 1
800                call.func(*call.args, **call.kw)
801            except:
802                log.deferr()
803                if hasattr(call, "creator"):
804                    e = "\n"
805                    e += " C: previous exception occurred in " + \
806                         "a DelayedCall created here:\n"
807                    e += " C:"
808                    e += "".join(call.creator).rstrip().replace("\n","\n C:")
809                    e += "\n"
810                    log.msg(e)
811
812
813        if (self._cancellations > 50 and
814             self._cancellations > len(self._pendingTimedCalls) >> 1):
815            self._cancellations = 0
816            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
817                                       if not x.cancelled]
818            heapify(self._pendingTimedCalls)
819
820        if self._justStopped:
821            self._justStopped = False
822            self.fireSystemEvent("shutdown")
823
824    # IReactorProcess
825
826    def _checkProcessArgs(self, args, env):
827        """
828        Check for valid arguments and environment to spawnProcess.
829
830        @return: A two element tuple giving values to use when creating the
831        process.  The first element of the tuple is a C{list} of C{str}
832        giving the values for argv of the child process.  The second element
833        of the tuple is either C{None} if C{env} was C{None} or a C{dict}
834        mapping C{str} environment keys to C{str} environment values.
835        """
836        # Any unicode string which Python would successfully implicitly
837        # encode to a byte string would have worked before these explicit
838        # checks were added.  Anything which would have failed with a
839        # UnicodeEncodeError during that implicit encoding step would have
840        # raised an exception in the child process and that would have been
841        # a pain in the butt to debug.
842        #
843        # So, we will explicitly attempt the same encoding which Python
844        # would implicitly do later.  If it fails, we will report an error
845        # without ever spawning a child process.  If it succeeds, we'll save
846        # the result so that Python doesn't need to do it implicitly later.
847        #
848        # For any unicode which we can actually encode, we'll also issue a
849        # deprecation warning, because no one should be passing unicode here
850        # anyway.
851        #
852        # -exarkun
853        defaultEncoding = sys.getdefaultencoding()
854
855        # Common check function
856        def argChecker(arg):
857            """
858            Return either a str or None.  If the given value is not
859            allowable for some reason, None is returned.  Otherwise, a
860            possibly different object which should be used in place of arg
861            is returned.  This forces unicode encoding to happen now, rather
862            than implicitly later.
863            """
864            if isinstance(arg, unicode):
865                try:
866                    arg = arg.encode(defaultEncoding)
867                except UnicodeEncodeError:
868                    return None
869                warnings.warn(
870                    "Argument strings and environment keys/values passed to "
871                    "reactor.spawnProcess should be str, not unicode.",
872                    category=DeprecationWarning,
873                    stacklevel=4)
874            if isinstance(arg, str) and '\0' not in arg:
875                return arg
876            return None
877
878        # Make a few tests to check input validity
879        if not isinstance(args, (tuple, list)):
880            raise TypeError("Arguments must be a tuple or list")
881
882        outputArgs = []
883        for arg in args:
884            arg = argChecker(arg)
885            if arg is None:
886                raise TypeError("Arguments contain a non-string value")
887            else:
888                outputArgs.append(arg)
889
890        outputEnv = None
891        if env is not None:
892            outputEnv = {}
893            for key, val in env.iteritems():
894                key = argChecker(key)
895                if key is None:
896                    raise TypeError("Environment contains a non-string key")
897                val = argChecker(val)
898                if val is None:
899                    raise TypeError("Environment contains a non-string value")
900                outputEnv[key] = val
901        return outputArgs, outputEnv
902
903    # IReactorThreads
904    if platform.supportsThreads():
905        threadpool = None
906        # ID of the trigger starting the threadpool
907        _threadpoolStartupID = None
908        # ID of the trigger stopping the threadpool
909        threadpoolShutdownID = None
910
911        def _initThreads(self):
912            self.usingThreads = True
913            self.resolver = ThreadedResolver(self)
914
915        def callFromThread(self, f, *args, **kw):
916            """
917            See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
918            """
919            assert callable(f), "%s is not callable" % (f,)
920            # lists are thread-safe in CPython, but not in Jython
921            # this is probably a bug in Jython, but until fixed this code
922            # won't work in Jython.
923            self.threadCallQueue.append((f, args, kw))
924            self.wakeUp()
925
926        def _initThreadPool(self):
927            """
928            Create the threadpool accessible with callFromThread.
929            """
930            from twisted.python import threadpool
931            self.threadpool = threadpool.ThreadPool(
932                0, 10, 'twisted.internet.reactor')
933            self._threadpoolStartupID = self.callWhenRunning(
934                self.threadpool.start)
935            self.threadpoolShutdownID = self.addSystemEventTrigger(
936                'during', 'shutdown', self._stopThreadPool)
937
938        def _uninstallHandler(self):
939            pass
940
941        def _stopThreadPool(self):
942            """
943            Stop the reactor threadpool.  This method is only valid if there
944            is currently a threadpool (created by L{_initThreadPool}).  It
945            is not intended to be called directly; instead, it will be
946            called by a shutdown trigger created in L{_initThreadPool}.
947            """
948            triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
949            for trigger in filter(None, triggers):
950                try:
951                    self.removeSystemEventTrigger(trigger)
952                except ValueError:
953                    pass
954            self._threadpoolStartupID = None
955            self.threadpoolShutdownID = None
956            self.threadpool.stop()
957            self.threadpool = None
958
959
960        def getThreadPool(self):
961            """
962            See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
963            """
964            if self.threadpool is None:
965                self._initThreadPool()
966            return self.threadpool
967
968
969        def callInThread(self, _callable, *args, **kwargs):
970            """
971            See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
972            """
973            self.getThreadPool().callInThread(_callable, *args, **kwargs)
974
975        def suggestThreadPoolSize(self, size):
976            """
977            See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
978            """
979            self.getThreadPool().adjustPoolsize(maxthreads=size)
980    else:
981        # This is for signal handlers.
982        def callFromThread(self, f, *args, **kw):
983            assert callable(f), "%s is not callable" % (f,)
984            # See comment in the other callFromThread implementation.
985            self.threadCallQueue.append((f, args, kw))
986
987if platform.supportsThreads():
988    classImplements(ReactorBase, IReactorThreads)
989
990
991class BaseConnector(styles.Ephemeral):
992    """Basic implementation of connector.
993
994    State can be: "connecting", "connected", "disconnected"
995    """
996
997    implements(IConnector)
998
999    timeoutID = None
1000    factoryStarted = 0
1001
1002    def __init__(self, factory, timeout, reactor):
1003        self.state = "disconnected"
1004        self.reactor = reactor
1005        self.factory = factory
1006        self.timeout = timeout
1007
1008    def disconnect(self):
1009        """Disconnect whatever our state is."""
1010        if self.state == 'connecting':
1011            self.stopConnecting()
1012        elif self.state == 'connected':
1013            self.transport.loseConnection()
1014
1015    def connect(self):
1016        """Start connection to remote server."""
1017        if self.state != "disconnected":
1018            raise RuntimeError, "can't connect in this state"
1019
1020        self.state = "connecting"
1021        if not self.factoryStarted:
1022            self.factory.doStart()
1023            self.factoryStarted = 1
1024        self.transport = transport = self._makeTransport()
1025        if self.timeout is not None:
1026            self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
1027        self.factory.startedConnecting(self)
1028
1029    def stopConnecting(self):
1030        """Stop attempting to connect."""
1031        if self.state != "connecting":
1032            raise error.NotConnectingError, "we're not trying to connect"
1033
1034        self.state = "disconnected"
1035        self.transport.failIfNotConnected(error.UserError())
1036        del self.transport
1037
1038    def cancelTimeout(self):
1039        if self.timeoutID is not None:
1040            try:
1041                self.timeoutID.cancel()
1042            except ValueError:
1043                pass
1044            del self.timeoutID
1045
1046    def buildProtocol(self, addr):
1047        self.state = "connected"
1048        self.cancelTimeout()
1049        return self.factory.buildProtocol(addr)
1050
1051    def connectionFailed(self, reason):
1052        self.cancelTimeout()
1053        self.transport = None
1054        self.state = "disconnected"
1055        self.factory.clientConnectionFailed(self, reason)
1056        if self.state == "disconnected":
1057            # factory hasn't called our connect() method
1058            self.factory.doStop()
1059            self.factoryStarted = 0
1060
1061    def connectionLost(self, reason):
1062        self.state = "disconnected"
1063        self.factory.clientConnectionLost(self, reason)
1064        if self.state == "disconnected":
1065            # factory hasn't called our connect() method
1066            self.factory.doStop()
1067            self.factoryStarted = 0
1068
1069    def getDestination(self):
1070        raise NotImplementedError(
1071            reflect.qual(self.__class__) + " did not implement "
1072            "getDestination")
1073
1074
1075
1076class BasePort(abstract.FileDescriptor):
1077    """Basic implementation of a ListeningPort.
1078
1079    Note: This does not actually implement IListeningPort.
1080    """
1081
1082    addressFamily = None
1083    socketType = None
1084
1085    def createInternetSocket(self):
1086        s = socket.socket(self.addressFamily, self.socketType)
1087        s.setblocking(0)
1088        fdesc._setCloseOnExec(s.fileno())
1089        return s
1090
1091
1092    def doWrite(self):
1093        """Raises a RuntimeError"""
1094        raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
1095
1096
1097
1098class _SignalReactorMixin(object):
1099    """
1100    Private mixin to manage signals: it installs signal handlers at start time,
1101    and define run method.
1102
1103    It can only be used mixed in with L{ReactorBase}, and has to be defined
1104    first in the inheritance (so that method resolution order finds
1105    startRunning first).
1106
1107    @type _installSignalHandlers: C{bool}
1108    @ivar _installSignalHandlers: A flag which indicates whether any signal
1109        handlers will be installed during startup.  This includes handlers for
1110        SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
1111        to stop the reactor.
1112    """
1113
1114    _installSignalHandlers = False
1115
1116    def _handleSignals(self):
1117        """
1118        Install the signal handlers for the Twisted event loop.
1119        """
1120        try:
1121            import signal
1122        except ImportError:
1123            log.msg("Warning: signal module unavailable -- "
1124                    "not installing signal handlers.")
1125            return
1126
1127        if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1128            # only handle if there isn't already a handler, e.g. for Pdb.
1129            signal.signal(signal.SIGINT, self.sigInt)
1130        signal.signal(signal.SIGTERM, self.sigTerm)
1131
1132        # Catch Ctrl-Break in windows
1133        if hasattr(signal, "SIGBREAK"):
1134            signal.signal(signal.SIGBREAK, self.sigBreak)
1135
1136
1137    def startRunning(self, installSignalHandlers=True):
1138        """
1139        Extend the base implementation in order to remember whether signal
1140        handlers should be installed later.
1141
1142        @type installSignalHandlers: C{bool}
1143        @param installSignalHandlers: A flag which, if set, indicates that
1144            handlers for a number of (implementation-defined) signals should be
1145            installed during startup.
1146        """
1147        self._installSignalHandlers = installSignalHandlers
1148        ReactorBase.startRunning(self)
1149
1150
1151    def _reallyStartRunning(self):
1152        """
1153        Extend the base implementation by also installing signal handlers, if
1154        C{self._installSignalHandlers} is true.
1155        """
1156        ReactorBase._reallyStartRunning(self)
1157        if self._installSignalHandlers:
1158            # Make sure this happens before after-startup events, since the
1159            # expectation of after-startup is that the reactor is fully
1160            # initialized.  Don't do it right away for historical reasons
1161            # (perhaps some before-startup triggers don't want there to be a
1162            # custom SIGCHLD handler so that they can run child processes with
1163            # some blocking api).
1164            self._handleSignals()
1165
1166
1167    def run(self, installSignalHandlers=True):
1168        self.startRunning(installSignalHandlers=installSignalHandlers)
1169        self.mainLoop()
1170
1171
1172    def mainLoop(self):
1173        while self._started:
1174            try:
1175                while self._started:
1176                    # Advance simulation time in delayed event
1177                    # processors.
1178                    self.runUntilCurrent()
1179                    t2 = self.timeout()
1180                    t = self.running and t2
1181                    self.doIteration(t)
1182            except:
1183                log.msg("Unexpected error in main loop.")
1184                log.err()
1185            else:
1186                log.msg('Main loop terminated.')
1187
1188
1189
1190__all__ = []
Note: See TracBrowser for help on using the browser.