root/trunk/twisted/application/internet.py

Revision 32527, 12.0 KB (checked in by thijs, 8 months ago)

Merge pydoctor-errors-5244: Fixed incorrect epytext markup in docstrings.

Author: magmatt, thijs
Reviewer: exarkun
Fixes: #5244

Line 
1# -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Reactor-based Services
7
8Here are services to run clients, servers and periodic services using
9the reactor.
10
11If you want to run a server service, L{StreamServerEndpointService} defines a
12service that can wrap an arbitrary L{IStreamServerEndpoint
13<twisted.internet.interfaces.IStreamServerEndpoint>}
14as an L{IService}. See also L{twisted.application.strports.service} for
15constructing one of these directly from a descriptive string.
16
17Additionally, this module (dynamically) defines various Service subclasses that
18let you represent clients and servers in a Service hierarchy.  Endpoints APIs
19should be preferred for stream server services, but since those APIs do not yet
20exist for clients or datagram services, many of these are still useful.
21
22They are as follows::
23
24  TCPServer, TCPClient,
25  UNIXServer, UNIXClient,
26  SSLServer, SSLClient,
27  UDPServer, UDPClient,
28  UNIXDatagramServer, UNIXDatagramClient,
29  MulticastServer
30
31These classes take arbitrary arguments in their constructors and pass
32them straight on to their respective reactor.listenXXX or
33reactor.connectXXX calls.
34
35For example, the following service starts a web server on port 8080:
36C{TCPServer(8080, server.Site(r))}.  See the documentation for the
37reactor.listen/connect* methods for more information.
38"""
39
40import warnings
41
42from twisted.python import log
43from twisted.application import service
44from twisted.internet import task
45
46from twisted.internet.defer import CancelledError
47
48
def _maybeGlobalReactor(maybeReactor):
    """
    Resolve an optional reactor argument to a concrete reactor.

    @param maybeReactor: a reactor object, or C{None} to ask for the global
        reactor.

    @return: the argument, or the global reactor if the argument is C{None}.
    """
    if maybeReactor is not None:
        return maybeReactor
    # The global reactor is imported only when this branch is reached.
    from twisted.internet import reactor
    return reactor
58
59
class _VolatileDataService(service.Service):
    """
    A L{service.Service} which leaves selected attributes out of its pickled
    state.

    @cvar volatile: names of the attributes to remove from pickling.
    @type volatile: C{list}
    """

    volatile = []

    def __getstate__(self):
        """
        Build the state to pickle, without the attributes named in
        C{volatile}.

        @return: the state produced by C{service.Service.__getstate__}, with
            every key listed in C{volatile} removed.
        """
        state = service.Service.__getstate__(self)
        for name in self.volatile:
            # Names which are not present in the state are simply skipped.
            if name in state:
                del state[name]
        return state
70
71
72
class _AbstractServer(_VolatileDataService):
    """
    Base class for services which call one of the reactor's C{listen*}
    methods when started and stop listening when stopped.

    @cvar volatile: list of attributes to remove from pickling.
    @type volatile: C{list}

    @ivar method: the suffix of the C{listen*} method to call on the reactor,
        for example B{TCP}, B{UDP}, B{SSL}, B{UNIX}, B{UNIXDatagram},
        B{Multicast} or B{With}.
    @type method: C{str}

    @ivar reactor: the reactor to listen on, or C{None} to use the global
        reactor.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar args: positional arguments passed on to the listen method.
    @ivar kwargs: keyword arguments passed on to the listen method.

    @ivar _port: instance of port set when the service is started.
    @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}.
    """

    volatile = ['_port']
    method = None
    reactor = None

    _port = None

    def __init__(self, *args, **kwargs):
        """
        @param args: positional arguments for the reactor's listen method.

        @param kwargs: keyword arguments for the reactor's listen method.  A
            C{reactor} key, if present, is removed and stored as
            C{self.reactor} instead of being passed on.
        """
        self.args = args
        if 'reactor' in kwargs:
            self.reactor = kwargs.pop("reactor")
        self.kwargs = kwargs


    def privilegedStartService(self):
        """
        Start listening right away and remember the resulting port.
        """
        service.Service.privilegedStartService(self)
        self._port = self._getPort()


    def startService(self):
        """
        Start listening, unless L{privilegedStartService} already did so.
        """
        service.Service.startService(self)
        if self._port is None:
            self._port = self._getPort()


    def stopService(self):
        """
        Stop listening on the port, if there is one.

        @return: whatever the port's C{stopListening} returns, or C{None}
            when no port is set (for example because listening never
            started).
        """
        service.Service.stopService(self)
        if self._port is None:
            return None
        d = self._port.stopListening()
        # Dropping the instance attribute exposes the class-level C{None}
        # again, so a later startService listens afresh.
        del self._port
        return d


    def _getPort(self):
        """
        Wrapper around the appropriate listen method of the reactor.

        @return: the port object returned by the listen method.
        @rtype: an object providing
            L{twisted.internet.interfaces.IListeningPort}.
        """
        return getattr(_maybeGlobalReactor(self.reactor),
                       'listen%s' % (self.method,))(*self.args, **self.kwargs)
134
135
136
class _AbstractClient(_VolatileDataService):
    """
    Base class for services which call one of the reactor's C{connect*}
    methods when started and disconnect when stopped.

    @cvar volatile: list of attributes to remove from pickling.
    @type volatile: C{list}

    @ivar method: the suffix of the C{connect*} method to call on the
        reactor, for example B{TCP}, B{UDP}, B{SSL}, B{UNIX},
        B{UNIXDatagram} or B{With}.
    @type method: C{str}

    @ivar reactor: the reactor to connect with, or C{None} to use the global
        reactor.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar args: positional arguments passed on to the connect method.
    @ivar kwargs: keyword arguments passed on to the connect method.

    @ivar _connection: instance of connection set when the service is started.
    @type _connection: a provider of L{twisted.internet.interfaces.IConnector}.
    """
    volatile = ['_connection']
    method = None
    reactor = None

    _connection = None

    def __init__(self, *args, **kwargs):
        """
        @param args: positional arguments for the reactor's connect method.

        @param kwargs: keyword arguments for the reactor's connect method.  A
            C{reactor} key, if present, is removed and stored as
            C{self.reactor} instead of being passed on.
        """
        self.args = args
        if 'reactor' in kwargs:
            self.reactor = kwargs.pop("reactor")
        self.kwargs = kwargs


    def startService(self):
        """
        Start connecting and remember the resulting connector.
        """
        service.Service.startService(self)
        self._connection = self._getConnection()


    def stopService(self):
        """
        Disconnect the connector, if there is one.
        """
        service.Service.stopService(self)
        if self._connection is not None:
            self._connection.disconnect()
            # Dropping the instance attribute exposes the class-level
            # C{None} again.
            del self._connection


    def _getConnection(self):
        """
        Wrapper around the appropriate connect method of the reactor.

        @return: the connector object returned by the connect method.
        @rtype: an object providing L{twisted.internet.interfaces.IConnector}.
        """
        return getattr(_maybeGlobalReactor(self.reactor),
                       'connect%s' % (self.method,))(*self.args, **self.kwargs)
187
188
189
# Docstring templates for the dynamically generated service classes, keyed
# by side.  Each template is filled in with a mapping providing 'tran' and
# 'method'.
_doc = {
    'Client':
    """Connect to %(tran)s

