[Twisted-Python] inlineCallbacks cascading cancelling and more

Sergey Magafurov smagafurov at naumen.ru
Mon Aug 16 04:34:35 EDT 2010


First of all, sorry for my bad English.
I will try to explain an idea about Deferreds and inlineCallbacks.

Wanted features:
1. Ability to delete callbacks (delCallbacks)
2. Automatic cancelling of Deferreds when they are not needed any more (no
callbacks are registered any more; all of them were deleted)
3. Ability to add/delete hooks that run when a deferred finishes with errback
or callback (addFinalizer/delFinalizer)
4. Automatic calling of the registered finalizers when a deferred finishes with
errback or callback (this includes cancellation, because cancel calls errback)

With these features we can implement cascading cancellation of inlineCallbacks,
i.e. cancel the full stack tree of inlineCallbacks when it is not needed
any more (for example, when a shutdown occurs at the top level).

See full code for details (this is runnable script):

from twisted.internet import defer
from twisted.python.failure import Failure
from sys import exc_info

import warnings

class InlineCallbacksManager(object):
     def __init__(self, *args, **kw):
         self.deferred = defer.Deferred()

     def send_result(self, g, result):
         return g.send(result)

     def throw_exception(self, g, result):
         return result.throwExceptionIntoGenerator(g)

     def stop_iteration(self):
         self.deferred.callback(None)
         return self.deferred

     def return_value(self, value):
         self.deferred.callback(value)
         return self.deferred

     def exception(self):
         self.deferred.errback()
         return self.deferred

     def _inlineCallbacks(self, result, g):
         """
         See L{inlineCallbacks}.
         """
         # This function is complicated by the need to prevent unbounded 
recursion
         # arising from repeatedly yielding immediately ready 
deferreds.  This while
         # loop and the waiting variable solve that by manually 
unfolding the
         # recursion.

         waiting = [True, # waiting for result?
                    None] # result

         while 1:
             try:
                 # Send the last result back as the result of the yield 
expression.
                 isFailure = isinstance(result, Failure)
                 if isFailure:
                     result = self.throw_exception(g, result)
                 else:
                     result = self.send_result(g, result)
             except StopIteration:
                 # fell off the end, or "return" statement
                 return self.stop_iteration()
             except defer._DefGen_Return, e:
                 # returnValue() was called; time to give a result to 
the original
                 # Deferred.  First though, let's try to identify the 
potentially
                 # confusing situation which results when returnValue() is
                 # accidentally invoked from a different function, one 
that wasn't
                 # decorated with @inlineCallbacks.

                 # The traceback starts in this frame (the one for
                 # _inlineCallbacks); the next one down should be the 
application
                 # code.
                 appCodeTrace = exc_info()[2].tb_next.tb_next
                 if isFailure:
                     # If we invoked this generator frame by throwing an 
exception
                     # into it, then throwExceptionIntoGenerator will 
consume an
                     # additional stack frame itself, so we need to skip 
that too.
                     appCodeTrace = appCodeTrace.tb_next
                 # Now that we've identified the frame being exited by the
                 # exception, let's figure out if returnValue was called 
from it
                 # directly.  returnValue itself consumes a stack frame, 
so the
                 # application code will have a tb_next, but it will 
*not* have a
                 # second tb_next.
                 if appCodeTrace.tb_next.tb_next:
                     # If returnValue was invoked non-local to the frame 
which it is
                     # exiting, identify the frame that ultimately invoked
                     # returnValue so that we can warn the user, as this 
