Index: twisted/test/test_defer.py
===================================================================
--- twisted/test/test_defer.py	(revision 29882)
+++ twisted/test/test_defer.py	(working copy)
@@ -1,12 +1,11 @@
 # Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 # See LICENSE for details.
 
-
 """
 Test cases for defer module.
 """
 
-import gc
+import gc, traceback
 
 from twisted.trial import unittest
 from twisted.internet import reactor, defer
@@ -239,7 +238,7 @@
 
     def testCallbackErrors(self):
         l = []
-        d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(l.append)
+        d = defer.Deferred().addCallback(lambda _: 1 / 0).addErrback(l.append)
         d.callback(1)
         self.assert_(isinstance(l[0].value, ZeroDivisionError))
         l = []
@@ -265,8 +264,47 @@
         d2.callback(2)
         assert self.callbackResults is None, "Still should not have been called yet."
         d2.unpause()
-        assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s"% (self.callbackResults,)
+        assert self.callbackResults[0][0] == 2, "Result should have been from second deferred:%s" % (self.callbackResults,)
 
+
+    def test_chainedPausedDeferredWithResult(self):
+        """
+        When a paused Deferred with a result is returned from a callback on
+        another Deferred, the other Deferred is chained to the first and waits
+        for it to be unpaused.
+        """
+        expected = object()
+        paused = defer.Deferred()
+        paused.callback(expected)
+        paused.pause()
+        chained = defer.Deferred()
+        chained.addCallback(lambda ignored: paused)
+        chained.callback(None)
+
+        result = []
+        chained.addCallback(result.append)
+        self.assertEquals(result, [])
+        paused.unpause()
+        self.assertEquals(result, [expected])
+
+
+    def test_pausedDeferredChained(self):
+        """
+        A paused Deferred encountered while pushing a result forward through a
+        chain does not prevent earlier Deferreds from continuing to execute
+        their callbacks.
+        """
+        first = defer.Deferred()
+        second = defer.Deferred()
+        first.addCallback(lambda ignored: second)
+        first.callback(None)
+        first.pause()
+        second.callback(None)
+        result = []
+        second.addCallback(result.append)
+        self.assertEquals(result, [None])
+
+
     def testGatherResults(self):
         # test successful list of deferreds
         l = []
@@ -336,6 +374,125 @@
         return self.assertFailure(d2, RuntimeError)
 
 
