Index: twisted/internet/defer.py
===================================================================
--- twisted/internet/defer.py	(revision 13767)
+++ twisted/internet/defer.py	(working copy)
@@ -25,8 +25,10 @@
 class AlreadyArmedError(Exception):
     pass
 
-class TimeoutError(Exception):
+class CancelledError(Exception):
     pass
+# Backwards compatibility
+TimeoutError = CancelledError
 
 def logError(err):
     log.err(err)
@@ -139,7 +141,7 @@
     return deferred
 
 def timeout(deferred):
-    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
+    deferred.cancel()
 
 def passthru(arg):
     return arg
@@ -166,19 +168,25 @@
 
     For more information about Deferreds, see doc/howto/defer.html or
     U{http://www.twistedmatrix.com/documents/howto/defer}
+
+    When creating a Deferred, you may provide a canceller function,
+    which will be called by d.cancel() to let you do any cleanup necessary
+    if the user decides not to wait for the deferred to complete.
     """
     called = 0
     default = 0
     paused = 0
     timeoutCall = None
     _debugInfo = None
-
+    suppressAlreadyCalled = 0
+    
     # Keep this class attribute for now, for compatibility with code that
     # sets it directly.
     debug = False
 
-    def __init__(self):
+    def __init__(self, canceller=None):
         self.callbacks = []
+        self.canceller = canceller
         if self.debug:
             self._debugInfo = DebugInfo()
             self._debugInfo.creator = traceback.format_stack()[:-1]
@@ -278,12 +286,14 @@
 
     def pause(self):
         """Stop processing on a Deferred until L{unpause}() is called.
+        You probably don't ever have a reason to call this function.
         """
         self.paused = self.paused + 1
 
 
     def unpause(self):
         """Process all callbacks made since L{pause}() was called.
+        You probably don't ever have a reason to call this function.
         """
         self.paused = self.paused - 1
         if self.paused:
@@ -291,12 +301,53 @@
         if self.called:
             self._runCallbacks()
 
+    def cancel(self):
+        """Cancel this deferred.
+
+        If the deferred is waiting on another deferred, forward the
+        cancellation to the other deferred.
+
+        If the deferred has not yet been errback'd/callback'd, call
+        the canceller function provided to the constructor. If that
+        function does not do a callback/errback, or if no canceller
+        function was provided, errback with CancelledError.
+
+        Otherwise, raise AlreadyCalledError.
+        """
+        canceller=self.canceller
+        if not self.called:
+            if canceller:
+                canceller(self)
+            else:
+                # Eat the callback that will eventually be fired
+                # since there was no real canceller.
+                self.suppressAlreadyCalled = 1
+
+            if not self.called:
+                # The canceller didn't do an errback of its own
+                try:
+                    raise CancelledError
+                except:
+                    self.errback(failure.Failure())
+        elif isinstance(self.result, Deferred):
+            # Waiting for another deferred -- cancel it instead
+            self.result.cancel()
+        else:
+            # Called and not waiting for another deferred
+            raise AlreadyCalledError
+    
     def _continue(self, result):
         self.result = result
         self.unpause()
 
     def _startRunCallbacks(self, result):
+        # Canceller is no longer relevant
+        self.canceller=None
+        
         if self.called:
+            if self.suppressAlreadyCalled:
+                self.suppressAlreadyCalled = False
+                return
             if self.debug:
                 extra = "\n" + self._debugInfo._getDebugTracebacks()
                 raise AlreadyCalledError(extra)
@@ -363,7 +414,8 @@
 
         @param timeoutFunc: will receive the Deferred and *args, **kw as its
         arguments.  The default timeoutFunc will call the errback with a
-        L{TimeoutError}.
+        L{CancelledError}.
+
         """
         warnings.warn(
             "Deferred.setTimeout is deprecated.  Look for timeout "
@@ -699,12 +751,15 @@
 
     locked = 0
 
+    def _cancelAcquire(self, d):
+        self.waiting.remove(d)
+        
     def acquire(self):
         """Attempt to acquire the lock.
 
         @return: a Deferred which fires on lock acquisition.
         """
-        d = Deferred()
+        d = Deferred(canceller=self._cancelAcquire)
         if self.locked:
             self.waiting.append(d)
         else:
@@ -736,14 +791,17 @@
         _ConcurrencyPrimitive.__init__(self)
         self.tokens = tokens
         self.limit = tokens
-
+        
+    def _cancelAcquire(self, d):
+        self.waiting.remove(d)
+        
     def acquire(self):
         """Attempt to acquire the token.
 
         @return: a Deferred which fires on token acquisition.
         """
         assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
-        d = Deferred()
+        d = Deferred(canceller=self._cancelAcquire)
         if not self.tokens:
             self.waiting.append(d)
         else:
@@ -797,6 +855,9 @@
         self.size = size
         self.backlog = backlog
 
+    def _cancelGet(self, d):
+        self.waiting.remove(d)
+    
     def put(self, obj):
         """Add an object to this queue.
 
@@ -820,7 +881,7 @@
         if self.pending:
             return succeed(self.pending.pop(0))
         elif self.backlog is None or len(self.waiting) < self.backlog:
-            d = Deferred()
+            d = Deferred(canceller=self._cancelGet)
             self.waiting.append(d)
             return d
         else:
@@ -828,7 +889,7 @@
 
 
 __all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
-           "AlreadyCalledError", "TimeoutError", "gatherResults",
+           "AlreadyCalledError", "TimeoutError", "CancelledError", "gatherResults",
            "maybeDeferred", "waitForDeferred", "deferredGenerator",
            "DeferredLock", "DeferredSemaphore", "DeferredQueue",
           ]
Index: twisted/test/test_defer.py
===================================================================
--- twisted/test/test_defer.py	(revision 13766)
+++ twisted/test/test_defer.py	(working copy)
@@ -449,7 +449,88 @@
         else:
             self.fail("second callback failed to raise AlreadyCalledError")
 
+class FooError(Exception):
+    pass
 
+class DeferredCancellerTest(unittest.TestCase):
+    def setUp(self):
+        self.callback_results = None
+        self.errback_results = None
+        self.callback2_results = None
+        self.cancellerCalled = False
+        
+    def _callback(self, data):
+        self.callback_results = data
+        return args[0]
+
+    def _callback2(self, data):
+        self.callback2_results = data
+
+    def _errback(self, data):
+        self.errback_results = data
+
+    
+    def testNoCanceller(self):
+        # Deferred without a canceller errbacks defer.CancelledError
+        d=defer.Deferred()
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
+        # Test that further callbacks *are* swallowed
+        d.callback(None)
+
+        # But that a second is not
+        self.assertRaises(defer.AlreadyCalledError, d.callback, None)
+        
+    def testCanceller(self):
+        def cancel(d):
+            self.cancellerCalled=True
+            
+        d=defer.Deferred(canceller=cancel)
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
+        # Test that further callbacks are *not* swallowed
+        self.assertRaises(defer.AlreadyCalledError, d.callback, None)
+        
+    def testCancellerWithCallback(self):
+        # If we explicitly callback from the canceller, don't callback CancelledError
+        def cancel(d):
+            self.cancellerCalled=True
+            d.errback(FooError())
+        d=defer.Deferred(canceller=cancel)
+        d.addCallbacks(self._callback, self._errback)
+        d.cancel()
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, FooError)
+
+    def testCancelAlreadyCalled(self):
+        def cancel(d):
+            self.cancellerCalled=True
+        d=defer.Deferred(canceller=cancel)
+        d.callback(None)
+        self.assertRaises(defer.AlreadyCalledError, d.cancel)
+        self.assertEquals(self.cancellerCalled, False)
+    
+    def testCancelNestedDeferred(self):
+        def innerCancel(d):
+            self.assertIdentical(d, innerDeferred)
+            self.cancellerCalled=True
+        def cancel(d):
+            self.assert_(False)
+            
+        innerDeferred=defer.Deferred(canceller=innerCancel)
+        d=defer.Deferred(canceller=cancel)
+        d.callback(None)
+        d.addCallback(lambda data: innerDeferred)
+        d.cancel()
+        d.addCallbacks(self._callback, self._errback)
+        self.assertEquals(self.cancellerCalled, True)
+        self.assertEquals(self.errback_results.type, defer.CancelledError)
+
 class LogTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -539,6 +620,10 @@
         self.failUnless(lock.locked)
         self.assertEquals(self.counter, 3)
 
+        d = lock.acquire().addBoth(lambda x: setattr(self, 'result', x))
+        d.cancel()
+        self.assertEquals(self.result.type, defer.CancelledError)
+        
         lock.release()
         self.failIf(lock.locked)
 
@@ -567,12 +652,22 @@
             sem.acquire().addCallback(self._incr)
             self.assertEquals(self.counter, i)
 
+
+        success = []
+        def fail(r):
+            success.append(False)
+        def succeed(r):
+            success.append(True)
+        d = sem.acquire().addCallbacks(fail, succeed)
+        d.cancel()
+        self.assertEquals(success, [True])
+        
         sem.acquire().addCallback(self._incr)
         self.assertEquals(self.counter, N)
 
         sem.release()
         self.assertEquals(self.counter, N + 1)
-
+        
         for i in range(1, 1 + N):
             sem.release()
             self.assertEquals(self.counter, N + 1)
@@ -614,3 +709,14 @@
         queue = defer.DeferredQueue(backlog=0)
         self.assertRaises(defer.QueueUnderflow, queue.get)
 
+        queue = defer.DeferredQueue(size=0)
+
+        success = []
+        def fail(r):
+            success.append(False)
+        def succeed(r):
+            success.append(True)
+        d = queue.get().addCallbacks(fail, succeed)
+        d.cancel()
+        self.assertEquals(success, [True])
+        self.assertRaises(defer.QueueOverflow, queue.put, None)