behavior is
                     # confusing.
                     ultimateTrace = appCodeTrace
                     while ultimateTrace.tb_next.tb_next:
                         ultimateTrace = ultimateTrace.tb_next
                     filename = ultimateTrace.tb_frame.f_code.co_filename
                     lineno = ultimateTrace.tb_lineno
                     warnings.warn_explicit(
                         "returnValue() in %r causing %r to exit: "
                         "returnValue should only be invoked by 
functions decorated "
                         "with inlineCallbacks" % (
                             ultimateTrace.tb_frame.f_code.co_name,
                             appCodeTrace.tb_frame.f_code.co_name),
                         DeprecationWarning, filename, lineno)
                 return self.return_value(e.value)
             except:
                 return self.exception()

             if isinstance(result, tuple): # yield tuple support!!!
                 non_deferreds_cnt = 0
                 list_of_deferreds = []
                 for r in result:
                     if not isinstance(r, defer.Deferred):
                         r = defer.succeed(r)
                         non_deferreds_cnt += 1
                     list_of_deferreds.append(r)

                 if non_deferreds_cnt != len(result):
                     result = defer.DeferredList(list_of_deferreds, 
fireOnOneErrback=1)

             if isinstance(result, defer.Deferred):
                 # a deferred was yielded, get the result.
                 if isinstance(result, defer.DeferredList): # yield 
tuple support!!!
                     def gotResult(r):
                         if isinstance(r, Failure):
                             r = r.value.subFailure
                         else:
                             r = tuple(_r for _s, _r in r)
                         if waiting[0]:
                             waiting[0] = False
                             waiting[1] = r
                         else:
                             self.deferred.delFinalizer(result.delBoth, 
gotResult) # cascading cancelling support!!!
                             self._inlineCallbacks(r, g)
                 else:
                     def gotResult(r):
                         if waiting[0]:
                             waiting[0] = False
                             waiting[1] = r
                         else:
                             self.deferred.delFinalizer(result.delBoth, 
gotResult) # cascading cancelling support!!!
                             self._inlineCallbacks(r, g)

                 result.addBoth(gotResult)
                 if waiting[0]:
                     # Haven't called back yet, set flag so that we get 
reinvoked
                     # and return from the loop
                     waiting[0] = False
                     self.deferred.addFinalizer(result.delBoth, 
gotResult) # cascading cancelling support!!!
                     return self.deferred

                 result = waiting[1]
                 # Reset waiting to initial values for next loop.  
gotResult uses
                 # waiting, but this isn't a problem because gotResult 
is only
                 # executed once, and if it hasn't been executed yet, 
the return
                 # branch above would have been taken.

                 waiting[0] = True
                 waiting[1] = None

from twisted.python.util import mergeFunctionMetadata

def create_inline_callbacks_decorator(manager_factory):
    """
    Build an inlineCallbacks-style decorator around C{manager_factory}.

    @param manager_factory: callable invoked with the same arguments as the
        decorated function; the object it returns must provide
        C{_inlineCallbacks(result, generator)}.
    @return: a decorator for generator functions.
    """
    def inline_callbacks(f):
        def unwind_generator(*args, **kwargs):
            # One manager per call, built before the generator is created.
            driver = manager_factory(*args, **kwargs)
            generator = f(*args, **kwargs)
            return driver._inlineCallbacks(None, generator)
        wrapped = mergeFunctionMetadata(f, unwind_generator)
        return wrapped
    return inline_callbacks

# Default decorator: every decorated call is driven by a plain
# InlineCallbacksManager.
inlineCallbacks = create_inline_callbacks_decorator(InlineCallbacksManager)

# Monkey-patch: rebind defer.inlineCallbacks to the decorator built above.
defer.inlineCallbacks = inlineCallbacks

# # # #
# Deferred
#

# Fixes:
# 1. raise CancelledError with the current traceback (the original Twisted
#    code raises it with an empty traceback)
# 2. ability to cancel with a given traceback (parameter `failure`; if it is
#    callable, it must return a failure)
def deferred_cancel(self, failure=None):
    """
    Replacement for Deferred.cancel.

    @param failure: optional failure to errback with; when it is callable it
        is called without arguments and its return value is used.  When
        omitted, a CancelledError is raised and caught here so that the
        resulting Failure is built from a live exception.
    """
    if self.called:
        if isinstance(self.result, defer.Deferred):
            # Waiting for another deferred -- cancel it instead.
            self.result.cancel()
        return

    canceller = self._canceller
    if canceller:
        canceller(self)
    else:
        # Arrange to eat the callback that will eventually be fired
        # since there was no real canceller.
        self._suppressAlreadyCalled = 1

    if self.called:
        # The canceller already fired this deferred itself.
        return

    # There was no canceller, or the canceller didn't call
    # callback or errback.
    if failure is None:
        try:
            raise defer.CancelledError()
        except defer.CancelledError:
            self.errback(Failure())
        return

    if callable(failure):
        failure = failure()
    self.errback(failure)
