Ticket #5732: multicall2.patch

File multicall2.patch, 16.7 KB (added by braudel, 23 months ago)

updated patch with individual deferred per queued rpc

  • twisted/web/xmlrpc.py

     
    318318    xmlrpc_methodSignature.signature = [['array', 'string'], 
    319319                                        ['string', 'string']] 
    320320 
     321    @withRequest 
     322    def xmlrpc_multicall(self, request, procedureList): 
     323        """ 
     324        Execute several RPC methods in a single XMLRPC request using the 
     325        multicall object. 
    321326 
     327        Example: 
     328            On the server side, just load the instrospection so your 
     329            server has system.multicall. Then on the client: 
     330 
     331            from twisted.web.xmlrpc import Proxy 
     332            from twisted.web.xmlrpc import MultiCall 
     333 
     334            proxy = Proxy('url of your server') 
     335 
     336            multiRPC = Multicall( proxy ) 
     337            # queue a few calls 
     338            multiRPC.system.listMethods() 
     339            multiRPC.system.methodHelp('system.listMethods') 
     340            multiRPC.system.methodSignature('system.listMethods') 
     341 
     342            def handleResults(results): 
     343                for success, result in results: 
     344                    print result 
     345 
     346            multiRPC().addCallback(handleResults) 
     347 
     348        @param request: the http C{request} object, obtained  
     349            via the @withRequest decorator  
     350        @type request: L{http.Request} 
     351 
     352        @param procedureList: A list of dictionaries, each representing an 
     353            individual rpc call, containing the C{methodName} and the  
     354            C{params} 
     355        @type procedureList: list 
     356 
     357        @return: L{defer.DeferredList} of the deferreds for each procedure 
     358            in procedure list 
     359        @rtype: L{defer.DeferredList} 
     360        """ 
     361        def callError(error): 
     362            """  
     363            errorback to handle individual call errors 
     364 
     365            Individual errors in a multicall are returned as 
     366            dictionaries. See U{http://www.xmlrpc.com/discuss/msgReader$1208}. 
     367 
     368            @param result: C{failure} 
     369            @type result: L{Failure} 
     370 
     371            @rtype: dict 
     372            @return: a dict with keys C{faultCode} and C{faultString} 
     373            """  
     374            log.err(error.value) 
     375            return {'faultCode':   self.FAILURE, 
     376                    'faultString': (error.value.faultString 
     377                        if isinstance(error.value, Fault) 
     378                        else getattr(error.value, 'message', ''))} 
     379             
     380        def prepareCallResponse(result): 
     381            """ 
     382            callback to convert a call C{response} to a list 
     383 
     384            The xmlrpc multicall spec expects a list wrapping  
     385            each call response.  
     386            See U{http://www.xmlrpc.com/discuss/msgReader$1208}. 
     387 
     388            @param result: C{response} 
     389            @type result: any python type 
     390 
     391            @rtype: list 
     392            @return: a list with response as element 0   
     393            """ 
     394            return [result] 
     395 
     396        def run(procedurePath, params): 
     397            """ 
     398            run an individual procedure from the L{procedureList} and 
     399            returns a C{deferred} 
     400 
     401 
     402            @param procedurePath: string naming a procedure 
     403            @type procedurePath: str 
     404             
     405            @param params: list of arguments to be passed to the procedure 
     406            @type params: list 
     407 
     408            @return: a C{deferred} object with prepareCallResponse and 
     409            callError attached. 
     410            @rtype: L{defer.Deferred} 
     411            """ 
     412            try: 
     413                procedure = self._xmlrpc_parent.lookupProcedure(procedurePath) 
     414            except NoSuchFunction, e: 
     415                return defer.fail(e).addErrback(callError) 
     416            else: 
     417                if getattr(procedure, 'withRequest', False): 
     418                    call = defer.maybeDeferred(procedure, request, *params) 
     419                else: 
     420                    call = defer.maybeDeferred(procedure, *params) 
     421                 
     422                call.addCallback(prepareCallResponse) 
     423                call.addErrback(callError) 
     424                return call 
     425 
     426        results = [ 
     427            run(procedure['methodName'], procedure['params']) 
     428            for procedure in procedureList] 
     429 
     430        return (defer.DeferredList(results) 
     431            .addCallback(lambda results: [r[1] for r in results])) 
     432 
     433    xmlrpc_multicall.signature = [['array', 'array']] 
     434   
     435 
     436class _DeferredMultiCallProcedure(object): 
     437    ''' 
     438    A helper object to store calls made on the 
     439    MultiCall object for batch execution 
     440    ''' 
     441    def __init__(self, call_list, name): 
     442        self.__call_list = call_list 
     443        self.__name = name 
     444     
     445    def __getattr__(self, name): 
     446        ''' 
     447        magic to emulate x.y.name lookups for 
     448        a remote procedure 
     449        '''   
     450        return _DeferredMultiCallProcedure( 
     451            self.__call_list,  
     452            "%s.%s" % (self.__name, name) 
     453        ) 
     454     
     455    def __call__(self, *args): 
     456        ''' 
     457        "calling" the RPC on the multicall queues a deferred,  
     458        the procedure name, and its calling args.  
     459 
     460        @return: a L{defer.Deferred} that will be fired when the 
     461            results for this RPC are processed 
     462        @rtype: L{defer.Deferred} 
     463        ''' 
     464        d = defer.Deferred() 
     465        self.__call_list.append((d, self.__name, args)) 
     466        return d 
     467 
     468class MultiCall(xmlrpclib.MultiCall): 
     469    """ 
     470    @param server: a object used to boxcar method calls 
     471    @type server should be a twisted xmlrpc L{Proxy} object. 
     472 
     473    @return: a L{defer.DeferredList} of all the deferreds for 
     474        each queued rpc call 
     475    @rtype: L{defer.DeferredList} 
     476 
     477    Methods can be added to the MultiCall using normal 
     478    method call syntax e.g.: 
     479     
     480    proxy = Proxy('http://advogato.org/XMLRPC') 
     481 
     482    multicall = MultiCall(proxy) 
     483    d1 = multicall.add(2,3) 
     484    d2 = multicall.add(5,6) 
     485     
     486    or 
     487 
     488    d3 = multicall.callRemote('add', 2, 3) 
     489 
     490    To execute the multicall, call the MultiCall object  
     491    and attach callbacks, errbacks to the returned 
     492    deferred e.g.: 
     493     
     494    def printResults(results): 
     495        for result in results: 
     496            print result[1] 
     497 
     498    d = multicall() 
     499    d.addCallback(printResults) 
     500    """ 
     501 
     502    def __getattr__(self, name): 
     503        ''' get a ref to a helper object to emulate 
     504        "RPC properties lookup"  
     505        ''' 
     506        return _DeferredMultiCallProcedure(self.__call_list, name) 
     507 
     508    def callRemote(self, method, *args): 
     509        ''' queue a call for c{method} on this multicall object 
     510        with given arguments. 
     511 
     512        @return: a L{defer.Deferred} that will fire with the method response, 
     513            or a failure if the method failed. 
     514        ''' 
     515        return getattr(self, method)(*args) 
     516 
     517    def __call__(self): 
     518        """ 
     519        execute the multicall, processing the deferreds for each 
     520        procedure once the results are ready 
     521 
     522        @return: a L{defer.DeferredList} that will fire all the queued deferreds 
     523        """ 
     524        marshalled_list = [] 
     525        deferreds = [] 
     526        for deferred, name, args in self.__call_list: 
     527            marshalled_list.append({ 
     528                'methodName': name, 
     529                'params': args}) 
     530            deferreds.append(deferred) 
     531 
     532        def processResults(results, deferreds): 
     533            ''' 
     534            callback to return an xmlrpclib 
     535            MultiCallIterator of the results 
     536            ''' 
     537            for d, result in zip(deferreds, results): 
     538                if isinstance(result, dict): 
     539                    d.errback(Fault(result['faultCode'],  
     540                        result['faultString'])) 
     541                 
     542                elif isinstance(result, list): 
     543                    d.callback(result[0]) 
     544 
     545                else: 
     546                    raise ValueError( 
     547                        "unexpected type in multicall result") 
     548 
     549        self.__server.callRemote( 
     550            'system.multicall', marshalled_list 
     551        ).addCallback(processResults, deferreds) 
     552 
     553        return defer.DeferredList(deferreds) 
     554 
     555 
    322556def addIntrospection(xmlrpc): 
    323557    """ 
    324558    Add Introspection support to an XMLRPC server. 
  • twisted/web/test/test_xmlrpc.py

     
    1414from twisted.web import xmlrpc 
    1515from twisted.web.xmlrpc import ( 
    1616    XMLRPC, payloadTemplate, addIntrospection, _QueryFactory, Proxy, 
    17     withRequest) 
     17    withRequest, MultiCall) 
    1818from twisted.web import server, static, client, error, http 
    1919from twisted.internet import reactor, defer 
    2020from twisted.internet.error import ConnectionDone 
     
    2727    sslSkip = "OpenSSL not present" 
    2828else: 
    2929    sslSkip = None 
     30from twisted.internet.threads import deferToThread 
    3031 
    31  
    3232class AsyncXMLRPCTests(unittest.TestCase): 
    3333    """ 
    3434    Tests for L{XMLRPC}'s support of Deferreds. 
     
    686686                 'deferFault', 'dict', 'echo', 'fail', 'fault', 
    687687                 'pair', 'system.listMethods', 
    688688                 'system.methodHelp', 
    689                  'system.methodSignature', 'withRequest']) 
     689                 'system.methodSignature', 'system.multicall',  
     690                 'withRequest']) 
    690691 
    691692        d = self.proxy().callRemote("system.listMethods") 
    692693        d.addCallback(cbMethods) 
     
    720721        return defer.DeferredList(dl, fireOnOneErrback=True) 
    721722 
    722723 
     724class XMLRPCTestMultiCall(XMLRPCTestCase): 
     725 
     726    def setUp(self): 
     727        xmlrpc = Test() 
     728        addIntrospection(xmlrpc) 
     729        self.p = reactor.listenTCP(0, server.Site(xmlrpc),interface="127.0.0.1") 
     730        self.port = self.p.getHost().port 
     731        self.factories = [] 
     732 
     733 
     734    def test_multicall(self): 
     735        ''' 
     736        test a suscessfull multicall 
     737        ''' 
     738        inputs = range(5) 
     739        m = MultiCall(self.proxy()) 
     740        for x in inputs: 
     741            m.echo(x) 
     742 
     743        def testResults(results): 
     744            self.assertEqual(inputs, [x[1] for x in results]) 
     745 
     746        return m().addCallback(testResults) 
     747 
     748 
     749    def test_multicall_callRemote(self): 
     750        ''' 
     751        test a suscessfull multicall using 
     752        multicall.callRemote instead of attribute lookups 
     753        ''' 
     754        inputs = range(5) 
     755        m = MultiCall(self.proxy()) 
     756        for x in inputs: 
     757            m.callRemote('echo', x) 
     758 
     759        def testResults(results): 
     760            self.assertEqual(inputs, [x[1] for x in results]) 
     761 
     762        return m().addCallback(testResults) 
     763 
     764 
     765    def test_multicall_with_callbacks(self): 
     766        '''  
     767        test correct execution of callbacks added to 
     768        multicall returned deferred for each individual queued 
     769        call 
     770        ''' 
     771        inputs = range(5) 
     772        m = MultiCall(self.proxy()) 
     773        for x in inputs: 
     774            d = m.echo(x) 
     775            d.addCallback( lambda x : x*x ) 
     776 
     777        def testResults(results): 
     778            self.assertEqual([ x*x for x in inputs], [x[1] for x in results]) 
     779 
     780        return m().addCallback(testResults) 
     781 
     782    def test_multicall_errorback(self): 
     783        '''  
     784        test  an error (an invalid (not found) method )  
     785        does not propagate if properly handled in the errorback 
     786        of an individual deferred 
     787        ''' 
     788        def trapFoo(error): 
     789            error.trap(xmlrpclib.Fault) 
     790            self.assertEqual(error.value.faultString, 
     791                'procedure foo not found', 
     792                'check we have a failure message' 
     793                )  
     794            self.flushLoggedErrors(xmlrpc.NoSuchFunction) 
     795 
     796 
     797        m = MultiCall(self.proxy()) 
     798        m.echo(1) 
     799        # method not present on server 
     800        m.foo().addErrback(trapFoo) 
     801        m.echo(2) 
     802 
     803        def handleErrors(error): 
     804            error.trap(xmlrpclib.Fault) 
     805            self.assertEqual(error.value.faultString, 
     806                'xmlrpc_echo() takes exactly 2 arguments (4 given)') 
     807            self.flushLoggedErrors(TypeError) 
     808 
     809        m.echo(1,2,3).addErrback(handleErrors) 
     810 
     811        def testResults(results): 
     812            '''  
     813            the errorback should have trapped the error 
     814            ''' 
     815            self.assertEqual(results[1], (True, None), 
     816            'failure trapped in errorback does not propagate to deferredList results') 
     817 
     818        return m().addCallback(testResults) 
     819 
     820    def test_multicall_withRequest(self): 
     821        ''' 
     822        Test that methods decorated with @withRequest are handled correctly 
     823        ''' 
     824        m = MultiCall(self.proxy()) 
     825        m.echo(1) 
     826        # method decorated with withRequest 
     827        msg = 'hoho' 
     828        m.withRequest(msg) 
     829        m.echo(2) 
     830 
     831        def testResults(results): 
     832            ''' 
     833            test that a withRequest decorated method was properly handled 
     834            ''' 
     835            self.assertEqual(results[1][1],  
     836                'POST %s' % msg, 'check withRequest decorated result') 
     837        return m().addCallback(testResults) 
     838 
     839    def test_multicall_with_xmlrpclib(self): 
     840        ''' 
     841        check that the sever's response is also compatible with xmlrpclib 
     842        MultiCall client 
     843        ''' 
     844        inputs = range(5) 
     845        def doMulticall(inputs): 
     846            m = xmlrpclib.MultiCall( 
     847                xmlrpclib.ServerProxy("http://127.0.0.1:%d" % self.port) 
     848            ) 
     849            for x in inputs: 
     850                m.echo(x) 
     851            return m() 
     852 
     853        def testResults(iterator): 
     854            self.assertEqual( 
     855                inputs,  
     856                list(iterator),  
     857                'xmlrpclib multicall can talk to the twisted multicall') 
     858 
     859        return deferToThread(doMulticall, inputs).addCallback(testResults) 
     860 
    723861class XMLRPCClientErrorHandling(unittest.TestCase): 
    724862    """ 
    725863    Test error handling on the xmlrpc client. 
  • doc/web/howto/xmlrpc.xhtml

     
    234234no <code class="python">help</code> attribute is specified, the 
    235235method's documentation string is used instead.</p> 
    236236 
     237 
     238  
    237239<h2>SOAP Support</h2> 
    238240 
    239241<p>From the point of view of a Twisted developer, there is little difference 
     
    295297[8, 15] 
    296298</pre> 
    297299 
     300<h3>Using multicall Objects</h3> 
     301<p>Another informal pattern commonly used along with xmlrpc 
     302introspection is the use of a multicall object to boxcar several  
     303rpc calls in a single http request.  
     304 
     305An instance of a multicall object  
     306is created on the client side and several RPC calls are queued by  
     307calling them on this multicall instance. Every RPC called on the  
     308multicall returns a deferred that you can use to attach callbacks  
     309to each queued rpc response. 
     310 
     311Executing the multicall instance itself triggers a request to  
     312<code>system.multicall</code> with a list of dictionaries representing 
     313each of the procedures to call, returning a <code>defer.DeferredList</code> 
     314of the deferreds corresponding to each individual call.  
     315 
     316Let's see an  
     317example to talk to the server above using a multicall:</p> 
     318 
     319<pre class="python"> 
     320from twisted.web.xmlrpc import Proxy, MultiCall, NoSuchFunction 
     321from twisted.internet import reactor 
     322 
     323def printResults(results): 
     324    for result in results: 
     325        print result[1] 
     326    reactor.stop() 
     327 
     328proxy = Proxy('http://127.0.0.1:7080') 
     329 
     330multiRPC = MultiCall(proxy) 
     331 
     332# queue a few echo calls, with a callback to square the results 
     333for x in range(10): 
     334    multiRPC.echo(x).addCallback(lamdba x: x*x) 
     335 
     336# you can of course use the <code>callRemote</code> method instead 
     337# for a better twisted-like feeling 
     338for x in range(10,20): 
     339    multiRPC.callRemote('echo', x).addCallback(lamdba x: x*x) 
     340 
     341# now queue a few more echo calls with a callback that will  
     342# indicate the echoed letter and their index in the original string 
     343for index, x in enumerate('hello'): 
     344    multiRPC.echo(x).addCallback(lambda result, index: '%s found at %d' % (result, index)) 
     345 
     346# individual deferreds ideally should also handle errors 
     347def handleErrors(error): 
     348    error.trap(xmlrpclib.Fault) 
     349    print "yay! The server said %s" % error.value.faultString 
     350multiRPC.foo().addErrback(NoSuchFunction) 
     351 
     352# finally execute the multiRPC instance, to send the request,  
     353# then handle the returned DeferredList  
     354# results with the printResults callback 
     355multiRPC().addCallback(printResults) 
     356reactor.run() 
     357</pre> 
     358 
     359The <code>system.multicall</code> method is also 100% compatible with the xmlrpclib 
     360MultiCall, so you can also use the blocking xmlrpclib as the client: 
     361 
     362<pre> 
     363from xmlrpclib import ServerProxy, Multicall 
     364 
     365proxy = ServerProxy('http://127.0.0.1:7080') 
     366 
     367multiRPC = MultiCall(proxy) 
     368for x in range(10): 
     369    multiRPC.echo(x) 
     370 
     371for result in multiRPC(): 
     372    print result * result 
     373</pre> 
     374 
     375 
    298376<h2>Serving SOAP and XML-RPC simultaneously</h2> 
    299377 
    300378<p><code class="API">twisted.web.xmlrpc.XMLRPC</code> and <code