| 1 | # -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*- |
|---|
| 2 | # Copyright (c) Twisted Matrix Laboratories. |
|---|
| 3 | # See LICENSE for details. |
|---|
| 4 | |
|---|
| 5 | """ |
|---|
| 6 | Reactor-based Services |
|---|
| 7 | |
|---|
| 8 | Here are services to run clients, servers and periodic services using |
|---|
| 9 | the reactor. |
|---|
| 10 | |
|---|
| 11 | If you want to run a server service, L{StreamServerEndpointService} defines a |
|---|
| 12 | service that can wrap an arbitrary L{IStreamServerEndpoint |
|---|
| 13 | <twisted.internet.interfaces.IStreamServerEndpoint>} |
|---|
| 14 | as an L{IService}. See also L{twisted.application.strports.service} for |
|---|
| 15 | constructing one of these directly from a descriptive string. |
|---|
| 16 | |
|---|
| 17 | Additionally, this module (dynamically) defines various Service subclasses that |
|---|
| 18 | let you represent clients and servers in a Service hierarchy. Endpoints APIs |
|---|
| 19 | should be preferred for stream server services, but since those APIs do not yet |
|---|
| 20 | exist for clients or datagram services, many of these are still useful. |
|---|
| 21 | |
|---|
| 22 | They are as follows:: |
|---|
| 23 | |
|---|
| 24 | TCPServer, TCPClient, |
|---|
| 25 | UNIXServer, UNIXClient, |
|---|
| 26 | SSLServer, SSLClient, |
|---|
| 27 | UDPServer, UDPClient, |
|---|
| 28 | UNIXDatagramServer, UNIXDatagramClient, |
|---|
| 29 | MulticastServer |
|---|
| 30 | |
|---|
| 31 | These classes take arbitrary arguments in their constructors and pass |
|---|
| 32 | them straight on to their respective reactor.listenXXX or |
|---|
| 33 | reactor.connectXXX calls. |
|---|
| 34 | |
|---|
| 35 | For example, the following service starts a web server on port 8080: |
|---|
| 36 | C{TCPServer(8080, server.Site(r))}. See the documentation for the |
|---|
| 37 | reactor.listen/connect* methods for more information. |
|---|
| 38 | """ |
|---|
| 39 | |
|---|
| 40 | import warnings |
|---|
| 41 | |
|---|
| 42 | from twisted.python import log |
|---|
| 43 | from twisted.application import service |
|---|
| 44 | from twisted.internet import task |
|---|
| 45 | |
|---|
| 46 | from twisted.internet.defer import CancelledError |
|---|
| 47 | |
|---|
| 48 | |
|---|
| 49 | def _maybeGlobalReactor(maybeReactor): |
|---|
| 50 | """ |
|---|
| 51 | @return: the argument, or the global reactor if the argument is C{None}. |
|---|
| 52 | """ |
|---|
| 53 | if maybeReactor is None: |
|---|
| 54 | from twisted.internet import reactor |
|---|
| 55 | return reactor |
|---|
| 56 | else: |
|---|
| 57 | return maybeReactor |
|---|
| 58 | |
|---|
| 59 | |
|---|
| 60 | class _VolatileDataService(service.Service): |
|---|
| 61 | |
|---|
| 62 | volatile = [] |
|---|
| 63 | |
|---|
| 64 | def __getstate__(self): |
|---|
| 65 | d = service.Service.__getstate__(self) |
|---|
| 66 | for attr in self.volatile: |
|---|
| 67 | if attr in d: |
|---|
| 68 | del d[attr] |
|---|
| 69 | return d |
|---|
| 70 | |
|---|
| 71 | |
|---|
| 72 | |
|---|
| 73 | class _AbstractServer(_VolatileDataService): |
|---|
| 74 | """ |
|---|
| 75 | @cvar volatile: list of attribute to remove from pickling. |
|---|
| 76 | @type volatile: C{list} |
|---|
| 77 | |
|---|
| 78 | @ivar method: the type of method to call on the reactor, one of B{TCP}, |
|---|
| 79 | B{UDP}, B{SSL} or B{UNIX}. |
|---|
| 80 | @type method: C{str} |
|---|
| 81 | |
|---|
| 82 | @ivar reactor: the current running reactor. |
|---|
| 83 | @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, |
|---|
| 84 | C{IReactorSSL} or C{IReactorUnix}. |
|---|
| 85 | |
|---|
| 86 | @ivar _port: instance of port set when the service is started. |
|---|
| 87 | @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}. |
|---|
| 88 | """ |
|---|
| 89 | |
|---|
| 90 | volatile = ['_port'] |
|---|
| 91 | method = None |
|---|
| 92 | reactor = None |
|---|
| 93 | |
|---|
| 94 | _port = None |
|---|
| 95 | |
|---|
| 96 | def __init__(self, *args, **kwargs): |
|---|
| 97 | self.args = args |
|---|
| 98 | if 'reactor' in kwargs: |
|---|
| 99 | self.reactor = kwargs.pop("reactor") |
|---|
| 100 | self.kwargs = kwargs |
|---|
| 101 | |
|---|
| 102 | |
|---|
| 103 | def privilegedStartService(self): |
|---|
| 104 | service.Service.privilegedStartService(self) |
|---|
| 105 | self._port = self._getPort() |
|---|
| 106 | |
|---|
| 107 | |
|---|
| 108 | def startService(self): |
|---|
| 109 | service.Service.startService(self) |
|---|
| 110 | if self._port is None: |
|---|
| 111 | self._port = self._getPort() |
|---|
| 112 | |
|---|
| 113 | |
|---|
| 114 | def stopService(self): |
|---|
| 115 | service.Service.stopService(self) |
|---|
| 116 | # TODO: if startup failed, should shutdown skip stopListening? |
|---|
| 117 | # _port won't exist |
|---|
| 118 | if self._port is not None: |
|---|
| 119 | d = self._port.stopListening() |
|---|
| 120 | del self._port |
|---|
| 121 | return d |
|---|
| 122 | |
|---|
| 123 | |
|---|
| 124 | def _getPort(self): |
|---|
| 125 | """ |
|---|
| 126 | Wrapper around the appropriate listen method of the reactor. |
|---|
| 127 | |
|---|
| 128 | @return: the port object returned by the listen method. |
|---|
| 129 | @rtype: an object providing |
|---|
| 130 | L{twisted.internet.interfaces.IListeningPort}. |
|---|
| 131 | """ |
|---|
| 132 | return getattr(_maybeGlobalReactor(self.reactor), |
|---|
| 133 | 'listen%s' % (self.method,))(*self.args, **self.kwargs) |
|---|
| 134 | |
|---|
| 135 | |
|---|
| 136 | |
|---|
| 137 | class _AbstractClient(_VolatileDataService): |
|---|
| 138 | """ |
|---|
| 139 | @cvar volatile: list of attribute to remove from pickling. |
|---|
| 140 | @type volatile: C{list} |
|---|
| 141 | |
|---|
| 142 | @ivar method: the type of method to call on the reactor, one of B{TCP}, |
|---|
| 143 | B{UDP}, B{SSL} or B{UNIX}. |
|---|
| 144 | @type method: C{str} |
|---|
| 145 | |
|---|
| 146 | @ivar reactor: the current running reactor. |
|---|
| 147 | @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP}, |
|---|
| 148 | C{IReactorSSL} or C{IReactorUnix}. |
|---|
| 149 | |
|---|
| 150 | @ivar _connection: instance of connection set when the service is started. |
|---|
| 151 | @type _connection: a provider of L{twisted.internet.interfaces.IConnector}. |
|---|
| 152 | """ |
|---|
| 153 | volatile = ['_connection'] |
|---|
| 154 | method = None |
|---|
| 155 | reactor = None |
|---|
| 156 | |
|---|
| 157 | _connection = None |
|---|
| 158 | |
|---|
| 159 | def __init__(self, *args, **kwargs): |
|---|
| 160 | self.args = args |
|---|
| 161 | if 'reactor' in kwargs: |
|---|
| 162 | self.reactor = kwargs.pop("reactor") |
|---|
| 163 | self.kwargs = kwargs |
|---|
| 164 | |
|---|
| 165 | |
|---|
| 166 | def startService(self): |
|---|
| 167 | service.Service.startService(self) |
|---|
| 168 | self._connection = self._getConnection() |
|---|
| 169 | |
|---|
| 170 | |
|---|
| 171 | def stopService(self): |
|---|
| 172 | service.Service.stopService(self) |
|---|
| 173 | if self._connection is not None: |
|---|
| 174 | self._connection.disconnect() |
|---|
| 175 | del self._connection |
|---|
| 176 | |
|---|
| 177 | |
|---|
| 178 | def _getConnection(self): |
|---|
| 179 | """ |
|---|
| 180 | Wrapper around the appropriate connect method of the reactor. |
|---|
| 181 | |
|---|
| 182 | @return: the port object returned by the connect method. |
|---|
| 183 | @rtype: an object providing L{twisted.internet.interfaces.IConnector}. |
|---|
| 184 | """ |
|---|
| 185 | return getattr(_maybeGlobalReactor(self.reactor), |
|---|
| 186 | 'connect%s' % (self.method,))(*self.args, **self.kwargs) |
|---|
| 187 | |
|---|
| 188 | |
|---|
| 189 | |
|---|
| 190 | _doc={ |
|---|
| 191 | 'Client': |
|---|
| 192 | """Connect to %(tran)s |
|---|
| 193 | |
|---|
| 194 | Call reactor.connect%(method)s when the service starts, with the |
|---|
| 195 | arguments given to the constructor. |
|---|
| 196 | """, |
|---|
| 197 | 'Server': |
|---|
| 198 | """Serve %(tran)s clients |
|---|
| 199 | |
|---|
| 200 | Call reactor.listen%(method)s when the service starts, with the |
|---|
| 201 | arguments given to the constructor. When the service stops, |
|---|
| 202 | stop listening. See twisted.internet.interfaces for documentation |
|---|
| 203 | on arguments to the reactor method. |
|---|
| 204 | """, |
|---|
| 205 | } |
|---|
| 206 | |
|---|
| 207 | import types |
|---|
| 208 | for tran in 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split(): |
|---|
| 209 | for side in 'Server Client'.split(): |
|---|
| 210 | if tran == "Multicast" and side == "Client": |
|---|
| 211 | continue |
|---|
| 212 | base = globals()['_Abstract'+side] |
|---|
| 213 | method = {'Generic': 'With'}.get(tran, tran) |
|---|
| 214 | doc = _doc[side]%vars() |
|---|
| 215 | klass = types.ClassType(tran+side, (base,), |
|---|
| 216 | {'method': method, '__doc__': doc}) |
|---|
| 217 | globals()[tran+side] = klass |
|---|
| 218 | |
|---|
| 219 | |
|---|
| 220 | |
|---|
| 221 | class GenericServer(_AbstractServer): |
|---|
| 222 | """ |
|---|
| 223 | Serve Generic clients |
|---|
| 224 | |
|---|
| 225 | Call reactor.listenWith when the service starts, with the arguments given to |
|---|
| 226 | the constructor. When the service stops, stop listening. See |
|---|
| 227 | twisted.internet.interfaces for documentation on arguments to the reactor |
|---|
| 228 | method. |
|---|
| 229 | |
|---|
| 230 | This service is deprecated (because reactor.listenWith is deprecated). |
|---|
| 231 | """ |
|---|
| 232 | method = 'With' |
|---|
| 233 | |
|---|
| 234 | def __init__(self, *args, **kwargs): |
|---|
| 235 | warnings.warn( |
|---|
| 236 | 'GenericServer was deprecated in Twisted 10.1.', |
|---|
| 237 | category=DeprecationWarning, |
|---|
| 238 | stacklevel=2) |
|---|
| 239 | _AbstractServer.__init__(self, *args, **kwargs) |
|---|
| 240 | |
|---|
| 241 | |
|---|
| 242 | |
|---|
| 243 | class GenericClient(_AbstractClient): |
|---|
| 244 | """ |
|---|
| 245 | Connect to Generic. |
|---|
| 246 | |
|---|
| 247 | Call reactor.connectWith when the service starts, with the arguments given |
|---|
| 248 | to the constructor. |
|---|
| 249 | |
|---|
| 250 | This service is deprecated (because reactor.connectWith is deprecated). |
|---|
| 251 | """ |
|---|
| 252 | method = 'With' |
|---|
| 253 | |
|---|
| 254 | def __init__(self, *args, **kwargs): |
|---|
| 255 | warnings.warn( |
|---|
| 256 | 'GenericClient was deprecated in Twisted 10.1.', |
|---|
| 257 | category=DeprecationWarning, |
|---|
| 258 | stacklevel=2) |
|---|
| 259 | _AbstractClient.__init__(self, *args, **kwargs) |
|---|
| 260 | |
|---|
| 261 | |
|---|
| 262 | |
|---|
| 263 | class TimerService(_VolatileDataService): |
|---|
| 264 | |
|---|
| 265 | """Service to periodically call a function |
|---|
| 266 | |
|---|
| 267 | Every C{step} seconds call the given function with the given arguments. |
|---|
| 268 | The service starts the calls when it starts, and cancels them |
|---|
| 269 | when it stops. |
|---|
| 270 | """ |
|---|
| 271 | |
|---|
| 272 | volatile = ['_loop'] |
|---|
| 273 | |
|---|
| 274 | def __init__(self, step, callable, *args, **kwargs): |
|---|
| 275 | self.step = step |
|---|
| 276 | self.call = (callable, args, kwargs) |
|---|
| 277 | |
|---|
| 278 | def startService(self): |
|---|
| 279 | service.Service.startService(self) |
|---|
| 280 | callable, args, kwargs = self.call |
|---|
| 281 | # we have to make a new LoopingCall each time we're started, because |
|---|
| 282 | # an active LoopingCall remains active when serialized. If |
|---|
| 283 | # LoopingCall were a _VolatileDataService, we wouldn't need to do |
|---|
| 284 | # this. |
|---|
| 285 | self._loop = task.LoopingCall(callable, *args, **kwargs) |
|---|
| 286 | self._loop.start(self.step, now=True).addErrback(self._failed) |
|---|
| 287 | |
|---|
| 288 | def _failed(self, why): |
|---|
| 289 | # make a note that the LoopingCall is no longer looping, so we don't |
|---|
| 290 | # try to shut it down a second time in stopService. I think this |
|---|
| 291 | # should be in LoopingCall. -warner |
|---|
| 292 | self._loop.running = False |
|---|
| 293 | log.err(why) |
|---|
| 294 | |
|---|
| 295 | def stopService(self): |
|---|
| 296 | if self._loop.running: |
|---|
| 297 | self._loop.stop() |
|---|
| 298 | return service.Service.stopService(self) |
|---|
| 299 | |
|---|
| 300 | |
|---|
| 301 | |
|---|
| 302 | class CooperatorService(service.Service): |
|---|
| 303 | """ |
|---|
| 304 | Simple L{service.IService} which starts and stops a L{twisted.internet.task.Cooperator}. |
|---|
| 305 | """ |
|---|
| 306 | def __init__(self): |
|---|
| 307 | self.coop = task.Cooperator(started=False) |
|---|
| 308 | |
|---|
| 309 | |
|---|
| 310 | def coiterate(self, iterator): |
|---|
| 311 | return self.coop.coiterate(iterator) |
|---|
| 312 | |
|---|
| 313 | |
|---|
| 314 | def startService(self): |
|---|
| 315 | self.coop.start() |
|---|
| 316 | |
|---|
| 317 | |
|---|
| 318 | def stopService(self): |
|---|
| 319 | self.coop.stop() |
|---|
| 320 | |
|---|
| 321 | |
|---|
| 322 | |
|---|
| 323 | class StreamServerEndpointService(service.Service, object): |
|---|
| 324 | """ |
|---|
| 325 | A L{StreamServerEndpointService} is an L{IService} which runs a server on a |
|---|
| 326 | listening port described by an L{IStreamServerEndpoint |
|---|
| 327 | <twisted.internet.interfaces.IStreamServerEndpoint>}. |
|---|
| 328 | |
|---|
| 329 | @ivar factory: A server factory which will be used to listen on the |
|---|
| 330 | endpoint. |
|---|
| 331 | |
|---|
| 332 | @ivar endpoint: An L{IStreamServerEndpoint |
|---|
| 333 | <twisted.internet.interfaces.IStreamServerEndpoint>} provider |
|---|
| 334 | which will be used to listen when the service starts. |
|---|
| 335 | |
|---|
| 336 | @ivar _waitingForPort: a Deferred, if C{listen} has yet been invoked on the |
|---|
| 337 | endpoint, otherwise None. |
|---|
| 338 | |
|---|
| 339 | @ivar _raiseSynchronously: Defines error-handling behavior for the case |
|---|
| 340 | where C{listen(...)} raises an exception before C{startService} or |
|---|
| 341 | C{privilegedStartService} have completed. |
|---|
| 342 | |
|---|
| 343 | @type _raiseSynchronously: C{bool} |
|---|
| 344 | |
|---|
| 345 | @since: 10.2 |
|---|
| 346 | """ |
|---|
| 347 | |
|---|
| 348 | _raiseSynchronously = None |
|---|
| 349 | |
|---|
| 350 | def __init__(self, endpoint, factory): |
|---|
| 351 | self.endpoint = endpoint |
|---|
| 352 | self.factory = factory |
|---|
| 353 | self._waitingForPort = None |
|---|
| 354 | |
|---|
| 355 | |
|---|
| 356 | def privilegedStartService(self): |
|---|
| 357 | """ |
|---|
| 358 | Start listening on the endpoint. |
|---|
| 359 | """ |
|---|
| 360 | service.Service.privilegedStartService(self) |
|---|
| 361 | self._waitingForPort = self.endpoint.listen(self.factory) |
|---|
| 362 | raisedNow = [] |
|---|
| 363 | def handleIt(err): |
|---|
| 364 | if self._raiseSynchronously: |
|---|
| 365 | raisedNow.append(err) |
|---|
| 366 | elif not err.check(CancelledError): |
|---|
| 367 | log.err(err) |
|---|
| 368 | self._waitingForPort.addErrback(handleIt) |
|---|
| 369 | if raisedNow: |
|---|
| 370 | raisedNow[0].raiseException() |
|---|
| 371 | |
|---|
| 372 | |
|---|
| 373 | def startService(self): |
|---|
| 374 | """ |
|---|
| 375 | Start listening on the endpoint, unless L{privilegedStartService} got |
|---|
| 376 | around to it already. |
|---|
| 377 | """ |
|---|
| 378 | service.Service.startService(self) |
|---|
| 379 | if self._waitingForPort is None: |
|---|
| 380 | self.privilegedStartService() |
|---|
| 381 | |
|---|
| 382 | |
|---|
| 383 | def stopService(self): |
|---|
| 384 | """ |
|---|
| 385 | Stop listening on the port if it is already listening, otherwise, |
|---|
| 386 | cancel the attempt to listen. |
|---|
| 387 | |
|---|
| 388 | @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires |
|---|
| 389 | with C{None} when the port has stopped listening. |
|---|
| 390 | """ |
|---|
| 391 | self._waitingForPort.cancel() |
|---|
| 392 | def stopIt(port): |
|---|
| 393 | if port is not None: |
|---|
| 394 | return port.stopListening() |
|---|
| 395 | d = self._waitingForPort.addCallback(stopIt) |
|---|
| 396 | def stop(passthrough): |
|---|
| 397 | self.running = False |
|---|
| 398 | return passthrough |
|---|
| 399 | d.addBoth(stop) |
|---|
| 400 | return d |
|---|
| 401 | |
|---|
| 402 | |
|---|
| 403 | |
|---|
| 404 | __all__ = (['TimerService', 'CooperatorService', 'MulticastServer', |
|---|
| 405 | 'StreamServerEndpointService'] + |
|---|
| 406 | [tran+side |
|---|
| 407 | for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram'.split() |
|---|
| 408 | for side in 'Server Client'.split()]) |
|---|