root/trunk/twisted/internet/protocol.py

Revision 34371, 25.1 KB (checked in by itamarst, 9 days ago)

Merge gtk3-process-5512: Notice SIGCHLD in gtk3reactor.

Fixes: #5512
Author: itamar
Review: therve

The previous method for signal handler installation in glib/gtk reactors wasn't sufficient in all cases, so switched to a simpler, less error-prone method.

Line 
1# -*- test-case-name: twisted.test.test_factories,twisted.internet.test.test_protocol -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Standard implementations of Twisted protocol-related interfaces.
7
8Start here if you are looking to write a new protocol implementation for
9Twisted.  The Protocol class contains some introductory material.
10
11Maintainer: Itamar Shtull-Trauring
12"""
13
14import random
15from zope.interface import implements
16
17# Twisted Imports
18from twisted.python import log, failure, components
19from twisted.internet import interfaces, error, defer
20
21
class Factory:
    """
    A producer of protocol instances.

    Unless L{buildProtocol} is overridden, every new protocol is made by
    calling whatever is stored in C{self.protocol}.

    @ivar protocol: The callable (normally a L{Protocol} subclass) used by
        L{buildProtocol} to create protocol instances.
    @ivar numPorts: Counter incremented by L{doStart} and decremented by
        L{doStop}.
    @ivar noisy: When true, L{doStart} and L{doStop} log a message when the
        factory actually starts or stops.
    """

    implements(interfaces.IProtocolFactory, interfaces.ILoggingContext)

    # Subclasses put a subclass of Protocol here.
    protocol = None

    numPorts = 0
    noisy = True

    def logPrefix(self):
        """
        Return the name of this factory's class, for use in log messages.
        """
        factoryClass = self.__class__
        return factoryClass.__name__


    def doStart(self):
        """
        Register one more user of this factory, calling L{startFactory} first
        if there was none before.

        Users should not call this function themselves!
        """
        firstUser = not self.numPorts
        if firstUser:
            if self.noisy:
                log.msg("Starting factory %r" % self)
            self.startFactory()
        self.numPorts += 1


    def doStop(self):
        """
        Unregister one user of this factory, calling L{stopFactory} once the
        last one is gone.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # This shouldn't happen, but does sometimes; returning quietly is
            # better than blowing up in an assert as was done previously.
            return
        self.numPorts -= 1
        if self.numPorts:
            # Somebody is still using this factory.
            return
        if self.noisy:
            log.msg("Stopping factory %r" % self)
        self.stopFactory()


    def startFactory(self):
        """
        Hook invoked before this factory begins listening on a Port or
        Connector.

        It is invoked only once, even if the factory is connected to multiple
        ports.

        Use it for 'unserialization' work that is best postponed until things
        are actually running: connecting to a database, opening files,
        etcetera.
        """


    def stopFactory(self):
        """
        Hook invoked before this factory stops listening on all
        Ports/Connectors.

        Override it to carry out 'shutdown' work such as disconnecting
        database connections or closing files.

        It is invoked, for example, before an application shuts down, if the
        factory was connected to a port.  User code should not call this
        function directly.
        """


    def buildProtocol(self, addr):
        """
        Create an instance of a subclass of Protocol.

        The returned instance will handle input on an incoming server
        connection; its C{factory} attribute is set to this factory.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing
            L{twisted.internet.interfaces.IAddress}

        @return: The result of calling C{self.protocol} with no arguments.
        """
        protocol = self.protocol()
        protocol.factory = self
        return protocol
107
108
class ClientFactory(Factory):
    """
    A Protocol factory for clients.

    Use it together with the various connectXXX methods of reactors.  The
    three notification methods below do nothing by default; subclasses
    override the ones they care about.
    """

    def startedConnecting(self, connector):
        """
        Notification that a connection attempt has been started.

        The attempt can be stopped by calling connector.stopConnecting().

        @param connector: a Connector object.
        """


    def clientConnectionFailed(self, connector, reason):
        """
        Notification that a connection attempt did not succeed.

        Calling connector.connect() from here will try to connect again,
        which may be useful.

        @type reason: L{twisted.python.failure.Failure}
        """


    def clientConnectionLost(self, connector, reason):
        """
        Notification that an established connection has been lost.

        Calling connector.connect() from here will reconnect, which may be
        useful.

        @type reason: L{twisted.python.failure.Failure}
        """
