Ticket #411: single_while_loop.patch

File single_while_loop.patch, 22.7 KB (added by TimothyFitz, 4 years ago)

_runCallbacks implementation with only one loop.

  • twisted/test/test_defer.py

     
    11# Copyright (c) 2001-2010 Twisted Matrix Laboratories. 
    22# See LICENSE for details. 
    33 
    4  
    54""" 
    65Test cases for defer module. 
    76""" 
    87 
    9 import gc 
     8import gc, traceback 
    109 
    1110from twisted.trial import unittest 
    1211from twisted.internet import reactor, defer 
     
    239238 
    240239    def testCallbackErrors(self): 
    241240        l = [] 
    242         d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(l.append) 
     241        d = defer.Deferred().addCallback(lambda _: 1 / 0).addErrback(l.append) 
    243242        d.callback(1) 
    244243        self.assert_(isinstance(l[0].value, ZeroDivisionError)) 
    245244        l = [] 
     
    265264        d2.callback(2) 
    266265        assert self.callbackResults is None, "Still should not have been called yet." 
    267266        d2.unpause() 
    268         assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s"% (self.callbackResults,) 
     267        assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s" % (self.callbackResults,) 
    269268 
     269 
     270    def test_chainedPausedDeferredWithResult(self): 
     271        """ 
     272        When a paused Deferred with a result is returned from a callback on 
     273        another Deferred, the other Deferred is chained to the first and waits 
     274        for it to be unpaused. 
     275        """ 
     276        expected = object() 
     277        paused = defer.Deferred() 
     278        paused.callback(expected) 
     279        paused.pause() 
     280        chained = defer.Deferred() 
     281        chained.addCallback(lambda ignored: paused) 
     282        chained.callback(None) 
     283 
     284        result = [] 
     285        chained.addCallback(result.append) 
     286        self.assertEquals(result, []) 
     287        paused.unpause() 
     288        self.assertEquals(result, [expected]) 
     289 
     290 
     291    def test_pausedDeferredChained(self): 
     292        """ 
     293        A paused Deferred encountered while pushing a result forward through a 
     294        chain does not prevent earlier Deferreds from continuing to execute 
     295        their callbacks. 
     296        """ 
     297        first = defer.Deferred() 
     298        second = defer.Deferred() 
     299        first.addCallback(lambda ignored: second) 
     300        first.callback(None) 
     301        first.pause() 
     302        second.callback(None) 
     303        result = [] 
     304        second.addCallback(result.append) 
     305        self.assertEquals(result, [None]) 
     306 
     307 
    270308    def testGatherResults(self): 
    271309        # test successful list of deferreds 
    272310        l = [] 
     
    336374        return self.assertFailure(d2, RuntimeError) 
    337375 
    338376 
     377    def test_innerCallbacksPreserved(self): 
     378        """ 
     379        When a L{Deferred} encounters a result which is another L{Deferred} 
     380        which is waiting on a third L{Deferred}, the middle L{Deferred}'s 
     381        callbacks are executed after the third L{Deferred} fires and before the 
     382        first receives a result. 
     383        """ 
     384        results = [] 
     385        failures = [] 
     386        inner = defer.Deferred() 
     387        def cb(result): 
     388            results.append(('start-of-cb', result)) 
     389            d = defer.succeed('inner') 
     390            def firstCallback(result): 
     391                results.append(('firstCallback', 'inner')) 
     392                return inner 
     393            def secondCallback(result): 
     394                results.append(('secondCallback', result)) 
     395                return result * 2 
     396            d.addCallback(firstCallback).addCallback(secondCallback) 
     397            d.addErrback(failures.append) 
     398            return d 
     399        outer = defer.succeed('outer') 
     400        outer.addCallback(cb) 
     401        inner.callback('orange') 
     402        outer.addCallback(results.append) 
     403        inner.addErrback(failures.append) 
     404        outer.addErrback(failures.append) 
     405        self.assertEqual([], failures) 
     406        self.assertEqual( 
     407            results, 
     408            [('start-of-cb', 'outer'), 
     409             ('firstCallback', 'inner'), 
     410             ('secondCallback', 'orange'), 
     411             'orangeorange']) 
     412              
     413    def test_continueCallbackNotFirst(self): 
     414        """ 
     415        The continue callback of a L{Deferred} waiting for another L{Deferred} 
     416        is not necessarily the first one. This is somewhat a whitebox test 
     417        checking that we search for that callback among the whole list of 
     418        callbacks. 
     419        """ 
     420        results = [] 
     421        failures = [] 
     422        a = defer.Deferred() 
     423 
     424        def cb(result): 
     425            results.append(('cb', result)) 
     426            d = defer.Deferred() 
     427 
     428            def firstCallback(ignored): 
     429                results.append(('firstCallback', ignored)) 
     430                return defer.gatherResults([a]) 
     431 
     432            def secondCallback(result): 
     433                results.append(('secondCallback', result)) 
     434 
     435            d.addCallback(firstCallback) 
     436            d.addCallback(secondCallback) 
     437            d.addErrback(failures.append) 
     438            d.callback(None) 
     439            return d 
     440 
     441        outer = defer.succeed('outer') 
     442        outer.addCallback(cb) 
     443        outer.addErrback(failures.append) 
     444        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results) 
     445        a.callback('withers') 
     446        self.assertEquals([], failures) 
     447        self.assertEquals( 
     448            results, 
     449            [('cb', 'outer'), 
     450             ('firstCallback', None), 
     451             ('secondCallback', ['withers'])]) 
     452 
     453 
     454    def test_callbackOrderPreserved(self): 
     455        """ 
     456        A callback added to a L{Deferred} after a previous callback attached 
     457        another L{Deferred} as a result is run after the callbacks of the other 
     458        L{Deferred} are run. 
     459        """ 
     460        results = [] 
     461        failures = [] 
     462        a = defer.Deferred() 
     463 
     464        def cb(result): 
     465            results.append(('cb', result)) 
     466            d = defer.Deferred() 
     467 
     468            def firstCallback(ignored): 
     469                results.append(('firstCallback', ignored)) 
     470                return defer.gatherResults([a]) 
     471 
     472            def secondCallback(result): 
     473                results.append(('secondCallback', result)) 
     474 
     475            d.addCallback(firstCallback) 
     476            d.addCallback(secondCallback) 
     477            d.addErrback(failures.append) 
     478            d.callback(None) 
     479            return d 
     480 
     481        outer = defer.Deferred() 
     482        outer.addCallback(cb) 
     483        outer.addCallback(lambda x: results.append('final')) 
     484        outer.addErrback(failures.append) 
     485        outer.callback('outer') 
     486        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results) 
     487        a.callback('withers') 
     488        self.assertEquals([], failures) 
     489        self.assertEquals( 
     490            results, 
     491            [('cb', 'outer'), 
     492             ('firstCallback', None), 
     493             ('secondCallback', ['withers']), 'final']) 
     494 
     495 
    339496    def test_reentrantRunCallbacks(self): 
    340497        """ 
    341498        A callback added to a L{Deferred} by a callback on that L{Deferred} 
     
    721878                hex(unsignedID(a)), hex(unsignedID(b)))) 
    722879 
    723880 
     881    def test_boundedStackDepth(self): 
     882        """ 
     883        The depth of the call stack does not grow as more L{Deferred} instances 
     884        are chained together. 
     885        """ 
     886        def chainDeferreds(howMany): 
     887            stack = [] 
     888            def recordStackDepth(ignored): 
     889                stack.append(len(traceback.extract_stack())) 
    724890 
     891            top = defer.Deferred() 
     892            innerDeferreds = [defer.Deferred() for ignored in range(howMany)] 
     893            originalInners = innerDeferreds[:] 
     894            last = defer.Deferred() 
     895 
     896            inner = innerDeferreds.pop() 
     897            top.addCallback(lambda ign, inner=inner: inner) 
     898            top.addCallback(recordStackDepth) 
     899 
     900            while innerDeferreds: 
     901                newInner = innerDeferreds.pop() 
     902                inner.addCallback(lambda ign, inner=newInner: inner) 
     903                inner = newInner 
     904            inner.addCallback(lambda ign: last) 
     905 
     906            top.callback(None) 
     907            for inner in originalInners: 
     908                inner.callback(None) 
     909 
     910            # Sanity check - the record callback is not intended to have 
     911            # fired yet. 
     912            self.assertEquals(stack, []) 
     913 
     914            # Now fire the last thing and return the stack depth at which the 
     915            # callback was invoked. 
     916            last.callback(None) 
     917            return stack[0] 
     918 
     919        # Callbacks should be invoked at the same stack depth regardless of 
     920        # how many Deferreds are chained. 
     921        self.assertEquals(chainDeferreds(1), chainDeferreds(2)) 
     922 
     923 
     924    def test_resultOfDeferredResultOfDeferredOfFiredDeferredCalled(self): 
     925        """ 
     926        Given three Deferreds, one chained to the next chained to the next, 
     927        callbacks on the middle Deferred which are added after the chain is 
     928        created are called once the last Deferred fires. 
     929 
     930        This is more of a regression-style test.  It doesn't exercise any 
     931        particular code path through the current implementation of Deferred, but 
     932        it does exercise a broken codepath through one of the variations of the 
     933        implementation proposed as a resolution to ticket #411. 
     934        """ 
     935        first = defer.Deferred() 
     936        second = defer.Deferred() 
     937        third = defer.Deferred() 
     938        first.addCallback(lambda ignored: second) 
     939        second.addCallback(lambda ignored: third) 
     940        second.callback(None) 
     941        first.callback(None) 
     942        third.callback(None) 
     943        L = [] 
     944        second.addCallback(L.append) 
     945        self.assertEquals(L, [None]) 
     946 
     947 
     948 
    725949class FirstErrorTests(unittest.TestCase): 
    726950    """ 
    727951    Tests for L{FirstError}. 
     
    12141438        """ 
    12151439        log.removeObserver(self.c.append) 
    12161440 
     1441 
     1442    def _loggedErrors(self): 
     1443        return [e for e in self.c if e["isError"]] 
     1444 
     1445 
    12171446    def _check(self): 
    12181447        """ 
    12191448        Check the output of the log observer to see if the error is present. 
    12201449        """ 
    1221         c2 = [e for e in self.c if e["isError"]] 
     1450        c2 = self._loggedErrors() 
    12221451        self.assertEquals(len(c2), 2) 
    12231452        c2[1]["failure"].trap(ZeroDivisionError) 
    12241453        self.flushLoggedErrors(ZeroDivisionError) 
     
    12291458        and its final result (the one not handled by any callback) is an 
    12301459        exception, that exception will be logged immediately. 
    12311460        """ 
    1232         defer.Deferred().addCallback(lambda x: 1/0).callback(1) 
     1461        defer.Deferred().addCallback(lambda x: 1 / 0).callback(1) 
    12331462        gc.collect() 
    12341463        self._check() 
    12351464 
     
    12391468        """ 
    12401469        def _subErrorLogWithInnerFrameRef(): 
    12411470            d = defer.Deferred() 
    1242             d.addCallback(lambda x: 1/0) 
     1471            d.addCallback(lambda x: 1 / 0) 
    12431472            d.callback(1) 
    12441473 
    12451474        _subErrorLogWithInnerFrameRef() 
     
    12521481        """ 
    12531482        def _subErrorLogWithInnerFrameCycle(): 
    12541483            d = defer.Deferred() 
    1255             d.addCallback(lambda x, d=d: 1/0) 
     1484            d.addCallback(lambda x, d=d: 1 / 0) 
    12561485            d._d = d 
    12571486            d.callback(1) 
    12581487 
     
    12611490        self._check() 
    12621491 
    12631492 
     1493    def test_chainedErrorCleanup(self): 
     1494        """ 
     1495        If one Deferred with an error result is returned from a callback on 
     1496        another Deferred, when the first Deferred is garbage collected it does 
     1497        not log its error. 
     1498        """ 
     1499        d = defer.Deferred() 
     1500        d.addCallback(lambda ign: defer.fail(RuntimeError("zoop"))) 
     1501        d.callback(None) 
     1502 
     1503        # Sanity check - this isn't too interesting, but we do want the original 
     1504        # Deferred to have gotten the failure. 
     1505        results = [] 
     1506        errors = [] 
     1507        d.addCallbacks(results.append, errors.append) 
     1508        self.assertEquals(results, []) 
     1509        self.assertEquals(len(errors), 1) 
     1510        errors[0].trap(Exception) 
     1511 
     1512        # Get rid of any references we might have to the inner Deferred (none of 
     1513        # these should really refer to it, but we're just being safe). 
     1514        del results, errors, d 
     1515        # Force a collection cycle so that there's a chance for an error to be 
     1516        # logged, if it's going to be logged. 
     1517        gc.collect() 
     1518        # And make sure it is not. 
     1519        self.assertEquals(self._loggedErrors(), []) 
     1520 
     1521 
     1522    def test_errorClearedByChaining(self): 
     1523        """ 
     1524        If a Deferred with a failure result has an errback which chains it to 
     1525        another Deferred, the initial failure is cleared by the errback so it is 
     1526        not logged. 
     1527        """ 
     1528        # Start off with a Deferred with a failure for a result 
     1529        bad = defer.fail(Exception("oh no")) 
     1530        good = defer.Deferred() 
     1531        # Give it a callback that chains it to another Deferred 
     1532        bad.addErrback(lambda ignored: good) 
     1533        # That's all, clean it up.  No Deferred here still has a failure result, 
     1534        # so nothing should be logged. 
     1535        good = bad = None 
     1536        gc.collect() 
     1537        self.assertEquals(self._loggedErrors(), []) 
     1538 
     1539 
     1540 
    12641541class DeferredTestCaseII(unittest.TestCase): 
    12651542    def setUp(self): 
    12661543        self.callbackRan = 0 
     
    15461823        d = self.lock.deferUntilLocked(timeout=5.5) 
    15471824        self.assertFailure(d, defer.TimeoutError) 
    15481825 
    1549         self.clock.pump([1]*10) 
     1826        self.clock.pump([1] * 10) 
    15501827 
    15511828        return d 
    15521829 
     
    15661843        d = self.lock.deferUntilLocked(timeout=10) 
    15671844        d.addErrback(onTimeout) 
    15681845 
    1569         self.clock.pump([1]*10) 
     1846        self.clock.pump([1] * 10) 
    15701847 
    15711848        return d 
    15721849 
  • twisted/internet/defer.py

     
    1010@var _NO_RESULT: The result used to represent the fact that there is no 
    1111    result. B{Never ever ever use this as an actual result for a Deferred}.  You 
    1212    have been warned. 
     13 
     14@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred 
     15    chain.  Always accompanied by a Deferred instance in the args tuple pointing 
     16    at the Deferred which is chained to the Deferred which has this marker. 
    1317""" 
    1418 
    1519import traceback 
     
    169173 
    170174# See module docstring. 
    171175_NO_RESULT = object() 
     176_CONTINUE = object() 
    172177 
    173178 
    174179 
     
    425430            self.result.cancel() 
    426431 
    427432 
    428     def _continue(self, result): 
    429         self.result = result 
    430         self.unpause() 
    431  
    432  
    433433    def _startRunCallbacks(self, result): 
    434434        if self.called: 
    435435            if self._suppressAlreadyCalled: 
     
    450450        self._runCallbacks() 
    451451 
    452452 
     453    def _continuation(self): 
     454        """ 
     455        Build a tuple of callback and errback with L{_continue} to be used by 
     456        L{_addContinue} and L{_removeContinue} on another Deferred. 
     457        """ 
     458        return ((_CONTINUE, (self,), None), 
     459                (_CONTINUE, (self,), None)) 
     460 
     461 
    453462    def _runCallbacks(self): 
     463        """ 
     464        Run the chain of callbacks once a result is available. 
     465 
     466        This consists of a simple loop over all of the callbacks, calling each 
     467        with the current result and making the current result equal to the 
     468        return value (or raised exception) of that call. 
     469 
     470        If C{self._runningCallbacks} is true, this loop won't run at all, since 
     471        it is already running above us on the call stack.  If C{self.paused} is 
     472        true, the loop also won't run, because that's what it means to be 
     473        paused. 
     474 
     475        The loop will terminate before processing all of the callbacks if a 
     476        C{Deferred} without a result is encountered. 
     477 
     478        If a C{Deferred} I{with} a result is encountered, that result is taken 
     479        and the loop proceeds. 
     480 
     481        @note: The implementation is complicated slightly by the fact that 
     482            chaining (associating two Deferreds with each other such that one 
     483            will wait for the result of the other, as happens when a Deferred is 
     484            returned from a callback on another Deferred) is supported 
     485            iteratively rather than recursively, to avoid running out of stack 
     486            frames when processing long chains. 
     487        """ 
    454488        if self._runningCallbacks: 
    455489            # Don't recursively run callbacks 
    456490            return 
    457         if not self.paused: 
    458             self._chainedTo = None 
    459             while self.callbacks: 
    460                 item = self.callbacks.pop(0) 
     491 
     492        # Keep track of all the Deferreds encountered while propagating results 
     493        # up a chain.  The way a Deferred gets onto this stack is by having 
     494        # added its _continuation() to the callbacks list of a second Deferred 
     495        # and then that second Deferred being fired.  ie, if ever had _chainedTo 
     496        # set to something other than None, you might end up on this stack. 
     497        chain = [self] 
     498 
     499        while chain: 
     500            current = chain[-1] 
     501 
     502            if current.paused: 
     503                # This Deferred isn't going to produce a result at all.  All the 
     504                # Deferreds up the chain waiting on it will just have to... 
     505                # wait. 
     506                return 
     507 
     508            current._chainedTo = None 
     509            if current.callbacks: 
     510                item = current.callbacks.pop(0) 
    461511                callback, args, kw = item[ 
    462                     isinstance(self.result, failure.Failure)] 
     512                    isinstance(current.result, failure.Failure)] 
    463513                args = args or () 
    464514                kw = kw or {} 
     515 
     516                # Avoid recursion if we can. 
     517                if callback is _CONTINUE: 
     518                    # Give the waiting Deferred our current result and then 
     519                    # forget about that result ourselves. 
     520                    chainee = args[0] 
     521                    chainee.result = current.result 
     522                    current.result = None 
     523                    # Making sure to update _debugInfo 
     524                    if current._debugInfo is not None: 
     525                        current._debugInfo.failResult = None 
     526                    chainee.paused -= 1 
     527                    chain.append(chainee) 
     528                    # Delay cleaning this Deferred and popping it from the chain 
     529                    # until after we've dealt with chainee. 
     530                    continue 
     531 
    465532                try: 
    466                     self._runningCallbacks = True 
     533                    current._runningCallbacks = True 
    467534                    try: 
    468                         self.result = callback(self.result, *args, **kw) 
     535                        current.result = callback(current.result, *args, **kw) 
    469536                    finally: 
    470                         self._runningCallbacks = False 
    471                     if isinstance(self.result, Deferred): 
    472                         # note: this will cause _runCallbacks to be called 
    473                         # recursively if self.result already has a result. 
    474                         # This shouldn't cause any problems, since there is no 
    475                         # relevant state in this stack frame at this point. 
    476                         # The recursive call will continue to process 
    477                         # self.callbacks until it is empty, then return here, 
    478                         # where there is no more work to be done, so this call 
    479                         # will return as well. 
    480                         self.pause() 
    481                         self._chainedTo = self.result 
    482                         self.result.addBoth(self._continue) 
    483                         break 
     537                        current._runningCallbacks = False 
    484538                except: 
    485                     self.result = failure.Failure() 
     539                    current.result = failure.Failure() 
     540                else: 
     541                    if isinstance(current.result, Deferred): 
     542                        # The result is another Deferred.  If it has a result, 
     543                        # we can take it and keep going. 
     544                        resultResult = getattr(current.result, 'result', _NO_RESULT) 
     545                        if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused: 
     546                            # Nope, it didn't.  Pause and chain. 
     547                            current.pause() 
     548                            current._chainedTo = current.result 
     549                            # Note: current.result has no result, so it's not 
     550                            # running its callbacks right now.  Therefore we can 
     551                            # append to the callbacks list directly instead of 
     552                            # using addCallbacks. 
     553                            current.result.callbacks.append(current._continuation()) 
     554                            # Chaining a Deferred replaces any failure. 
     555                            if current._debugInfo is not None: 
     556                                current._debugInfo.failResult = None                             
     557                            break 
     558                        else: 
     559                            # Yep, it did.  Steal it. 
     560                            current.result.result = None 
     561                            # Make sure _debugInfo's failure state is updated. 
     562                            if current.result._debugInfo is not None: 
     563                                current.result._debugInfo.failResult = None 
     564                            current.result = resultResult 
     565            else: 
     566                # As much of the callback chain - perhaps all of it - as can be 
     567                # processed right now has been.  The current Deferred is waiting on 
     568                # another Deferred or for more callbacks.  Before finishing with it, 
     569                # make sure its _debugInfo is in the proper state. 
     570                if isinstance(current.result, failure.Failure): 
     571                    # Stash the Failure in the _debugInfo for unhandled error 
     572                    # reporting. 
     573                    current.result.cleanFailure() 
     574                    if current._debugInfo is None: 
     575                        current._debugInfo = DebugInfo() 
     576                    current._debugInfo.failResult = current.result 
     577                else: 
     578                    # Clear out any Failure in the _debugInfo, since the result 
     579                    # is no longer a Failure. 
     580                    if current._debugInfo is not None: 
     581                        current._debugInfo.failResult = None 
    486582 
    487         if isinstance(self.result, failure.Failure): 
    488             self.result.cleanFailure() 
    489             if self._debugInfo is None: 
    490                 self._debugInfo = DebugInfo() 
    491             self._debugInfo.failResult = self.result 
    492         else: 
    493             if self._debugInfo is not None: 
    494                 self._debugInfo.failResult = None 
     583                # This Deferred is done, pop it from the chain and move back up 
     584                # to the Deferred which supplied us with our result. 
     585                chain.pop() 
    495586 
    496587 
    497  
    498588    def __str__(self): 
    499589        """ 
    500590        Return a string representation of this C{Deferred}. 
     
    520610 
    521611    failResult = None 
    522612 
    523  
    524613    def _getDebugTracebacks(self): 
    525614        info = '' 
    526615        if hasattr(self, "creator"):