# Source: root/tags/releases/twisted-8.2.0/twisted/internet/protocol.py
#
# Revision 24441, 20.4 KB (checked in by thijs, 2 years ago)
#
# Merge maintainer-email-2438: Get rid of references to maintainer email
# addresses from code.
#
# Author: thijs
# Reviewer: exarkun
# Fixes: #2438

# -*- test-case-name: twisted.test.test_factories -*-
#
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
# See LICENSE for details.


"""
Standard implementations of Twisted protocol-related interfaces.

Start here if you are looking to write a new protocol implementation for
Twisted.  The Protocol class contains some introductory material.

Maintainer: Itamar Shtull-Trauring
"""

import random
from zope.interface import implements

# Twisted Imports
from twisted.python import log, failure, components
from twisted.internet import interfaces, error, defer
22
23
class Factory:
    """This is a factory which produces protocols.

    By default, buildProtocol will create a protocol of the class given in
    self.protocol.

    @ivar protocol: the class instantiated by L{buildProtocol}; must be set
        (usually by a subclass) before the factory is used.
    @ivar numPorts: how many times L{doStart} has been called without a
        matching L{doStop}.
    @ivar noisy: if true, log a message when the factory starts and stops.
    """

    implements(interfaces.IProtocolFactory)

    # put a subclass of Protocol here:
    protocol = None

    numPorts = 0
    noisy = True

    def doStart(self):
        """Make sure startFactory is called.

        startFactory only runs on the transition from zero users to one;
        every call increments the use count.

        Users should not call this function themselves!
        """
        if not self.numPorts:
            if self.noisy:
                log.msg("Starting factory %r" % (self,))
            self.startFactory()
        self.numPorts += 1

    def doStop(self):
        """Make sure stopFactory is called.

        stopFactory only runs when the use count drops back to zero.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # this shouldn't happen, but does sometimes and this is better
            # than blowing up in assert as we did previously.
            return
        self.numPorts -= 1
        if not self.numPorts:
            if self.noisy:
                log.msg("Stopping factory %r" % (self,))
            self.stopFactory()

    def startFactory(self):
        """This will be called before I begin listening on a Port or Connector.

        It will only be called once, even if the factory is connected
        to multiple ports.

        This can be used to perform 'unserialization' tasks that
        are best put off until things are actually running, such
        as connecting to a database, opening files, etcetera.
        """

    def stopFactory(self):
        """This will be called before I stop listening on all Ports/Connectors.

        This can be overridden to perform 'shutdown' tasks such as disconnecting
        database connections, closing files, etc.

        It will be called, for example, before an application shuts down,
        if it was connected to a port. User code should not call this function
        directly.
        """

    def buildProtocol(self, addr):
        """Create an instance of a subclass of Protocol.

        The returned instance will handle input on an incoming server
        connection, and will have an attribute C{factory} pointing to the
        creating factory.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
        @return: a new instance of C{self.protocol}.
        """
        p = self.protocol()
        p.factory = self
        return p
101
102
class ClientFactory(Factory):
    """A Protocol factory for clients.

    This can be used together with the various connectXXX methods in
    reactors.

    All three notification methods are no-ops here; subclasses override
    the ones they care about.
    """

    def startedConnecting(self, connector):
        """Called when a connection has been started.

        You can call connector.stopConnecting() to stop the connection attempt.

        @param connector: a Connector object.
        """

    def clientConnectionFailed(self, connector, reason):
        """Called when a connection has failed to connect.

        It may be useful to call connector.connect() - this will reconnect.

        @param connector: a Connector object.
        @type reason: L{twisted.python.failure.Failure}
        """

    def clientConnectionLost(self, connector, reason):
        """Called when an established connection is lost.

        It may be useful to call connector.connect() - this will reconnect.

        @param connector: a Connector object.
        @type reason: L{twisted.python.failure.Failure}
        """
133
134
class _InstanceFactory(ClientFactory):
    """Factory used by ClientCreator.

    Wraps one pre-built protocol instance and one Deferred.  The Deferred
    is fired exactly once: with the instance when the protocol is built,
    or with the failure reason if the connection attempt fails.

    @ivar reactor: the reactor used to schedule firing of the Deferred.
    @ivar instance: the protocol instance handed out by L{buildProtocol}.
    @ivar deferred: the Deferred to fire; deleted once it has been
        scheduled to fire.
    """

    noisy = False

    def __init__(self, reactor, instance, deferred):
        self.reactor = reactor
        self.instance = instance
        self.deferred = deferred

    def __repr__(self):
        return "<ClientCreator factory: %r>" % (self.instance, )

    def buildProtocol(self, addr):
        """Return the wrapped instance and schedule the success callback.

        @param addr: ignored.
        """
        # Fire from a later reactor iteration rather than synchronously.
        self.reactor.callLater(0, self.deferred.callback, self.instance)
        # Drop our reference so the Deferred cannot be scheduled twice.
        del self.deferred
        return self.instance

    def clientConnectionFailed(self, connector, reason):
        """Schedule the errback with the connection failure.

        @param connector: ignored.
        @type reason: L{twisted.python.failure.Failure}
        """
        self.reactor.callLater(0, self.deferred.errback, reason)
        del self.deferred
156
157
class ClientCreator:
    """Client connections that do not require a factory.

    The various connect* methods create a protocol instance using the given
    protocol class and arguments, and connect it, returning a Deferred of the
    resulting protocol instance.

    Useful for cases when we don't really need a factory.  Mainly this
    is when there is no shared state between protocol instances, and no need
    to reconnect.

    @ivar reactor: the reactor whose connect* methods are used.
    @ivar protocolClass: callable invoked with C{args} and C{kwargs} to
        create the protocol instance for each connection attempt.
    """

    def __init__(self, reactor, protocolClass, *args, **kwargs):
        self.reactor = reactor
        self.protocolClass = protocolClass
        self.args = args
        self.kwargs = kwargs

    def _newFactory(self):
        """Build a fresh Deferred and the one-shot factory that fires it.

        @return: a C{(deferred, factory)} tuple.
        """
        d = defer.Deferred()
        instance = self.protocolClass(*self.args, **self.kwargs)
        return d, _InstanceFactory(self.reactor, instance, d)

    def connectTCP(self, host, port, timeout=30, bindAddress=None):
        """Connect to remote host, return Deferred of resulting protocol instance."""
        d, f = self._newFactory()
        self.reactor.connectTCP(host, port, f, timeout=timeout,
                                bindAddress=bindAddress)
        return d

    def connectUNIX(self, address, timeout = 30, checkPID=0):
        """Connect to Unix socket, return Deferred of resulting protocol instance."""
        d, f = self._newFactory()
        self.reactor.connectUNIX(address, f, timeout=timeout,
                                 checkPID=checkPID)
        return d

    def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None):
        """Connect to SSL server, return Deferred of resulting protocol instance."""
        d, f = self._newFactory()
        self.reactor.connectSSL(host, port, f, contextFactory,
                                timeout=timeout, bindAddress=bindAddress)
        return d
196
197
class ReconnectingClientFactory(ClientFactory):
    """My clients auto-reconnect with an exponential back-off.

    Note that clients should call my resetDelay method after they have
    connected successfully.

    @ivar maxDelay: Maximum number of seconds between connection attempts
        (applied before jitter is added).
    @ivar initialDelay: Delay for the first reconnection attempt.
    @ivar factor: a multiplicative factor by which the delay grows
    @ivar jitter: fraction of the delay used as the standard deviation of
        the random perturbation applied to it, to prevent stampeding.
    @ivar maxRetries: if not C{None}, give up once more than this many
        retries have been attempted.
    @ivar continueTrying: false once L{stopTrying} has been called.
    """
    maxDelay = 3600
    initialDelay = 1.0
    # Note: These highly sensitive factors have been precisely measured by
    # the National Institute of Standards and Technology.  Take extreme care
    # in altering them, or you may damage your Internet!
    # (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>)
    factor = 2.7182818284590451 # (math.e)
    # Phi = 1.6180339887498948 # (Phi is acceptable for use as a
    # factor if e is too large for your application.)
    jitter = 0.11962656472 # molar Planck constant times c, joule meter/mole

    delay = initialDelay
    retries = 0
    maxRetries = None
    _callID = None
    connector = None

    continueTrying = 1

    def clientConnectionFailed(self, connector, reason):
        """Remember the connector and schedule a retry, unless stopped."""
        if self.continueTrying:
            self.connector = connector
            self.retry()

    def clientConnectionLost(self, connector, unused_reason):
        """Remember the connector and schedule a retry, unless stopped."""
        if self.continueTrying:
            self.connector = connector
            self.retry()

    def retry(self, connector=None):
        """Have this connector connect again, after a suitable delay.

        @param connector: the connector to reconnect; defaults to the one
            most recently seen by clientConnectionFailed/clientConnectionLost.
        @raise ValueError: if no connector was given and none has been
            remembered.
        """
        if not self.continueTrying:
            if self.noisy:
                # Name the remembered connector rather than logging "None"
                # when the caller relied on the default argument.
                if connector is None:
                    connector = self.connector
                log.msg("Abandoning %s on explicit request" % (connector,))
            return

        if connector is None:
            if self.connector is None:
                raise ValueError("no connector to retry")
            connector = self.connector

        self.retries += 1
        if self.maxRetries is not None and (self.retries > self.maxRetries):
            if self.noisy:
                log.msg("Abandoning %s after %d retries." %
                        (connector, self.retries))
            return

        self.delay = min(self.delay * self.factor, self.maxDelay)
        if self.jitter:
            self.delay = random.normalvariate(self.delay,
                                              self.delay * self.jitter)

        if self.noisy:
            log.msg("%s will retry in %d seconds" % (connector, self.delay,))
        # Imported here rather than at module level, as in the original.
        from twisted.internet import reactor

        def reconnector():
            # The delayed call has fired; there is nothing left to cancel.
            self._callID = None
            connector.connect()
        self._callID = reactor.callLater(self.delay, reconnector)

    def stopTrying(self):
        """I put a stop to any attempt to reconnect in progress.
        """
        # ??? Is this function really stopFactory?
        if self._callID:
            self._callID.cancel()
            self._callID = None
        if self.connector:
            # Hopefully this doesn't just make clientConnectionFailed
            # retry again.
            try:
                self.connector.stopConnecting()
            except error.NotConnectingError:
                pass
        self.continueTrying = 0

    def resetDelay(self):
        """Call me after a successful connection to reset.

        I reset the delay and the retry counter.
        """
        self.delay = self.initialDelay
        self.retries = 0
        self._callID = None
        self.continueTrying = 1


    def __getstate__(self):
        """
        Remove all of the state which is mutated by connection attempts and
        failures, returning just the state which describes how reconnections
        should be attempted.  This will make the unserialized instance
        behave just as this one did when it was first instantiated.
        """
        state = self.__dict__.copy()
        for key in ['connector', 'retries', 'delay',
                    'continueTrying', '_callID']:
            state.pop(key, None)
        return state
314
315
316
class ServerFactory(Factory):
    """Subclass this to indicate that your protocol.Factory is only usable for servers.

    Adds no behaviour of its own on top of L{Factory}.
    """
320
321
class BaseProtocol:
    """This is the abstract superclass of all protocols.

    If you are going to write a new protocol for Twisted, start here.  The
    docstrings of this class explain how you can get started.  Any protocol
    implementation, either client or server, should be a subclass of me.

    My API is quite simple.  Implement dataReceived(data) to handle both
    event-based and synchronous input; output can be sent through the
    'transport' attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.

    @ivar connected: 0 until L{makeConnection} is called, then 1.
    @ivar transport: C{None} until L{makeConnection} is called, then the
        transport passed to it.
    """

    connected = 0
    transport = None

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this Protocol, and calls the
        connectionMade() callback.

        @param transport: the transport to attach to this protocol.
        """
        self.connected = 1
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        """Called when a connection is made.

        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.
        """
