Index: twisted/test/test_defer.py
===================================================================
--- twisted/test/test_defer.py	(revision 16967)
+++ twisted/test/test_defer.py	(working copy)
@@ -110,7 +110,7 @@
         dl = defer.DeferredList([], fireOnOneCallback=1)
         dl.addCallbacks(cb)
         self.failUnlessEqual(result, [])
-        
+
     def testDeferredListFireOnOneError(self):
         defr1 = defer.Deferred()
         defr2 = defer.Deferred()
@@ -137,7 +137,7 @@
         failure = result[0]
 
         # the type of the failure is a FirstError
-        self.failUnless(issubclass(failure.type, defer.FirstError), 
+        self.failUnless(issubclass(failure.type, defer.FirstError),
             'issubclass(failure.type, defer.FirstError) failed: '
             'failure.type is %r' % (failure.type,)
         )
@@ -153,8 +153,8 @@
         self.failUnlessEqual(firstError.subFailure.value.args, ("from def2",))
         self.failUnlessEqual(firstError.index, 1)
 
-        
 
+
     def testDeferredListDontConsumeErrors(self):
         d1 = defer.Deferred()
         dl = defer.DeferredList([d1])
@@ -473,7 +473,88 @@
         defer.setDebugging(True)
         d.addBoth(lambda ign: None)
 
+class FooError(Exception):
+    pass
 
+class DeferredCancellerTest(unittest.TestCase):
+    def setUp(self):
+        self.callback_results = None
+        self.errback_results = None
+        self.callback2_results = None
+        self.cancellerCalled = False
+
+    def _callback(self, data):
+        self.callback_results = data
+        return args[0]
+
+    def _callback2(self, data):
+        self.callback2_results = data
+
+    def _errback(self, data):
+        self.errback_results = data
+
+
+    def testNoCanceller(self):
+        # Deferred without a canceller errbacks defer.CancelledError
+        d=defer.Deferred()
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
+        # Test that further callbacks *are* swallowed
+        d.callback(None)
+
+        # But that a second is not
+        self.assertRaises(defer.AlreadyCalledError, d.callback, None)
+
+    def testCanceller(self):
+        def cancel(d):
+            self.cancellerCalled=True
+
+        d=defer.Deferred(canceller=cancel)
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
+        # Test that further callbacks are *not* swallowed
+        self.assertRaises(defer.AlreadyCalledError, d.callback, None)
+
+    def testCancellerWithCallback(self):
+        # If we explicitly callback from the canceller, don't callback CancelledError
+        def cancel(d):
+            self.cancellerCalled=True
+            d.errback(FooError())
+        d=defer.Deferred(canceller=cancel)
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, FooError)
+
+    def testCancelAlreadyCalled(self):
+        def cancel(d):
+            self.cancellerCalled=True
+        d=defer.Deferred(canceller=cancel)
+        d.callback(None)
+        self.assertRaises(defer.AlreadyCalledError, d.cancel)
+        self.assertEquals(self.cancellerCalled, False)
+
+    def testCancelNestedDeferred(self):
+        def innerCancel(d):
+            self.assertIdentical(d, innerDeferred)
+            self.cancellerCalled=True
+        def cancel(d):
+            self.assert_(False)
+
+        innerDeferred=defer.Deferred(canceller=innerCancel)
+        d=defer.Deferred(canceller=cancel)
+        d.callback(None)
+        d.addCallback(lambda data: innerDeferred)
+        d.cancel()
+        d.addCallbacks(self._callback, self._errback)
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
 class LogTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -563,6 +644,10 @@
         self.failUnless(lock.locked)
         self.assertEquals(self.counter, 3)
 
+        d = lock.acquire().addBoth(lambda x: setattr(self, 'result', x))
+        d.cancel()
+        self.assertEquals(self.result.type, defer.CancelledError)
+
         lock.release()
         self.failIf(lock.locked)
 