139
140
class _InstanceFactory(ClientFactory):
    """
    The factory L{ClientCreator} uses for a single connection attempt.

    @ivar reactor: The object whose C{callLater} is used to schedule firing
        of C{deferred}.

    @ivar instance: The already-built protocol returned by L{buildProtocol}.

    @ivar deferred: The L{Deferred} which represents this connection attempt and
        which will be fired when it succeeds or fails.  Reset to C{None} once
        the firing has been scheduled.

    @ivar pending: After a connection attempt succeeds or fails, a delayed call
        which will fire the L{Deferred} representing this connection attempt.
    """

    noisy = False
    pending = None

    def __init__(self, reactor, instance, deferred):
        self.deferred = deferred
        self.instance = instance
        self.reactor = reactor


    def __repr__(self):
        return "<ClientCreator factory: %r>" % (self.instance, )


    def buildProtocol(self, addr):
        """
        Hand back the pre-constructed protocol instance, and schedule the
        waiting L{Deferred} to be called back with it to signal that the
        connection was established.
        """
        waiting = self.deferred
        self.pending = self.reactor.callLater(
            0, self.fire, waiting.callback, self.instance)
        self.deferred = None
        return self.instance


    def clientConnectionFailed(self, connector, reason):
        """
        Schedule the waiting L{Deferred} to be errbacked with C{reason} to
        signal that the connection could not be established.
        """
        waiting = self.deferred
        self.pending = self.reactor.callLater(
            0, self.fire, waiting.errback, reason)
        self.deferred = None


    def fire(self, func, value):
        """
        Drop C{self.pending} (this avoids a reference cycle), then call
        C{func} with C{value}.
        """
        self.pending = None
        func(value)
193
194
195
class ClientCreator:
    """
    Client connections that do not require a factory.

    Each connect* method builds a protocol instance from the protocol class
    and arguments given to the initializer, connects it, and returns a
    Deferred of the resulting protocol instance.

    This is handy when a factory is not really needed: mostly when protocol
    instances share no state and there is no need to reconnect.

    The C{connectTCP}, C{connectUNIX}, and C{connectSSL} methods each return a
    L{Deferred} which will fire with an instance of the protocol class passed to
    L{ClientCreator.__init__}.  These Deferred can be cancelled to abort the
    connection attempt (in a very unlikely case, cancelling the Deferred may not
    prevent the protocol from being instantiated and connected to a transport;
    if this happens, it will be disconnected immediately afterwards and the
    Deferred will still errback with L{CancelledError}).
    """

    def __init__(self, reactor, protocolClass, *args, **kwargs):
        self.reactor, self.protocolClass = reactor, protocolClass
        self.args, self.kwargs = args, kwargs


    def _connect(self, method, *args, **kwargs):
        """
        Start a connection attempt.

        @param method: A callable which will actually start the connection
            attempt.  For example, C{reactor.connectTCP}.

        @param *args: Positional arguments to pass to C{method}, excluding the
            factory.

        @param **kwargs: Keyword arguments to pass to C{method}.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        def abortAttempt(deferred):
            # C{connector} and C{factory} are bound further down in the
            # enclosing scope, before this canceller can possibly run.
            connector.disconnect()
            scheduled = factory.pending
            if scheduled is not None:
                scheduled.cancel()

        result = defer.Deferred(abortAttempt)
        protocol = self.protocolClass(*self.args, **self.kwargs)
        factory = _InstanceFactory(self.reactor, protocol, result)
        connector = method(factory=factory, *args, **kwargs)
        return result


    def connectTCP(self, host, port, timeout=30, bindAddress=None):
        """
        Connect to a TCP server.

        Takes the same parameters as L{IReactorTCP.connectTCP}, minus the
        factory.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        connect = self.reactor.connectTCP
        return self._connect(
            connect, host, port, timeout=timeout, bindAddress=bindAddress)


    def connectUNIX(self, address, timeout=30, checkPID=False):
        """
        Connect to a Unix socket.

        Takes the same parameters as L{IReactorUNIX.connectUNIX}, minus the
        factory.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        connect = self.reactor.connectUNIX
        return self._connect(
            connect, address, timeout=timeout, checkPID=checkPID)


    def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None):
        """
        Connect to an SSL server.

        Takes the same parameters as L{IReactorSSL.connectSSL}, minus the
        factory.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        connect = self.reactor.connectSSL
        return self._connect(
            connect, host, port, contextFactory=contextFactory,
            timeout=timeout, bindAddress=bindAddress)
