[Twisted-Python] freeing the reactor to do other jobs

David Ripton dripton at ripton.net
Fri Nov 7 10:26:17 EST 2008

On 2008.11.07 08:29:50 -0500, Jeff Dyke wrote:
> I'm using the XMLRPC server in twisted and a few methods call other,
> sometimes long running, functions/methods.  I'm trying to get my brain
> around how to free the reactor to respond to other requests while this
> is happening.

There are two ways:

1. Write functions that don't block for long: each one does a little
bit of work, schedules a call to do the rest of the work, and then
returns, so the reactor can have the CPU back.

2. Farm out big chunks of work that you can't or don't want to split up
to a subprocess or thread.

> A scenario.  A call is made to the server, which selects say 10K rows
> from a db and needs to check each row against a table and if they do
> not exist, insert them.
> """ Oversimplified version of the process """
> def getData(self,user_id):
>     rows = self.getUserData(user_id)
>     for row in rows:
>         if self.existsInQueue(row['some_id']):
>             continue
>         else:
>              self.insertQueue(row)

If the long-running work is in a blocking database call, and the
database does not support a less-blocking version and you can't change
the database, then you probably want to use deferToThread for that part.

Then move the rest of getData into a separate function that gets
called as a callback after getUserData finishes.

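def getData(self,user_id):
    # deferToThread lives in twisted.internet.threads, not on the reactor
    deferred1 = threads.deferToThread(self.getUserData, user_id)
    deferred1.addCallback(self._addRowsToQueue)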

If adding the rows to the queue is fast, then you're done.  Just move
everything after getUserData into _addRowsToQueue.

def _addRowsToQueue(self, rows):
    for row in rows:
        if not self.existsInQueue(row['some_id']):
            self.insertQueue(row)

But if adding all the rows to the queue in one function call is too
slow, then you need to split it up.  It's a loop, so splitting it up is
easy.  Here's the simple scheduling-only version:

def _addSomeRowsToQueue(self, rows):
    if rows:
        row = rows.pop(0)
        if not self.existsInQueue(row['some_id']):
            self.insertQueue(row)
        reactor.callLater(0, self._addSomeRowsToQueue, rows)

Adding deferreds to the mix so that a callback function is called when
all the rows are added to the queue is the next step, after you
understand how this much works.

> I want the caller to wait on a result from this process, but I also
> want the reactor to be able to handle other requests as they come in.

I hope you can live with "I want something to happen using the results
from this process", rather than "I want the caller to wait on a result
from this process."

You can simulate blocking flow somewhat with deferredGenerator or
inlineCallbacks, but I recommend sticking to the old way at first.
It's simpler and less magical.

David Ripton    dripton at ripton.net