361
# Shared module-level Failure wrapping error.ConnectionDone; it is the default
# 'reason' argument of Protocol.connectionLost.
connectionDone = failure.Failure(error.ConnectionDone())
connectionDone.cleanFailure()
364
365
class Protocol(BaseProtocol):
    """
    Base class for stream protocols; declares L{interfaces.IProtocol}.

    Both callbacks are no-ops here and are meant to be overridden.
    """

    implements(interfaces.IProtocol)

    def dataReceived(self, data):
        """Called whenever data is received.

        Use this method to translate to a higher-level message.  Usually, some
        callback will be made upon the receipt of each complete protocol
        message.

        @param data: a string of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """

    def connectionLost(self, reason=connectionDone):
        """Called when the connection is shut down.

        Clear any circular references here, and any external references
        to this Protocol.  The connection has been closed.

        @type reason: L{twisted.python.failure.Failure}
        """
392
393
class ProtocolToConsumerAdapter(components.Adapter):
    """
    Adapt an L{interfaces.IProtocol} provider to L{interfaces.IConsumer}:
    data written to the consumer is delivered to the protocol's
    C{dataReceived}.  Producer registration is accepted and ignored.
    """
    implements(interfaces.IConsumer)

    def write(self, data):
        """Forward C{data} to the wrapped protocol's C{dataReceived}."""
        self.original.dataReceived(data)

    def registerProducer(self, producer, streaming):
        """Ignored."""
        pass

    def unregisterProducer(self):
        """Ignored."""
        pass