+    def test_innerCallbacksPreserved(self):
+        """
+        When a L{Deferred} encounters a result which is another L{Deferred}
+        which is waiting on a third L{Deferred}, the middle L{Deferred}'s
+        callbacks are executed after the third L{Deferred} fires and before the
+        first receives a result.
+        """
+        results = []
+        failures = []
+        inner = defer.Deferred()
+        def cb(result):
+            results.append(('start-of-cb', result))
+            d = defer.succeed('inner')
+            def firstCallback(result):
+                results.append(('firstCallback', 'inner'))
+                return inner
+            def secondCallback(result):
+                results.append(('secondCallback', result))
+                return result * 2
+            d.addCallback(firstCallback).addCallback(secondCallback)
+            d.addErrback(failures.append)
+            return d
+        outer = defer.succeed('outer')
+        outer.addCallback(cb)
+        inner.callback('orange')
+        outer.addCallback(results.append)
+        inner.addErrback(failures.append)
+        outer.addErrback(failures.append)
+        self.assertEqual([], failures)
+        self.assertEqual(
+            results,
+            [('start-of-cb', 'outer'),
+             ('firstCallback', 'inner'),
+             ('secondCallback', 'orange'),
+             'orangeorange'])
+             
+    def test_continueCallbackNotFirst(self):
+        """
+        The continue callback of a L{Deferred} waiting for another L{Deferred}
+        is not necessarily the first one. This is somewhat a whitebox test
+        checking that we search for that callback among the whole list of
+        callbacks.
+        """
+        results = []
+        failures = []
+        a = defer.Deferred()
+
+        def cb(result):
+            results.append(('cb', result))
+            d = defer.Deferred()
+
+            def firstCallback(ignored):
+                results.append(('firstCallback', ignored))
+                return defer.gatherResults([a])
+
+            def secondCallback(result):
+                results.append(('secondCallback', result))
+
+            d.addCallback(firstCallback)
+            d.addCallback(secondCallback)
+            d.addErrback(failures.append)
+            d.callback(None)
+            return d
+
+        outer = defer.succeed('outer')
+        outer.addCallback(cb)
+        outer.addErrback(failures.append)
+        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results)
+        a.callback('withers')
+        self.assertEquals([], failures)
+        self.assertEquals(
+            results,
+            [('cb', 'outer'),
+             ('firstCallback', None),
+             ('secondCallback', ['withers'])])
+
+
+    def test_callbackOrderPreserved(self):
+        """
+        A callback added to a L{Deferred} after a previous callback attached
+        another L{Deferred} as a result is run after the callbacks of the other
+        L{Deferred} are run.
+        """
+        results = []
+        failures = []
+        a = defer.Deferred()
+
+        def cb(result):
+            results.append(('cb', result))
+            d = defer.Deferred()
+
+            def firstCallback(ignored):
+                results.append(('firstCallback', ignored))
+                return defer.gatherResults([a])
+
+            def secondCallback(result):
+                results.append(('secondCallback', result))
+
+            d.addCallback(firstCallback)
+            d.addCallback(secondCallback)
+            d.addErrback(failures.append)
+            d.callback(None)
+            return d
+
+        outer = defer.Deferred()
+        outer.addCallback(cb)
+        outer.addCallback(lambda x: results.append('final'))
+        outer.addErrback(failures.append)
+        outer.callback('outer')
+        self.assertEquals([('cb', 'outer'), ('firstCallback', None)], results)
+        a.callback('withers')
+        self.assertEquals([], failures)
+        self.assertEquals(
+            results,
+            [('cb', 'outer'),
+             ('firstCallback', None),
+             ('secondCallback', ['withers']), 'final'])
+
+
     def test_reentrantRunCallbacks(self):
         """
         A callback added to a L{Deferred} by a callback on that L{Deferred}
@@ -721,7 +878,74 @@
                 hex(unsignedID(a)), hex(unsignedID(b))))
 
 
+    def test_boundedStackDepth(self):
+        """
+        The depth of the call stack does not grow as more L{Deferred} instances
+        are chained together.
+        """
+        def chainDeferreds(howMany):
+            stack = []
+            def recordStackDepth(ignored):
+                stack.append(len(traceback.extract_stack()))
 
+            top = defer.Deferred()
+            innerDeferreds = [defer.Deferred() for ignored in range(howMany)]
+            originalInners = innerDeferreds[:]
+            last = defer.Deferred()
+
+            inner = innerDeferreds.pop()
+            top.addCallback(lambda ign, inner=inner: inner)
+            top.addCallback(recordStackDepth)
+
+            while innerDeferreds:
+                newInner = innerDeferreds.pop()
+                inner.addCallback(lambda ign, inner=newInner: inner)
+                inner = newInner
+            inner.addCallback(lambda ign: last)
+
+            top.callback(None)
+            for inner in originalInners:
+                inner.callback(None)
+
+            # Sanity check - the record callback is not intended to have
+            # fired yet.
+            self.assertEquals(stack, [])
+
+            # Now fire the last thing and return the stack depth at which the
+            # callback was invoked.
+            last.callback(None)
+            return stack[0]
+
+        # Callbacks should be invoked at the same stack depth regardless of
+        # how many Deferreds are chained.
+        self.assertEquals(chainDeferreds(1), chainDeferreds(2))
+
+
+    def test_resultOfDeferredResultOfDeferredOfFiredDeferredCalled(self):
+        """
+        Given three Deferreds, one chained to the next chained to the next,
+        callbacks on the middle Deferred which are added after the chain is
+        created are called once the last Deferred fires.
+
+        This is more of a regression-style test.  It doesn't exercise any
+        particular code path through the current implementation of Deferred, but
+        it does exercise a broken codepath through one of the variations of the
+        implementation proposed as a resolution to ticket #411.
+        """
+        first = defer.Deferred()
+        second = defer.Deferred()
+        third = defer.Deferred()
+        first.addCallback(lambda ignored: second)
+        second.addCallback(lambda ignored: third)
+        second.callback(None)
+        first.callback(None)
+        third.callback(None)
+        L = []
+        second.addCallback(L.append)
+        self.assertEquals(L, [None])
+
+
+
 class FirstErrorTests(unittest.TestCase):
     """
     Tests for L{FirstError}.
