root / trunk / twisted / words / protocols / jabber / xmlstream.py

Revision 25457, 33.4 kB (checked in by exarkun, 8 months ago)

Merge hashlib-2763-3

Author: wsanchez, exarkun
Reviewer: exarkun, mwhudson
Fixes: #2763

Replace uses of md5 and sha modules in Twisted with use of a new twisted.python.hashlib
module which transparently uses the new hashlib standard library module if it is available
or falls back to md5 and sha if not.

Line 
1 # -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
2 #
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 XMPP XML Streams
8
9 Building blocks for setting up XML Streams, including helping classes for
10 doing authentication on either client or server side, and working with XML
11 Stanzas.
12 """
13
14 from zope.interface import directlyProvides, implements
15
16 from twisted.internet import defer, protocol
17 from twisted.internet.error import ConnectionLost
18 from twisted.python import failure, log, randbytes
19 from twisted.words.protocols.jabber import error, ijabber, jid
20 from twisted.words.xish import domish, xmlstream
21 from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
22 from twisted.words.xish.xmlstream import STREAM_START_EVENT
23 from twisted.words.xish.xmlstream import STREAM_END_EVENT
24 from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
25
26 try:
27     from twisted.internet import ssl
28 except ImportError:
29     ssl = None
30 if ssl and not ssl.supported:
31     ssl = None
32
# Event dispatched on the XmlStream once all initializers have been processed
# successfully (see ConnectAuthenticator.initializeStream).
STREAM_AUTHD_EVENT = intern("//event/stream/authd")
# Event dispatched, with the failure, when stream initialization fails.
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")

# Namespace of the stream root element, stream features and stream errors.
NS_STREAMS = 'http://etherx.jabber.org/streams'
# Namespace of the STARTTLS stream feature.
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'

# Sentinel an initializer returns to signal that it succeeded but the XML
# stream has been reset, which halts further initializer processing.
Reset = object()
40
def hashPassword(sid, password):
    """
    Create a SHA1-digest string of a session identifier and password.

    The session identifier and the password are concatenated and hashed. When
    the concatenation yields a C{unicode} string, it is encoded as UTF-8
    before hashing, so that passwords holding non-ASCII characters do not
    fail on the implicit ASCII encoding.

    @param sid: the session identifier of the stream.
    @type sid: C{str} or C{unicode}
    @param password: the password to be hashed along with C{sid}.
    @type password: C{str} or C{unicode}
    @return: the hexadecimal representation of the SHA1 digest.
    @rtype: C{str}
    """
    from twisted.python.hashlib import sha1

    data = "%s%s" % (sid, password)
    if isinstance(data, unicode):
        # sha1 only accepts byte strings; be explicit about the encoding.
        data = data.encode('utf-8')
    return sha1(data).hexdigest()
47
48
49
class Authenticator:
    """
    Base class for the logic that initializes an XmlStream.

    Derive from this class to let an XmlStream initialize and authenticate
    against a particular kind of stream host (clients, components, etc.).

    Rules:
      1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
         stream has been completely initialized.
      2. The Authenticator SHOULD reset all state information when
         L{associateWithStream} is called.
      3. The Authenticator SHOULD override L{streamStarted}, and start
         initialization there.

    @type xmlstream: L{XmlStream}
    @ivar xmlstream: The XmlStream that needs authentication

    @note: the name authenticator is historical. An authenticator takes care
           of every step needed to get the stream ready for exchanging XML
           stanzas.
    """

    def __init__(self):
        self.xmlstream = None


    def connectionMade(self):
        """
        Hook called by the XmlStream once the underlying connection exists.

        A connecting authenticator can send the initial root element from
        here, an accepting one can simply wait for the root element of the
        peer. The default implementation does nothing.

        Use C{self.xmlstream.send()} to send initial data to the peer.
        """


    def streamStarted(self, rootElement):
        """
        Hook called by the XmlStream when the stream has started.

        The stream has started as soon as the start tag of the root element
        has been received.

        The C{version} attribute of L{rootElement} is parsed as
        C{major.minor}. A missing or malformed attribute counts as C{0.0}.
        The lesser of the parsed version and the version currently stored on
        L{xmlstream} is then stored back on L{xmlstream}.

        Overriding methods can inspect further stream header information,
        and send stream errors or close the stream when checks fail.
        """
        # Fall back to 0.0 unless a well-formed version is present.
        version = (0, 0)
        if rootElement.hasAttribute("version"):
            parts = rootElement["version"].split(".")
            try:
                version = (int(parts[0]), int(parts[1]))
            except (IndexError, ValueError):
                pass

        self.xmlstream.version = min(self.xmlstream.version, version)


    def associateWithStream(self, xmlstream):
        """
        Hook called when an XmlStream has been instantiated for a connection.

        This default implementation only keeps a reference to the stream.

        @type xmlstream: L{XmlStream}
        @param xmlstream: The XmlStream that will be passing events to this
                          Authenticator.
        """
        self.xmlstream = xmlstream
134
135
136
class ConnectAuthenticator(Authenticator):
    """
    Authenticator for initiating entities.

    @cvar namespace: default namespace URI that is set on the XmlStream when
                     the connection has been made.
    @ivar otherHost: host name of the entity that is being connected to.
    """

    namespace = None

    def __init__(self, otherHost):
        """
        @param otherHost: host name of the peer; it is converted to a JID and
                          stored as the C{otherEntity} of the stream once the
                          connection has been made.
        """
        # Chain up so that the xmlstream attribute exists (as None) even
        # before associateWithStream has been called.
        Authenticator.__init__(self)
        self.otherHost = otherHost


    def connectionMade(self):
        """
        Set the stream namespace and peer entity, then send the header.
        """
        self.xmlstream.namespace = self.namespace
        self.xmlstream.otherEntity = jid.internJID(self.otherHost)
        self.xmlstream.sendHeader()


    def initializeStream(self):
        """
        Perform stream initialization procedures.

        An L{XmlStream} holds a list of initializer objects in its
        C{initializers} attribute. This method calls these initializers in
        order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
        been successfully processed. Otherwise it dispatches the
        C{INIT_FAILED_EVENT} event with the failure.

        Initializers may return the special L{Reset} object to halt the
        initialization processing. It signals that the current initializer was
        successfully processed, but that the XML Stream has been reset. An
        example is the TLSInitiatingInitializer.
        """

        def remove_first(result):
            # The initializer at the head of the list completed successfully.
            self.xmlstream.initializers.pop(0)

            return result

        def do_next(result):
            """
            Take the first initializer and process it.

            On success, the initializer is removed from the list and
            then next initializer will be tried.
            """

            if result is Reset:
                # The stream was reset; processing continues from
                # streamStarted once the new stream header arrives.
                return None

            try:
                init = self.xmlstream.initializers[0]
            except IndexError:
                # No initializers left: the stream is fully initialized.
                self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
                return None
            else:
                d = defer.maybeDeferred(init.initialize)
                d.addCallback(remove_first)
                d.addCallback(do_next)
                return d

        d = defer.succeed(None)
        d.addCallback(do_next)
        d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        This extends L{Authenticator.streamStarted} to extract further stream
        headers from L{rootElement}, optionally wait for stream features being
        received and then call C{initializeStream}.
        """

        Authenticator.streamStarted(self, rootElement)

        self.xmlstream.sid = rootElement.getAttribute("id")

        if rootElement.hasAttribute("from"):
            self.xmlstream.otherEntity = jid.internJID(rootElement["from"])

        # Setup observer for stream features, if applicable
        if self.xmlstream.version >= (1, 0):
            def onFeatures(element):
                # Index the advertized features by (uri, name).
                features = {}
                for feature in element.elements():
                    features[(feature.uri, feature.name)] = feature

                self.xmlstream.features = features
                self.initializeStream()

            self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
                                                  NS_STREAMS,
                                              onFeatures)
        else:
            # No features are awaited for streams below version 1.0.
            self.initializeStream()