Call reactor.connect%(method)s when the service starts, with the
arguments given to the constructor.
""",
    'Server':
    """Serve %(tran)s clients

Call reactor.listen%(method)s when the service starts, with the
arguments given to the constructor. When the service stops,
stop listening. See twisted.internet.interfaces for documentation
on arguments to the reactor method.
""",
}
206
207import types
# Generate one concrete service class per transport/side pair (TCPServer,
# TCPClient, ...) and publish it as a module-level name.
for tran in 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split():
    for side in 'Server Client'.split():
        if tran == "Multicast" and side == "Client":
            # No MulticastClient class is generated.
            continue
        base = globals()['_Abstract' + side]
        # The reactor method suffix is the transport name itself.
        method = tran
        doc = _doc[side] % {'tran': tran, 'method': method}
        # Create the class with the base's own class type: the classic-class
        # type when the base is a classic class on Python 2, and C{type}
        # otherwise.  This avoids relying on types.ClassType, which does not
        # exist on Python 3.
        klass = type(base)(tran + side, (base,),
                           {'method': method, '__doc__': doc})
        globals()[tran + side] = klass
218
219
220
class GenericServer(_AbstractServer):
    """
    Serve Generic clients

    Call reactor.listenWith when the service starts, with the arguments given to
    the constructor. When the service stops, stop listening. See
    twisted.internet.interfaces for documentation on arguments to the reactor
    method.

    This service is deprecated (because reactor.listenWith is deprecated).
    """
    method = 'With'

    def __init__(self, *args, **kwargs):
        """
        Emit a C{DeprecationWarning}, then store the arguments as
        L{_AbstractServer.__init__} does.
        """
        # stacklevel=2 attributes the warning to the code instantiating the
        # service rather than to this constructor.
        warnings.warn(
            'GenericServer was deprecated in Twisted 10.1.',
            category=DeprecationWarning,
            stacklevel=2)
        _AbstractServer.__init__(self, *args, **kwargs)
240
241
242
class GenericClient(_AbstractClient):
    """
    Connect to Generic.

    Call reactor.connectWith when the service starts, with the arguments given
    to the constructor.

    This service is deprecated (because reactor.connectWith is deprecated).
    """
    method = 'With'

    def __init__(self, *args, **kwargs):
        """
        Emit a C{DeprecationWarning}, then store the arguments as
        L{_AbstractClient.__init__} does.
        """
        # stacklevel=2 attributes the warning to the code instantiating the
        # service rather than to this constructor.
        warnings.warn(
            'GenericClient was deprecated in Twisted 10.1.',
            category=DeprecationWarning,
            stacklevel=2)
        _AbstractClient.__init__(self, *args, **kwargs)
260
261
262
class TimerService(_VolatileDataService):
    """
    Service to periodically call a function

    Every C{step} seconds call the given function with the given arguments.
    The service starts the calls when it starts, and cancels them
    when it stops.

    @ivar step: the interval, in seconds, passed to the looping call.

    @ivar call: a 3-tuple of C{(callable, args, kwargs)} describing the call
        to make.

    @ivar _loop: the L{task.LoopingCall} created by the most recent
        C{startService}, or C{None} if the service has not been started
        since it was created or unpickled.
    """

    volatile = ['_loop']

    # Class-level default so that stopService is safe before startService
    # and after unpickling (C{_loop} is not part of the pickled state).
    _loop = None

    def __init__(self, step, callable, *args, **kwargs):
        """
        @param step: seconds between calls.

        @param callable: the function to call.

        @param args: positional arguments for C{callable}.

        @param kwargs: keyword arguments for C{callable}.
        """
        self.step = step
        self.call = (callable, args, kwargs)

    def startService(self):
        """
        Create a fresh looping call and start it, making the first call
        immediately.
        """
        service.Service.startService(self)
        callable, args, kwargs = self.call
        # we have to make a new LoopingCall each time we're started, because
        # an active LoopingCall remains active when serialized. If
        # LoopingCall were a _VolatileDataService, we wouldn't need to do
        # this.
        self._loop = task.LoopingCall(callable, *args, **kwargs)
        self._loop.start(self.step, now=True).addErrback(self._failed)

    def _failed(self, why):
        """
        Errback for the looping call's Deferred: record that the loop is no
        longer running and log the failure.

        @param why: the failure reported by the looping call.
        """
        # make a note that the LoopingCall is no longer looping, so we don't
        # try to shut it down a second time in stopService. I think this
        # should be in LoopingCall. -warner
        self._loop.running = False
        log.err(why)

    def stopService(self):
        """
        Stop the looping call if there is one and it is still running.

        @return: the result of C{service.Service.stopService}.
        """
        loop = self._loop
        if loop is not None and loop.running:
            loop.stop()
        return service.Service.stopService(self)
299
300
301
class CooperatorService(service.Service):
    """
    Simple L{service.IService} which starts and stops a L{twisted.internet.task.Cooperator}.

    @ivar coop: the cooperator managed by this service; it is created in the
        not-started state.
    """
    def __init__(self):
        self.coop = task.Cooperator(started=False)


    def coiterate(self, iterator):
        """
        Hand C{iterator} to the cooperator.

        @return: whatever the cooperator's C{coiterate} returns.
        """
        return self.coop.coiterate(iterator)


    def startService(self):
        """
        Start the cooperator.
        """
        # Delegate to the base class first, as the other services in this
        # module do, so the base L{service.Service} start-up handling is
        # not skipped.
        service.Service.startService(self)
        self.coop.start()


    def stopService(self):
        """
        Stop the cooperator.
        """
        # Delegate to the base class first, mirroring startService.
        service.Service.stopService(self)
        self.coop.stop()
320
321
322
class StreamServerEndpointService(service.Service, object):
    """
    A L{StreamServerEndpointService} is an L{IService} which runs a server on a
    listening port described by an L{IStreamServerEndpoint
    <twisted.internet.interfaces.IStreamServerEndpoint>}.

    @ivar factory: A server factory which will be used to listen on the
        endpoint.

    @ivar endpoint: An L{IStreamServerEndpoint
        <twisted.internet.interfaces.IStreamServerEndpoint>} provider
        which will be used to listen when the service starts.

    @ivar _waitingForPort: a Deferred, if C{listen} has been invoked on the
        endpoint, otherwise None.

    @ivar _raiseSynchronously: Defines error-handling behavior for the case
        where C{listen(...)} raises an exception before C{startService} or
        C{privilegedStartService} have completed.

    @type _raiseSynchronously: C{bool}

    @since: 10.2
    """

    _raiseSynchronously = None

    def __init__(self, endpoint, factory):
        """
        @param endpoint: the endpoint to listen on when the service starts.

        @param factory: the server factory handed to the endpoint's
            C{listen}.
        """
        self.endpoint = endpoint
        self.factory = factory
        self._waitingForPort = None


    def privilegedStartService(self):
        """
        Start listening on the endpoint.

        When C{_raiseSynchronously} is true and the listen attempt has
        already failed by the time this method finishes, that failure is
        re-raised as an exception.  Otherwise failures other than
        L{CancelledError} are logged.
        """
        service.Service.privilegedStartService(self)
        self._waitingForPort = self.endpoint.listen(self.factory)
        # Filled in by handleIt.  If the Deferred has already failed, the
        # errback runs as soon as it is added, i.e. before the check below.
        raisedNow = []
        def handleIt(err):
            if self._raiseSynchronously:
                raisedNow.append(err)
            elif not err.check(CancelledError):
                log.err(err)
        self._waitingForPort.addErrback(handleIt)
        if raisedNow:
            raisedNow[0].raiseException()


    def startService(self):
        """
        Start listening on the endpoint, unless L{privilegedStartService} got
        around to it already.
        """
        service.Service.startService(self)
        if self._waitingForPort is None:
            self.privilegedStartService()


    def stopService(self):
        """
        Stop listening on the port if it is already listening, otherwise,
        cancel the attempt to listen.

        @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires
            with C{None} when the port has stopped listening.
        """
        self._waitingForPort.cancel()
        def stopIt(port):
            # C{port} is None when an earlier errback (handleIt) consumed a
            # failure; there is nothing to stop in that case.
            if port is not None:
                return port.stopListening()
        d = self._waitingForPort.addCallback(stopIt)
        def stop(passthrough):
            # Mark the service as stopped whether stopping succeeded or not,
            # and leave the result untouched.
            self.running = False
            return passthrough
        d.addBoth(stop)
        return d
401
402
403
# Public names: the hand-written services, MulticastServer (which has no
# Client counterpart), and a Server/Client pair for each remaining
# transport.
__all__ = (['TimerService', 'CooperatorService', 'MulticastServer',
            'StreamServerEndpointService'] +
           [tran + side
            for tran in 'Generic TCP UNIX SSL UDP UNIXDatagram'.split()
            for side in 'Server Client'.split()])
Note: See TracBrowser for help on using the browser.