@@ -591,6 +676,16 @@
             sem.acquire().addCallback(self._incr)
             self.assertEquals(self.counter, i)
 
+
+        success = []
+        def fail(r):
+            success.append(False)
+        def succeed(r):
+            success.append(True)
+        d = sem.acquire().addCallbacks(fail, succeed)
+        d.cancel()
+        self.assertEquals(success, [True])
+
         sem.acquire().addCallback(self._incr)
         self.assertEquals(self.counter, N)
 
@@ -638,3 +733,14 @@
         queue = defer.DeferredQueue(backlog=0)
         self.assertRaises(defer.QueueUnderflow, queue.get)
 
+        queue = defer.DeferredQueue(size=0)
+
+        success = []
+        def fail(r):
+            success.append(False)
+        def succeed(r):
+            success.append(True)
+        d = queue.get().addCallbacks(fail, succeed)
+        d.cancel()
+        self.assertEquals(success, [True])
+        self.assertRaises(defer.QueueOverflow, queue.put, None)
Index: twisted/internet/defer.py
===================================================================
--- twisted/internet/defer.py	(revision 16967)
+++ twisted/internet/defer.py	(working copy)
@@ -22,8 +22,10 @@
 class AlreadyCalledError(Exception):
     pass
 
-class TimeoutError(Exception):
+class CancelledError(Exception):
     pass
+# Backwards compatibility
+TimeoutError = CancelledError
 
 def logError(err):
     log.err(err)
@@ -117,7 +119,7 @@
     return deferred
 
 def timeout(deferred):
-    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
+    deferred.cancel()
 
 def passthru(arg):
     return arg
@@ -149,18 +151,25 @@
 
     For more information about Deferreds, see doc/howto/defer.html or
     U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
+
+    When creating a Deferred, you should provide a canceller function, which
+    will be called by d.cancel() to let you do any cleanup necessary if the
+    user decides not to wait for the deferred to complete.
     """
+
     called = 0
     paused = 0
     timeoutCall = None
     _debugInfo = None
+    _suppressAlreadyCalled = 0
 
     # Keep this class attribute for now, for compatibility with code that
     # sets it directly.
     debug = False
 
-    def __init__(self):
+    def __init__(self, canceller=None):
         self.callbacks = []
+        self.canceller = canceller
         if self.debug:
             self._debugInfo = DebugInfo()
             self._debugInfo.creator = traceback.format_stack()[:-1]
@@ -177,7 +186,7 @@
         cbs = ((callback, callbackArgs, callbackKeywords),
                (errback or (passthru), errbackArgs, errbackKeywords))
         self.callbacks.append(cbs)
-            
+
         if self.called:
             self._runCallbacks()
         return self
@@ -253,12 +262,14 @@
 
     def pause(self):
         """Stop processing on a Deferred until L{unpause}() is called.
+        You probably don't ever have a reason to call this function.
         """
         self.paused = self.paused + 1
 
 
     def unpause(self):
         """Process all callbacks made since L{pause}() was called.