defer.Deferred.cancel = deferred_cancel

# Fixes: add `finalizers` member
# Keep a reference to the unpatched constructor so the wrapper below can
# delegate to it.
original_deferred___init__ = defer.Deferred.__init__
def deferred___init__(self, *args, **kw):
     """
     Run the original Deferred constructor, then give the instance an empty
     C{finalizers} list.
     """
     original_deferred___init__(self, *args, **kw)
     # Entries are appended by addFinalizer as (callback, args, kw) tuples.
     self.finalizers = []
defer.Deferred.__init__ = deferred___init__

def deferred_addFinalizer(self, callback, *args, **kw):
    """
    Register C{callback(*args, **kw)} to run once this deferred has finished.

    If the finalizers have already been run, the new finalizer is invoked
    immediately; previously it was stored but never called, because
    C{_runFinalizers} does nothing once C{_finalized} is set.

    @return: C{self}, so calls can be chained.
    """
    assert callable(callback)
    self.finalizers.append((callback, args, kw))
    if self._finalized:
        # Too late for _runFinalizers(): run just the newcomer.
        callback(*args, **kw)
    elif self.called:
        # Fired but not finalized yet: run everything registered so far.
        self._runFinalizers()
    return self
# Install deferred_addFinalizer as the Deferred.addFinalizer method.
defer.Deferred.addFinalizer = deferred_addFinalizer

def deferred_delFinalizer(self, callback, *args, **kw):
     """
     Remove the finalizer entry that matches C{(callback, args, kw)}.

     list.remove raises ValueError when no such entry is registered.

     @return: C{self}, so calls can be chained.
     """
     if self.called:
         # NOTE(review): this expression only looks up the exception class and
         # raises nothing, so removal proceeds even after the deferred has
         # fired.  A `raise` may have been intended -- confirm with callers
         # before changing it.
         defer.AlreadyCalledError
     assert callable(callback)
     self.finalizers.remove((callback, args, kw))
     return self
defer.Deferred.delFinalizer = deferred_delFinalizer

def deferred__runFinalizers(self):
    """
    Invoke every registered finalizer, at most once per deferred.

    The C{_finalized} flag is set before the first finalizer runs, so later
    calls return immediately.
    """
    if self._finalized:
        return
    self._finalized = True
    for finalizer, positional, keywords in self.finalizers:
        finalizer(*positional, **keywords)
defer.Deferred._runFinalizers = deferred__runFinalizers
# Class-level default; instances get their own True value when finalized.
defer.Deferred._finalized = False

# Fixes: run `finalizers` when done
# Keep a reference to the unpatched method so the wrapper below can delegate
# to it.
original_deferred_callback = defer.Deferred.callback
def deferred_callback(self, result):
     """
     Fire the deferred through the original callback(), then run the
     finalizers.  If the original call raises, the finalizers are not run
     by this call.
     """
     original_deferred_callback(self, result)
     self._runFinalizers()
defer.Deferred.callback = deferred_callback

# Fixes: run `finalizers` when done
# Keep a reference to the unpatched method so the wrapper below can delegate
# to it.
original_deferred_errback = defer.Deferred.errback
def deferred_errback(self, fail=None):
     """
     Fire the deferred through the original errback(), then run the
     finalizers.  If the original call raises, the finalizers are not run
     by this call.
     """
     original_deferred_errback(self, fail=fail)
     self._runFinalizers()
defer.Deferred.errback = deferred_errback

def _skip_result(result):
     pass

