| 1 | # -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*- |
|---|
| 2 | # Copyright (c) Twisted Matrix Laboratories. |
|---|
| 3 | # See LICENSE for details. |
|---|
| 4 | |
|---|
| 5 | """ |
|---|
| 6 | Scheduling utility methods and classes. |
|---|
| 7 | |
|---|
| 8 | @author: Jp Calderone |
|---|
| 9 | """ |
|---|
| 10 | |
|---|
| 11 | __metaclass__ = type |
|---|
| 12 | |
|---|
| 13 | import time |
|---|
| 14 | |
|---|
| 15 | from zope.interface import implements |
|---|
| 16 | |
|---|
| 17 | from twisted.python import reflect |
|---|
| 18 | from twisted.python.failure import Failure |
|---|
| 19 | |
|---|
| 20 | from twisted.internet import base, defer |
|---|
| 21 | from twisted.internet.interfaces import IReactorTime |
|---|
| 22 | |
|---|
| 23 | |
|---|
| 24 | class LoopingCall: |
|---|
| 25 | """Call a function repeatedly. |
|---|
| 26 | |
|---|
| 27 | If C{f} returns a deferred, rescheduling will not take place until the |
|---|
| 28 | deferred has fired. The result value is ignored. |
|---|
| 29 | |
|---|
| 30 | @ivar f: The function to call. |
|---|
| 31 | @ivar a: A tuple of arguments to pass the function. |
|---|
| 32 | @ivar kw: A dictionary of keyword arguments to pass to the function. |
|---|
| 33 | @ivar clock: A provider of |
|---|
| 34 | L{twisted.internet.interfaces.IReactorTime}. The default is |
|---|
| 35 | L{twisted.internet.reactor}. Feel free to set this to |
|---|
| 36 | something else, but it probably ought to be set *before* |
|---|
| 37 | calling L{start}. |
|---|
| 38 | |
|---|
| 39 | @type running: C{bool} |
|---|
| 40 | @ivar running: A flag which is C{True} while C{f} is scheduled to be called |
|---|
| 41 | (or is currently being called). It is set to C{True} when L{start} is |
|---|
| 42 | called and set to C{False} when L{stop} is called or if C{f} raises an |
|---|
| 43 | exception. In either case, it will be C{False} by the time the |
|---|
| 44 | C{Deferred} returned by L{start} fires its callback or errback. |
|---|
| 45 | |
|---|
| 46 | @type _expectNextCallAt: C{float} |
|---|
| 47 | @ivar _expectNextCallAt: The time at which this instance most recently |
|---|
| 48 | scheduled itself to run. |
|---|
| 49 | |
|---|
| 50 | @type _realLastTime: C{float} |
|---|
| 51 | @ivar _realLastTime: When counting skips, the time at which the skip |
|---|
| 52 | counter was last invoked. |
|---|
| 53 | |
|---|
| 54 | @type _runAtStart: C{bool} |
|---|
| 55 | @ivar _runAtStart: A flag indicating whether the 'now' argument was passed |
|---|
| 56 | to L{LoopingCall.start}. |
|---|
| 57 | """ |
|---|
| 58 | |
|---|
| 59 | call = None |
|---|
| 60 | running = False |
|---|
| 61 | deferred = None |
|---|
| 62 | interval = None |
|---|
| 63 | _expectNextCallAt = 0.0 |
|---|
| 64 | _runAtStart = False |
|---|
| 65 | starttime = None |
|---|
| 66 | |
|---|
| 67 | def __init__(self, f, *a, **kw): |
|---|
| 68 | self.f = f |
|---|
| 69 | self.a = a |
|---|
| 70 | self.kw = kw |
|---|
| 71 | from twisted.internet import reactor |
|---|
| 72 | self.clock = reactor |
|---|
| 73 | |
|---|
| 74 | |
|---|
| 75 | def withCount(cls, countCallable): |
|---|
| 76 | """ |
|---|
| 77 | An alternate constructor for L{LoopingCall} that makes available the |
|---|
| 78 | number of calls which should have occurred since it was last invoked. |
|---|
| 79 | |
|---|
| 80 | Note that this number is an C{int} value; It represents the discrete |
|---|
| 81 | number of calls that should have been made. For example, if you are |
|---|
| 82 | using a looping call to display an animation with discrete frames, this |
|---|
| 83 | number would be the number of frames to advance. |
|---|
| 84 | |
|---|
| 85 | The count is normally 1, but can be higher. For example, if the reactor |
|---|
| 86 | is blocked and takes too long to invoke the L{LoopingCall}, a Deferred |
|---|
| 87 | returned from a previous call is not fired before an interval has |
|---|
| 88 | elapsed, or if the callable itself blocks for longer than an interval, |
|---|
| 89 | preventing I{itself} from being called. |
|---|
| 90 | |
|---|
| 91 | @param countCallable: A callable that will be invoked each time the |
|---|
| 92 | resulting LoopingCall is run, with an integer specifying the number |
|---|
| 93 | of calls that should have been invoked. |
|---|
| 94 | |
|---|
| 95 | @type countCallable: 1-argument callable which takes an C{int} |
|---|
| 96 | |
|---|
| 97 | @return: An instance of L{LoopingCall} with call counting enabled, |
|---|
| 98 | which provides the count as the first positional argument. |
|---|
| 99 | |
|---|
| 100 | @rtype: L{LoopingCall} |
|---|
| 101 | |
|---|
| 102 | @since: 9.0 |
|---|
| 103 | """ |
|---|
| 104 | |
|---|
| 105 | def counter(): |
|---|
| 106 | now = self.clock.seconds() |
|---|
| 107 | lastTime = self._realLastTime |
|---|
| 108 | if lastTime is None: |
|---|
| 109 | lastTime = self.starttime |
|---|
| 110 | if self._runAtStart: |
|---|
| 111 | lastTime -= self.interval |
|---|
| 112 | self._realLastTime = now |
|---|
| 113 | lastInterval = self._intervalOf(lastTime) |
|---|
| 114 | thisInterval = self._intervalOf(now) |
|---|
| 115 | count = thisInterval - lastInterval |
|---|
| 116 | return countCallable(count) |
|---|
| 117 | |
|---|
| 118 | self = cls(counter) |
|---|
| 119 | |
|---|
| 120 | self._realLastTime = None |
|---|
| 121 | |
|---|
| 122 | return self |
|---|
| 123 | |
|---|
| 124 | withCount = classmethod(withCount) |
|---|
| 125 | |
|---|
| 126 | |
|---|
| 127 | def _intervalOf(self, t): |
|---|
| 128 | """ |
|---|
| 129 | Determine the number of intervals passed as of the given point in |
|---|
| 130 | time. |
|---|
| 131 | |
|---|
| 132 | @param t: The specified time (from the start of the L{LoopingCall}) to |
|---|
| 133 | be measured in intervals |
|---|
| 134 | |
|---|
| 135 | @return: The C{int} number of intervals which have passed as of the |
|---|
| 136 | given point in time. |
|---|
| 137 | """ |
|---|
| 138 | elapsedTime = t - self.starttime |
|---|
| 139 | intervalNum = int(elapsedTime / self.interval) |
|---|
| 140 | return intervalNum |
|---|
| 141 | |
|---|
| 142 | |
|---|
| 143 | def start(self, interval, now=True): |
|---|
| 144 | """ |
|---|
| 145 | Start running function every interval seconds. |
|---|
| 146 | |
|---|
| 147 | @param interval: The number of seconds between calls. May be |
|---|
| 148 | less than one. Precision will depend on the underlying |
|---|
| 149 | platform, the available hardware, and the load on the system. |
|---|
| 150 | |
|---|
| 151 | @param now: If True, run this call right now. Otherwise, wait |
|---|
| 152 | until the interval has elapsed before beginning. |
|---|
| 153 | |
|---|
| 154 | @return: A Deferred whose callback will be invoked with |
|---|
| 155 | C{self} when C{self.stop} is called, or whose errback will be |
|---|
| 156 | invoked when the function raises an exception or returned a |
|---|
| 157 | deferred that has its errback invoked. |
|---|
| 158 | """ |
|---|
| 159 | assert not self.running, ("Tried to start an already running " |
|---|
| 160 | "LoopingCall.") |
|---|
| 161 | if interval < 0: |
|---|
| 162 | raise ValueError, "interval must be >= 0" |
|---|
| 163 | self.running = True |
|---|
| 164 | d = self.deferred = defer.Deferred() |
|---|
| 165 | self.starttime = self.clock.seconds() |
|---|
| 166 | self._expectNextCallAt = self.starttime |
|---|
| 167 | self.interval = interval |
|---|
| 168 | self._runAtStart = now |
|---|
| 169 | if now: |
|---|
| 170 | self() |
|---|
| 171 | else: |
|---|
| 172 | self._reschedule() |
|---|
| 173 | return d |
|---|
| 174 | |
|---|
| 175 | def stop(self): |
|---|
| 176 | """Stop running function. |
|---|
| 177 | """ |
|---|
| 178 | assert self.running, ("Tried to stop a LoopingCall that was " |
|---|
| 179 | "not running.") |
|---|
| 180 | self.running = False |
|---|
| 181 | if self.call is not None: |
|---|
| 182 | self.call.cancel() |
|---|
| 183 | self.call = None |
|---|
| 184 | d, self.deferred = self.deferred, None |
|---|
| 185 | d.callback(self) |
|---|
| 186 | |
|---|
| 187 | def reset(self): |
|---|
| 188 | """ |
|---|
| 189 | Skip the next iteration and reset the timer. |
|---|
| 190 | |
|---|
| 191 | @since: 11.1 |
|---|
| 192 | """ |
|---|
| 193 | assert self.running, ("Tried to reset a LoopingCall that was " |
|---|
| 194 | "not running.") |
|---|
| 195 | if self.call is not None: |
|---|
| 196 | self.call.cancel() |
|---|
| 197 | self.call = None |
|---|
| 198 | self._expectNextCallAt = self.clock.seconds() |
|---|
| 199 | self._reschedule() |
|---|
| 200 | |
|---|
| 201 | def __call__(self): |
|---|
| 202 | def cb(result): |
|---|
| 203 | if self.running: |
|---|
| 204 | self._reschedule() |
|---|
| 205 | else: |
|---|
| 206 | d, self.deferred = self.deferred, None |
|---|
| 207 | d.callback(self) |
|---|
| 208 | |
|---|
| 209 | def eb(failure): |
|---|
| 210 | self.running = False |
|---|
| 211 | d, self.deferred = self.deferred, None |
|---|
| 212 | d.errback(failure) |
|---|
| 213 | |
|---|
| 214 | self.call = None |
|---|
| 215 | d = defer.maybeDeferred(self.f, *self.a, **self.kw) |
|---|
| 216 | d.addCallback(cb) |
|---|
| 217 | d.addErrback(eb) |
|---|
| 218 | |
|---|
| 219 | |
|---|
| 220 | def _reschedule(self): |
|---|
| 221 | """ |
|---|
| 222 | Schedule the next iteration of this looping call. |
|---|
| 223 | """ |
|---|
| 224 | if self.interval == 0: |
|---|
| 225 | self.call = self.clock.callLater(0, self) |
|---|
| 226 | return |
|---|
| 227 | |
|---|
| 228 | currentTime = self.clock.seconds() |
|---|
| 229 | # Find how long is left until the interval comes around again. |
|---|
| 230 | untilNextTime = (self._expectNextCallAt - currentTime) % self.interval |
|---|
| 231 | # Make sure it is in the future, in case more than one interval worth |
|---|
| 232 | # of time passed since the previous call was made. |
|---|
| 233 | nextTime = max( |
|---|
| 234 | self._expectNextCallAt + self.interval, currentTime + untilNextTime) |
|---|
| 235 | # If the interval falls on the current time exactly, skip it and |
|---|
| 236 | # schedule the call for the next interval. |
|---|
| 237 | if nextTime == currentTime: |
|---|
| 238 | nextTime += self.interval |
|---|
| 239 | self._expectNextCallAt = nextTime |
|---|
| 240 | self.call = self.clock.callLater(nextTime - currentTime, self) |
|---|
| 241 | |
|---|
| 242 | |
|---|
| 243 | def __repr__(self): |
|---|
| 244 | if hasattr(self.f, 'func_name'): |
|---|
| 245 | func = self.f.func_name |
|---|
| 246 | if hasattr(self.f, 'im_class'): |
|---|
| 247 | func = self.f.im_class.__name__ + '.' + func |
|---|
| 248 | else: |
|---|
| 249 | func = reflect.safe_repr(self.f) |
|---|
| 250 | |
|---|
| 251 | return 'LoopingCall<%r>(%s, *%s, **%s)' % ( |
|---|
| 252 | self.interval, func, reflect.safe_repr(self.a), |
|---|
| 253 | reflect.safe_repr(self.kw)) |
|---|
| 254 | |
|---|
| 255 | |
|---|
| 256 | |
|---|
| 257 | class SchedulerError(Exception): |
|---|
| 258 | """ |
|---|
| 259 | The operation could not be completed because the scheduler or one of its |
|---|
| 260 | tasks was in an invalid state. This exception should not be raised |
|---|
| 261 | directly, but is a superclass of various scheduler-state-related |
|---|
| 262 | exceptions. |
|---|
| 263 | """ |
|---|
| 264 | |
|---|
| 265 | |
|---|
| 266 | |
|---|
| 267 | class SchedulerStopped(SchedulerError): |
|---|
| 268 | """ |
|---|
| 269 | The operation could not complete because the scheduler was stopped in |
|---|
| 270 | progress or was already stopped. |
|---|
| 271 | """ |
|---|
| 272 | |
|---|
| 273 | |
|---|
| 274 | |
|---|
| 275 | class TaskFinished(SchedulerError): |
|---|
| 276 | """ |
|---|
| 277 | The operation could not complete because the task was already completed, |
|---|
| 278 | stopped, encountered an error or otherwise permanently stopped running. |
|---|
| 279 | """ |
|---|
| 280 | |
|---|
| 281 | |
|---|
| 282 | |
|---|
| 283 | class TaskDone(TaskFinished): |
|---|
| 284 | """ |
|---|
| 285 | The operation could not complete because the task was already completed. |
|---|
| 286 | """ |
|---|
| 287 | |
|---|
| 288 | |
|---|
| 289 | |
|---|
| 290 | class TaskStopped(TaskFinished): |
|---|
| 291 | """ |
|---|
| 292 | The operation could not complete because the task was stopped. |
|---|
| 293 | """ |
|---|
| 294 | |
|---|
| 295 | |
|---|
| 296 | |
|---|
| 297 | class TaskFailed(TaskFinished): |
|---|
| 298 | """ |
|---|
| 299 | The operation could not complete because the task died with an unhandled |
|---|
| 300 | error. |
|---|
| 301 | """ |
|---|
| 302 | |
|---|
| 303 | |
|---|
| 304 | |
|---|
| 305 | class NotPaused(SchedulerError): |
|---|
| 306 | """ |
|---|
| 307 | This exception is raised when a task is resumed which was not previously |
|---|
| 308 | paused. |
|---|
| 309 | """ |
|---|
| 310 | |
|---|
| 311 | |
|---|
| 312 | |
|---|
| 313 | class _Timer(object): |
|---|
| 314 | MAX_SLICE = 0.01 |
|---|
| 315 | def __init__(self): |
|---|
| 316 | self.end = time.time() + self.MAX_SLICE |
|---|
| 317 | |
|---|
| 318 | |
|---|
| 319 | def __call__(self): |
|---|
| 320 | return time.time() >= self.end |
|---|
| 321 | |
|---|
| 322 | |
|---|
| 323 | |
|---|
| 324 | _EPSILON = 0.00000001 |
|---|
| 325 | def _defaultScheduler(x): |
|---|
| 326 | from twisted.internet import reactor |
|---|
| 327 | return reactor.callLater(_EPSILON, x) |
|---|
| 328 | |
|---|
| 329 | |
|---|
| 330 | class CooperativeTask(object): |
|---|
| 331 | """ |
|---|
| 332 | A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be |
|---|
| 333 | paused, resumed, and stopped. It can also have its completion (or |
|---|
| 334 | termination) monitored. |
|---|
| 335 | |
|---|
| 336 | @see: L{CooperativeTask.cooperate} |
|---|
| 337 | |
|---|
| 338 | @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is |
|---|
| 339 | asked to do work. |
|---|
| 340 | |
|---|
| 341 | @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask} |
|---|
| 342 | participates in, which is used to re-insert it upon resume. |
|---|
| 343 | |
|---|
| 344 | @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task |
|---|
| 345 | completes, fails, or finishes. |
|---|
| 346 | |
|---|
| 347 | @type _deferreds: L{list} |
|---|
| 348 | |
|---|
| 349 | @type _cooperator: L{Cooperator} |
|---|
| 350 | |
|---|
| 351 | @ivar _pauseCount: the number of times that this L{CooperativeTask} has |
|---|
| 352 | been paused; if 0, it is running. |
|---|
| 353 | |
|---|
| 354 | @type _pauseCount: L{int} |
|---|
| 355 | |
|---|
| 356 | @ivar _completionState: The completion-state of this L{CooperativeTask}. |
|---|
| 357 | C{None} if the task is not yet completed, an instance of L{TaskStopped} |
|---|
| 358 | if C{stop} was called to stop this task early, of L{TaskFailed} if the |
|---|
| 359 | application code in the iterator raised an exception which caused it to |
|---|
| 360 | terminate, and of L{TaskDone} if it terminated normally via raising |
|---|
| 361 | L{StopIteration}. |
|---|
| 362 | |
|---|
| 363 | @type _completionState: L{TaskFinished} |
|---|
| 364 | """ |
|---|
| 365 | |
|---|
| 366 | def __init__(self, iterator, cooperator): |
|---|
| 367 | """ |
|---|
| 368 | A private constructor: to create a new L{CooperativeTask}, see |
|---|
| 369 | L{Cooperator.cooperate}. |
|---|
| 370 | """ |
|---|
| 371 | self._iterator = iterator |
|---|
| 372 | self._cooperator = cooperator |
|---|
| 373 | self._deferreds = [] |
|---|
| 374 | self._pauseCount = 0 |
|---|
| 375 | self._completionState = None |
|---|
| 376 | self._completionResult = None |
|---|
| 377 | cooperator._addTask(self) |
|---|
| 378 | |
|---|
| 379 | |
|---|
| 380 | def whenDone(self): |
|---|
| 381 | """ |
|---|
| 382 | Get a L{defer.Deferred} notification of when this task is complete. |
|---|
| 383 | |
|---|
| 384 | @return: a L{defer.Deferred} that fires with the C{iterator} that this |
|---|
| 385 | L{CooperativeTask} was created with when the iterator has been |
|---|
| 386 | exhausted (i.e. its C{next} method has raised L{StopIteration}), or |
|---|
| 387 | fails with the exception raised by C{next} if it raises some other |
|---|
| 388 | exception. |
|---|
| 389 | |
|---|
| 390 | @rtype: L{defer.Deferred} |
|---|
| 391 | """ |
|---|
| 392 | d = defer.Deferred() |
|---|
| 393 | if self._completionState is None: |
|---|
| 394 | self._deferreds.append(d) |
|---|
| 395 | else: |
|---|
| 396 | d.callback(self._completionResult) |
|---|
| 397 | return d |
|---|
| 398 | |
|---|
| 399 | |
|---|
| 400 | def pause(self): |
|---|
| 401 | """ |
|---|
| 402 | Pause this L{CooperativeTask}. Stop doing work until |
|---|
| 403 | L{CooperativeTask.resume} is called. If C{pause} is called more than |
|---|
| 404 | once, C{resume} must be called an equal number of times to resume this |
|---|
| 405 | task. |
|---|
| 406 | |
|---|
| 407 | @raise TaskFinished: if this task has already finished or completed. |
|---|
| 408 | """ |
|---|
| 409 | self._checkFinish() |
|---|
| 410 | self._pauseCount += 1 |
|---|
| 411 | if self._pauseCount == 1: |
|---|
| 412 | self._cooperator._removeTask(self) |
|---|
| 413 | |
|---|
| 414 | |
|---|
| 415 | def resume(self): |
|---|
| 416 | """ |
|---|
| 417 | Resume processing of a paused L{CooperativeTask}. |
|---|
| 418 | |
|---|
| 419 | @raise NotPaused: if this L{CooperativeTask} is not paused. |
|---|
| 420 | """ |
|---|
| 421 | if self._pauseCount == 0: |
|---|
| 422 | raise NotPaused() |
|---|
| 423 | self._pauseCount -= 1 |
|---|
| 424 | if self._pauseCount == 0 and self._completionState is None: |
|---|
| 425 | self._cooperator._addTask(self) |
|---|
| 426 | |
|---|
| 427 | |
|---|
| 428 | def _completeWith(self, completionState, deferredResult): |
|---|
| 429 | """ |
|---|
| 430 | @param completionState: a L{TaskFinished} exception or a subclass |
|---|
| 431 | thereof, indicating what exception should be raised when subsequent |
|---|
| 432 | operations are performed. |
|---|
| 433 | |
|---|
| 434 | @param deferredResult: the result to fire all the deferreds with. |
|---|
| 435 | """ |
|---|
| 436 | self._completionState = completionState |
|---|
| 437 | self._completionResult = deferredResult |
|---|
| 438 | if not self._pauseCount: |
|---|
| 439 | self._cooperator._removeTask(self) |
|---|
| 440 | |
|---|
| 441 | # The Deferreds need to be invoked after all this is completed, because |
|---|
| 442 | # a Deferred may want to manipulate other tasks in a Cooperator. For |
|---|
| 443 | # example, if you call "stop()" on a cooperator in a callback on a |
|---|
| 444 | # Deferred returned from whenDone(), this CooperativeTask must be gone |
|---|
| 445 | # from the Cooperator by that point so that _completeWith is not |
|---|
| 446 | # invoked reentrantly; that would cause these Deferreds to blow up with |
|---|
| 447 | # an AlreadyCalledError, or the _removeTask to fail with a ValueError. |
|---|
| 448 | for d in self._deferreds: |
|---|
| 449 | d.callback(deferredResult) |
|---|
| 450 | |
|---|
| 451 | |
|---|
| 452 | def stop(self): |
|---|
| 453 | """ |
|---|
| 454 | Stop further processing of this task. |
|---|
| 455 | |
|---|
| 456 | @raise TaskFinished: if this L{CooperativeTask} has previously |
|---|
| 457 | completed, via C{stop}, completion, or failure. |
|---|
| 458 | """ |
|---|
| 459 | self._checkFinish() |
|---|
| 460 | self._completeWith(TaskStopped(), Failure(TaskStopped())) |
|---|
| 461 | |
|---|
| 462 | |
|---|
| 463 | def _checkFinish(self): |
|---|
| 464 | """ |
|---|
| 465 | If this task has been stopped, raise the appropriate subclass of |
|---|
| 466 | L{TaskFinished}. |
|---|
| 467 | """ |
|---|
| 468 | if self._completionState is not None: |
|---|
| 469 | raise self._completionState |
|---|
| 470 | |
|---|
| 471 | |
|---|
| 472 | def _oneWorkUnit(self): |
|---|
| 473 | """ |
|---|
| 474 | Perform one unit of work for this task, retrieving one item from its |
|---|
| 475 | iterator, stopping if there are no further items in the iterator, and |
|---|
| 476 | pausing if the result was a L{defer.Deferred}. |
|---|
| 477 | """ |
|---|
| 478 | try: |
|---|
| 479 | result = self._iterator.next() |
|---|
| 480 | except StopIteration: |
|---|
| 481 | self._completeWith(TaskDone(), self._iterator) |
|---|
| 482 | except: |
|---|
| 483 | self._completeWith(TaskFailed(), Failure()) |
|---|
| 484 | else: |
|---|
| 485 | if isinstance(result, defer.Deferred): |
|---|
| 486 | self.pause() |
|---|
| 487 | def failLater(f): |
|---|
| 488 | self._completeWith(TaskFailed(), f) |
|---|
| 489 | result.addCallbacks(lambda result: self.resume(), |
|---|
| 490 | failLater) |
|---|
| 491 | |
|---|
| 492 | |
|---|
| 493 | |
|---|
| 494 | class Cooperator(object): |
|---|
| 495 | """ |
|---|
| 496 | Cooperative task scheduler. |
|---|
| 497 | """ |
|---|
| 498 | |
|---|
| 499 | def __init__(self, |
|---|
| 500 | terminationPredicateFactory=_Timer, |
|---|
| 501 | scheduler=_defaultScheduler, |
|---|
| 502 | started=True): |
|---|
| 503 | """ |
|---|
| 504 | Create a scheduler-like object to which iterators may be added. |
|---|
| 505 | |
|---|
| 506 | @param terminationPredicateFactory: A no-argument callable which will |
|---|
| 507 | be invoked at the beginning of each step and should return a |
|---|
| 508 | no-argument callable which will return True when the step should be |
|---|
| 509 | terminated. The default factory is time-based and allows iterators to |
|---|
| 510 | run for 1/100th of a second at a time. |
|---|
| 511 | |
|---|
| 512 | @param scheduler: A one-argument callable which takes a no-argument |
|---|
| 513 | callable and should invoke it at some future point. This will be used |
|---|
| 514 | to schedule each step of this Cooperator. |
|---|
| 515 | |
|---|
| 516 | @param started: A boolean which indicates whether iterators should be |
|---|
| 517 | stepped as soon as they are added, or if they will be queued up until |
|---|
| 518 | L{Cooperator.start} is called. |
|---|
| 519 | """ |
|---|
| 520 | self._tasks = [] |
|---|
| 521 | self._metarator = iter(()) |
|---|
| 522 | self._terminationPredicateFactory = terminationPredicateFactory |
|---|
| 523 | self._scheduler = scheduler |
|---|
| 524 | self._delayedCall = None |
|---|
| 525 | self._stopped = False |
|---|
| 526 | self._started = started |
|---|
| 527 | |
|---|
| 528 | |
|---|
| 529 | def coiterate(self, iterator, doneDeferred=None): |
|---|
| 530 | """ |
|---|
| 531 | Add an iterator to the list of iterators this L{Cooperator} is |
|---|
| 532 | currently running. |
|---|
| 533 | |
|---|
| 534 | @param doneDeferred: If specified, this will be the Deferred used as |
|---|
| 535 | the completion deferred. It is suggested that you use the default, |
|---|
| 536 | which creates a new Deferred for you. |
|---|
| 537 | |
|---|
| 538 | @return: a Deferred that will fire when the iterator finishes. |
|---|
| 539 | """ |
|---|
| 540 | if doneDeferred is None: |
|---|
| 541 | doneDeferred = defer.Deferred() |
|---|
| 542 | CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred) |
|---|
| 543 | return doneDeferred |
|---|
| 544 | |
|---|
| 545 | |
|---|
| 546 | def cooperate(self, iterator): |
|---|
| 547 | """ |
|---|
| 548 | Start running the given iterator as a long-running cooperative task, by |
|---|
| 549 | calling next() on it as a periodic timed event. |
|---|
| 550 | |
|---|
| 551 | @param iterator: the iterator to invoke. |
|---|
| 552 | |
|---|
| 553 | @return: a L{CooperativeTask} object representing this task. |
|---|
| 554 | """ |
|---|
| 555 | return CooperativeTask(iterator, self) |
|---|
| 556 | |
|---|
| 557 | |
|---|
| 558 | def _addTask(self, task): |
|---|
| 559 | """ |
|---|
| 560 | Add a L{CooperativeTask} object to this L{Cooperator}. |
|---|
| 561 | """ |
|---|
| 562 | if self._stopped: |
|---|
| 563 | self._tasks.append(task) # XXX silly, I know, but _completeWith |
|---|
| 564 | # does the inverse |
|---|
| 565 | task._completeWith(SchedulerStopped(), Failure(SchedulerStopped())) |
|---|
| 566 | else: |
|---|
| 567 | self._tasks.append(task) |
|---|
| 568 | self._reschedule() |
|---|
| 569 | |
|---|
| 570 | |
|---|
| 571 | def _removeTask(self, task): |
|---|
| 572 | """ |
|---|
| 573 | Remove a L{CooperativeTask} from this L{Cooperator}. |
|---|
| 574 | """ |
|---|
| 575 | self._tasks.remove(task) |
|---|
| 576 | # If no work left to do, cancel the delayed call: |
|---|
| 577 | if not self._tasks and self._delayedCall: |
|---|
| 578 | self._delayedCall.cancel() |
|---|
| 579 | self._delayedCall = None |
|---|
| 580 | |
|---|
| 581 | |
|---|
| 582 | def _tasksWhileNotStopped(self): |
|---|
| 583 | """ |
|---|
| 584 | Yield all L{CooperativeTask} objects in a loop as long as this |
|---|
| 585 | L{Cooperator}'s termination condition has not been met. |
|---|
| 586 | """ |
|---|
| 587 | terminator = self._terminationPredicateFactory() |
|---|
| 588 | while self._tasks: |
|---|
| 589 | for t in self._metarator: |
|---|
| 590 | yield t |
|---|
| 591 | if terminator(): |
|---|
| 592 | return |
|---|
| 593 | self._metarator = iter(self._tasks) |
|---|
| 594 | |
|---|
| 595 | |
|---|
| 596 | def _tick(self): |
|---|
| 597 | """ |
|---|
| 598 | Run one scheduler tick. |
|---|
| 599 | """ |
|---|
| 600 | self._delayedCall = None |
|---|
| 601 | for taskObj in self._tasksWhileNotStopped(): |
|---|
| 602 | taskObj._oneWorkUnit() |
|---|
| 603 | self._reschedule() |
|---|
| 604 | |
|---|
| 605 | |
|---|
| 606 | _mustScheduleOnStart = False |
|---|
| 607 | def _reschedule(self): |
|---|
| 608 | if not self._started: |
|---|
| 609 | self._mustScheduleOnStart = True |
|---|
| 610 | return |
|---|
| 611 | if self._delayedCall is None and self._tasks: |
|---|
| 612 | self._delayedCall = self._scheduler(self._tick) |
|---|
| 613 | |
|---|
| 614 | |
|---|
| 615 | def start(self): |
|---|
| 616 | """ |
|---|
| 617 | Begin scheduling steps. |
|---|
| 618 | """ |
|---|
| 619 | self._stopped = False |
|---|
| 620 | self._started = True |
|---|
| 621 | if self._mustScheduleOnStart: |
|---|
| 622 | del self._mustScheduleOnStart |
|---|
| 623 | self._reschedule() |
|---|
| 624 | |
|---|
| 625 | |
|---|
| 626 | def stop(self): |
|---|
| 627 | """ |
|---|
| 628 | Stop scheduling steps. Errback the completion Deferreds of all |
|---|
| 629 | iterators which have been added and forget about them. |
|---|
| 630 | """ |
|---|
| 631 | self._stopped = True |
|---|
| 632 | for taskObj in self._tasks: |
|---|
| 633 | taskObj._completeWith(SchedulerStopped(), |
|---|
| 634 | Failure(SchedulerStopped())) |
|---|
| 635 | self._tasks = [] |
|---|
| 636 | if self._delayedCall is not None: |
|---|
| 637 | self._delayedCall.cancel() |
|---|
| 638 | self._delayedCall = None |
|---|
| 639 | |
|---|
| 640 | |
|---|
| 641 | |
|---|
| 642 | _theCooperator = Cooperator() |
|---|
| 643 | |
|---|
| 644 | def coiterate(iterator): |
|---|
| 645 | """ |
|---|
| 646 | Cooperatively iterate over the given iterator, dividing runtime between it |
|---|
| 647 | and all other iterators which have been passed to this function and not yet |
|---|
| 648 | exhausted. |
|---|
| 649 | """ |
|---|
| 650 | return _theCooperator.coiterate(iterator) |
|---|
| 651 | |
|---|
| 652 | |
|---|
| 653 | |
|---|
| 654 | def cooperate(iterator): |
|---|
| 655 | """ |
|---|
| 656 | Start running the given iterator as a long-running cooperative task, by |
|---|
| 657 | calling next() on it as a periodic timed event. |
|---|
| 658 | |
|---|
| 659 | @param iterator: the iterator to invoke. |
|---|
| 660 | |
|---|
| 661 | @return: a L{CooperativeTask} object representing this task. |
|---|
| 662 | """ |
|---|
| 663 | return _theCooperator.cooperate(iterator) |
|---|
| 664 | |
|---|
| 665 | |
|---|
| 666 | |
|---|
| 667 | class Clock: |
|---|
| 668 | """ |
|---|
| 669 | Provide a deterministic, easily-controlled implementation of |
|---|
| 670 | L{IReactorTime.callLater}. This is commonly useful for writing |
|---|
| 671 | deterministic unit tests for code which schedules events using this API. |
|---|
| 672 | """ |
|---|
| 673 | implements(IReactorTime) |
|---|
| 674 | |
|---|
| 675 | rightNow = 0.0 |
|---|
| 676 | |
|---|
| 677 | def __init__(self): |
|---|
| 678 | self.calls = [] |
|---|
| 679 | |
|---|
| 680 | |
|---|
| 681 | def seconds(self): |
|---|
| 682 | """ |
|---|
| 683 | Pretend to be time.time(). This is used internally when an operation |
|---|
| 684 | such as L{IDelayedCall.reset} needs to determine a a time value |
|---|
| 685 | relative to the current time. |
|---|
| 686 | |
|---|
| 687 | @rtype: C{float} |
|---|
| 688 | @return: The time which should be considered the current time. |
|---|
| 689 | """ |
|---|
| 690 | return self.rightNow |
|---|
| 691 | |
|---|
| 692 | |
|---|
| 693 | def _sortCalls(self): |
|---|
| 694 | """ |
|---|
| 695 | Sort the pending calls according to the time they are scheduled. |
|---|
| 696 | """ |
|---|
| 697 | self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) |
|---|
| 698 | |
|---|
| 699 | |
|---|
| 700 | def callLater(self, when, what, *a, **kw): |
|---|
| 701 | """ |
|---|
| 702 | See L{twisted.internet.interfaces.IReactorTime.callLater}. |
|---|
| 703 | """ |
|---|
| 704 | dc = base.DelayedCall(self.seconds() + when, |
|---|
| 705 | what, a, kw, |
|---|
| 706 | self.calls.remove, |
|---|
| 707 | lambda c: None, |
|---|
| 708 | self.seconds) |
|---|
| 709 | self.calls.append(dc) |
|---|
| 710 | self._sortCalls() |
|---|
| 711 | return dc |
|---|
| 712 | |
|---|
| 713 | |
|---|
| 714 | def getDelayedCalls(self): |
|---|
| 715 | """ |
|---|
| 716 | See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} |
|---|
| 717 | """ |
|---|
| 718 | return self.calls |
|---|
| 719 | |
|---|
| 720 | |
|---|
| 721 | def advance(self, amount): |
|---|
| 722 | """ |
|---|
| 723 | Move time on this clock forward by the given amount and run whatever |
|---|
| 724 | pending calls should be run. |
|---|
| 725 | |
|---|
| 726 | @type amount: C{float} |
|---|
| 727 | @param amount: The number of seconds which to advance this clock's |
|---|
| 728 | time. |
|---|
| 729 | """ |
|---|
| 730 | self.rightNow += amount |
|---|
| 731 | self._sortCalls() |
|---|
| 732 | while self.calls and self.calls[0].getTime() <= self.seconds(): |
|---|
| 733 | call = self.calls.pop(0) |
|---|
| 734 | call.called = 1 |
|---|
| 735 | call.func(*call.args, **call.kw) |
|---|
| 736 | self._sortCalls() |
|---|
| 737 | |
|---|
| 738 | |
|---|
| 739 | def pump(self, timings): |
|---|
| 740 | """ |
|---|
| 741 | Advance incrementally by the given set of times. |
|---|
| 742 | |
|---|
| 743 | @type timings: iterable of C{float} |
|---|
| 744 | """ |
|---|
| 745 | for amount in timings: |
|---|
| 746 | self.advance(amount) |
|---|
| 747 | |
|---|
| 748 | |
|---|
| 749 | |
|---|
| 750 | def deferLater(clock, delay, callable, *args, **kw): |
|---|
| 751 | """ |
|---|
| 752 | Call the given function after a certain period of time has passed. |
|---|
| 753 | |
|---|
| 754 | @type clock: L{IReactorTime} provider |
|---|
| 755 | @param clock: The object which will be used to schedule the delayed |
|---|
| 756 | call. |
|---|
| 757 | |
|---|
| 758 | @type delay: C{float} or C{int} |
|---|
| 759 | @param delay: The number of seconds to wait before calling the function. |
|---|
| 760 | |
|---|
| 761 | @param callable: The object to call after the delay. |
|---|
| 762 | |
|---|
| 763 | @param *args: The positional arguments to pass to C{callable}. |
|---|
| 764 | |
|---|
| 765 | @param **kw: The keyword arguments to pass to C{callable}. |
|---|
| 766 | |
|---|
| 767 | @rtype: L{defer.Deferred} |
|---|
| 768 | |
|---|
| 769 | @return: A deferred that fires with the result of the callable when the |
|---|
| 770 | specified time has elapsed. |
|---|
| 771 | """ |
|---|
| 772 | def deferLaterCancel(deferred): |
|---|
| 773 | delayedCall.cancel() |
|---|
| 774 | d = defer.Deferred(deferLaterCancel) |
|---|
| 775 | d.addCallback(lambda ignored: callable(*args, **kw)) |
|---|
| 776 | delayedCall = clock.callLater(delay, d.callback, None) |
|---|
| 777 | return d |
|---|
| 778 | |
|---|
| 779 | |
|---|
| 780 | |
|---|
| 781 | __all__ = [ |
|---|
| 782 | 'LoopingCall', |
|---|
| 783 | |
|---|
| 784 | 'Clock', |
|---|
| 785 | |
|---|
| 786 | 'SchedulerStopped', 'Cooperator', 'coiterate', |
|---|
| 787 | |
|---|
| 788 | 'deferLater', |
|---|
| 789 | ] |
|---|