root/trunk/twisted/internet/task.py

Revision 33440, 24.1 KB (checked in by itamarst, 4 months ago)

Merge loopingcall-2725.

Author: jerith
Review: exarkun, itamar
Fixes: #2725

Document the running attribute of LoopingCall.

Line 
1# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Scheduling utility methods and classes.
7
8@author: Jp Calderone
9"""
10
11__metaclass__ = type
12
13import time
14
15from zope.interface import implements
16
17from twisted.python import reflect
18from twisted.python.failure import Failure
19
20from twisted.internet import base, defer
21from twisted.internet.interfaces import IReactorTime
22
23
24class LoopingCall:
25    """Call a function repeatedly.
26
27    If C{f} returns a deferred, rescheduling will not take place until the
28    deferred has fired. The result value is ignored.
29
30    @ivar f: The function to call.
31    @ivar a: A tuple of arguments to pass the function.
32    @ivar kw: A dictionary of keyword arguments to pass to the function.
33    @ivar clock: A provider of
34        L{twisted.internet.interfaces.IReactorTime}.  The default is
35        L{twisted.internet.reactor}. Feel free to set this to
36        something else, but it probably ought to be set *before*
37        calling L{start}.
38
39    @type running: C{bool}
40    @ivar running: A flag which is C{True} while C{f} is scheduled to be called
41        (or is currently being called). It is set to C{True} when L{start} is
42        called and set to C{False} when L{stop} is called or if C{f} raises an
43        exception. In either case, it will be C{False} by the time the
44        C{Deferred} returned by L{start} fires its callback or errback.
45
46    @type _expectNextCallAt: C{float}
47    @ivar _expectNextCallAt: The time at which this instance most recently
48        scheduled itself to run.
49
50    @type _realLastTime: C{float}
51    @ivar _realLastTime: When counting skips, the time at which the skip
52        counter was last invoked.
53
54    @type _runAtStart: C{bool}
55    @ivar _runAtStart: A flag indicating whether the 'now' argument was passed
56        to L{LoopingCall.start}.
57    """
58
59    call = None
60    running = False
61    deferred = None
62    interval = None
63    _expectNextCallAt = 0.0
64    _runAtStart = False
65    starttime = None
66
67    def __init__(self, f, *a, **kw):
68        self.f = f
69        self.a = a
70        self.kw = kw
71        from twisted.internet import reactor
72        self.clock = reactor
73
74
75    def withCount(cls, countCallable):
76        """
77        An alternate constructor for L{LoopingCall} that makes available the
78        number of calls which should have occurred since it was last invoked.
79
80        Note that this number is an C{int} value; It represents the discrete
81        number of calls that should have been made.  For example, if you are
82        using a looping call to display an animation with discrete frames, this
83        number would be the number of frames to advance.
84
85        The count is normally 1, but can be higher. For example, if the reactor
86        is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
87        returned from a previous call is not fired before an interval has
88        elapsed, or if the callable itself blocks for longer than an interval,
89        preventing I{itself} from being called.
90
91        @param countCallable: A callable that will be invoked each time the
92            resulting LoopingCall is run, with an integer specifying the number
93            of calls that should have been invoked.
94
95        @type countCallable: 1-argument callable which takes an C{int}
96
97        @return: An instance of L{LoopingCall} with call counting enabled,
98            which provides the count as the first positional argument.
99
100        @rtype: L{LoopingCall}
101
102        @since: 9.0
103        """
104
105        def counter():
106            now = self.clock.seconds()
107            lastTime = self._realLastTime
108            if lastTime is None:
109                lastTime = self.starttime
110                if self._runAtStart:
111                    lastTime -= self.interval
112            self._realLastTime = now
113            lastInterval = self._intervalOf(lastTime)
114            thisInterval = self._intervalOf(now)
115            count = thisInterval - lastInterval
116            return countCallable(count)
117
118        self = cls(counter)
119
120        self._realLastTime = None
121
122        return self
123
124    withCount = classmethod(withCount)
125
126
127    def _intervalOf(self, t):
128        """
129        Determine the number of intervals passed as of the given point in
130        time.
131
132        @param t: The specified time (from the start of the L{LoopingCall}) to
133            be measured in intervals
134
135        @return: The C{int} number of intervals which have passed as of the
136            given point in time.
137        """
138        elapsedTime = t - self.starttime
139        intervalNum = int(elapsedTime / self.interval)
140        return intervalNum
141
142
143    def start(self, interval, now=True):
144        """
145        Start running function every interval seconds.
146
147        @param interval: The number of seconds between calls.  May be
148        less than one.  Precision will depend on the underlying
149        platform, the available hardware, and the load on the system.
150
151        @param now: If True, run this call right now.  Otherwise, wait
152        until the interval has elapsed before beginning.
153
154        @return: A Deferred whose callback will be invoked with
155        C{self} when C{self.stop} is called, or whose errback will be
156        invoked when the function raises an exception or returned a
157        deferred that has its errback invoked.
158        """
159        assert not self.running, ("Tried to start an already running "
160                                  "LoopingCall.")
161        if interval < 0:
162            raise ValueError, "interval must be >= 0"
163        self.running = True
164        d = self.deferred = defer.Deferred()
165        self.starttime = self.clock.seconds()
166        self._expectNextCallAt = self.starttime
167        self.interval = interval
168        self._runAtStart = now
169        if now:
170            self()
171        else:
172            self._reschedule()
173        return d
174
175    def stop(self):
176        """Stop running function.
177        """
178        assert self.running, ("Tried to stop a LoopingCall that was "
179                              "not running.")
180        self.running = False
181        if self.call is not None:
182            self.call.cancel()
183            self.call = None
184            d, self.deferred = self.deferred, None
185            d.callback(self)
186
187    def reset(self):
188        """
189        Skip the next iteration and reset the timer.
190
191        @since: 11.1
192        """
193        assert self.running, ("Tried to reset a LoopingCall that was "
194                              "not running.")
195        if self.call is not None:
196            self.call.cancel()
197            self.call = None
198            self._expectNextCallAt = self.clock.seconds()
199            self._reschedule()
200
201    def __call__(self):
202        def cb(result):
203            if self.running:
204                self._reschedule()
205            else:
206                d, self.deferred = self.deferred, None
207                d.callback(self)
208
209        def eb(failure):
210            self.running = False
211            d, self.deferred = self.deferred, None
212            d.errback(failure)
213
214        self.call = None
215        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
216        d.addCallback(cb)
217        d.addErrback(eb)
218
219
220    def _reschedule(self):
221        """
222        Schedule the next iteration of this looping call.
223        """
224        if self.interval == 0:
225            self.call = self.clock.callLater(0, self)
226            return
227
228        currentTime = self.clock.seconds()
229        # Find how long is left until the interval comes around again.
230        untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
231        # Make sure it is in the future, in case more than one interval worth
232        # of time passed since the previous call was made.
233        nextTime = max(
234            self._expectNextCallAt + self.interval, currentTime + untilNextTime)
235        # If the interval falls on the current time exactly, skip it and
236        # schedule the call for the next interval.
237        if nextTime == currentTime:
238            nextTime += self.interval
239        self._expectNextCallAt = nextTime
240        self.call = self.clock.callLater(nextTime - currentTime, self)
241
242
243    def __repr__(self):
244        if hasattr(self.f, 'func_name'):
245            func = self.f.func_name
246            if hasattr(self.f, 'im_class'):
247                func = self.f.im_class.__name__ + '.' + func
248        else:
249            func = reflect.safe_repr(self.f)
250
251        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
252            self.interval, func, reflect.safe_repr(self.a),
253            reflect.safe_repr(self.kw))
254
255
256
257class SchedulerError(Exception):
258    """
259    The operation could not be completed because the scheduler or one of its
260    tasks was in an invalid state.  This exception should not be raised
261    directly, but is a superclass of various scheduler-state-related
262    exceptions.
263    """
264
265
266
267class SchedulerStopped(SchedulerError):
268    """
269    The operation could not complete because the scheduler was stopped in
270    progress or was already stopped.
271    """
272
273
274
275class TaskFinished(SchedulerError):
276    """
277    The operation could not complete because the task was already completed,
278    stopped, encountered an error or otherwise permanently stopped running.
279    """
280
281
282
283class TaskDone(TaskFinished):
284    """
285    The operation could not complete because the task was already completed.
286    """
287
288
289
290class TaskStopped(TaskFinished):
291    """
292    The operation could not complete because the task was stopped.
293    """
294
295
296
297class TaskFailed(TaskFinished):
298    """
299    The operation could not complete because the task died with an unhandled
300    error.
301    """
302
303
304
305class NotPaused(SchedulerError):
306    """
307    This exception is raised when a task is resumed which was not previously
308    paused.
309    """
310
311
312
313class _Timer(object):
314    MAX_SLICE = 0.01
315    def __init__(self):
316        self.end = time.time() + self.MAX_SLICE
317
318
319    def __call__(self):
320        return time.time() >= self.end
321
322
323
324_EPSILON = 0.00000001
325def _defaultScheduler(x):
326    from twisted.internet import reactor
327    return reactor.callLater(_EPSILON, x)
328
329
330class CooperativeTask(object):
331    """
332    A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be
333    paused, resumed, and stopped.  It can also have its completion (or
334    termination) monitored.
335
336    @see: L{CooperativeTask.cooperate}
337
338    @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is
339        asked to do work.
340
341    @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask}
342        participates in, which is used to re-insert it upon resume.
343
344    @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task
345        completes, fails, or finishes.
346
347    @type _deferreds: L{list}
348
349    @type _cooperator: L{Cooperator}
350
351    @ivar _pauseCount: the number of times that this L{CooperativeTask} has
352        been paused; if 0, it is running.
353
354    @type _pauseCount: L{int}
355
356    @ivar _completionState: The completion-state of this L{CooperativeTask}.
357        C{None} if the task is not yet completed, an instance of L{TaskStopped}
358        if C{stop} was called to stop this task early, of L{TaskFailed} if the
359        application code in the iterator raised an exception which caused it to
360        terminate, and of L{TaskDone} if it terminated normally via raising
361        L{StopIteration}.
362
363    @type _completionState: L{TaskFinished}
364    """
365
366    def __init__(self, iterator, cooperator):
367        """
368        A private constructor: to create a new L{CooperativeTask}, see
369        L{Cooperator.cooperate}.
370        """
371        self._iterator = iterator
372        self._cooperator = cooperator
373        self._deferreds = []
374        self._pauseCount = 0
375        self._completionState = None
376        self._completionResult = None
377        cooperator._addTask(self)
378
379
380    def whenDone(self):
381        """
382        Get a L{defer.Deferred} notification of when this task is complete.
383
384        @return: a L{defer.Deferred} that fires with the C{iterator} that this
385            L{CooperativeTask} was created with when the iterator has been
386            exhausted (i.e. its C{next} method has raised L{StopIteration}), or
387            fails with the exception raised by C{next} if it raises some other
388            exception.
389
390        @rtype: L{defer.Deferred}
391        """
392        d = defer.Deferred()
393        if self._completionState is None:
394            self._deferreds.append(d)
395        else:
396            d.callback(self._completionResult)
397        return d
398
399
400    def pause(self):
401        """
402        Pause this L{CooperativeTask}.  Stop doing work until
403        L{CooperativeTask.resume} is called.  If C{pause} is called more than
404        once, C{resume} must be called an equal number of times to resume this
405        task.
406
407        @raise TaskFinished: if this task has already finished or completed.
408        """
409        self._checkFinish()
410        self._pauseCount += 1
411        if self._pauseCount == 1:
412            self._cooperator._removeTask(self)
413
414
415    def resume(self):
416        """
417        Resume processing of a paused L{CooperativeTask}.
418
419        @raise NotPaused: if this L{CooperativeTask} is not paused.
420        """
421        if self._pauseCount == 0:
422            raise NotPaused()
423        self._pauseCount -= 1
424        if self._pauseCount == 0 and self._completionState is None:
425            self._cooperator._addTask(self)
426
427
428    def _completeWith(self, completionState, deferredResult):
429        """
430        @param completionState: a L{TaskFinished} exception or a subclass
431            thereof, indicating what exception should be raised when subsequent
432            operations are performed.
433
434        @param deferredResult: the result to fire all the deferreds with.
435        """
436        self._completionState = completionState
437        self._completionResult = deferredResult
438        if not self._pauseCount:
439            self._cooperator._removeTask(self)
440
441        # The Deferreds need to be invoked after all this is completed, because
442        # a Deferred may want to manipulate other tasks in a Cooperator.  For
443        # example, if you call "stop()" on a cooperator in a callback on a
444        # Deferred returned from whenDone(), this CooperativeTask must be gone
445        # from the Cooperator by that point so that _completeWith is not
446        # invoked reentrantly; that would cause these Deferreds to blow up with
447        # an AlreadyCalledError, or the _removeTask to fail with a ValueError.
448        for d in self._deferreds:
449            d.callback(deferredResult)
450
451
452    def stop(self):
453        """
454        Stop further processing of this task.
455
456        @raise TaskFinished: if this L{CooperativeTask} has previously
457            completed, via C{stop}, completion, or failure.
458        """
459        self._checkFinish()
460        self._completeWith(TaskStopped(), Failure(TaskStopped()))
461
462
463    def _checkFinish(self):
464        """
465        If this task has been stopped, raise the appropriate subclass of
466        L{TaskFinished}.
467        """
468        if self._completionState is not None:
469            raise self._completionState
470
471
472    def _oneWorkUnit(self):
473        """
474        Perform one unit of work for this task, retrieving one item from its
475        iterator, stopping if there are no further items in the iterator, and
476        pausing if the result was a L{defer.Deferred}.
477        """
478        try:
479            result = self._iterator.next()
480        except StopIteration:
481            self._completeWith(TaskDone(), self._iterator)
482        except:
483            self._completeWith(TaskFailed(), Failure())
484        else:
485            if isinstance(result, defer.Deferred):
486                self.pause()
487                def failLater(f):
488                    self._completeWith(TaskFailed(), f)
489                result.addCallbacks(lambda result: self.resume(),
490                                    failLater)
491
492
493
494class Cooperator(object):
495    """
496    Cooperative task scheduler.
497    """
498
499    def __init__(self,
500                 terminationPredicateFactory=_Timer,
501                 scheduler=_defaultScheduler,
502                 started=True):
503        """
504        Create a scheduler-like object to which iterators may be added.
505
506        @param terminationPredicateFactory: A no-argument callable which will
507        be invoked at the beginning of each step and should return a
508        no-argument callable which will return True when the step should be
509        terminated.  The default factory is time-based and allows iterators to
510        run for 1/100th of a second at a time.
511
512        @param scheduler: A one-argument callable which takes a no-argument
513        callable and should invoke it at some future point.  This will be used
514        to schedule each step of this Cooperator.
515
516        @param started: A boolean which indicates whether iterators should be
517        stepped as soon as they are added, or if they will be queued up until
518        L{Cooperator.start} is called.
519        """
520        self._tasks = []
521        self._metarator = iter(())
522        self._terminationPredicateFactory = terminationPredicateFactory
523        self._scheduler = scheduler
524        self._delayedCall = None
525        self._stopped = False
526        self._started = started
527
528
529    def coiterate(self, iterator, doneDeferred=None):
530        """
531        Add an iterator to the list of iterators this L{Cooperator} is
532        currently running.
533
534        @param doneDeferred: If specified, this will be the Deferred used as
535            the completion deferred.  It is suggested that you use the default,
536            which creates a new Deferred for you.
537
538        @return: a Deferred that will fire when the iterator finishes.
539        """
540        if doneDeferred is None:
541            doneDeferred = defer.Deferred()
542        CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
543        return doneDeferred
544
545
546    def cooperate(self, iterator):
547        """
548        Start running the given iterator as a long-running cooperative task, by
549        calling next() on it as a periodic timed event.
550
551        @param iterator: the iterator to invoke.
552
553        @return: a L{CooperativeTask} object representing this task.
554        """
555        return CooperativeTask(iterator, self)
556
557
558    def _addTask(self, task):
559        """
560        Add a L{CooperativeTask} object to this L{Cooperator}.
561        """
562        if self._stopped:
563            self._tasks.append(task) # XXX silly, I know, but _completeWith
564                                     # does the inverse
565            task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
566        else:
567            self._tasks.append(task)
568            self._reschedule()
569
570
571    def _removeTask(self, task):
572        """
573        Remove a L{CooperativeTask} from this L{Cooperator}.
574        """
575        self._tasks.remove(task)
576        # If no work left to do, cancel the delayed call:
577        if not self._tasks and self._delayedCall:
578            self._delayedCall.cancel()
579            self._delayedCall = None
580
581
582    def _tasksWhileNotStopped(self):
583        """
584        Yield all L{CooperativeTask} objects in a loop as long as this
585        L{Cooperator}'s termination condition has not been met.
586        """
587        terminator = self._terminationPredicateFactory()
588        while self._tasks:
589            for t in self._metarator:
590                yield t
591                if terminator():
592                    return
593            self._metarator = iter(self._tasks)
594
595
596    def _tick(self):
597        """
598        Run one scheduler tick.
599        """
600        self._delayedCall = None
601        for taskObj in self._tasksWhileNotStopped():
602            taskObj._oneWorkUnit()
603        self._reschedule()
604
605
606    _mustScheduleOnStart = False
607    def _reschedule(self):
608        if not self._started:
609            self._mustScheduleOnStart = True
610            return
611        if self._delayedCall is None and self._tasks:
612            self._delayedCall = self._scheduler(self._tick)
613
614
615    def start(self):
616        """
617        Begin scheduling steps.
618        """
619        self._stopped = False
620        self._started = True
621        if self._mustScheduleOnStart:
622            del self._mustScheduleOnStart
623            self._reschedule()
624
625
626    def stop(self):
627        """
628        Stop scheduling steps.  Errback the completion Deferreds of all
629        iterators which have been added and forget about them.
630        """
631        self._stopped = True
632        for taskObj in self._tasks:
633            taskObj._completeWith(SchedulerStopped(),
634                                  Failure(SchedulerStopped()))
635        self._tasks = []
636        if self._delayedCall is not None:
637            self._delayedCall.cancel()
638            self._delayedCall = None
639
640
641
642_theCooperator = Cooperator()
643
644def coiterate(iterator):
645    """
646    Cooperatively iterate over the given iterator, dividing runtime between it
647    and all other iterators which have been passed to this function and not yet
648    exhausted.
649    """
650    return _theCooperator.coiterate(iterator)
651
652
653
654def cooperate(iterator):
655    """
656    Start running the given iterator as a long-running cooperative task, by
657    calling next() on it as a periodic timed event.
658
659    @param iterator: the iterator to invoke.
660
661    @return: a L{CooperativeTask} object representing this task.
662    """
663    return _theCooperator.cooperate(iterator)
664
665
666
667class Clock:
668    """
669    Provide a deterministic, easily-controlled implementation of
670    L{IReactorTime.callLater}.  This is commonly useful for writing
671    deterministic unit tests for code which schedules events using this API.
672    """
673    implements(IReactorTime)
674
675    rightNow = 0.0
676
677    def __init__(self):
678        self.calls = []
679
680
681    def seconds(self):
682        """
683        Pretend to be time.time().  This is used internally when an operation
684        such as L{IDelayedCall.reset} needs to determine a a time value
685        relative to the current time.
686
687        @rtype: C{float}
688        @return: The time which should be considered the current time.
689        """
690        return self.rightNow
691
692
693    def _sortCalls(self):
694        """
695        Sort the pending calls according to the time they are scheduled.
696        """
697        self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
698
699
700    def callLater(self, when, what, *a, **kw):
701        """
702        See L{twisted.internet.interfaces.IReactorTime.callLater}.
703        """
704        dc = base.DelayedCall(self.seconds() + when,
705                               what, a, kw,
706                               self.calls.remove,
707                               lambda c: None,
708                               self.seconds)
709        self.calls.append(dc)
710        self._sortCalls()
711        return dc
712
713
714    def getDelayedCalls(self):
715        """
716        See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
717        """
718        return self.calls
719
720
721    def advance(self, amount):
722        """
723        Move time on this clock forward by the given amount and run whatever
724        pending calls should be run.
725
726        @type amount: C{float}
727        @param amount: The number of seconds which to advance this clock's
728        time.
729        """
730        self.rightNow += amount
731        self._sortCalls()
732        while self.calls and self.calls[0].getTime() <= self.seconds():
733            call = self.calls.pop(0)
734            call.called = 1
735            call.func(*call.args, **call.kw)
736            self._sortCalls()
737
738
739    def pump(self, timings):
740        """
741        Advance incrementally by the given set of times.
742
743        @type timings: iterable of C{float}
744        """
745        for amount in timings:
746            self.advance(amount)
747
748
749
750def deferLater(clock, delay, callable, *args, **kw):
751    """
752    Call the given function after a certain period of time has passed.
753
754    @type clock: L{IReactorTime} provider
755    @param clock: The object which will be used to schedule the delayed
756        call.
757
758    @type delay: C{float} or C{int}
759    @param delay: The number of seconds to wait before calling the function.
760
761    @param callable: The object to call after the delay.
762
763    @param *args: The positional arguments to pass to C{callable}.
764
765    @param **kw: The keyword arguments to pass to C{callable}.
766
767    @rtype: L{defer.Deferred}
768
769    @return: A deferred that fires with the result of the callable when the
770        specified time has elapsed.
771    """
772    def deferLaterCancel(deferred):
773        delayedCall.cancel()
774    d = defer.Deferred(deferLaterCancel)
775    d.addCallback(lambda ignored: callable(*args, **kw))
776    delayedCall = clock.callLater(delay, d.callback, None)
777    return d
778
779
780
781__all__ = [
782    'LoopingCall',
783
784    'Clock',
785
786    'SchedulerStopped', 'Cooperator', 'coiterate',
787
788    'deferLater',
789    ]
Note: See TracBrowser for help on using the browser.