@@ -1214,11 +1438,16 @@
         """
         log.removeObserver(self.c.append)
 
+
+    def _loggedErrors(self):
+        return [e for e in self.c if e["isError"]]
+
+
     def _check(self):
         """
         Check the output of the log observer to see if the error is present.
         """
-        c2 = [e for e in self.c if e["isError"]]
+        c2 = self._loggedErrors()
         self.assertEquals(len(c2), 2)
         c2[1]["failure"].trap(ZeroDivisionError)
         self.flushLoggedErrors(ZeroDivisionError)
@@ -1229,7 +1458,7 @@
         and its final result (the one not handled by any callback) is an
         exception, that exception will be logged immediately.
         """
-        defer.Deferred().addCallback(lambda x: 1/0).callback(1)
+        defer.Deferred().addCallback(lambda x: 1 / 0).callback(1)
         gc.collect()
         self._check()
 
@@ -1239,7 +1468,7 @@
         """
         def _subErrorLogWithInnerFrameRef():
             d = defer.Deferred()
-            d.addCallback(lambda x: 1/0)
+            d.addCallback(lambda x: 1 / 0)
             d.callback(1)
 
         _subErrorLogWithInnerFrameRef()
@@ -1252,7 +1481,7 @@
         """
         def _subErrorLogWithInnerFrameCycle():
             d = defer.Deferred()
-            d.addCallback(lambda x, d=d: 1/0)
+            d.addCallback(lambda x, d=d: 1 / 0)
             d._d = d
             d.callback(1)
 
@@ -1261,6 +1490,54 @@
         self._check()
 
 
+    def test_chainedErrorCleanup(self):
+        """
+        If one Deferred with an error result is returned from a callback on
+        another Deferred, when the first Deferred is garbage collected it does
+        not log its error.
+        """
+        d = defer.Deferred()
+        d.addCallback(lambda ign: defer.fail(RuntimeError("zoop")))
+        d.callback(None)
+
+        # Sanity check - this isn't too interesting, but we do want the original
+        # Deferred to have gotten the failure.
+        results = []
+        errors = []
+        d.addCallbacks(results.append, errors.append)
+        self.assertEquals(results, [])
+        self.assertEquals(len(errors), 1)
+        errors[0].trap(Exception)
+
+        # Get rid of any references we might have to the inner Deferred (none of
+        # these should really refer to it, but we're just being safe).
+        del results, errors, d
+        # Force a collection cycle so that there's a chance for an error to be
+        # logged, if it's going to be logged.
+        gc.collect()
+        # And make sure it is not.
+        self.assertEquals(self._loggedErrors(), [])
+
+
+    def test_errorClearedByChaining(self):
+        """
+        If a Deferred with a failure result has an errback which chains it to
+        another Deferred, the initial failure is cleared by the errback so it is
+        not logged.
+        """
+        # Start off with a Deferred with a failure for a result
+        bad = defer.fail(Exception("oh no"))
+        good = defer.Deferred()
+        # Give it a callback that chains it to another Deferred
+        bad.addErrback(lambda ignored: good)
+        # That's all, clean it up.  No Deferred here still has a failure result,
+        # so nothing should be logged.
+        good = bad = None
+        gc.collect()
+        self.assertEquals(self._loggedErrors(), [])
+
+
+
 class DeferredTestCaseII(unittest.TestCase):
     def setUp(self):
         self.callbackRan = 0