233
234
235
class ListenAuthenticator(Authenticator):
    """
    Authenticator for the receiving side of a stream.
    """

    namespace = None

    def associateWithStream(self, xmlstream):
        """
        Called by the XmlStreamFactory when a connection has been made.

        Besides the work of L{Authenticator.associateWithStream}, this marks
        the L{XmlStream} as not being the initiating party.
        """
        Authenticator.associateWithStream(self, xmlstream)
        self.xmlstream.initiating = False


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        Besides the work of L{Authenticator.streamStarted}, this copies the
        default namespace, the addressed entity and the locally declared
        prefixes from L{rootElement} to the stream, and generates a session
        identifier.
        """
        Authenticator.streamStarted(self, rootElement)

        xs = self.xmlstream
        xs.namespace = rootElement.defaultUri

        if rootElement.hasAttribute("to"):
            xs.thisEntity = jid.internJID(rootElement["to"])

        # The stream keeps prefixes keyed by URI, the element by prefix.
        xs.prefixes = dict([(uri, prefix) for prefix, uri
                            in rootElement.localPrefixes.iteritems()])

        xs.sid = randbytes.secureRandom(8).encode('hex')
273
274
275
class FeatureNotAdvertized(Exception):
    """
    Raised when the initiating entity requires a stream feature that the
    receiving entity did not advertize.
    """
281
282
283
class BaseFeatureInitiatingInitializer(object):
    """
    Base class for initializers that are tied to a stream feature.

    The associated XmlStream is assumed to represent the initiating entity
    of the connection.

    @cvar feature: tuple of (uri, name) of the stream feature root element.
    @type feature: tuple of (L{str}, L{str})
    @ivar required: whether the receiving entity must advertize the stream
                    feature.
    @type required: L{bool}
    """

    implements(ijabber.IInitiatingInitializer)

    feature = None
    required = False

    def __init__(self, xs):
        self.xmlstream = xs


    def initialize(self):
        """
        Initiate the initialization.

        When the receiving entity advertizes the stream feature, the result
        of L{start} is returned. When it does not, L{FeatureNotAdvertized} is
        raised if the C{required} instance variable is true; otherwise the
        initialization silently succeeds.
        """
        if self.feature not in self.xmlstream.features:
            if self.required:
                raise FeatureNotAdvertized
            return None

        return self.start()


    def start(self):
        """
        Start the actual initialization.

        Implementations may return a deferred when initialization is
        asynchronous.
        """
332
333
334
class TLSError(Exception):
    """
    Base class of all TLS related exceptions.
    """



class TLSFailed(TLSError):
    """
    Raised when the TLS negotiation failed.
    """



class TLSRequired(TLSError):
    """
    Raised when TLS negotiation is required but not wanted.

    The receiving entity requires TLS to be negotiated, while the initiating
    entity does not desire to negotiate TLS.
    """



class TLSNotSupported(TLSError):
    """
    Raised when TLS support is missing.

    The initiating entity wants and requires TLS to be negotiated, but the
    OpenSSL library is not available.
    """
366
367
368
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
    """
    TLS stream initializer for the initiating entity.

    This initializer should always be part of the list of initializers of an
    XMPP stream. By default it attempts to negotiate TLS. Since an XMPP
    server may indicate that TLS is required, do not remove this initializer
    when TLS is not desired. Set the C{wanted} attribute to False instead, so
    that a proper L{TLSRequired} exception can be raised.

    @cvar wanted: indicates if TLS negotiation is wanted.
    @type wanted: L{bool}
    """

    feature = (NS_XMPP_TLS, 'starttls')
    wanted = True
    _deferred = None

    def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.

        Fires the pending deferred with L{Reset}.
        """
        xs = self.xmlstream
        # The peer agreed, so a failure response is no longer expected.
        xs.removeObserver('/failure', self.onFailure)
        xs.transport.startTLS(ssl.CertificateOptions())
        xs.reset()
        xs.sendHeader()
        self._deferred.callback(Reset)


    def onFailure(self, obj):
        """
        Fail the pending deferred with L{TLSFailed}.
        """
        self.xmlstream.removeObserver('/proceed', self.onProceed)
        self._deferred.errback(TLSFailed())


    def start(self):
        """
        Start TLS negotiation.

        What happens depends on whether the receiving entity requires TLS,
        whether the SSL library is available, and on the C{required} and
        C{wanted} instance variables.

        When TLS is not wanted, this fails with L{TLSRequired} if the
        receiving entity requires it and succeeds silently otherwise. When
        TLS is wanted but the SSL library is unavailable, this fails with
        L{TLSNotSupported} if TLS is required by the user and succeeds
        silently otherwise. In all other cases the negotiation is started.
        """
        if not self.wanted:
            if self.xmlstream.features[self.feature].required:
                return defer.fail(TLSRequired())
            return defer.succeed(None)

        if ssl is None:
            if self.required:
                return defer.fail(TLSNotSupported())
            return defer.succeed(None)

        self._deferred = defer.Deferred()
        self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
        self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
        self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
        return self._deferred