components.registerAdapter(ProtocolToConsumerAdapter, interfaces.IProtocol,
                           interfaces.IConsumer)
408
class ConsumerToProtocolAdapter(components.Adapter):
    """
    Adapt an L{interfaces.IConsumer} provider to L{interfaces.IProtocol}:
    data received by the protocol is written to the wrapped consumer.
    Connection events are accepted and ignored.
    """
    implements(interfaces.IProtocol)

    def dataReceived(self, data):
        """Forward C{data} to the wrapped consumer's C{write}."""
        self.original.write(data)

    def connectionLost(self, reason):
        """Ignored."""
        pass

    def makeConnection(self, transport):
        """Ignored; the transport is not stored."""
        pass

    def connectionMade(self):
        """Ignored."""
        pass

components.registerAdapter(ConsumerToProtocolAdapter, interfaces.IConsumer,
                           interfaces.IProtocol)
426
class ProcessProtocol(BaseProtocol):
    """
    Base process protocol implementation which does simple dispatching for
    stdin, stdout, and stderr file descriptors.
    """
    implements(interfaces.IProcessProtocol)

    def childDataReceived(self, childFD, data):
        """
        Dispatch data from the child: descriptor 1 goes to L{outReceived},
        descriptor 2 to L{errReceived}; any other descriptor is ignored.
        """
        if childFD == 1:
            self.outReceived(data)
        elif childFD == 2:
            self.errReceived(data)


    def outReceived(self, data):
        """
        Some data was received from stdout.
        """


    def errReceived(self, data):
        """
        Some data was received from stderr.
        """


    def childConnectionLost(self, childFD):
        """
        Dispatch the closing of a child descriptor: 0 goes to
        L{inConnectionLost}, 1 to L{outConnectionLost}, 2 to
        L{errConnectionLost}; any other descriptor is ignored.
        """
        if childFD == 0:
            self.inConnectionLost()
        elif childFD == 1:
            self.outConnectionLost()
        elif childFD == 2:
            self.errConnectionLost()


    def inConnectionLost(self):
        """
        This will be called when stdin is closed.
        """


    def outConnectionLost(self):
        """
        This will be called when stdout is closed.
        """


    def errConnectionLost(self):
        """
        This will be called when stderr is closed.
        """


    def processExited(self, reason):
        """
        This will be called when the subprocess exits.

        @type reason: L{twisted.python.failure.Failure}
        """


    def processEnded(self, reason):
        """
        This will be called when the subprocess is finished.

        @type reason: L{twisted.python.failure.Failure}
        """
