root / tags / releases / twisted-8.1.0 / twisted / internet / defer.py

Revision 23165, 37.5 kB (checked in by exarkun, 1 year ago)

Merge defer-fail-3145

Author: exarkun
Reviewer: therve
Fixes #3145

Clarify the documentation of twisted.internet.defer.fail and
twisted.internet.defer.Deferred.errback for the case where no
argument is supplied and there is no current exception state
(an exception will be raised). Also simplify the implementation
of fail slightly, removing some duplication it had with errback.

Line 
1 # -*- test-case-name: twisted.test.test_defer -*-
2 #
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 Support for results that aren't immediately available.
8
9 Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
10 """
11
12 from __future__ import nested_scopes, generators
13 import traceback
14 import warnings
15
16 # Twisted imports
17 from twisted.python import log, failure, lockfile
18 from twisted.python.util import unsignedID, mergeFunctionMetadata
19
class AlreadyCalledError(Exception):
    """
    Raised when a L{Deferred} that has already been fired is fired again.
    """
22
class TimeoutError(Exception):
    """
    The error the default L{timeout} function delivers to a Deferred's
    errback chain.
    """
25
def logError(err):
    """
    Log C{err} with L{log.err} and return it unchanged.

    Because the argument is returned, this can be added as an errback
    without altering the result passed to later errbacks.

    @param err: The error to log.

    @return: C{err}, untouched.
    """
    log.err(err)
    return err
29
def succeed(result):
    """
    Build a L{Deferred} whose callback has already been invoked with
    C{result}.

    Use this when implementing an interface that must return a Deferred but
    the answer is already known synchronously: simply return
    C{defer.succeed(theResult)}.

    L{fail} is the counterpart that produces an already-failed Deferred.

    @param result: The value passed to the new Deferred's 'callback' method.

    @rtype: L{Deferred}
    """
    fired = Deferred()
    fired.callback(result)
    return fired
50
51
def fail(result=None):
    """
    Build a L{Deferred} whose errback has already been invoked with
    C{result}.

    This is the failing counterpart of L{succeed}; see that function for
    the rationale.

    @param result: Whatever L{Deferred.errback} accepts.

    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
        current exception state.

    @rtype: L{Deferred}
    """
    failed = Deferred()
    failed.errback(result)
    return failed
68
69
def execute(callable, *args, **kw):
    """Run a callable now and report the outcome through a Deferred.

    C{callable} is invoked with C{*args} and C{**kw}.  If it returns, the
    returned Deferred has already been fired with that return value; if it
    raises, the returned Deferred has already been errbacked with a Failure
    built from the exception being handled.
    """
    try:
        outcome = callable(*args, **kw)
    except:
        # fail() with no argument captures the exception being handled.
        return fail()
    return succeed(outcome)
83
def maybeDeferred(f, *args, **kw):
    """Invoke a function that may or may not return a deferred.

    Call the given function with the given arguments.  If the returned
    object is a C{Deferred}, return it.  If the returned object is a C{Failure},
    wrap it with C{fail} and return it.  Otherwise, wrap it in C{succeed} and
    return it.  If an exception is raised, convert it to a C{Failure}, wrap it
    in C{fail}, and then return it.

    @type f: Any callable
    @param f: The callable to invoke

    @param args: The arguments to pass to C{f}
    @param kw: The keyword arguments to pass to C{f}

    @rtype: C{Deferred}
    @return: The result of the function call, wrapped in a C{Deferred} if
    necessary.
    """
    try:
        result = f(*args, **kw)
    except:
        # Deliberately broad: whatever f raises is reported through the
        # returned Deferred instead of propagating to the caller.
        return fail(failure.Failure())

    if isinstance(result, Deferred):
        return result
    elif isinstance(result, failure.Failure):
        return fail(result)
    else:
        return succeed(result)
117
def timeout(deferred):
    """
    Errback C{deferred} with a L{TimeoutError} wrapped in a Failure.

    This is the default C{timeoutFunc} of L{Deferred.setTimeout}.

    @param deferred: The Deferred to fail.
    """
    timedOut = TimeoutError("Callback timed out")
    deferred.errback(failure.Failure(timedOut))
120
def passthru(arg):
    """
    Return C{arg} unchanged.

    L{Deferred} uses this as the placeholder for a missing callback or
    errback, so the current result flows on to the next pair untouched.
    """
    return arg
123
def setDebugging(on):
    """Enable or disable Deferred debugging.

    When debugging is on, the call stacks from creation and invocation are
    recorded, and added to any AlreadyCalledErrors we raise.

    @param on: Any value; it is converted with C{bool} and stored on the
        L{Deferred} class itself as C{Deferred.debug}.
    """
    Deferred.debug=bool(on)
131
def getDebugging():
    """Determine whether Deferred debugging is enabled.

    @return: The current value of the class attribute C{Deferred.debug}.
    """
    return Deferred.debug
136
137 class Deferred:
138     """This is a callback which will be put off until later.
139
140     Why do we want this? Well, in cases where a function in a threaded
141     program would block until it gets a result, for Twisted it should
142     not block. Instead, it should return a Deferred.
143
144     This can be implemented for protocols that run over the network by
145     writing an asynchronous protocol for twisted.internet. For methods
146     that come from outside packages that are not under our control, we use
147     threads (see for example L{twisted.enterprise.adbapi}).
148
149     For more information about Deferreds, see doc/howto/defer.html or
150     U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}
151     """
152     called = 0
153     paused = 0
154     timeoutCall = None
155     _debugInfo = None
156
157     # Are we currently running a user-installed callback?  Meant to prevent
158     # recursive running of callbacks when a reentrant call to add a callback is
159     # used.
160     _runningCallbacks = False
161
162     # Keep this class attribute for now, for compatibility with code that
163     # sets it directly.
164     debug = False
165
166     def __init__(self):
167         self.callbacks = []
168         if self.debug:
169             self._debugInfo = DebugInfo()
170             self._debugInfo.creator = traceback.format_stack()[:-1]
171
172     def addCallbacks(self, callback, errback=None,
173                      callbackArgs=None, callbackKeywords=None,
174                      errbackArgs=None, errbackKeywords=None):
175         """Add a pair of callbacks (success and error) to this Deferred.
176
177         These will be executed when the 'master' callback is run.
178         """
179         assert callable(callback)
180         assert errback == None or callable(errback)
181         cbs = ((callback, callbackArgs, callbackKeywords),
182                (errback or (passthru), errbackArgs, errbackKeywords))
183         self.callbacks.append(cbs)
184
185         if self.called:
186             self._runCallbacks()
187         return self
188
189     def addCallback(self, callback, *args, **kw):
190         """Convenience method for adding just a callback.
191
192         See L{addCallbacks}.
193         """
194         return self.addCallbacks(callback, callbackArgs=args,
195                                  callbackKeywords=kw)
196
197     def addErrback(self, errback, *args, **kw):
198         """Convenience method for adding just an errback.
199
200         See L{addCallbacks}.
201         """
202         return self.addCallbacks(passthru, errback,
203                                  errbackArgs=args,
204                                  errbackKeywords=kw)
205
206     def addBoth(self, callback, *args, **kw):
207         """Convenience method for adding a single callable as both a callback
208         and an errback.
209
210         See L{addCallbacks}.
211         """
212         return self.addCallbacks(callback, callback,
213                                  callbackArgs=args, errbackArgs=args,
214                                  callbackKeywords=kw, errbackKeywords=kw)
215
216     def chainDeferred(self, d):
217         """Chain another Deferred to this Deferred.
218
219         This method adds callbacks to this Deferred to call d's callback or
220         errback, as appropriate. It is merely a shorthand way of performing
221         the following::
222
223             self.addCallbacks(d.callback, d.errback)
224
225         When you chain a deferred d2 to another deferred d1 with
226         d1.chainDeferred(d2), you are making d2 participate in the callback
227         chain of d1. Thus any event that fires d1 will also fire d2.
228         However, the converse is B{not} true; if d2 is fired d1 will not be
229         affected.
230         """
231         return self.addCallbacks(d.callback, d.errback)
232
233     def callback(self, result):
234         """Run all success callbacks that have been added to this Deferred.
235
236         Each callback will have its result passed as the first
237         argument to the next; this way, the callbacks act as a
238         'processing chain'. Also, if the success-callback returns a Failure
239         or raises an Exception, processing will continue on the *error*-
240         callback chain.
241         """
242         assert not isinstance(result, Deferred)
243         self._startRunCallbacks(result)
244
245
246     def errback(self, fail=None):
247         """
248         Run all error callbacks that have been added to this Deferred.
249
250         Each callback will have its result passed as the first
251         argument to the next; this way, the callbacks act as a
252         'processing chain'. Also, if the error-callback returns a non-Failure
253         or doesn't raise an Exception, processing will continue on the
254         *success*-callback chain.
255
256         If the argument that's passed to me is not a failure.Failure instance,
257         it will be embedded in one. If no argument is passed, a failure.Failure
258         instance will be created based on the current traceback stack.
259
260         Passing a string as `fail' is deprecated, and will be punished with
261         a warning message.
262
263         @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
264             no current exception state.
265         """
266         if not isinstance(fail, failure.Failure):
267             fail = failure.Failure(fail)
268
269         self._startRunCallbacks(fail)
270
271
272     def pause(self):
273         """Stop processing on a Deferred until L{unpause}() is called.
274         """
275         self.paused = self.paused + 1
276
277
278     def unpause(self):
279         """Process all callbacks made since L{pause}() was called.
280         """
281         self.paused = self.paused - 1
282         if self.paused:
283             return
284         if self.called:
285             self._runCallbacks()
286
287     def _continue(self, result):
288         self.result = result
289         self.unpause()
290
291     def _startRunCallbacks(self, result):
292         if self.called:
293             if self.debug:
294                 if self._debugInfo is None:
295                     self._debugInfo = DebugInfo()
296                 extra = "\n" + self._debugInfo._getDebugTracebacks()
297                 raise AlreadyCalledError(extra)
298             raise AlreadyCalledError
299         if self.debug:
300             if self._debugInfo is None:
301                 self._debugInfo = DebugInfo()
302             self._debugInfo.invoker = traceback.format_stack()[:-2]
303         self.called = True
304         self.result = result
305         if self.timeoutCall:
306             try:
307                 self.timeoutCall.cancel()
308             except:
309                 pass
310
311             del self.timeoutCall
312         self._runCallbacks()
313
314     def _runCallbacks(self):
315         if self._runningCallbacks:
316             # Don't recursively run callbacks
317             return
318         if not self.paused:
319             while self.callbacks:
320                 item = self.callbacks.pop(0)
321                 callback, args, kw = item[
322                     isinstance(self.result, failure.Failure)]
323                 args = args or ()
324                 kw = kw or {}
325                 try:
326                     self._runningCallbacks = True
327                     try:
328                         self.result = callback(self.result, *args, **kw)
329                     finally:
330                         self._runningCallbacks = False
331                     if isinstance(self.result, Deferred):
332                         # note: this will cause _runCallbacks to be called
333                         # recursively if self.result already has a result.
334                         # This shouldn't cause any problems, since there is no
335                         # relevant state in this stack frame at this point.
336                         # The recursive call will continue to process
337                         # self.callbacks until it is empty, then return here,
338                         # where there is no more work to be done, so this call
339                         # will return as well.
340                         self.pause()
341                         self.result.addBoth(self._continue)
342                         break
343                 except:
344                     self.result = failure.Failure()
345
346         if isinstance(self.result, failure.Failure):
347             self.result.cleanFailure()
348             if self._debugInfo is None:
349                 self._debugInfo = DebugInfo()
350             self._debugInfo.failResult = self.result
351         else:
352             if self._debugInfo is not None:
353                 self._debugInfo.failResult = None
354
355     def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
356         """Set a timeout function to be triggered if I am not called.
357
358         @param seconds: How long to wait (from now) before firing the
359         timeoutFunc.
360
361         @param timeoutFunc: will receive the Deferred and *args, **kw as its
362         arguments.  The default timeoutFunc will call the errback with a
363         L{TimeoutError}.
364         """
365         warnings.warn(
366             "Deferred.setTimeout is deprecated.  Look for timeout "
367             "support specific to the API you are using instead.",
368             DeprecationWarning, stacklevel=2)
369
370         if self.called:
371             return
372         assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."
373
374         from twisted.internet import reactor
375         self.timeoutCall = reactor.callLater(
376             seconds,
377             lambda: self.called or timeoutFunc(self, *args, **kw))
378         return self.timeoutCall
379
380     def __str__(self):
381         cname = self.__class__.__name__
382         if hasattr(self, 'result'):
383             return "<%s at %s  current result: %r>" % (cname, hex(unsignedID(self)),
384                                                        self.result)
385         return "<%s at %s>" % (cname, hex(unsignedID(self)))
386     __repr__ = __str__
387
class DebugInfo:
    """
    Debugging companion of a L{Deferred}.

    It holds the optional C{creator} and C{invoker} stack listings and the
    failure the Deferred currently carries, and logs that failure if it is
    still set when this object is garbage collected.
    """
    failResult = None

    def _getDebugTracebacks(self):
        """
        Format whichever of the C{creator} / C{invoker} stacks have been
        recorded, every line tagged with C{C:} or C{I:} respectively.

        @return: The formatted text, or C{''} if neither stack is recorded.
        """
        sections = []
        for attribute, heading, marker in (
                ("creator", " C: Deferred was created:\n C:", "\n C:"),
                ("invoker", " I: First Invoker was:\n I:", "\n I:")):
            if hasattr(self, attribute):
                stack = "".join(getattr(self, attribute)).rstrip()
                sections.append(heading + stack.replace("\n", marker) + "\n")
        return "".join(sections)

    def __del__(self):
        """Print tracebacks and die.

        If a failure is still recorded in C{failResult} at this point, log
        it as an unhandled error, together with any recorded debug stacks.
        """
        if self.failResult is None:
            return
        log.msg("Unhandled error in Deferred:", isError=True)
        tracebacks = self._getDebugTracebacks()
        if tracebacks != '':
            log.msg("(debug: " + tracebacks + ")", isError=True)
        log.err(self.failResult)
416
class FirstError(Exception):
    """First error to occur in a DeferredList if fireOnOneErrback is set.

    @ivar subFailure: the L{Failure} that occurred.
    @ivar index: the index of the Deferred in the DeferredList where it
    happened.
    """
    def __init__(self, failure, index):
        self.subFailure = failure
        self.index = index

    def __repr__(self):
        return 'FirstError(%r, %d)' % (self.subFailure, self.index)

    def __str__(self):
        return repr(self)

    def __getitem__(self, index):
        """
        Deprecated sequence-style access: index 0 is C{subFailure}, index 1
        is C{index}.
        """
        warnings.warn("FirstError.__getitem__ is deprecated.  "
                      "Use attributes instead.",
                      category=DeprecationWarning, stacklevel=2)
        return [self.subFailure, self.index][index]

    def __getslice__(self, start, stop):
        """
        Deprecated slice access over C{[subFailure, index]}.
        """
        warnings.warn("FirstError.__getslice__ is deprecated.  "
                      "Use attributes instead.",
                      category=DeprecationWarning, stacklevel=2)
        return [self.subFailure, self.index][start:stop]

    def __eq__(self, other):
        """
        Compare equal to another L{FirstError} with the same C{subFailure}
        and C{index}, or to the tuple C{(subFailure, index)}.
        """
        if isinstance(other, tuple):
            # Build the tuple from the attributes directly: tuple(self)
            # would go through the deprecated __getitem__ and emit
            # DeprecationWarnings the caller did nothing to deserve.
            return (self.subFailure, self.index) == other
        elif isinstance(other, FirstError):
            return (self.subFailure == other.subFailure and
                    self.index == other.index)
        return False

    def __ne__(self, other):
        """
        Inverse of L{__eq__}; Python 2 does not derive C{!=} from C{==}.
        """
        return not self.__eq__(other)
453
class DeferredList(Deferred):
    """I combine a group of deferreds into one callback.

    I track a list of L{Deferred}s for their callbacks, and make a single
    callback when they have all completed, a list of (success, result)
    tuples, 'success' being a boolean.

    Note that you can still use a L{Deferred} after putting it in a
    DeferredList.  For example, you can suppress 'Unhandled error in Deferred'
    messages by adding errbacks to the Deferreds *after* putting them in the
    DeferredList, as a DeferredList won't swallow the errors.  (Although a more
    convenient way to do this is simply to set the consumeErrors flag)
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0):
        """Initialize a DeferredList.

        @type deferredList:  iterable of L{Deferred}s
        @param deferredList: The deferreds to track.  Any iterable is
                             accepted; it is copied into a list up front.
        @param fireOnOneCallback: (keyword param) a flag indicating that
                             only one callback needs to be fired for me to call
                             my callback
        @param fireOnOneErrback: (keyword param) a flag indicating that
                            only one errback needs to be fired for me to call
                            my errback
        @param consumeErrors: (keyword param) a flag indicating that any errors
                            raised in the original deferreds should be
                            consumed by this DeferredList.  This is useful to
                            prevent spurious warnings being logged.
        """
        # Snapshot the input: it is both measured and iterated below, which
        # an unsized or one-shot iterable could not otherwise support.
        deferredList = list(deferredList)
        self.resultList = [None] * len(deferredList)
        Deferred.__init__(self)
        if len(deferredList) == 0 and not fireOnOneCallback:
            self.callback(self.resultList)

        # These flags need to be set *before* attaching callbacks to the
        # deferreds, because the callbacks use these flags, and will run
        # synchronously if any of the deferreds are already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.finishedCount = 0

        for index, deferred in enumerate(deferredList):
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(index,SUCCESS),
                                  errbackArgs=(index,FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """(internal) Callback for when one of my deferreds fires.

        Records C{(succeeded, result)} at position C{index} of the result
        list and, unless I have already fired, fires me when the configured
        condition is met: first success, first failure, or all finished.

        @return: C{result}, so the tracked Deferred's own chain continues
            unchanged; or C{None} for a failure when C{consumeErrors} is set.
        """
        self.resultList[index] = (succeeded, result)

        self.finishedCount += 1
        if not self.called:
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))
            elif succeeded == FAILURE and self.fireOnOneErrback:
                self.errback(failure.Failure(FirstError(result, index)))
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if succeeded == FAILURE and self.consumeErrors:
            result = None

        return result
526
527
528 def _parseDListResult(l, fireOnOneErrback=0):
529     if __debug__:
530         for success, value in l:
531             assert success
532     return [x[1] for x in l]
533
def gatherResults(deferredList):
    """Collect the results of several Deferreds into one list.

    A thin layer over C{DeferredList} (created with C{fireOnOneErrback=1})
    that strips the success flags, so the caller receives the bare results
    and does not have to parse (success, result) tuples.

    @type deferredList:  C{list} of L{Deferred}s
    """
    combined = DeferredList(deferredList, fireOnOneErrback=1)
    return combined.addCallback(_parseDListResult)
545
# Constants for use with DeferredList.  They are the 'success' flag stored
# as the first element of each (success, result) tuple in its result list.

SUCCESS = True
FAILURE = False
550
551
552
553 ## deferredGenerator
554
class waitForDeferred:
    """
    Wrapper around a L{Deferred}, yielded from a generator driven by
    L{deferredGenerator}.  See L{deferredGenerator}.
    """

    def __init__(self, d):
        """
        @param d: The L{Deferred} to wait on.

        @raise TypeError: If C{d} is not a L{Deferred}.
        """
        if isinstance(d, Deferred):
            self.d = d
        else:
            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))


    def getResult(self):
        """
        Return the stored result, or re-raise the exception if that result
        is a Failure.
        """
        outcome = self.result
        if isinstance(outcome, failure.Failure):
            outcome.raiseException()
        return outcome
570
571
572
def _deferGenerator(g, deferred):
    """
    Drive the generator C{g}, firing C{deferred} when it finishes.

    See L{deferredGenerator}.

    @param g: The generator to advance.
    @param deferred: The L{Deferred} fired with the generator's last yielded
        plain value when it is exhausted, or errbacked if it raises.

    @return: C{deferred}, except when the generator yields a bare
        L{Deferred}: then a separate, already-failed Deferred carrying a
        C{TypeError} is returned instead.
    """
    result = None

    # This function is complicated by the need to prevent unbounded recursion
    # arising from repeatedly yielding immediately ready deferreds.  This while
    # loop and the waiting variable solve that by manually unfolding the
    # recursion.

    waiting = [True, # defgen is waiting for result?
               None] # result

    while 1:
        try:
            result = g.next()
        except StopIteration:
            # Generator finished: deliver the last value kept in 'result'
            # (reset to None below after each waitForDeferred).
            deferred.callback(result)
            return deferred
        except:
            # errback() with no argument wraps the exception being handled.
            deferred.errback()
            return deferred

        # Deferred.callback(Deferred) raises an error; we catch this case
        # early here and give a nicer error message to the user in case
        # they yield a Deferred.
        if isinstance(result, Deferred):
            return fail(TypeError("Yield waitForDeferred(d), not d!"))

        if isinstance(result, waitForDeferred):
            # a waitForDeferred was yielded, get the result.
            # Pass result in so it don't get changed going around the loop
            # This isn't a problem for waiting, as it's only reused if
            # gotResult has already been executed.
            def gotResult(r, result=result):
                # Stash the outcome where waitForDeferred.getResult reads it.
                result.result = r
                if waiting[0]:
                    # Called synchronously from addBoth below: just flag
                    # it and let the enclosing loop carry on.
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # Called later, after the loop below has returned:
                    # resume driving the generator.
                    _deferGenerator(g, deferred)
            result.d.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                return deferred
            # Reset waiting to initial values for next loop
            waiting[0] = True
            waiting[1] = None

            result = None
626
627
628
def deferredGenerator(f):
    """
    Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}

    Wrap a generator function that yields L{waitForDeferred} objects so that
    calling it returns a L{Deferred}.  Together, deferredGenerator and
    waitForDeferred let Deferred-using code read like an ordinary sequential
    function.  If your code can require Python 2.5, L{inlineCallbacks}
    achieves the same thing more concisely.

    The two are used as a pair::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            print thing #the result! hoorj!
        thingummy = deferredGenerator(thingummy)

    Yield the object returned by waitForDeferred straight away.  Once the
    generator is resumed, thing.getResult() either returns the Deferred's
    result (on success) or raises an exception (on failure).  Calling
    C{getResult} is B{absolutely mandatory}.  If you do not call it,
    I{your program will not work}.

    The Deferred returned by the wrapped function fires with the last value
    the generator yielded, or with C{None} if that last value was a
    waitForDeferred instance or if nothing was yielded at all.  'return
    result' won't work; write 'yield result; return' instead.  Yielding a
    bare Deferred is an error condition; always yield waitForDeferred(d).

    If the generator raises an unhandled exception, the returned Deferred
    errbacks instead.  For example::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                yield 'TWISTED IS GREAT!'
                return
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = deferredGenerator(thingummy)

    In short, the pair bridges Deferred-using code and this 'fake blocking'
    style in both directions: waitForDeferred goes from a Deferred to the
    'blocking' style, and deferredGenerator goes from the 'blocking' style
    back to a Deferred.
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _deferGenerator(generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
690
691
692 ## inlineCallbacks
693
# BaseException is only in Py 2.5.  Where the name is missing, alias it to
# Exception so the class statement for _DefGen_Return still works.
try:
    BaseException
except NameError:
    BaseException=Exception
699
class _DefGen_Return(BaseException):
    """
    Raised by L{returnValue} to carry a return value out of a generator.

    @ivar value: The value passed to L{returnValue}.
    """
    def __init__(self, value):
        self.value = value
703
def returnValue(val):
    """
    Return val from a L{inlineCallbacks} generator.

    Note: the value is delivered by raising an exception derived from
    BaseException.  A bare 'except:' clause around the call would catch
    it, so you might want to change such clauses to 'except Exception:'.

    Also: although this currently works when called from arbitrary
    functions that the generator calls, do not rely upon this behavior.
    """
    carrier = _DefGen_Return(val)
    raise carrier
718
def _inlineCallbacks(result, g, deferred):
    """
    Drive the generator C{g}, firing C{deferred} when it finishes.

    See L{inlineCallbacks}.

    @param result: The value to send into the generator next; a Failure is
        thrown into the generator instead of being sent.
    @param g: The generator to advance.
    @param deferred: The L{Deferred} fired with C{None} when the generator
        ends, with the L{returnValue} value if that was called, or
        errbacked if the generator raises.

    @return: C{deferred}.
    """
    # This function is complicated by the need to prevent unbounded recursion
    # arising from repeatedly yielding immediately ready deferreds.  This while
    # loop and the waiting variable solve that by manually unfolding the
    # recursion.

    waiting = [True, # waiting for result?
               None] # result

    while 1:
        try:
            # Send the last result back as the result of the yield expression.
            if isinstance(result, failure.Failure):
                result = result.throwExceptionIntoGenerator(g)
            else:
                result = g.send(result)
        except StopIteration:
            # fell off the end, or "return" statement
            deferred.callback(None)
            return deferred
        except _DefGen_Return, e:
            # returnValue call
            deferred.callback(e.value)
            return deferred
        except:
            # errback() with no argument wraps the exception being handled.
            deferred.errback()
            return deferred

        if isinstance(result, Deferred):
            # a deferred was yielded, get the result.
            def gotResult(r):
                if waiting[0]:
                    # Called synchronously from addBoth below: hand the
                    # value to the enclosing loop through waiting[1].
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # Called later, after the loop below has returned:
                    # resume driving the generator with the result.
                    _inlineCallbacks(r, g, deferred)

            result.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                return deferred

            result = waiting[1]
            # Reset waiting to initial values for next loop.  gotResult uses
            # waiting, but this isn't a problem because gotResult is only
            # executed once, and if it hasn't been executed yet, the return
            # branch above would have been taken.


            waiting[0] = True
            waiting[1] = None

        # A yielded value that is not a Deferred falls through to here and
        # is sent straight back into the generator on the next iteration.

    # NOTE(review): the 'while 1' loop above has no 'break', so this line
    # looks unreachable.
    return deferred
778
def inlineCallbacks(f):
    """
    Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}

    WARNING: this function will not work in Python 2.4 and earlier!

    Wrap a generator function that yields Deferreds so that calling it
    returns a L{Deferred}, letting Deferred-using code read like an ordinary
    sequential function.  It relies on features of Python 2.5 generators; if
    you must support Python 2.4 or earlier, use L{deferredGenerator}, which
    does the same job with somewhat more boilerplate.  For example::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            print thing #the result! hoorj!
        thingummy = inlineCallbacks(thingummy)

    Whenever you call something that results in a Deferred, simply yield
    it; the generator is resumed automatically once the Deferred's result is
    available.  The result is passed back in with the generator's 'send'
    method, or with 'throw' if the result was a failure.

    The wrapped function returns a Deferred that fires with the generator's
    return value, or fails with a failure object if the generator raises an
    unhandled exception.  'return result' cannot be used to return a value;
    call 'returnValue(result)' instead.  Falling off the end of the
    generator, or a bare 'return', gives the Deferred a result of None.

    An example in which the returned Deferred may errback because the
    generator raised an exception::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                returnValue('TWISTED IS GREAT!')
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = inlineCallbacks(thingummy)
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _inlineCallbacks(None, generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
824
825
826 ## DeferredLock/DeferredQueue
827
class _ConcurrencyPrimitive(object):
    """
    Common base for the Deferred-based lock and semaphore.

    This class relies on C{acquire} and C{release} methods which it does not
    define itself; subclasses are expected to supply them.

    @ivar waiting: a list, initially empty, for subclasses to queue pending
        acquisitions in.
    """

    def __init__(self):
        self.waiting = []

    def _releaseAndReturn(self, r):
        """
        Release this primitive and pass C{r} through unchanged, so that this
        can be used as a callback/errback without altering the result.
        """
        self.release()
        return r

    def run(*args, **kwargs):
        """
        Acquire, run, release.

        The first positional argument after C{self} is a callable; every
        remaining positional and keyword argument is passed on to it once
        the lock or semaphore has been acquired.

        If the callable returns a Deferred, the lock or semaphore is not
        released until that Deferred fires.

        @return: Deferred of function result.

        @raise TypeError: if no callable is supplied.
        """
        # NOTE(review): the signature is bare *args (no explicit self/f),
        # presumably so that keyword arguments named 'self' or 'f' can be
        # forwarded to the callable -- confirm before simplifying.
        if not args:
            raise TypeError("run() takes at least 2 arguments, none given.")
        if len(args) == 1:
            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
                args[0].__class__.__name__,))
        self, f = args[0], args[1]
        callArgs = args[2:]

        def execute(ignoredResult):
            # Runs once acquired; release happens whether f succeeds or fails.
            running = maybeDeferred(f, *callArgs, **kwargs)
            running.addBoth(self._releaseAndReturn)
            return running

        acquired = self.acquire()
        acquired.addCallback(execute)
        return acquired
865
866
class DeferredLock(_ConcurrencyPrimitive):
    """
    A mutual-exclusion lock for event driven systems.

    @ivar locked: True while this lock is held, false at all other times.
    Treat it as read-only; examining it gives the equivalent of a
    "non-blocking" acquisition.
    """

    locked = 0

    def acquire(self):
        """
        Attempt to acquire the lock.

        @return: a Deferred which fires on lock acquisition.
        """
        d = Deferred()
        if not self.locked:
            # Uncontended: take the lock and fire immediately.
            self.locked = 1
            d.callback(self)
        else:
            # Already held: queue up until release() hands the lock over.
            self.waiting.append(d)
        return d

    def release(self):
        """
        Release the lock.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.locked, "Tried to release an unlocked lock"
        if not self.waiting:
            self.locked = 0
            return
        # Someone is waiting: the lock passes straight to the oldest waiter
        # and therefore stays held.
        self.locked = 1
        nextInLine = self.waiting.pop(0)
        nextInLine.callback(self)
904
class DeferredSemaphore(_ConcurrencyPrimitive):
    """
    A semaphore for event driven systems.

    @ivar tokens: The number of tokens currently available to be acquired.

    @ivar limit: The number of tokens the semaphore was created with;
        C{release} asserts that C{tokens} is below this before adding one.
    """

    def __init__(self, tokens):
        """
        @param tokens: The initial value of both C{tokens} and C{limit}.

        @raise ValueError: if C{tokens} is less than 1.
        """
        _ConcurrencyPrimitive.__init__(self)
        # A semaphore with no tokens can never be used: acquire() would
        # queue forever and release() would always trip its assertion.
        if tokens < 1:
            raise ValueError("DeferredSemaphore requires tokens >= 1")
        self.tokens = tokens
        self.limit = tokens

    def acquire(self):
        """
        Attempt to acquire the token.

        @return: a Deferred which fires on token acquisition.
        """
        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
        d = Deferred()
        if not self.tokens:
            # Nothing available: wait until release() hands a token over.
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """
        Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
        self.tokens = self.tokens + 1
        if self.waiting:
            # someone is waiting to acquire token; give the token that was
            # just returned straight to the oldest waiter
            self.tokens = self.tokens - 1
            d = self.waiting.pop(0)
            d.callback(self)
942
class QueueOverflow(Exception):
    """
    Raised by L{DeferredQueue.put} when the queue already holds as many
    objects as its C{size} allows.
    """
945
class QueueUnderflow(Exception):
    """
    Raised by L{DeferredQueue.get} when as many Deferreds as its C{backlog}
    allows are already waiting for an object.
    """
948
949
class DeferredQueue(object):
    """
    An event driven queue.

    Objects are added with L{put}.  Retrieving an object with L{get} always
    yields a Deferred: it has already fired if an object was available, and
    otherwise fires when one is subsequently added.

    @ivar size: The maximum number of objects to allow into the queue
    at a time.  When an attempt to add a new object would exceed this
    limit, QueueOverflow is raised synchronously.  None for no limit.

    @ivar backlog: The maximum number of Deferred gets to allow at
    one time.  When an attempt is made to get an object which would
    exceed this limit, QueueUnderflow is raised synchronously.  None
    for no limit.

    @ivar waiting: Deferreds handed out by L{get} which have not fired yet,
    oldest first.

    @ivar pending: Objects added by L{put} which have not been retrieved
    yet, oldest first.
    """

    def __init__(self, size=None, backlog=None):
        self.size = size
        self.backlog = backlog
        self.pending = []
        self.waiting = []

    def put(self, obj):
        """
        Add an object to this queue.

        @raise QueueOverflow: Too many objects are in this queue.
        """
        if self.waiting:
            # A get() is already outstanding: deliver straight to the oldest
            # one without touching the stored objects.
            oldestWaiter = self.waiting.pop(0)
            oldestWaiter.callback(obj)
            return
        hasRoom = self.size is None or len(self.pending) < self.size
        if not hasRoom:
            raise QueueOverflow()
        self.pending.append(obj)

    def get(self):
        """
        Attempt to retrieve and remove an object from the queue.

        @return: a Deferred which fires with the next object available in
        the queue.

        @raise QueueUnderflow: Too many (more than C{backlog})
        Deferreds are already waiting for an object from this queue.
        """
        if self.pending:
            return succeed(self.pending.pop(0))
        mayWait = self.backlog is None or len(self.waiting) < self.backlog
        if not mayWait:
            raise QueueUnderflow()
        d = Deferred()
        self.waiting.append(d)
        return d
1002
1003
class AlreadyTryingToLockError(Exception):
    """
    Reported when L{DeferredFilesystemLock.deferUntilLocked} is called on a
    L{DeferredFilesystemLock} which is still waiting on an earlier call.  It
    is delivered as the failure of the returned Deferred rather than being
    raised synchronously.
    """
1009
1010
class DeferredFilesystemLock(lockfile.FilesystemLock):
    """
    A FilesystemLock that allows for a deferred to be fired when the lock is
    acquired.

    @ivar _scheduler: The object in charge of scheduling retries. In this
        implementation this is parameterized for testing.

    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.

    @ivar _tryLockCall: A L{DelayedCall} based on _interval that will manage
        the next retry for acquiring the lock.  C{None} when no retry is
        scheduled.

    @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout
        argument.  This is in charge of timing out our attempt to acquire the
        lock.  C{None} when no timeout is scheduled.
    """
    # Delay handed to the scheduler's callLater between attempts
    # (presumably seconds, like the timeout argument -- confirm).
    _interval = 1
    _tryLockCall = None
    _timeoutCall = None

    def __init__(self, name, scheduler=None):
        """
        @param name: The name of the lock to acquire
        @param scheduler: An object which provides L{IReactorTime}.  If
            C{None}, the global reactor is used.
        """
        lockfile.FilesystemLock.__init__(self, name)

        if scheduler is None:
            # Imported here rather than at module level, so the reactor is
            # only looked up when no scheduler was supplied.
            from twisted.internet import reactor
            scheduler = reactor

        self._scheduler = scheduler

    def deferUntilLocked(self, timeout=None):
        """
        Wait until we acquire this lock.  This method is not safe for
        concurrent use.

        The first attempt is made synchronously, so the returned deferred
        may already have fired.

        @type timeout: C{float} or C{int}
        @param timeout: the number of seconds after which to time out if the
            lock has not been acquired.  C{None} means retry indefinitely.

        @return: a deferred which will callback when the lock is acquired, or
            errback with a L{TimeoutError} after timing out or an
            L{AlreadyTryingToLockError} if L{deferUntilLocked} has already
            been called and has not yet finished.
        """
        # A scheduled retry means an earlier call is still in progress.
        if self._tryLockCall is not None:
            return fail(
                AlreadyTryingToLockError(
                    "deferUntilLocked isn't safe for concurrent use."))

        d = Deferred()

        def _cancelLock():
            # Run by the scheduler when the timeout expires: stop the pending
            # retry and clear both call references so that a later
            # deferUntilLocked can start afresh.
            self._tryLockCall.cancel()
            self._tryLockCall = None
            self._timeoutCall = None

            # One last attempt before giving up.  lock() is inherited; a
            # true result is treated as successful acquisition.
            if self.lock():
                d.callback(None)
            else:
                d.errback(failure.Failure(
                        TimeoutError("Timed out aquiring lock: %s after %fs" % (
                                self.name,
                                timeout))))

        def _tryLock():
            if self.lock():
                # Acquired: the timeout (if one was armed) is no longer
                # needed.
                if self._timeoutCall is not None:
                    self._timeoutCall.cancel()
                    self._timeoutCall = None

                self._tryLockCall = None

                d.callback(None)
            else:
                # Arm the timeout only once, on the first failed attempt;
                # later retries find _timeoutCall already set.
                if timeout is not None and self._timeoutCall is None:
                    self._timeoutCall = self._scheduler.callLater(
                        timeout, _cancelLock)

                # Schedule the next attempt.
                self._tryLockCall = self._scheduler.callLater(
                    self._interval, _tryLock)

        _tryLock()

        return d
1099
1100
# Names exported by a star-import of this module.
__all__ = [
    "Deferred",
    "DeferredList",
    "succeed",
    "fail",
    "FAILURE",
    "SUCCESS",
    "AlreadyCalledError",
    "TimeoutError",
    "gatherResults",
    "maybeDeferred",
    "waitForDeferred",
    "deferredGenerator",
    "inlineCallbacks",
    "DeferredLock",
    "DeferredSemaphore",
    "DeferredQueue",
    "DeferredFilesystemLock",
    "AlreadyTryingToLockError",
]
Note: See TracBrowser for help on using the browser.