436
437
438
class XmlStream(xmlstream.XmlStream):
    """
    Protocol handler for XMPP XML Streams.

    @ivar version: XML stream version as a tuple (major, minor). This starts
                   out as the minimally supported version. When the stream
                   header of the peer has been received, it becomes the
                   minimum of that value and the version on the received
                   header.
    @type version: (L{int}, L{int})
    @ivar namespace: default namespace URI for stream
    @type namespace: L{str}
    @ivar thisEntity: JID of this entity
    @type thisEntity: L{JID}
    @ivar otherEntity: JID of the peer entity
    @type otherEntity: L{JID}
    @ivar sid: session identifier
    @type sid: L{str}
    @ivar initiating: True if this is the initiating stream
    @type initiating: L{bool}
    @ivar features: map of (uri, name) to stream features element received from
                    the receiving entity.
    @type features: L{dict} of (L{str}, L{str}) to L{domish.Element}.
    @ivar prefixes: map of URI to prefixes that are to appear on stream
                    header.
    @type prefixes: L{dict} of L{str} to L{str}
    @ivar initializers: list of stream initializer objects
    @type initializers: L{list} of objects that provide L{IInitializer}
    @ivar authenticator: associated authenticator that uses C{initializers} to
                         initialize the XML stream.
    """

    version = (1, 0)
    namespace = 'invalid'
    thisEntity = None
    otherEntity = None
    sid = None
    initiating = True

    # Whether the stream header has been sent already.
    _headerSent = False

    def __init__(self, authenticator):
        xmlstream.XmlStream.__init__(self)

        self.prefixes = {NS_STREAMS: 'stream'}
        self.authenticator = authenticator
        self.initializers = []
        self.features = {}

        # Hand this stream to the authenticator, which resets it.
        authenticator.associateWithStream(self)


    def _callLater(self, *args, **kwargs):
        """
        Schedule a call with the reactor, passing all arguments along.
        """
        from twisted.internet import reactor
        return reactor.callLater(*args, **kwargs)


    def reset(self):
        """
        Reset XML Stream.

        This resets the XML Parser for incoming data and is meant to be used
        after a new layer, e.g. TLS and SASL, has been negotiated
        successfully. Registered event observers stay in place.
        """
        self._headerSent = False
        self._initializeStream()


    def onStreamError(self, errelem):
        """
        Called when a stream:error element has been received.

        The error element is converted to an exception, which is dispatched
        wrapped in a failure as a L{STREAM_ERROR_EVENT} event to allow for
        cleanup actions. Then the connection is dropped.

        @param errelem: The received error element.
        @type errelem: L{domish.Element}
        """
        exc = error.exceptionFromStreamError(errelem)
        self.dispatch(failure.Failure(exc), STREAM_ERROR_EVENT)
        self.transport.loseConnection()


    def sendHeader(self):
        """
        Send stream header.

        The header carries the addressing of both entities when known, the
        session identifier when this is the receiving entity, and the stream
        version when it is at least 1.0.
        """
        # Extra namespaces to declare; the streams namespace itself is
        # already covered by the root element.
        localPrefixes = dict([(prefix, uri) for uri, prefix
                              in self.prefixes.iteritems()
                              if uri != NS_STREAMS])

        rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
                                     localPrefixes=localPrefixes)

        if self.otherEntity:
            rootElement['to'] = self.otherEntity.userhost()

        if self.thisEntity:
            rootElement['from'] = self.thisEntity.userhost()

        if self.sid and not self.initiating:
            rootElement['id'] = self.sid

        if self.version >= (1, 0):
            rootElement['version'] = "%d.%d" % self.version

        header = rootElement.toXml(prefixes=self.prefixes, closeElement=0)
        self.send(header)
        self._headerSent = True


    def sendFooter(self):
        """
        Send stream footer.
        """
        self.send('</stream:stream>')


    def sendStreamError(self, streamError):
        """
        Send stream level error.

        When this is the receiving entity and no header has been sent yet,
        the header is sent first.

        The error and the stream footer are only sent when a header has been
        sent. The transport connection is dropped in all cases.

        @param streamError: stream error instance
        @type streamError: L{error.StreamError}
        """
        if not (self._headerSent or self.initiating):
            self.sendHeader()

        if self._headerSent:
            self.send(streamError.getElement())
            self.sendFooter()

        self.transport.loseConnection()


    def send(self, obj):
        """
        Send data over the stream.

        This overrides L{xmlstream.XmlStream.send}. Objects that provide
        L{domish.IElement} are serialized using the default namespace of the
        stream header, under the assumption that they represent a direct
        child of the root element of the stream. Other objects are passed on
        unchanged.
        """
        if domish.IElement.providedBy(obj):
            prefixes = self.prefixes
            obj = obj.toXml(prefixes=prefixes,
                            defaultUri=self.namespace,
                            prefixesInScope=prefixes.values())

        xmlstream.XmlStream.send(self, obj)


    def connectionMade(self):
        """
        Called when a connection is made.

        The authenticator is notified after the base class has been.
        """
        xmlstream.XmlStream.connectionMade(self)
        self.authenticator.connectionMade()


    def onDocumentStart(self, rootElement):
        """
        Called when the stream header has been received.

        After the base class has processed the root element, a one-time
        observer for stream errors is registered with L{onStreamError} as its
        callback. Finally, the C{streamStarted} method of the authenticator
        is called with the root element, leaving the processing of the
        stream header to the authenticator.

        @param rootElement: The root element.
        @type rootElement: L{domish.Element}
        """
        xmlstream.XmlStream.onDocumentStart(self, rootElement)

        # Watch for stream level errors
        errorPath = "/error[@xmlns='%s']" % NS_STREAMS
        self.addOnetimeObserver(errorPath, self.onStreamError)

        self.authenticator.streamStarted(rootElement)