298
299
300
class ReconnectingClientFactory(ClientFactory):
    """
    Factory which auto-reconnects clients with an exponential back-off.

    Note that clients should call my resetDelay method after they have
    connected successfully.

    @ivar maxDelay: Maximum number of seconds between connection attempts.
    @ivar initialDelay: Delay for the first reconnection attempt.
    @ivar factor: A multiplicative factor by which the delay grows
    @ivar jitter: Percentage of randomness to introduce into the delay length
        to prevent stampeding.
    @ivar clock: The clock used to schedule reconnection. It's mainly useful to
        be parametrized in tests. If the factory is serialized, this attribute
        will not be serialized, and the default value (the reactor) will be
        restored when deserialized.
    @type clock: L{IReactorTime}
    @ivar maxRetries: Maximum number of consecutive unsuccessful connection
        attempts, after which no further connection attempts will be made. If
        this is not explicitly set, no maximum is applied.
    @ivar delay: The delay used for the most recently scheduled attempt; it is
        the base from which the next delay is computed.
    @ivar retries: Number of times L{retry} has got past the C{continueTrying}
        check since the last L{resetDelay}.
    @ivar continueTrying: When false, connection failures and losses no longer
        trigger a retry.
    @ivar connector: The connector most recently seen by
        L{clientConnectionFailed} or L{clientConnectionLost}.
    """
    maxDelay = 3600
    initialDelay = 1.0
    # Note: These highly sensitive factors have been precisely measured by
    # the National Institute of Standards and Technology.  Take extreme care
    # in altering them, or you may damage your Internet!
    # (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>)
    factor = 2.7182818284590451 # (math.e)
    # Phi = 1.6180339887498948 # (Phi is acceptable for use as a
    # factor if e is too large for your application.)
    jitter = 0.11962656472 # molar Planck constant times c, joule meter/mole

    delay = initialDelay
    retries = 0
    maxRetries = None
    _callID = None
    connector = None
    clock = None

    continueTrying = 1


    def clientConnectionFailed(self, connector, reason):
        """
        Remember C{connector} and schedule a retry, unless retrying has been
        stopped.
        """
        if self.continueTrying:
            self.connector = connector
            self.retry()


    def clientConnectionLost(self, connector, unused_reason):
        """
        Remember C{connector} and schedule a retry, unless retrying has been
        stopped.
        """
        if self.continueTrying:
            self.connector = connector
            self.retry()


    def retry(self, connector=None):
        """
        Have this connector connect again, after a suitable delay.

        @param connector: The connector to reconnect.  If C{None}, the
            connector remembered in C{self.connector} is used.

        @raise ValueError: If retrying is still enabled, no connector was
            given and none has been remembered.
        """
        if not self.continueTrying:
            if self.noisy:
                # Name the remembered connector when none was passed in, so
                # the message does not read "Abandoning None ...".
                abandoned = connector
                if abandoned is None:
                    abandoned = self.connector
                log.msg("Abandoning %s on explicit request" % (abandoned,))
            return

        if connector is None:
            if self.connector is None:
                raise ValueError("no connector to retry")
            else:
                connector = self.connector

        self.retries += 1
        if self.maxRetries is not None and (self.retries > self.maxRetries):
            if self.noisy:
                log.msg("Abandoning %s after %d retries." %
                        (connector, self.retries))
            return

        # Exponential back-off, capped at maxDelay; the jitter is applied
        # after the cap.
        self.delay = min(self.delay * self.factor, self.maxDelay)
        if self.jitter:
            self.delay = random.normalvariate(self.delay,
                                              self.delay * self.jitter)

        if self.noisy:
            log.msg("%s will retry in %d seconds" % (connector, self.delay,))

        def reconnector():
            self._callID = None
            connector.connect()
        if self.clock is None:
            # Imported lazily: the global reactor is only needed when no
            # clock was supplied.
            from twisted.internet import reactor
            self.clock = reactor
        self._callID = self.clock.callLater(self.delay, reconnector)


    def stopTrying(self):
        """
        Put a stop to any attempt to reconnect in progress.

        Cancels the scheduled reconnection call, if any, disables further
        retries, and asks the remembered connector (if any) to stop
        connecting.
        """
        # ??? Is this function really stopFactory?
        if self._callID:
            self._callID.cancel()
            self._callID = None
        self.continueTrying = 0
        if self.connector:
            try:
                self.connector.stopConnecting()
            except error.NotConnectingError:
                # Nothing was in progress; there is nothing to stop.
                pass


    def resetDelay(self):
        """
        Call this method after a successful connection: it resets the delay and
        the retry counter.

        It also re-enables retrying and forgets any scheduled reconnection
        call, without cancelling it.
        """
        self.delay = self.initialDelay
        self.retries = 0
        self._callID = None
        self.continueTrying = 1


    def __getstate__(self):
        """
        Remove all of the state which is mutated by connection attempts and
        failures, returning just the state which describes how reconnections
        should be attempted.  This will make the unserialized instance
        behave just as this one did when it was first instantiated.

        @return: A copy of the instance dictionary without the transient
            keys.
        """
        state = self.__dict__.copy()
        for key in ['connector', 'retries', 'delay',
                    'continueTrying', '_callID', 'clock']:
            state.pop(key, None)
        return state