def deferred_delCallbacks(self, callback, errback=None,
                  callbackArgs=None, callbackKeywords=None,
                  errbackArgs=None, errbackKeywords=None):
     """
     Remove a callback/errback pair from C{self.callbacks}; the parameters
     have the same names as those of addCallbacks.

     When the removal leaves no callbacks at all, the deferred is considered
     unneeded and is cancelled.

     list.remove raises ValueError when no matching pair is registered.

     @return: C{self}, so calls can be chained.
     """
     if self.called:
         # NOTE(review): this expression only looks up the exception class and
         # raises nothing.  A `raise` may have been intended, but cancel()
         # below has a branch for an already-called deferred, so confirm
         # before changing it.
         defer.AlreadyCalledError

     assert callable(callback)
     assert errback == None or callable(errback)
     # Build the pair to look for; a missing errback becomes defer.passthru.
     cbs = ((callback, callbackArgs, callbackKeywords),
            (errback or (defer.passthru), errbackArgs, errbackKeywords))
     self.callbacks.remove(cbs)

     if not self.callbacks:
         # Nobody is listening any more: install a handler that discards the
         # outcome, then cancel.
         self.addBoth(_skip_result)
         self.cancel()

     return self
defer.Deferred.delCallbacks = deferred_delCallbacks

def deferred_delCallback(self, callback, *args, **kw):
    """
    Remove a callback that was registered with these extra positional and
    keyword arguments.  Delegates to delCallbacks and returns its result.
    """
    options = {'callbackArgs': args, 'callbackKeywords': kw}
    return self.delCallbacks(callback, **options)
defer.Deferred.delCallback = deferred_delCallback

def deferred_delErrback(self, errback, *args, **kw):
    """
    Remove an errback that was registered with these extra positional and
    keyword arguments.  The callback half of the pair is defer.passthru.
    Delegates to delCallbacks and returns its result.
    """
    options = {'errbackArgs': args, 'errbackKeywords': kw}
    return self.delCallbacks(defer.passthru, errback, **options)
defer.Deferred.delErrback = deferred_delErrback

def deferred_delBoth(self, callback, *args, **kw):
    """
    Remove a handler that serves as both callback and errback, with the same
    extra arguments on both sides.  Delegates to delCallbacks and returns its
    result.
    """
    options = {
        'callbackArgs': args,
        'errbackArgs': args,
        'callbackKeywords': kw,
        'errbackKeywords': kw,
    }
    return self.delCallbacks(callback, callback, **options)
defer.Deferred.delBoth = deferred_delBoth

def deferred_unchainDeferred(self, d):
    """
    Remove the pair made of C{d.callback} and C{d.errback} from this
    deferred.  Delegates to delCallbacks and returns its result.
    """
    on_success, on_failure = d.callback, d.errback
    return self.delCallbacks(on_success, on_failure)
defer.Deferred.unchainDeferred = deferred_unchainDeferred

def deferred_delCallbacksSafe(self, *args, **kw):
    """
    Like delCallbacks, but do nothing and return C{self} when the deferred
    has already been called.
    """
    if self.called:
        return self
    return self.delCallbacks(*args, **kw)
defer.Deferred.delCallbacksSafe = deferred_delCallbacksSafe

def deferred_delCallbackSafe(self, *args, **kw):
    """
    Like delCallback, but do nothing and return C{self} when the deferred
    has already been called.
    """
    if self.called:
        return self
    return self.delCallback(*args, **kw)
defer.Deferred.delCallbackSafe = deferred_delCallbackSafe

def deferred_delErrbackSafe(self, *args, **kw):
    """
    Like delErrback, but do nothing and return C{self} when the deferred
    has already been called.
    """
    if self.called:
        return self
    return self.delErrback(*args, **kw)
defer.Deferred.delErrbackSafe = deferred_delErrbackSafe

def deferred_delBothSafe(self, *args, **kw):
    """
    Like delBoth, but do nothing and return C{self} when the deferred has
    already been called.
    """
    if self.called:
        return self
    return self.delBoth(*args, **kw)
defer.Deferred.delBothSafe = deferred_delBothSafe

def deferred_unchainDeferredSafe(self, *args, **kw):
    """
    Like unchainDeferred, but do nothing and return C{self} when the deferred
    has already been called.
    """
    if self.called:
        return self
    return self.unchainDeferred(*args, **kw)
defer.Deferred.unchainDeferredSafe = deferred_unchainDeferredSafe

# # # #
# DeferredList
#