638
639
640
class XmlStreamFactory(xmlstream.XmlStreamFactory):
    """
    Reconnecting client factory for Jabber XmlStream objects.

    Unlike L{xmlstream.XmlStreamFactory}, this factory generates Jabber
    specific L{XmlStream} instances, which have authenticators.
    """

    protocol = XmlStream

    def __init__(self, authenticator):
        """
        @param authenticator: the authenticator that is passed on to the base
                              factory and kept in the C{authenticator}
                              attribute.
        """
        xmlstream.XmlStreamFactory.__init__(self, authenticator)
        self.authenticator = authenticator
655
656
657
class XmlStreamServerFactory(xmlstream.BootstrapMixin,
                             protocol.ServerFactory):
    """
    Server factory for Jabber XmlStream objects.

    @since: 8.2.
    @ivar authenticatorFactory: Factory callable that takes no arguments, to
                                create a fresh authenticator to be associated
                                with the XmlStream.
    """

    protocol = XmlStream

    def __init__(self, authenticatorFactory):
        xmlstream.BootstrapMixin.__init__(self)
        self.authenticatorFactory = authenticatorFactory


    def buildProtocol(self, addr):
        """
        Create an instance of XmlStream.

        The new XmlStream gets its own, freshly created authenticator. The
        registered bootstrap event observers are installed on it, too.
        """
        xs = self.protocol(self.authenticatorFactory())
        xs.factory = self
        self.installBootstraps(xs)
        return xs
