Ticket #5732: multicall2.patch

File multicall2.patch, 16.7 KB (added by braudel, 4 years ago)

updated patch with individual deferred per queued rpc

  • twisted/web/xmlrpc.py

     
    318318    xmlrpc_methodSignature.signature = [['array', 'string'],
    319319                                        ['string', 'string']]
    320320
     321    @withRequest
     322    def xmlrpc_multicall(self, request, procedureList):
     323        """
     324        Execute several RPC methods in a single XMLRPC request using the
     325        multicall object.
    321326
     327        Example:
     328            On the server side, just load the instrospection so your
     329            server has system.multicall. Then on the client:
     330
     331            from twisted.web.xmlrpc import Proxy
     332            from twisted.web.xmlrpc import MultiCall
     333
     334            proxy = Proxy('url of your server')
     335
     336            multiRPC = Multicall( proxy )
     337            # queue a few calls
     338            multiRPC.system.listMethods()
     339            multiRPC.system.methodHelp('system.listMethods')
     340            multiRPC.system.methodSignature('system.listMethods')
     341
     342            def handleResults(results):
     343                for success, result in results:
     344                    print result
     345
     346            multiRPC().addCallback(handleResults)
     347
     348        @param request: the http C{request} object, obtained
     349            via the @withRequest decorator
     350        @type request: L{http.Request}
     351
     352        @param procedureList: A list of dictionaries, each representing an
     353            individual rpc call, containing the C{methodName} and the
     354            C{params}
     355        @type procedureList: list
     356
     357        @return: L{defer.DeferredList} of the deferreds for each procedure
     358            in procedure list
     359        @rtype: L{defer.DeferredList}
     360        """
     361        def callError(error):
     362            """
     363            errorback to handle individual call errors
     364
     365            Individual errors in a multicall are returned as
     366            dictionaries. See U{http://www.xmlrpc.com/discuss/msgReader$1208}.
     367
     368            @param result: C{failure}
     369            @type result: L{Failure}
     370
     371            @rtype: dict
     372            @return: a dict with keys C{faultCode} and C{faultString}
     373            """
     374            log.err(error.value)
     375            return {'faultCode':   self.FAILURE,
     376                    'faultString': (error.value.faultString
     377                        if isinstance(error.value, Fault)
     378                        else getattr(error.value, 'message', ''))}
     379           
     380        def prepareCallResponse(result):
     381            """
     382            callback to convert a call C{response} to a list
     383
     384            The xmlrpc multicall spec expects a list wrapping
     385            each call response.
     386            See U{http://www.xmlrpc.com/discuss/msgReader$1208}.
     387
     388            @param result: C{response}
     389            @type result: any python type
     390
     391            @rtype: list
     392            @return: a list with response as element 0 
     393            """
     394            return [result]
     395
     396        def run(procedurePath, params):
     397            """
     398            run an individual procedure from the L{procedureList} and
     399            returns a C{deferred}
     400
     401
     402            @param procedurePath: string naming a procedure
     403            @type procedurePath: str
     404           
     405            @param params: list of arguments to be passed to the procedure
     406            @type params: list
     407
     408            @return: a C{deferred} object with prepareCallResponse and
     409            callError attached.
     410            @rtype: L{defer.Deferred}
     411            """
     412            try:
     413                procedure = self._xmlrpc_parent.lookupProcedure(procedurePath)
     414            except NoSuchFunction, e:
     415                return defer.fail(e).addErrback(callError)
     416            else:
     417                if getattr(procedure, 'withRequest', False):
     418                    call = defer.maybeDeferred(procedure, request, *params)
     419                else:
     420                    call = defer.maybeDeferred(procedure, *params)
     421               
     422                call.addCallback(prepareCallResponse)
     423                call.addErrback(callError)
     424                return call
     425
     426        results = [
     427            run(procedure['methodName'], procedure['params'])
     428            for procedure in procedureList]
     429
     430        return (defer.DeferredList(results)
     431            .addCallback(lambda results: [r[1] for r in results]))
     432
     433    xmlrpc_multicall.signature = [['array', 'array']]
     434 
     435
     436class _DeferredMultiCallProcedure(object):
     437    '''
     438    A helper object to store calls made on the
     439    MultiCall object for batch execution
     440    '''
     441    def __init__(self, call_list, name):
     442        self.__call_list = call_list
     443        self.__name = name
     444   
     445    def __getattr__(self, name):
     446        '''
     447        magic to emulate x.y.name lookups for
     448        a remote procedure
     449        ''' 
     450        return _DeferredMultiCallProcedure(
     451            self.__call_list,
     452            "%s.%s" % (self.__name, name)
     453        )
     454   
     455    def __call__(self, *args):
     456        '''
     457        "calling" the RPC on the multicall queues a deferred,
     458        the procedure name, and its calling args.
     459
     460        @return: a L{defer.Deferred} that will be fired when the
     461            results for this RPC are processed
     462        @rtype: L{defer.Deferred}
     463        '''
     464        d = defer.Deferred()
     465        self.__call_list.append((d, self.__name, args))
     466        return d
     467
     468class MultiCall(xmlrpclib.MultiCall):
     469    """
     470    @param server: a object used to boxcar method calls
     471    @type server should be a twisted xmlrpc L{Proxy} object.
     472
     473    @return: a L{defer.DeferredList} of all the deferreds for
     474        each queued rpc call
     475    @rtype: L{defer.DeferredList}
     476
     477    Methods can be added to the MultiCall using normal
     478    method call syntax e.g.:
     479   
     480    proxy = Proxy('http://advogato.org/XMLRPC')
     481
     482    multicall = MultiCall(proxy)
     483    d1 = multicall.add(2,3)
     484    d2 = multicall.add(5,6)
     485   
     486    or
     487
     488    d3 = multicall.callRemote('add', 2, 3)
     489
     490    To execute the multicall, call the MultiCall object
     491    and attach callbacks, errbacks to the returned
     492    deferred e.g.:
     493   
     494    def printResults(results):
     495        for result in results:
     496            print result[1]
     497
     498    d = multicall()
     499    d.addCallback(printResults)
     500    """
     501
     502    def __getattr__(self, name):
     503        ''' get a ref to a helper object to emulate
     504        "RPC properties lookup"
     505        '''
     506        return _DeferredMultiCallProcedure(self.__call_list, name)
     507
     508    def callRemote(self, method, *args):
     509        ''' queue a call for c{method} on this multicall object
     510        with given arguments.
     511
     512        @return: a L{defer.Deferred} that will fire with the method response,
     513            or a failure if the method failed.
     514        '''
     515        return getattr(self, method)(*args)
     516
     517    def __call__(self):
     518        """
     519        execute the multicall, processing the deferreds for each
     520        procedure once the results are ready
     521
     522        @return: a L{defer.DeferredList} that will fire all the queued deferreds
     523        """
     524        marshalled_list = []
     525        deferreds = []
     526        for deferred, name, args in self.__call_list:
     527            marshalled_list.append({
     528                'methodName': name,
     529                'params': args})
     530            deferreds.append(deferred)
     531
     532        def processResults(results, deferreds):
     533            '''
     534            callback to return an xmlrpclib
     535            MultiCallIterator of the results
     536            '''
     537            for d, result in zip(deferreds, results):
     538                if isinstance(result, dict):
     539                    d.errback(Fault(result['faultCode'],
     540                        result['faultString']))
     541               
     542                elif isinstance(result, list):
     543                    d.callback(result[0])
     544
     545                else:
     546                    raise ValueError(
     547                        "unexpected type in multicall result")
     548
     549        self.__server.callRemote(
     550            'system.multicall', marshalled_list
     551        ).addCallback(processResults, deferreds)
     552
     553        return defer.DeferredList(deferreds)
     554
     555
    322556def addIntrospection(xmlrpc):
    323557    """
    324558    Add Introspection support to an XMLRPC server.
  • twisted/web/test/test_xmlrpc.py

     
    1414from twisted.web import xmlrpc
    1515from twisted.web.xmlrpc import (
    1616    XMLRPC, payloadTemplate, addIntrospection, _QueryFactory, Proxy,
    17     withRequest)
     17    withRequest, MultiCall)
    1818from twisted.web import server, static, client, error, http
    1919from twisted.internet import reactor, defer
    2020from twisted.internet.error import ConnectionDone
     
    2727    sslSkip = "OpenSSL not present"
    2828else:
    2929    sslSkip = None
     30from twisted.internet.threads import deferToThread
    3031
    31 
    3232class AsyncXMLRPCTests(unittest.TestCase):
    3333    """
    3434    Tests for L{XMLRPC}'s support of Deferreds.
     
    686686                 'deferFault', 'dict', 'echo', 'fail', 'fault',
    687687                 'pair', 'system.listMethods',
    688688                 'system.methodHelp',
    689                  'system.methodSignature', 'withRequest'])
     689                 'system.methodSignature', 'system.multicall',
     690                 'withRequest'])
    690691
    691692        d = self.proxy().callRemote("system.listMethods")
    692693        d.addCallback(cbMethods)
     
    720721        return defer.DeferredList(dl, fireOnOneErrback=True)
    721722
    722723
     724class XMLRPCTestMultiCall(XMLRPCTestCase):
     725
     726    def setUp(self):
     727        xmlrpc = Test()
     728        addIntrospection(xmlrpc)
     729        self.p = reactor.listenTCP(0, server.Site(xmlrpc),interface="127.0.0.1")
     730        self.port = self.p.getHost().port
     731        self.factories = []
     732
     733
     734    def test_multicall(self):
     735        '''
     736        test a suscessfull multicall
     737        '''
     738        inputs = range(5)
     739        m = MultiCall(self.proxy())
     740        for x in inputs:
     741            m.echo(x)
     742
     743        def testResults(results):
     744            self.assertEqual(inputs, [x[1] for x in results])
     745
     746        return m().addCallback(testResults)
     747
     748
     749    def test_multicall_callRemote(self):
     750        '''
     751        test a suscessfull multicall using
     752        multicall.callRemote instead of attribute lookups
     753        '''
     754        inputs = range(5)
     755        m = MultiCall(self.proxy())
     756        for x in inputs:
     757            m.callRemote('echo', x)
     758
     759        def testResults(results):
     760            self.assertEqual(inputs, [x[1] for x in results])
     761
     762        return m().addCallback(testResults)
     763
     764
     765    def test_multicall_with_callbacks(self):
     766        '''
     767        test correct execution of callbacks added to
     768        multicall returned deferred for each individual queued
     769        call
     770        '''
     771        inputs = range(5)
     772        m = MultiCall(self.proxy())
     773        for x in inputs:
     774            d = m.echo(x)
     775            d.addCallback( lambda x : x*x )
     776
     777        def testResults(results):
     778            self.assertEqual([ x*x for x in inputs], [x[1] for x in results])
     779
     780        return m().addCallback(testResults)
     781
     782    def test_multicall_errorback(self):
     783        '''
     784        test  an error (an invalid (not found) method )
     785        does not propagate if properly handled in the errorback
     786        of an individual deferred
     787        '''
     788        def trapFoo(error):
     789            error.trap(xmlrpclib.Fault)
     790            self.assertEqual(error.value.faultString,
     791                'procedure foo not found',
     792                'check we have a failure message'
     793                )
     794            self.flushLoggedErrors(xmlrpc.NoSuchFunction)
     795
     796
     797        m = MultiCall(self.proxy())
     798        m.echo(1)
     799        # method not present on server
     800        m.foo().addErrback(trapFoo)
     801        m.echo(2)
     802
     803        def handleErrors(error):
     804            error.trap(xmlrpclib.Fault)
     805            self.assertEqual(error.value.faultString,
     806                'xmlrpc_echo() takes exactly 2 arguments (4 given)')
     807            self.flushLoggedErrors(TypeError)
     808
     809        m.echo(1,2,3).addErrback(handleErrors)
     810
     811        def testResults(results):
     812            '''
     813            the errorback should have trapped the error
     814            '''
     815            self.assertEqual(results[1], (True, None),
     816            'failure trapped in errorback does not propagate to deferredList results')
     817
     818        return m().addCallback(testResults)
     819
     820    def test_multicall_withRequest(self):
     821        '''
     822        Test that methods decorated with @withRequest are handled correctly
     823        '''
     824        m = MultiCall(self.proxy())
     825        m.echo(1)
     826        # method decorated with withRequest
     827        msg = 'hoho'
     828        m.withRequest(msg)
     829        m.echo(2)
     830
     831        def testResults(results):
     832            '''
     833            test that a withRequest decorated method was properly handled
     834            '''
     835            self.assertEqual(results[1][1],
     836                'POST %s' % msg, 'check withRequest decorated result')
     837        return m().addCallback(testResults)
     838
     839    def test_multicall_with_xmlrpclib(self):
     840        '''
     841        check that the sever's response is also compatible with xmlrpclib
     842        MultiCall client
     843        '''
     844        inputs = range(5)
     845        def doMulticall(inputs):
     846            m = xmlrpclib.MultiCall(
     847                xmlrpclib.ServerProxy("http://127.0.0.1:%d" % self.port)
     848            )
     849            for x in inputs:
     850                m.echo(x)
     851            return m()
     852
     853        def testResults(iterator):
     854            self.assertEqual(
     855                inputs,
     856                list(iterator),
     857                'xmlrpclib multicall can talk to the twisted multicall')
     858
     859        return deferToThread(doMulticall, inputs).addCallback(testResults)
     860
    723861class XMLRPCClientErrorHandling(unittest.TestCase):
    724862    """
    725863    Test error handling on the xmlrpc client.
  • doc/web/howto/xmlrpc.xhtml

     
    234234no <code class="python">help</code> attribute is specified, the
    235235method's documentation string is used instead.</p>
    236236
     237
     238 
    237239<h2>SOAP Support</h2>
    238240
    239241<p>From the point of view of a Twisted developer, there is little difference
     
    295297[8, 15]
    296298</pre>
    297299
     300<h3>Using multicall Objects</h3>
     301<p>Another informal pattern commonly used along with xmlrpc
     302introspection is the use of a multicall object to boxcar several
     303rpc calls in a single http request.
     304
     305An instance of a multicall object
     306is created on the client side and several RPC calls are queued by
     307calling them on this multicall instance. Every RPC called on the
     308multicall returns a deferred that you can use to attach callbacks
     309to each queued rpc response.
     310
     311Executing the multicall instance itself triggers a request to
     312<code>system.multicall</code> with a list of dictionaries representing
     313each of the procedures to call, returning a <code>defer.DeferredList</code>
     314of the deferreds corresponding to each individual call.
     315
     316Let's see an
     317example to talk to the server above using a multicall:</p>
     318
     319<pre class="python">
     320from twisted.web.xmlrpc import Proxy, MultiCall, NoSuchFunction
     321from twisted.internet import reactor
     322
     323def printResults(results):
     324    for result in results:
     325        print result[1]
     326    reactor.stop()
     327
     328proxy = Proxy('http://127.0.0.1:7080')
     329
     330multiRPC = MultiCall(proxy)
     331
     332# queue a few echo calls, with a callback to square the results
     333for x in range(10):
     334    multiRPC.echo(x).addCallback(lamdba x: x*x)
     335
     336# you can of course use the <code>callRemote</code> method instead
     337# for a better twisted-like feeling
     338for x in range(10,20):
     339    multiRPC.callRemote('echo', x).addCallback(lamdba x: x*x)
     340
     341# now queue a few more echo calls with a callback that will
     342# indicate the echoed letter and their index in the original string
     343for index, x in enumerate('hello'):
     344    multiRPC.echo(x).addCallback(lambda result, index: '%s found at %d' % (result, index))
     345
     346# individual deferreds ideally should also handle errors
     347def handleErrors(error):
     348    error.trap(xmlrpclib.Fault)
     349    print "yay! The server said %s" % error.value.faultString
     350multiRPC.foo().addErrback(NoSuchFunction)
     351
     352# finally execute the multiRPC instance, to send the request,
     353# then handle the returned DeferredList
     354# results with the printResults callback
     355multiRPC().addCallback(printResults)
     356reactor.run()
     357</pre>
     358
     359The <code>system.multicall</code> method is also 100% compatible with the xmlrpclib
     360MultiCall, so you can also use the blocking xmlrpclib as the client:
     361
     362<pre>
     363from xmlrpclib import ServerProxy, Multicall
     364
     365proxy = ServerProxy('http://127.0.0.1:7080')
     366
     367multiRPC = MultiCall(proxy)
     368for x in range(10):
     369    multiRPC.echo(x)
     370
     371for result in multiRPC():
     372    print result * result
     373</pre>
     374
     375
    298376<h2>Serving SOAP and XML-RPC simultaneously</h2>
    299377
    300378<p><code class="API">twisted.web.xmlrpc.XMLRPC</code> and <code