434
435
436
class ServerFactory(Factory):
    """
    Marker subclass: derive from this to signal that your protocol.Factory
    is only usable for servers.  It adds no behaviour of its own.
    """
440
441
442
class BaseProtocol:
    """
    The abstract superclass of all protocols.

    A few methods get helpful default implementations here so that they are
    easy to share; apart from that, the direct subclasses L{Protocol} and
    L{ProcessProtocol} are the more interesting classes.

    @ivar connected: Set to C{1} by L{makeConnection}; C{0} before that.
    @ivar transport: The transport given to L{makeConnection}, or C{None}
        before that.
    """
    connected = 0
    transport = None

    def makeConnection(self, transport):
        """
        Make a connection to a transport and a server.

        Stores C{transport} in the 'transport' attribute of this Protocol,
        marks the protocol as connected, and then invokes the
        connectionMade() callback.
        """
        self.connected, self.transport = 1, transport
        self.connectionMade()


    def connectionMade(self):
        """
        Called when a connection is made.

        Think of this as the initializer of the protocol, since it runs when
        the connection is completed.  For clients that is once the connection
        to the server has been established; for servers it is after an
        accept() call stops blocking and a socket has been received.  Send
        any greeting or initial message from here.
        """
474
# Module-level Failure wrapping ConnectionDone; it is the default "reason"
# of Protocol.connectionLost and is cleaned once, right here.
connectionDone = failure.Failure(error.ConnectionDone())
connectionDone.cleanFailure()
477
478
class Protocol(BaseProtocol):
    """
    The base class for streaming connection-oriented protocols.

    A new connection-oriented protocol for Twisted starts here: any protocol
    implementation, whether client or server, should subclass this class.

    The API is quite simple.  Implement L{dataReceived} to handle both
    event-based and synchronous input; output can be sent through the
    'transport' attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
    notified when the connection ends.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.
    """
    implements(interfaces.IProtocol, interfaces.ILoggingContext)

    def logPrefix(self):
        """
        Identify log messages related to this protocol instance by the name
        of its class.
        """
        protocolClass = self.__class__
        return protocolClass.__name__


    def dataReceived(self, data):
        """
        Called whenever data is received.

        This is the place to translate to a higher-level message.  Usually
        some callback is made once a complete protocol message has arrived.

        @param data: a string of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """


    def connectionLost(self, reason=connectionDone):
        """
        Called when the connection is shut down.

        The connection has been closed: clear any circular references here,
        and any external references to this Protocol.

        @type reason: L{twisted.python.failure.Failure}
        """
528
529
class ProtocolToConsumerAdapter(components.Adapter):
    """
    Adapter exposing the wrapped protocol (C{self.original}) as an
    L{interfaces.IConsumer}.
    """
    implements(interfaces.IConsumer)

    def write(self, data):
        """
        Pass C{data} on to the wrapped protocol's C{dataReceived}.
        """
        protocol = self.original
        protocol.dataReceived(data)


    def registerProducer(self, producer, streaming):
        """
        Do nothing; producers are not tracked by this adapter.
        """


    def unregisterProducer(self):
        """
        Do nothing; producers are not tracked by this adapter.
        """


components.registerAdapter(
    ProtocolToConsumerAdapter, interfaces.IProtocol, interfaces.IConsumer)