494
495
496
class AbstractDatagramProtocol:
    """
    Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP.

    @ivar transport: the transport set by L{makeConnection}; reset to
        C{None} by every L{doStop} call.
    @ivar numPorts: how many times L{doStart} has been called without a
        matching L{doStop}.
    @ivar noisy: if true, log a message when the protocol starts and stops.
    """

    transport = None
    numPorts = 0
    noisy = True

    def __getstate__(self):
        """Return a copy of the instance state with the transport cleared."""
        d = self.__dict__.copy()
        d['transport'] = None
        return d

    def doStart(self):
        """Make sure startProtocol is called.

        This will be called by makeConnection(), users should not call it.
        """
        if not self.numPorts:
            if self.noisy:
                log.msg("Starting protocol %s" % self)
            self.startProtocol()
        self.numPorts = self.numPorts + 1

    def doStop(self):
        """Make sure stopProtocol is called.

        This will be called by the port, users should not call it.
        """
        assert self.numPorts > 0
        self.numPorts = self.numPorts - 1
        self.transport = None
        if not self.numPorts:
            if self.noisy:
                log.msg("Stopping protocol %s" % self)
            self.stopProtocol()

    def startProtocol(self):
        """Called when a transport is connected to this protocol.

        Will only be called once, even if multiple ports are connected.
        """

    def stopProtocol(self):
        """Called when the transport is disconnected.

        Will only be called once, after all ports are disconnected.
        """

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this DatagramProtocol, and calls the
        doStart() callback.
        """
        # Identity test: an already-attached transport with a permissive
        # __eq__ must not be mistaken for "no transport".
        assert self.transport is None
        self.transport = transport
        self.doStart()

    def datagramReceived(self, datagram, addr):
        """Called when a datagram is received.

        @param datagram: the string received from the transport.
        @param addr: tuple of source of datagram.
        """
563
564
class DatagramProtocol(AbstractDatagramProtocol):
    """
    Protocol for datagram-oriented transport, e.g. UDP.

    @type transport: C{NoneType} or
        L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider
    @ivar transport: The transport with which this protocol is associated,
        if it is associated with one.
    """

    def connectionRefused(self):
        """Called due to error from write in connected mode.

        Note this is a result of ICMP message generated by *previous*
        write.
        """
581
582
class ConnectedDatagramProtocol(DatagramProtocol):
    """Protocol for connected datagram-oriented transport.

    No longer necessary for UDP.
    """

    def datagramReceived(self, datagram):
        """Called when a datagram is received.

        Unlike the inherited method, this one takes no source address
        argument.

        @param datagram: the string received from the transport.
        """

    def connectionFailed(self, failure):
        """Called if connecting failed.

        Usually this will be due to a DNS lookup failure.
        """
600
601
602
class FileWrapper:
    """A wrapper around a file-like object to make it behave as a Transport.

    This doesn't actually stream the file to the attached protocol,
    and is thus useful mainly as a utility for debugging protocols.

    @ivar closed: set to 1 once L{loseConnection} has been called.
    @ivar producer: the producer registered with L{registerProducer}, if any.
    @ivar streamingProducer: the C{streaming} flag given at registration.
    """

    implements(interfaces.ITransport)

    closed = 0
    disconnecting = 0
    producer = None
    streamingProducer = 0

    def __init__(self, file):
        self.file = file

    def write(self, data):
        """Write C{data} to the wrapped file; errors go to L{handleException}."""
        try:
            self.file.write(data)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must
            # propagate instead of being handed to handleException.
            self.handleException()
        # self._checkProducer()

    def _checkProducer(self):
        # Cheating; this is called at "idle" times to allow producers to be
        # found and dealt with
        if self.producer:
            self.producer.resumeProducing()

    def registerProducer(self, producer, streaming):
        """From abstract.FileDescriptor

        A non-streaming producer is asked to produce immediately.
        """
        self.producer = producer
        self.streamingProducer = streaming
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """Forget the registered producer."""
        self.producer = None

    def stopConsuming(self):
        """Unregister the producer and close the wrapped file."""
        self.unregisterProducer()
        self.loseConnection()

    def writeSequence(self, iovec):
        """Join the strings in C{iovec} and write them as one chunk."""
        self.write("".join(iovec))

    def loseConnection(self):
        """Mark the transport closed and close the wrapped file."""
        self.closed = 1
        try:
            self.file.close()
        except (IOError, OSError):
            self.handleException()

    def getPeer(self):
        # XXX: According to ITransport, this should return an IAddress!
        return 'file', 'file'

    def getHost(self):
        # XXX: According to ITransport, this should return an IAddress!
        return 'file'

    def handleException(self):
        """Hook called when writing or closing fails; does nothing here."""
        pass

    def resumeProducing(self):
        # Never sends data anyways
        pass

    def pauseProducing(self):
        # Never sends data anyways
        pass

    def stopProducing(self):
        self.loseConnection()
679
680
# Public names exported by "from twisted.internet.protocol import *".
__all__ = ["Factory", "ClientFactory", "ReconnectingClientFactory", "connectionDone",
           "Protocol", "ProcessProtocol", "FileWrapper", "ServerFactory",
           "AbstractDatagramProtocol", "DatagramProtocol", "ConnectedDatagramProtocol",
           "ClientCreator"]
# Note: See TracBrowser for help on using the browser.