[Twisted-Python] Retrying function calls

Terry Jones terry at jon.es
Sun Nov 1 11:53:31 EST 2009


These days I often find myself writing code to talk to services that are
occasionally briefly unavailable. An error of some kind occurs and the
correct (and documented) action to take is just to retry the original call.
Examples include Amazon's S3 service and the Twitter API. In both of these
services, transient failures happen fairly frequently.

So I wrote the class below to retry calls, and I tried to make it fairly
general. I'd be happy to hear comments on it, because it's pretty simple,
and if it can be made bulletproof I imagine others will use it too.

First off, here's the class that handles the calling:

from twisted.internet import reactor, defer, task
from twisted.python import log, failure


class RetryingCall(object):
    """Calls a function repeatedly, passing it args and kw args. Failures
    are passed to a user-supplied failure testing function. If the failure
    is ignored, the function is called again after a delay whose duration
    is obtained from a user-supplied iterator. The start method (below)
    returns a deferred that fires with the eventual non-error result of
    calling the supplied function, or fires its errback if no successful
    result can be obtained before the delay backoff iterator raises
    StopIteration.
    """
    def __init__(self, f, *args, **kw):
        self._f = f
        self._args = args
        self._kw = kw
        
    def _err(self, fail):
        try:
            fail = self._failureTester(fail)
        except:
            # The failure tester raised: errback() with no argument
            # wraps the current exception in a Failure for us.
            self._deferred.errback()
        else:
            if isinstance(fail, failure.Failure):
                self._deferred.errback(fail)
            else:
                # The failure was ignored: schedule another attempt.
                log.msg('RetryingCall: Ignoring %r' % (fail,))
                self._call()

    def _call(self):
        try:
            delay = self._backoffIterator.next()
        except StopIteration:
            # Out of delays: give up. errback() with no argument wraps
            # the current StopIteration in the Failure it delivers.
            log.msg('StopIteration in RetryingCall: ran out of attempts.')
            self._deferred.errback()
        else:
            d = task.deferLater(reactor, delay,
                                self._f, *self._args, **self._kw)
            d.addCallbacks(self._deferred.callback, self._err)

    def start(self, backoffIterator=None, failureTester=None):
        # iter() lets you pass any iterable of delays (e.g., a list).
        self._backoffIterator = iter(backoffIterator or
                                     simpleBackoffIterator())
        self._failureTester = failureTester or (lambda f: f)
        self._deferred = defer.Deferred()
        self._call()
        return self._deferred


You call the constructor with a function and its args. When you call
start() you get back a deferred that eventually fires with the result of
the call, or an error. BTW, I called it "start" to mirror task.LoopingCall.
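
For example, here's roughly what a call might look like (fetchThing is
just a made-up function standing in for whatever you want retried):

def fetchThing(thingId):
    # Made-up function; pretend this sometimes fails transiently.
    return 'thing %s' % (thingId,)

r = RetryingCall(fetchThing, 42)
d = r.start()
d.addCallback(lambda result: log.msg('Got %r' % (result,)))
d.addErrback(log.err)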

There's a helper function for producing successive inter-call delays:

from operator import mul
from functools import partial

def simpleBackoffIterator(maxResults=10, maxDelay=5.0, now=True,
                          initDelay=0.01, incFunc=None):
    assert maxResults > 0
    remaining = maxResults
    delay = initDelay
    # By default, double the delay after each attempt.
    incFunc = incFunc or partial(mul, 2.0)

    if now:
        yield 0.0
        remaining -= 1
    while remaining > 0:
        yield min(delay, maxDelay)
        delay = incFunc(delay)
        remaining -= 1

By default this will generate the sequence of inter-call delays 0.0, 0.01,
0.02, 0.04, 0.08, 0.16, 0.32, 0.64, 1.28, 2.56, and it should be easy to
see how you could write your own. Or you can just supply a list of delays,
etc. When the backoff iterator is exhausted, RetryingCall gives up on
trying to get a non-error result from the function. In that case the
deferred returned by start() fires its errback with a failure wrapping the
StopIteration. (I was originally returning the original failure, but
decided to simplify. If you want that, you can keep it yourself in an
error-tracking class; see below.)
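
For instance, both of these would work (someFunction here is hypothetical;
the list form works because start() wraps the backoff iterator in iter()):

# someFunction stands in for any callable you want retried.
# Retry immediately, then after 1, 2 and 5 seconds, then give up.
d = RetryingCall(someFunction).start(backoffIterator=[0.0, 1.0, 2.0, 5.0])

# Add half a second after each attempt instead of doubling the delay.
from operator import add
d = RetryingCall(someFunction).start(
    backoffIterator=simpleBackoffIterator(incFunc=partial(add, 0.5)))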

You get to specify a function for testing failures. If it ever raises or
returns a failure, the start() deferred's errback is called. If it returns
anything else (None, say), the failure is considered transient and the
call is retried.

So, for example, if you were calling S3 and wanted to ignore 504 errors,
you could supply a failureTester arg like this:

    from twisted.web import error, http

    def test(failure):
        # Ignore 504s; returning any other failure stops the retrying.
        failure.trap(error.Error)
        if int(failure.value.status) != http.GATEWAY_TIMEOUT:
            return failure
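
and pass it along when starting the call:

    # getS3Thing is hypothetical; substitute your own S3 call.
    d = RetryingCall(getS3Thing).start(failureTester=test)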

As another example, while using the Twitter API you might want to allow a
range of HTTP errors and also exactly one 404 error, seeing as a 404
*might* be an error on the part of Twitter (I don't mean to suggest that
actually happens). A 404 is probably definitive, but why not try once more
just to be sure? To do that, pass RetryingCall a failureTester that's an
instance of a class like this:

class TwitterFailureTester(object):
    okErrs = (http.INTERNAL_SERVER_ERROR,
              http.BAD_GATEWAY,
              http.SERVICE_UNAVAILABLE)

    def __init__(self):
        self.seen404 = False
        
    def __call__(self, failure):
        failure.trap(error.Error)
        status = int(failure.value.status)
        if status == http.NOT_FOUND:
            # Allow exactly one 404 before giving up.
            if self.seen404:
                return failure
            else:
                self.seen404 = True
        elif status not in self.okErrs:
            return failure

Changing existing code to use RetryingCall is pretty trivial. Take
something like this:

from twisted.web import client

def getUserByScreenname(screenname):
    d = client.getPage(
        'http://twitter.com/users/show.json?screen_name=%s' % (screenname,))
    return d

and change it to look like this:

def getUserByScreenname(screenname):
    r = RetryingCall(client.getPage,
        'http://twitter.com/users/show.json?screen_name=%s' % (screenname,))
    d = r.start(failureTester=TwitterFailureTester())
    return d


I wrote the above last night, so I don't know if it's fully robust. But I
dropped it into some of my own code and it seems to work. I also have a
small test suite in case anyone wants it.
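
In case it's useful, here's roughly what a minimal trial test might look
like (flakyCall is a made-up function that fails twice, then succeeds):

from twisted.trial import unittest

class RetryingCallTest(unittest.TestCase):
    def test_succeedsAfterTransientFailures(self):
        calls = []

        def flakyCall():
            # Made-up flaky function: fail on the first two calls,
            # succeed on the third.
            calls.append(None)
            if len(calls) < 3:
                raise ValueError('transient')
            return 'ok'

        def ignoreValueErrors(fail):
            # Treat ValueErrors as transient. trap() re-raises
            # anything else, which stops the retrying.
            fail.trap(ValueError)

        d = RetryingCall(flakyCall).start(
            backoffIterator=[0.0, 0.0, 0.0],
            failureTester=ignoreValueErrors)
        d.addCallback(self.assertEqual, 'ok')
        return d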

My questions are: Is this robust? Can/should it be improved?  Any criticism
of the code (especially wrt Twisted best practices) would be very welcome.

Thanks!

Terry


