[Twisted-Python] A simple DeferredPool class

Terry Jones terry at jon.es
Sun Dec 6 16:14:30 MST 2009


I submitted a talk on Twisted Deferreds to the US PyCon about, but it was
unfortunately rejected. The main point I'd planned to make was how nice the
deferred mechanism is, as evidenced by the number of times I've wanted to
do things that might be complex in other scenarios but which just fall out
using deferreds. More importantly, you rarely need to use anything more
than the basic building blocks found in t.i.defer.

I like posting small self-contained examples of this sort of thing, so here
are another couple.  If you know deferreds well I guess these will seem
trivial. If not, you might find them valuable.

Here's the situation I found myself in today.  I'll use an example based on
twisted.web. Suppose you have a twisted.web service running and you want to
shut the service down. But for various reasons you don't want to interrupt
any of the work that it has in progress.

The currently outstanding work the service is doing is bound up in a set of
deferreds that have not yet fired.  E.g., a request has come in and the
server called something that returns a deferred, attached a callback to it
that will finish the request, and returned NOT_DONE_YET. Something like
this:

    def render_GET(self, request):
        d = defer.maybeDeferred(someFunc)
        d.addCallback(_finish, request)
        return server.NOT_DONE_YET

    def _finish(self, result, request):
        request.finish()
 
A the moment that you send the command to shut the service down, zero of
more instances of d (above) may exist, and these will be in various stages
of completion. Given this setup, how can you arrange to wait for all
outstanding requests (if any) before shutting the service down?

A simple solution is just to define a DeferredPool class that maintains a
pool of deferreds which can provide you with a method to (at any time)
obtain a deferred that will fire when/if the pool size next goes to zero.
Here's what I wrote (untested):

    from twisted.internet import defer

    class DeferredPool(object):
        def __init__(self, initialContents=None):
            self._pool = set()
            self._waiting = []
            if initialContents:
                for d in initialContents:
                    self.add(d)

        def _fired(self, result, d):
            self._pool.remove(d)
            if not self._pool:
                waiting, self._waiting = self._waiting, []
                for waiter in waiting:
                    waiter.callback(None)
            return result

        def add(self, d):
            d.addBoth(self._fired, d)
            self._pool.add(d)
            return d

        def deferUntilEmpty(self, testImmediately=True):
            if testImmediately and not self._pool:
                return defer.succeed(None)
            else:
                d = defer.Deferred()
                self._waiting.append(d)
                return d


In your server's startService method, you can do this:

    def startService(self):
        self.pool = DeferredPool()

and the above render_GET code gets changed to look like this:

    def render_GET(self, request):
        d = defer.maybeDeferred(someFunc)
        d.addCallback(_finish, request)
        self.pool.add(d)
        return server.NOT_DONE_YET

and in your stopService method:

    def stopService(self):
        d = self.pool.deferUntilEmpty()
        d.addCallback( whatever else you need to do )
        return d

And that's it.

As usual with these simple deferred solutions, it's simple, it's general,
and it's more widely useful than you might have initially planned.  The
DeferredPool class is a little reminiscent of DeferredList, in that the
deferreds that are submitted each get a call/errback added to themselves
that monitors the progress of the collection and which triggers the waiting
deferreds (if any) once some condition is satisfied. But it's more dynamic,
as the pool can grown and shrink while you're waiting, it can have zero or
more waiters, it's long-lived as the pool can go to down zero size and come
back up and have more waiters added to it, etc. And, as with all these nice
tricks, the existence of the pool and its operation is totally transparent
to the deferreds in use by the original code. Yes, you have to insert one
call to add a deferred to the pool, but that's it.

That's part of what I like the most about this sort of this: adding
transparent call/errbacks into the chain of existing deferreds to do
something no-one thought of initially, and knowing that unless you do
something really dumb you'll have no noticeable effect on the operation of
the original code. That's part of why I find Twisted's deferreds so
elegant.

I also wrote a somewhat more general (and slower) version of questionable
utility. It allows you to pass a function when you ask for a deferred - and
the deferred you get fires when your function returns True. Your function
is called each time a deferred in the pool fires. If you don't pass a
function, you get the above behavior (fire when the pool is empty).  I wont
post the code it here. If you want it, just ask.

Comments welcome on all this, of course.  I'm interested to hear how people
would write tests for the above.

Terry




More information about the Twisted-Python mailing list