| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
""" |
|---|
| 6 |
Very basic functionality for a Reactor implementation. |
|---|
| 7 |
""" |
|---|
| 8 |
|
|---|
| 9 |
import socket |
|---|
| 10 |
from zope.interface import implements, classImplements |
|---|
| 11 |
|
|---|
| 12 |
import sys |
|---|
| 13 |
import warnings |
|---|
| 14 |
import operator |
|---|
| 15 |
from heapq import heappush, heappop, heapify |
|---|
| 16 |
|
|---|
| 17 |
import traceback |
|---|
| 18 |
|
|---|
| 19 |
from twisted.python.compat import set |
|---|
| 20 |
from twisted.python.util import unsignedID |
|---|
| 21 |
from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads |
|---|
| 22 |
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver |
|---|
| 23 |
from twisted.internet.interfaces import IConnector, IDelayedCall |
|---|
| 24 |
from twisted.internet import fdesc, main, error, abstract, defer, threads |
|---|
| 25 |
from twisted.python import log, failure, reflect |
|---|
| 26 |
from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType |
|---|
| 27 |
from twisted.internet.defer import Deferred, DeferredList |
|---|
| 28 |
from twisted.persisted import styles |
|---|
| 29 |
|
|---|
| 30 |
|
|---|
| 31 |
|
|---|
| 32 |
from twisted.python import threadable |
|---|
| 33 |
|
|---|
| 34 |
|
|---|
| 35 |
class DelayedCall(styles.Ephemeral): |
|---|
| 36 |
|
|---|
| 37 |
implements(IDelayedCall) |
|---|
| 38 |
|
|---|
| 39 |
|
|---|
| 40 |
debug = False |
|---|
| 41 |
_str = None |
|---|
| 42 |
|
|---|
| 43 |
def __init__(self, time, func, args, kw, cancel, reset, |
|---|
| 44 |
seconds=runtimeSeconds): |
|---|
| 45 |
""" |
|---|
| 46 |
@param time: Seconds from the epoch at which to call C{func}. |
|---|
| 47 |
@param func: The callable to call. |
|---|
| 48 |
@param args: The positional arguments to pass to the callable. |
|---|
| 49 |
@param kw: The keyword arguments to pass to the callable. |
|---|
| 50 |
@param cancel: A callable which will be called with this |
|---|
| 51 |
DelayedCall before cancellation. |
|---|
| 52 |
@param reset: A callable which will be called with this |
|---|
| 53 |
DelayedCall after changing this DelayedCall's scheduled |
|---|
| 54 |
execution time. The callable should adjust any necessary |
|---|
| 55 |
scheduling details to ensure this DelayedCall is invoked |
|---|
| 56 |
at the new appropriate time. |
|---|
| 57 |
@param seconds: If provided, a no-argument callable which will be |
|---|
| 58 |
used to determine the current time any time that information is |
|---|
| 59 |
needed. |
|---|
| 60 |
""" |
|---|
| 61 |
self.time, self.func, self.args, self.kw = time, func, args, kw |
|---|
| 62 |
self.resetter = reset |
|---|
| 63 |
self.canceller = cancel |
|---|
| 64 |
self.seconds = seconds |
|---|
| 65 |
self.cancelled = self.called = 0 |
|---|
| 66 |
self.delayed_time = 0 |
|---|
| 67 |
if self.debug: |
|---|
| 68 |
self.creator = traceback.format_stack()[:-2] |
|---|
| 69 |
|
|---|
| 70 |
def getTime(self): |
|---|
| 71 |
"""Return the time at which this call will fire |
|---|
| 72 |
|
|---|
| 73 |
@rtype: C{float} |
|---|
| 74 |
@return: The number of seconds after the epoch at which this call is |
|---|
| 75 |
scheduled to be made. |
|---|
| 76 |
""" |
|---|
| 77 |
return self.time + self.delayed_time |
|---|
| 78 |
|
|---|
| 79 |
def cancel(self): |
|---|
| 80 |
"""Unschedule this call |
|---|
| 81 |
|
|---|
| 82 |
@raise AlreadyCancelled: Raised if this call has already been |
|---|
| 83 |
unscheduled. |
|---|
| 84 |
|
|---|
| 85 |
@raise AlreadyCalled: Raised if this call has already been made. |
|---|
| 86 |
""" |
|---|
| 87 |
if self.cancelled: |
|---|
| 88 |
raise error.AlreadyCancelled |
|---|
| 89 |
elif self.called: |
|---|
| 90 |
raise error.AlreadyCalled |
|---|
| 91 |
else: |
|---|
| 92 |
self.canceller(self) |
|---|
| 93 |
self.cancelled = 1 |
|---|
| 94 |
if self.debug: |
|---|
| 95 |
self._str = str(self) |
|---|
| 96 |
del self.func, self.args, self.kw |
|---|
| 97 |
|
|---|
| 98 |
def reset(self, secondsFromNow): |
|---|
| 99 |
"""Reschedule this call for a different time |
|---|
| 100 |
|
|---|
| 101 |
@type secondsFromNow: C{float} |
|---|
| 102 |
@param secondsFromNow: The number of seconds from the time of the |
|---|
| 103 |
C{reset} call at which this call will be scheduled. |
|---|
| 104 |
|
|---|
| 105 |
@raise AlreadyCancelled: Raised if this call has been cancelled. |
|---|
| 106 |
@raise AlreadyCalled: Raised if this call has already been made. |
|---|
| 107 |
""" |
|---|
| 108 |
if self.cancelled: |
|---|
| 109 |
raise error.AlreadyCancelled |
|---|
| 110 |
elif self.called: |
|---|
| 111 |
raise error.AlreadyCalled |
|---|
| 112 |
else: |
|---|
| 113 |
newTime = self.seconds() + secondsFromNow |
|---|
| 114 |
if newTime < self.time: |
|---|
| 115 |
self.delayed_time = 0 |
|---|
| 116 |
self.time = newTime |
|---|
| 117 |
self.resetter(self) |
|---|
| 118 |
else: |
|---|
| 119 |
self.delayed_time = newTime - self.time |
|---|
| 120 |
|
|---|
| 121 |
def delay(self, secondsLater): |
|---|
| 122 |
"""Reschedule this call for a later time |
|---|
| 123 |
|
|---|
| 124 |
@type secondsLater: C{float} |
|---|
| 125 |
@param secondsLater: The number of seconds after the originally |
|---|
| 126 |
scheduled time for which to reschedule this call. |
|---|
| 127 |
|
|---|
| 128 |
@raise AlreadyCancelled: Raised if this call has been cancelled. |
|---|
| 129 |
@raise AlreadyCalled: Raised if this call has already been made. |
|---|
| 130 |
""" |
|---|
| 131 |
if self.cancelled: |
|---|
| 132 |
raise error.AlreadyCancelled |
|---|
| 133 |
elif self.called: |
|---|
| 134 |
raise error.AlreadyCalled |
|---|
| 135 |
else: |
|---|
| 136 |
self.delayed_time += secondsLater |
|---|
| 137 |
if self.delayed_time < 0: |
|---|
| 138 |
self.activate_delay() |
|---|
| 139 |
self.resetter(self) |
|---|
| 140 |
|
|---|
| 141 |
def activate_delay(self): |
|---|
| 142 |
self.time += self.delayed_time |
|---|
| 143 |
self.delayed_time = 0 |
|---|
| 144 |
|
|---|
| 145 |
def active(self): |
|---|
| 146 |
"""Determine whether this call is still pending |
|---|
| 147 |
|
|---|
| 148 |
@rtype: C{bool} |
|---|
| 149 |
@return: True if this call has not yet been made or cancelled, |
|---|
| 150 |
False otherwise. |
|---|
| 151 |
""" |
|---|
| 152 |
return not (self.cancelled or self.called) |
|---|
| 153 |
|
|---|
| 154 |
def __le__(self, other): |
|---|
| 155 |
return self.time <= other.time |
|---|
| 156 |
|
|---|
| 157 |
|
|---|
| 158 |
def __str__(self): |
|---|
| 159 |
if self._str is not None: |
|---|
| 160 |
return self._str |
|---|
| 161 |
if hasattr(self, 'func'): |
|---|
| 162 |
if hasattr(self.func, 'func_name'): |
|---|
| 163 |
func = self.func.func_name |
|---|
| 164 |
if hasattr(self.func, 'im_class'): |
|---|
| 165 |
func = self.func.im_class.__name__ + '.' + func |
|---|
| 166 |
else: |
|---|
| 167 |
func = reflect.safe_repr(self.func) |
|---|
| 168 |
else: |
|---|
| 169 |
func = None |
|---|
| 170 |
|
|---|
| 171 |
now = self.seconds() |
|---|
| 172 |
L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % ( |
|---|
| 173 |
unsignedID(self), self.time - now, self.called, |
|---|
| 174 |
self.cancelled)] |
|---|
| 175 |
if func is not None: |
|---|
| 176 |
L.extend((" ", func, "(")) |
|---|
| 177 |
if self.args: |
|---|
| 178 |
L.append(", ".join([reflect.safe_repr(e) for e in self.args])) |
|---|
| 179 |
if self.kw: |
|---|
| 180 |
L.append(", ") |
|---|
| 181 |
if self.kw: |
|---|
| 182 |
L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()])) |
|---|
| 183 |
L.append(")") |
|---|
| 184 |
|
|---|
| 185 |
if self.debug: |
|---|
| 186 |
L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator))) |
|---|
| 187 |
L.append('>') |
|---|
| 188 |
|
|---|
| 189 |
return "".join(L) |
|---|
| 190 |
|
|---|
| 191 |
|
|---|
| 192 |
|
|---|
| 193 |
class ThreadedResolver(object): |
|---|
| 194 |
""" |
|---|
| 195 |
L{ThreadedResolver} uses a reactor, a threadpool, and |
|---|
| 196 |
L{socket.gethostbyname} to perform name lookups without blocking the |
|---|
| 197 |
reactor thread. It also supports timeouts indepedently from whatever |
|---|
| 198 |
timeout logic L{socket.gethostbyname} might have. |
|---|
| 199 |
|
|---|
| 200 |
@ivar reactor: The reactor the threadpool of which will be used to call |
|---|
| 201 |
L{socket.gethostbyname} and the I/O thread of which the result will be |
|---|
| 202 |
delivered. |
|---|
| 203 |
""" |
|---|
| 204 |
implements(IResolverSimple) |
|---|
| 205 |
|
|---|
| 206 |
def __init__(self, reactor): |
|---|
| 207 |
self.reactor = reactor |
|---|
| 208 |
self._runningQueries = {} |
|---|
| 209 |
|
|---|
| 210 |
|
|---|
| 211 |
def _fail(self, name, err): |
|---|
| 212 |
err = error.DNSLookupError("address %r not found: %s" % (name, err)) |
|---|
| 213 |
return failure.Failure(err) |
|---|
| 214 |
|
|---|
| 215 |
|
|---|
| 216 |
def _cleanup(self, name, lookupDeferred): |
|---|
| 217 |
userDeferred, cancelCall = self._runningQueries[lookupDeferred] |
|---|
| 218 |
del self._runningQueries[lookupDeferred] |
|---|
| 219 |
userDeferred.errback(self._fail(name, "timeout error")) |
|---|
| 220 |
|
|---|
| 221 |
|
|---|
| 222 |
def _checkTimeout(self, result, name, lookupDeferred): |
|---|
| 223 |
try: |
|---|
| 224 |
userDeferred, cancelCall = self._runningQueries[lookupDeferred] |
|---|
| 225 |
except KeyError: |
|---|
| 226 |
pass |
|---|
| 227 |
else: |
|---|
| 228 |
del self._runningQueries[lookupDeferred] |
|---|
| 229 |
cancelCall.cancel() |
|---|
| 230 |
|
|---|
| 231 |
if isinstance(result, failure.Failure): |
|---|
| 232 |
userDeferred.errback(self._fail(name, result.getErrorMessage())) |
|---|
| 233 |
else: |
|---|
| 234 |
userDeferred.callback(result) |
|---|
| 235 |
|
|---|
| 236 |
|
|---|
| 237 |
def getHostByName(self, name, timeout = (1, 3, 11, 45)): |
|---|
| 238 |
""" |
|---|
| 239 |
See L{twisted.internet.interfaces.IResolverSimple.getHostByName}. |
|---|
| 240 |
|
|---|
| 241 |
Note that the elements of C{timeout} are summed and the result is used |
|---|
| 242 |
as a timeout for the lookup. Any intermediate timeout or retry logic |
|---|
| 243 |
is left up to the platform via L{socket.gethostbyname}. |
|---|
| 244 |
""" |
|---|
| 245 |
if timeout: |
|---|
| 246 |
timeoutDelay = reduce(operator.add, timeout) |
|---|
| 247 |
else: |
|---|
| 248 |
timeoutDelay = 60 |
|---|
| 249 |
userDeferred = defer.Deferred() |
|---|
| 250 |
lookupDeferred = threads.deferToThreadPool( |
|---|
| 251 |
self.reactor, self.reactor.getThreadPool(), |
|---|
| 252 |
socket.gethostbyname, name) |
|---|
| 253 |
cancelCall = self.reactor.callLater( |
|---|
| 254 |
timeoutDelay, self._cleanup, name, lookupDeferred) |
|---|
| 255 |
self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) |
|---|
| 256 |
lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) |
|---|
| 257 |
return userDeferred |
|---|
| 258 |
|
|---|
| 259 |
|
|---|
| 260 |
|
|---|
| 261 |
class BlockingResolver: |
|---|
| 262 |
implements(IResolverSimple) |
|---|
| 263 |
|
|---|
| 264 |
def getHostByName(self, name, timeout = (1, 3, 11, 45)): |
|---|
| 265 |
try: |
|---|
| 266 |
address = socket.gethostbyname(name) |
|---|
| 267 |
except socket.error: |
|---|
| 268 |
msg = "address %r not found" % (name,) |
|---|
| 269 |
err = error.DNSLookupError(msg) |
|---|
| 270 |
return defer.fail(err) |
|---|
| 271 |
else: |
|---|
| 272 |
return defer.succeed(address) |
|---|
| 273 |
|
|---|
| 274 |
|
|---|
| 275 |
class _ThreePhaseEvent(object): |
|---|
| 276 |
""" |
|---|
| 277 |
Collection of callables (with arguments) which can be invoked as a group in |
|---|
| 278 |
a particular order. |
|---|
| 279 |
|
|---|
| 280 |
This provides the underlying implementation for the reactor's system event |
|---|
| 281 |
triggers. An instance of this class tracks triggers for all phases of a |
|---|
| 282 |
single type of event. |
|---|
| 283 |
|
|---|
| 284 |
@ivar before: A list of the before-phase triggers containing three-tuples |
|---|
| 285 |
of a callable, a tuple of positional arguments, and a dict of keyword |
|---|
| 286 |
arguments |
|---|
| 287 |
|
|---|
| 288 |
@ivar finishedBefore: A list of the before-phase triggers which have |
|---|
| 289 |
already been executed. This is only populated in the C{'BEFORE'} state. |
|---|
| 290 |
|
|---|
| 291 |
@ivar during: A list of the during-phase triggers containing three-tuples |
|---|
| 292 |
of a callable, a tuple of positional arguments, and a dict of keyword |
|---|
| 293 |
arguments |
|---|
| 294 |
|
|---|
| 295 |
@ivar after: A list of the after-phase triggers containing three-tuples |
|---|
| 296 |
of a callable, a tuple of positional arguments, and a dict of keyword |
|---|
| 297 |
arguments |
|---|
| 298 |
|
|---|
| 299 |
@ivar state: A string indicating what is currently going on with this |
|---|
| 300 |
object. One of C{'BASE'} (for when nothing in particular is happening; |
|---|
| 301 |
this is the initial value), C{'BEFORE'} (when the before-phase triggers |
|---|
| 302 |
are in the process of being executed). |
|---|
| 303 |
""" |
|---|
| 304 |
def __init__(self): |
|---|
| 305 |
self.before = [] |
|---|
| 306 |
self.during = [] |
|---|
| 307 |
self.after = [] |
|---|
| 308 |
self.state = 'BASE' |
|---|
| 309 |
|
|---|
| 310 |
|
|---|
| 311 |
def addTrigger(self, phase, callable, *args, **kwargs): |
|---|
| 312 |
""" |
|---|
| 313 |
Add a trigger to the indicate phase. |
|---|
| 314 |
|
|---|
| 315 |
@param phase: One of C{'before'}, C{'during'}, or C{'after'}. |
|---|
| 316 |
|
|---|
| 317 |
@param callable: An object to be called when this event is triggered. |
|---|
| 318 |
@param *args: Positional arguments to pass to C{callable}. |
|---|
| 319 |
@param **kwargs: Keyword arguments to pass to C{callable}. |
|---|
| 320 |
|
|---|
| 321 |
@return: An opaque handle which may be passed to L{removeTrigger} to |
|---|
| 322 |
reverse the effects of calling this method. |
|---|
| 323 |
""" |
|---|
| 324 |
if phase not in ('before', 'during', 'after'): |
|---|
| 325 |
raise KeyError("invalid phase") |
|---|
| 326 |
getattr(self, phase).append((callable, args, kwargs)) |
|---|
| 327 |
return phase, callable, args, kwargs |
|---|
| 328 |
|
|---|
| 329 |
|
|---|
| 330 |
def removeTrigger(self, handle): |
|---|
| 331 |
""" |
|---|
| 332 |
Remove a previously added trigger callable. |
|---|
| 333 |
|
|---|
| 334 |
@param handle: An object previously returned by L{addTrigger}. The |
|---|
| 335 |
trigger added by that call will be removed. |
|---|
| 336 |
|
|---|
| 337 |
@raise ValueError: If the trigger associated with C{handle} has already |
|---|
| 338 |
been removed or if C{handle} is not a valid handle. |
|---|
| 339 |
""" |
|---|
| 340 |
return getattr(self, 'removeTrigger_' + self.state)(handle) |
|---|
| 341 |
|
|---|
| 342 |
|
|---|
| 343 |
def removeTrigger_BASE(self, handle): |
|---|
| 344 |
""" |
|---|
| 345 |
Just try to remove the trigger. |
|---|
| 346 |
|
|---|
| 347 |
@see: removeTrigger |
|---|
| 348 |
""" |
|---|
| 349 |
try: |
|---|
| 350 |
phase, callable, args, kwargs = handle |
|---|
| 351 |
except (TypeError, ValueError), e: |
|---|
| 352 |
raise ValueError("invalid trigger handle") |
|---|
| 353 |
else: |
|---|
| 354 |
if phase not in ('before', 'during', 'after'): |
|---|
| 355 |
raise KeyError("invalid phase") |
|---|
| 356 |
getattr(self, phase).remove((callable, args, kwargs)) |
|---|
| 357 |
|
|---|
| 358 |
|
|---|
| 359 |
def removeTrigger_BEFORE(self, handle): |
|---|
| 360 |
""" |
|---|
| 361 |
Remove the trigger if it has yet to be executed, otherwise emit a |
|---|
| 362 |
warning that in the future an exception will be raised when removing an |
|---|
| 363 |
already-executed trigger. |
|---|
| 364 |
|
|---|
| 365 |
@see: removeTrigger |
|---|
| 366 |
""" |
|---|
| 367 |
phase, callable, args, kwargs = handle |
|---|
| 368 |
if phase != 'before': |
|---|
| 369 |
return self.removeTrigger_BASE(handle) |
|---|
| 370 |
if (callable, args, kwargs) in self.finishedBefore: |
|---|
| 371 |
warnings.warn( |
|---|
| 372 |
"Removing already-fired system event triggers will raise an " |
|---|
| 373 |
"exception in a future version of Twisted.", |
|---|
| 374 |
category=DeprecationWarning, |
|---|
| 375 |
stacklevel=3) |
|---|
| 376 |
else: |
|---|
| 377 |
self.removeTrigger_BASE(handle) |
|---|
| 378 |
|
|---|
| 379 |
|
|---|
| 380 |
def fireEvent(self): |
|---|
| 381 |
""" |
|---|
| 382 |
Call the triggers added to this event. |
|---|
| 383 |
""" |
|---|
| 384 |
self.state = 'BEFORE' |
|---|
| 385 |
self.finishedBefore = [] |
|---|
| 386 |
beforeResults = [] |
|---|
| 387 |
while self.before: |
|---|
| 388 |
callable, args, kwargs = self.before.pop(0) |
|---|
| 389 |
self.finishedBefore.append((callable, args, kwargs)) |
|---|
| 390 |
try: |
|---|
| 391 |
result = callable(*args, **kwargs) |
|---|
| 392 |
except: |
|---|
| 393 |
log.err() |
|---|
| 394 |
else: |
|---|
| 395 |
if isinstance(result, Deferred): |
|---|
| 396 |
beforeResults.append(result) |
|---|
| 397 |
DeferredList(beforeResults).addCallback(self._continueFiring) |
|---|
| 398 |
|
|---|
| 399 |
|
|---|
| 400 |
def _continueFiring(self, ignored): |
|---|
| 401 |
""" |
|---|
| 402 |
Call the during and after phase triggers for this event. |
|---|
| 403 |
""" |
|---|
| 404 |
self.state = 'BASE' |
|---|
| 405 |
self.finishedBefore = [] |
|---|
| 406 |
for phase in self.during, self.after: |
|---|
| 407 |
while phase: |
|---|
| 408 |
callable, args, kwargs = phase.pop(0) |
|---|
| 409 |
try: |
|---|
| 410 |
callable(*args, **kwargs) |
|---|
| 411 |
except: |
|---|
| 412 |
log.err() |
|---|
| 413 |
|
|---|
| 414 |
|
|---|
| 415 |
|
|---|
| 416 |
class ReactorBase(object): |
|---|
| 417 |
""" |
|---|
| 418 |
Default base class for Reactors. |
|---|
| 419 |
|
|---|
| 420 |
@type _stopped: C{bool} |
|---|
| 421 |
@ivar _stopped: A flag which is true between paired calls to C{reactor.run} |
|---|
| 422 |
and C{reactor.stop}. This should be replaced with an explicit state |
|---|
| 423 |
machine. |
|---|
| 424 |
|
|---|
| 425 |
@type _justStopped: C{bool} |
|---|
| 426 |
@ivar _justStopped: A flag which is true between the time C{reactor.stop} |
|---|
| 427 |
is called and the time the shutdown system event is fired. This is |
|---|
| 428 |
used to determine whether that event should be fired after each |
|---|
| 429 |
iteration through the mainloop. This should be replaced with an |
|---|
| 430 |
explicit state machine. |
|---|
| 431 |
|
|---|
| 432 |
@type _started: C{bool} |
|---|
| 433 |
@ivar _started: A flag which is true from the time C{reactor.run} is called |
|---|
| 434 |
until the time C{reactor.run} returns. This is used to prevent calls |
|---|
| 435 |
to C{reactor.run} on a running reactor. This should be replaced with |
|---|
| 436 |
an explicit state machine. |
|---|
| 437 |
|
|---|
| 438 |
@ivar running: See L{IReactorCore.running} |
|---|
| 439 |
""" |
|---|
| 440 |
implements(IReactorCore, IReactorTime, IReactorPluggableResolver) |
|---|
| 441 |
|
|---|
| 442 |
_stopped = True |
|---|
| 443 |
installed = False |
|---|
| 444 |
usingThreads = False |
|---|
| 445 |
resolver = BlockingResolver() |
|---|
| 446 |
|
|---|
| 447 |
__name__ = "twisted.internet.reactor" |
|---|
| 448 |
|
|---|
| 449 |
def __init__(self): |
|---|
| 450 |
self.threadCallQueue = [] |
|---|
| 451 |
self._eventTriggers = {} |
|---|
| 452 |
self._pendingTimedCalls = [] |
|---|
| 453 |
self._newTimedCalls = [] |
|---|
| 454 |
self._cancellations = 0 |
|---|
| 455 |
self.running = False |
|---|
| 456 |
self._started = False |
|---|
| 457 |
self._justStopped = False |
|---|
| 458 |
|
|---|
| 459 |
self._internalReaders = set() |
|---|
| 460 |
self.waker = None |
|---|
| 461 |
|
|---|
| 462 |
|
|---|
| 463 |
|
|---|
| 464 |
|
|---|
| 465 |
self.addSystemEventTrigger( |
|---|
| 466 |
'during', 'startup', self._reallyStartRunning) |
|---|
| 467 |
self.addSystemEventTrigger('during', 'shutdown', self.crash) |
|---|
| 468 |
self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll) |
|---|
| 469 |
|
|---|
| 470 |
if platform.supportsThreads(): |
|---|
| 471 |
self._initThreads() |
|---|
| 472 |
|
|---|
| 473 |
|
|---|
| 474 |
|
|---|
| 475 |
_lock = None |
|---|
| 476 |
|
|---|
| 477 |
def installWaker(self): |
|---|
| 478 |
raise NotImplementedError( |
|---|
| 479 |
reflect.qual(self.__class__) + " did not implement installWaker") |
|---|
| 480 |
|
|---|
| 481 |
def installResolver(self, resolver): |
|---|
| 482 |
assert IResolverSimple.providedBy(resolver) |
|---|
| 483 |
oldResolver = self.resolver |
|---|
| 484 |
self.resolver = resolver |
|---|
| 485 |
return oldResolver |
|---|
| 486 |
|
|---|
| 487 |
def wakeUp(self): |
|---|
| 488 |
""" |
|---|
| 489 |
Wake up the event loop. |
|---|
| 490 |
""" |
|---|
| 491 |
if self.waker: |
|---|
| 492 |
self.waker.wakeUp() |
|---|
| 493 |
|
|---|
| 494 |
|
|---|
| 495 |
|
|---|
| 496 |
def doIteration(self, delay): |
|---|
| 497 |
""" |
|---|
| 498 |
Do one iteration over the readers and writers which have been added. |
|---|
| 499 |
""" |
|---|
| 500 |
raise NotImplementedError( |
|---|
| 501 |
reflect.qual(self.__class__) + " did not implement doIteration") |
|---|
| 502 |
|
|---|
| 503 |
def addReader(self, reader): |
|---|
| 504 |
raise NotImplementedError( |
|---|
| 505 |
reflect.qual(self.__class__) + " did not implement addReader") |
|---|
| 506 |
|
|---|
| 507 |
def addWriter(self, writer): |
|---|
| 508 |
raise NotImplementedError( |
|---|
| 509 |
reflect.qual(self.__class__) + " did not implement addWriter") |
|---|
| 510 |
|
|---|
| 511 |
def removeReader(self, reader): |
|---|
| 512 |
raise NotImplementedError( |
|---|
| 513 |
reflect.qual(self.__class__) + " did not implement removeReader") |
|---|
| 514 |
|
|---|
| 515 |
def removeWriter(self, writer): |
|---|
| 516 |
raise NotImplementedError( |
|---|
| 517 |
reflect.qual(self.__class__) + " did not implement removeWriter") |
|---|
| 518 |
|
|---|
| 519 |
def removeAll(self): |
|---|
| 520 |
raise NotImplementedError( |
|---|
| 521 |
reflect.qual(self.__class__) + " did not implement removeAll") |
|---|
| 522 |
|
|---|
| 523 |
|
|---|
| 524 |
def getReaders(self): |
|---|
| 525 |
raise NotImplementedError( |
|---|
| 526 |
reflect.qual(self.__class__) + " did not implement getReaders") |
|---|
| 527 |
|
|---|
| 528 |
|
|---|
| 529 |
def getWriters(self): |
|---|
| 530 |
raise NotImplementedError( |
|---|
| 531 |
reflect.qual(self.__class__) + " did not implement getWriters") |
|---|
| 532 |
|
|---|
| 533 |
|
|---|
| 534 |
def resolve(self, name, timeout = (1, 3, 11, 45)): |
|---|
| 535 |
"""Return a Deferred that will resolve a hostname. |
|---|
| 536 |
""" |
|---|
| 537 |
if not name: |
|---|
| 538 |
|
|---|
| 539 |
return defer.succeed('0.0.0.0') |
|---|
| 540 |
if abstract.isIPAddress(name): |
|---|
| 541 |
return defer.succeed(name) |
|---|
| 542 |
return self.resolver.getHostByName(name, timeout) |
|---|
| 543 |
|
|---|
| 544 |
|
|---|
| 545 |
|
|---|
| 546 |
|
|---|
| 547 |
def stop(self): |
|---|
| 548 |
""" |
|---|
| 549 |
See twisted.internet.interfaces.IReactorCore.stop. |
|---|
| 550 |
""" |
|---|
| 551 |
if self._stopped: |
|---|
| 552 |
raise error.ReactorNotRunning( |
|---|
| 553 |
"Can't stop reactor that isn't running.") |
|---|
| 554 |
self._stopped = True |
|---|
| 555 |
self._justStopped = True |
|---|
| 556 |
|
|---|
| 557 |
|
|---|
| 558 |
def crash(self): |
|---|
| 559 |
""" |
|---|
| 560 |
See twisted.internet.interfaces.IReactorCore.crash. |
|---|
| 561 |
|
|---|
| 562 |
Reset reactor state tracking attributes and re-initialize certain |
|---|
| 563 |
state-transition helpers which were set up in C{__init__} but later |
|---|
| 564 |
destroyed (through use). |
|---|
| 565 |
""" |
|---|
| 566 |
self._started = False |
|---|
| 567 |
self.running = False |
|---|
| 568 |
self.addSystemEventTrigger( |
|---|
| 569 |
'during', 'startup', self._reallyStartRunning) |
|---|
| 570 |
|
|---|
| 571 |
def sigInt(self, *args): |
|---|
| 572 |
"""Handle a SIGINT interrupt. |
|---|
| 573 |
""" |
|---|
| 574 |
log.msg("Received SIGINT, shutting down.") |
|---|
| 575 |
self.callFromThread(self.stop) |
|---|
| 576 |
|
|---|
| 577 |
def sigBreak(self, *args): |
|---|
| 578 |
"""Handle a SIGBREAK interrupt. |
|---|
| 579 |
""" |
|---|
| 580 |
log.msg("Received SIGBREAK, shutting down.") |
|---|
| 581 |
self.callFromThread(self.stop) |
|---|
| 582 |
|
|---|
| 583 |
def sigTerm(self, *args): |
|---|
| 584 |
"""Handle a SIGTERM interrupt. |
|---|
| 585 |
""" |
|---|
| 586 |
log.msg("Received SIGTERM, shutting down.") |
|---|
| 587 |
self.callFromThread(self.stop) |
|---|
| 588 |
|
|---|
| 589 |
def disconnectAll(self): |
|---|
| 590 |
"""Disconnect every reader, and writer in the system. |
|---|
| 591 |
""" |
|---|
| 592 |
selectables = self.removeAll() |
|---|
| 593 |
for reader in selectables: |
|---|
| 594 |
log.callWithLogger(reader, |
|---|
| 595 |
reader.connectionLost, |
|---|
| 596 |
failure.Failure(main.CONNECTION_LOST)) |
|---|
| 597 |
|
|---|
| 598 |
|
|---|
| 599 |
def iterate(self, delay=0): |
|---|
| 600 |
"""See twisted.internet.interfaces.IReactorCore.iterate. |
|---|
| 601 |
""" |
|---|
| 602 |
self.runUntilCurrent() |
|---|
| 603 |
self.doIteration(delay) |
|---|
| 604 |
|
|---|
| 605 |
|
|---|
| 606 |
def fireSystemEvent(self, eventType): |
|---|
| 607 |
"""See twisted.internet.interfaces.IReactorCore.fireSystemEvent. |
|---|
| 608 |
""" |
|---|
| 609 |
event = self._eventTriggers.get(eventType) |
|---|
| 610 |
if event is not None: |
|---|
| 611 |
event.fireEvent() |
|---|
| 612 |
|
|---|
| 613 |
|
|---|
| 614 |
def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw): |
|---|
| 615 |
"""See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger. |
|---|
| 616 |
""" |
|---|
| 617 |
assert callable(_f), "%s is not callable" % _f |
|---|
| 618 |
if _eventType not in self._eventTriggers: |
|---|
| 619 |
self._eventTriggers[_eventType] = _ThreePhaseEvent() |
|---|
| 620 |
return (_eventType, self._eventTriggers[_eventType].addTrigger( |
|---|
| 621 |
_phase, _f, *args, **kw)) |
|---|
| 622 |
|
|---|
| 623 |
|
|---|
| 624 |
def removeSystemEventTrigger(self, triggerID): |
|---|
| 625 |
"""See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger. |
|---|
| 626 |
""" |
|---|
| 627 |
eventType, handle = triggerID |
|---|
| 628 |
self._eventTriggers[eventType].removeTrigger(handle) |
|---|
| 629 |
|
|---|
| 630 |
|
|---|
| 631 |
def callWhenRunning(self, _callable, *args, **kw): |
|---|
| 632 |
"""See twisted.internet.interfaces.IReactorCore.callWhenRunning. |
|---|
| 633 |
""" |
|---|
| 634 |
if self.running: |
|---|
| 635 |
_callable(*args, **kw) |
|---|
| 636 |
else: |
|---|
| 637 |
return self.addSystemEventTrigger('after', 'startup', |
|---|
| 638 |
_callable, *args, **kw) |
|---|
| 639 |
|
|---|
| 640 |
def startRunning(self): |
|---|
| 641 |
""" |
|---|
| 642 |
Method called when reactor starts: do some initialization and fire |
|---|
| 643 |
startup events. |
|---|
| 644 |
|
|---|
| 645 |
Don't call this directly, call reactor.run() instead: it should take |
|---|
| 646 |
care of calling this. |
|---|
| 647 |
|
|---|
| 648 |
This method is somewhat misnamed. The reactor will not necessarily be |
|---|
| 649 |
in the running state by the time this method returns. The only |
|---|
| 650 |
guarantee is that it will be on its way to the running state. |
|---|
| 651 |
""" |
|---|
| 652 |
if self._started: |
|---|
| 653 |
raise error.ReactorAlreadyRunning() |
|---|
| 654 |
self._started = True |
|---|
| 655 |
self._stopped = False |
|---|
| 656 |
threadable.registerAsIOThread() |
|---|
| 657 |
self.fireSystemEvent('startup') |
|---|
| 658 |
|
|---|
| 659 |
|
|---|
| 660 |
def _reallyStartRunning(self): |
|---|
| 661 |
""" |
|---|
| 662 |
Method called to transition to the running state. This should happen |
|---|
| 663 |
in the I{during startup} event trigger phase. |
|---|
| 664 |
""" |
|---|
| 665 |
self.running = True |
|---|
| 666 |
|
|---|
| 667 |
|
|---|
| 668 |
|
|---|
| 669 |
seconds = staticmethod(runtimeSeconds) |
|---|
| 670 |
|
|---|
| 671 |
def callLater(self, _seconds, _f, *args, **kw): |
|---|
| 672 |
"""See twisted.internet.interfaces.IReactorTime.callLater. |
|---|
| 673 |
""" |
|---|
| 674 |
assert callable(_f), "%s is not callable" % _f |
|---|
| 675 |
assert sys.maxint >= _seconds >= 0, \ |
|---|
| 676 |
"%s is not greater than or equal to 0 seconds" % (_seconds,) |
|---|
| 677 |
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw, |
|---|
| 678 |
self._cancelCallLater, |
|---|
| 679 |
self._moveCallLaterSooner, |
|---|
| 680 |
seconds=self.seconds) |
|---|
| 681 |
self._newTimedCalls.append(tple) |
|---|
| 682 |
return tple |
|---|
| 683 |
|
|---|
| 684 |
def _moveCallLaterSooner(self, tple): |
|---|
| 685 |
|
|---|
| 686 |
heap = self._pendingTimedCalls |
|---|
| 687 |
try: |
|---|
| 688 |
pos = heap.index(tple) |
|---|
| 689 |
|
|---|
| 690 |
|
|---|
| 691 |
elt = heap[pos] |
|---|
| 692 |
while pos != 0: |
|---|
| 693 |
parent = (pos-1) // 2 |
|---|
| 694 |
if heap[parent] <= elt: |
|---|
| 695 |
break |
|---|
| 696 |
|
|---|
| 697 |
heap[pos] = heap[parent] |
|---|
| 698 |
pos = parent |
|---|
| 699 |
heap[pos] = elt |
|---|
| 700 |
except ValueError: |
|---|
| 701 |
|
|---|
| 702 |
pass |
|---|
| 703 |
|
|---|
| 704 |
def _cancelCallLater(self, tple): |
|---|
| 705 |
self._cancellations+=1 |
|---|
| 706 |
|
|---|
| 707 |
def cancelCallLater(self, callID): |
|---|
| 708 |
"""See twisted.internet.interfaces.IReactorTime.cancelCallLater. |
|---|
| 709 |
""" |
|---|
| 710 |
|
|---|
| 711 |
|
|---|
| 712 |
warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead") |
|---|
| 713 |
callID.cancel() |
|---|
| 714 |
|
|---|
| 715 |
def getDelayedCalls(self): |
|---|
| 716 |
"""Return all the outstanding delayed calls in the system. |
|---|
| 717 |
They are returned in no particular order. |
|---|
| 718 |
This method is not efficient -- it is really only meant for |
|---|
| 719 |
test cases.""" |
|---|
| 720 |
return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled] |
|---|
| 721 |
|
|---|
| 722 |
def _insertNewDelayedCalls(self): |
|---|
| 723 |
for call in self._newTimedCalls: |
|---|
| 724 |
if call.cancelled: |
|---|
| 725 |
self._cancellations-=1 |
|---|
| 726 |
else: |
|---|
| 727 |
call.activate_delay() |
|---|
| 728 |
heappush(self._pendingTimedCalls, call) |
|---|
| 729 |
self._newTimedCalls = [] |
|---|
| 730 |
|
|---|
| 731 |
def timeout(self): |
|---|
| 732 |
|
|---|
| 733 |
self._insertNewDelayedCalls() |
|---|
| 734 |
|
|---|
| 735 |
if not self._pendingTimedCalls: |
|---|
| 736 |
return None |
|---|
| 737 |
|
|---|
| 738 |
return max(0, self._pendingTimedCalls[0].time - self.seconds()) |
|---|
| 739 |
|
|---|
| 740 |
|
|---|
| 741 |
def runUntilCurrent(self): |
|---|
| 742 |
"""Run all pending timed calls. |
|---|
| 743 |
""" |
|---|
| 744 |
if self.threadCallQueue: |
|---|
| 745 |
|
|---|
| 746 |
|
|---|
| 747 |
|
|---|
| 748 |
count = 0 |
|---|
| 749 |
total = len(self.threadCallQueue) |
|---|
| 750 |
for (f, a, kw) in self.threadCallQueue: |
|---|
| 751 |
try: |
|---|
| 752 |
f(*a, **kw) |
|---|
| 753 |
except: |
|---|
| 754 |
log.err() |
|---|
| 755 |
count += 1 |
|---|
| 756 |
if count == total: |
|---|
| 757 |
break |
|---|
| 758 |
del self.threadCallQueue[:count] |
|---|
| 759 |
if self.threadCallQueue: |
|---|
| 760 |
self.wakeUp() |
|---|
| 761 |
|
|---|
| 762 |
|
|---|
| 763 |
self._insertNewDelayedCalls() |
|---|
| 764 |
|
|---|
| 765 |
now = self.seconds() |
|---|
| 766 |
while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now): |
|---|
| 767 |
call = heappop(self._pendingTimedCalls) |
|---|
| 768 |
if call.cancelled: |
|---|
| 769 |
self._cancellations-=1 |
|---|
| 770 |
continue |
|---|
| 771 |
|
|---|
| 772 |
if call.delayed_time > 0: |
|---|
| 773 |
call.activate_delay() |
|---|
| 774 |
heappush(self._pendingTimedCalls, call) |
|---|
| 775 |
continue |
|---|
| 776 |
|
|---|
| 777 |
try: |
|---|
| 778 |
call.called = 1 |
|---|
| 779 |
call.func(*call.args, **call.kw) |
|---|
| 780 |
except: |
|---|
| 781 |
log.deferr() |
|---|
| 782 |
if hasattr(call, "creator"): |
|---|
| 783 |
e = "\n" |
|---|
| 784 |
e += " C: previous exception occurred in " + \ |
|---|
| 785 |
"a DelayedCall created here:\n" |
|---|
| 786 |
e += " C:" |
|---|
| 787 |
e += "".join(call.creator).rstrip().replace("\n","\n C:") |
|---|
| 788 |
e += "\n" |
|---|
| 789 |
log.msg(e) |
|---|
| 790 |
|
|---|
| 791 |
|
|---|
| 792 |
if (self._cancellations > 50 and |
|---|
| 793 |
self._cancellations > len(self._pendingTimedCalls) >> 1): |
|---|
| 794 |
self._cancellations = 0 |
|---|
| 795 |
self._pendingTimedCalls = [x for x in self._pendingTimedCalls |
|---|
| 796 |
if not x.cancelled] |
|---|
| 797 |
heapify(self._pendingTimedCalls) |
|---|
| 798 |
|
|---|
| 799 |
if self._justStopped: |
|---|
| 800 |
self._justStopped = False |
|---|
| 801 |
self.fireSystemEvent("shutdown") |
|---|
| 802 |
|
|---|
| 803 |
|
|---|
| 804 |
|
|---|
| 805 |
def _checkProcessArgs(self, args, env): |
|---|
| 806 |
""" |
|---|
| 807 |
Check for valid arguments and environment to spawnProcess. |
|---|
| 808 |
|
|---|
| 809 |
@return: A two element tuple giving values to use when creating the |
|---|
| 810 |
process. The first element of the tuple is a C{list} of C{str} |
|---|
| 811 |
giving the values for argv of the child process. The second element |
|---|
| 812 |
of the tuple is either C{None} if C{env} was C{None} or a C{dict} |
|---|
| 813 |
mapping C{str} environment keys to C{str} environment values. |
|---|
| 814 |
""" |
|---|
| 815 |
|
|---|
| 816 |
|
|---|
| 817 |
|
|---|
| 818 |
|
|---|
| 819 |
|
|---|
| 820 |
|
|---|
| 821 |
|
|---|
| 822 |
|
|---|
| 823 |
|
|---|
| 824 |
|
|---|
| 825 |
|
|---|
| 826 |
|
|---|
| 827 |
|
|---|
| 828 |
|
|---|
| 829 |
|
|---|
| 830 |
|
|---|
| 831 |
|
|---|
| 832 |
defaultEncoding = sys.getdefaultencoding() |
|---|
| 833 |
|
|---|
| 834 |
|
|---|
| 835 |
def argChecker(arg): |
|---|
| 836 |
""" |
|---|
| 837 |
Return either a str or None. If the given value is not |
|---|
| 838 |
allowable for some reason, None is returned. Otherwise, a |
|---|
| 839 |
possibly different object which should be used in place of arg |
|---|
| 840 |
is returned. This forces unicode encoding to happen now, rather |
|---|
| 841 |
than implicitly later. |
|---|
| 842 |
""" |
|---|
| 843 |
if isinstance(arg, unicode): |
|---|
| 844 |
try: |
|---|
| 845 |
arg = arg.encode(defaultEncoding) |
|---|
| 846 |
except UnicodeEncodeError: |
|---|
| 847 |
return None |
|---|
| 848 |
warnings.warn( |
|---|
| 849 |
"Argument strings and environment keys/values passed to " |
|---|
| 850 |
"reactor.spawnProcess should be str, not unicode.", |
|---|
| 851 |
category=DeprecationWarning, |
|---|
| 852 |
stacklevel=4) |
|---|
| 853 |
if isinstance(arg, str) and '\0' not in arg: |
|---|
| 854 |
return arg |
|---|
| 855 |
return None |
|---|
| 856 |
|
|---|
| 857 |
|
|---|
| 858 |
if not isinstance(args, (tuple, list)): |
|---|
| 859 |
raise TypeError("Arguments must be a tuple or list") |
|---|
| 860 |
|
|---|
| 861 |
outputArgs = [] |
|---|
| 862 |
for arg in args: |
|---|
| 863 |
arg = argChecker(arg) |
|---|
| 864 |
if arg is None: |
|---|
| 865 |
raise TypeError("Arguments contain a non-string value") |
|---|
| 866 |
else: |
|---|
| 867 |
outputArgs.append(arg) |
|---|
| 868 |
|
|---|
| 869 |
outputEnv = None |
|---|
| 870 |
if env is not None: |
|---|
| 871 |
outputEnv = {} |
|---|
| 872 |
for key, val in env.iteritems(): |
|---|
| 873 |
key = argChecker(key) |
|---|
| 874 |
if key is None: |
|---|
| 875 |
raise TypeError("Environment contains a non-string key") |
|---|
| 876 |
val = argChecker(val) |
|---|
| 877 |
if val is None: |
|---|
| 878 |
raise TypeError("Environment contains a non-string value") |
|---|
| 879 |
outputEnv[key] = val |
|---|
| 880 |
return outputArgs, outputEnv |
|---|
| 881 |
|
|---|
| 882 |
|
|---|
| 883 |
if platform.supportsThreads(): |
|---|
| 884 |
threadpool = None |
|---|
| 885 |
|
|---|
| 886 |
_threadpoolStartupID = None |
|---|
| 887 |
|
|---|
| 888 |
threadpoolShutdownID = None |
|---|
| 889 |
|
|---|
| 890 |
def _initThreads(self): |
|---|
| 891 |
self.usingThreads = True |
|---|
| 892 |
self.resolver = ThreadedResolver(self) |
|---|
| 893 |
self.installWaker() |
|---|
| 894 |
|
|---|
| 895 |
def callFromThread(self, f, *args, **kw): |
|---|
| 896 |
""" |
|---|
| 897 |
See L{twisted.internet.interfaces.IReactorThreads.callFromThread}. |
|---|
| 898 |
""" |
|---|
| 899 |
assert callable(f), "%s is not callable" % (f,) |
|---|
| 900 |
|
|---|
| 901 |
|
|---|
| 902 |
|
|---|
| 903 |
self.threadCallQueue.append((f, args, kw)) |
|---|
| 904 |
self.wakeUp() |
|---|
| 905 |
|
|---|
| 906 |
def _initThreadPool(self): |
|---|
| 907 |
""" |
|---|
| 908 |
Create the threadpool accessible with callFromThread. |
|---|
| 909 |
""" |
|---|
| 910 |
from twisted.python import threadpool |
|---|
| 911 |
self.threadpool = threadpool.ThreadPool( |
|---|
| 912 |
0, 10, 'twisted.internet.reactor') |
|---|
| 913 |
self._threadpoolStartupID = self.callWhenRunning( |
|---|
| 914 |
self.threadpool.start) |
|---|
| 915 |
self.threadpoolShutdownID = self.addSystemEventTrigger( |
|---|
| 916 |
'during', 'shutdown', self._stopThreadPool) |
|---|
| 917 |
|
|---|
| 918 |
def _stopThreadPool(self): |
|---|
| 919 |
""" |
|---|
| 920 |
Stop the reactor threadpool. This method is only valid if there |
|---|
| 921 |
is currently a threadpool (created by L{_initThreadPool}). It |
|---|
| 922 |
is not intended to be called directly; instead, it will be |
|---|
| 923 |
called by a shutdown trigger created in L{_initThreadPool}. |
|---|
| 924 |
""" |
|---|
| 925 |
triggers = [self._threadpoolStartupID, self.threadpoolShutdownID] |
|---|
| 926 |
for trigger in filter(None, triggers): |
|---|
| 927 |
try: |
|---|
| 928 |
self.removeSystemEventTrigger(trigger) |
|---|
| 929 |
except ValueError: |
|---|
| 930 |
pass |
|---|
| 931 |
self._threadpoolStartupID = None |
|---|
| 932 |
self.threadpoolShutdownID = None |
|---|
| 933 |
self.threadpool.stop() |
|---|
| 934 |
self.threadpool = None |
|---|
| 935 |
|
|---|
| 936 |
|
|---|
| 937 |
def getThreadPool(self): |
|---|
| 938 |
""" |
|---|
| 939 |
See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}. |
|---|
| 940 |
""" |
|---|
| 941 |
if self.threadpool is None: |
|---|
| 942 |
self._initThreadPool() |
|---|
| 943 |
return self.threadpool |
|---|
| 944 |
|
|---|
| 945 |
|
|---|
| 946 |
def callInThread(self, _callable, *args, **kwargs): |
|---|
| 947 |
""" |
|---|
| 948 |
See L{twisted.internet.interfaces.IReactorThreads.callInThread}. |
|---|
| 949 |
""" |
|---|
| 950 |
self.getThreadPool().callInThread(_callable, *args, **kwargs) |
|---|
| 951 |
|
|---|
| 952 |
def suggestThreadPoolSize(self, size): |
|---|
| 953 |
""" |
|---|
| 954 |
See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}. |
|---|
| 955 |
""" |
|---|
| 956 |
self.getThreadPool().adjustPoolsize(maxthreads=size) |
|---|
| 957 |
else: |
|---|
| 958 |
|
|---|
| 959 |
def callFromThread(self, f, *args, **kw): |
|---|
| 960 |
assert callable(f), "%s is not callable" % (f,) |
|---|
| 961 |
|
|---|
| 962 |
self.threadCallQueue.append((f, args, kw)) |
|---|
| 963 |
|
|---|
| 964 |
if platform.supportsThreads(): |
|---|
| 965 |
classImplements(ReactorBase, IReactorThreads) |
|---|
| 966 |
|
|---|
| 967 |
|
|---|
| 968 |
class BaseConnector(styles.Ephemeral): |
|---|
| 969 |
"""Basic implementation of connector. |
|---|
| 970 |
|
|---|
| 971 |
State can be: "connecting", "connected", "disconnected" |
|---|
| 972 |
""" |
|---|
| 973 |
|
|---|
| 974 |
implements(IConnector) |
|---|
| 975 |
|
|---|
| 976 |
timeoutID = None |
|---|
| 977 |
factoryStarted = 0 |
|---|
| 978 |
|
|---|
| 979 |
def __init__(self, factory, timeout, reactor): |
|---|
| 980 |
self.state = "disconnected" |
|---|
| 981 |
self.reactor = reactor |
|---|
| 982 |
self.factory = factory |
|---|
| 983 |
self.timeout = timeout |
|---|
| 984 |
|
|---|
| 985 |
def disconnect(self): |
|---|
| 986 |
"""Disconnect whatever our state is.""" |
|---|
| 987 |
if self.state == 'connecting': |
|---|
| 988 |
self.stopConnecting() |
|---|
| 989 |
elif self.state == 'connected': |
|---|
| 990 |
self.transport.loseConnection() |
|---|
| 991 |
|
|---|
| 992 |
def connect(self): |
|---|
| 993 |
"""Start connection to remote server.""" |
|---|
| 994 |
if self.state != "disconnected": |
|---|
| 995 |
raise RuntimeError, "can't connect in this state" |
|---|
| 996 |
|
|---|
| 997 |
self.state = "connecting" |
|---|
| 998 |
if not self.factoryStarted: |
|---|
| 999 |
self.factory.doStart() |
|---|
| 1000 |
self.factoryStarted = 1 |
|---|
| 1001 |
self.transport = transport = self._makeTransport() |
|---|
| 1002 |
if self.timeout is not None: |
|---|
| 1003 |
self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError()) |
|---|
| 1004 |
self.factory.startedConnecting(self) |
|---|
| 1005 |
|
|---|
| 1006 |
def stopConnecting(self): |
|---|
| 1007 |
"""Stop attempting to connect.""" |
|---|
| 1008 |
if self.state != "connecting": |
|---|
| 1009 |
raise error.NotConnectingError, "we're not trying to connect" |
|---|
| 1010 |
|
|---|
| 1011 |
self.state = "disconnected" |
|---|
| 1012 |
self.transport.failIfNotConnected(error.UserError()) |
|---|
| 1013 |
del self.transport |
|---|
| 1014 |
|
|---|
| 1015 |
def cancelTimeout(self): |
|---|
| 1016 |
if self.timeoutID is not None: |
|---|
| 1017 |
try: |
|---|
| 1018 |
self.timeoutID.cancel() |
|---|
| 1019 |
except ValueError: |
|---|
| 1020 |
pass |
|---|
| 1021 |
del self.timeoutID |
|---|
| 1022 |
|
|---|
| 1023 |
def buildProtocol(self, addr): |
|---|
| 1024 |
self.state = "connected" |
|---|
| 1025 |
self.cancelTimeout() |
|---|
| 1026 |
return self.factory.buildProtocol(addr) |
|---|
| 1027 |
|
|---|
| 1028 |
def connectionFailed(self, reason): |
|---|
| 1029 |
self.cancelTimeout() |
|---|
| 1030 |
self.transport = None |
|---|
| 1031 |
self.state = "disconnected" |
|---|
| 1032 |
self.factory.clientConnectionFailed(self, reason) |
|---|
| 1033 |
if self.state == "disconnected": |
|---|
| 1034 |
|
|---|
| 1035 |
self.factory.doStop() |
|---|
| 1036 |
self.factoryStarted = 0 |
|---|
| 1037 |
|
|---|
| 1038 |
def connectionLost(self, reason): |
|---|
| 1039 |
self.state = "disconnected" |
|---|
| 1040 |
self.factory.clientConnectionLost(self, reason) |
|---|
| 1041 |
if self.state == "disconnected": |
|---|
| 1042 |
|
|---|
| 1043 |
self.factory.doStop() |
|---|
| 1044 |
self.factoryStarted = 0 |
|---|
| 1045 |
|
|---|
| 1046 |
def getDestination(self): |
|---|
| 1047 |
raise NotImplementedError( |
|---|
| 1048 |
reflect.qual(self.__class__) + " did not implement " |
|---|
| 1049 |
"getDestination") |
|---|
| 1050 |
|
|---|
| 1051 |
|
|---|
| 1052 |
|
|---|
| 1053 |
class BasePort(abstract.FileDescriptor): |
|---|
| 1054 |
"""Basic implementation of a ListeningPort. |
|---|
| 1055 |
|
|---|
| 1056 |
Note: This does not actually implement IListeningPort. |
|---|
| 1057 |
""" |
|---|
| 1058 |
|
|---|
| 1059 |
addressFamily = None |
|---|
| 1060 |
socketType = None |
|---|
| 1061 |
|
|---|
| 1062 |
def createInternetSocket(self): |
|---|
| 1063 |
s = socket.socket(self.addressFamily, self.socketType) |
|---|
| 1064 |
s.setblocking(0) |
|---|
| 1065 |
fdesc._setCloseOnExec(s.fileno()) |
|---|
| 1066 |
return s |
|---|
| 1067 |
|
|---|
| 1068 |
|
|---|
| 1069 |
def doWrite(self): |
|---|
| 1070 |
"""Raises a RuntimeError""" |
|---|
| 1071 |
raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__) |
|---|
| 1072 |
|
|---|
| 1073 |
|
|---|
| 1074 |
|
|---|
| 1075 |
class _SignalReactorMixin: |
|---|
| 1076 |
""" |
|---|
| 1077 |
Private mixin to manage signals: it installs signal handlers at start time, |
|---|
| 1078 |
and define run method. |
|---|
| 1079 |
|
|---|
| 1080 |
It can only be used mixed in with L{ReactorBase}, and has to be defined |
|---|
| 1081 |
first in the inheritance (so that method resolution order finds |
|---|
| 1082 |
startRunning first). |
|---|
| 1083 |
|
|---|
| 1084 |
@type _installSignalHandlers: C{bool} |
|---|
| 1085 |
@ivar _installSignalHandlers: A flag which indicates whether any signal |
|---|
| 1086 |
handlers will be installed during startup. This includes handlers for |
|---|
| 1087 |
SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK |
|---|
| 1088 |
to stop the reactor. |
|---|
| 1089 |
""" |
|---|
| 1090 |
|
|---|
| 1091 |
_installSignalHandlers = False |
|---|
| 1092 |
|
|---|
| 1093 |
def _handleSignals(self): |
|---|
| 1094 |
""" |
|---|
| 1095 |
Install the signal handlers for the Twisted event loop. |
|---|
| 1096 |
""" |
|---|
| 1097 |
try: |
|---|
| 1098 |
import signal |
|---|
| 1099 |
except ImportError: |
|---|
| 1100 |
log.msg("Warning: signal module unavailable -- " |
|---|
| 1101 |
"not installing signal handlers.") |
|---|
| 1102 |
return |
|---|
| 1103 |
|
|---|
| 1104 |
if signal.getsignal(signal.SIGINT) == signal.default_int_handler: |
|---|
| 1105 |
|
|---|
| 1106 |
signal.signal(signal.SIGINT, self.sigInt) |
|---|
| 1107 |
signal.signal(signal.SIGTERM, self.sigTerm) |
|---|
| 1108 |
|
|---|
| 1109 |
|
|---|
| 1110 |
if hasattr(signal, "SIGBREAK"): |
|---|
| 1111 |
signal.signal(signal.SIGBREAK, self.sigBreak) |
|---|
| 1112 |
|
|---|
| 1113 |
if platformType == 'posix': |
|---|
| 1114 |
signal.signal(signal.SIGCHLD, self._handleSigchld) |
|---|
| 1115 |
|
|---|
| 1116 |
|
|---|
| 1117 |
def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()): |
|---|
| 1118 |
""" |
|---|
| 1119 |
Reap all processes on SIGCHLD. |
|---|
| 1120 |
|
|---|
| 1121 |
This gets called on SIGCHLD. We do no processing inside a signal |
|---|
| 1122 |
handler, as the calls we make here could occur between any two |
|---|
| 1123 |
python bytecode instructions. Deferring processing to the next |
|---|
| 1124 |
eventloop round prevents us from violating the state constraints |
|---|
| 1125 |
of arbitrary classes. |
|---|
| 1126 |
""" |
|---|
| 1127 |
from twisted.internet.process import reapAllProcesses |
|---|
| 1128 |
if _threadSupport: |
|---|
| 1129 |
self.callFromThread(reapAllProcesses) |
|---|
| 1130 |
else: |
|---|
| 1131 |
self.callLater(0, reapAllProcesses) |
|---|
| 1132 |
|
|---|
| 1133 |
|
|---|
| 1134 |
def startRunning(self, installSignalHandlers=True): |
|---|
| 1135 |
""" |
|---|
| 1136 |
Extend the base implementation in order to remember whether signal |
|---|
| 1137 |
handlers should be installed later. |
|---|
| 1138 |
|
|---|
| 1139 |
@type installSignalHandlers: C{bool} |
|---|
| 1140 |
@param installSignalHandlers: A flag which, if set, indicates that |
|---|
| 1141 |
handlers for a number of (implementation-defined) signals should be |
|---|
| 1142 |
installed during startup. |
|---|
| 1143 |
""" |
|---|
| 1144 |
self._installSignalHandlers = installSignalHandlers |
|---|
| 1145 |
ReactorBase.startRunning(self) |
|---|
| 1146 |
|
|---|
| 1147 |
|
|---|
| 1148 |
def _reallyStartRunning(self): |
|---|
| 1149 |
""" |
|---|
| 1150 |
Extend the base implementation by also installing signal handlers, if |
|---|
| 1151 |
C{self._installSignalHandlers} is true. |
|---|
| 1152 |
""" |
|---|
| 1153 |
ReactorBase._reallyStartRunning(self) |
|---|
| 1154 |
if self._installSignalHandlers: |
|---|
| 1155 |
|
|---|
| 1156 |
|
|---|
| 1157 |
|
|---|
| 1158 |
|
|---|
| 1159 |
|
|---|
| 1160 |
|
|---|
| 1161 |
self._handleSignals() |
|---|
| 1162 |
|
|---|
| 1163 |
|
|---|
| 1164 |
def run(self, installSignalHandlers=True): |
|---|
| 1165 |
self.startRunning(installSignalHandlers=installSignalHandlers) |
|---|
| 1166 |
self.mainLoop() |
|---|
| 1167 |
|
|---|
| 1168 |
|
|---|
| 1169 |
def mainLoop(self): |
|---|
| 1170 |
while self._started: |
|---|
| 1171 |
try: |
|---|
| 1172 |
while self._started: |
|---|
| 1173 |
|
|---|
| 1174 |
|
|---|
| 1175 |
self.runUntilCurrent() |
|---|
| 1176 |
t2 = self.timeout() |
|---|
| 1177 |
t = self.running and t2 |
|---|
| 1178 |
self.doIteration(t) |
|---|
| 1179 |
except: |
|---|
| 1180 |
log.msg("Unexpected error in main loop.") |
|---|
| 1181 |
log.err() |
|---|
| 1182 |
else: |
|---|
| 1183 |
log.msg('Main loop terminated.') |
|---|
| 1184 |
|
|---|
| 1185 |
|
|---|
| 1186 |
|
|---|
| 1187 |
__all__ = [] |
|---|