[Twisted-Python] XMLRPC server help neede

Brett Viren bv at bnl.gov
Wed Apr 27 09:19:58 MDT 2005


Roland Hedberg <roland.hedberg at adm.umu.se> writes:
> If the packet is well formed and the server knows what to do with it,
> it should reply to the client and then perform the action.
>
> My problem is how I would go about doing this. Conceptually I could
> imaging having a workqueue where I would place the message and then
> from the point of view of the client-sever communication just forget
> about it.
>
> Anyone who has done anything similar or has an idea on how to do this ?

I do this by subclassing twisted.web.xmlrpc.XMLRPC and handling the
query in a work queue (appended below) that runs in its own thread.
In my case the client doesn't care if it sent me well formed data or
not, so immediately after putting the query in the work queue I return
from the xmlrpc method and the client is free.  In my case, I do some
sanity checking of the query inside the queue.  In your case you'd do
the checking before stuffing the queue so you could inform the client.

Here is a sketch of the server chopped out from my code:

from twisted.web import xmlrpc
class DataSource(xmlrpc.XMLRPC):
    "The XML-RPC listener"

    def __init__(self,services,sem):
        xmlrpc.XMLRPC.__init__(self)
        xmlrpc.addIntrospection(self)
        self.data_cq = CommandQueue()
        self.sem = sem
        self.services = services

    def _handle_sem_release(self,x):
        #print "DataSource._handle_sem_release(%s)"%str(x)
        self.sem.release()
        from twisted.python.failure import Failure
        if x.__class__ == Failure:
            if x.value[0] == "DEBUG": return x
            log.error(x)
        return x

    def xmlrpc_method(self,idstr,values):
        "Accept callbacks from Export API"
        d = self.sem.acquire()
        d.addCallback(lambda x: self.data_cq(self.services.method,idstr,values))
        d.AddBoth(self._handle_sem_release)
        return 0

All the real work is done in the "services.method" method.  You'll
note that I use a Semaphore class (appended below).  This is keep
other operations not shown here from being executed in the middle of
handling the query.

BTW, the Semaphore and CommandQueue classes were developed with much
help from this list.  Thanks again!

-Brett.

# ------------------------------------------------

from twisted.internet import defer
from Queue import Queue, Empty
from twisted.python import failure

class Semaphore(object):
    """Asynchronous semaphore stolen from:
    http://twistedmatrix.com/pipermail/twisted-python/2003-August/005323.html
    """
    def __init__(self, value=1, verbose=None):
        self.queue = []
        self.value = value

    def acquire(self):
        d = defer.Deferred()
        if self.value:
            self.value -= 1
            d.callback(False)
        else:
            self.queue.append(d)
        return d

    def release(self):
        if self.queue:
            self.queue.pop(0).callback(True)
        else:
            self.value += 1


class CommandQueue:

    '''Queue up commands for serial calling.  One must call the
    drain() method to start reading the internal queue.  Most likely
    one wants to call this in a thread.'''

    all_queues = []

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        CommandQueue.all_queues.append(self)
        from twisted.internet import reactor
        reactor.callInThread(self.drain)
        return

    def __call__(self,meth,*a,**k):

        '''Call meth(*a,**k) when it reaches end of queue.  Returns a
        Deferred that will pass the return of meth.'''

        deferred = defer.Deferred()
        deferred.addErrback(self._error)
        self.queue.put((deferred,meth,a,k))
        return deferred

    def _error(self,a):
        try:
            a.printTraceback(sys.stderr)
        except:
            print str(a)
        return a

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        while not self.stop:
            try:
                d,meth,a,k = self.queue.get(True,1)
            except Empty:
                continue
            #print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            try:
                res = meth(*a,**k)
            except Exception,err:
                res = failure.Failure(sys.exc_value)
            reactor.callFromThread(d.callback,res)
            #d.callback(meth(*a,**k))
            #print "callback done"
        #print "drain closing"
        return 0








More information about the Twisted-Python mailing list