544
class ConsumerToProtocolAdapter(components.Adapter):
    """
    Adapter exposing the wrapped consumer (C{self.original}) as an
    L{interfaces.IProtocol}.
    """
    implements(interfaces.IProtocol)

    def dataReceived(self, data):
        """
        Pass C{data} on to the wrapped consumer's C{write}.
        """
        consumer = self.original
        consumer.write(data)


    def connectionLost(self, reason):
        """
        Do nothing; connection events are not forwarded to the consumer.
        """


    def makeConnection(self, transport):
        """
        Do nothing; the transport is not recorded.
        """


    def connectionMade(self):
        """
        Do nothing; connection events are not forwarded to the consumer.
        """


components.registerAdapter(
    ConsumerToProtocolAdapter, interfaces.IConsumer, interfaces.IProtocol)
562
class ProcessProtocol(BaseProtocol):
    """
    Base process protocol implementation which dispatches events on the
    stdin, stdout, and stderr file descriptors (0, 1 and 2) to dedicated
    methods.
    """
    implements(interfaces.IProcessProtocol)

    def childDataReceived(self, childFD, data):
        """
        Route C{data} to L{outReceived} when C{childFD} is 1 and to
        L{errReceived} when it is 2; data for any other descriptor is
        ignored.
        """
        if childFD == 1:
            self.outReceived(data)
            return
        if childFD == 2:
            self.errReceived(data)


    def outReceived(self, data):
        """
        Some data was received from stdout.
        """


    def errReceived(self, data):
        """
        Some data was received from stderr.
        """


    def childConnectionLost(self, childFD):
        """
        Route the loss of descriptor C{childFD} to L{inConnectionLost} (0),
        L{outConnectionLost} (1) or L{errConnectionLost} (2); any other
        descriptor is ignored.
        """
        if childFD == 0:
            self.inConnectionLost()
            return
        if childFD == 1:
            self.outConnectionLost()
            return
        if childFD == 2:
            self.errConnectionLost()


    def inConnectionLost(self):
        """
        This will be called when stdin is closed.
        """


    def outConnectionLost(self):
        """
        This will be called when stdout is closed.
        """


    def errConnectionLost(self):
        """
        This will be called when stderr is closed.
        """


    def processExited(self, reason):
        """
        This will be called when the subprocess exits.

        @type reason: L{twisted.python.failure.Failure}
        """


    def processEnded(self, reason):
        """
        Called when the child process exits and all file descriptors
        associated with it have been closed.

        @type reason: L{twisted.python.failure.Failure}
        """
631
632
633
class AbstractDatagramProtocol:
    """
    Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP.

    @ivar transport: The transport given to L{makeConnection}, or C{None}
        when there is none.
    @ivar numPorts: Counter incremented by L{doStart} and decremented by
        L{doStop}.
    @ivar noisy: When true, L{doStart} and L{doStop} log a message when the
        protocol actually starts or stops.
    """

    transport = None
    numPorts = 0
    noisy = True

    def __getstate__(self):
        """
        Return a copy of the instance dictionary in which C{transport} is
        C{None}, so the transport is left out of the serialized state.
        """
        d = self.__dict__.copy()
        d['transport'] = None
        return d

    def doStart(self):
        """Make sure startProtocol is called.

        This will be called by makeConnection(), users should not call it.
        """
        if not self.numPorts:
            if self.noisy:
                log.msg("Starting protocol %s" % self)
            self.startProtocol()
        self.numPorts = self.numPorts + 1

    def doStop(self):
        """Make sure stopProtocol is called.

        This will be called by the port, users should not call it.  The
        transport is dropped, and stopProtocol() runs once the counter
        reaches zero.
        """
        assert self.numPorts > 0
        self.numPorts = self.numPorts - 1
        self.transport = None
        if not self.numPorts:
            if self.noisy:
                log.msg("Stopping protocol %s" % self)
            self.stopProtocol()

    def startProtocol(self):
        """Called when a transport is connected to this protocol.

        Will only be called once, even if multiple ports are connected.
        """

    def stopProtocol(self):
        """Called when the transport is disconnected.

        Will only be called once, after all ports are disconnected.
        """

    def makeConnection(self, transport):
        """Make a connection to a transport and a server.

        This sets the 'transport' attribute of this DatagramProtocol, and calls the
        doStart() callback.  The protocol must not already have a transport.
        """
        # Identity check, not equality: a transport with a permissive __eq__
        # must not make an already-connected protocol look unconnected.
        assert self.transport is None
        self.transport = transport
        self.doStart()

    def datagramReceived(self, datagram, addr):
        """Called when a datagram is received.

        @param datagram: the string received from the transport.
        @param addr: tuple of source of datagram.
        """