@@ -1546,7 +1823,7 @@
         d = self.lock.deferUntilLocked(timeout=5.5)
         self.assertFailure(d, defer.TimeoutError)
 
-        self.clock.pump([1]*10)
+        self.clock.pump([1] * 10)
 
         return d
 
@@ -1566,7 +1843,7 @@
         d = self.lock.deferUntilLocked(timeout=10)
         d.addErrback(onTimeout)
 
-        self.clock.pump([1]*10)
+        self.clock.pump([1] * 10)
 
         return d
 
Index: twisted/internet/defer.py
===================================================================
--- twisted/internet/defer.py	(revision 29882)
+++ twisted/internet/defer.py	(working copy)
@@ -10,6 +10,10 @@
 @var _NO_RESULT: The result used to represent the fact that there is no
     result. B{Never ever ever use this as an actual result for a Deferred}.  You
     have been warned.
+
+@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
+    chain.  Always accompanied by a Deferred instance in the args tuple pointing
+    at the Deferred which is chained to the Deferred which has this marker.
 """
 
 import traceback
@@ -169,6 +173,7 @@
 
 # See module docstring.
 _NO_RESULT = object()
+_CONTINUE = object()
 
 
 
@@ -425,11 +430,6 @@
             self.result.cancel()
 
 
-    def _continue(self, result):
-        self.result = result
-        self.unpause()
-
-
     def _startRunCallbacks(self, result):
         if self.called:
             if self._suppressAlreadyCalled:
@@ -450,51 +450,141 @@
         self._runCallbacks()
 
 
+    def _continuation(self):
+        """
+        Build a tuple of callback and errback with L{_continue} to be used by
+        L{_addContinue} and L{_removeContinue} on another Deferred.
+        """
+        return ((_CONTINUE, (self,), None),
+                (_CONTINUE, (self,), None))
+
+
     def _runCallbacks(self):
+        """
+        Run the chain of callbacks once a result is available.
+
+        This consists of a simple loop over all of the callbacks, calling each
+        with the current result and making the current result equal to the
+        return value (or raised exception) of that call.
+
+        If C{self._runningCallbacks} is true, this loop won't run at all, since
+        it is already running above us on the call stack.  If C{self.paused} is
+        true, the loop also won't run, because that's what it means to be
+        paused.
+
+        The loop will terminate before processing all of the callbacks if a
+        C{Deferred} without a result is encountered.
+
+        If a C{Deferred} I{with} a result is encountered, that result is taken
+        and the loop proceeds.
+
+        @note: The implementation is complicated slightly by the fact that
+            chaining (associating two Deferreds with each other such that one
+            will wait for the result of the other, as happens when a Deferred is
+            returned from a callback on another Deferred) is supported
+            iteratively rather than recursively, to avoid running out of stack
+            frames when processing long chains.
+        """
         if self._runningCallbacks:
             # Don't recursively run callbacks
             return
-        if not self.paused:
-            self._chainedTo = None
-            while self.callbacks:
-                item = self.callbacks.pop(0)
+
+        # Keep track of all the Deferreds encountered while propagating results
+        # up a chain.  The way a Deferred gets onto this stack is by having
+        # added its _continuation() to the callbacks list of a second Deferred
+        # and then that second Deferred being fired.  ie, if ever had _chainedTo
+        # set to something other than None, you might end up on this stack.
+        chain = [self]
+
+        while chain:
+            current = chain[-1]
+
+            if current.paused:
+                # This Deferred isn't going to produce a result at all.  All the
+                # Deferreds up the chain waiting on it will just have to...
+                # wait.
+                return
+
+            current._chainedTo = None
+            if current.callbacks:
+                item = current.callbacks.pop(0)
                 callback, args, kw = item[
-                    isinstance(self.result, failure.Failure)]
+                    isinstance(current.result, failure.Failure)]
                 args = args or ()
                 kw = kw or {}
+
+                # Avoid recursion if we can.
+                if callback is _CONTINUE:
+                    # Give the waiting Deferred our current result and then
+                    # forget about that result ourselves.
+                    chainee = args[0]
+                    chainee.result = current.result
+                    current.result = None
+                    # Making sure to update _debugInfo
+                    if current._debugInfo is not None:
+                        current._debugInfo.failResult = None
+                    chainee.paused -= 1
+                    chain.append(chainee)
+                    # Delay cleaning this Deferred and popping it from the chain
+                    # until after we've dealt with chainee.
+                    continue
+
                 try:
-                    self._runningCallbacks = True
+                    current._runningCallbacks = True
                     try:
-                        self.result = callback(self.result, *args, **kw)
+                        current.result = callback(current.result, *args, **kw)
                     finally:
-                        self._runningCallbacks = False
-                    if isinstance(self.result, Deferred):
-                        # note: this will cause _runCallbacks to be called
-                        # recursively if self.result already has a result.
-                        # This shouldn't cause any problems, since there is no
-                        # relevant state in this stack frame at this point.
-                        # The recursive call will continue to process
-                        # self.callbacks until it is empty, then return here,
-                        # where there is no more work to be done, so this call
-                        # will return as well.
-                        self.pause()
-                        self._chainedTo = self.result
-                        self.result.addBoth(self._continue)
-                        break
+                        current._runningCallbacks = False
                 except:
-                    self.result = failure.Failure()
+                    current.result = failure.Failure()
+                else:
+                    if isinstance(current.result, Deferred):
+                        # The result is another Deferred.  If it has a result,
+                        # we can take it and keep going.
+                        resultResult = getattr(current.result, 'result', _NO_RESULT)
+                        if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
+                            # Nope, it didn't.  Pause and chain.
+                            current.pause()
+                            current._chainedTo = current.result
+                            # Note: current.result has no result, so it's not
+                            # running its callbacks right now.  Therefore we can
+                            # append to the callbacks list directly instead of
+                            # using addCallbacks.
+                            current.result.callbacks.append(current._continuation())
+                            # Chaining a Deferred replaces any failure.
+                            if current._debugInfo is not None:
+                                current._debugInfo.failResult = None                            
+                            break
+                        else:
+                            # Yep, it did.  Steal it.
+                            current.result.result = None
+                            # Make sure _debugInfo's failure state is updated.
+                            if current.result._debugInfo is not None:
+                                current.result._debugInfo.failResult = None
+                            current.result = resultResult
+            else:
+                # As much of the callback chain - perhaps all of it - as can be
+                # processed right now has been.  The current Deferred is waiting on
+                # another Deferred or for more callbacks.  Before finishing with it,
+                # make sure its _debugInfo is in the proper state.
+                if isinstance(current.result, failure.Failure):
+                    # Stash the Failure in the _debugInfo for unhandled error
+                    # reporting.
+                    current.result.cleanFailure()
+                    if current._debugInfo is None:
+                        current._debugInfo = DebugInfo()
+                    current._debugInfo.failResult = current.result
+                else:
+                    # Clear out any Failure in the _debugInfo, since the result
+                    # is no longer a Failure.
+                    if current._debugInfo is not None:
+                        current._debugInfo.failResult = None
 
-        if isinstance(self.result, failure.Failure):
-            self.result.cleanFailure()
-            if self._debugInfo is None:
-                self._debugInfo = DebugInfo()
-            self._debugInfo.failResult = self.result
-        else:
-            if self._debugInfo is not None:
-                self._debugInfo.failResult = None
+                # This Deferred is done, pop it from the chain and move back up
+                # to the Deferred which supplied us with our result.
+                chain.pop()
 
 
-
     def __str__(self):
         """
         Return a string representation of this C{Deferred}.
@@ -520,7 +610,6 @@
 
     failResult = None
 
-
     def _getDebugTracebacks(self):
         info = ''
         if hasattr(self, "creator"):