688
689
690
class TimeoutError(Exception):
    """
    Raised when the configured timeout expired before an IQ response was
    received.
    """
696
697
698
def upgradeWithIQResponseTracker(xs):
    """
    Enhances an XmlStream for iq response tracking.

    After this call, the L{XmlStream} object provides L{IIQResponseTracker}.
    Deferreds of requests that receive an error iq stanza as response have
    their errback invoked with a failure holding a
    L{StanzaException<error.StanzaException>}, which is easier to examine.
    """
    def callback(iq):
        """
        Fire the deferred associated with a received iq response.
        """
        if getattr(iq, 'handled', False):
            return

        try:
            d = xs.iqDeferreds.pop(iq["id"])
        except KeyError:
            # Not a response to a request that is being tracked.
            return

        iq.handled = True
        if iq['type'] == 'error':
            d.errback(error.exceptionFromStanza(iq))
        else:
            d.callback(iq)


    def disconnected(_):
        """
        Make sure deferreds do not linger on after disconnect.

        All deferreds of iq's that are still awaiting a response have their
        errback called with a L{ConnectionLost} failure, as they would never
        be fired otherwise.
        """
        pending, xs.iqDeferreds = xs.iqDeferreds, {}
        for d in pending.itervalues():
            d.errback(ConnectionLost())

    xs.iqDeferreds = {}
    # Keep a default timeout that was set on the stream before upgrading.
    xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
    xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
    xs.addObserver('/iq[@type="result"]', callback)
    xs.addObserver('/iq[@type="error"]', callback)
    directlyProvides(xs, ijabber.IIQResponseTracker)
