[Twisted-Python] inlineCallbacks cascading cancelling and more

Sergey Magafurov smagafurov at naumen.ru
Tue Aug 17 06:56:06 EDT 2010


> You are currently considering your task from the viewpoint “let's make a callback chain for the perfect workflow and alter this chain in case of anything going wrong.” I think the flaw with this approach is that you are trying to make your “ideal” flow work at all in situations where it would, in fact, fail for the most of the time.
>    

I am not sure I understand you correct...
In my case when ASR or TTS resource aquiring fails I must transfer call 
to operator but not fail.

> Instead of this, you could try to reconsider the task from the viewpoint of “Let's not add any further callbacks until this is absolutely necessary”. For me, this approach rather works. Moreover, you have the means to do it — a callback can return a Deferred, and so on.
>    

Good. But I need to start acquiring ASR and TTS resources at one time 
(in parallel) because this is very long operations and they can be good 
performed in parallel because physically this is other and different 
machines (computers). Why we can't do that way? Why user have to wait 
double time?

But it doesn't matter, because there is one more thing. Ok. Let's do it 
sequentially. But what if user hangs up call? I want to immediately stop 
all deferred processes due I don't need their results any more. All 
yields inside inlineCallbacks must raise some exception.

This code shows how we can solve this with feautures described

class DestroyingError(Exception):
     pass

class DestroySupport(object):
     __destroying_deferreds = None

     def __get_destroying_deferred(self):
         if self.__destroying_deferreds is None:
             self.__destroying_deferreds = {}
         deferred = defer.Deferred()
         self.__destroying_deferreds[deferred] = 1
         # delete "destroying" `deferred` when it finished
         deferred.addFinalizer(self.__destroying_deferreds.pop, deferred)
         return deferred

     def wait_destroying(self):
         return self.__get_destroying_deferred()

     def destroy(self):
         if self.__destroying_deferreds is not None:
             # errback all "destroying" deferreds
             for deferred in self.__destroying_deferreds.keys():
                 try:
                     raise DestroyingError('destroying')
                 except DestroyingError:
                     deferred.errback()
             self.__destroying_deferreds = None

# See InlineCallbacksManager in my first message in current discussion 
thread
class 
InlineCallbacksWithDestroySupportManager(defer.InlineCallbacksManager):
     def __init__(self, instance, *args, **kw):
         defer.InlineCallbacksManager.__init__(self, instance, *args, **kw)
         # chain Deferred with "destroying" deferred
         # therefore if destroy occurs Deferred errbacked with 
DestroyingError
         destroying_deferred = instance.wait_destroying()
         destroying_deferred.chainDeferred(self.deferred)
         # unchain if deferred finished (
         #     "destroying" deferred will be cancelled also due not used 
any more
         #     and deletes from DestroySupport.__destroying_deferreds 
due to its finalizer
         #     we add in DestroySupport.__get_destroying_deferred
         # )
         
self.deferred.addFinalizer(destroying_deferred.unchainDeferredSafe, 
self.deferred)

inlineCallbacksWithDestroySupport = defer.create_inline_callbacks_decorator(
     InlineCallbacksWithDestroySupportManager
)

class CallService(DestroySupport):
     def on_connection_lost(self):
         # launch cascade destroying when connection lost!!!
         # all yields will raise DestroyingError
         # therefore all resources will be released
         self.destroy()

     @inlineCallbacksWithDestroySupport
     def incoming_call(self, call):
         call.start_play('music')
         try:
             # tuple yield feature!
             tts, asr = yield self.acquire_tts_for(call), 
self.acquire_asr_for(call)
         except TimeoutError:
             # this occurs if TimeoutError raises in 
`self.acquire_tts_for` or `self.acquire_asr_for`
             # i.e. DeferredList(..., fireOnOneErrback=1)
             call.transfer_to('operator')
             # NOTE: at this point I want to automatically _recursively_ 
(in deep)
             # stop all deferred processes
             # starting inside `self.acquire_tts_for` and 
`self.acquire_asr_for`
             # because I don't need their results (and expensive 
resources!) any more
         except DestroyingError:
             log('Destroying detected')
             raise
         else:
             try:
                 try:
                     yield tts.speak('what you want? say me after signal')
                     yield asr.start_recognition()
                     call.start_play('signal')
                     result = yield asr.wait_recognition_result()
                     ... do something with result ...
                 except DestroyingError:
                     log('Destroying detected')
                     raise
             finally:
                 tts.release()
                 asr.release()

     @inlineCallbacksWithDestroySupport
     def acquire_tts_for(self, call):
         # may raise TimeoutError inside
         tts_connection = yield self.tts.acquire_connection(timeout=10)
         try:
             # may raise TimeoutError inside
             yield call.make_audio_link_with(tts_connection, 'in', 
timeout=10)
         except Exception:
             # we MAY receive this Exception when cascading cancellation 
occurs
             tts_connection.release()
             raise
         else:
             return tts_connection

     @inlineCallbacksWithDestroySupport
     def acquire_asr_for(self, call):
         # may raise TimeoutError inside
         asr_connection = yield self.asr.acquire_connection(timeout=10)
         try:
             # may raise TimeoutError inside
             yield call.make_audio_link_with(asr_connection, 'out', 
timeout=10)
         except Exception:
             # we MAY receive this Exception when cascading cancellation 
occurs
             asr_connection.release()
             raise
         else:
             return asr_connection

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://twistedmatrix.com/pipermail/twisted-python/attachments/20100817/dd2975b7/attachment.htm 


More information about the Twisted-Python mailing list