root/trunk/twisted/internet/task.py

Revision 28580, 23.0 KB (checked in by exarkun, 6 months ago)

Merge cancel-deferlater-4318

Author: exarkun
Reviewer: therve
Fixes: #4318

Implement a canceller function for the Deferred returned by deferLater which
cancels the underlying DelayedCall.

Line 
1# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
2# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Scheduling utility methods and classes.
7
8@author: Jp Calderone
9"""
10
11__metaclass__ = type
12
13import time
14
15from zope.interface import implements
16
17from twisted.python import reflect
18from twisted.python.failure import Failure
19
20from twisted.internet import base, defer
21from twisted.internet.interfaces import IReactorTime
22
23
class LoopingCall:
    """
    Call a function repeatedly.

    If C{f} returns a deferred, rescheduling will not take place until the
    deferred has fired. The result value is ignored.

    @ivar f: The function to call.
    @ivar a: A tuple of arguments to pass the function.
    @ivar kw: A dictionary of keyword arguments to pass to the function.
    @ivar clock: A provider of
        L{twisted.internet.interfaces.IReactorTime}.  The default is
        L{twisted.internet.reactor}. Feel free to set this to
        something else, but it probably ought to be set *before*
        calling L{start}.

    @ivar call: The delayed call returned by C{clock.callLater} for the next
        iteration, or C{None} when no iteration is currently scheduled.
    @ivar running: C{True} between a call to L{start} and either a call to
        L{stop} or a failure of C{f}.
    @ivar deferred: The Deferred returned by the most recent L{start}, or
        C{None} once it has been fired.
    @ivar interval: The interval, in seconds, most recently passed to
        L{start}.
    @ivar starttime: The value of C{clock.seconds()} at the most recent
        L{start}.

    @type _expectNextCallAt: C{float}
    @ivar _expectNextCallAt: The time at which this instance most recently
        scheduled itself to run.

    @type _realLastTime: C{float}
    @ivar _realLastTime: When counting skips, the time at which the skip counter
        was last invoked.

    @type _runAtStart: C{bool}
    @ivar _runAtStart: A flag indicating whether the 'now' argument was passed
        to L{LoopingCall.start}.
    """

    call = None
    running = False
    deferred = None
    interval = None
    _expectNextCallAt = 0.0
    _runAtStart = False
    starttime = None

    def __init__(self, f, *a, **kw):
        """
        @param f: The function to call on every iteration.
        @param a: Positional arguments to pass to C{f}.
        @param kw: Keyword arguments to pass to C{f}.
        """
        self.f = f
        self.a = a
        self.kw = kw
        # Imported here rather than at module level so that merely importing
        # this module does not import the global reactor.
        from twisted.internet import reactor
        self.clock = reactor


    def withCount(cls, countCallable):
        """
        An alternate constructor for L{LoopingCall} that makes available the
        number of calls which should have occurred since it was last invoked.

        Note that this number is an C{int} value; It represents the discrete
        number of calls that should have been made.  For example, if you are
        using a looping call to display an animation with discrete frames, this
        number would be the number of frames to advance.

        The count is normally 1, but can be higher. For example, if the reactor
        is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
        returned from a previous call is not fired before an interval has
        elapsed, or if the callable itself blocks for longer than an interval,
        preventing I{itself} from being called.

        When the interval is 0 there are no intervals to count, so the count
        passed to C{countCallable} is always 1.

        @param countCallable: A callable that will be invoked each time the
            resulting LoopingCall is run, with an integer specifying the number
            of calls that should have been invoked.

        @type countCallable: 1-argument callable which takes an C{int}

        @return: An instance of L{LoopingCall} with call counting enabled,
            which provides the count as the first positional argument.

        @rtype: L{LoopingCall}

        @since: 9.0
        """

        def counter():
            now = self.clock.seconds()

            if self.interval == 0:
                # _intervalOf divides by the interval; with a zero interval
                # that would raise ZeroDivisionError, so report one call.
                self._realLastTime = now
                return countCallable(1)

            lastTime = self._realLastTime
            if lastTime is None:
                # First invocation since start(): measure from the start
                # time, shifted back one interval if start() ran the call
                # immediately, so that the first count comes out as 1.
                lastTime = self.starttime
                if self._runAtStart:
                    lastTime -= self.interval
            self._realLastTime = now
            lastInterval = self._intervalOf(lastTime)
            thisInterval = self._intervalOf(now)
            count = thisInterval - lastInterval
            return countCallable(count)

        # 'self' is bound after counter is defined; counter only looks it up
        # when it is eventually called.
        self = cls(counter)

        self._realLastTime = None

        return self

    withCount = classmethod(withCount)


    def _intervalOf(self, t):
        """
        Determine the number of intervals passed as of the given point in
        time.

        @param t: The specified time (from the start of the L{LoopingCall}) to
            be measured in intervals

        @return: The C{int} number of intervals which have passed as of the
            given point in time.
        """
        elapsedTime = t - self.starttime
        intervalNum = int(elapsedTime / self.interval)
        return intervalNum


    def start(self, interval, now=True):
        """
        Start running function every interval seconds.

        @param interval: The number of seconds between calls.  May be
        less than one.  Precision will depend on the underlying
        platform, the available hardware, and the load on the system.

        @param now: If True, run this call right now.  Otherwise, wait
        until the interval has elapsed before beginning.

        @return: A Deferred whose callback will be invoked with
        C{self} when C{self.stop} is called, or whose errback will be
        invoked when the function raises an exception or returned a
        deferred that has its errback invoked.

        @raise ValueError: if C{interval} is negative.
        """
        assert not self.running, ("Tried to start an already running "
                                  "LoopingCall.")
        if interval < 0:
            raise ValueError("interval must be >= 0")
        self.running = True
        d = self.deferred = defer.Deferred()
        self.starttime = self.clock.seconds()
        self._expectNextCallAt = self.starttime
        self.interval = interval
        self._runAtStart = now
        if now:
            self()
        else:
            self._reschedule()
        return d

    def stop(self):
        """
        Stop running function.

        If an iteration is scheduled, it is cancelled and the Deferred
        returned by L{start} is fired with C{self} immediately.  If no
        iteration is scheduled (C{self.call} is C{None}), the Deferred is
        left for the completion callback in L{__call__} to fire.
        """
        assert self.running, ("Tried to stop a LoopingCall that was "
                              "not running.")
        self.running = False
        if self.call is not None:
            self.call.cancel()
            self.call = None
            d, self.deferred = self.deferred, None
            d.callback(self)

    def __call__(self):
        """
        Run one iteration: call C{f} and, once its result is available,
        either schedule the next iteration or fire the Deferred returned by
        L{start}.
        """
        def cb(result):
            # The result of f is ignored; only completion matters.
            if self.running:
                self._reschedule()
            else:
                # stop() was called while f's result was outstanding.
                d, self.deferred = self.deferred, None
                d.callback(self)

        def eb(failure):
            # A failure of f ends the loop and is reported through the
            # Deferred returned by start().
            self.running = False
            d, self.deferred = self.deferred, None
            d.errback(failure)

        self.call = None
        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
        d.addCallback(cb)
        d.addErrback(eb)


    def _reschedule(self):
        """
        Schedule the next iteration of this looping call.
        """
        if self.interval == 0:
            self.call = self.clock.callLater(0, self)
            return

        currentTime = self.clock.seconds()
        # Find how long is left until the interval comes around again.
        untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
        # Make sure it is in the future, in case more than one interval worth
        # of time passed since the previous call was made.
        nextTime = max(
            self._expectNextCallAt + self.interval, currentTime + untilNextTime)
        # If the interval falls on the current time exactly, skip it and
        # schedule the call for the next interval.
        if nextTime == currentTime:
            nextTime += self.interval
        self._expectNextCallAt = nextTime
        self.call = self.clock.callLater(nextTime - currentTime, self)


    def __repr__(self):
        """
        Describe this call by its interval, its function and the arguments
        which will be passed to the function.
        """
        if hasattr(self.f, 'func_name'):
            func = self.f.func_name
            if hasattr(self.f, 'im_class'):
                func = self.f.im_class.__name__ + '.' + func
        else:
            func = reflect.safe_repr(self.f)

        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
            self.interval, func, reflect.safe_repr(self.a),
            reflect.safe_repr(self.kw))
233
234
235
class SchedulerError(Exception):
    """
    Base class for the scheduler-state-related exceptions in this module:
    the scheduler, or one of its tasks, was in a state which did not allow
    the requested operation to be completed.

    Do not raise this class directly; raise one of its subclasses instead.
    """
243
244
245
class SchedulerStopped(SchedulerError):
    """
    The scheduler was already stopped, or was stopped while the operation
    was in progress, so the operation could not complete.
    """
251
252
253
class TaskFinished(SchedulerError):
    """
    The task is permanently no longer running (it completed, was stopped, or
    encountered an error), so the operation could not complete.
    """
259
260
261
class TaskDone(TaskFinished):
    """
    The task had already run to completion, so the operation could not
    complete.
    """
266
267
268
class TaskStopped(TaskFinished):
    """
    The task had been stopped, so the operation could not complete.
    """
273
274
275
class TaskFailed(TaskFinished):
    """
    The task died with an unhandled error, so the operation could not
    complete.
    """
281
282
283
class NotPaused(SchedulerError):
    """
    Raised when an attempt is made to resume a task which had not previously
    been paused.
    """
289
290
291
class _Timer(object):
    """
    A no-argument predicate which becomes true once C{MAX_SLICE} seconds of
    wall-clock time have passed since it was created.

    @cvar MAX_SLICE: The number of seconds after creation at which the
        predicate starts returning C{True}.

    @ivar end: The C{time.time()} value at or after which the predicate is
        true.
    """
    MAX_SLICE = 0.01

    def __init__(self):
        created = time.time()
        self.end = created + self.MAX_SLICE


    def __call__(self):
        """
        @return: C{True} if the current time has reached C{self.end},
            C{False} otherwise.
        """
        expired = time.time() >= self.end
        return expired
300
301
302
# The delay, in seconds, used when asking the reactor to run a scheduled step.
_EPSILON = 1e-8

def _defaultScheduler(x):
    """
    Schedule C{x} to be called by the global reactor after C{_EPSILON}
    seconds.

    @param x: The callable to schedule; it is passed no arguments.

    @return: Whatever the reactor's C{callLater} returns for the scheduled
        call.
    """
    from twisted.internet import reactor
    scheduledCall = reactor.callLater(_EPSILON, x)
    return scheduledCall
307
308
class CooperativeTask(object):
    """
    A task object living inside a L{Cooperator}.  A L{CooperativeTask} can be
    paused, resumed and stopped, and its completion (or termination) can be
    monitored.

    @see: L{CooperativeTask.cooperate}

    @ivar _iterator: the iterator which is advanced whenever this
        L{CooperativeTask} is asked to do work.

    @ivar _cooperator: the L{Cooperator} in which this L{CooperativeTask}
        participates; the task re-inserts itself there upon resume.

    @type _cooperator: L{Cooperator}

    @ivar _deferreds: the L{defer.Deferred}s to fire when this task
        completes, fails, or finishes.

    @type _deferreds: L{list}

    @ivar _pauseCount: how many times this L{CooperativeTask} has been
        paused; 0 means it is running.

    @type _pauseCount: L{int}

    @ivar _completionState: The completion-state of this L{CooperativeTask}.
        C{None} while the task is not yet completed; otherwise an instance of
        L{TaskStopped} if C{stop} was called to stop this task early, of
        L{TaskFailed} if the application code in the iterator raised an
        exception which caused it to terminate, and of L{TaskDone} if it
        terminated normally via raising L{StopIteration}.

    @type _completionState: L{TaskFinished}

    @ivar _completionResult: the result with which the completion Deferreds
        were (or will be) fired; C{None} until the task completes.
    """

    def __init__(self, iterator, cooperator):
        """
        A private constructor: to create a new L{CooperativeTask}, see
        L{Cooperator.cooperate}.
        """
        self._iterator = iterator
        self._cooperator = cooperator
        self._deferreds = []
        self._pauseCount = 0
        self._completionState = None
        self._completionResult = None
        cooperator._addTask(self)


    def whenDone(self):
        """
        Get a L{defer.Deferred} notification of when this task is complete.

        @return: a L{defer.Deferred} that fires with the C{iterator} that this
            L{CooperativeTask} was created with when the iterator has been
            exhausted (i.e. its C{next} method has raised L{StopIteration}), or
            fails with the exception raised by C{next} if it raises some other
            exception.

        @rtype: L{defer.Deferred}
        """
        notification = defer.Deferred()
        if self._completionState is not None:
            # Already finished: deliver the recorded result right away.
            notification.callback(self._completionResult)
            return notification
        self._deferreds.append(notification)
        return notification


    def pause(self):
        """
        Pause this L{CooperativeTask}.  No work is done until
        L{CooperativeTask.resume} is called.  Each call to C{pause} must be
        matched by a call to C{resume} before the task runs again.

        @raise TaskFinished: if this task has already finished or completed.
        """
        self._checkFinish()
        wasRunning = (self._pauseCount == 0)
        self._pauseCount += 1
        if wasRunning:
            # Only the first pause takes the task out of the cooperator.
            self._cooperator._removeTask(self)


    def resume(self):
        """
        Resume processing of a paused L{CooperativeTask}.

        @raise NotPaused: if this L{CooperativeTask} is not paused.
        """
        if not self._pauseCount:
            raise NotPaused()
        self._pauseCount -= 1
        if self._pauseCount:
            # Still paused by some other outstanding pause() call.
            return
        if self._completionState is not None:
            # The task finished while paused; there is nothing to re-insert.
            return
        self._cooperator._addTask(self)


    def _completeWith(self, completionState, deferredResult):
        """
        Record that this task has finished and notify everyone waiting on it.

        @param completionState: a L{TaskFinished} exception or a subclass
            thereof, indicating what exception should be raised when subsequent
            operations are performed.

        @param deferredResult: the result to fire all the deferreds with.
        """
        self._completionState = completionState
        self._completionResult = deferredResult
        # A paused task has already been removed from the cooperator.
        if self._pauseCount == 0:
            self._cooperator._removeTask(self)

        # Fire the Deferreds only after the bookkeeping above.  A callback may
        # manipulate other tasks in the Cooperator (for instance by calling
        # "stop()" on it), and this CooperativeTask must already be gone from
        # the Cooperator by then so that _completeWith is not invoked
        # reentrantly; that would make these Deferreds blow up with an
        # AlreadyCalledError, or make _removeTask fail with a ValueError.
        for waiting in self._deferreds:
            waiting.callback(deferredResult)


    def stop(self):
        """
        Stop further processing of this task.

        @raise TaskFinished: if this L{CooperativeTask} has previously
            completed, via C{stop}, completion, or failure.
        """
        self._checkFinish()
        state = TaskStopped()
        result = Failure(TaskStopped())
        self._completeWith(state, result)


    def _checkFinish(self):
        """
        If this task has been stopped, raise the appropriate subclass of
        L{TaskFinished}.
        """
        state = self._completionState
        if state is None:
            return
        raise state


    def _oneWorkUnit(self):
        """
        Perform one unit of work for this task: take one item from the
        iterator, complete the task if the iterator is exhausted or raises,
        and pause the task if the item is a L{defer.Deferred}.
        """
        try:
            item = self._iterator.next()
        except StopIteration:
            self._completeWith(TaskDone(), self._iterator)
            return
        except:
            # Anything else raised by the iterator fails the task; Failure()
            # captures the exception currently being handled.
            self._completeWith(TaskFailed(), Failure())
            return

        if not isinstance(item, defer.Deferred):
            return

        # Wait for the Deferred: stay paused until it fires, then resume on
        # success or fail the task on error.
        self.pause()

        def resumeLater(result):
            return self.resume()

        def failLater(f):
            self._completeWith(TaskFailed(), f)

        item.addCallbacks(resumeLater, failLater)
470
471
472
class Cooperator(object):
    """
    Cooperative task scheduler.

    @ivar _tasks: the L{CooperativeTask}s currently eligible to do work.
    @ivar _metarator: the iterator over C{_tasks} which is consumed across
        ticks, so that a tick picks up where the previous one left off.
    @ivar _delayedCall: whatever the scheduler returned for the pending
        tick, or C{None} if no tick is pending.
    @ivar _stopped: whether L{stop} has been called (and L{start} has not
        been called since).
    @ivar _started: whether ticks may be scheduled yet.
    """

    def __init__(self,
                 terminationPredicateFactory=_Timer,
                 scheduler=_defaultScheduler,
                 started=True):
        """
        Create a scheduler-like object to which iterators may be added.

        @param terminationPredicateFactory: A no-argument callable which will
        be invoked at the beginning of each step and should return a
        no-argument callable which will return True when the step should be
        terminated.  The default factory is time-based and allows iterators to
        run for 1/100th of a second at a time.

        @param scheduler: A one-argument callable which takes a no-argument
        callable and should invoke it at some future point.  This will be used
        to schedule each step of this Cooperator.

        @param started: A boolean which indicates whether iterators should be
        stepped as soon as they are added, or if they will be queued up until
        L{Cooperator.start} is called.
        """
        self._tasks = []
        self._metarator = iter(())
        self._terminationPredicateFactory = terminationPredicateFactory
        self._scheduler = scheduler
        self._delayedCall = None
        self._stopped = False
        self._started = started


    def coiterate(self, iterator, doneDeferred=None):
        """
        Add an iterator to the list of iterators this L{Cooperator} is
        currently running.

        @param iterator: the iterator to run.

        @param doneDeferred: If specified, this will be the Deferred used as
            the completion deferred.  It is suggested that you use the default,
            which creates a new Deferred for you.

        @return: a Deferred that will fire when the iterator finishes.
        """
        if doneDeferred is None:
            doneDeferred = defer.Deferred()
        CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
        return doneDeferred


    def cooperate(self, iterator):
        """
        Start running the given iterator as a long-running cooperative task, by
        calling next() on it as a periodic timed event.

        @param iterator: the iterator to invoke.

        @return: a L{CooperativeTask} object representing this task.
        """
        return CooperativeTask(iterator, self)


    def _addTask(self, task):
        """
        Add a L{CooperativeTask} object to this L{Cooperator}.

        If this L{Cooperator} has been stopped, the task is immediately
        completed with L{SchedulerStopped} instead of being run.
        """
        if self._stopped:
            self._tasks.append(task) # XXX silly, I know, but _completeWith
                                     # does the inverse
            task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
        else:
            self._tasks.append(task)
            self._reschedule()


    def _removeTask(self, task):
        """
        Remove a L{CooperativeTask} from this L{Cooperator}.
        """
        self._tasks.remove(task)


    def _tasksWhileNotStopped(self):
        """
        Yield all L{CooperativeTask} objects in a loop as long as this
        L{Cooperator}'s termination condition has not been met.
        """
        terminator = self._terminationPredicateFactory()
        while self._tasks:
            for t in self._metarator:
                yield t
                if terminator():
                    return
            # The previous pass over the tasks is exhausted; start a new one.
            self._metarator = iter(self._tasks)


    def _tick(self):
        """
        Run one scheduler tick.
        """
        self._delayedCall = None
        for taskObj in self._tasksWhileNotStopped():
            taskObj._oneWorkUnit()
        self._reschedule()


    # Set on the instance when a tick was wanted before start() was called.
    _mustScheduleOnStart = False
    def _reschedule(self):
        """
        Schedule the next tick, unless one is already pending or there are no
        tasks.  If this L{Cooperator} has not been started yet, only remember
        that a tick must be scheduled when L{start} is called.
        """
        if not self._started:
            self._mustScheduleOnStart = True
            return
        if self._delayedCall is None and self._tasks:
            self._delayedCall = self._scheduler(self._tick)


    def start(self):
        """
        Begin scheduling steps.
        """
        self._stopped = False
        self._started = True
        if self._mustScheduleOnStart:
            # Drop the instance attribute so the class default (False) shows
            # through again.
            del self._mustScheduleOnStart
            self._reschedule()


    def stop(self):
        """
        Stop scheduling steps.  Errback the completion Deferreds of all
        iterators which have been added and forget about them.
        """
        self._stopped = True
        # Iterate over a snapshot: _completeWith removes the task from
        # self._tasks, and mutating the list being iterated would skip every
        # other task, leaving its completion Deferreds unfired.
        for taskObj in list(self._tasks):
            # A completion callback fired earlier in this loop may already
            # have finished this task (e.g. by stopping it); completing it
            # twice would fire its Deferreds twice.
            if taskObj._completionState is None:
                taskObj._completeWith(SchedulerStopped(),
                                      Failure(SchedulerStopped()))
        self._tasks = []
        if self._delayedCall is not None:
            self._delayedCall.cancel()
            self._delayedCall = None
614
615
616
# The module-global Cooperator used by the coiterate() and cooperate()
# functions below.
_theCooperator = Cooperator()

def coiterate(iterator):
    """
    Cooperatively iterate over the given iterator, sharing runtime between it
    and every other iterator which has been passed to this function and is
    not yet exhausted.

    @param iterator: the iterator to run.

    @return: the result of calling C{coiterate} on the global L{Cooperator}
        with this iterator.
    """
    done = _theCooperator.coiterate(iterator)
    return done
626
627
628
def cooperate(iterator):
    """
    Run the given iterator as a long-running cooperative task on the global
    L{Cooperator}, which calls next() on it as a periodic timed event.

    @param iterator: the iterator to invoke.

    @return: a L{CooperativeTask} object representing this task.
    """
    task = _theCooperator.cooperate(iterator)
    return task
639
640
641
642
class Clock:
    """
    Provide a deterministic, easily-controlled implementation of
    L{IReactorTime.callLater}.  This is commonly useful for writing
    deterministic unit tests for code which schedules events using this API.

    @ivar rightNow: The time this clock currently reports from L{seconds}.
    @ivar calls: The pending delayed calls, ordered by scheduled time.
    """
    implements(IReactorTime)

    rightNow = 0.0

    def __init__(self):
        self.calls = []

    def seconds(self):
        """
        Pretend to be time.time().  This is used internally when an operation
        such as L{IDelayedCall.reset} needs to determine a time value
        relative to the current time.

        @rtype: C{float}
        @return: The time which should be considered the current time.
        """
        return self.rightNow


    def _sortCalls(self):
        """
        Sort the pending calls by the time at which they are scheduled to run.
        The sort is stable, so calls scheduled for the same time keep their
        relative order.
        """
        self.calls.sort(key=lambda call: call.getTime())


    def callLater(self, when, what, *a, **kw):
        """
        See L{twisted.internet.interfaces.IReactorTime.callLater}.
        """
        dc = base.DelayedCall(self.seconds() + when,
                               what, a, kw,
                               self.calls.remove,
                               lambda c: None,
                               self.seconds)
        self.calls.append(dc)
        self._sortCalls()
        return dc

    def getDelayedCalls(self):
        """
        See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
        """
        return self.calls

    def advance(self, amount):
        """
        Move time on this clock forward by the given amount and run whatever
        pending calls should be run.

        @type amount: C{float}
        @param amount: The number of seconds which to advance this clock's
        time.
        """
        self.rightNow += amount
        # The scheduled time of a pending call may have changed since it was
        # added (presumably via IDelayedCall.reset or delay; this clock is
        # not notified of that), so re-establish the ordering before relying
        # on it.
        self._sortCalls()
        while self.calls and self.calls[0].getTime() <= self.seconds():
            call = self.calls.pop(0)
            call.called = 1
            call.func(*call.args, **call.kw)
            # The function just run may itself have rescheduled other calls.
            self._sortCalls()


    def pump(self, timings):
        """
        Advance incrementally by the given set of times.

        @type timings: iterable of C{float}
        """
        for amount in timings:
            self.advance(amount)
711
712
def deferLater(clock, delay, callable, *args, **kw):
    """
    Arrange for the given function to be called once a certain period of time
    has passed.

    @type clock: L{IReactorTime} provider
    @param clock: The object used to schedule the delayed call.

    @type delay: C{float} or C{int}
    @param delay: How many seconds to wait before calling the function.

    @param callable: The object to call after the delay.

    @param *args: The positional arguments to pass to C{callable}.

    @param **kw: The keyword arguments to pass to C{callable}.

    @rtype: L{defer.Deferred}

    @return: A deferred that fires with the result of the callable when the
        specified time has elapsed.  Cancelling it cancels the underlying
        delayed call.
    """
    def cancelDelayedCall(deferred):
        # delayedCall is bound below, before the Deferred is handed out.
        delayedCall.cancel()

    def invoke(ignored):
        return callable(*args, **kw)

    result = defer.Deferred(cancelDelayedCall)
    result.addCallback(invoke)
    delayedCall = clock.callLater(delay, result.callback, None)
    return result
741
742
743
744__all__ = [
745    'LoopingCall',
746
747    'Clock',
748
749    'SchedulerStopped', 'Cooperator', 'coiterate',
750
751    'deferLater',
752    ]
Note: See TracBrowser for help on using the browser.