# fixes: delCallbacks when finished
# Keep a reference to the unpatched constructor so the wrapper below can
# delegate to it.
original_deferred_list___init__ = defer.DeferredList.__init__
def deferred_list___init__(self, deferredList, *args, **kw):
    """
    Run the original DeferredList constructor, then register one finalizer
    per member deferred.  Each finalizer calls the member's delCallbacksSafe
    with C{self._cbDeferred} and the member's index, so that the list's
    handler is detached from members once the list itself has finished.
    """
    original_deferred_list___init__(self, deferredList, *args, **kw)
    for position, member in enumerate(deferredList):
        handler = self._cbDeferred
        self.addFinalizer(
            member.delCallbacksSafe,
            handler,
            handler,
            callbackArgs=(position, defer.SUCCESS),
            errbackArgs=(position, defer.FAILURE),
        )
defer.DeferredList.__init__ = deferred_list___init__

# # # #
# Test
#

if __name__ == '__main__':
     from twisted.internet import reactor
     import threading
     import time

     def log(s):
         """Print C{s} prefixed with the current time formatted as HH:MM:SS."""
         print time.strftime('%H:%M:%S'), s

     # some long deferred work
     def deferred_work():
         """
         Simulate a long job: a worker thread sleeps in one-second steps, up
         to seven times, and then fires the returned Deferred with 7.

         Cancelling the returned Deferred sets a flag that the worker checks
         after every step; once it sees the flag it logs and stops without
         firing.

         @return: the Deferred representing the job.
         """
         cancel_flag = [False]
         def my_canceller(d):
             cancel_flag[0] = True
         res = defer.Deferred(canceller=my_canceller)
         def work():
             log('start work')
             cnt = 7
             while cnt:
                 cnt -= 1
                 time.sleep(1)
                 if cancel_flag[0]:
                     log('work cancelled')
                     return
             log('finish work, work not cancelled!!!')
             # Deferreds must be fired from the reactor thread, not from this
             # worker thread, so hand the callback over to the reactor.
             reactor.callFromThread(res.callback, 7)
         threading.Thread(target=work).start()
         return res

     # # # #
     # Shuttingdown test
     # Note: all inlineCallbacks deferreds are registered, and all of them
     # are cancelled when a shutdown occurs.
     #

     class ShuttingdownError(BaseException):
         """Signals that a shutdown is in progress; derives from BaseException."""

     # base class for shuttingdown support
     class ShuttingdownSupport(object):
         """
         Base class that keeps track of outstanding deferreds and errbacks
         them with ShuttingdownError when shutdown() is called.
         """
         # Becomes 1 once shutdown() has been called.
         shuttingdown = 0

         def __init__(self):
             # Registered deferreds are the keys; the values are unused.
             self._deferreds = {}

         def shutdown(self):
             """Mark the object as shutting down and fail what is pending."""
             self.shuttingdown = 1
             self.do_shutdown()

         def do_shutdown(self):
             """
             Errback every registered deferred that has not fired yet with
             ShuttingdownError.
             """
             log('SHUTDOWN')
             # Iterate over a snapshot: finalizers unregister deferreds while
             # this loop is running.
             for deferred in list(self._deferreds):
                 if deferred.called:
                     # Already fired, e.g. through the cascade started by an
                     # earlier errback in this loop; firing it again could
                     # raise AlreadyCalledError.
                     continue
                 try:
                     raise ShuttingdownError
                 except ShuttingdownError:
                     # errback() without arguments picks up the exception
                     # being handled.
                     deferred.errback()

         def _reg_deferred(self, deferred):
             """Start tracking C{deferred}."""
             self._deferreds[deferred] = 1

         def _unreg_deferred(self, deferred):
             """Stop tracking C{deferred}; KeyError if it is not tracked."""
             del self._deferreds[deferred]

     # special inlineCallbacks manager for shuttingdown support
     class InlineCallbacksManagerWithShuttingdownSupport(InlineCallbacksManager):
         """
         Manager for methods of ShuttingdownSupport objects: registers its
         deferred with the owning instance and refuses to resume the
         generator normally once the instance is shutting down.
         """
         def __init__(self, instance, *args, **kw):
             """
             @param instance: the object whose method is being decorated
                 (the C{self} of the decorated call); it must provide
                 C{_reg_deferred}, C{_unreg_deferred} and C{shuttingdown}.
             """
             self.deferred = defer.Deferred()
             self.instance = instance
             instance._reg_deferred(self.deferred)
             # Unregister automatically as soon as the deferred finishes.
             self.deferred.addFinalizer(instance._unreg_deferred,
                                        self.deferred)

         def send_result(self, g, result):
             """
             Resume the generator with C{result}, unless a shutdown started
             while the result was awaited; in that case throw
             ShuttingdownError into the generator instead.

             @return: whatever the generator yields next.
             """
             if self.instance.shuttingdown:
                 try:
                     raise ShuttingdownError
                 except ShuttingdownError:
                     failure = Failure()
                     # Return what the generator yields after handling the
                     # exception; without the return the caller's loop would
                     # receive None in its place.
                     return self.throw_exception(g, failure)
             else:
                 return InlineCallbacksManager.send_result(self, g, result)

     # decorator for methods of ShuttingdownSupport objects
     # Built from the shutdown-aware manager; the parentheses keep the long
     # assignment a single statement.
     inlineCallbacksShuttingdown = create_inline_callbacks_decorator(
         InlineCallbacksManagerWithShuttingdownSupport)

     class ShuttingdownTest(ShuttingdownSupport):
         """Demo object with two nested inlineCallbacks-style methods."""

         @inlineCallbacksShuttingdown
         def test1(self):
             """Wait for one deferred_work() job and return its result."""
             log('start ShuttingdownTest.test1')
             try:
                 res = yield deferred_work()
             except defer.CancelledError:
                 log('ShuttingdownTest.test1 cancelled')
                 raise
             except ShuttingdownError:
                 log('ShuttingdownTest.test1 shuttingdown detected')
                 raise
             log('finish ShuttingdownTest.test1')
             defer.returnValue(res)

         @inlineCallbacksShuttingdown
         def test2(self):
             """Run two test1() calls at once by yielding them as a tuple."""
             log('start ShuttingdownTest.test2')
             try:
                 # test yield tuple!!!
                 res = yield self.test1(), self.test1()
             except defer.CancelledError:
                 log('ShuttingdownTest.test2 cancelled')
                 raise
             except ShuttingdownError:
                 log('ShuttingdownTest.test2 shuttingdown detected')
                 raise
             log('finish ShuttingdownTest.test2')
             defer.returnValue(res)

     # @defer.inlineCallbacks
     # def test1():
         # log('start test1')
         # try:
             # res = yield deferred_work()
         # except defer.CancelledError:
             # log('test1 cancelled')
             # raise
         # log('finish test1, test1 not cancelled!!!')
         # defer.returnValue(res)

     # @defer.inlineCallbacks
     # def test2():
         # log('start test2')
         # try:
             # res = yield test1()
         # except defer.CancelledError:
             # log('test2 cancelled')
             # raise
         # log('finish test2, test2 not cancelled!!!')
         # defer.returnValue(res)

     def ok(result):
         """Callback for the demo: log the successful result."""
         log('ok %r' % (result,))

     def error(result):
         """Errback for the demo: log the failure."""
         log('error %r' % (result,))

     # def run_test1():
         # deferred = test2()
         # deferred.addCallbacks(ok, error)
         # reactor.callLater(1, deferred.cancel)

     # reactor.callLater(1, run_test1)
     # run_test1()

     def run_test2():
         """
         Start ShuttingdownTest.test2, log its outcome, and schedule a
         shutdown of the test object one second later.
         """
         tester = ShuttingdownTest()
         tester.test2().addCallbacks(ok, error)
         reactor.callLater(1, tester.shutdown)

     # reactor.callLater(3, run_test2)
     # Start the shutdown demo right away, before the reactor is running.
     run_test2()

     # Schedule reactor.stop 10 seconds ahead so that the script terminates.
     reactor.callLater(10, reactor.stop)
     reactor.run()




More information about the Twisted-Python mailing list