700
701
class DatagramProtocol(AbstractDatagramProtocol):
    """
    Protocol for datagram-oriented transport, e.g. UDP.

    @type transport: C{NoneType} or
        L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider
    @ivar transport: The transport with which this protocol is associated,
        if it is associated with one.
    """
    implements(interfaces.ILoggingContext)

    def logPrefix(self):
        """
        Identify log messages related to this protocol instance by the name
        of its class.
        """
        protocolClass = self.__class__
        return protocolClass.__name__


    def connectionRefused(self):
        """
        Called due to error from write in connected mode.

        Note that this results from the ICMP message generated by a
        *previous* write.
        """
727
728
class ConnectedDatagramProtocol(DatagramProtocol):
    """
    Protocol for connected datagram-oriented transport.

    No longer necessary for UDP.
    """

    def datagramReceived(self, datagram):
        """
        Called when a datagram is received.

        @param datagram: the string received from the transport.
        """


    def connectionFailed(self, failure):
        """
        Called if connecting failed.

        A DNS lookup failure is the usual cause.
        """
746
747
748
class FileWrapper:
    """A wrapper around a file-like object to make it behave as a Transport.

    This doesn't actually stream the file to the attached protocol,
    and is thus useful mainly as a utility for debugging protocols.

    @ivar file: The wrapped file-like object.
    @ivar closed: Set to C{1} by L{loseConnection}.
    @ivar producer: The producer given to L{registerProducer}, or C{None}.
    @ivar streamingProducer: The C{streaming} flag given to
        L{registerProducer}.
    """

    implements(interfaces.ITransport)

    closed = 0
    disconnecting = 0
    producer = None
    streamingProducer = 0

    def __init__(self, file):
        self.file = file

    def write(self, data):
        """
        Write C{data} to the wrapped file, routing any ordinary error to
        L{handleException}.
        """
        try:
            self.file.write(data)
        except Exception:
            # Deliberately not a bare "except:": KeyboardInterrupt and
            # SystemExit must propagate rather than be swallowed here.
            self.handleException()

    def _checkProducer(self):
        # Cheating; this is called at "idle" times to allow producers to be
        # found and dealt with
        if self.producer:
            self.producer.resumeProducing()

    def registerProducer(self, producer, streaming):
        """From abstract.FileDescriptor

        Remember the producer and its C{streaming} flag; a non-streaming
        producer is asked to produce right away.
        """
        self.producer = producer
        self.streamingProducer = streaming
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget the registered producer.
        """
        self.producer = None

    def stopConsuming(self):
        """
        Forget the registered producer and close the wrapped file.
        """
        self.unregisterProducer()
        self.loseConnection()

    def writeSequence(self, iovec):
        """
        Join the strings in C{iovec} and write the result in one L{write}
        call.
        """
        self.write("".join(iovec))

    def loseConnection(self):
        """
        Mark this transport closed and close the wrapped file, routing
        C{IOError} and C{OSError} to L{handleException}.
        """
        self.closed = 1
        try:
            self.file.close()
        except (IOError, OSError):
            self.handleException()

    def getPeer(self):
        # XXX: According to ITransport, this should return an IAddress!
        return 'file', 'file'

    def getHost(self):
        # XXX: According to ITransport, this should return an IAddress!
        return 'file'

    def handleException(self):
        """
        Hook called when writing to or closing the wrapped file fails; does
        nothing by default.
        """
        pass

    def resumeProducing(self):
        # Never sends data anyways
        pass

    def pauseProducing(self):
        # Never sends data anyways
        pass

    def stopProducing(self):
        """
        Close the wrapped file; same as L{loseConnection}.
        """
        self.loseConnection()
825
826
# Public names exported by "from twisted.internet.protocol import *".
__all__ = [
    "Factory",
    "ClientFactory",
    "ReconnectingClientFactory",
    "connectionDone",
    "Protocol",
    "ProcessProtocol",
    "FileWrapper",
    "ServerFactory",
    "AbstractDatagramProtocol",
    "DatagramProtocol",
    "ConnectedDatagramProtocol",
    "ClientCreator",
]
Note: See TracBrowser for help on using the browser.