root / trunk / twisted / internet / task.py

Revision 27001, 20.1 kB (checked in by glyph, 3 weeks ago)

Add an API to pause, resume, and cancel individual tasks within a Cooperator.

This also adds an API to provide Deferreds to multiple consumers from an individual task.

The new API is in twisted.internet.task.cooperate and twisted.internet.task.CooperativeTask.

Author: glyph

Reviewer: therve, jml

Fixes #2712

Line 
1 # -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Scheduling utility methods and classes.
7
8 @author: Jp Calderone
9 """
10
11 __metaclass__ = type
12
13 import time
14
15 from zope.interface import implements
16
17 from twisted.python import reflect
18 from twisted.python.failure import Failure
19
20 from twisted.internet import base, defer
21 from twisted.internet.interfaces import IReactorTime
22
23
class LoopingCall:
    """
    Call a function repeatedly.

    If C{f} returns a deferred, rescheduling will not take place until the
    deferred has fired. The result value is ignored.

    @ivar f: The function to call.
    @ivar a: A tuple of arguments to pass the function.
    @ivar kw: A dictionary of keyword arguments to pass to the function.
    @ivar clock: A provider of
        L{twisted.internet.interfaces.IReactorTime}.  The default is
        L{twisted.internet.reactor}. Feel free to set this to
        something else, but it probably ought to be set *before*
        calling L{start}.

    @ivar call: The object returned by C{clock.callLater} for the next
        scheduled iteration, or C{None} when no iteration is scheduled.
    @ivar running: C{True} from the time L{start} is called until either
        L{stop} is called or the function fails.
    @ivar deferred: The Deferred returned by L{start}; reset to C{None} when
        it is fired.
    @ivar interval: The number of seconds between calls, as given to
        L{start}.
    @ivar starttime: The value of C{clock.seconds()} when L{start} was
        called.

    @type _lastTime: C{float}
    @ivar _lastTime: The time at which this instance most recently scheduled
        itself to run.
    """

    call = None
    running = False
    deferred = None
    interval = None
    _lastTime = 0.0
    starttime = None

    def __init__(self, f, *a, **kw):
        """
        @param f: The function to call.
        @param a: Positional arguments to pass to C{f} on every call.
        @param kw: Keyword arguments to pass to C{f} on every call.
        """
        self.f = f
        self.a = a
        self.kw = kw
        # Imported here rather than at module level so that merely importing
        # this module does not import the global reactor.
        from twisted.internet import reactor
        self.clock = reactor


    def start(self, interval, now=True):
        """
        Start running function every interval seconds.

        @param interval: The number of seconds between calls.  May be
        less than one.  Precision will depend on the underlying
        platform, the available hardware, and the load on the system.

        @param now: If True, run this call right now.  Otherwise, wait
        until the interval has elapsed before beginning.

        @return: A Deferred whose callback will be invoked with
        C{self} when C{self.stop} is called, or whose errback will be
        invoked when the function raises an exception or returned a
        deferred that has its errback invoked.

        @raise ValueError: if C{interval} is negative.
        """
        assert not self.running, ("Tried to start an already running "
                                  "LoopingCall.")
        if interval < 0:
            # Call form of raise: valid on every Python version, unlike the
            # old "raise Class, message" statement.
            raise ValueError("interval must be >= 0")
        self.running = True
        d = self.deferred = defer.Deferred()
        self.starttime = self.clock.seconds()
        self._lastTime = self.starttime
        self.interval = interval
        if now:
            self()
        else:
            self._reschedule()
        return d

    def stop(self):
        """
        Stop running function.

        If an iteration is currently scheduled it is cancelled and the
        Deferred returned by L{start} is fired with C{self}.  If no iteration
        is scheduled (the function is in progress), the Deferred is fired
        once the in-progress call produces its result.
        """
        assert self.running, ("Tried to stop a LoopingCall that was "
                              "not running.")
        self.running = False
        if self.call is not None:
            self.call.cancel()
            self.call = None
            d, self.deferred = self.deferred, None
            d.callback(self)

    def __call__(self):
        """
        Run one iteration: call the function and, once its (possibly
        deferred) result is available, either schedule the next iteration or
        fire the Deferred returned by L{start}.
        """
        def cb(result):
            if self.running:
                self._reschedule()
            else:
                # stop() was called while the function was in progress.
                d, self.deferred = self.deferred, None
                d.callback(self)

        def eb(failure):
            self.running = False
            d, self.deferred = self.deferred, None
            d.errback(failure)

        # The scheduled call (if any) is the one running right now.
        self.call = None
        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
        d.addCallback(cb)
        d.addErrback(eb)


    def _reschedule(self):
        """
        Schedule the next iteration of this looping call.
        """
        if self.interval == 0:
            self.call = self.clock.callLater(0, self)
            return

        currentTime = self.clock.seconds()
        # Find how long is left until the interval comes around again.
        untilNextTime = (self._lastTime - currentTime) % self.interval
        # Make sure it is in the future, in case more than one interval worth
        # of time passed since the previous call was made.
        nextTime = max(
            self._lastTime + self.interval, currentTime + untilNextTime)
        # If the interval falls on the current time exactly, skip it and
        # schedule the call for the next interval.
        if nextTime == currentTime:
            nextTime += self.interval
        self._lastTime = nextTime
        self.call = self.clock.callLater(nextTime - currentTime, self)


    def __repr__(self):
        """
        Describe this call as C{LoopingCall<interval>(func, *args, **kw)}.
        """
        if hasattr(self.f, 'func_name'):
            func = self.f.func_name
            if hasattr(self.f, 'im_class'):
                func = self.f.im_class.__name__ + '.' + func
        else:
            func = reflect.safe_repr(self.f)

        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
            self.interval, func, reflect.safe_repr(self.a),
            reflect.safe_repr(self.kw))
154
155
156
class SchedulerError(Exception):
    """
    The operation could not be completed because the scheduler or one of its
    tasks was in an invalid state.

    This exception should not be raised directly; it is the common
    superclass of the scheduler-state-related exceptions defined in this
    module, so that callers can catch all of them at once.
    """
164
165
166
class SchedulerStopped(SchedulerError):
    """
    The operation could not complete because the scheduler was stopped in
    progress or was already stopped.

    A L{Cooperator} completes its tasks with this exception when
    L{Cooperator.stop} is called, and also when a task is added to a
    L{Cooperator} which has already been stopped.
    """
172
173
174
class TaskFinished(SchedulerError):
    """
    The operation could not complete because the task was already completed,
    stopped, encountered an error or otherwise permanently stopped running.

    This is the common superclass of L{TaskDone}, L{TaskStopped} and
    L{TaskFailed}.
    """
180
181
182
class TaskDone(TaskFinished):
    """
    The operation could not complete because the task was already completed.

    A L{CooperativeTask} records this state when its iterator is exhausted
    (raises L{StopIteration}).
    """
187
188
189
class TaskStopped(TaskFinished):
    """
    The operation could not complete because the task was stopped.

    A L{CooperativeTask} records this state, and fails its completion
    Deferreds with it, when L{CooperativeTask.stop} is called.
    """
194
195
196
class TaskFailed(TaskFinished):
    """
    The operation could not complete because the task died with an unhandled
    error.

    A L{CooperativeTask} records this state when its iterator raises an
    exception other than L{StopIteration}, or when a Deferred yielded by the
    iterator fails.
    """
202
203
204
class NotPaused(SchedulerError):
    """
    This exception is raised when a task is resumed which was not previously
    paused, i.e. when L{CooperativeTask.resume} is called while the task's
    pause count is zero.
    """
210
211
212
213 class _Timer(object):
214     MAX_SLICE = 0.01
215     def __init__(self):
216         self.end = time.time() + self.MAX_SLICE
217
218
219     def __call__(self):
220         return time.time() >= self.end
221
222
223
# Delay handed to reactor.callLater by the default scheduler below; tiny, so
# the next scheduler tick runs almost immediately.
_EPSILON = 0.00000001
def _defaultScheduler(x):
    """
    The default scheduler for L{Cooperator}: arrange for C{x} to be called
    by the global reactor after C{_EPSILON} seconds.

    @param x: a no-argument callable.

    @return: whatever C{reactor.callLater} returns for the scheduled call.
    """
    # Imported at call time so that importing this module does not import
    # the global reactor.
    from twisted.internet import reactor
    return reactor.callLater(_EPSILON, x)
228
229
class CooperativeTask(object):
    """
    A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be
    paused, resumed, and stopped.  It can also have its completion (or
    termination) monitored.

    @see: L{Cooperator.cooperate}

    @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is
        asked to do work.

    @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask}
        participates in, which is used to re-insert it upon resume.

    @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task
        completes, fails, or finishes.

    @type _deferreds: L{list}

    @type _cooperator: L{Cooperator}

    @ivar _pauseCount: the number of times that this L{CooperativeTask} has
        been paused; if 0, it is running.

    @type _pauseCount: L{int}

    @ivar _completionState: The completion-state of this L{CooperativeTask}.
        C{None} if the task is not yet completed, an instance of L{TaskStopped}
        if C{stop} was called to stop this task early, of L{TaskFailed} if the
        application code in the iterator raised an exception which caused it to
        terminate, and of L{TaskDone} if it terminated normally via raising
        L{StopIteration}.

    @type _completionState: L{TaskFinished}

    @ivar _completionResult: C{None} until the task is completed; afterwards
        the value that the L{defer.Deferred}s returned from L{whenDone} are
        fired with (the iterator on normal completion, otherwise a
        L{Failure}).
    """

    def __init__(self, iterator, cooperator):
        """
        A private constructor: to create a new L{CooperativeTask}, see
        L{Cooperator.cooperate}.

        @param iterator: the iterator this task will step.

        @param cooperator: the L{Cooperator} to register this task with; the
            task adds itself to it immediately.
        """
        self._iterator = iterator
        self._cooperator = cooperator
        self._deferreds = []
        self._pauseCount = 0
        self._completionState = None
        self._completionResult = None
        cooperator._addTask(self)


    def whenDone(self):
        """
        Get a L{defer.Deferred} notification of when this task is complete.

        Each call returns a new L{defer.Deferred}.  If the task has already
        completed, the returned L{defer.Deferred} is fired immediately with
        the recorded completion result.

        @return: a L{defer.Deferred} that fires with the C{iterator} that this
            L{CooperativeTask} was created with when the iterator has been
            exhausted (i.e. its C{next} method has raised L{StopIteration}), or
            fails with the exception raised by C{next} if it raises some other
            exception.

        @rtype: L{defer.Deferred}
        """
        d = defer.Deferred()
        if self._completionState is None:
            self._deferreds.append(d)
        else:
            # Already finished: deliver the stored result (possibly a
            # Failure) right away.
            d.callback(self._completionResult)
        return d


    def pause(self):
        """
        Pause this L{CooperativeTask}.  Stop doing work until
        L{CooperativeTask.resume} is called.  If C{pause} is called more than
        once, C{resume} must be called an equal number of times to resume this
        task.

        @raise TaskFinished: if this task has already finished or completed.
        """
        self._checkFinish()
        self._pauseCount += 1
        # Only the first pause takes the task out of the cooperator; nested
        # pauses just increase the count.
        if self._pauseCount == 1:
            self._cooperator._removeTask(self)


    def resume(self):
        """
        Resume processing of a paused L{CooperativeTask}.

        @raise NotPaused: if this L{CooperativeTask} is not paused.
        """
        if self._pauseCount == 0:
            raise NotPaused()
        self._pauseCount -= 1
        # Re-insert only when every pause has been undone and the task did
        # not finish while it was paused.
        if self._pauseCount == 0 and self._completionState is None:
            self._cooperator._addTask(self)


    def _completeWith(self, completionState, deferredResult):
        """
        Record that this task has finished and notify everyone waiting on it.

        @param completionState: a L{TaskFinished} exception or a subclass
            thereof, indicating what exception should be raised when subsequent
            operations are performed.

        @param deferredResult: the result to fire all the deferreds with.
        """
        self._completionState = completionState
        self._completionResult = deferredResult
        # A paused task has already been removed from the cooperator.
        if not self._pauseCount:
            self._cooperator._removeTask(self)

        # The Deferreds need to be invoked after all this is completed, because
        # a Deferred may want to manipulate other tasks in a Cooperator.  For
        # example, if you call "stop()" on a cooperator in a callback on a
        # Deferred returned from whenDone(), this CooperativeTask must be gone
        # from the Cooperator by that point so that _completeWith is not
        # invoked reentrantly; that would cause these Deferreds to blow up with
        # an AlreadyCalledError, or the _removeTask to fail with a ValueError.
        for d in self._deferreds:
            d.callback(deferredResult)


    def stop(self):
        """
        Stop further processing of this task.

        @raise TaskFinished: if this L{CooperativeTask} has previously
            completed, via C{stop}, completion, or failure.
        """
        self._checkFinish()
        self._completeWith(TaskStopped(), Failure(TaskStopped()))


    def _checkFinish(self):
        """
        If this task has been stopped, raise the appropriate subclass of
        L{TaskFinished}.
        """
        if self._completionState is not None:
            raise self._completionState


    def _oneWorkUnit(self):
        """
        Perform one unit of work for this task, retrieving one item from its
        iterator, stopping if there are no further items in the iterator, and
        pausing if the result was a L{defer.Deferred}.
        """
        try:
            result = self._iterator.next()
        except StopIteration:
            self._completeWith(TaskDone(), self._iterator)
        except:
            # Any other exception from application code ends the task; the
            # current exception is captured in the Failure.
            self._completeWith(TaskFailed(), Failure())
        else:
            if isinstance(result, defer.Deferred):
                # Wait for the Deferred: resume when it succeeds, fail the
                # task when it fails.
                self.pause()
                # NOTE(review): failLater completes the task unconditionally;
                # if the task was already completed (e.g. via stop()) while
                # waiting, _completeWith runs a second time -- confirm that
                # this is intended.
                def failLater(f):
                    self._completeWith(TaskFailed(), f)
                result.addCallbacks(lambda result: self.resume(),
                                    failLater)
391
392
393
class Cooperator(object):
    """
    Cooperative task scheduler.

    @ivar _tasks: the list of L{CooperativeTask}s currently being stepped
        (paused tasks are not in this list).
    @ivar _metarator: an iterator over C{_tasks}, kept between ticks so that
        each tick continues where the previous one left off.
    @ivar _delayedCall: whatever the scheduler returned for the pending
        tick, or C{None} if no tick is pending.
    @ivar _stopped: C{True} after L{stop} has been called (until L{start} is
        called again).
    @ivar _started: whether ticks may be scheduled.
    """

    def __init__(self,
                 terminationPredicateFactory=_Timer,
                 scheduler=_defaultScheduler,
                 started=True):
        """
        Create a scheduler-like object to which iterators may be added.

        @param terminationPredicateFactory: A no-argument callable which will
        be invoked at the beginning of each step and should return a
        no-argument callable which will return True when the step should be
        terminated.  The default factory is time-based and allows iterators to
        run for 1/100th of a second at a time.

        @param scheduler: A one-argument callable which takes a no-argument
        callable and should invoke it at some future point.  This will be used
        to schedule each step of this Cooperator.

        @param started: A boolean which indicates whether iterators should be
        stepped as soon as they are added, or if they will be queued up until
        L{Cooperator.start} is called.
        """
        self._tasks = []
        self._metarator = iter(())
        self._terminationPredicateFactory = terminationPredicateFactory
        self._scheduler = scheduler
        self._delayedCall = None
        self._stopped = False
        self._started = started


    def coiterate(self, iterator, doneDeferred=None):
        """
        Add an iterator to the list of iterators this L{Cooperator} is
        currently running.

        @param iterator: the iterator to step.

        @param doneDeferred: If specified, this will be the Deferred used as
            the completion deferred.  It is suggested that you use the default,
            which creates a new Deferred for you.

        @return: a Deferred that will fire when the iterator finishes.
        """
        if doneDeferred is None:
            doneDeferred = defer.Deferred()
        CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
        return doneDeferred


    def cooperate(self, iterator):
        """
        Start running the given iterator as a long-running cooperative task, by
        calling next() on it as a periodic timed event.

        @param iterator: the iterator to invoke.

        @return: a L{CooperativeTask} object representing this task.
        """
        return CooperativeTask(iterator, self)


    def _addTask(self, task):
        """
        Add a L{CooperativeTask} object to this L{Cooperator}.

        If this L{Cooperator} has been stopped, the task is completed
        immediately with L{SchedulerStopped} instead of being scheduled.
        """
        if self._stopped:
            self._tasks.append(task) # XXX silly, I know, but _completeWith
                                     # does the inverse
            task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
        else:
            self._tasks.append(task)
            self._reschedule()


    def _removeTask(self, task):
        """
        Remove a L{CooperativeTask} from this L{Cooperator}.
        """
        self._tasks.remove(task)


    def _tasksWhileNotStopped(self):
        """
        Yield all L{CooperativeTask} objects in a loop as long as this
        L{Cooperator}'s termination condition has not been met.
        """
        terminator = self._terminationPredicateFactory()
        while self._tasks:
            for t in self._metarator:
                yield t
                if terminator():
                    return
            # Went through every task once; start another round.
            self._metarator = iter(self._tasks)


    def _tick(self):
        """
        Run one scheduler tick.
        """
        self._delayedCall = None
        for taskObj in self._tasksWhileNotStopped():
            taskObj._oneWorkUnit()
        self._reschedule()


    _mustScheduleOnStart = False
    def _reschedule(self):
        """
        Schedule the next tick if there is work to do and no tick is already
        pending.  If this L{Cooperator} has not been started yet, just
        remember that L{start} must schedule one.
        """
        if not self._started:
            self._mustScheduleOnStart = True
            return
        if self._delayedCall is None and self._tasks:
            self._delayedCall = self._scheduler(self._tick)


    def start(self):
        """
        Begin scheduling steps.
        """
        self._stopped = False
        self._started = True
        if self._mustScheduleOnStart:
            # Drop the instance attribute so the class-level False shows
            # through again.
            del self._mustScheduleOnStart
            self._reschedule()


    def stop(self):
        """
        Stop scheduling steps.  Errback the completion Deferreds of all
        iterators which have been added and forget about them.
        """
        self._stopped = True
        # Iterate over a snapshot: _completeWith() removes the task from
        # self._tasks, and removing items from a list while iterating over
        # that same list makes the loop skip every other task, leaving their
        # completion Deferreds unfired.
        for taskObj in list(self._tasks):
            # A completion callback fired earlier in this loop may already
            # have finished this task (e.g. by stopping it, or by calling
            # stop() on this Cooperator again); completing it twice would
            # fire its Deferreds twice.
            if taskObj._completionState is None:
                taskObj._completeWith(SchedulerStopped(),
                                      Failure(SchedulerStopped()))
        self._tasks = []
        if self._delayedCall is not None:
            self._delayedCall.cancel()
            self._delayedCall = None
535
536
537
# The module-global Cooperator, created with the default termination
# predicate and scheduler; it backs the module-level coiterate() and
# cooperate() functions.
_theCooperator = Cooperator()
539
def coiterate(iterator):
    """
    Cooperatively iterate over the given iterator, dividing runtime between it
    and all other iterators which have been passed to this function and not yet
    exhausted.

    This delegates to the C{coiterate} method of the module-global
    L{Cooperator}.

    @param iterator: the iterator to step.

    @return: the L{defer.Deferred} returned by L{Cooperator.coiterate}, which
        fires when the iterator finishes.
    """
    return _theCooperator.coiterate(iterator)
547
548
549
def cooperate(iterator):
    """
    Start running the given iterator as a long-running cooperative task, by
    calling next() on it as a periodic timed event.

    This delegates to the C{cooperate} method of the module-global
    L{Cooperator}.

    @param iterator: the iterator to invoke.

    @return: a L{CooperativeTask} object representing this task.
    """
    return _theCooperator.cooperate(iterator)
560
561
562
563
class Clock:
    """
    Provide a deterministic, easily-controlled implementation of
    L{IReactorTime.callLater}.  This is commonly useful for writing
    deterministic unit tests for code which schedules events using this API.

    @ivar rightNow: The current time of this clock, in seconds.  It only
        changes when L{advance} is called.
    @ivar calls: The pending delayed calls, ordered by the time at which
        they are due.
    """
    implements(IReactorTime)

    rightNow = 0.0

    def __init__(self):
        self.calls = []

    def seconds(self):
        """
        Pretend to be time.time().  This is used internally when an operation
        such as L{IDelayedCall.reset} needs to determine a time value
        relative to the current time.

        @rtype: C{float}
        @return: The time which should be considered the current time.
        """
        return self.rightNow


    def callLater(self, when, what, *a, **kw):
        """
        See L{twisted.internet.interfaces.IReactorTime.callLater}.
        """
        dc =  base.DelayedCall(self.seconds() + when,
                               what, a, kw,
                               self.calls.remove,
                               lambda c: None,
                               self.seconds)
        self.calls.append(dc)
        # Keep the earliest call first.  A key function gives the same
        # (stable) ordering as the old cmp-style comparison, calls getTime()
        # only once per element, and does not depend on the cmp() builtin.
        self.calls.sort(key=lambda call: call.getTime())
        return dc

    def getDelayedCalls(self):
        """
        See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
        """
        return self.calls

    def advance(self, amount):
        """
        Move time on this clock forward by the given amount and run whatever
        pending calls should be run.

        @type amount: C{float}
        @param amount: The number of seconds which to advance this clock's
        time.
        """
        self.rightNow += amount
        # self.calls is kept ordered, so the first entry is always the next
        # one due.
        while self.calls and self.calls[0].getTime() <= self.seconds():
            call = self.calls.pop(0)
            # Mark the call as having run before invoking it.
            call.called = 1
            call.func(*call.args, **call.kw)


    def pump(self, timings):
        """
        Advance incrementally by the given set of times.

        @type timings: iterable of C{float}
        """
        for amount in timings:
            self.advance(amount)
632
633
def deferLater(clock, delay, callable, *args, **kw):
    """
    Schedule a function to be called once a delay has elapsed, and report
    its result through a Deferred.

    @type clock: L{IReactorTime} provider
    @param clock: The provider whose C{callLater} is used to schedule the
        delayed call.

    @type delay: C{float} or C{int}
    @param delay: How many seconds to wait before the function is called.

    @param callable: The object to call when the delay is over.

    @param *args: Positional arguments for C{callable}.

    @param **kw: Keyword arguments for C{callable}.

    @rtype: L{defer.Deferred}

    @return: A deferred which, once the delay has elapsed, fires with
        whatever C{callable} returned.
    """
    def runIt(ignored):
        # The value the Deferred is fired with is only a trigger.
        return callable(*args, **kw)

    result = defer.Deferred()
    result.addCallback(runIt)
    clock.callLater(delay, result.callback, None)
    return result
660
661
662
# Names exported by "from twisted.internet.task import *".
# NOTE(review): cooperate, CooperativeTask and the Task*/NotPaused exceptions
# defined in this module are not listed here -- confirm whether that is
# intentional.
__all__ = [
    'LoopingCall',

    'Clock',

    'SchedulerStopped', 'Cooperator', 'coiterate',

    'deferLater',
    ]
Note: See TracBrowser for help on using the browser.