+        You probably don't ever have a reason to call this function.
         """
         self.paused = self.paused - 1
         if self.paused:
@@ -266,12 +277,53 @@
         if self.called:
             self._runCallbacks()
 
+    def cancel(self):
+        """Cancel this deferred.
+
+        If the deferred is waiting on another deferred, forward the
+        cancellation to the other deferred.
+
+        If the deferred has not yet been errback'd/callback'd, call
+        the canceller function provided to the constructor. If that
+        function does not do a callback/errback, or if no canceller
+        function was provided, errback with CancelledError.
+
+        Otherwise, raise AlreadyCalledError.
+        """
+        canceller = self.canceller
+        if not self.called:
+            if canceller:
+                canceller(self)
+            else:
+                # Eat the callback that will eventually be fired
+                # since there was no real canceller.
+                self._suppressAlreadyCalled = 1
+
+            if not self.called:
+                # The canceller didn't do an errback of its own
+                try:
+                    raise CancelledError
+                except:
+                    self.errback(failure.Failure())
+        elif isinstance(self.result, Deferred):
+            # Waiting for another deferred -- cancel it instead
+            self.result.cancel()
+        else:
+            # Called and not waiting for another deferred
+            raise AlreadyCalledError
+
     def _continue(self, result):
         self.result = result
         self.unpause()
 
     def _startRunCallbacks(self, result):
+        # Canceller is no longer relevant
+        self.canceller = None
+
         if self.called:
+            if self._suppressAlreadyCalled:
+                self._suppressAlreadyCalled = 0
+                return
             if self.debug:
                 if self._debugInfo is None:
                     self._debugInfo = DebugInfo()
@@ -337,7 +389,8 @@
 
         @param timeoutFunc: will receive the Deferred and *args, **kw as its
         arguments.  The default timeoutFunc will call the errback with a
-        L{TimeoutError}.
+        L{CancelledError}.
+
         """
         warnings.warn(
             "Deferred.setTimeout is deprecated.  Look for timeout "
@@ -393,7 +446,7 @@
 
 class FirstError(Exception):
     """First error to occur in a DeferredList if fireOnOneErrback is set.
-    
+
     @ivar subFailure: the L{Failure} that occurred.
     @ivar index: the index of the Deferred in the DeferredList where it
     happened.
@@ -424,7 +477,7 @@
         if isinstance(other, tuple):
             return tuple(self) == other
         elif isinstance(other, FirstError):
-            return (self.subFailure == other.subFailure and 
+            return (self.subFailure == other.subFailure and
                     self.index == other.index)
         return False
 
@@ -705,12 +758,15 @@
 
     locked = 0
 
+    def _cancelAcquire(self, d):
+        self.waiting.remove(d)
+
     def acquire(self):
         """Attempt to acquire the lock.
 
         @return: a Deferred which fires on lock acquisition.
         """
-        d = Deferred()
+        d = Deferred(canceller=self._cancelAcquire)
         if self.locked:
             self.waiting.append(d)
         else:
@@ -743,13 +799,16 @@
         self.tokens = tokens
         self.limit = tokens
 
+    def _cancelAcquire(self, d):
+        self.waiting.remove(d)
+
     def acquire(self):
         """Attempt to acquire the token.
 
         @return: a Deferred which fires on token acquisition.
         """
         assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
-        d = Deferred()
+        d = Deferred(canceller=self._cancelAcquire)
         if not self.tokens:
             self.waiting.append(d)
         else:
@@ -803,6 +862,9 @@
         self.size = size
         self.backlog = backlog
 
+    def _cancelGet(self, d):
+        self.waiting.remove(d)
+
     def put(self, obj):
         """Add an object to this queue.
 
@@ -826,7 +888,7 @@
         if self.pending:
             return succeed(self.pending.pop(0))
         elif self.backlog is None or len(self.waiting) < self.backlog:
-            d = Deferred()
+            d = Deferred(canceller=self._cancelGet)
             self.waiting.append(d)
             return d
         else:
@@ -834,7 +896,7 @@
 
 
 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
-           "AlreadyCalledError", "TimeoutError", "gatherResults",
+           "AlreadyCalledError", "TimeoutError", "CancelledError", "gatherResults",
            "maybeDeferred", "waitForDeferred", "deferredGenerator",
            "DeferredLock", "DeferredSemaphore", "DeferredQueue",
           ]
Index: twisted/trial/test/test_deferred.py
===================================================================
--- twisted/trial/test/test_deferred.py	(revision 16967)
+++ twisted/trial/test/test_deferred.py	(working copy)
@@ -219,3 +219,4 @@
         self.failIf(result.wasSuccessful())
         self.failUnlessEqual(len(result.errors), 1)
         self._wasTimeout(result.errors[0][1])
+        self.failUnless(result.errors[0][1].check(defer.CancelledError))