747
748
749
class IQ(domish.Element):
    """
    Wrapper for an iq stanza.

    Communication with a request-response behaviour is done using iq stanzas.
    An iq request belongs to an XML stream and carries a unique id, so that
    the response to it can be tracked.

    @ivar timeout: if set, a timeout period after which the deferred returned
                   by C{send} will have its errback called with a
                   L{TimeoutError} failure.
    @type timeout: C{float}
    """

    timeout = None

    def __init__(self, xmlstream, stanzaType="set"):
        """
        @type xmlstream: L{xmlstream.XmlStream}
        @param xmlstream: XmlStream to use for transmission of this IQ

        @type stanzaType: L{str}
        @param stanzaType: IQ type identifier ('get' or 'set')
        """
        domish.Element.__init__(self, (None, "iq"))
        self.addUniqueId()
        self["type"] = stanzaType
        self._xmlstream = xmlstream


    def send(self, to=None):
        """
        Send out this iq.

        The returned deferred fires when an iq response with the same id has
        been received. Result responses are passed to the callback of the
        deferred. Error responses are transformed into a
        L{StanzaError<error.StanzaError>}, with which the errback of the
        deferred is invoked.

        @param to: optional value for the C{to} attribute of this iq.
        @rtype: L{defer.Deferred}
        """
        if to is not None:
            self["to"] = to

        xs = self._xmlstream
        if not ijabber.IIQResponseTracker.providedBy(xs):
            upgradeWithIQResponseTracker(xs)

        d = defer.Deferred()
        xs.iqDeferreds[self['id']] = d

        # The timeout of this iq takes precedence over the stream default.
        timeout = self.timeout or xs.iqDefaultTimeout
        if timeout is not None:
            def onTimeout():
                del self._xmlstream.iqDeferreds[self['id']]
                d.errback(TimeoutError("IQ timed out"))

            call = xs._callLater(timeout, onTimeout)

            def cancelTimeout(result):
                # A response or failure came in first; drop the timer.
                if call.active():
                    call.cancel()

                return result

            d.addBoth(cancelTimeout)

        xs.send(self)
        return d
