| 1 | |
|---|
| 2 | |
|---|
| 3 | |
|---|
| 4 | |
|---|
| 5 | """ |
|---|
| 6 | Scheduling utility methods and classes. |
|---|
| 7 | |
|---|
| 8 | @author: Jp Calderone |
|---|
| 9 | """ |
|---|
| 10 | |
|---|
| 11 | __metaclass__ = type |
|---|
| 12 | |
|---|
| 13 | import time |
|---|
| 14 | |
|---|
| 15 | from zope.interface import implements |
|---|
| 16 | |
|---|
| 17 | from twisted.python import reflect |
|---|
| 18 | from twisted.python.failure import Failure |
|---|
| 19 | |
|---|
| 20 | from twisted.internet import base, defer |
|---|
| 21 | from twisted.internet.interfaces import IReactorTime |
|---|
| 22 | |
|---|
| 23 | |
|---|
| 24 | class LoopingCall: |
|---|
| 25 | """Call a function repeatedly. |
|---|
| 26 | |
|---|
| 27 | If C{f} returns a deferred, rescheduling will not take place until the |
|---|
| 28 | deferred has fired. The result value is ignored. |
|---|
| 29 | |
|---|
| 30 | @ivar f: The function to call. |
|---|
| 31 | @ivar a: A tuple of arguments to pass the function. |
|---|
| 32 | @ivar kw: A dictionary of keyword arguments to pass to the function. |
|---|
| 33 | @ivar clock: A provider of |
|---|
| 34 | L{twisted.internet.interfaces.IReactorTime}. The default is |
|---|
| 35 | L{twisted.internet.reactor}. Feel free to set this to |
|---|
| 36 | something else, but it probably ought to be set *before* |
|---|
| 37 | calling L{start}. |
|---|
| 38 | |
|---|
| 39 | @type _expectNextCallAt: C{float} |
|---|
| 40 | @ivar _expectNextCallAt: The time at which this instance most recently |
|---|
| 41 | scheduled itself to run. |
|---|
| 42 | |
|---|
| 43 | @type _realLastTime: C{float} |
|---|
| 44 | @ivar _realLastTime: When counting skips, the time at which the skip counter |
|---|
| 45 | was last invoked. |
|---|
| 46 | |
|---|
| 47 | @type _runAtStart: C{bool} |
|---|
| 48 | @ivar _runAtStart: A flag indicating whether the 'now' argument was passed |
|---|
| 49 | to L{LoopingCall.start}. |
|---|
| 50 | """ |
|---|
| 51 | |
|---|
| 52 | call = None |
|---|
| 53 | running = False |
|---|
| 54 | deferred = None |
|---|
| 55 | interval = None |
|---|
| 56 | _expectNextCallAt = 0.0 |
|---|
| 57 | _runAtStart = False |
|---|
| 58 | starttime = None |
|---|
| 59 | |
|---|
| 60 | def __init__(self, f, *a, **kw): |
|---|
| 61 | self.f = f |
|---|
| 62 | self.a = a |
|---|
| 63 | self.kw = kw |
|---|
| 64 | from twisted.internet import reactor |
|---|
| 65 | self.clock = reactor |
|---|
| 66 | |
|---|
| 67 | |
|---|
| 68 | def withCount(cls, countCallable): |
|---|
| 69 | """ |
|---|
| 70 | An alternate constructor for L{LoopingCall} that makes available the |
|---|
| 71 | number of calls which should have occurred since it was last invoked. |
|---|
| 72 | |
|---|
| 73 | Note that this number is an C{int} value; It represents the discrete |
|---|
| 74 | number of calls that should have been made. For example, if you are |
|---|
| 75 | using a looping call to display an animation with discrete frames, this |
|---|
| 76 | number would be the number of frames to advance. |
|---|
| 77 | |
|---|
| 78 | The count is normally 1, but can be higher. For example, if the reactor |
|---|
| 79 | is blocked and takes too long to invoke the L{LoopingCall}, a Deferred |
|---|
| 80 | returned from a previous call is not fired before an interval has |
|---|
| 81 | elapsed, or if the callable itself blocks for longer than an interval, |
|---|
| 82 | preventing I{itself} from being called. |
|---|
| 83 | |
|---|
| 84 | @param countCallable: A callable that will be invoked each time the |
|---|
| 85 | resulting LoopingCall is run, with an integer specifying the number |
|---|
| 86 | of calls that should have been invoked. |
|---|
| 87 | |
|---|
| 88 | @type countCallable: 1-argument callable which takes an C{int} |
|---|
| 89 | |
|---|
| 90 | @return: An instance of L{LoopingCall} with call counting enabled, |
|---|
| 91 | which provides the count as the first positional argument. |
|---|
| 92 | |
|---|
| 93 | @rtype: L{LoopingCall} |
|---|
| 94 | |
|---|
| 95 | @since: 9.0 |
|---|
| 96 | """ |
|---|
| 97 | |
|---|
| 98 | def counter(): |
|---|
| 99 | now = self.clock.seconds() |
|---|
| 100 | lastTime = self._realLastTime |
|---|
| 101 | if lastTime is None: |
|---|
| 102 | lastTime = self.starttime |
|---|
| 103 | if self._runAtStart: |
|---|
| 104 | lastTime -= self.interval |
|---|
| 105 | self._realLastTime = now |
|---|
| 106 | lastInterval = self._intervalOf(lastTime) |
|---|
| 107 | thisInterval = self._intervalOf(now) |
|---|
| 108 | count = thisInterval - lastInterval |
|---|
| 109 | return countCallable(count) |
|---|
| 110 | |
|---|
| 111 | self = cls(counter) |
|---|
| 112 | |
|---|
| 113 | self._realLastTime = None |
|---|
| 114 | |
|---|
| 115 | return self |
|---|
| 116 | |
|---|
| 117 | withCount = classmethod(withCount) |
|---|
| 118 | |
|---|
| 119 | |
|---|
| 120 | def _intervalOf(self, t): |
|---|
| 121 | """ |
|---|
| 122 | Determine the number of intervals passed as of the given point in |
|---|
| 123 | time. |
|---|
| 124 | |
|---|
| 125 | @param t: The specified time (from the start of the L{LoopingCall}) to |
|---|
| 126 | be measured in intervals |
|---|
| 127 | |
|---|
| 128 | @return: The C{int} number of intervals which have passed as of the |
|---|
| 129 | given point in time. |
|---|
| 130 | """ |
|---|
| 131 | elapsedTime = t - self.starttime |
|---|
| 132 | intervalNum = int(elapsedTime / self.interval) |
|---|
| 133 | return intervalNum |
|---|
| 134 | |
|---|
| 135 | |
|---|
| 136 | def start(self, interval, now=True): |
|---|
| 137 | """ |
|---|
| 138 | Start running function every interval seconds. |
|---|
| 139 | |
|---|
| 140 | @param interval: The number of seconds between calls. May be |
|---|
| 141 | less than one. Precision will depend on the underlying |
|---|
| 142 | platform, the available hardware, and the load on the system. |
|---|
| 143 | |
|---|
| 144 | @param now: If True, run this call right now. Otherwise, wait |
|---|
| 145 | until the interval has elapsed before beginning. |
|---|
| 146 | |
|---|
| 147 | @return: A Deferred whose callback will be invoked with |
|---|
| 148 | C{self} when C{self.stop} is called, or whose errback will be |
|---|
| 149 | invoked when the function raises an exception or returned a |
|---|
| 150 | deferred that has its errback invoked. |
|---|
| 151 | """ |
|---|
| 152 | assert not self.running, ("Tried to start an already running " |
|---|
| 153 | "LoopingCall.") |
|---|
| 154 | if interval < 0: |
|---|
| 155 | raise ValueError, "interval must be >= 0" |
|---|
| 156 | self.running = True |
|---|
| 157 | d = self.deferred = defer.Deferred() |
|---|
| 158 | self.starttime = self.clock.seconds() |
|---|
| 159 | self._expectNextCallAt = self.starttime |
|---|
| 160 | self.interval = interval |
|---|
| 161 | self._runAtStart = now |
|---|
| 162 | if now: |
|---|
| 163 | self() |
|---|
| 164 | else: |
|---|
| 165 | self._reschedule() |
|---|
| 166 | return d |
|---|
| 167 | |
|---|
| 168 | def stop(self): |
|---|
| 169 | """Stop running function. |
|---|
| 170 | """ |
|---|
| 171 | assert self.running, ("Tried to stop a LoopingCall that was " |
|---|
| 172 | "not running.") |
|---|
| 173 | self.running = False |
|---|
| 174 | if self.call is not None: |
|---|
| 175 | self.call.cancel() |
|---|
| 176 | self.call = None |
|---|
| 177 | d, self.deferred = self.deferred, None |
|---|
| 178 | d.callback(self) |
|---|
| 179 | |
|---|
| 180 | def __call__(self): |
|---|
| 181 | def cb(result): |
|---|
| 182 | if self.running: |
|---|
| 183 | self._reschedule() |
|---|
| 184 | else: |
|---|
| 185 | d, self.deferred = self.deferred, None |
|---|
| 186 | d.callback(self) |
|---|
| 187 | |
|---|
| 188 | def eb(failure): |
|---|
| 189 | self.running = False |
|---|
| 190 | d, self.deferred = self.deferred, None |
|---|
| 191 | d.errback(failure) |
|---|
| 192 | |
|---|
| 193 | self.call = None |
|---|
| 194 | d = defer.maybeDeferred(self.f, *self.a, **self.kw) |
|---|
| 195 | d.addCallback(cb) |
|---|
| 196 | d.addErrback(eb) |
|---|
| 197 | |
|---|
| 198 | |
|---|
| 199 | def _reschedule(self): |
|---|
| 200 | """ |
|---|
| 201 | Schedule the next iteration of this looping call. |
|---|
| 202 | """ |
|---|
| 203 | if self.interval == 0: |
|---|
| 204 | self.call = self.clock.callLater(0, self) |
|---|
| 205 | return |
|---|
| 206 | |
|---|
| 207 | currentTime = self.clock.seconds() |
|---|
| 208 | |
|---|
| 209 | untilNextTime = (self._expectNextCallAt - currentTime) % self.interval |
|---|
| 210 | |
|---|
| 211 | |
|---|
| 212 | nextTime = max( |
|---|
| 213 | self._expectNextCallAt + self.interval, currentTime + untilNextTime) |
|---|
| 214 | |
|---|
| 215 | |
|---|
| 216 | if nextTime == currentTime: |
|---|
| 217 | nextTime += self.interval |
|---|
| 218 | self._expectNextCallAt = nextTime |
|---|
| 219 | self.call = self.clock.callLater(nextTime - currentTime, self) |
|---|
| 220 | |
|---|
| 221 | |
|---|
| 222 | def __repr__(self): |
|---|
| 223 | if hasattr(self.f, 'func_name'): |
|---|
| 224 | func = self.f.func_name |
|---|
| 225 | if hasattr(self.f, 'im_class'): |
|---|
| 226 | func = self.f.im_class.__name__ + '.' + func |
|---|
| 227 | else: |
|---|
| 228 | func = reflect.safe_repr(self.f) |
|---|
| 229 | |
|---|
| 230 | return 'LoopingCall<%r>(%s, *%s, **%s)' % ( |
|---|
| 231 | self.interval, func, reflect.safe_repr(self.a), |
|---|
| 232 | reflect.safe_repr(self.kw)) |
|---|
| 233 | |
|---|
| 234 | |
|---|
| 235 | |
|---|
| 236 | class SchedulerError(Exception): |
|---|
| 237 | """ |
|---|
| 238 | The operation could not be completed because the scheduler or one of its |
|---|
| 239 | tasks was in an invalid state. This exception should not be raised |
|---|
| 240 | directly, but is a superclass of various scheduler-state-related |
|---|
| 241 | exceptions. |
|---|
| 242 | """ |
|---|
| 243 | |
|---|
| 244 | |
|---|
| 245 | |
|---|
| 246 | class SchedulerStopped(SchedulerError): |
|---|
| 247 | """ |
|---|
| 248 | The operation could not complete because the scheduler was stopped in |
|---|
| 249 | progress or was already stopped. |
|---|
| 250 | """ |
|---|
| 251 | |
|---|
| 252 | |
|---|
| 253 | |
|---|
| 254 | class TaskFinished(SchedulerError): |
|---|
| 255 | """ |
|---|
| 256 | The operation could not complete because the task was already completed, |
|---|
| 257 | stopped, encountered an error or otherwise permanently stopped running. |
|---|
| 258 | """ |
|---|
| 259 | |
|---|
| 260 | |
|---|
| 261 | |
|---|
| 262 | class TaskDone(TaskFinished): |
|---|
| 263 | """ |
|---|
| 264 | The operation could not complete because the task was already completed. |
|---|
| 265 | """ |
|---|
| 266 | |
|---|
| 267 | |
|---|
| 268 | |
|---|
| 269 | class TaskStopped(TaskFinished): |
|---|
| 270 | """ |
|---|
| 271 | The operation could not complete because the task was stopped. |
|---|
| 272 | """ |
|---|
| 273 | |
|---|
| 274 | |
|---|
| 275 | |
|---|
| 276 | class TaskFailed(TaskFinished): |
|---|
| 277 | """ |
|---|
| 278 | The operation could not complete because the task died with an unhandled |
|---|
| 279 | error. |
|---|
| 280 | """ |
|---|
| 281 | |
|---|
| 282 | |
|---|
| 283 | |
|---|
| 284 | class NotPaused(SchedulerError): |
|---|
| 285 | """ |
|---|
| 286 | This exception is raised when a task is resumed which was not previously |
|---|
| 287 | paused. |
|---|
| 288 | """ |
|---|
| 289 | |
|---|
| 290 | |
|---|
| 291 | |
|---|
| 292 | class _Timer(object): |
|---|
| 293 | MAX_SLICE = 0.01 |
|---|
| 294 | def __init__(self): |
|---|
| 295 | self.end = time.time() + self.MAX_SLICE |
|---|
| 296 | |
|---|
| 297 | |
|---|
| 298 | def __call__(self): |
|---|
| 299 | return time.time() >= self.end |
|---|
| 300 | |
|---|
| 301 | |
|---|
| 302 | |
|---|
| 303 | _EPSILON = 0.00000001 |
|---|
| 304 | def _defaultScheduler(x): |
|---|
| 305 | from twisted.internet import reactor |
|---|
| 306 | return reactor.callLater(_EPSILON, x) |
|---|
| 307 | |
|---|
| 308 | |
|---|
| 309 | class CooperativeTask(object): |
|---|
| 310 | """ |
|---|
| 311 | A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be |
|---|
| 312 | paused, resumed, and stopped. It can also have its completion (or |
|---|
| 313 | termination) monitored. |
|---|
| 314 | |
|---|
| 315 | @see: L{CooperativeTask.cooperate} |
|---|
| 316 | |
|---|
| 317 | @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is |
|---|
| 318 | asked to do work. |
|---|
| 319 | |
|---|
| 320 | @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask} |
|---|
| 321 | participates in, which is used to re-insert it upon resume. |
|---|
| 322 | |
|---|
| 323 | @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task |
|---|
| 324 | completes, fails, or finishes. |
|---|
| 325 | |
|---|
| 326 | @type _deferreds: L{list} |
|---|
| 327 | |
|---|
| 328 | @type _cooperator: L{Cooperator} |
|---|
| 329 | |
|---|
| 330 | @ivar _pauseCount: the number of times that this L{CooperativeTask} has |
|---|
| 331 | been paused; if 0, it is running. |
|---|
| 332 | |
|---|
| 333 | @type _pauseCount: L{int} |
|---|
| 334 | |
|---|
| 335 | @ivar _completionState: The completion-state of this L{CooperativeTask}. |
|---|
| 336 | C{None} if the task is not yet completed, an instance of L{TaskStopped} |
|---|
| 337 | if C{stop} was called to stop this task early, of L{TaskFailed} if the |
|---|
| 338 | application code in the iterator raised an exception which caused it to |
|---|
| 339 | terminate, and of L{TaskDone} if it terminated normally via raising |
|---|
| 340 | L{StopIteration}. |
|---|
| 341 | |
|---|
| 342 | @type _completionState: L{TaskFinished} |
|---|
| 343 | """ |
|---|
| 344 | |
|---|
| 345 | def __init__(self, iterator, cooperator): |
|---|
| 346 | """ |
|---|
| 347 | A private constructor: to create a new L{CooperativeTask}, see |
|---|
| 348 | L{Cooperator.cooperate}. |
|---|
| 349 | """ |
|---|
| 350 | self._iterator = iterator |
|---|
| 351 | self._cooperator = cooperator |
|---|
| 352 | self._deferreds = [] |
|---|
| 353 | self._pauseCount = 0 |
|---|
| 354 | self._completionState = None |
|---|
| 355 | self._completionResult = None |
|---|
| 356 | cooperator._addTask(self) |
|---|
| 357 | |
|---|
| 358 | |
|---|
| 359 | def whenDone(self): |
|---|
| 360 | """ |
|---|
| 361 | Get a L{defer.Deferred} notification of when this task is complete. |
|---|
| 362 | |
|---|
| 363 | @return: a L{defer.Deferred} that fires with the C{iterator} that this |
|---|
| 364 | L{CooperativeTask} was created with when the iterator has been |
|---|
| 365 | exhausted (i.e. its C{next} method has raised L{StopIteration}), or |
|---|
| 366 | fails with the exception raised by C{next} if it raises some other |
|---|
| 367 | exception. |
|---|
| 368 | |
|---|
| 369 | @rtype: L{defer.Deferred} |
|---|
| 370 | """ |
|---|
| 371 | d = defer.Deferred() |
|---|
| 372 | if self._completionState is None: |
|---|
| 373 | self._deferreds.append(d) |
|---|
| 374 | else: |
|---|
| 375 | d.callback(self._completionResult) |
|---|
| 376 | return d |
|---|
| 377 | |
|---|
| 378 | |
|---|
| 379 | def pause(self): |
|---|
| 380 | """ |
|---|
| 381 | Pause this L{CooperativeTask}. Stop doing work until |
|---|
| 382 | L{CooperativeTask.resume} is called. If C{pause} is called more than |
|---|
| 383 | once, C{resume} must be called an equal number of times to resume this |
|---|
| 384 | task. |
|---|
| 385 | |
|---|
| 386 | @raise TaskFinished: if this task has already finished or completed. |
|---|
| 387 | """ |
|---|
| 388 | self._checkFinish() |
|---|
| 389 | self._pauseCount += 1 |
|---|
| 390 | if self._pauseCount == 1: |
|---|
| 391 | self._cooperator._removeTask(self) |
|---|
| 392 | |
|---|
| 393 | |
|---|
| 394 | def resume(self): |
|---|
| 395 | """ |
|---|
| 396 | Resume processing of a paused L{CooperativeTask}. |
|---|
| 397 | |
|---|
| 398 | @raise NotPaused: if this L{CooperativeTask} is not paused. |
|---|
| 399 | """ |
|---|
| 400 | if self._pauseCount == 0: |
|---|
| 401 | raise NotPaused() |
|---|
| 402 | self._pauseCount -= 1 |
|---|
| 403 | if self._pauseCount == 0 and self._completionState is None: |
|---|
| 404 | self._cooperator._addTask(self) |
|---|
| 405 | |
|---|
| 406 | |
|---|
| 407 | def _completeWith(self, completionState, deferredResult): |
|---|
| 408 | """ |
|---|
| 409 | @param completionState: a L{TaskFinished} exception or a subclass |
|---|
| 410 | thereof, indicating what exception should be raised when subsequent |
|---|
| 411 | operations are performed. |
|---|
| 412 | |
|---|
| 413 | @param deferredResult: the result to fire all the deferreds with. |
|---|
| 414 | """ |
|---|
| 415 | self._completionState = completionState |
|---|
| 416 | self._completionResult = deferredResult |
|---|
| 417 | if not self._pauseCount: |
|---|
| 418 | self._cooperator._removeTask(self) |
|---|
| 419 | |
|---|
| 420 | |
|---|
| 421 | |
|---|
| 422 | |
|---|
| 423 | |
|---|
| 424 | |
|---|
| 425 | |
|---|
| 426 | |
|---|
| 427 | for d in self._deferreds: |
|---|
| 428 | d.callback(deferredResult) |
|---|
| 429 | |
|---|
| 430 | |
|---|
| 431 | def stop(self): |
|---|
| 432 | """ |
|---|
| 433 | Stop further processing of this task. |
|---|
| 434 | |
|---|
| 435 | @raise TaskFinished: if this L{CooperativeTask} has previously |
|---|
| 436 | completed, via C{stop}, completion, or failure. |
|---|
| 437 | """ |
|---|
| 438 | self._checkFinish() |
|---|
| 439 | self._completeWith(TaskStopped(), Failure(TaskStopped())) |
|---|
| 440 | |
|---|
| 441 | |
|---|
| 442 | def _checkFinish(self): |
|---|
| 443 | """ |
|---|
| 444 | If this task has been stopped, raise the appropriate subclass of |
|---|
| 445 | L{TaskFinished}. |
|---|
| 446 | """ |
|---|
| 447 | if self._completionState is not None: |
|---|
| 448 | raise self._completionState |
|---|
| 449 | |
|---|
| 450 | |
|---|
| 451 | def _oneWorkUnit(self): |
|---|
| 452 | """ |
|---|
| 453 | Perform one unit of work for this task, retrieving one item from its |
|---|
| 454 | iterator, stopping if there are no further items in the iterator, and |
|---|
| 455 | pausing if the result was a L{defer.Deferred}. |
|---|
| 456 | """ |
|---|
| 457 | try: |
|---|
| 458 | result = self._iterator.next() |
|---|
| 459 | except StopIteration: |
|---|
| 460 | self._completeWith(TaskDone(), self._iterator) |
|---|
| 461 | except: |
|---|
| 462 | self._completeWith(TaskFailed(), Failure()) |
|---|
| 463 | else: |
|---|
| 464 | if isinstance(result, defer.Deferred): |
|---|
| 465 | self.pause() |
|---|
| 466 | def failLater(f): |
|---|
| 467 | self._completeWith(TaskFailed(), f) |
|---|
| 468 | result.addCallbacks(lambda result: self.resume(), |
|---|
| 469 | failLater) |
|---|
| 470 | |
|---|
| 471 | |
|---|
| 472 | |
|---|
| 473 | class Cooperator(object): |
|---|
| 474 | """ |
|---|
| 475 | Cooperative task scheduler. |
|---|
| 476 | """ |
|---|
| 477 | |
|---|
| 478 | def __init__(self, |
|---|
| 479 | terminationPredicateFactory=_Timer, |
|---|
| 480 | scheduler=_defaultScheduler, |
|---|
| 481 | started=True): |
|---|
| 482 | """ |
|---|
| 483 | Create a scheduler-like object to which iterators may be added. |
|---|
| 484 | |
|---|
| 485 | @param terminationPredicateFactory: A no-argument callable which will |
|---|
| 486 | be invoked at the beginning of each step and should return a |
|---|
| 487 | no-argument callable which will return False when the step should be |
|---|
| 488 | terminated. The default factory is time-based and allows iterators to |
|---|
| 489 | run for 1/100th of a second at a time. |
|---|
| 490 | |
|---|
| 491 | @param scheduler: A one-argument callable which takes a no-argument |
|---|
| 492 | callable and should invoke it at some future point. This will be used |
|---|
| 493 | to schedule each step of this Cooperator. |
|---|
| 494 | |
|---|
| 495 | @param started: A boolean which indicates whether iterators should be |
|---|
| 496 | stepped as soon as they are added, or if they will be queued up until |
|---|
| 497 | L{Cooperator.start} is called. |
|---|
| 498 | """ |
|---|
| 499 | self._tasks = [] |
|---|
| 500 | self._metarator = iter(()) |
|---|
| 501 | self._terminationPredicateFactory = terminationPredicateFactory |
|---|
| 502 | self._scheduler = scheduler |
|---|
| 503 | self._delayedCall = None |
|---|
| 504 | self._stopped = False |
|---|
| 505 | self._started = started |
|---|
| 506 | |
|---|
| 507 | |
|---|
| 508 | def coiterate(self, iterator, doneDeferred=None): |
|---|
| 509 | """ |
|---|
| 510 | Add an iterator to the list of iterators this L{Cooperator} is |
|---|
| 511 | currently running. |
|---|
| 512 | |
|---|
| 513 | @param doneDeferred: If specified, this will be the Deferred used as |
|---|
| 514 | the completion deferred. It is suggested that you use the default, |
|---|
| 515 | which creates a new Deferred for you. |
|---|
| 516 | |
|---|
| 517 | @return: a Deferred that will fire when the iterator finishes. |
|---|
| 518 | """ |
|---|
| 519 | if doneDeferred is None: |
|---|
| 520 | doneDeferred = defer.Deferred() |
|---|
| 521 | CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred) |
|---|
| 522 | return doneDeferred |
|---|
| 523 | |
|---|
| 524 | |
|---|
| 525 | def cooperate(self, iterator): |
|---|
| 526 | """ |
|---|
| 527 | Start running the given iterator as a long-running cooperative task, by |
|---|
| 528 | calling next() on it as a periodic timed event. |
|---|
| 529 | |
|---|
| 530 | @param iterator: the iterator to invoke. |
|---|
| 531 | |
|---|
| 532 | @return: a L{CooperativeTask} object representing this task. |
|---|
| 533 | """ |
|---|
| 534 | return CooperativeTask(iterator, self) |
|---|
| 535 | |
|---|
| 536 | |
|---|
| 537 | def _addTask(self, task): |
|---|
| 538 | """ |
|---|
| 539 | Add a L{CooperativeTask} object to this L{Cooperator}. |
|---|
| 540 | """ |
|---|
| 541 | if self._stopped: |
|---|
| 542 | self._tasks.append(task) |
|---|
| 543 | |
|---|
| 544 | task._completeWith(SchedulerStopped(), Failure(SchedulerStopped())) |
|---|
| 545 | else: |
|---|
| 546 | self._tasks.append(task) |
|---|
| 547 | self._reschedule() |
|---|
| 548 | |
|---|
| 549 | |
|---|
| 550 | def _removeTask(self, task): |
|---|
| 551 | """ |
|---|
| 552 | Remove a L{CooperativeTask} from this L{Cooperator}. |
|---|
| 553 | """ |
|---|
| 554 | self._tasks.remove(task) |
|---|
| 555 | |
|---|
| 556 | |
|---|
| 557 | def _tasksWhileNotStopped(self): |
|---|
| 558 | """ |
|---|
| 559 | Yield all L{CooperativeTask} objects in a loop as long as this |
|---|
| 560 | L{Cooperator}'s termination condition has not been met. |
|---|
| 561 | """ |
|---|
| 562 | terminator = self._terminationPredicateFactory() |
|---|
| 563 | while self._tasks: |
|---|
| 564 | for t in self._metarator: |
|---|
| 565 | yield t |
|---|
| 566 | if terminator(): |
|---|
| 567 | return |
|---|
| 568 | self._metarator = iter(self._tasks) |
|---|
| 569 | |
|---|
| 570 | |
|---|
| 571 | def _tick(self): |
|---|
| 572 | """ |
|---|
| 573 | Run one scheduler tick. |
|---|
| 574 | """ |
|---|
| 575 | self._delayedCall = None |
|---|
| 576 | for taskObj in self._tasksWhileNotStopped(): |
|---|
| 577 | taskObj._oneWorkUnit() |
|---|
| 578 | self._reschedule() |
|---|
| 579 | |
|---|
| 580 | |
|---|
| 581 | _mustScheduleOnStart = False |
|---|
| 582 | def _reschedule(self): |
|---|
| 583 | if not self._started: |
|---|
| 584 | self._mustScheduleOnStart = True |
|---|
| 585 | return |
|---|
| 586 | if self._delayedCall is None and self._tasks: |
|---|
| 587 | self._delayedCall = self._scheduler(self._tick) |
|---|
| 588 | |
|---|
| 589 | |
|---|
| 590 | def start(self): |
|---|
| 591 | """ |
|---|
| 592 | Begin scheduling steps. |
|---|
| 593 | """ |
|---|
| 594 | self._stopped = False |
|---|
| 595 | self._started = True |
|---|
| 596 | if self._mustScheduleOnStart: |
|---|
| 597 | del self._mustScheduleOnStart |
|---|
| 598 | self._reschedule() |
|---|
| 599 | |
|---|
| 600 | |
|---|
| 601 | def stop(self): |
|---|
| 602 | """ |
|---|
| 603 | Stop scheduling steps. Errback the completion Deferreds of all |
|---|
| 604 | iterators which have been added and forget about them. |
|---|
| 605 | """ |
|---|
| 606 | self._stopped = True |
|---|
| 607 | for taskObj in self._tasks: |
|---|
| 608 | taskObj._completeWith(SchedulerStopped(), |
|---|
| 609 | Failure(SchedulerStopped())) |
|---|
| 610 | self._tasks = [] |
|---|
| 611 | if self._delayedCall is not None: |
|---|
| 612 | self._delayedCall.cancel() |
|---|
| 613 | self._delayedCall = None |
|---|
| 614 | |
|---|
| 615 | |
|---|
| 616 | |
|---|
| 617 | _theCooperator = Cooperator() |
|---|
| 618 | |
|---|
| 619 | def coiterate(iterator): |
|---|
| 620 | """ |
|---|
| 621 | Cooperatively iterate over the given iterator, dividing runtime between it |
|---|
| 622 | and all other iterators which have been passed to this function and not yet |
|---|
| 623 | exhausted. |
|---|
| 624 | """ |
|---|
| 625 | return _theCooperator.coiterate(iterator) |
|---|
| 626 | |
|---|
| 627 | |
|---|
| 628 | |
|---|
| 629 | def cooperate(iterator): |
|---|
| 630 | """ |
|---|
| 631 | Start running the given iterator as a long-running cooperative task, by |
|---|
| 632 | calling next() on it as a periodic timed event. |
|---|
| 633 | |
|---|
| 634 | @param iterator: the iterator to invoke. |
|---|
| 635 | |
|---|
| 636 | @return: a L{CooperativeTask} object representing this task. |
|---|
| 637 | """ |
|---|
| 638 | return _theCooperator.cooperate(iterator) |
|---|
| 639 | |
|---|
| 640 | |
|---|
| 641 | |
|---|
| 642 | |
|---|
| 643 | class Clock: |
|---|
| 644 | """ |
|---|
| 645 | Provide a deterministic, easily-controlled implementation of |
|---|
| 646 | L{IReactorTime.callLater}. This is commonly useful for writing |
|---|
| 647 | deterministic unit tests for code which schedules events using this API. |
|---|
| 648 | """ |
|---|
| 649 | implements(IReactorTime) |
|---|
| 650 | |
|---|
| 651 | rightNow = 0.0 |
|---|
| 652 | |
|---|
| 653 | def __init__(self): |
|---|
| 654 | self.calls = [] |
|---|
| 655 | |
|---|
| 656 | def seconds(self): |
|---|
| 657 | """ |
|---|
| 658 | Pretend to be time.time(). This is used internally when an operation |
|---|
| 659 | such as L{IDelayedCall.reset} needs to determine a a time value |
|---|
| 660 | relative to the current time. |
|---|
| 661 | |
|---|
| 662 | @rtype: C{float} |
|---|
| 663 | @return: The time which should be considered the current time. |
|---|
| 664 | """ |
|---|
| 665 | return self.rightNow |
|---|
| 666 | |
|---|
| 667 | |
|---|
| 668 | def callLater(self, when, what, *a, **kw): |
|---|
| 669 | """ |
|---|
| 670 | See L{twisted.internet.interfaces.IReactorTime.callLater}. |
|---|
| 671 | """ |
|---|
| 672 | dc = base.DelayedCall(self.seconds() + when, |
|---|
| 673 | what, a, kw, |
|---|
| 674 | self.calls.remove, |
|---|
| 675 | lambda c: None, |
|---|
| 676 | self.seconds) |
|---|
| 677 | self.calls.append(dc) |
|---|
| 678 | self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) |
|---|
| 679 | return dc |
|---|
| 680 | |
|---|
| 681 | def getDelayedCalls(self): |
|---|
| 682 | """ |
|---|
| 683 | See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} |
|---|
| 684 | """ |
|---|
| 685 | return self.calls |
|---|
| 686 | |
|---|
| 687 | def advance(self, amount): |
|---|
| 688 | """ |
|---|
| 689 | Move time on this clock forward by the given amount and run whatever |
|---|
| 690 | pending calls should be run. |
|---|
| 691 | |
|---|
| 692 | @type amount: C{float} |
|---|
| 693 | @param amount: The number of seconds which to advance this clock's |
|---|
| 694 | time. |
|---|
| 695 | """ |
|---|
| 696 | self.rightNow += amount |
|---|
| 697 | while self.calls and self.calls[0].getTime() <= self.seconds(): |
|---|
| 698 | call = self.calls.pop(0) |
|---|
| 699 | call.called = 1 |
|---|
| 700 | call.func(*call.args, **call.kw) |
|---|
| 701 | |
|---|
| 702 | |
|---|
| 703 | def pump(self, timings): |
|---|
| 704 | """ |
|---|
| 705 | Advance incrementally by the given set of times. |
|---|
| 706 | |
|---|
| 707 | @type timings: iterable of C{float} |
|---|
| 708 | """ |
|---|
| 709 | for amount in timings: |
|---|
| 710 | self.advance(amount) |
|---|
| 711 | |
|---|
| 712 | |
|---|
| 713 | def deferLater(clock, delay, callable, *args, **kw): |
|---|
| 714 | """ |
|---|
| 715 | Call the given function after a certain period of time has passed. |
|---|
| 716 | |
|---|
| 717 | @type clock: L{IReactorTime} provider |
|---|
| 718 | @param clock: The object which will be used to schedule the delayed |
|---|
| 719 | call. |
|---|
| 720 | |
|---|
| 721 | @type delay: C{float} or C{int} |
|---|
| 722 | @param delay: The number of seconds to wait before calling the function. |
|---|
| 723 | |
|---|
| 724 | @param callable: The object to call after the delay. |
|---|
| 725 | |
|---|
| 726 | @param *args: The positional arguments to pass to C{callable}. |
|---|
| 727 | |
|---|
| 728 | @param **kw: The keyword arguments to pass to C{callable}. |
|---|
| 729 | |
|---|
| 730 | @rtype: L{defer.Deferred} |
|---|
| 731 | |
|---|
| 732 | @return: A deferred that fires with the result of the callable when the |
|---|
| 733 | specified time has elapsed. |
|---|
| 734 | """ |
|---|
| 735 | def deferLaterCancel(deferred): |
|---|
| 736 | delayedCall.cancel() |
|---|
| 737 | d = defer.Deferred(deferLaterCancel) |
|---|
| 738 | d.addCallback(lambda ignored: callable(*args, **kw)) |
|---|
| 739 | delayedCall = clock.callLater(delay, d.callback, None) |
|---|
| 740 | return d |
|---|
| 741 | |
|---|
| 742 | |
|---|
| 743 | |
|---|
| 744 | __all__ = [ |
|---|
| 745 | 'LoopingCall', |
|---|
| 746 | |
|---|
| 747 | 'Clock', |
|---|
| 748 | |
|---|
| 749 | 'SchedulerStopped', 'Cooperator', 'coiterate', |
|---|
| 750 | |
|---|
| 751 | 'deferLater', |
|---|
| 752 | ] |
|---|