| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
""" |
|---|
| 6 |
Scheduling utility methods and classes. |
|---|
| 7 |
|
|---|
| 8 |
@author: Jp Calderone |
|---|
| 9 |
""" |
|---|
| 10 |
|
|---|
| 11 |
__metaclass__ = type |
|---|
| 12 |
|
|---|
| 13 |
import time |
|---|
| 14 |
|
|---|
| 15 |
from zope.interface import implements |
|---|
| 16 |
|
|---|
| 17 |
from twisted.python import reflect |
|---|
| 18 |
from twisted.python.failure import Failure |
|---|
| 19 |
|
|---|
| 20 |
from twisted.internet import base, defer |
|---|
| 21 |
from twisted.internet.interfaces import IReactorTime |
|---|
| 22 |
|
|---|
| 23 |
|
|---|
| 24 |
class LoopingCall: |
|---|
| 25 |
"""Call a function repeatedly. |
|---|
| 26 |
|
|---|
| 27 |
If C{f} returns a deferred, rescheduling will not take place until the |
|---|
| 28 |
deferred has fired. The result value is ignored. |
|---|
| 29 |
|
|---|
| 30 |
@ivar f: The function to call. |
|---|
| 31 |
@ivar a: A tuple of arguments to pass the function. |
|---|
| 32 |
@ivar kw: A dictionary of keyword arguments to pass to the function. |
|---|
| 33 |
@ivar clock: A provider of |
|---|
| 34 |
L{twisted.internet.interfaces.IReactorTime}. The default is |
|---|
| 35 |
L{twisted.internet.reactor}. Feel free to set this to |
|---|
| 36 |
something else, but it probably ought to be set *before* |
|---|
| 37 |
calling L{start}. |
|---|
| 38 |
|
|---|
| 39 |
@type _lastTime: C{float} |
|---|
| 40 |
@ivar _lastTime: The time at which this instance most recently scheduled |
|---|
| 41 |
itself to run. |
|---|
| 42 |
""" |
|---|
| 43 |
|
|---|
| 44 |
call = None |
|---|
| 45 |
running = False |
|---|
| 46 |
deferred = None |
|---|
| 47 |
interval = None |
|---|
| 48 |
_lastTime = 0.0 |
|---|
| 49 |
starttime = None |
|---|
| 50 |
|
|---|
| 51 |
def __init__(self, f, *a, **kw): |
|---|
| 52 |
self.f = f |
|---|
| 53 |
self.a = a |
|---|
| 54 |
self.kw = kw |
|---|
| 55 |
from twisted.internet import reactor |
|---|
| 56 |
self.clock = reactor |
|---|
| 57 |
|
|---|
| 58 |
|
|---|
| 59 |
def start(self, interval, now=True): |
|---|
| 60 |
"""Start running function every interval seconds. |
|---|
| 61 |
|
|---|
| 62 |
@param interval: The number of seconds between calls. May be |
|---|
| 63 |
less than one. Precision will depend on the underlying |
|---|
| 64 |
platform, the available hardware, and the load on the system. |
|---|
| 65 |
|
|---|
| 66 |
@param now: If True, run this call right now. Otherwise, wait |
|---|
| 67 |
until the interval has elapsed before beginning. |
|---|
| 68 |
|
|---|
| 69 |
@return: A Deferred whose callback will be invoked with |
|---|
| 70 |
C{self} when C{self.stop} is called, or whose errback will be |
|---|
| 71 |
invoked when the function raises an exception or returned a |
|---|
| 72 |
deferred that has its errback invoked. |
|---|
| 73 |
""" |
|---|
| 74 |
assert not self.running, ("Tried to start an already running " |
|---|
| 75 |
"LoopingCall.") |
|---|
| 76 |
if interval < 0: |
|---|
| 77 |
raise ValueError, "interval must be >= 0" |
|---|
| 78 |
self.running = True |
|---|
| 79 |
d = self.deferred = defer.Deferred() |
|---|
| 80 |
self.starttime = self.clock.seconds() |
|---|
| 81 |
self._lastTime = self.starttime |
|---|
| 82 |
self.interval = interval |
|---|
| 83 |
if now: |
|---|
| 84 |
self() |
|---|
| 85 |
else: |
|---|
| 86 |
self._reschedule() |
|---|
| 87 |
return d |
|---|
| 88 |
|
|---|
| 89 |
def stop(self): |
|---|
| 90 |
"""Stop running function. |
|---|
| 91 |
""" |
|---|
| 92 |
assert self.running, ("Tried to stop a LoopingCall that was " |
|---|
| 93 |
"not running.") |
|---|
| 94 |
self.running = False |
|---|
| 95 |
if self.call is not None: |
|---|
| 96 |
self.call.cancel() |
|---|
| 97 |
self.call = None |
|---|
| 98 |
d, self.deferred = self.deferred, None |
|---|
| 99 |
d.callback(self) |
|---|
| 100 |
|
|---|
| 101 |
def __call__(self): |
|---|
| 102 |
def cb(result): |
|---|
| 103 |
if self.running: |
|---|
| 104 |
self._reschedule() |
|---|
| 105 |
else: |
|---|
| 106 |
d, self.deferred = self.deferred, None |
|---|
| 107 |
d.callback(self) |
|---|
| 108 |
|
|---|
| 109 |
def eb(failure): |
|---|
| 110 |
self.running = False |
|---|
| 111 |
d, self.deferred = self.deferred, None |
|---|
| 112 |
d.errback(failure) |
|---|
| 113 |
|
|---|
| 114 |
self.call = None |
|---|
| 115 |
d = defer.maybeDeferred(self.f, *self.a, **self.kw) |
|---|
| 116 |
d.addCallback(cb) |
|---|
| 117 |
d.addErrback(eb) |
|---|
| 118 |
|
|---|
| 119 |
|
|---|
| 120 |
def _reschedule(self): |
|---|
| 121 |
""" |
|---|
| 122 |
Schedule the next iteration of this looping call. |
|---|
| 123 |
""" |
|---|
| 124 |
if self.interval == 0: |
|---|
| 125 |
self.call = self.clock.callLater(0, self) |
|---|
| 126 |
return |
|---|
| 127 |
|
|---|
| 128 |
currentTime = self.clock.seconds() |
|---|
| 129 |
|
|---|
| 130 |
untilNextTime = (self._lastTime - currentTime) % self.interval |
|---|
| 131 |
|
|---|
| 132 |
|
|---|
| 133 |
nextTime = max( |
|---|
| 134 |
self._lastTime + self.interval, currentTime + untilNextTime) |
|---|
| 135 |
|
|---|
| 136 |
|
|---|
| 137 |
if nextTime == currentTime: |
|---|
| 138 |
nextTime += self.interval |
|---|
| 139 |
self._lastTime = nextTime |
|---|
| 140 |
self.call = self.clock.callLater(nextTime - currentTime, self) |
|---|
| 141 |
|
|---|
| 142 |
|
|---|
| 143 |
def __repr__(self): |
|---|
| 144 |
if hasattr(self.f, 'func_name'): |
|---|
| 145 |
func = self.f.func_name |
|---|
| 146 |
if hasattr(self.f, 'im_class'): |
|---|
| 147 |
func = self.f.im_class.__name__ + '.' + func |
|---|
| 148 |
else: |
|---|
| 149 |
func = reflect.safe_repr(self.f) |
|---|
| 150 |
|
|---|
| 151 |
return 'LoopingCall<%r>(%s, *%s, **%s)' % ( |
|---|
| 152 |
self.interval, func, reflect.safe_repr(self.a), |
|---|
| 153 |
reflect.safe_repr(self.kw)) |
|---|
| 154 |
|
|---|
| 155 |
|
|---|
| 156 |
|
|---|
| 157 |
class SchedulerError(Exception): |
|---|
| 158 |
""" |
|---|
| 159 |
The operation could not be completed because the scheduler or one of its |
|---|
| 160 |
tasks was in an invalid state. This exception should not be raised |
|---|
| 161 |
directly, but is a superclass of various scheduler-state-related |
|---|
| 162 |
exceptions. |
|---|
| 163 |
""" |
|---|
| 164 |
|
|---|
| 165 |
|
|---|
| 166 |
|
|---|
| 167 |
class SchedulerStopped(SchedulerError): |
|---|
| 168 |
""" |
|---|
| 169 |
The operation could not complete because the scheduler was stopped in |
|---|
| 170 |
progress or was already stopped. |
|---|
| 171 |
""" |
|---|
| 172 |
|
|---|
| 173 |
|
|---|
| 174 |
|
|---|
| 175 |
class TaskFinished(SchedulerError): |
|---|
| 176 |
""" |
|---|
| 177 |
The operation could not complete because the task was already completed, |
|---|
| 178 |
stopped, encountered an error or otherwise permanently stopped running. |
|---|
| 179 |
""" |
|---|
| 180 |
|
|---|
| 181 |
|
|---|
| 182 |
|
|---|
| 183 |
class TaskDone(TaskFinished): |
|---|
| 184 |
""" |
|---|
| 185 |
The operation could not complete because the task was already completed. |
|---|
| 186 |
""" |
|---|
| 187 |
|
|---|
| 188 |
|
|---|
| 189 |
|
|---|
| 190 |
class TaskStopped(TaskFinished): |
|---|
| 191 |
""" |
|---|
| 192 |
The operation could not complete because the task was stopped. |
|---|
| 193 |
""" |
|---|
| 194 |
|
|---|
| 195 |
|
|---|
| 196 |
|
|---|
| 197 |
class TaskFailed(TaskFinished): |
|---|
| 198 |
""" |
|---|
| 199 |
The operation could not complete because the task died with an unhandled |
|---|
| 200 |
error. |
|---|
| 201 |
""" |
|---|
| 202 |
|
|---|
| 203 |
|
|---|
| 204 |
|
|---|
| 205 |
class NotPaused(SchedulerError): |
|---|
| 206 |
""" |
|---|
| 207 |
This exception is raised when a task is resumed which was not previously |
|---|
| 208 |
paused. |
|---|
| 209 |
""" |
|---|
| 210 |
|
|---|
| 211 |
|
|---|
| 212 |
|
|---|
| 213 |
class _Timer(object): |
|---|
| 214 |
MAX_SLICE = 0.01 |
|---|
| 215 |
def __init__(self): |
|---|
| 216 |
self.end = time.time() + self.MAX_SLICE |
|---|
| 217 |
|
|---|
| 218 |
|
|---|
| 219 |
def __call__(self): |
|---|
| 220 |
return time.time() >= self.end |
|---|
| 221 |
|
|---|
| 222 |
|
|---|
| 223 |
|
|---|
| 224 |
_EPSILON = 0.00000001 |
|---|
| 225 |
def _defaultScheduler(x): |
|---|
| 226 |
from twisted.internet import reactor |
|---|
| 227 |
return reactor.callLater(_EPSILON, x) |
|---|
| 228 |
|
|---|
| 229 |
|
|---|
| 230 |
class CooperativeTask(object): |
|---|
| 231 |
""" |
|---|
| 232 |
A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be |
|---|
| 233 |
paused, resumed, and stopped. It can also have its completion (or |
|---|
| 234 |
termination) monitored. |
|---|
| 235 |
|
|---|
| 236 |
@see: L{CooperativeTask.cooperate} |
|---|
| 237 |
|
|---|
| 238 |
@ivar _iterator: the iterator to iterate when this L{CooperativeTask} is |
|---|
| 239 |
asked to do work. |
|---|
| 240 |
|
|---|
| 241 |
@ivar _cooperator: the L{Cooperator} that this L{CooperativeTask} |
|---|
| 242 |
participates in, which is used to re-insert it upon resume. |
|---|
| 243 |
|
|---|
| 244 |
@ivar _deferreds: the list of L{defer.Deferred}s to fire when this task |
|---|
| 245 |
completes, fails, or finishes. |
|---|
| 246 |
|
|---|
| 247 |
@type _deferreds: L{list} |
|---|
| 248 |
|
|---|
| 249 |
@type _cooperator: L{Cooperator} |
|---|
| 250 |
|
|---|
| 251 |
@ivar _pauseCount: the number of times that this L{CooperativeTask} has |
|---|
| 252 |
been paused; if 0, it is running. |
|---|
| 253 |
|
|---|
| 254 |
@type _pauseCount: L{int} |
|---|
| 255 |
|
|---|
| 256 |
@ivar _completionState: The completion-state of this L{CooperativeTask}. |
|---|
| 257 |
C{None} if the task is not yet completed, an instance of L{TaskStopped} |
|---|
| 258 |
if C{stop} was called to stop this task early, of L{TaskFailed} if the |
|---|
| 259 |
application code in the iterator raised an exception which caused it to |
|---|
| 260 |
terminate, and of L{TaskDone} if it terminated normally via raising |
|---|
| 261 |
L{StopIteration}. |
|---|
| 262 |
|
|---|
| 263 |
@type _completionState: L{TaskFinished} |
|---|
| 264 |
""" |
|---|
| 265 |
|
|---|
| 266 |
def __init__(self, iterator, cooperator): |
|---|
| 267 |
""" |
|---|
| 268 |
A private constructor: to create a new L{CooperativeTask}, see |
|---|
| 269 |
L{Cooperator.cooperate}. |
|---|
| 270 |
""" |
|---|
| 271 |
self._iterator = iterator |
|---|
| 272 |
self._cooperator = cooperator |
|---|
| 273 |
self._deferreds = [] |
|---|
| 274 |
self._pauseCount = 0 |
|---|
| 275 |
self._completionState = None |
|---|
| 276 |
self._completionResult = None |
|---|
| 277 |
cooperator._addTask(self) |
|---|
| 278 |
|
|---|
| 279 |
|
|---|
| 280 |
def whenDone(self): |
|---|
| 281 |
""" |
|---|
| 282 |
Get a L{defer.Deferred} notification of when this task is complete. |
|---|
| 283 |
|
|---|
| 284 |
@return: a L{defer.Deferred} that fires with the C{iterator} that this |
|---|
| 285 |
L{CooperativeTask} was created with when the iterator has been |
|---|
| 286 |
exhausted (i.e. its C{next} method has raised L{StopIteration}), or |
|---|
| 287 |
fails with the exception raised by C{next} if it raises some other |
|---|
| 288 |
exception. |
|---|
| 289 |
|
|---|
| 290 |
@rtype: L{defer.Deferred} |
|---|
| 291 |
""" |
|---|
| 292 |
d = defer.Deferred() |
|---|
| 293 |
if self._completionState is None: |
|---|
| 294 |
self._deferreds.append(d) |
|---|
| 295 |
else: |
|---|
| 296 |
d.callback(self._completionResult) |
|---|
| 297 |
return d |
|---|
| 298 |
|
|---|
| 299 |
|
|---|
| 300 |
def pause(self): |
|---|
| 301 |
""" |
|---|
| 302 |
Pause this L{CooperativeTask}. Stop doing work until |
|---|
| 303 |
L{CooperativeTask.resume} is called. If C{pause} is called more than |
|---|
| 304 |
once, C{resume} must be called an equal number of times to resume this |
|---|
| 305 |
task. |
|---|
| 306 |
|
|---|
| 307 |
@raise TaskFinished: if this task has already finished or completed. |
|---|
| 308 |
""" |
|---|
| 309 |
self._checkFinish() |
|---|
| 310 |
self._pauseCount += 1 |
|---|
| 311 |
if self._pauseCount == 1: |
|---|
| 312 |
self._cooperator._removeTask(self) |
|---|
| 313 |
|
|---|
| 314 |
|
|---|
| 315 |
def resume(self): |
|---|
| 316 |
""" |
|---|
| 317 |
Resume processing of a paused L{CooperativeTask}. |
|---|
| 318 |
|
|---|
| 319 |
@raise NotPaused: if this L{CooperativeTask} is not paused. |
|---|
| 320 |
""" |
|---|
| 321 |
if self._pauseCount == 0: |
|---|
| 322 |
raise NotPaused() |
|---|
| 323 |
self._pauseCount -= 1 |
|---|
| 324 |
if self._pauseCount == 0 and self._completionState is None: |
|---|
| 325 |
self._cooperator._addTask(self) |
|---|
| 326 |
|
|---|
| 327 |
|
|---|
| 328 |
def _completeWith(self, completionState, deferredResult): |
|---|
| 329 |
""" |
|---|
| 330 |
@param completionState: a L{TaskFinished} exception or a subclass |
|---|
| 331 |
thereof, indicating what exception should be raised when subsequent |
|---|
| 332 |
operations are performed. |
|---|
| 333 |
|
|---|
| 334 |
@param deferredResult: the result to fire all the deferreds with. |
|---|
| 335 |
""" |
|---|
| 336 |
self._completionState = completionState |
|---|
| 337 |
self._completionResult = deferredResult |
|---|
| 338 |
if not self._pauseCount: |
|---|
| 339 |
self._cooperator._removeTask(self) |
|---|
| 340 |
|
|---|
| 341 |
|
|---|
| 342 |
|
|---|
| 343 |
|
|---|
| 344 |
|
|---|
| 345 |
|
|---|
| 346 |
|
|---|
| 347 |
|
|---|
| 348 |
for d in self._deferreds: |
|---|
| 349 |
d.callback(deferredResult) |
|---|
| 350 |
|
|---|
| 351 |
|
|---|
| 352 |
def stop(self): |
|---|
| 353 |
""" |
|---|
| 354 |
Stop further processing of this task. |
|---|
| 355 |
|
|---|
| 356 |
@raise TaskFinished: if this L{CooperativeTask} has previously |
|---|
| 357 |
completed, via C{stop}, completion, or failure. |
|---|
| 358 |
""" |
|---|
| 359 |
self._checkFinish() |
|---|
| 360 |
self._completeWith(TaskStopped(), Failure(TaskStopped())) |
|---|
| 361 |
|
|---|
| 362 |
|
|---|
| 363 |
def _checkFinish(self): |
|---|
| 364 |
""" |
|---|
| 365 |
If this task has been stopped, raise the appropriate subclass of |
|---|
| 366 |
L{TaskFinished}. |
|---|
| 367 |
""" |
|---|
| 368 |
if self._completionState is not None: |
|---|
| 369 |
raise self._completionState |
|---|
| 370 |
|
|---|
| 371 |
|
|---|
| 372 |
def _oneWorkUnit(self): |
|---|
| 373 |
""" |
|---|
| 374 |
Perform one unit of work for this task, retrieving one item from its |
|---|
| 375 |
iterator, stopping if there are no further items in the iterator, and |
|---|
| 376 |
pausing if the result was a L{defer.Deferred}. |
|---|
| 377 |
""" |
|---|
| 378 |
try: |
|---|
| 379 |
result = self._iterator.next() |
|---|
| 380 |
except StopIteration: |
|---|
| 381 |
self._completeWith(TaskDone(), self._iterator) |
|---|
| 382 |
except: |
|---|
| 383 |
self._completeWith(TaskFailed(), Failure()) |
|---|
| 384 |
else: |
|---|
| 385 |
if isinstance(result, defer.Deferred): |
|---|
| 386 |
self.pause() |
|---|
| 387 |
def failLater(f): |
|---|
| 388 |
self._completeWith(TaskFailed(), f) |
|---|
| 389 |
result.addCallbacks(lambda result: self.resume(), |
|---|
| 390 |
failLater) |
|---|
| 391 |
|
|---|
| 392 |
|
|---|
| 393 |
|
|---|
| 394 |
class Cooperator(object): |
|---|
| 395 |
""" |
|---|
| 396 |
Cooperative task scheduler. |
|---|
| 397 |
""" |
|---|
| 398 |
|
|---|
| 399 |
def __init__(self, |
|---|
| 400 |
terminationPredicateFactory=_Timer, |
|---|
| 401 |
scheduler=_defaultScheduler, |
|---|
| 402 |
started=True): |
|---|
| 403 |
""" |
|---|
| 404 |
Create a scheduler-like object to which iterators may be added. |
|---|
| 405 |
|
|---|
| 406 |
@param terminationPredicateFactory: A no-argument callable which will |
|---|
| 407 |
be invoked at the beginning of each step and should return a |
|---|
| 408 |
no-argument callable which will return False when the step should be |
|---|
| 409 |
terminated. The default factory is time-based and allows iterators to |
|---|
| 410 |
run for 1/100th of a second at a time. |
|---|
| 411 |
|
|---|
| 412 |
@param scheduler: A one-argument callable which takes a no-argument |
|---|
| 413 |
callable and should invoke it at some future point. This will be used |
|---|
| 414 |
to schedule each step of this Cooperator. |
|---|
| 415 |
|
|---|
| 416 |
@param started: A boolean which indicates whether iterators should be |
|---|
| 417 |
stepped as soon as they are added, or if they will be queued up until |
|---|
| 418 |
L{Cooperator.start} is called. |
|---|
| 419 |
""" |
|---|
| 420 |
self._tasks = [] |
|---|
| 421 |
self._metarator = iter(()) |
|---|
| 422 |
self._terminationPredicateFactory = terminationPredicateFactory |
|---|
| 423 |
self._scheduler = scheduler |
|---|
| 424 |
self._delayedCall = None |
|---|
| 425 |
self._stopped = False |
|---|
| 426 |
self._started = started |
|---|
| 427 |
|
|---|
| 428 |
|
|---|
| 429 |
def coiterate(self, iterator, doneDeferred=None): |
|---|
| 430 |
""" |
|---|
| 431 |
Add an iterator to the list of iterators this L{Cooperator} is |
|---|
| 432 |
currently running. |
|---|
| 433 |
|
|---|
| 434 |
@param doneDeferred: If specified, this will be the Deferred used as |
|---|
| 435 |
the completion deferred. It is suggested that you use the default, |
|---|
| 436 |
which creates a new Deferred for you. |
|---|
| 437 |
|
|---|
| 438 |
@return: a Deferred that will fire when the iterator finishes. |
|---|
| 439 |
""" |
|---|
| 440 |
if doneDeferred is None: |
|---|
| 441 |
doneDeferred = defer.Deferred() |
|---|
| 442 |
CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred) |
|---|
| 443 |
return doneDeferred |
|---|
| 444 |
|
|---|
| 445 |
|
|---|
| 446 |
def cooperate(self, iterator): |
|---|
| 447 |
""" |
|---|
| 448 |
Start running the given iterator as a long-running cooperative task, by |
|---|
| 449 |
calling next() on it as a periodic timed event. |
|---|
| 450 |
|
|---|
| 451 |
@param iterator: the iterator to invoke. |
|---|
| 452 |
|
|---|
| 453 |
@return: a L{CooperativeTask} object representing this task. |
|---|
| 454 |
""" |
|---|
| 455 |
return CooperativeTask(iterator, self) |
|---|
| 456 |
|
|---|
| 457 |
|
|---|
| 458 |
def _addTask(self, task): |
|---|
| 459 |
""" |
|---|
| 460 |
Add a L{CooperativeTask} object to this L{Cooperator}. |
|---|
| 461 |
""" |
|---|
| 462 |
if self._stopped: |
|---|
| 463 |
self._tasks.append(task) |
|---|
| 464 |
|
|---|
| 465 |
task._completeWith(SchedulerStopped(), Failure(SchedulerStopped())) |
|---|
| 466 |
else: |
|---|
| 467 |
self._tasks.append(task) |
|---|
| 468 |
self._reschedule() |
|---|
| 469 |
|
|---|
| 470 |
|
|---|
| 471 |
def _removeTask(self, task): |
|---|
| 472 |
""" |
|---|
| 473 |
Remove a L{CooperativeTask} from this L{Cooperator}. |
|---|
| 474 |
""" |
|---|
| 475 |
self._tasks.remove(task) |
|---|
| 476 |
|
|---|
| 477 |
|
|---|
| 478 |
def _tasksWhileNotStopped(self): |
|---|
| 479 |
""" |
|---|
| 480 |
Yield all L{CooperativeTask} objects in a loop as long as this |
|---|
| 481 |
L{Cooperator}'s termination condition has not been met. |
|---|
| 482 |
""" |
|---|
| 483 |
terminator = self._terminationPredicateFactory() |
|---|
| 484 |
while self._tasks: |
|---|
| 485 |
for t in self._metarator: |
|---|
| 486 |
yield t |
|---|
| 487 |
if terminator(): |
|---|
| 488 |
return |
|---|
| 489 |
self._metarator = iter(self._tasks) |
|---|
| 490 |
|
|---|
| 491 |
|
|---|
| 492 |
def _tick(self): |
|---|
| 493 |
""" |
|---|
| 494 |
Run one scheduler tick. |
|---|
| 495 |
""" |
|---|
| 496 |
self._delayedCall = None |
|---|
| 497 |
for taskObj in self._tasksWhileNotStopped(): |
|---|
| 498 |
taskObj._oneWorkUnit() |
|---|
| 499 |
self._reschedule() |
|---|
| 500 |
|
|---|
| 501 |
|
|---|
| 502 |
_mustScheduleOnStart = False |
|---|
| 503 |
def _reschedule(self): |
|---|
| 504 |
if not self._started: |
|---|
| 505 |
self._mustScheduleOnStart = True |
|---|
| 506 |
return |
|---|
| 507 |
if self._delayedCall is None and self._tasks: |
|---|
| 508 |
self._delayedCall = self._scheduler(self._tick) |
|---|
| 509 |
|
|---|
| 510 |
|
|---|
| 511 |
def start(self): |
|---|
| 512 |
""" |
|---|
| 513 |
Begin scheduling steps. |
|---|
| 514 |
""" |
|---|
| 515 |
self._stopped = False |
|---|
| 516 |
self._started = True |
|---|
| 517 |
if self._mustScheduleOnStart: |
|---|
| 518 |
del self._mustScheduleOnStart |
|---|
| 519 |
self._reschedule() |
|---|
| 520 |
|
|---|
| 521 |
|
|---|
| 522 |
def stop(self): |
|---|
| 523 |
""" |
|---|
| 524 |
Stop scheduling steps. Errback the completion Deferreds of all |
|---|
| 525 |
iterators which have been added and forget about them. |
|---|
| 526 |
""" |
|---|
| 527 |
self._stopped = True |
|---|
| 528 |
for taskObj in self._tasks: |
|---|
| 529 |
taskObj._completeWith(SchedulerStopped(), |
|---|
| 530 |
Failure(SchedulerStopped())) |
|---|
| 531 |
self._tasks = [] |
|---|
| 532 |
if self._delayedCall is not None: |
|---|
| 533 |
self._delayedCall.cancel() |
|---|
| 534 |
self._delayedCall = None |
|---|
| 535 |
|
|---|
| 536 |
|
|---|
| 537 |
|
|---|
| 538 |
_theCooperator = Cooperator() |
|---|
| 539 |
|
|---|
| 540 |
def coiterate(iterator): |
|---|
| 541 |
""" |
|---|
| 542 |
Cooperatively iterate over the given iterator, dividing runtime between it |
|---|
| 543 |
and all other iterators which have been passed to this function and not yet |
|---|
| 544 |
exhausted. |
|---|
| 545 |
""" |
|---|
| 546 |
return _theCooperator.coiterate(iterator) |
|---|
| 547 |
|
|---|
| 548 |
|
|---|
| 549 |
|
|---|
| 550 |
def cooperate(iterator): |
|---|
| 551 |
""" |
|---|
| 552 |
Start running the given iterator as a long-running cooperative task, by |
|---|
| 553 |
calling next() on it as a periodic timed event. |
|---|
| 554 |
|
|---|
| 555 |
@param iterator: the iterator to invoke. |
|---|
| 556 |
|
|---|
| 557 |
@return: a L{CooperativeTask} object representing this task. |
|---|
| 558 |
""" |
|---|
| 559 |
return _theCooperator.cooperate(iterator) |
|---|
| 560 |
|
|---|
| 561 |
|
|---|
| 562 |
|
|---|
| 563 |
|
|---|
| 564 |
class Clock: |
|---|
| 565 |
""" |
|---|
| 566 |
Provide a deterministic, easily-controlled implementation of |
|---|
| 567 |
L{IReactorTime.callLater}. This is commonly useful for writing |
|---|
| 568 |
deterministic unit tests for code which schedules events using this API. |
|---|
| 569 |
""" |
|---|
| 570 |
implements(IReactorTime) |
|---|
| 571 |
|
|---|
| 572 |
rightNow = 0.0 |
|---|
| 573 |
|
|---|
| 574 |
def __init__(self): |
|---|
| 575 |
self.calls = [] |
|---|
| 576 |
|
|---|
| 577 |
def seconds(self): |
|---|
| 578 |
""" |
|---|
| 579 |
Pretend to be time.time(). This is used internally when an operation |
|---|
| 580 |
such as L{IDelayedCall.reset} needs to determine a a time value |
|---|
| 581 |
relative to the current time. |
|---|
| 582 |
|
|---|
| 583 |
@rtype: C{float} |
|---|
| 584 |
@return: The time which should be considered the current time. |
|---|
| 585 |
""" |
|---|
| 586 |
return self.rightNow |
|---|
| 587 |
|
|---|
| 588 |
|
|---|
| 589 |
def callLater(self, when, what, *a, **kw): |
|---|
| 590 |
""" |
|---|
| 591 |
See L{twisted.internet.interfaces.IReactorTime.callLater}. |
|---|
| 592 |
""" |
|---|
| 593 |
dc = base.DelayedCall(self.seconds() + when, |
|---|
| 594 |
what, a, kw, |
|---|
| 595 |
self.calls.remove, |
|---|
| 596 |
lambda c: None, |
|---|
| 597 |
self.seconds) |
|---|
| 598 |
self.calls.append(dc) |
|---|
| 599 |
self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) |
|---|
| 600 |
return dc |
|---|
| 601 |
|
|---|
| 602 |
def getDelayedCalls(self): |
|---|
| 603 |
""" |
|---|
| 604 |
See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} |
|---|
| 605 |
""" |
|---|
| 606 |
return self.calls |
|---|
| 607 |
|
|---|
| 608 |
def advance(self, amount): |
|---|
| 609 |
""" |
|---|
| 610 |
Move time on this clock forward by the given amount and run whatever |
|---|
| 611 |
pending calls should be run. |
|---|
| 612 |
|
|---|
| 613 |
@type amount: C{float} |
|---|
| 614 |
@param amount: The number of seconds which to advance this clock's |
|---|
| 615 |
time. |
|---|
| 616 |
""" |
|---|
| 617 |
self.rightNow += amount |
|---|
| 618 |
while self.calls and self.calls[0].getTime() <= self.seconds(): |
|---|
| 619 |
call = self.calls.pop(0) |
|---|
| 620 |
call.called = 1 |
|---|
| 621 |
call.func(*call.args, **call.kw) |
|---|
| 622 |
|
|---|
| 623 |
|
|---|
| 624 |
def pump(self, timings): |
|---|
| 625 |
""" |
|---|
| 626 |
Advance incrementally by the given set of times. |
|---|
| 627 |
|
|---|
| 628 |
@type timings: iterable of C{float} |
|---|
| 629 |
""" |
|---|
| 630 |
for amount in timings: |
|---|
| 631 |
self.advance(amount) |
|---|
| 632 |
|
|---|
| 633 |
|
|---|
| 634 |
def deferLater(clock, delay, callable, *args, **kw): |
|---|
| 635 |
""" |
|---|
| 636 |
Call the given function after a certain period of time has passed. |
|---|
| 637 |
|
|---|
| 638 |
@type clock: L{IReactorTime} provider |
|---|
| 639 |
@param clock: The object which will be used to schedule the delayed |
|---|
| 640 |
call. |
|---|
| 641 |
|
|---|
| 642 |
@type delay: C{float} or C{int} |
|---|
| 643 |
@param delay: The number of seconds to wait before calling the function. |
|---|
| 644 |
|
|---|
| 645 |
@param callable: The object to call after the delay. |
|---|
| 646 |
|
|---|
| 647 |
@param *args: The positional arguments to pass to C{callable}. |
|---|
| 648 |
|
|---|
| 649 |
@param **kw: The keyword arguments to pass to C{callable}. |
|---|
| 650 |
|
|---|
| 651 |
@rtype: L{defer.Deferred} |
|---|
| 652 |
|
|---|
| 653 |
@return: A deferred that fires with the result of the callable when the |
|---|
| 654 |
specified time has elapsed. |
|---|
| 655 |
""" |
|---|
| 656 |
d = defer.Deferred() |
|---|
| 657 |
d.addCallback(lambda ignored: callable(*args, **kw)) |
|---|
| 658 |
clock.callLater(delay, d.callback, None) |
|---|
| 659 |
return d |
|---|
| 660 |
|
|---|
| 661 |
|
|---|
| 662 |
|
|---|
| 663 |
__all__ = [ |
|---|
| 664 |
'LoopingCall', |
|---|
| 665 |
|
|---|
| 666 |
'Clock', |
|---|
| 667 |
|
|---|
| 668 |
'SchedulerStopped', 'Cooperator', 'coiterate', |
|---|
| 669 |
|
|---|
| 670 |
'deferLater', |
|---|
| 671 |
] |
|---|