819
820
821
def toResponse(stanza, stanzaType=None):
    """
    Create a response stanza from another stanza.

    The (new, empty) response stanza has the same name as the original. Its
    addressing attributes are those of the original, swapped, and its id is
    a copy of the original id. Attributes without a value are left out. The
    stanza type of the response can be given optionally.

    @param stanza: the original stanza
    @type stanza: L{domish.Element}
    @param stanzaType: optional response stanza type
    @type stanzaType: C{str}
    @return: the response stanza.
    @rtype: L{domish.Element}
    """
    # Response attribute names paired with their values; to and from are
    # swapped with respect to the original stanza.
    attributes = (
        ('to', stanza.getAttribute('from')),
        ('from', stanza.getAttribute('to')),
        ('id', stanza.getAttribute('id')),
        ('type', stanzaType),
    )

    response = domish.Element((None, stanza.name))
    for name, value in attributes:
        if value:
            response[name] = value

    return response
853
854
855
class XMPPHandler(object):
    """
    XMPP protocol handler.

    Subclasses implement (part of) one or more XMPP extension protocols.
    Such a class is called a subprotocol implementation.
    """

    implements(ijabber.IXMPPHandler)

    def __init__(self):
        self.parent = None
        self.xmlstream = None


    def setHandlerParent(self, parent):
        """
        Remember C{parent} and add this handler to it.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Remove this handler from its current parent and forget the parent.
        """
        self.parent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Remember the XML stream and call L{connectionMade}.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Called after a connection has been established.

        Override this to do work ahead of stream initialization.
        """


    def connectionInitialized(self):
        """
        The XML stream has been initialized.

        Override this to do work once the stream has been initialized, like
        setting up observers and starting to exchange XML stanzas.
        """


    def connectionLost(self, reason):
        """
        The XML stream has been closed.

        Extend this method to inspect the C{reason} argument and act on it.
        The reference to the XML stream is dropped here.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data over the managed XML stream.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        parent = self.parent
        parent.send(obj)
929
930
931
class XMPPHandlerCollection(object):
    """
    A flat container of XMPP subprotocol handlers.

    Handlers can be grouped in a collection, but the collection is not an
    L{XMPPHandler} itself, so collections do not nest.

    @ivar handlers: the protocol handlers, in the order they were added.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(ijabber.IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = list()


    def __iter__(self):
        """
        Iterate over the handlers held by this collection.

        @return: an iterator over L{handlers}.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Append a protocol handler to the collection.

        @param handler: the handler to add; expected to provide
                        L{ijabber.IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Take a protocol handler out of the collection.

        @param handler: the handler to remove.
        @raise ValueError: if C{handler} is not in the collection.
        """
        self.handlers.remove(handler)
971
972
973
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream, or C{None} when there is
                     no connection.
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar factory: the factory passed to the initializer, on which the
                   manager's bootstrap observers were registered.
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up an unconnected manager and hook it into C{factory}.

        @param factory: object whose C{addBootstrap} method is used to
                        register this manager's callbacks for the connected,
                        authenticated, initialization-failed and stream-end
                        events.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
        factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        The handler is brought up to speed with the current connection
        state: when a transport connection exists, its C{makeConnection} is
        called with the current XML stream, and when that stream has also
        been initialized, its C{connectionInitialized} is called as well.

        @param handler: the handler to add; expected to provide
                        L{ijabber.IXMPPHandler}.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # A handler added between connection and initialization must still
        # learn about the stream; otherwise the later call to its
        # connectionInitialized (from _authd) would find it without one.
        if self.xmlstream:
            handler.makeConnection(self.xmlstream)

            if self._initialized:
                handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.

        @param xs: the newly connected XML stream.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: a handler may add or remove handlers from
        # within its callback, and mutating the list during iteration would
        # skip handlers. Handlers added meanwhile are connected by
        # addHandler, as self.xmlstream is already set.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.

        @param xs: the XML stream that has been initialized.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. A snapshot is used because a handler added
        # from within a callback is already notified by addHandler; walking
        # the live list would call its connectionInitialized a second time.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        This implementation does nothing.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, _):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method. The event argument
        is ignored; handlers always receive C{None} as the reason.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers. A snapshot is used so that a handler removing
        # itself (or another handler) while being notified does not cause
        # the following handler to be skipped.
        for e in list(self):
            e.connectionLost(None)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no initialized XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
Note: See TracBrowser for help on using the browser.