[Twisted-Python] newbie: scheduling/queuing tasks with xmlrpc

Dustin Lee qhfgva at gmail.com
Thu Mar 17 12:29:09 EST 2005


So I've got a simple prototype of what I'm trying to achieve with my
"queued tasks" xmlrpc server.

I'd appreciate any comments, suggestions.  So far it seems to work
exactly I wanted but I have a feeling I'm abusing the intended use of
deferreds.  And that by polling I'm deviating from the twisted
workflow model.  But you have to start somewhere.

Here is the code and a test client.

xmlrpc_queued_tasks_server.py==============================
#!/usr/bin/env python

# Prototype for a xmlrpc server that
# - queues up potentially long, resource intensive tasks
# - run tasks in order received and only one at a time
# - keeping track of overdue tasks
# - doing no work for a task until it is its turn

# python modules
import time
import thread
import xmlrpclib

# twisted modules
from twisted.web import xmlrpc, server
from twisted.internet import defer

# GLOBALS
XMLRPC_PORT = 7080 
    
# ----------------------------------------------------------------------
def task_monitor(task_list):
    task_run_lengths = {}
    
    while 1:
        print '-' * 70
        print 'tasks in queue:', len(task_list)
        if len(task_list) == 0:
            print 'Nothing to do...'
            pass
        else:
            d,func,args = task_list[0]
            if d.paused == 1:
                print 'unpausing:', d 
                d.unpause() 
                print 'paused =',d.paused 
                def curried_func(*_args):
                    d.callback(func(*_args))
                thread.start_new(curried_func, args)
                task_run_lengths[d] = time.time()
            elif d.paused == 0 and d.called == 1:
                if len(d.callbacks) == 0:
                    print 'removing finished task from queue:', d
                    task_list.pop(0)
                    del task_run_lengths[d]
            else:
                #print task_run_lengths[d], time.time()
                if (time.time() - task_run_lengths[d]) > 5:
                    print 'WARNING: long running process!:', d,
int(time.time() - task_run_lengths[d])

        time.sleep(1)
# ----------------------------------------------------------------------
class QueuedXMLRPCServer(xmlrpc.XMLRPC):

    job_queue = []

    def __init__(self):
        thread.start_new(task_monitor, (self.job_queue,))
        xmlrpc.XMLRPC.__init__(self)

    def render(self,request):

        # MOSTLY COPIED FROM PARENT CLASS
        request.content.seek(0, 0)
        args, functionPath = xmlrpclib.loads(request.content.read())
        try:
            function = self._getFunction(functionPath)
        except xmlrpc.Fault, f:
            self._cbRender(f, request)
        else:
            request.setHeader("content-type", "text/xml")
            d = defer.Deferred()
            d.addErrback(
                self._ebRender
            ).addCallback(
                self._cbRender, request
            )
            d.pause()
            self.job_queue.append((d,function,args))
        return server.NOT_DONE_YET

    def xmlrpc_echo(self, x):
        """ Echo back to client."""
        if x.endswith('3'):
            # cause one task to trip the warning
            time.sleep(15)
        else:
            time.sleep(3)
        return x
    
# ----------------------------------------------------------------------
if __name__ == '__main__':

    from twisted.internet import reactor
    r = QueuedXMLRPCServer()

    # create a "listner" (TCP or SSL)
    reactor.listenTCP(XMLRPC_PORT, server.Site(r))
    reactor.run()


test_xmlrpc_queue.py ===================================

#! /usr/bin/env python

import thread
import xmlrpclib
import time

def echo_caller(x):
    print 'call made by:', x
    s = xmlrpclib.Server('http://my.server.xyz:7080/')
    print s.echo('call returned from %d' % x)

for x in range(10):
    time.sleep(1)
    thread.start_new_thread(echo_caller, (x,))
raw_input('> hit enter to end')




On Tue, 8 Mar 2005 16:59:11 +1100, Andrew Bennetts
<andrew-twisted at puzzling.org> wrote:
> On Mon, Mar 07, 2005 at 10:21:14PM -0700, Dustin Lee wrote:
> [...]
> >
> > Basically the idea is that the server will receive a variety of
> > requests to perform tasks.  Some of these will be very quick to do and
> > some will take several minutes (perhaps up to an hour).  Some tasks
> > must run alone and some can run in parallel with others.  Even just a
> > simple FIFO type queuing would be fine to start.
> >
> > Any pointers on how to proceed?
> 
> The DeferredQueue class in SVN might be a good starting point:
> 
>     http://svn.twistedmatrix.com/cvs/trunk/twisted/internet/defer.py?view=auto&rev=12970&root=Twisted
> 
> -Andrew.
> 
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
> 


-- 
Dustin Lee
qhfgva=rot13(dustin)




More information about the Twisted-Python mailing list