Ticket #411: single_while_loop.patch

File single_while_loop.patch, 22.7 KB (added by TimothyFitz, 6 years ago)

_runCallbacks implementation with only one loop.

  • twisted/test/test_defer.py

     
    11# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
    22# See LICENSE for details.
    33
    4 
    54"""
    65Test cases for defer module.
    76"""
    87
    9 import gc
     8import gc, traceback
    109
    1110from twisted.trial import unittest
    1211from twisted.internet import reactor, defer
     
    239238
    240239    def testCallbackErrors(self):
    241240        l = []
    242         d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(l.append)
     241        d = defer.Deferred().addCallback(lambda _: 1 / 0).addErrback(l.append)
    243242        d.callback(1)
    244243        self.assert_(isinstance(l[0].value, ZeroDivisionError))
    245244        l = []
     
    265264        d2.callback(2)
    266265        assert self.callbackResults is None, "Still should not have been called yet."
    267266        d2.unpause()
    268         assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s"% (self.callbackResults,)
     267        assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s" % (self.callbackResults,)
    269268
     269
     270    def test_chainedPausedDeferredWithResult(self):
     271        """
     272        When a paused Deferred with a result is returned from a callback on
     273        another Deferred, the other Deferred is chained to the first and waits
     274        for it to be unpaused.
     275        """
     276        expected = object()
     277        paused = defer.Deferred()
     278        paused.callback(expected)
     279        paused.pause()
     280        chained = defer.Deferred()
     281        chained.addCallback(lambda ignored: paused)
     282        chained.callback(None)
     283
     284        result = []
     285        chained.addCallback(result.append)
     286        self.assertEquals(result, [])
     287        paused.unpause()
     288        self.assertEquals(result, [expected])
     289
     290
     291    def test_pausedDeferredChained(self):
     292        """
     293        A paused Deferred encountered while pushing a result forward through a
     294        chain does not prevent earlier Deferreds from continuing to execute
     295        their callbacks.
     296        """
     297        first = defer.Deferred()
     298        second = defer.Deferred()
     299        first.addCallback(lambda ignored: second)
     300        first.callback(None)
     301        first.pause()
     302        second.callback(None)
     303        result = []
     304        second.addCallback(result.append)
     305        self.assertEquals(result, [None])
     306
     307
    270308    def testGatherResults(self):
    271309        # test successful list of deferreds
    272310        l = []
     
    336374        return self.assertFailure(d2, RuntimeError)
    337375
    338376
     377    def test_innerCallbacksPreserved(self):
     378        """
     379        When a L{Deferred} encounters a result which is another L{Deferred}
     380        which is waiting on a third L{Deferred}, the middle L{Deferred}'s
     381        callbacks are executed after the third L{Deferred} fires and before the
     382        first receives a result.
     383        """
     384        results = []
     385        failures = []
     386        inner = defer.Deferred()
     387        def cb(result):
     388            results.append(('start-of-cb', result))
     389            d = defer.succeed('inner')
     390            def firstCallback(result):
     391                results.append(('firstCallback', 'inner'))
     392                return inner
     393            def secondCallback(result):
     394                results.append(('secondCallback', result))
     395                return result * 2
     396            d.addCallback(firstCallback).addCallback(secondCallback)
     397            d.addErrback(failures.append)
     398            return d
     399        outer = defer.succeed('outer')
     400        outer.addCallback(cb)
     401        inner.callback('orange')
     402        outer.addCallback(results.append)
     403        inner.addErrback(failures.append)
     404        outer.addErrback(failures.append)
     405        self.assertEqual([], failures)
     406        self.assertEqual(
     407            results,
     408            [('start-of-cb', 'outer'),
     409             ('firstCallback', 'inner'),
     410             ('secondCallback', 'orange'),
     411             'orangeorange'])
     412             
     413    def test_continueCallbackNotFirst(self):
     414        """
     415        The continue callback of a L{Deferred} waiting for another L{Deferred}
     416        is not necessarily the first one. This is somewhat a whitebox test
     417        checking that we search for that callback among the whole list of
     418        callbacks.
     419        """
     420        results = []
     421        failures = []
     422        a = defer.Deferred()
     423
     424        def cb(result):
     425            results.append(('cb', result))
     426            d = defer.Deferred()
     427
     428            def firstCallback(ignored):
     429                results.append(('firstCallback', ignored))
     430                return defer.gatherResults([a])
     431
     432            def secondCallback(result):
     433                results.append(('secondCallback', result))
     434
     435            d.addCallback(firstCallback)
     436            d.addCallback(secondCallback)
     437            d.addErrback(failures.append)
     438            d.callback(None)
     439            return d
     440
     441        outer = defer.succeed('outer')
     442        outer.addCallback(cb)
     443        outer.addErrback(failures.append)
     444        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results)
     445        a.callback('withers')
     446        self.assertEquals([], failures)
     447        self.assertEquals(
     448            results,
     449            [('cb', 'outer'),
     450             ('firstCallback', None),
     451             ('secondCallback', ['withers'])])
     452
     453
     454    def test_callbackOrderPreserved(self):
     455        """
     456        A callback added to a L{Deferred} after a previous callback attached
     457        another L{Deferred} as a result is run after the callbacks of the other
     458        L{Deferred} are run.
     459        """
     460        results = []
     461        failures = []
     462        a = defer.Deferred()
     463
     464        def cb(result):
     465            results.append(('cb', result))
     466            d = defer.Deferred()
     467
     468            def firstCallback(ignored):
     469                results.append(('firstCallback', ignored))
     470                return defer.gatherResults([a])
     471
     472            def secondCallback(result):
     473                results.append(('secondCallback', result))
     474
     475            d.addCallback(firstCallback)
     476            d.addCallback(secondCallback)
     477            d.addErrback(failures.append)
     478            d.callback(None)
     479            return d
     480
     481        outer = defer.Deferred()
     482        outer.addCallback(cb)
     483        outer.addCallback(lambda x: results.append('final'))
     484        outer.addErrback(failures.append)
     485        outer.callback('outer')
     486        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results)
     487        a.callback('withers')
     488        self.assertEquals([], failures)
     489        self.assertEquals(
     490            results,
     491            [('cb', 'outer'),
     492             ('firstCallback', None),
     493             ('secondCallback', ['withers']), 'final'])
     494
     495
    339496    def test_reentrantRunCallbacks(self):
    340497        """
    341498        A callback added to a L{Deferred} by a callback on that L{Deferred}
     
    721878                hex(unsignedID(a)), hex(unsignedID(b))))
    722879
    723880
     881    def test_boundedStackDepth(self):
     882        """
     883        The depth of the call stack does not grow as more L{Deferred} instances
     884        are chained together.
     885        """
     886        def chainDeferreds(howMany):
     887            stack = []
     888            def recordStackDepth(ignored):
     889                stack.append(len(traceback.extract_stack()))
    724890
     891            top = defer.Deferred()
     892            innerDeferreds = [defer.Deferred() for ignored in range(howMany)]
     893            originalInners = innerDeferreds[:]
     894            last = defer.Deferred()
     895
     896            inner = innerDeferreds.pop()
     897            top.addCallback(lambda ign, inner=inner: inner)
     898            top.addCallback(recordStackDepth)
     899
     900            while innerDeferreds:
     901                newInner = innerDeferreds.pop()
     902                inner.addCallback(lambda ign, inner=newInner: inner)
     903                inner = newInner
     904            inner.addCallback(lambda ign: last)
     905
     906            top.callback(None)
     907            for inner in originalInners:
     908                inner.callback(None)
     909
     910            # Sanity check - the record callback is not intended to have
     911            # fired yet.
     912            self.assertEquals(stack, [])
     913
     914            # Now fire the last thing and return the stack depth at which the
     915            # callback was invoked.
     916            last.callback(None)
     917            return stack[0]
     918
     919        # Callbacks should be invoked at the same stack depth regardless of
     920        # how many Deferreds are chained.
     921        self.assertEquals(chainDeferreds(1), chainDeferreds(2))
     922
     923
     924    def test_resultOfDeferredResultOfDeferredOfFiredDeferredCalled(self):
     925        """
     926        Given three Deferreds, one chained to the next chained to the next,
     927        callbacks on the middle Deferred which are added after the chain is
     928        created are called once the last Deferred fires.
     929
     930        This is more of a regression-style test.  It doesn't exercise any
     931        particular code path through the current implementation of Deferred, but
     932        it does exercise a broken codepath through one of the variations of the
     933        implementation proposed as a resolution to ticket #411.
     934        """
     935        first = defer.Deferred()
     936        second = defer.Deferred()
     937        third = defer.Deferred()
     938        first.addCallback(lambda ignored: second)
     939        second.addCallback(lambda ignored: third)
     940        second.callback(None)
     941        first.callback(None)
     942        third.callback(None)
     943        L = []
     944        second.addCallback(L.append)
     945        self.assertEquals(L, [None])
     946
     947
     948
    725949class FirstErrorTests(unittest.TestCase):
    726950    """
    727951    Tests for L{FirstError}.
     
    12141438        """
    12151439        log.removeObserver(self.c.append)
    12161440
     1441
     1442    def _loggedErrors(self):
     1443        return [e for e in self.c if e["isError"]]
     1444
     1445
    12171446    def _check(self):
    12181447        """
    12191448        Check the output of the log observer to see if the error is present.
    12201449        """
    1221         c2 = [e for e in self.c if e["isError"]]
     1450        c2 = self._loggedErrors()
    12221451        self.assertEquals(len(c2), 2)
    12231452        c2[1]["failure"].trap(ZeroDivisionError)
    12241453        self.flushLoggedErrors(ZeroDivisionError)
     
    12291458        and its final result (the one not handled by any callback) is an
    12301459        exception, that exception will be logged immediately.
    12311460        """
    1232         defer.Deferred().addCallback(lambda x: 1/0).callback(1)
     1461        defer.Deferred().addCallback(lambda x: 1 / 0).callback(1)
    12331462        gc.collect()
    12341463        self._check()
    12351464
     
    12391468        """
    12401469        def _subErrorLogWithInnerFrameRef():
    12411470            d = defer.Deferred()
    1242             d.addCallback(lambda x: 1/0)
     1471            d.addCallback(lambda x: 1 / 0)
    12431472            d.callback(1)
    12441473
    12451474        _subErrorLogWithInnerFrameRef()
     
    12521481        """
    12531482        def _subErrorLogWithInnerFrameCycle():
    12541483            d = defer.Deferred()
    1255             d.addCallback(lambda x, d=d: 1/0)
     1484            d.addCallback(lambda x, d=d: 1 / 0)
    12561485            d._d = d
    12571486            d.callback(1)
    12581487
     
    12611490        self._check()
    12621491
    12631492
     1493    def test_chainedErrorCleanup(self):
     1494        """
     1495        If one Deferred with an error result is returned from a callback on
     1496        another Deferred, when the first Deferred is garbage collected it does
     1497        not log its error.
     1498        """
     1499        d = defer.Deferred()
     1500        d.addCallback(lambda ign: defer.fail(RuntimeError("zoop")))
     1501        d.callback(None)
     1502
     1503        # Sanity check - this isn't too interesting, but we do want the original
     1504        # Deferred to have gotten the failure.
     1505        results = []
     1506        errors = []
     1507        d.addCallbacks(results.append, errors.append)
     1508        self.assertEquals(results, [])
     1509        self.assertEquals(len(errors), 1)
     1510        errors[0].trap(Exception)
     1511
     1512        # Get rid of any references we might have to the inner Deferred (none of
     1513        # these should really refer to it, but we're just being safe).
     1514        del results, errors, d
     1515        # Force a collection cycle so that there's a chance for an error to be
     1516        # logged, if it's going to be logged.
     1517        gc.collect()
     1518        # And make sure it is not.
     1519        self.assertEquals(self._loggedErrors(), [])
     1520
     1521
     1522    def test_errorClearedByChaining(self):
     1523        """
     1524        If a Deferred with a failure result has an errback which chains it to
     1525        another Deferred, the initial failure is cleared by the errback so it is
     1526        not logged.
     1527        """
     1528        # Start off with a Deferred with a failure for a result
     1529        bad = defer.fail(Exception("oh no"))
     1530        good = defer.Deferred()
     1531        # Give it a callback that chains it to another Deferred
     1532        bad.addErrback(lambda ignored: good)
     1533        # That's all, clean it up.  No Deferred here still has a failure result,
     1534        # so nothing should be logged.
     1535        good = bad = None
     1536        gc.collect()
     1537        self.assertEquals(self._loggedErrors(), [])
     1538
     1539
     1540
    12641541class DeferredTestCaseII(unittest.TestCase):
    12651542    def setUp(self):
    12661543        self.callbackRan = 0
     
    15461823        d = self.lock.deferUntilLocked(timeout=5.5)
    15471824        self.assertFailure(d, defer.TimeoutError)
    15481825
    1549         self.clock.pump([1]*10)
     1826        self.clock.pump([1] * 10)
    15501827
    15511828        return d
    15521829
     
    15661843        d = self.lock.deferUntilLocked(timeout=10)
    15671844        d.addErrback(onTimeout)
    15681845
    1569         self.clock.pump([1]*10)
     1846        self.clock.pump([1] * 10)
    15701847
    15711848        return d
    15721849
  • twisted/internet/defer.py

     
    1010@var _NO_RESULT: The result used to represent the fact that there is no
    1111    result. B{Never ever ever use this as an actual result for a Deferred}.  You
    1212    have been warned.
     13
     14@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
     15    chain.  Always accompanied by a Deferred instance in the args tuple pointing
     16    at the Deferred which is chained to the Deferred which has this marker.
    1317"""
    1418
    1519import traceback
     
    169173
    170174# See module docstring.
    171175_NO_RESULT = object()
     176_CONTINUE = object()
    172177
    173178
    174179
     
    425430            self.result.cancel()
    426431
    427432
    428     def _continue(self, result):
    429         self.result = result
    430         self.unpause()
    431 
    432 
    433433    def _startRunCallbacks(self, result):
    434434        if self.called:
    435435            if self._suppressAlreadyCalled:
     
    450450        self._runCallbacks()
    451451
    452452
     453    def _continuation(self):
     454        """
     455        Build a tuple of callback and errback with L{_continue} to be used by
     456        L{_addContinue} and L{_removeContinue} on another Deferred.
     457        """
     458        return ((_CONTINUE, (self,), None),
     459                (_CONTINUE, (self,), None))
     460
     461
    453462    def _runCallbacks(self):
     463        """
     464        Run the chain of callbacks once a result is available.
     465
     466        This consists of a simple loop over all of the callbacks, calling each
     467        with the current result and making the current result equal to the
     468        return value (or raised exception) of that call.
     469
     470        If C{self._runningCallbacks} is true, this loop won't run at all, since
     471        it is already running above us on the call stack.  If C{self.paused} is
     472        true, the loop also won't run, because that's what it means to be
     473        paused.
     474
     475        The loop will terminate before processing all of the callbacks if a
     476        C{Deferred} without a result is encountered.
     477
     478        If a C{Deferred} I{with} a result is encountered, that result is taken
     479        and the loop proceeds.
     480
     481        @note: The implementation is complicated slightly by the fact that
     482            chaining (associating two Deferreds with each other such that one
     483            will wait for the result of the other, as happens when a Deferred is
     484            returned from a callback on another Deferred) is supported
     485            iteratively rather than recursively, to avoid running out of stack
     486            frames when processing long chains.
     487        """
    454488        if self._runningCallbacks:
    455489            # Don't recursively run callbacks
    456490            return
    457         if not self.paused:
    458             self._chainedTo = None
    459             while self.callbacks:
    460                 item = self.callbacks.pop(0)
     491
     492        # Keep track of all the Deferreds encountered while propagating results
     493        # up a chain.  The way a Deferred gets onto this stack is by having
     494        # added its _continuation() to the callbacks list of a second Deferred
     495        # and then that second Deferred being fired.  ie, if ever had _chainedTo
     496        # set to something other than None, you might end up on this stack.
     497        chain = [self]
     498
     499        while chain:
     500            current = chain[-1]
     501
     502            if current.paused:
     503                # This Deferred isn't going to produce a result at all.  All the
     504                # Deferreds up the chain waiting on it will just have to...
     505                # wait.
     506                return
     507
     508            current._chainedTo = None
     509            if current.callbacks:
     510                item = current.callbacks.pop(0)
    461511                callback, args, kw = item[
    462                     isinstance(self.result, failure.Failure)]
     512                    isinstance(current.result, failure.Failure)]
    463513                args = args or ()
    464514                kw = kw or {}
     515
     516                # Avoid recursion if we can.
     517                if callback is _CONTINUE:
     518                    # Give the waiting Deferred our current result and then
     519                    # forget about that result ourselves.
     520                    chainee = args[0]
     521                    chainee.result = current.result
     522                    current.result = None
     523                    # Making sure to update _debugInfo
     524                    if current._debugInfo is not None:
     525                        current._debugInfo.failResult = None
     526                    chainee.paused -= 1
     527                    chain.append(chainee)
     528                    # Delay cleaning this Deferred and popping it from the chain
     529                    # until after we've dealt with chainee.
     530                    continue
     531
    465532                try:
    466                     self._runningCallbacks = True
     533                    current._runningCallbacks = True
    467534                    try:
    468                         self.result = callback(self.result, *args, **kw)
     535                        current.result = callback(current.result, *args, **kw)
    469536                    finally:
    470                         self._runningCallbacks = False
    471                     if isinstance(self.result, Deferred):
    472                         # note: this will cause _runCallbacks to be called
    473                         # recursively if self.result already has a result.
    474                         # This shouldn't cause any problems, since there is no
    475                         # relevant state in this stack frame at this point.
    476                         # The recursive call will continue to process
    477                         # self.callbacks until it is empty, then return here,
    478                         # where there is no more work to be done, so this call
    479                         # will return as well.
    480                         self.pause()
    481                         self._chainedTo = self.result
    482                         self.result.addBoth(self._continue)
    483                         break
     537                        current._runningCallbacks = False
    484538                except:
    485                     self.result = failure.Failure()
     539                    current.result = failure.Failure()
     540                else:
     541                    if isinstance(current.result, Deferred):
     542                        # The result is another Deferred.  If it has a result,
     543                        # we can take it and keep going.
     544                        resultResult = getattr(current.result, 'result', _NO_RESULT)
     545                        if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
     546                            # Nope, it didn't.  Pause and chain.
     547                            current.pause()
     548                            current._chainedTo = current.result
     549                            # Note: current.result has no result, so it's not
     550                            # running its callbacks right now.  Therefore we can
     551                            # append to the callbacks list directly instead of
     552                            # using addCallbacks.
     553                            current.result.callbacks.append(current._continuation())
     554                            # Chaining a Deferred replaces any failure.
     555                            if current._debugInfo is not None:
     556                                current._debugInfo.failResult = None                           
     557                            break
     558                        else:
     559                            # Yep, it did.  Steal it.
     560                            current.result.result = None
     561                            # Make sure _debugInfo's failure state is updated.
     562                            if current.result._debugInfo is not None:
     563                                current.result._debugInfo.failResult = None
     564                            current.result = resultResult
     565            else:
     566                # As much of the callback chain - perhaps all of it - as can be
     567                # processed right now has been.  The current Deferred is waiting on
     568                # another Deferred or for more callbacks.  Before finishing with it,
     569                # make sure its _debugInfo is in the proper state.
     570                if isinstance(current.result, failure.Failure):
     571                    # Stash the Failure in the _debugInfo for unhandled error
     572                    # reporting.
     573                    current.result.cleanFailure()
     574                    if current._debugInfo is None:
     575                        current._debugInfo = DebugInfo()
     576                    current._debugInfo.failResult = current.result
     577                else:
     578                    # Clear out any Failure in the _debugInfo, since the result
     579                    # is no longer a Failure.
     580                    if current._debugInfo is not None:
     581                        current._debugInfo.failResult = None
    486582
    487         if isinstance(self.result, failure.Failure):
    488             self.result.cleanFailure()
    489             if self._debugInfo is None:
    490                 self._debugInfo = DebugInfo()
    491             self._debugInfo.failResult = self.result
    492         else:
    493             if self._debugInfo is not None:
    494                 self._debugInfo.failResult = None
     583                # This Deferred is done, pop it from the chain and move back up
     584                # to the Deferred which supplied us with our result.
     585                chain.pop()
    495586
    496587
    497 
    498588    def __str__(self):
    499589        """
    500590        Return a string representation of this C{Deferred}.
     
    520610
    521611    failResult = None
    522612
    523 
    524613    def _getDebugTracebacks(self):
    525614        info = ''
    526615        if hasattr(self, "creator"):