| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
|
|---|
| 6 |
""" |
|---|
| 7 |
XMPP XML Streams |
|---|
| 8 |
|
|---|
| 9 |
Building blocks for setting up XML Streams, including helping classes for |
|---|
| 10 |
doing authentication on either client or server side, and working with XML |
|---|
| 11 |
Stanzas. |
|---|
| 12 |
""" |
|---|
| 13 |
|
|---|
| 14 |
from zope.interface import directlyProvides, implements |
|---|
| 15 |
|
|---|
| 16 |
from twisted.internet import defer, protocol |
|---|
| 17 |
from twisted.internet.error import ConnectionLost |
|---|
| 18 |
from twisted.python import failure, log, randbytes |
|---|
| 19 |
from twisted.words.protocols.jabber import error, ijabber, jid |
|---|
| 20 |
from twisted.words.xish import domish, xmlstream |
|---|
| 21 |
from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT |
|---|
| 22 |
from twisted.words.xish.xmlstream import STREAM_START_EVENT |
|---|
| 23 |
from twisted.words.xish.xmlstream import STREAM_END_EVENT |
|---|
| 24 |
from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT |
|---|
| 25 |
|
|---|
| 26 |
try: |
|---|
| 27 |
from twisted.internet import ssl |
|---|
| 28 |
except ImportError: |
|---|
| 29 |
ssl = None |
|---|
| 30 |
if ssl and not ssl.supported: |
|---|
| 31 |
ssl = None |
|---|
| 32 |
|
|---|
| 33 |
STREAM_AUTHD_EVENT = intern("//event/stream/authd") |
|---|
| 34 |
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed") |
|---|
| 35 |
|
|---|
| 36 |
NS_STREAMS = 'http://etherx.jabber.org/streams' |
|---|
| 37 |
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls' |
|---|
| 38 |
|
|---|
| 39 |
Reset = object() |
|---|
| 40 |
|
|---|
| 41 |
def hashPassword(sid, password): |
|---|
| 42 |
""" |
|---|
| 43 |
Create a SHA1-digest string of a session identifier and password. |
|---|
| 44 |
""" |
|---|
| 45 |
from twisted.python.hashlib import sha1 |
|---|
| 46 |
return sha1("%s%s" % (sid, password)).hexdigest() |
|---|
| 47 |
|
|---|
| 48 |
|
|---|
| 49 |
|
|---|
| 50 |
class Authenticator: |
|---|
| 51 |
""" |
|---|
| 52 |
Base class for business logic of initializing an XmlStream |
|---|
| 53 |
|
|---|
| 54 |
Subclass this object to enable an XmlStream to initialize and authenticate |
|---|
| 55 |
to different types of stream hosts (such as clients, components, etc.). |
|---|
| 56 |
|
|---|
| 57 |
Rules: |
|---|
| 58 |
1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the |
|---|
| 59 |
stream has been completely initialized. |
|---|
| 60 |
2. The Authenticator SHOULD reset all state information when |
|---|
| 61 |
L{associateWithStream} is called. |
|---|
| 62 |
3. The Authenticator SHOULD override L{streamStarted}, and start |
|---|
| 63 |
initialization there. |
|---|
| 64 |
|
|---|
| 65 |
@type xmlstream: L{XmlStream} |
|---|
| 66 |
@ivar xmlstream: The XmlStream that needs authentication |
|---|
| 67 |
|
|---|
| 68 |
@note: the term authenticator is historical. Authenticators perform |
|---|
| 69 |
all steps required to prepare the stream for the exchange |
|---|
| 70 |
of XML stanzas. |
|---|
| 71 |
""" |
|---|
| 72 |
|
|---|
| 73 |
def __init__(self): |
|---|
| 74 |
self.xmlstream = None |
|---|
| 75 |
|
|---|
| 76 |
|
|---|
| 77 |
def connectionMade(self): |
|---|
| 78 |
""" |
|---|
| 79 |
Called by the XmlStream when the underlying socket connection is |
|---|
| 80 |
in place. |
|---|
| 81 |
|
|---|
| 82 |
This allows the Authenticator to send an initial root element, if it's |
|---|
| 83 |
connecting, or wait for an inbound root from the peer if it's accepting |
|---|
| 84 |
the connection. |
|---|
| 85 |
|
|---|
| 86 |
Subclasses can use self.xmlstream.send() to send any initial data to |
|---|
| 87 |
the peer. |
|---|
| 88 |
""" |
|---|
| 89 |
|
|---|
| 90 |
|
|---|
| 91 |
def streamStarted(self, rootElement): |
|---|
| 92 |
""" |
|---|
| 93 |
Called by the XmlStream when the stream has started. |
|---|
| 94 |
|
|---|
| 95 |
A stream is considered to have started when the start tag of the root |
|---|
| 96 |
element has been received. |
|---|
| 97 |
|
|---|
| 98 |
This examines L{rootElement} to see if there is a version attribute. |
|---|
| 99 |
If absent, C{0.0} is assumed per RFC 3920. Subsequently, the |
|---|
| 100 |
minimum of the version from the received stream header and the |
|---|
| 101 |
value stored in L{xmlstream} is taken and put back in {xmlstream}. |
|---|
| 102 |
|
|---|
| 103 |
Extensions of this method can extract more information from the |
|---|
| 104 |
stream header and perform checks on them, optionally sending |
|---|
| 105 |
stream errors and closing the stream. |
|---|
| 106 |
""" |
|---|
| 107 |
if rootElement.hasAttribute("version"): |
|---|
| 108 |
version = rootElement["version"].split(".") |
|---|
| 109 |
try: |
|---|
| 110 |
version = (int(version[0]), int(version[1])) |
|---|
| 111 |
except (IndexError, ValueError): |
|---|
| 112 |
version = (0, 0) |
|---|
| 113 |
else: |
|---|
| 114 |
version = (0, 0) |
|---|
| 115 |
|
|---|
| 116 |
self.xmlstream.version = min(self.xmlstream.version, version) |
|---|
| 117 |
|
|---|
| 118 |
|
|---|
| 119 |
def associateWithStream(self, xmlstream): |
|---|
| 120 |
""" |
|---|
| 121 |
Called by the XmlStreamFactory when a connection has been made |
|---|
| 122 |
to the requested peer, and an XmlStream object has been |
|---|
| 123 |
instantiated. |
|---|
| 124 |
|
|---|
| 125 |
The default implementation just saves a handle to the new |
|---|
| 126 |
XmlStream. |
|---|
| 127 |
|
|---|
| 128 |
@type xmlstream: L{XmlStream} |
|---|
| 129 |
@param xmlstream: The XmlStream that will be passing events to this |
|---|
| 130 |
Authenticator. |
|---|
| 131 |
|
|---|
| 132 |
""" |
|---|
| 133 |
self.xmlstream = xmlstream |
|---|
| 134 |
|
|---|
| 135 |
|
|---|
| 136 |
|
|---|
| 137 |
class ConnectAuthenticator(Authenticator): |
|---|
| 138 |
""" |
|---|
| 139 |
Authenticator for initiating entities. |
|---|
| 140 |
""" |
|---|
| 141 |
|
|---|
| 142 |
namespace = None |
|---|
| 143 |
|
|---|
| 144 |
def __init__(self, otherHost): |
|---|
| 145 |
self.otherHost = otherHost |
|---|
| 146 |
|
|---|
| 147 |
|
|---|
| 148 |
def connectionMade(self): |
|---|
| 149 |
self.xmlstream.namespace = self.namespace |
|---|
| 150 |
self.xmlstream.otherEntity = jid.internJID(self.otherHost) |
|---|
| 151 |
self.xmlstream.sendHeader() |
|---|
| 152 |
|
|---|
| 153 |
|
|---|
| 154 |
def initializeStream(self): |
|---|
| 155 |
""" |
|---|
| 156 |
Perform stream initialization procedures. |
|---|
| 157 |
|
|---|
| 158 |
An L{XmlStream} holds a list of initializer objects in its |
|---|
| 159 |
C{initializers} attribute. This method calls these initializers in |
|---|
| 160 |
order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has |
|---|
| 161 |
been successfully processed. Otherwise it dispatches the |
|---|
| 162 |
C{INIT_FAILED_EVENT} event with the failure. |
|---|
| 163 |
|
|---|
| 164 |
Initializers may return the special L{Reset} object to halt the |
|---|
| 165 |
initialization processing. It signals that the current initializer was |
|---|
| 166 |
successfully processed, but that the XML Stream has been reset. An |
|---|
| 167 |
example is the TLSInitiatingInitializer. |
|---|
| 168 |
""" |
|---|
| 169 |
|
|---|
| 170 |
def remove_first(result): |
|---|
| 171 |
self.xmlstream.initializers.pop(0) |
|---|
| 172 |
|
|---|
| 173 |
return result |
|---|
| 174 |
|
|---|
| 175 |
def do_next(result): |
|---|
| 176 |
""" |
|---|
| 177 |
Take the first initializer and process it. |
|---|
| 178 |
|
|---|
| 179 |
On success, the initializer is removed from the list and |
|---|
| 180 |
then next initializer will be tried. |
|---|
| 181 |
""" |
|---|
| 182 |
|
|---|
| 183 |
if result is Reset: |
|---|
| 184 |
return None |
|---|
| 185 |
|
|---|
| 186 |
try: |
|---|
| 187 |
init = self.xmlstream.initializers[0] |
|---|
| 188 |
except IndexError: |
|---|
| 189 |
self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT) |
|---|
| 190 |
return None |
|---|
| 191 |
else: |
|---|
| 192 |
d = defer.maybeDeferred(init.initialize) |
|---|
| 193 |
d.addCallback(remove_first) |
|---|
| 194 |
d.addCallback(do_next) |
|---|
| 195 |
return d |
|---|
| 196 |
|
|---|
| 197 |
d = defer.succeed(None) |
|---|
| 198 |
d.addCallback(do_next) |
|---|
| 199 |
d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT) |
|---|
| 200 |
|
|---|
| 201 |
|
|---|
| 202 |
def streamStarted(self, rootElement): |
|---|
| 203 |
""" |
|---|
| 204 |
Called by the XmlStream when the stream has started. |
|---|
| 205 |
|
|---|
| 206 |
This extends L{Authenticator.streamStarted} to extract further stream |
|---|
| 207 |
headers from L{rootElement}, optionally wait for stream features being |
|---|
| 208 |
received and then call C{initializeStream}. |
|---|
| 209 |
""" |
|---|
| 210 |
|
|---|
| 211 |
Authenticator.streamStarted(self, rootElement) |
|---|
| 212 |
|
|---|
| 213 |
self.xmlstream.sid = rootElement.getAttribute("id") |
|---|
| 214 |
|
|---|
| 215 |
if rootElement.hasAttribute("from"): |
|---|
| 216 |
self.xmlstream.otherEntity = jid.internJID(rootElement["from"]) |
|---|
| 217 |
|
|---|
| 218 |
|
|---|
| 219 |
if self.xmlstream.version >= (1, 0): |
|---|
| 220 |
def onFeatures(element): |
|---|
| 221 |
features = {} |
|---|
| 222 |
for feature in element.elements(): |
|---|
| 223 |
features[(feature.uri, feature.name)] = feature |
|---|
| 224 |
|
|---|
| 225 |
self.xmlstream.features = features |
|---|
| 226 |
self.initializeStream() |
|---|
| 227 |
|
|---|
| 228 |
self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' % |
|---|
| 229 |
NS_STREAMS, |
|---|
| 230 |
onFeatures) |
|---|
| 231 |
else: |
|---|
| 232 |
self.initializeStream() |
|---|
| 233 |
|
|---|
| 234 |
|
|---|
| 235 |
|
|---|
| 236 |
class ListenAuthenticator(Authenticator): |
|---|
| 237 |
""" |
|---|
| 238 |
Authenticator for receiving entities. |
|---|
| 239 |
""" |
|---|
| 240 |
|
|---|
| 241 |
namespace = None |
|---|
| 242 |
|
|---|
| 243 |
def associateWithStream(self, xmlstream): |
|---|
| 244 |
""" |
|---|
| 245 |
Called by the XmlStreamFactory when a connection has been made. |
|---|
| 246 |
|
|---|
| 247 |
Extend L{Authenticator.associateWithStream} to set the L{XmlStream} |
|---|
| 248 |
to be non-initiating. |
|---|
| 249 |
""" |
|---|
| 250 |
Authenticator.associateWithStream(self, xmlstream) |
|---|
| 251 |
self.xmlstream.initiating = False |
|---|
| 252 |
|
|---|
| 253 |
|
|---|
| 254 |
def streamStarted(self, rootElement): |
|---|
| 255 |
""" |
|---|
| 256 |
Called by the XmlStream when the stream has started. |
|---|
| 257 |
|
|---|
| 258 |
This extends L{Authenticator.streamStarted} to extract further |
|---|
| 259 |
information from the stream headers from L{rootElement}. |
|---|
| 260 |
""" |
|---|
| 261 |
Authenticator.streamStarted(self, rootElement) |
|---|
| 262 |
|
|---|
| 263 |
self.xmlstream.namespace = rootElement.defaultUri |
|---|
| 264 |
|
|---|
| 265 |
if rootElement.hasAttribute("to"): |
|---|
| 266 |
self.xmlstream.thisEntity = jid.internJID(rootElement["to"]) |
|---|
| 267 |
|
|---|
| 268 |
self.xmlstream.prefixes = {} |
|---|
| 269 |
for prefix, uri in rootElement.localPrefixes.iteritems(): |
|---|
| 270 |
self.xmlstream.prefixes[uri] = prefix |
|---|
| 271 |
|
|---|
| 272 |
self.xmlstream.sid = randbytes.secureRandom(8).encode('hex') |
|---|
| 273 |
|
|---|
| 274 |
|
|---|
| 275 |
|
|---|
| 276 |
class FeatureNotAdvertized(Exception): |
|---|
| 277 |
""" |
|---|
| 278 |
Exception indicating a stream feature was not advertized, while required by |
|---|
| 279 |
the initiating entity. |
|---|
| 280 |
""" |
|---|
| 281 |
|
|---|
| 282 |
|
|---|
| 283 |
|
|---|
| 284 |
class BaseFeatureInitiatingInitializer(object): |
|---|
| 285 |
""" |
|---|
| 286 |
Base class for initializers with a stream feature. |
|---|
| 287 |
|
|---|
| 288 |
This assumes the associated XmlStream represents the initiating entity |
|---|
| 289 |
of the connection. |
|---|
| 290 |
|
|---|
| 291 |
@cvar feature: tuple of (uri, name) of the stream feature root element. |
|---|
| 292 |
@type feature: tuple of (L{str}, L{str}) |
|---|
| 293 |
@ivar required: whether the stream feature is required to be advertized |
|---|
| 294 |
by the receiving entity. |
|---|
| 295 |
@type required: L{bool} |
|---|
| 296 |
""" |
|---|
| 297 |
|
|---|
| 298 |
implements(ijabber.IInitiatingInitializer) |
|---|
| 299 |
|
|---|
| 300 |
feature = None |
|---|
| 301 |
required = False |
|---|
| 302 |
|
|---|
| 303 |
def __init__(self, xs): |
|---|
| 304 |
self.xmlstream = xs |
|---|
| 305 |
|
|---|
| 306 |
|
|---|
| 307 |
def initialize(self): |
|---|
| 308 |
""" |
|---|
| 309 |
Initiate the initialization. |
|---|
| 310 |
|
|---|
| 311 |
Checks if the receiving entity advertizes the stream feature. If it |
|---|
| 312 |
does, the initialization is started. If it is not advertized, and the |
|---|
| 313 |
C{required} instance variable is L{True}, it raises |
|---|
| 314 |
L{FeatureNotAdvertized}. Otherwise, the initialization silently |
|---|
| 315 |
succeeds. |
|---|
| 316 |
""" |
|---|
| 317 |
|
|---|
| 318 |
if self.feature in self.xmlstream.features: |
|---|
| 319 |
return self.start() |
|---|
| 320 |
elif self.required: |
|---|
| 321 |
raise FeatureNotAdvertized |
|---|
| 322 |
else: |
|---|
| 323 |
return None |
|---|
| 324 |
|
|---|
| 325 |
|
|---|
| 326 |
def start(self): |
|---|
| 327 |
""" |
|---|
| 328 |
Start the actual initialization. |
|---|
| 329 |
|
|---|
| 330 |
May return a deferred for asynchronous initialization. |
|---|
| 331 |
""" |
|---|
| 332 |
|
|---|
| 333 |
|
|---|
| 334 |
|
|---|
| 335 |
class TLSError(Exception): |
|---|
| 336 |
""" |
|---|
| 337 |
TLS base exception. |
|---|
| 338 |
""" |
|---|
| 339 |
|
|---|
| 340 |
|
|---|
| 341 |
|
|---|
| 342 |
class TLSFailed(TLSError): |
|---|
| 343 |
""" |
|---|
| 344 |
Exception indicating failed TLS negotiation |
|---|
| 345 |
""" |
|---|
| 346 |
|
|---|
| 347 |
|
|---|
| 348 |
|
|---|
| 349 |
class TLSRequired(TLSError): |
|---|
| 350 |
""" |
|---|
| 351 |
Exception indicating required TLS negotiation. |
|---|
| 352 |
|
|---|
| 353 |
This exception is raised when the receiving entity requires TLS |
|---|
| 354 |
negotiation and the initiating does not desire to negotiate TLS. |
|---|
| 355 |
""" |
|---|
| 356 |
|
|---|
| 357 |
|
|---|
| 358 |
|
|---|
| 359 |
class TLSNotSupported(TLSError): |
|---|
| 360 |
""" |
|---|
| 361 |
Exception indicating missing TLS support. |
|---|
| 362 |
|
|---|
| 363 |
This exception is raised when the initiating entity wants and requires to |
|---|
| 364 |
negotiate TLS when the OpenSSL library is not available. |
|---|
| 365 |
""" |
|---|
| 366 |
|
|---|
| 367 |
|
|---|
| 368 |
|
|---|
| 369 |
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer): |
|---|
| 370 |
""" |
|---|
| 371 |
TLS stream initializer for the initiating entity. |
|---|
| 372 |
|
|---|
| 373 |
It is strongly required to include this initializer in the list of |
|---|
| 374 |
initializers for an XMPP stream. By default it will try to negotiate TLS. |
|---|
| 375 |
An XMPP server may indicate that TLS is required. If TLS is not desired, |
|---|
| 376 |
set the C{wanted} attribute to False instead of removing it from the list |
|---|
| 377 |
of initializers, so a proper exception L{TLSRequired} can be raised. |
|---|
| 378 |
|
|---|
| 379 |
@cvar wanted: indicates if TLS negotiation is wanted. |
|---|
| 380 |
@type wanted: L{bool} |
|---|
| 381 |
""" |
|---|
| 382 |
|
|---|
| 383 |
feature = (NS_XMPP_TLS, 'starttls') |
|---|
| 384 |
wanted = True |
|---|
| 385 |
_deferred = None |
|---|
| 386 |
|
|---|
| 387 |
def onProceed(self, obj): |
|---|
| 388 |
""" |
|---|
| 389 |
Proceed with TLS negotiation and reset the XML stream. |
|---|
| 390 |
""" |
|---|
| 391 |
|
|---|
| 392 |
self.xmlstream.removeObserver('/failure', self.onFailure) |
|---|
| 393 |
ctx = ssl.CertificateOptions() |
|---|
| 394 |
self.xmlstream.transport.startTLS(ctx) |
|---|
| 395 |
self.xmlstream.reset() |
|---|
| 396 |
self.xmlstream.sendHeader() |
|---|
| 397 |
self._deferred.callback(Reset) |
|---|
| 398 |
|
|---|
| 399 |
|
|---|
| 400 |
def onFailure(self, obj): |
|---|
| 401 |
self.xmlstream.removeObserver('/proceed', self.onProceed) |
|---|
| 402 |
self._deferred.errback(TLSFailed()) |
|---|
| 403 |
|
|---|
| 404 |
|
|---|
| 405 |
def start(self): |
|---|
| 406 |
""" |
|---|
| 407 |
Start TLS negotiation. |
|---|
| 408 |
|
|---|
| 409 |
This checks if the receiving entity requires TLS, the SSL library is |
|---|
| 410 |
available and uses the C{required} and C{wanted} instance variables to |
|---|
| 411 |
determine what to do in the various different cases. |
|---|
| 412 |
|
|---|
| 413 |
For example, if the SSL library is not available, and wanted and |
|---|
| 414 |
required by the user, it raises an exception. However if it is not |
|---|
| 415 |
required by both parties, initialization silently succeeds, moving |
|---|
| 416 |
on to the next step. |
|---|
| 417 |
""" |
|---|
| 418 |
if self.wanted: |
|---|
| 419 |
if ssl is None: |
|---|
| 420 |
if self.required: |
|---|
| 421 |
return defer.fail(TLSNotSupported()) |
|---|
| 422 |
else: |
|---|
| 423 |
return defer.succeed(None) |
|---|
| 424 |
else: |
|---|
| 425 |
pass |
|---|
| 426 |
elif self.xmlstream.features[self.feature].required: |
|---|
| 427 |
return defer.fail(TLSRequired()) |
|---|
| 428 |
else: |
|---|
| 429 |
return defer.succeed(None) |
|---|
| 430 |
|
|---|
| 431 |
self._deferred = defer.Deferred() |
|---|
| 432 |
self.xmlstream.addOnetimeObserver("/proceed", self.onProceed) |
|---|
| 433 |
self.xmlstream.addOnetimeObserver("/failure", self.onFailure) |
|---|
| 434 |
self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls"))) |
|---|
| 435 |
return self._deferred |
|---|
| 436 |
|
|---|
| 437 |
|
|---|
| 438 |
|
|---|
| 439 |
class XmlStream(xmlstream.XmlStream): |
|---|
| 440 |
""" |
|---|
| 441 |
XMPP XML Stream protocol handler. |
|---|
| 442 |
|
|---|
| 443 |
@ivar version: XML stream version as a tuple (major, minor). Initially, |
|---|
| 444 |
this is set to the minimally supported version. Upon |
|---|
| 445 |
receiving the stream header of the peer, it is set to the |
|---|
| 446 |
minimum of that value and the version on the received |
|---|
| 447 |
header. |
|---|
| 448 |
@type version: (L{int}, L{int}) |
|---|
| 449 |
@ivar namespace: default namespace URI for stream |
|---|
| 450 |
@type namespace: L{str} |
|---|
| 451 |
@ivar thisEntity: JID of this entity |
|---|
| 452 |
@type thisEntity: L{JID} |
|---|
| 453 |
@ivar otherEntity: JID of the peer entity |
|---|
| 454 |
@type otherEntity: L{JID} |
|---|
| 455 |
@ivar sid: session identifier |
|---|
| 456 |
@type sid: L{str} |
|---|
| 457 |
@ivar initiating: True if this is the initiating stream |
|---|
| 458 |
@type initiating: L{bool} |
|---|
| 459 |
@ivar features: map of (uri, name) to stream features element received from |
|---|
| 460 |
the receiving entity. |
|---|
| 461 |
@type features: L{dict} of (L{str}, L{str}) to L{domish.Element}. |
|---|
| 462 |
@ivar prefixes: map of URI to prefixes that are to appear on stream |
|---|
| 463 |
header. |
|---|
| 464 |
@type prefixes: L{dict} of L{str} to L{str} |
|---|
| 465 |
@ivar initializers: list of stream initializer objects |
|---|
| 466 |
@type initializers: L{list} of objects that provide L{IInitializer} |
|---|
| 467 |
@ivar authenticator: associated authenticator that uses C{initializers} to |
|---|
| 468 |
initialize the XML stream. |
|---|
| 469 |
""" |
|---|
| 470 |
|
|---|
| 471 |
version = (1, 0) |
|---|
| 472 |
namespace = 'invalid' |
|---|
| 473 |
thisEntity = None |
|---|
| 474 |
otherEntity = None |
|---|
| 475 |
sid = None |
|---|
| 476 |
initiating = True |
|---|
| 477 |
|
|---|
| 478 |
_headerSent = False |
|---|
| 479 |
|
|---|
| 480 |
def __init__(self, authenticator): |
|---|
| 481 |
xmlstream.XmlStream.__init__(self) |
|---|
| 482 |
|
|---|
| 483 |
self.prefixes = {NS_STREAMS: 'stream'} |
|---|
| 484 |
self.authenticator = authenticator |
|---|
| 485 |
self.initializers = [] |
|---|
| 486 |
self.features = {} |
|---|
| 487 |
|
|---|
| 488 |
|
|---|
| 489 |
authenticator.associateWithStream(self) |
|---|
| 490 |
|
|---|
| 491 |
|
|---|
| 492 |
def _callLater(self, *args, **kwargs): |
|---|
| 493 |
from twisted.internet import reactor |
|---|
| 494 |
return reactor.callLater(*args, **kwargs) |
|---|
| 495 |
|
|---|
| 496 |
|
|---|
| 497 |
def reset(self): |
|---|
| 498 |
""" |
|---|
| 499 |
Reset XML Stream. |
|---|
| 500 |
|
|---|
| 501 |
Resets the XML Parser for incoming data. This is to be used after |
|---|
| 502 |
successfully negotiating a new layer, e.g. TLS and SASL. Note that |
|---|
| 503 |
registered event observers will continue to be in place. |
|---|
| 504 |
""" |
|---|
| 505 |
self._headerSent = False |
|---|
| 506 |
self._initializeStream() |
|---|
| 507 |
|
|---|
| 508 |
|
|---|
| 509 |
def onStreamError(self, errelem): |
|---|
| 510 |
""" |
|---|
| 511 |
Called when a stream:error element has been received. |
|---|
| 512 |
|
|---|
| 513 |
Dispatches a L{STREAM_ERROR_EVENT} event with the error element to |
|---|
| 514 |
allow for cleanup actions and drops the connection. |
|---|
| 515 |
|
|---|
| 516 |
@param errelem: The received error element. |
|---|
| 517 |
@type errelem: L{domish.Element} |
|---|
| 518 |
""" |
|---|
| 519 |
self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)), |
|---|
| 520 |
STREAM_ERROR_EVENT) |
|---|
| 521 |
self.transport.loseConnection() |
|---|
| 522 |
|
|---|
| 523 |
|
|---|
| 524 |
def sendHeader(self): |
|---|
| 525 |
""" |
|---|
| 526 |
Send stream header. |
|---|
| 527 |
""" |
|---|
| 528 |
|
|---|
| 529 |
localPrefixes = {} |
|---|
| 530 |
for uri, prefix in self.prefixes.iteritems(): |
|---|
| 531 |
if uri != NS_STREAMS: |
|---|
| 532 |
localPrefixes[prefix] = uri |
|---|
| 533 |
|
|---|
| 534 |
rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace, |
|---|
| 535 |
localPrefixes=localPrefixes) |
|---|
| 536 |
|
|---|
| 537 |
if self.otherEntity: |
|---|
| 538 |
rootElement['to'] = self.otherEntity.userhost() |
|---|
| 539 |
|
|---|
| 540 |
if self.thisEntity: |
|---|
| 541 |
rootElement['from'] = self.thisEntity.userhost() |
|---|
| 542 |
|
|---|
| 543 |
if not self.initiating and self.sid: |
|---|
| 544 |
rootElement['id'] = self.sid |
|---|
| 545 |
|
|---|
| 546 |
if self.version >= (1, 0): |
|---|
| 547 |
rootElement['version'] = "%d.%d" % self.version |
|---|
| 548 |
|
|---|
| 549 |
self.send(rootElement.toXml(prefixes=self.prefixes, closeElement=0)) |
|---|
| 550 |
self._headerSent = True |
|---|
| 551 |
|
|---|
| 552 |
|
|---|
| 553 |
def sendFooter(self): |
|---|
| 554 |
""" |
|---|
| 555 |
Send stream footer. |
|---|
| 556 |
""" |
|---|
| 557 |
self.send('</stream:stream>') |
|---|
| 558 |
|
|---|
| 559 |
|
|---|
| 560 |
def sendStreamError(self, streamError): |
|---|
| 561 |
""" |
|---|
| 562 |
Send stream level error. |
|---|
| 563 |
|
|---|
| 564 |
If we are the receiving entity, and haven't sent the header yet, |
|---|
| 565 |
we sent one first. |
|---|
| 566 |
|
|---|
| 567 |
After sending the stream error, the stream is closed and the transport |
|---|
| 568 |
connection dropped. |
|---|
| 569 |
|
|---|
| 570 |
@param streamError: stream error instance |
|---|
| 571 |
@type streamError: L{error.StreamError} |
|---|
| 572 |
""" |
|---|
| 573 |
if not self._headerSent and not self.initiating: |
|---|
| 574 |
self.sendHeader() |
|---|
| 575 |
|
|---|
| 576 |
if self._headerSent: |
|---|
| 577 |
self.send(streamError.getElement()) |
|---|
| 578 |
self.sendFooter() |
|---|
| 579 |
|
|---|
| 580 |
self.transport.loseConnection() |
|---|
| 581 |
|
|---|
| 582 |
|
|---|
| 583 |
def send(self, obj): |
|---|
| 584 |
""" |
|---|
| 585 |
Send data over the stream. |
|---|
| 586 |
|
|---|
| 587 |
This overrides L{xmlstream.Xmlstream.send} to use the default namespace |
|---|
| 588 |
of the stream header when serializing L{domish.IElement}s. It is |
|---|
| 589 |
assumed that if you pass an object that provides L{domish.IElement}, |
|---|
| 590 |
it represents a direct child of the stream's root element. |
|---|
| 591 |
""" |
|---|
| 592 |
if domish.IElement.providedBy(obj): |
|---|
| 593 |
obj = obj.toXml(prefixes=self.prefixes, |
|---|
| 594 |
defaultUri=self.namespace, |
|---|
| 595 |
prefixesInScope=self.prefixes.values()) |
|---|
| 596 |
|
|---|
| 597 |
xmlstream.XmlStream.send(self, obj) |
|---|
| 598 |
|
|---|
| 599 |
|
|---|
| 600 |
def connectionMade(self): |
|---|
| 601 |
""" |
|---|
| 602 |
Called when a connection is made. |
|---|
| 603 |
|
|---|
| 604 |
Notifies the authenticator when a connection has been made. |
|---|
| 605 |
""" |
|---|
| 606 |
xmlstream.XmlStream.connectionMade(self) |
|---|
| 607 |
self.authenticator.connectionMade() |
|---|
| 608 |
|
|---|
| 609 |
|
|---|
| 610 |
def onDocumentStart(self, rootElement): |
|---|
| 611 |
""" |
|---|
| 612 |
Called when the stream header has been received. |
|---|
| 613 |
|
|---|
| 614 |
Extracts the header's C{id} and C{version} attributes from the root |
|---|
| 615 |
element. The C{id} attribute is stored in our C{sid} attribute and the |
|---|
| 616 |
C{version} attribute is parsed and the minimum of the version we sent |
|---|
| 617 |
and the parsed C{version} attribute is stored as a tuple (major, minor) |
|---|
| 618 |
in this class' C{version} attribute. If no C{version} attribute was |
|---|
| 619 |
present, we assume version 0.0. |
|---|
| 620 |
|
|---|
| 621 |
If appropriate (we are the initiating stream and the minimum of our and |
|---|
| 622 |
the other party's version is at least 1.0), a one-time observer is |
|---|
| 623 |
registered for getting the stream features. The registered function is |
|---|
| 624 |
C{onFeatures}. |
|---|
| 625 |
|
|---|
| 626 |
Ultimately, the authenticator's C{streamStarted} method will be called. |
|---|
| 627 |
|
|---|
| 628 |
@param rootElement: The root element. |
|---|
| 629 |
@type rootElement: L{domish.Element} |
|---|
| 630 |
""" |
|---|
| 631 |
xmlstream.XmlStream.onDocumentStart(self, rootElement) |
|---|
| 632 |
|
|---|
| 633 |
|
|---|
| 634 |
self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS, |
|---|
| 635 |
self.onStreamError) |
|---|
| 636 |
|
|---|
| 637 |
self.authenticator.streamStarted(rootElement) |
|---|
| 638 |
|
|---|
| 639 |
|
|---|
| 640 |
|
|---|
| 641 |
class XmlStreamFactory(xmlstream.XmlStreamFactory): |
|---|
| 642 |
""" |
|---|
| 643 |
Factory for Jabber XmlStream objects as a reconnecting client. |
|---|
| 644 |
|
|---|
| 645 |
Note that this differs from L{xmlstream.XmlStreamFactory} in that |
|---|
| 646 |
it generates Jabber specific L{XmlStream} instances that have |
|---|
| 647 |
authenticators. |
|---|
| 648 |
""" |
|---|
| 649 |
|
|---|
| 650 |
protocol = XmlStream |
|---|
| 651 |
|
|---|
| 652 |
def __init__(self, authenticator): |
|---|
| 653 |
xmlstream.XmlStreamFactory.__init__(self, authenticator) |
|---|
| 654 |
self.authenticator = authenticator |
|---|
| 655 |
|
|---|
| 656 |
|
|---|
| 657 |
|
|---|
| 658 |
class XmlStreamServerFactory(xmlstream.BootstrapMixin, |
|---|
| 659 |
protocol.ServerFactory): |
|---|
| 660 |
""" |
|---|
| 661 |
Factory for Jabber XmlStream objects as a server. |
|---|
| 662 |
|
|---|
| 663 |
@since: 8.2. |
|---|
| 664 |
@ivar authenticatorFactory: Factory callable that takes no arguments, to |
|---|
| 665 |
create a fresh authenticator to be associated |
|---|
| 666 |
with the XmlStream. |
|---|
| 667 |
""" |
|---|
| 668 |
|
|---|
| 669 |
protocol = XmlStream |
|---|
| 670 |
|
|---|
| 671 |
def __init__(self, authenticatorFactory): |
|---|
| 672 |
xmlstream.BootstrapMixin.__init__(self) |
|---|
| 673 |
self.authenticatorFactory = authenticatorFactory |
|---|
| 674 |
|
|---|
| 675 |
|
|---|
| 676 |
def buildProtocol(self, addr): |
|---|
| 677 |
""" |
|---|
| 678 |
Create an instance of XmlStream. |
|---|
| 679 |
|
|---|
| 680 |
A new authenticator instance will be created and passed to the new |
|---|
| 681 |
XmlStream. Registered bootstrap event observers are installed as well. |
|---|
| 682 |
""" |
|---|
| 683 |
authenticator = self.authenticatorFactory() |
|---|
| 684 |
xs = self.protocol(authenticator) |
|---|
| 685 |
xs.factory = self |
|---|
| 686 |
self.installBootstraps(xs) |
|---|
| 687 |
return xs |
|---|
| 688 |
|
|---|
| 689 |
|
|---|
| 690 |
|
|---|
| 691 |
class TimeoutError(Exception): |
|---|
| 692 |
""" |
|---|
| 693 |
Exception raised when no IQ response has been received before the |
|---|
| 694 |
configured timeout. |
|---|
| 695 |
""" |
|---|
| 696 |
|
|---|
| 697 |
|
|---|
| 698 |
|
|---|
| 699 |
def upgradeWithIQResponseTracker(xs): |
|---|
| 700 |
""" |
|---|
| 701 |
Enhances an XmlStream for iq response tracking. |
|---|
| 702 |
|
|---|
| 703 |
This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a |
|---|
| 704 |
response is an error iq stanza, the deferred has its errback invoked with a |
|---|
| 705 |
failure that holds a L{StanzaException<error.StanzaException>} that is |
|---|
| 706 |
easier to examine. |
|---|
| 707 |
""" |
|---|
| 708 |
def callback(iq): |
|---|
| 709 |
""" |
|---|
| 710 |
Handle iq response by firing associated deferred. |
|---|
| 711 |
""" |
|---|
| 712 |
if getattr(iq, 'handled', False): |
|---|
| 713 |
return |
|---|
| 714 |
|
|---|
| 715 |
try: |
|---|
| 716 |
d = xs.iqDeferreds[iq["id"]] |
|---|
| 717 |
except KeyError: |
|---|
| 718 |
pass |
|---|
| 719 |
else: |
|---|
| 720 |
del xs.iqDeferreds[iq["id"]] |
|---|
| 721 |
iq.handled = True |
|---|
| 722 |
if iq['type'] == 'error': |
|---|
| 723 |
d.errback(error.exceptionFromStanza(iq)) |
|---|
| 724 |
else: |
|---|
| 725 |
d.callback(iq) |
|---|
| 726 |
|
|---|
| 727 |
|
|---|
| 728 |
def disconnected(_): |
|---|
| 729 |
""" |
|---|
| 730 |
Make sure deferreds do not linger on after disconnect. |
|---|
| 731 |
|
|---|
| 732 |
This errbacks all deferreds of iq's for which no response has been |
|---|
| 733 |
received with a L{ConnectionLost} failure. Otherwise, the deferreds |
|---|
| 734 |
will never be fired. |
|---|
| 735 |
""" |
|---|
| 736 |
iqDeferreds = xs.iqDeferreds |
|---|
| 737 |
xs.iqDeferreds = {} |
|---|
| 738 |
for d in iqDeferreds.itervalues(): |
|---|
| 739 |
d.errback(ConnectionLost()) |
|---|
| 740 |
|
|---|
| 741 |
xs.iqDeferreds = {} |
|---|
| 742 |
xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None) |
|---|
| 743 |
xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected) |
|---|
| 744 |
xs.addObserver('/iq[@type="result"]', callback) |
|---|
| 745 |
xs.addObserver('/iq[@type="error"]', callback) |
|---|
| 746 |
directlyProvides(xs, ijabber.IIQResponseTracker) |
|---|
| 747 |
|
|---|
| 748 |
|
|---|
| 749 |
|
|---|
| 750 |
class IQ(domish.Element): |
|---|
| 751 |
""" |
|---|
| 752 |
Wrapper for an iq stanza. |
|---|
| 753 |
|
|---|
| 754 |
Iq stanzas are used for communications with a request-response behaviour. |
|---|
| 755 |
Each iq request is associated with an XML stream and has its own unique id |
|---|
| 756 |
to be able to track the response. |
|---|
| 757 |
|
|---|
| 758 |
@ivar timeout: if set, a timeout period after which the deferred returned |
|---|
| 759 |
by C{send} will have its errback called with a |
|---|
| 760 |
L{TimeoutError} failure. |
|---|
| 761 |
@type timeout: C{float} |
|---|
| 762 |
""" |
|---|
| 763 |
|
|---|
| 764 |
timeout = None |
|---|
| 765 |
|
|---|
| 766 |
def __init__(self, xmlstream, stanzaType="set"): |
|---|
| 767 |
""" |
|---|
| 768 |
@type xmlstream: L{xmlstream.XmlStream} |
|---|
| 769 |
@param xmlstream: XmlStream to use for transmission of this IQ |
|---|
| 770 |
|
|---|
| 771 |
@type stanzaType: L{str} |
|---|
| 772 |
@param stanzaType: IQ type identifier ('get' or 'set') |
|---|
| 773 |
""" |
|---|
| 774 |
domish.Element.__init__(self, (None, "iq")) |
|---|
| 775 |
self.addUniqueId() |
|---|
| 776 |
self["type"] = stanzaType |
|---|
| 777 |
self._xmlstream = xmlstream |
|---|
| 778 |
|
|---|
| 779 |
|
|---|
| 780 |
def send(self, to=None): |
|---|
| 781 |
""" |
|---|
| 782 |
Send out this iq. |
|---|
| 783 |
|
|---|
| 784 |
Returns a deferred that is fired when an iq response with the same id |
|---|
| 785 |
is received. Result responses will be passed to the deferred callback. |
|---|
| 786 |
Error responses will be transformed into a |
|---|
| 787 |
L{StanzaError<error.StanzaError>} and result in the errback of the |
|---|
| 788 |
deferred being invoked. |
|---|
| 789 |
|
|---|
| 790 |
@rtype: L{defer.Deferred} |
|---|
| 791 |
""" |
|---|
| 792 |
if to is not None: |
|---|
| 793 |
self["to"] = to |
|---|
| 794 |
|
|---|
| 795 |
if not ijabber.IIQResponseTracker.providedBy(self._xmlstream): |
|---|
| 796 |
upgradeWithIQResponseTracker(self._xmlstream) |
|---|
| 797 |
|
|---|
| 798 |
d = defer.Deferred() |
|---|
| 799 |
self._xmlstream.iqDeferreds[self['id']] = d |
|---|
| 800 |
|
|---|
| 801 |
timeout = self.timeout or self._xmlstream.iqDefaultTimeout |
|---|
| 802 |
if timeout is not None: |
|---|
| 803 |
def onTimeout(): |
|---|
| 804 |
del self._xmlstream.iqDeferreds[self['id']] |
|---|
| 805 |
d.errback(TimeoutError("IQ timed out")) |
|---|
| 806 |
|
|---|
| 807 |
call = self._xmlstream._callLater(timeout, onTimeout) |
|---|
| 808 |
|
|---|
| 809 |
def cancelTimeout(result): |
|---|
| 810 |
if call.active(): |
|---|
| 811 |
call.cancel() |
|---|
| 812 |
|
|---|
| 813 |
return result |
|---|
| 814 |
|
|---|
| 815 |
d.addBoth(cancelTimeout) |
|---|
| 816 |
|
|---|
| 817 |
self._xmlstream.send(self) |
|---|
| 818 |
return d |
|---|
| 819 |
|
|---|
| 820 |
|
|---|
| 821 |
|
|---|
| 822 |
def toResponse(stanza, stanzaType=None): |
|---|
| 823 |
""" |
|---|
| 824 |
Create a response stanza from another stanza. |
|---|
| 825 |
|
|---|
| 826 |
This takes the addressing and id attributes from a stanza to create a (new, |
|---|
| 827 |
empty) response stanza. The addressing attributes are swapped and the id |
|---|
| 828 |
copied. Optionally, the stanza type of the response can be specified. |
|---|
| 829 |
|
|---|
| 830 |
@param stanza: the original stanza |
|---|
| 831 |
@type stanza: L{domish.Element} |
|---|
| 832 |
@param stanzaType: optional response stanza type |
|---|
| 833 |
@type stanzaType: C{str} |
|---|
| 834 |
@return: the response stanza. |
|---|
| 835 |
@rtype: L{domish.Element} |
|---|
| 836 |
""" |
|---|
| 837 |
|
|---|
| 838 |
toAddr = stanza.getAttribute('from') |
|---|
| 839 |
fromAddr = stanza.getAttribute('to') |
|---|
| 840 |
stanzaID = stanza.getAttribute('id') |
|---|
| 841 |
|
|---|
| 842 |
response = domish.Element((None, stanza.name)) |
|---|
| 843 |
if toAddr: |
|---|
| 844 |
response['to'] = toAddr |
|---|
| 845 |
if fromAddr: |
|---|
| 846 |
response['from'] = fromAddr |
|---|
| 847 |
if stanzaID: |
|---|
| 848 |
response['id'] = stanzaID |
|---|
| 849 |
if stanzaType: |
|---|
| 850 |
response['type'] = stanzaType |
|---|
| 851 |
|
|---|
| 852 |
return response |
|---|
| 853 |
|
|---|
| 854 |
|
|---|
| 855 |
|
|---|
| 856 |
class XMPPHandler(object): |
|---|
| 857 |
""" |
|---|
| 858 |
XMPP protocol handler. |
|---|
| 859 |
|
|---|
| 860 |
Classes derived from this class implement (part of) one or more XMPP |
|---|
| 861 |
extension protocols, and are referred to as a subprotocol implementation. |
|---|
| 862 |
""" |
|---|
| 863 |
|
|---|
| 864 |
implements(ijabber.IXMPPHandler) |
|---|
| 865 |
|
|---|
| 866 |
def __init__(self): |
|---|
| 867 |
self.parent = None |
|---|
| 868 |
self.xmlstream = None |
|---|
| 869 |
|
|---|
| 870 |
|
|---|
| 871 |
def setHandlerParent(self, parent): |
|---|
| 872 |
self.parent = parent |
|---|
| 873 |
self.parent.addHandler(self) |
|---|
| 874 |
|
|---|
| 875 |
|
|---|
| 876 |
def disownHandlerParent(self, parent): |
|---|
| 877 |
self.parent.removeHandler(self) |
|---|
| 878 |
self.parent = None |
|---|
| 879 |
|
|---|
| 880 |
|
|---|
| 881 |
def makeConnection(self, xs): |
|---|
| 882 |
self.xmlstream = xs |
|---|
| 883 |
self.connectionMade() |
|---|
| 884 |
|
|---|
| 885 |
|
|---|
| 886 |
def connectionMade(self): |
|---|
| 887 |
""" |
|---|
| 888 |
Called after a connection has been established. |
|---|
| 889 |
|
|---|
| 890 |
Can be overridden to perform work before stream initialization. |
|---|
| 891 |
""" |
|---|
| 892 |
|
|---|
| 893 |
|
|---|
| 894 |
def connectionInitialized(self): |
|---|
| 895 |
""" |
|---|
| 896 |
The XML stream has been initialized. |
|---|
| 897 |
|
|---|
| 898 |
Can be overridden to perform work after stream initialization, e.g. to |
|---|
| 899 |
set up observers and start exchanging XML stanzas. |
|---|
| 900 |
""" |
|---|
| 901 |
|
|---|
| 902 |
|
|---|
| 903 |
def connectionLost(self, reason): |
|---|
| 904 |
""" |
|---|
| 905 |
The XML stream has been closed. |
|---|
| 906 |
|
|---|
| 907 |
This method can be extended to inspect the C{reason} argument and |
|---|
| 908 |
act on it. |
|---|
| 909 |
""" |
|---|
| 910 |
self.xmlstream = None |
|---|
| 911 |
|
|---|
| 912 |
|
|---|
| 913 |
def send(self, obj): |
|---|
| 914 |
""" |
|---|
| 915 |
Send data over the managed XML stream. |
|---|
| 916 |
|
|---|
| 917 |
@note: The stream manager maintains a queue for data sent using this |
|---|
| 918 |
method when there is no current initialized XML stream. This |
|---|
| 919 |
data is then sent as soon as a new stream has been established |
|---|
| 920 |
and initialized. Subsequently, L{connectionInitialized} will be |
|---|
| 921 |
called again. If this queueing is not desired, use C{send} on |
|---|
| 922 |
C{self.xmlstream}. |
|---|
| 923 |
|
|---|
| 924 |
@param obj: data to be sent over the XML stream. This is usually an |
|---|
| 925 |
object providing L{domish.IElement}, or serialized XML. See |
|---|
| 926 |
L{xmlstream.XmlStream} for details. |
|---|
| 927 |
""" |
|---|
| 928 |
self.parent.send(obj) |
|---|
| 929 |
|
|---|
| 930 |
|
|---|
| 931 |
|
|---|
| 932 |
class XMPPHandlerCollection(object): |
|---|
| 933 |
""" |
|---|
| 934 |
Collection of XMPP subprotocol handlers. |
|---|
| 935 |
|
|---|
| 936 |
This allows for grouping of subprotocol handlers, but is not an |
|---|
| 937 |
L{XMPPHandler} itself, so this is not recursive. |
|---|
| 938 |
|
|---|
| 939 |
@ivar handlers: List of protocol handlers. |
|---|
| 940 |
@type handlers: L{list} of objects providing |
|---|
| 941 |
L{IXMPPHandler} |
|---|
| 942 |
""" |
|---|
| 943 |
|
|---|
| 944 |
implements(ijabber.IXMPPHandlerCollection) |
|---|
| 945 |
|
|---|
| 946 |
def __init__(self): |
|---|
| 947 |
self.handlers = [] |
|---|
| 948 |
|
|---|
| 949 |
|
|---|
| 950 |
def __iter__(self): |
|---|
| 951 |
""" |
|---|
| 952 |
Act as a container for handlers. |
|---|
| 953 |
""" |
|---|
| 954 |
return iter(self.handlers) |
|---|
| 955 |
|
|---|
| 956 |
|
|---|
| 957 |
def addHandler(self, handler): |
|---|
| 958 |
""" |
|---|
| 959 |
Add protocol handler. |
|---|
| 960 |
|
|---|
| 961 |
Protocol handlers are expected to provide L{ijabber.IXMPPHandler}. |
|---|
| 962 |
""" |
|---|
| 963 |
self.handlers.append(handler) |
|---|
| 964 |
|
|---|
| 965 |
|
|---|
| 966 |
def removeHandler(self, handler): |
|---|
| 967 |
""" |
|---|
| 968 |
Remove protocol handler. |
|---|
| 969 |
""" |
|---|
| 970 |
self.handlers.remove(handler) |
|---|
| 971 |
|
|---|
| 972 |
|
|---|
| 973 |
|
|---|
| 974 |
class StreamManager(XMPPHandlerCollection): |
|---|
| 975 |
""" |
|---|
| 976 |
Business logic representing a managed XMPP connection. |
|---|
| 977 |
|
|---|
| 978 |
This maintains a single XMPP connection and provides facilities for packet |
|---|
| 979 |
routing and transmission. Business logic modules are objects providing |
|---|
| 980 |
L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added |
|---|
| 981 |
using L{addHandler}. |
|---|
| 982 |
|
|---|
| 983 |
@ivar xmlstream: currently managed XML stream |
|---|
| 984 |
@type xmlstream: L{XmlStream} |
|---|
| 985 |
@ivar logTraffic: if true, log all traffic. |
|---|
| 986 |
@type logTraffic: L{bool} |
|---|
| 987 |
@ivar _initialized: Whether the stream represented by L{xmlstream} has |
|---|
| 988 |
been initialized. This is used when caching outgoing |
|---|
| 989 |
stanzas. |
|---|
| 990 |
@type _initialized: C{bool} |
|---|
| 991 |
@ivar _packetQueue: internal buffer of unsent data. See L{send} for details. |
|---|
| 992 |
@type _packetQueue: L{list} |
|---|
| 993 |
""" |
|---|
| 994 |
|
|---|
| 995 |
logTraffic = False |
|---|
| 996 |
|
|---|
| 997 |
def __init__(self, factory): |
|---|
| 998 |
XMPPHandlerCollection.__init__(self) |
|---|
| 999 |
self.xmlstream = None |
|---|
| 1000 |
self._packetQueue = [] |
|---|
| 1001 |
self._initialized = False |
|---|
| 1002 |
|
|---|
| 1003 |
factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected) |
|---|
| 1004 |
factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd) |
|---|
| 1005 |
factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed) |
|---|
| 1006 |
factory.addBootstrap(STREAM_END_EVENT, self._disconnected) |
|---|
| 1007 |
self.factory = factory |
|---|
| 1008 |
|
|---|
| 1009 |
|
|---|
| 1010 |
def addHandler(self, handler): |
|---|
| 1011 |
""" |
|---|
| 1012 |
Add protocol handler. |
|---|
| 1013 |
|
|---|
| 1014 |
When an XML stream has already been established, the handler's |
|---|
| 1015 |
C{connectionInitialized} will be called to get it up to speed. |
|---|
| 1016 |
""" |
|---|
| 1017 |
XMPPHandlerCollection.addHandler(self, handler) |
|---|
| 1018 |
|
|---|
| 1019 |
|
|---|
| 1020 |
|
|---|
| 1021 |
if self.xmlstream and self._initialized: |
|---|
| 1022 |
handler.makeConnection(self.xmlstream) |
|---|
| 1023 |
handler.connectionInitialized() |
|---|
| 1024 |
|
|---|
| 1025 |
|
|---|
| 1026 |
def _connected(self, xs): |
|---|
| 1027 |
""" |
|---|
| 1028 |
Called when the transport connection has been established. |
|---|
| 1029 |
|
|---|
| 1030 |
Here we optionally set up traffic logging (depending on L{logTraffic}) |
|---|
| 1031 |
and call each handler's C{makeConnection} method with the L{XmlStream} |
|---|
| 1032 |
instance. |
|---|
| 1033 |
""" |
|---|
| 1034 |
def logDataIn(buf): |
|---|
| 1035 |
log.msg("RECV: %r" % buf) |
|---|
| 1036 |
|
|---|
| 1037 |
def logDataOut(buf): |
|---|
| 1038 |
log.msg("SEND: %r" % buf) |
|---|
| 1039 |
|
|---|
| 1040 |
if self.logTraffic: |
|---|
| 1041 |
xs.rawDataInFn = logDataIn |
|---|
| 1042 |
xs.rawDataOutFn = logDataOut |
|---|
| 1043 |
|
|---|
| 1044 |
self.xmlstream = xs |
|---|
| 1045 |
|
|---|
| 1046 |
for e in self: |
|---|
| 1047 |
e.makeConnection(xs) |
|---|
| 1048 |
|
|---|
| 1049 |
|
|---|
| 1050 |
def _authd(self, xs): |
|---|
| 1051 |
""" |
|---|
| 1052 |
Called when the stream has been initialized. |
|---|
| 1053 |
|
|---|
| 1054 |
Send out cached stanzas and call each handler's |
|---|
| 1055 |
C{connectionInitialized} method. |
|---|
| 1056 |
""" |
|---|
| 1057 |
|
|---|
| 1058 |
for p in self._packetQueue: |
|---|
| 1059 |
xs.send(p) |
|---|
| 1060 |
self._packetQueue = [] |
|---|
| 1061 |
self._initialized = True |
|---|
| 1062 |
|
|---|
| 1063 |
|
|---|
| 1064 |
|
|---|
| 1065 |
for e in self: |
|---|
| 1066 |
e.connectionInitialized() |
|---|
| 1067 |
|
|---|
| 1068 |
|
|---|
| 1069 |
def initializationFailed(self, reason): |
|---|
| 1070 |
""" |
|---|
| 1071 |
Called when stream initialization has failed. |
|---|
| 1072 |
|
|---|
| 1073 |
Stream initialization has halted, with the reason indicated by |
|---|
| 1074 |
C{reason}. It may be retried by calling the authenticator's |
|---|
| 1075 |
C{initializeStream}. See the respective authenticators for details. |
|---|
| 1076 |
|
|---|
| 1077 |
@param reason: A failure instance indicating why stream initialization |
|---|
| 1078 |
failed. |
|---|
| 1079 |
@type reason: L{failure.Failure} |
|---|
| 1080 |
""" |
|---|
| 1081 |
|
|---|
| 1082 |
|
|---|
| 1083 |
def _disconnected(self, _): |
|---|
| 1084 |
""" |
|---|
| 1085 |
Called when the stream has been closed. |
|---|
| 1086 |
|
|---|
| 1087 |
From this point on, the manager doesn't interact with the |
|---|
| 1088 |
L{XmlStream} anymore and notifies each handler that the connection |
|---|
| 1089 |
was lost by calling its C{connectionLost} method. |
|---|
| 1090 |
""" |
|---|
| 1091 |
self.xmlstream = None |
|---|
| 1092 |
self._initialized = False |
|---|
| 1093 |
|
|---|
| 1094 |
|
|---|
| 1095 |
|
|---|
| 1096 |
for e in self: |
|---|
| 1097 |
e.connectionLost(None) |
|---|
| 1098 |
|
|---|
| 1099 |
|
|---|
| 1100 |
def send(self, obj): |
|---|
| 1101 |
""" |
|---|
| 1102 |
Send data over the XML stream. |
|---|
| 1103 |
|
|---|
| 1104 |
When there is no established XML stream, the data is queued and sent |
|---|
| 1105 |
out when a new XML stream has been established and initialized. |
|---|
| 1106 |
|
|---|
| 1107 |
@param obj: data to be sent over the XML stream. See |
|---|
| 1108 |
L{xmlstream.XmlStream.send} for details. |
|---|
| 1109 |
""" |
|---|
| 1110 |
if self._initialized: |
|---|
| 1111 |
self.xmlstream.send(obj) |
|---|
| 1112 |
else: |
|---|
| 1113 |
self._packetQueue.append(obj) |
|---|