root / trunk / twisted / spread / pb.py

Revision 25457, 45.7 kB (checked in by exarkun, 8 months ago)

Merge hashlib-2763-3

Author: wsanchez, exarkun
Reviewer: exarkun, mwhudson
Fixes: #2763

Replace uses of md5 and sha modules in Twisted with use of a new twisted.python.hashlib
module which transparently uses the new hashlib standard library module if it is available
or falls back to md5 and sha if not.

Line 
1 # -*- test-case-name: twisted.test.test_pb -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Perspective Broker
7
8 \"This isn\'t a professional opinion, but it's probably got enough
9 internet to kill you.\" --glyph
10
11 Introduction
12 ============
13
14 This is a broker for proxies for and copies of objects.  It provides a
15 translucent interface layer to those proxies.
16
17 The protocol is not opaque, because it provides objects which represent the
18 remote proxies and require no context (server references, IDs) to operate on.
19
20 It is not transparent because it does I{not} attempt to make remote objects
21 behave identically, or even similiarly, to local objects.  Method calls are
22 invoked asynchronously, and specific rules are applied when serializing
23 arguments.
24
25 To get started, begin with L{PBClientFactory} and L{PBServerFactory}.
26
27 @author: Glyph Lefkowitz
28 """
29
30 import random
31 import new
32 import types
33
34 from zope.interface import implements, Interface
35
36 # Twisted Imports
37 from twisted.python import log, failure, reflect
38 from twisted.python.versions import Version
39 from twisted.python.deprecate import deprecated
40 from twisted.python.hashlib import md5
41 from twisted.internet import defer, protocol
42 from twisted.cred.portal import Portal
43 from twisted.cred.credentials import IAnonymous, ICredentials
44 from twisted.cred.credentials import IUsernameHashedPassword, Anonymous
45 from twisted.persisted import styles
46 from twisted.python.components import registerAdapter
47
48 from twisted.spread.interfaces import IJellyable, IUnjellyable
49 from twisted.spread.jelly import jelly, unjelly, globalSecurity
50 from twisted.spread import banana
51
52 from twisted.spread.flavors import Serializable
53 from twisted.spread.flavors import Referenceable, NoSuchMethod
54 from twisted.spread.flavors import Root, IPBRoot
55 from twisted.spread.flavors import ViewPoint
56 from twisted.spread.flavors import Viewable
57 from twisted.spread.flavors import Copyable
58 from twisted.spread.flavors import Jellyable
59 from twisted.spread.flavors import Cacheable
60 from twisted.spread.flavors import RemoteCopy
61 from twisted.spread.flavors import RemoteCache
62 from twisted.spread.flavors import RemoteCacheObserver
63 from twisted.spread.flavors import copyTags
64
65 from twisted.spread.flavors import setUnjellyableForClass
66 from twisted.spread.flavors import setUnjellyableFactoryForClass
67 from twisted.spread.flavors import setUnjellyableForClassTree
68 # These three are backwards compatibility aliases for the previous three.
69 # Ultimately they should be deprecated. -exarkun
70 from twisted.spread.flavors import setCopierForClass
71 from twisted.spread.flavors import setFactoryForClass
72 from twisted.spread.flavors import setCopierForClassTree
73
74
75 MAX_BROKER_REFS = 1024
76
77 portno = 8787
78
79
80 class ProtocolError(Exception):
81     """
82     This error is raised when an invalid protocol statement is received.
83     """
84
85 class DeadReferenceError(ProtocolError):
86     """
87     This error is raised when a method is called on a dead reference (one whose
88     broker has been disconnected).
89     """
90
91 class Error(Exception):
92     """
93     This error can be raised to generate known error conditions.
94
95     When a PB callable method (perspective_, remote_, view_) raises
96     this error, it indicates that a traceback should not be printed,
97     but instead, the string representation of the exception should be
98     sent.
99     """
100
101 class RemoteMethod:
102     """This is a translucent reference to a remote message.
103     """
104     def __init__(self, obj, name):
105         """Initialize with a L{RemoteReference} and the name of this message.
106         """
107         self.obj = obj
108         self.name = name
109
110     def __cmp__(self, other):
111         return cmp((self.obj, self.name), other)
112
113     def __hash__(self):
114         return hash((self.obj, self.name))
115
116     def __call__(self, *args, **kw):
117         """Asynchronously invoke a remote method.
118         """
119         return self.obj.broker._sendMessage('',self.obj.perspective, self.obj.luid,  self.name, args, kw)
120
121
122
123 def noOperation(*args, **kw):
124     """
125     Do nothing.
126
127     Neque porro quisquam est qui dolorem ipsum quia dolor sit amet,
128     consectetur, adipisci velit...
129     """
130 noOperation = deprecated(Version("twisted", 8, 2, 0))(noOperation)
131
132
133
134 class PBConnectionLost(Exception):
135     pass
136
137
138
139 def printTraceback(tb):
140     """
141     Print a traceback (string) to the standard log.
142     """
143     log.msg('Perspective Broker Traceback:' )
144     log.msg(tb)
145 printTraceback = deprecated(Version("twisted", 8, 2, 0))(printTraceback)
146
147
148 class IPerspective(Interface):
149     """
150     per*spec*tive, n. : The relationship of aspects of a subject to each
151     other and to a whole: 'a perspective of history'; 'a need to view
152     the problem in the proper perspective'.
153
154     This is a Perspective Broker-specific wrapper for an avatar. That
155     is to say, a PB-published view on to the business logic for the
156     system's concept of a 'user'.
157
158     The concept of attached/detached is no longer implemented by the
159     framework. The realm is expected to implement such semantics if
160     needed.
161     """
162
163     def perspectiveMessageReceived(broker, message, args, kwargs):
164         """
165         This method is called when a network message is received.
166
167         @arg broker: The Perspective Broker.
168
169         @type message: str
170         @arg message: The name of the method called by the other end.
171
172         @type args: list in jelly format
173         @arg args: The arguments that were passed by the other end. It
174                    is recommend that you use the `unserialize' method of the
175                    broker to decode this.
176
177         @type kwargs: dict in jelly format
178         @arg kwargs: The keyword arguments that were passed by the
179                      other end.  It is recommended that you use the
180                      `unserialize' method of the broker to decode this.
181
182         @rtype: A jelly list.
183         @return: It is recommended that you use the `serialize' method
184                  of the broker on whatever object you need to return to
185                  generate the return value.
186         """
187
188
189
190 class Avatar:
191     """
192     A default IPerspective implementor.
193
194     This class is intended to be subclassed, and a realm should return
195     an instance of such a subclass when IPerspective is requested of
196     it.
197
198     A peer requesting a perspective will receive only a
199     L{RemoteReference} to a pb.Avatar.  When a method is called on
200     that L{RemoteReference}, it will translate to a method on the
201     remote perspective named 'perspective_methodname'.  (For more
202     information on invoking methods on other objects, see
203     L{flavors.ViewPoint}.)
204     """
205
206     implements(IPerspective)
207
208     def perspectiveMessageReceived(self, broker, message, args, kw):
209         """
210         This method is called when a network message is received.
211
212         This will call::
213
214             self.perspective_%(message)s(*broker.unserialize(args),
215                                          **broker.unserialize(kw))
216
217         to handle the method; subclasses of Avatar are expected to
218         implement methods using this naming convention.
219         """
220
221         args = broker.unserialize(args, self)
222         kw = broker.unserialize(kw, self)
223         method = getattr(self, "perspective_%s" % message)
224         try:
225             state = method(*args, **kw)
226         except TypeError:
227             log.msg("%s didn't accept %s and %s" % (method, args, kw))
228             raise
229         return broker.serialize(state, self, method, args, kw)
230
231
232
233 class AsReferenceable(Referenceable):
234     """
235     A reference directed towards another object.
236     """
237
238     def __init__(self, object, messageType="remote"):
239         self.remoteMessageReceived = getattr(
240             object, messageType + "MessageReceived")
241
242
243
244 class RemoteReference(Serializable, styles.Ephemeral):
245     """
246     A translucent reference to a remote object.
247
248     I may be a reference to a L{flavors.ViewPoint}, a
249     L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g.,
250     pb.Avatar).  From the client's perspective, it is not possible to
251     tell which except by convention.
252
253     I am a \"translucent\" reference because although no additional
254     bookkeeping overhead is given to the application programmer for
255     manipulating a reference, return values are asynchronous.
256
257     See also L{twisted.internet.defer}.
258
259     @ivar broker: The broker I am obtained through.
260     @type broker: L{Broker}
261     """
262
263     implements(IUnjellyable)
264
265     def __init__(self, perspective, broker, luid, doRefCount):
266         """(internal) Initialize me with a broker and a locally-unique ID.
267
268         The ID is unique only to the particular Perspective Broker
269         instance.
270         """
271         self.luid = luid
272         self.broker = broker
273         self.doRefCount = doRefCount
274         self.perspective = perspective
275         self.disconnectCallbacks = []
276
277     def notifyOnDisconnect(self, callback):
278         """Register a callback to be called if our broker gets disconnected.
279
280         This callback will be called with one argument, this instance.
281         """
282         assert callable(callback)
283         self.disconnectCallbacks.append(callback)
284         if len(self.disconnectCallbacks) == 1:
285             self.broker.notifyOnDisconnect(self._disconnected)
286
287     def dontNotifyOnDisconnect(self, callback):
288         """Remove a callback that was registered with notifyOnDisconnect."""
289         self.disconnectCallbacks.remove(callback)
290         if not self.disconnectCallbacks:
291             self.broker.dontNotifyOnDisconnect(self._disconnected)
292
293     def _disconnected(self):
294         """Called if we are disconnected and have callbacks registered."""
295         for callback in self.disconnectCallbacks:
296             callback(self)
297         self.disconnectCallbacks = None
298
299     def jellyFor(self, jellier):
300         """If I am being sent back to where I came from, serialize as a local backreference.
301         """
302         if jellier.invoker:
303             assert self.broker == jellier.invoker, "Can't send references to brokers other than their own."
304             return "local", self.luid
305         else:
306             return "unpersistable", "References cannot be serialized"
307
308     def unjellyFor(self, unjellier, unjellyList):
309         self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invoker, unjellyList[1], 1)
310         return self
311
312     def callRemote(self, _name, *args, **kw):
313         """Asynchronously invoke a remote method.
314
315         @type _name:   C{string}
316         @param _name:  the name of the remote method to invoke
317         @param args: arguments to serialize for the remote function
318         @param kw:  keyword arguments to serialize for the remote function.
319         @rtype:   L{twisted.internet.defer.Deferred}
320         @returns: a Deferred which will be fired when the result of
321                   this remote call is received.
322         """
323         # note that we use '_name' instead of 'name' so the user can call
324         # remote methods with 'name' as a keyword parameter, like this:
325         #  ref.callRemote("getPeopleNamed", count=12, name="Bob")
326
327         return self.broker._sendMessage('',self.perspective, self.luid,
328                                         _name, args, kw)
329
330     def remoteMethod(self, key):
331         """Get a L{RemoteMethod} for this key.
332         """
333         return RemoteMethod(self, key)
334
335     def __cmp__(self,other):
336         """Compare me [to another L{RemoteReference}].
337         """
338         if isinstance(other, RemoteReference):
339             if other.broker == self.broker:
340                 return cmp(self.luid, other.luid)
341         return cmp(self.broker, other)
342
343     def __hash__(self):
344         """Hash me.
345         """
346         return self.luid
347
348     def __del__(self):
349         """Do distributed reference counting on finalization.
350         """
351         if self.doRefCount:
352             self.broker.sendDecRef(self.luid)
353
354 setUnjellyableForClass("remote", RemoteReference)
355
356 class Local:
357     """(internal) A reference to a local object.
358     """
359
360     def __init__(self, object, perspective=None):
361         """Initialize.
362         """
363         self.object = object
364         self.perspective = perspective
365         self.refcount = 1
366
367     def __repr__(self):
368         return "<pb.Local %r ref:%s>" % (self.object, self.refcount)
369
370     def incref(self):
371         """Increment and return my reference count.
372         """
373         self.refcount = self.refcount + 1
374         return self.refcount
375
376     def decref(self):
377         """Decrement and return my reference count.
378         """
379         self.refcount = self.refcount - 1
380         return self.refcount
381
382
383 ##
384 # Failure
385 ##
386
387 class CopyableFailure(failure.Failure, Copyable):
388     """
389     A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
390     L{twisted.python.failure.Failure} for serialization.
391     """
392
393     unsafeTracebacks = 0
394
395     def getStateToCopy(self):
396         """
397         Collect state related to the exception which occurred, discarding
398         state which cannot reasonably be serialized.
399         """
400         state = self.__dict__.copy()
401         state['tb'] = None
402         state['frames'] = []
403         state['stack'] = []
404         if isinstance(self.value, failure.Failure):
405             state['value'] = failure2Copyable(self.value, self.unsafeTracebacks)
406         else:
407             state['value'] = str(self.value) # Exception instance
408         if isinstance(self.type, str):
409             state['type'] = self.type
410         else:
411             state['type'] = reflect.qual(self.type) # Exception class
412         if self.unsafeTracebacks:
413             state['traceback'] = self.getTraceback()
414         else:
415             state['traceback'] = 'Traceback unavailable\n'
416         return state
417
418
419 class CopiedFailure(RemoteCopy, failure.Failure):
420     def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'):
421         if file is None:
422             file = log.logfile
423         file.write("Traceback from remote host -- ")
424         file.write(self.traceback)
425
426     printBriefTraceback = printTraceback
427     printDetailedTraceback = printTraceback
428
429 setUnjellyableForClass(CopyableFailure, CopiedFailure)
430
431 def failure2Copyable(fail, unsafeTracebacks=0):
432     f = new.instance(CopyableFailure, fail.__dict__)
433     f.unsafeTracebacks = unsafeTracebacks
434     return f
435
436 class Broker(banana.Banana):
437     """I am a broker for objects.
438     """
439
440     version = 6
441     username = None
442     factory = None
443
444     def __init__(self, isClient=1, security=globalSecurity):
445         banana.Banana.__init__(self, isClient)
446         self.disconnected = 0
447         self.disconnects = []
448         self.failures = []
449         self.connects = []
450         self.localObjects = {}
451         self.security = security
452         self.pageProducers = []
453         self.currentRequestID = 0
454         self.currentLocalID = 0
455         # Some terms:
456         #  PUID: process unique ID; return value of id() function.  type "int".
457         #  LUID: locally unique ID; an ID unique to an object mapped over this
458         #        connection. type "int"
459         #  GUID: (not used yet) globally unique ID; an ID for an object which
460         #        may be on a redirected or meta server.  Type as yet undecided.
461         # Dictionary mapping LUIDs to local objects.
462         # set above to allow root object to be assigned before connection is made
463         # self.localObjects = {}
464         # Dictionary mapping PUIDs to LUIDs.
465         self.luids = {}
466         # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
467         # cached means that they're objects which originate here, and were
468         # copied remotely.
469         self.remotelyCachedObjects = {}
470         # Dictionary mapping PUIDs to (cached) LUIDs
471         self.remotelyCachedLUIDs = {}
472         # Dictionary mapping (remote) LUIDs to (locally cached) objects.
473         self.locallyCachedObjects = {}
474         self.waitingForAnswers = {}
475
476         # Mapping from LUIDs to weakref objects with callbacks for performing
477         # any local cleanup which may be necessary for the corresponding
478         # object once it no longer exists.
479         self._localCleanup = {}
480
481
482     def resumeProducing(self):
483         """Called when the consumer attached to me runs out of buffer.
484         """
485         # Go backwards over the list so we can remove indexes from it as we go
486         for pageridx in xrange(len(self.pageProducers)-1, -1, -1):
487             pager = self.pageProducers[pageridx]
488             pager.sendNextPage()
489             if not pager.stillPaging():
490                 del self.pageProducers[pageridx]
491         if not self.pageProducers:
492             self.transport.unregisterProducer()
493
494     # Streaming producer methods; not necessary to implement.
495     def pauseProducing(self):
496         pass
497
498     def stopProducing(self):
499         pass
500
501     def registerPageProducer(self, pager):
502         self.pageProducers.append(pager)
503         if len(self.pageProducers) == 1:
504             self.transport.registerProducer(self, 0)
505
506     def expressionReceived(self, sexp):
507         """Evaluate an expression as it's received.
508         """
509         if isinstance(sexp, types.ListType):
510             command = sexp[0]
511             methodName = "proto_%s" % command
512             method = getattr(self, methodName, None)
513             if method:
514                 method(*sexp[1:])
515             else:
516                 self.sendCall("didNotUnderstand", command)
517         else:
518             raise ProtocolError("Non-list expression received.")
519
520
521     def proto_version(self, vnum):
522         """Protocol message: (version version-number)
523
524         Check to make sure that both ends of the protocol are speaking
525         the same version dialect.
526         """
527
528         if vnum != self.version:
529             raise ProtocolError("Version Incompatibility: %s %s" % (self.version, vnum))
530
531
532     def sendCall(self, *exp):
533         """Utility method to send an expression to the other side of the connection.
534         """
535         self.sendEncoded(exp)
536
537     def proto_didNotUnderstand(self, command):
538         """Respond to stock 'C{didNotUnderstand}' message.
539
540         Log the command that was not understood and continue. (Note:
541         this will probably be changed to close the connection or raise
542         an exception in the future.)
543         """
544         log.msg("Didn't understand command: %r" % command)
545
546     def connectionReady(self):
547         """Initialize. Called after Banana negotiation is done.
548         """
549         self.sendCall("version", self.version)
550         for notifier in self.connects:
551             try:
552                 notifier()
553             except:
554                 log.deferr()
555         self.connects = None
556         if self.factory: # in tests we won't have factory
557             self.factory.clientConnectionMade(self)
558
559     def connectionFailed(self):
560         # XXX should never get called anymore? check!
561         for notifier in self.failures:
562             try:
563                 notifier()
564             except:
565                 log.deferr()
566         self.failures = None
567
568     waitingForAnswers = None
569
570     def connectionLost(self, reason):
571         """The connection was lost.
572         """
573         self.disconnected = 1
574         # nuke potential circular references.
575         self.luids = None
576         if self.waitingForAnswers:
577             for d in self.waitingForAnswers.values():
578                 try:
579                     d.errback(failure.Failure(PBConnectionLost(reason)))
580                 except:
581                     log.deferr()
582         # Assure all Cacheable.stoppedObserving are called
583         for lobj in self.remotelyCachedObjects.values():
584             cacheable = lobj.object
585             perspective = lobj.perspective
586             try:
587                 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
588             except:
589                 log.deferr()
590         # Loop on a copy to prevent notifiers to mixup
591         # the list by calling dontNotifyOnDisconnect
592         for notifier in self.disconnects[:]:
593             try:
594                 notifier()
595             except:
596                 log.deferr()
597         self.disconnects = None
598         self.waitingForAnswers = None
599         self.localSecurity = None
600         self.remoteSecurity = None
601         self.remotelyCachedObjects = None
602         self.remotelyCachedLUIDs = None
603         self.locallyCachedObjects = None
604         self.localObjects = None
605
606     def notifyOnDisconnect(self, notifier):
607         """Call the given callback when the Broker disconnects."""
608         assert callable(notifier)
609         self.disconnects.append(notifier)
610
611     def notifyOnFail(self, notifier):
612         """Call the given callback if the Broker fails to connect."""
613         assert callable(notifier)
614         self.failures.append(notifier)
615
616     def notifyOnConnect(self, notifier):
617         """Call the given callback when the Broker connects."""
618         assert callable(notifier)
619         if self.connects is None:
620             try:
621                 notifier()
622             except:
623                 log.err()
624         else:
625             self.connects.append(notifier)
626
627     def dontNotifyOnDisconnect(self, notifier):
628         """Remove a callback from list of disconnect callbacks."""
629         try:
630             self.disconnects.remove(notifier)
631         except ValueError:
632             pass
633
634     def localObjectForID(self, luid):
635         """
636         Get a local object for a locally unique ID.
637
638         @return: An object previously stored with L{registerReference} or
639             C{None} if there is no object which corresponds to the given
640             identifier.
641         """
642         lob = self.localObjects.get(luid)
643         if lob is None:
644             return
645         return lob.object
646
647     maxBrokerRefsViolations = 0
648
649     def registerReference(self, object):
650         """Get an ID for a local object.
651
652         Store a persistent reference to a local object and map its id()
653         to a generated, session-unique ID and return that ID.
654         """
655
656         assert object is not None
657         puid = object.processUniqueID()
658         luid = self.luids.get(puid)
659         if luid is None:
660             if len(self.localObjects) > MAX_BROKER_REFS:
661                 self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
662                 if self.maxBrokerRefsViolations > 3:
663                     self.transport.loseConnection()
664                     raise Error("Maximum PB reference count exceeded.  "
665                                 "Goodbye.")
666                 raise Error("Maximum PB reference count exceeded.")
667
668             luid = self.newLocalID()
669             self.localObjects[luid] = Local(object)
670             self.luids[puid] = luid
671         else:
672             self.localObjects[luid].incref()
673         return luid
674
675     def setNameForLocal(self, name, object):
676         """Store a special (string) ID for this object.
677
678         This is how you specify a 'base' set of objects that the remote
679         protocol can connect to.
680         """
681         assert object is not None
682         self.localObjects[name] = Local(object)
683
684     def remoteForName(self, name):
685         """Returns an object from the remote name mapping.
686
687         Note that this does not check the validity of the name, only
688         creates a translucent reference for it.
689         """
690         return RemoteReference(None, self, name, 0)
691
692     def cachedRemotelyAs(self, instance, incref=0):
693         """Returns an ID that says what this instance is cached as remotely, or C{None} if it's not.
694         """
695
696         puid = instance.processUniqueID()
697         luid = self.remotelyCachedLUIDs.get(puid)
698         if (luid is not None) and (incref):
699             self.remotelyCachedObjects[luid].incref()
700         return luid
701
702     def remotelyCachedForLUID(self, luid):
703         """Returns an instance which is cached remotely, with this LUID.
704         """
705         return self.remotelyCachedObjects[luid].object
706
707     def cacheRemotely(self, instance):
708         """
709         XXX"""
710         puid = instance.processUniqueID()
711         luid = self.newLocalID()
712         if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
713             self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
714             if self.maxBrokerRefsViolations > 3:
715                 self.transport.loseConnection()
716                 raise Error("Maximum PB cache count exceeded.  "
717                             "Goodbye.")
718             raise Error("Maximum PB cache count exceeded.")
719
720         self.remotelyCachedLUIDs[puid] = luid
721         # This table may not be necessary -- for now, it's to make sure that no
722         # monkey business happens with id(instance)
723         self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective)
724         return luid
725
726     def cacheLocally(self, cid, instance):
727         """(internal)
728
729         Store a non-filled-out cached instance locally.
730         """
731         self.locallyCachedObjects[cid] = instance
732
733     def cachedLocallyAs(self, cid):
734         instance = self.locallyCachedObjects[cid]
735         return instance
736
737     def serialize(self, object, perspective=None, method=None, args=None, kw=None):
738         """Jelly an object according to the remote security rules for this broker.
739         """
740
741         if isinstance(object, defer.Deferred):
742             object.addCallbacks(self.serialize, lambda x: x,
743                                 callbackKeywords={
744                 'perspective': perspective,
745                 'method': method,
746                 'args': args,
747                 'kw': kw
748                 })
749             return object
750
751         # XXX This call is NOT REENTRANT and testing for reentrancy is just
752         # crazy, so it likely won't be.  Don't ever write methods that call the
753         # broker's serialize() method recursively (e.g. sending a method call
754         # from within a getState (this causes concurrency problems anyway so
755         # you really, really shouldn't do it))
756
757         # self.jellier = _NetJellier(self)
758         self.serializingPerspective = perspective
759         self.jellyMethod = method
760         self.jellyArgs = args
761         self.jellyKw = kw
762         try:
763             return jelly(object, self.security, None, self)
764         finally:
765             self.serializingPerspective = None
766             self.jellyMethod = None
767             self.jellyArgs = None
768             self.jellyKw = None
769
770     def unserialize(self, sexp, perspective = None):
771         """Unjelly an sexp according to the local security rules for this broker.
772         """
773
774         self.unserializingPerspective = perspective
775         try:
776             return unjelly(sexp, self.security, None, self)
777         finally:
778             self.unserializingPerspective = None
779
780     def newLocalID(self):
781         """Generate a new LUID.
782         """
783         self.currentLocalID = self.currentLocalID + 1
784         return self.currentLocalID
785
786     def newRequestID(self):
787         """Generate a new request ID.
788         """
789         self.currentRequestID = self.currentRequestID + 1
790         return self.currentRequestID
791
792     def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
793         pbc = None
794         pbe = None
795         answerRequired = 1
796         if kw.has_key('pbcallback'):
797             pbc = kw['pbcallback']
798             del kw['pbcallback']
799         if kw.has_key('pberrback'):
800             pbe = kw['pberrback']
801             del kw['pberrback']
802         if kw.has_key('pbanswer'):
803             assert (not pbe) and (not pbc), "You can't specify a no-answer requirement."
804             answerRequired = kw['pbanswer']
805             del kw['pbanswer']
806         if self.disconnected:
807             raise DeadReferenceError("Calling Stale Broker")
808         try:
809             netArgs = self.serialize(args, perspective=perspective, method=message)
810             netKw = self.serialize(kw, perspective=perspective, method=message)
811         except:
812             return defer.fail(failure.Failure())
813         requestID = self.newRequestID()
814         if answerRequired:
815             rval = defer.Deferred()
816             self.waitingForAnswers[requestID] = rval
817             if pbc or pbe:
818                 log.msg('warning! using deprecated "pbcallback"')
819                 rval.addCallbacks(pbc, pbe)
820         else:
821             rval = None
822         self.sendCall(prefix+"message", requestID, objectID, message, answerRequired, netArgs, netKw)
823         return rval
824
825     def proto_message(self, requestID, objectID, message, answerRequired, netArgs, netKw):
826         self._recvMessage(self.localObjectForID, requestID, objectID, message, answerRequired, netArgs, netKw)
827     def proto_cachemessage(self, requestID, objectID, message, answerRequired, netArgs, netKw):
828         self._recvMessage(self.cachedLocallyAs, requestID, objectID, message, answerRequired, netArgs, netKw)
829
830     def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRequired, netArgs, netKw):
831         """Received a message-send.
832
833         Look up message based on object, unserialize the arguments, and
834         invoke it with args, and send an 'answer' or 'error' response.
835         """
836         try:
837             object = findObjMethod(objectID)
838             if object is None:
839                 raise Error("Invalid Object ID")
840             netResult = object.remoteMessageReceived(self, message, netArgs, netKw)
841         except Error, e:
842             if answerRequired:
843                 # If the error is Jellyable or explicitly allowed via our
844                 # security options, send it back and let the code on the
845                 # other end deal with unjellying.  If it isn't Jellyable,
846                 # wrap it in a CopyableFailure, which ensures it can be
847                 # unjellied on the other end.  We have to do this because
848                 # all errors must be sent back.
849                 if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__class__):
850                     self._sendError(e, requestID)
851                 else:
852                     self._sendError(CopyableFailure(e), requestID)
853         except:
854             if answerRequired:
855                 log.msg("Peer will receive following PB traceback:", isError=True)
856                 f = CopyableFailure()
857                 self._sendError(f, requestID)
858             log.err()
859         else:
860             if answerRequired:
861                 if isinstance(netResult, defer.Deferred):
862                     args = (requestID,)
863                     netResult.addCallbacks(self._sendAnswer, self._sendFailureOrError,
864                                            callbackArgs=args, errbackArgs=args)
865                     # XXX Should this be done somewhere else?
866                 else:
867                     self._sendAnswer(netResult, requestID)
868     ##
869     # success
870     ##
871
872     def _sendAnswer(self, netResult, requestID):
873         """(internal) Send an answer to a previously sent message.
874         """
875         self.sendCall("answer", requestID, netResult)
876
877     def proto_answer(self, requestID, netResult):
878         """(internal) Got an answer to a previously sent message.
879
880         Look up the appropriate callback and call it.
881         """
882         d = self.waitingForAnswers[requestID]
883         del self.waitingForAnswers[requestID]
884         d.callback(self.unserialize(netResult))
885
886     ##
887     # failure
888     ##
889     def _sendFailureOrError(self, fail, requestID):
890         """
891         Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
892         represents an L{Error} subclass or not.
893         """
894         if fail.check(Error) is None:
895             self._sendFailure(fail, requestID)
896         else:
897             self._sendError(fail, requestID)
898
899
900     def _sendFailure(self, fail, requestID):
901         """Log error and then send it."""
902         log.msg("Peer will receive following PB traceback:")
903         log.err(fail)
904         self._sendError(fail, requestID)
905
906     def _sendError(self, fail, requestID):
907         """(internal) Send an error for a previously sent message.
908         """
909         if isinstance(fail, failure.Failure):
910             # If the failures value is jellyable or allowed through security,
911             # send the value
912             if (isinstance(fail.value, Jellyable) or
913                 self.security.isClassAllowed(fail.value.__class__)):
914                 fail = fail.value
915             elif not isinstance(fail, CopyableFailure):
916                 fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
917         if isinstance(fail, CopyableFailure):
918             fail.unsafeTracebacks = self.factory.unsafeTracebacks
919         self.sendCall("error", requestID, self.serialize(fail))
920
921     def proto_error(self, requestID, fail):
922         """(internal) Deal with an error.
923         """
924         d = self.waitingForAnswers[requestID]
925         del self.waitingForAnswers[requestID]
926         d.errback(self.unserialize(fail))
927
928     ##
929     # refcounts
930     ##
931
932     def sendDecRef(self, objectID):
933         """(internal) Send a DECREF directive.
934         """
935         self.sendCall("decref", objectID)
936
937     def proto_decref(self, objectID):
938         """(internal) Decrement the reference count of an object.
939
940         If the reference count is zero, it will free the reference to this
941         object.
942         """
943         refs = self.localObjects[objectID].decref()
944         if refs == 0:
945             puid = self.localObjects[objectID].object.processUniqueID()
946             del self.luids[puid]
947             del self.localObjects[objectID]
948             self._localCleanup.pop(puid, lambda: None)()
949
950     ##
951     # caching
952     ##
953
954     def decCacheRef(self, objectID):
955         """(internal) Send a DECACHE directive.
956         """
957         self.sendCall("decache", objectID)
958
959     def proto_decache(self, objectID):
960         """(internal) Decrement the reference count of a cached object.
961
962         If the reference count is zero, free the reference, then send an
963         'uncached' directive.
964         """
965         refs = self.remotelyCachedObjects[objectID].decref()
966         # log.msg('decaching: %s #refs: %s' % (objectID, refs))
967         if refs == 0:
968             lobj = self.remotelyCachedObjects[objectID]
969             cacheable = lobj.object
970             perspective = lobj.perspective
971             # TODO: force_decache needs to be able to force-invalidate a
972             # cacheable reference.
973             try:
974                 cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
975             except:
976                 log.deferr()
977             puid = cacheable.processUniqueID()
978             del self.remotelyCachedLUIDs[puid]
979             del self.remotelyCachedObjects[objectID]
980             self.sendCall("uncache", objectID)
981
982     def proto_uncache(self, objectID):
983         """(internal) Tell the client it is now OK to uncache an object.
984         """
985         # log.msg("uncaching locally %d" % objectID)
986         obj = self.locallyCachedObjects[objectID]
987         obj.broker = None
988 ##         def reallyDel(obj=obj):
989 ##             obj.__really_del__()
990 ##         obj.__del__ = reallyDel
991         del self.locallyCachedObjects[objectID]
992
993
994
995 def respond(challenge, password):
996     """Respond to a challenge.
997
998     This is useful for challenge/response authentication.
999     """
1000     m = md5()
1001     m.update(password)
1002     hashedPassword = m.digest()
1003     m = md5()
1004     m.update(hashedPassword)
1005     m.update(challenge)
1006     doubleHashedPassword = m.digest()
1007     return doubleHashedPassword
1008
1009 def challenge():
1010     """I return some random data."""
1011     crap = ''
1012     for x in range(random.randrange(15,25)):
1013         crap = crap + chr(random.randint(65,90))
1014     crap = md5(crap).digest()
1015     return crap
1016
1017
1018 class PBClientFactory(protocol.ClientFactory):
1019     """
1020     Client factory for PB brokers.
1021
1022     As with all client factories, use with reactor.connectTCP/SSL/etc..
1023     getPerspective and getRootObject can be called either before or
1024     after the connect.
1025     """
1026
1027     protocol = Broker
1028     unsafeTracebacks = False
1029
1030     def __init__(self, unsafeTracebacks=False, security=globalSecurity):
1031         """
1032         @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1033             over the wire.
1034         @type unsafeTracebacks: C{bool}
1035
1036         @param security: security options used by the broker, default to
1037             C{globalSecurity}.
1038         @type security: L{twisted.spread.jelly.SecurityOptions}
1039         """
1040         self.unsafeTracebacks = unsafeTracebacks
1041         self.security = security
1042         self._reset()
1043
1044
1045     def buildProtocol(self, addr):
1046         """
1047         Build the broker instance, passing the security options to it.
1048         """
1049         p = self.protocol(isClient=True, security=self.security)
1050         p.factory = self
1051         return p
1052
1053
1054     def _reset(self):
1055         self.rootObjectRequests = [] # list of deferred
1056         self._broker = None
1057         self._root = None
1058
1059     def _failAll(self, reason):
1060         deferreds = self.rootObjectRequests
1061         self._reset()
1062         for d in deferreds:
1063             d.errback(reason)
1064
1065     def clientConnectionFailed(self, connector, reason):
1066         self._failAll(reason)
1067
1068     def clientConnectionLost(self, connector, reason, reconnecting=0):
1069         """Reconnecting subclasses should call with reconnecting=1."""
1070         if reconnecting:
1071             # any pending requests will go to next connection attempt
1072             # so we don't fail them.
1073             self._broker = None
1074             self._root = None
1075         else:
1076             self._failAll(reason)
1077
1078     def clientConnectionMade(self, broker):
1079         self._broker = broker
1080         self._root = broker.remoteForName("root")
1081         ds = self.rootObjectRequests
1082         self.rootObjectRequests = []
1083         for d in ds:
1084             d.callback(self._root)
1085
1086     def getRootObject(self):
1087         """Get root object of remote PB server.
1088
1089         @return: Deferred of the root object.
1090         """
1091         if self._broker and not self._broker.disconnected:
1092            return defer.succeed(self._root)
1093         d = defer.Deferred()
1094         self.rootObjectRequests.append(d)
1095         return d
1096
1097     def disconnect(self):
1098         """If the factory is connected, close the connection.
1099
1100         Note that if you set up the factory to reconnect, you will need to
1101         implement extra logic to prevent automatic reconnection after this
1102         is called.
1103         """
1104         if self._broker:
1105             self._broker.transport.loseConnection()
1106
1107     def _cbSendUsername(self, root, username, password, client):
1108         return root.callRemote("login", username).addCallback(
1109             self._cbResponse, password, client)
1110
1111     def _cbResponse(self, (challenge, challenger), password, client):
1112         return challenger.callRemote("respond", respond(challenge, password), client)
1113
1114
1115     def _cbLoginAnonymous(self, root, client):
1116         """
1117         Attempt an anonymous login on the given remote root object.
1118
1119         @type root: L{RemoteReference}
1120         @param root: The object on which to attempt the login, most likely
1121             returned by a call to L{PBClientFactory.getRootObject}.
1122
1123         @param client: A jellyable object which will be used as the I{mind}
1124             parameter for the login attempt.
1125
1126         @rtype: L{Deferred}
1127         @return: A L{Deferred} which will be called back with a
1128             L{RemoteReference} to an avatar when anonymous login succeeds, or
1129             which will errback if anonymous login fails.
1130         """
1131         return root.callRemote("loginAnonymous", client)
1132
1133
1134     def login(self, credentials, client=None):
1135         """
1136         Login and get perspective from remote PB server.
1137
1138         Currently the following credentials are supported::
1139
1140             L{twisted.cred.credentials.IUsernamePassword}
1141             L{twisted.cred.credentials.IAnonymous}
1142
1143         @rtype: L{Deferred}
1144         @return: A L{Deferred} which will be called back with a
1145             L{RemoteReference} for the avatar logged in to, or which will
1146             errback if login fails.
1147         """
1148         d = self.getRootObject()
1149
1150         if IAnonymous.providedBy(credentials):
1151             d.addCallback(self._cbLoginAnonymous, client)
1152         else:
1153             d.addCallback(
1154                 self._cbSendUsername, credentials.username,
1155                 credentials.password, client)
1156         return d
1157
1158
1159
1160 class PBServerFactory(protocol.ServerFactory):
1161     """
1162     Server factory for perspective broker.
1163
1164     Login is done using a Portal object, whose realm is expected to return
1165     avatars implementing IPerspective. The credential checkers in the portal
1166     should accept IUsernameHashedPassword or IUsernameMD5Password.
1167
1168     Alternatively, any object providing or adaptable to L{IPBRoot} can be
1169     used instead of a portal to provide the root object of the PB server.
1170     """
1171
1172     unsafeTracebacks = False
1173
1174     # object broker factory
1175     protocol = Broker
1176
1177     def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
1178         """
1179         @param root: factory providing the root Referenceable used by the broker.
1180         @type root: object providing or adaptable to L{IPBRoot}.
1181
1182         @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1183             over the wire.
1184         @type unsafeTracebacks: C{bool}
1185
1186         @param security: security options used by the broker, default to
1187             C{globalSecurity}.
1188         @type security: L{twisted.spread.jelly.SecurityOptions}
1189         """
1190         self.root = IPBRoot(root)
1191         self.unsafeTracebacks = unsafeTracebacks
1192         self.security = security
1193
1194
1195     def buildProtocol(self, addr):
1196         """
1197         Return a Broker attached to the factory (as the service provider).
1198         """
1199         proto = self.protocol(isClient=False, security=self.security)
1200         proto.factory = self
1201         proto.setNameForLocal("root", self.root.rootObject(proto))
1202         return proto
1203
1204     def clientConnectionMade(self, protocol):
1205         # XXX does this method make any sense?
1206         pass
1207
1208
1209 class IUsernameMD5Password(ICredentials):
1210     """I encapsulate a username and a hashed password.
1211
1212     This credential is used for username/password over
1213     PB. CredentialCheckers which check this kind of credential must
1214     store the passwords in plaintext form or as a MD5 digest.
1215
1216     @type username: C{str} or C{Deferred}
1217     @ivar username: The username associated with these credentials.
1218     """
1219
1220     def checkPassword(password):
1221         """Validate these credentials against the correct password.
1222
1223         @param password: The correct, plaintext password against which to
1224             check.
1225
1226         @return: a deferred which becomes, or a boolean indicating if the
1227             password matches.
1228         """
1229
1230     def checkMD5Password(password):
1231         """Validate these credentials against the correct MD5 digest of password.
1232
1233         @param password: The correct, plaintext password against which to
1234             check.
1235
1236         @return: a deferred which becomes, or a boolean indicating if the
1237             password matches.
1238         """
1239
1240
1241 class _PortalRoot:
1242     """Root object, used to login to portal."""
1243
1244     implements(IPBRoot)
1245
1246     def __init__(self, portal):
1247         self.portal = portal
1248
1249     def rootObject(self, broker):
1250         return _PortalWrapper(self.portal, broker)
1251
1252 registerAdapter(_PortalRoot, Portal, IPBRoot)
1253
1254
1255
1256 class _JellyableAvatarMixin:
1257     """
1258     Helper class for code which deals with avatars which PB must be capable of
1259     sending to a peer.
1260     """
1261     def _cbLogin(self, (interface, avatar, logout)):
1262         """
1263         Ensure that the avatar to be returned to the client is jellyable and
1264         set up disconnection notification to call the realm's logout object.
1265         """
1266         if not IJellyable.providedBy(avatar):
1267             avatar = AsReferenceable(avatar, "perspective")
1268
1269         puid = avatar.processUniqueID()
1270
1271         def dereferenceLogout():
1272             self.broker.dontNotifyOnDisconnect(logout)
1273             logout()
1274
1275         self.broker._localCleanup[puid] = dereferenceLogout
1276         # No special helper function is necessary for notifyOnDisconnect
1277         # because dereference callbacks won't be invoked if the connection is
1278         # randomly dropped.  I'm not sure those are ideal semantics, but this
1279         # is the only user of the (private) API at the moment and it works just
1280         # fine as things are. -exarkun
1281         self.broker.notifyOnDisconnect(logout)
1282         return avatar
1283
1284
1285
1286 class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
1287     """
1288     Root Referenceable object, used to login to portal.
1289     """
1290
1291     def __init__(self, portal, broker):
1292         self.portal = portal
1293         self.broker = broker
1294
1295
1296     def remote_login(self, username):
1297         """
1298         Start of username/password login.
1299         """
1300         c = challenge()
1301         return c, _PortalAuthChallenger(self.portal, self.broker, username, c)
1302
1303
1304     def remote_loginAnonymous(self, mind):
1305         """
1306         Attempt an anonymous login.
1307
1308         @param mind: An object to use as the mind parameter to the portal login
1309             call (possibly None).
1310
1311         @rtype: L{Deferred}
1312         @return: A Deferred which will be called back with an avatar when login
1313             succeeds or which will be errbacked if login fails somehow.
1314         """
1315         d = self.portal.login(Anonymous(), mind, IPerspective)
1316         d.addCallback(self._cbLogin)
1317         return d
1318
1319
1320
1321 class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
1322     """
1323     Called with response to password challenge.
1324     """
1325     implements(IUsernameHashedPassword, IUsernameMD5Password)
1326
1327     def __init__(self, portal, broker, username, challenge):
1328         self.portal = portal
1329         self.broker = broker
1330         self.username = username
1331         self.challenge = challenge
1332
1333
1334     def remote_respond(self, response, mind):
1335         self.response = response
1336         d = self.portal.login(self, mind, IPerspective)
1337         d.addCallback(self._cbLogin)
1338         return d
1339
1340
1341     # IUsernameHashedPassword:
1342     def checkPassword(self, password):
1343         return self.checkMD5Password(md5(password).digest())
1344
1345
1346     # IUsernameMD5Password
1347     def checkMD5Password(self, md5Password):
1348         md = md5()
1349         md.update(md5Password)
1350         md.update(self.challenge)
1351         correct = md.digest()
1352         return self.response == correct
1353
1354
1355 __all__ = [
1356     # Everything from flavors is exposed publically here.
1357     'IPBRoot', 'Serializable', 'Referenceable', 'NoSuchMethod', 'Root',
1358     'ViewPoint', 'Viewable', 'Copyable', 'Jellyable', 'Cacheable',
1359     'RemoteCopy', 'RemoteCache', 'RemoteCacheObserver', 'copyTags',
1360     'setUnjellyableForClass', 'setUnjellyableFactoryForClass',
1361     'setUnjellyableForClassTree',
1362
1363     'MAX_BROKER_REFS', 'portno',
1364
1365     'ProtocolError', 'DeadReferenceError', 'Error', 'PBConnectionLost',
1366     'RemoteMethod', 'IPerspective', 'Avatar', 'AsReferenceable',
1367     'RemoteReference', 'CopyableFailure', 'CopiedFailure', 'failure2Copyable',
1368     'Broker', 'respond', 'challenge', 'PBClientFactory', 'PBServerFactory',
1369     'IUsernameMD5Password',
1370     ]
Note: See TracBrowser for help on using the browser.