root/tags/releases/twisted-8.1.0/twisted/internet/defer.py

Revision 23165, 37.5 KB (checked in by exarkun, 2 years ago)

Merge defer-fail-3145

Author: exarkun
Reviewer: therve
Fixes #3145

Clarify the documentation of twisted.internet.defer.fail and
twisted.internet.defer.Deferred.errback for the case where no
argument is supplied and there is no current exception state
(an exception will be raised). Also simplify the implementation
of fail slightly, removing some duplication it had with errback.

Line 
1# -*- test-case-name: twisted.test.test_defer -*-
2#
3# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7Support for results that aren't immediately available.
8
9Maintainer: U{Glyph Lefkowitz<mailto:glyph@twistedmatrix.com>}
10"""
11
12from __future__ import nested_scopes, generators
13import traceback
14import warnings
15
16# Twisted imports
17from twisted.python import log, failure, lockfile
18from twisted.python.util import unsignedID, mergeFunctionMetadata
19
class AlreadyCalledError(Exception):
    """
    Raised when a L{Deferred} that has already been fired with a result is
    fired a second time.
    """
22
class TimeoutError(Exception):
    """
    The exception L{timeout} wraps in a L{failure.Failure} and passes to a
    L{Deferred}'s errback.
    """
25
def logError(err):
    """
    Report C{err} through L{log.err} and hand it back to the caller.

    @param err: The object to log.

    @return: C{err}, unchanged.
    """
    log.err(err)
    return err
29
def succeed(result):
    """
    Build a L{Deferred} on which C{callback(result)} has already been
    called.

    Use this when synchronous code has to satisfy an asynchronous
    interface: the caller expects a Deferred, but the answer is already
    known.  Simply return C{defer.succeed(theResult)}.

    L{fail} is the counterpart that produces an already-failed Deferred.

    @param result: The value to pass to the Deferred's C{callback} method.

    @rtype: L{Deferred}
    """
    fired = Deferred()
    fired.callback(result)
    return fired
50
51
def fail(result=None):
    """
    Build a L{Deferred} on which C{errback(result)} has already been called.

    The rationale is the same as for L{succeed}.

    @param result: Whatever L{Deferred.errback} accepts; it is forwarded
        unchanged.

    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
        current exception state.

    @rtype: L{Deferred}
    """
    failed = Deferred()
    failed.errback(result)
    return failed
68
69
def execute(callable, *args, **kw):
    """
    Create a deferred from a callable and arguments.

    C{callable} is invoked immediately with C{*args} and C{**kw}.

    @return: A Deferred already fired via L{succeed} with the return value,
        or, if the call raised, via L{fail} with the exception currently
        being handled.
    """
    try:
        outcome = callable(*args, **kw)
    except:
        # fail() must be called while the exception is still being handled:
        # with no argument it builds its Failure from the current exception
        # state.
        return fail()
    return succeed(outcome)
83
def maybeDeferred(f, *args, **kw):
    """
    Invoke a function that may or may not return a deferred.

    Call the given function with the given arguments.  If the returned
    object is a C{Deferred}, return it.  If the returned object is a
    C{Failure}, wrap it with C{fail} and return it.  Otherwise, wrap it in
    C{succeed} and return it.  If an exception is raised, convert it to a
    C{Failure}, wrap it in C{fail}, and then return it.

    @type f: Any callable
    @param f: The callable to invoke

    @param args: The arguments to pass to C{f}
    @param kw: The keyword arguments to pass to C{f}

    @rtype: C{Deferred}
    @return: The result of the function call, wrapped in a C{Deferred} if
    necessary.
    """
    try:
        result = f(*args, **kw)
    except:
        # Capture the exception being handled as a Failure.
        return fail(failure.Failure())

    if isinstance(result, Deferred):
        return result
    if isinstance(result, failure.Failure):
        return fail(result)
    return succeed(result)
117
def timeout(deferred):
    """
    Errback C{deferred} with a L{failure.Failure} wrapping a
    L{TimeoutError}.

    This is the default C{timeoutFunc} of L{Deferred.setTimeout}.
    """
    reason = failure.Failure(TimeoutError("Callback timed out"))
    deferred.errback(reason)
120
def passthru(arg):
    """
    Identity function: return C{arg} unchanged.

    L{Deferred.addCallbacks} and L{Deferred.addErrback} use it as the
    do-nothing half of a callback/errback pair.
    """
    return arg
123
def setDebugging(on):
    """
    Enable or disable Deferred debugging.

    While debugging is enabled, the call stacks at creation and at
    invocation are recorded and included in any AlreadyCalledError raised.

    @param on: Any object; its truth value becomes the class-wide
        C{Deferred.debug} flag.
    """
    Deferred.debug = bool(on)
131
def getDebugging():
    """
    Determine whether Deferred debugging is enabled.

    @return: The current value of the class-wide C{Deferred.debug} flag.
    """
    return Deferred.debug
136
class Deferred:
    """
    This is a callback which will be put off until later.

    Why do we want this? Well, in cases where a function in a threaded
    program would block until it gets a result, for Twisted it should
    not block. Instead, it should return a Deferred.

    This can be implemented for protocols that run over the network by
    writing an asynchronous protocol for twisted.internet. For methods
    that come from outside packages that are not under our control, we use
    threads (see for example L{twisted.enterprise.adbapi}).

    For more information about Deferreds, see doc/howto/defer.html or
    U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}

    @ivar called: True once this Deferred has been fired through
        L{callback} or L{errback}.
    @ivar paused: Pause counter; L{pause} increments it, L{unpause}
        decrements it, and callbacks only run while it is zero.
    @ivar timeoutCall: The delayed call created by L{setTimeout}, or
        C{None}.
    @ivar callbacks: A list of
        C{((callback, args, kw), (errback, args, kw))} pairs still to be run.
    @ivar result: The current result.  The attribute does not exist until
        the Deferred has been fired.
    """
    called = 0
    paused = 0
    timeoutCall = None
    _debugInfo = None

    # Are we currently running a user-installed callback?  Meant to prevent
    # recursive running of callbacks when a reentrant call to add a callback is
    # used.
    _runningCallbacks = False

    # Keep this class attribute for now, for compatibility with code that
    # sets it directly.
    debug = False

    def __init__(self):
        self.callbacks = []
        if self.debug:
            # Remember where this Deferred was created; the last frame
            # (this __init__) is dropped.
            self._debugInfo = DebugInfo()
            self._debugInfo.creator = traceback.format_stack()[:-1]

    def addCallbacks(self, callback, errback=None,
                     callbackArgs=None, callbackKeywords=None,
                     errbackArgs=None, errbackKeywords=None):
        """
        Add a pair of callbacks (success and error) to this Deferred.

        These will be executed when the 'master' callback is run.  If this
        Deferred has already been fired, the callback chain is run
        immediately.

        @param callback: Callable invoked with the current result when that
            result is not a L{failure.Failure}.
        @param errback: Callable invoked with the current result when it is
            a L{failure.Failure}; L{passthru} is used when omitted.
        @param callbackArgs: Extra positional arguments for C{callback}.
        @param callbackKeywords: Extra keyword arguments for C{callback}.
        @param errbackArgs: Extra positional arguments for C{errback}.
        @param errbackKeywords: Extra keyword arguments for C{errback}.

        @return: This Deferred, so calls can be chained.
        """
        assert callable(callback)
        # Identity test: comparing with == would invoke a user-defined
        # __eq__ on the errback object.
        assert errback is None or callable(errback)
        cbs = ((callback, callbackArgs, callbackKeywords),
               (errback or (passthru), errbackArgs, errbackKeywords))
        self.callbacks.append(cbs)

        if self.called:
            self._runCallbacks()
        return self

    def addCallback(self, callback, *args, **kw):
        """
        Convenience method for adding just a callback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(callback, callbackArgs=args,
                                 callbackKeywords=kw)

    def addErrback(self, errback, *args, **kw):
        """
        Convenience method for adding just an errback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(passthru, errback,
                                 errbackArgs=args,
                                 errbackKeywords=kw)

    def addBoth(self, callback, *args, **kw):
        """
        Convenience method for adding a single callable as both a callback
        and an errback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(callback, callback,
                                 callbackArgs=args, errbackArgs=args,
                                 callbackKeywords=kw, errbackKeywords=kw)

    def chainDeferred(self, d):
        """
        Chain another Deferred to this Deferred.

        This method adds callbacks to this Deferred to call d's callback or
        errback, as appropriate. It is merely a shorthand way of performing
        the following::

            self.addCallbacks(d.callback, d.errback)

        When you chain a deferred d2 to another deferred d1 with
        d1.chainDeferred(d2), you are making d2 participate in the callback
        chain of d1. Thus any event that fires d1 will also fire d2.
        However, the converse is B{not} true; if d2 is fired d1 will not be
        affected.
        """
        return self.addCallbacks(d.callback, d.errback)

    def callback(self, result):
        """
        Run all success callbacks that have been added to this Deferred.

        Each callback will have its result passed as the first
        argument to the next; this way, the callbacks act as a
        'processing chain'. Also, if the success-callback returns a Failure
        or raises an Exception, processing will continue on the *error*-
        callback chain.

        @param result: The initial result; it must not be a L{Deferred}.

        @raise AlreadyCalledError: If this Deferred has already been fired.
        """
        assert not isinstance(result, Deferred)
        self._startRunCallbacks(result)


    def errback(self, fail=None):
        """
        Run all error callbacks that have been added to this Deferred.

        Each callback will have its result passed as the first
        argument to the next; this way, the callbacks act as a
        'processing chain'. Also, if the error-callback returns a non-Failure
        or doesn't raise an Exception, processing will continue on the
        *success*-callback chain.

        If the argument that's passed to me is not a failure.Failure instance,
        it will be embedded in one. If no argument is passed, a failure.Failure
        instance will be created based on the current traceback stack.

        Passing a string as `fail' is deprecated, and will be punished with
        a warning message.

        @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
            no current exception state.
        @raise AlreadyCalledError: If this Deferred has already been fired.
        """
        if not isinstance(fail, failure.Failure):
            fail = failure.Failure(fail)

        self._startRunCallbacks(fail)


    def pause(self):
        """
        Stop processing on a Deferred until L{unpause}() is called.

        Pauses nest: each call increments a counter that L{unpause}
        decrements.
        """
        self.paused = self.paused + 1


    def unpause(self):
        """
        Process all callbacks made since L{pause}() was called.

        Nothing runs until every L{pause} has been matched by an
        L{unpause}, and then only if this Deferred has been fired.
        """
        self.paused = self.paused - 1
        if self.paused:
            return
        if self.called:
            self._runCallbacks()

    def _continue(self, result):
        """
        Adopt C{result} as the current result and undo one L{pause}.

        L{_runCallbacks} adds this (via C{addBoth}) to a Deferred returned
        by one of our callbacks, after pausing this Deferred.
        """
        self.result = result
        self.unpause()

    def _startRunCallbacks(self, result):
        """
        Record C{result} as the initial result and start the callback chain.

        @raise AlreadyCalledError: If this Deferred has already been fired.
            With debugging enabled the message carries the recorded
            creation and first-invocation tracebacks.
        """
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            # Drop the last two frames (this method and the callback /
            # errback method that called it).
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            # Best effort: any error raised while cancelling the pending
            # timeout is deliberately ignored.
            try:
                self.timeoutCall.cancel()
            except:
                pass

            # Remove the instance attribute so lookups fall back to the
            # class-level default of None.
            del self.timeoutCall
        self._runCallbacks()

    def _runCallbacks(self):
        """
        Run queued callback pairs against the current result until the queue
        is empty, this Deferred is paused, or a callback returns a Deferred.
        """
        if self._runningCallbacks:
            # Don't recursively run callbacks
            return
        if not self.paused:
            while self.callbacks:
                item = self.callbacks.pop(0)
                # The isinstance() result (False/True) indexes the pair:
                # 0 selects the callback, 1 selects the errback.
                callback, args, kw = item[
                    isinstance(self.result, failure.Failure)]
                args = args or ()
                kw = kw or {}
                try:
                    self._runningCallbacks = True
                    try:
                        self.result = callback(self.result, *args, **kw)
                    finally:
                        self._runningCallbacks = False
                    if isinstance(self.result, Deferred):
                        # note: this will cause _runCallbacks to be called
                        # recursively if self.result already has a result.
                        # This shouldn't cause any problems, since there is no
                        # relevant state in this stack frame at this point.
                        # The recursive call will continue to process
                        # self.callbacks until it is empty, then return here,
                        # where there is no more work to be done, so this call
                        # will return as well.
                        self.pause()
                        self.result.addBoth(self._continue)
                        break
                except:
                    # An exception raised by the callback becomes the new
                    # (failure) result.
                    self.result = failure.Failure()

        if isinstance(self.result, failure.Failure):
            self.result.cleanFailure()
            # Hand the failure to the DebugInfo helper, whose __del__ logs
            # it if it is still set when the helper is collected.
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.failResult = self.result
        else:
            if self._debugInfo is not None:
                self._debugInfo.failResult = None

    def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
        """
        Set a timeout function to be triggered if I am not called.

        This method is deprecated and emits a C{DeprecationWarning}.

        @param seconds: How long to wait (from now) before firing the
        timeoutFunc.

        @param timeoutFunc: will receive the Deferred and *args, **kw as its
        arguments.  The default timeoutFunc will call the errback with a
        L{TimeoutError}.

        @return: The delayed call returned by C{reactor.callLater}, or
            C{None} if this Deferred has already been fired.
        """
        warnings.warn(
            "Deferred.setTimeout is deprecated.  Look for timeout "
            "support specific to the API you are using instead.",
            DeprecationWarning, stacklevel=2)

        if self.called:
            return
        assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."

        from twisted.internet import reactor
        # The lambda re-checks self.called so the timeout function is
        # skipped if this Deferred fired in the meantime.
        self.timeoutCall = reactor.callLater(
            seconds,
            lambda: self.called or timeoutFunc(self, *args, **kw))
        return self.timeoutCall

    def __str__(self):
        """
        Describe this Deferred by class name and id, plus the current
        result once one exists.
        """
        cname = self.__class__.__name__
        if hasattr(self, 'result'):
            return "<%s at %s  current result: %r>" % (cname, hex(unsignedID(self)),
                                                       self.result)
        return "<%s at %s>" % (cname, hex(unsignedID(self)))
    __repr__ = __str__
387
class DebugInfo:
    """
    Deferred debug helper.

    Holds the optional C{creator} and C{invoker} traceback lists and the
    last failure result (C{failResult}) of a Deferred.
    """
    failResult = None

    def _getDebugTracebacks(self):
        """
        Format whichever of the C{creator} / C{invoker} tracebacks have been
        recorded, prefixing every line with a C{C:} or C{I:} tag.

        @return: The formatted text, or C{''} if neither was recorded.
        """
        sections = []
        for attribute, heading, continuation in (
            ("creator", " C: Deferred was created:\n C:", "\n C:"),
            ("invoker", " I: First Invoker was:\n I:", "\n I:")):
            if hasattr(self, attribute):
                frames = "".join(getattr(self, attribute)).rstrip()
                sections.append(heading)
                sections.append(frames.replace("\n", continuation))
                sections.append("\n")
        return "".join(sections)

    def __del__(self):
        """
        Log the failure still stored in C{failResult}, if any.

        When this object is collected while C{failResult} is set, an
        "Unhandled error in Deferred" message is logged, followed by any
        recorded debug tracebacks and the failure itself.
        """
        if self.failResult is None:
            return
        log.msg("Unhandled error in Deferred:", isError=True)
        tracebacks = self._getDebugTracebacks()
        if tracebacks != '':
            log.msg("(debug: " + tracebacks + ")", isError=True)
        log.err(self.failResult)
416
class FirstError(Exception):
    """
    First error to occur in a DeferredList if fireOnOneErrback is set.

    @ivar subFailure: the L{Failure} that occurred.
    @ivar index: the index of the Deferred in the DeferredList where it
    happened.
    """
    def __init__(self, failure, index):
        self.subFailure = failure
        self.index = index

    def __repr__(self):
        return 'FirstError(%r, %d)' % (self.subFailure, self.index)

    def __str__(self):
        return repr(self)

    def __getitem__(self, index):
        """
        Deprecated sequence access: index into C{[subFailure, index]}.
        """
        warnings.warn("FirstError.__getitem__ is deprecated.  "
                      "Use attributes instead.",
                      category=DeprecationWarning, stacklevel=2)
        return [self.subFailure, self.index][index]

    def __getslice__(self, start, stop):
        """
        Deprecated sequence access: slice C{[subFailure, index]}.
        """
        warnings.warn("FirstError.__getslice__ is deprecated.  "
                      "Use attributes instead.",
                      category=DeprecationWarning, stacklevel=2)
        return [self.subFailure, self.index][start:stop]

    def __eq__(self, other):
        """
        Compare equal to another L{FirstError} with the same C{subFailure}
        and C{index}, or to an equal tuple.  Anything else is unequal.
        """
        if isinstance(other, tuple):
            # tuple(self) iterates through the deprecated __getitem__.
            return tuple(self) == other
        elif isinstance(other, FirstError):
            return (self.subFailure == other.subFailure and
                    self.index == other.index)
        return False

    def __ne__(self, other):
        """
        Inverse of L{__eq__}.

        Python 2 does not derive C{!=} from C{__eq__}; without this method
        C{a != b} could be true for two instances that compare equal.
        """
        return not self.__eq__(other)
453
class DeferredList(Deferred):
    """
    I combine a group of deferreds into one callback.

    I watch a list of L{Deferred}s and, once every one of them has fired,
    fire my own callback with a list of (success, result) tuples, where
    'success' is a boolean.

    A L{Deferred} remains usable after being placed in a DeferredList.  For
    example, you can suppress 'Unhandled error in Deferred' messages by
    adding errbacks to the Deferreds *after* putting them in the
    DeferredList, as a DeferredList won't swallow the errors.  (Although a
    more convenient way to do this is simply to set the consumeErrors flag)
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0):
        """
        Initialize a DeferredList.

        @type deferredList:  C{list} of L{Deferred}s
        @param deferredList: The list of deferreds to track.
        @param fireOnOneCallback: (keyword param) a flag indicating that
                             only one callback needs to be fired for me to call
                             my callback
        @param fireOnOneErrback: (keyword param) a flag indicating that
                            only one errback needs to be fired for me to call
                            my errback
        @param consumeErrors: (keyword param) a flag indicating that any errors
                            raised in the original deferreds should be
                            consumed by this DeferredList.  This is useful to
                            prevent spurious warnings being logged.
        """
        self.resultList = [None] * len(deferredList)
        Deferred.__init__(self)
        nothingToTrack = len(deferredList) == 0
        if nothingToTrack and not fireOnOneCallback:
            # With nothing to wait for, fire at once with the empty list.
            self.callback(self.resultList)

        # These flags need to be set *before* attaching callbacks to the
        # deferreds, because the callbacks use these flags, and will run
        # synchronously if any of the deferreds are already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.finishedCount = 0

        for position, tracked in enumerate(deferredList):
            tracked.addCallbacks(self._cbDeferred, self._cbDeferred,
                                 callbackArgs=(position, SUCCESS),
                                 errbackArgs=(position, FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """
        (internal) Callback for when one of my deferreds fires.

        Stores C{(succeeded, result)} at C{index} and fires this
        DeferredList if the fire-on-one flags or the finished count say so.

        @return: C{result}, or C{None} for a failure when C{consumeErrors}
            is set.
        """
        self.resultList[index] = (succeeded, result)
        self.finishedCount += 1

        if not self.called:
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))
            elif succeeded == FAILURE and self.fireOnOneErrback:
                self.errback(failure.Failure(FirstError(result, index)))
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if succeeded == FAILURE and self.consumeErrors:
            return None
        return result
526
527
def _parseDListResult(l, fireOnOneErrback=0):
    """
    Strip the success flags from a L{DeferredList} result.

    @param l: An iterable of C{(success, value)} pairs.
    @param fireOnOneErrback: Accepted but not used.

    @return: A list of the C{value} elements, in order.
    """
    if __debug__:
        # Every entry is expected to be a success.
        for pair in l:
            success, value = pair
            assert success
    values = []
    for pair in l:
        values.append(pair[1])
    return values
533
def gatherResults(deferredList):
    """
    Return a Deferred that fires with the list of results of the given
    Deferreds.

    This is built on C{DeferredList} (with C{fireOnOneErrback} set), but
    saves you from parsing the result for success/failure.

    @type deferredList:  C{list} of L{Deferred}s
    """
    combined = DeferredList(deferredList, fireOnOneErrback=1)
    # addCallback returns the Deferred it was called on.
    return combined.addCallback(_parseDListResult)
545
# Constants for use with DeferredList: DeferredList passes one of these to
# its per-Deferred callback and stores it as the first element of each
# (success, result) tuple in its result list.

SUCCESS = True
FAILURE = False
550
551
552
553## deferredGenerator
554
class waitForDeferred:
    """
    See L{deferredGenerator}.

    @ivar d: The L{Deferred} being waited on.
    @ivar result: The outcome of C{d}.  This attribute is not set by the
        constructor; L{_deferGenerator} assigns it once C{d} has fired.
    """

    def __init__(self, d):
        if isinstance(d, Deferred):
            self.d = d
        else:
            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))


    def getResult(self):
        """
        Return the stored result, or re-raise the exception it carries if it
        is a L{failure.Failure}.
        """
        outcome = self.result
        if isinstance(outcome, failure.Failure):
            outcome.raiseException()
        return outcome
570
571
572
def _deferGenerator(g, deferred):
    """
    Drive the generator C{g} until it finishes or has to wait for a
    L{Deferred} that has not fired yet.

    See L{deferredGenerator}.

    @param g: The generator to iterate.
    @param deferred: The L{Deferred} fired with the generator's outcome.

    @return: C{deferred}.
    """
    result = None

    # This function is complicated by the need to prevent unbounded recursion
    # arising from repeatedly yielding immediately ready deferreds.  This while
    # loop and the waiting variable solve that by manually unfolding the
    # recursion.

    waiting = [True, # defgen is waiting for result?
               None] # result

    while 1:
        try:
            result = g.next()
        except StopIteration:
            # The generator is done: fire with the last value it yielded
            # (None if that was a waitForDeferred or nothing was yielded).
            deferred.callback(result)
            return deferred
        except:
            # Errback with the exception currently being handled.
            deferred.errback()
            return deferred

        # Deferred.callback(Deferred) raises an error; we catch this case
        # early here and give a nicer error message to the user in case
        # they yield a Deferred.
        if isinstance(result, Deferred):
            # Fail the caller's Deferred itself.  Returning a separate failed
            # Deferred would be lost when this function has been re-entered
            # from gotResult below, which ignores the return value.
            deferred.errback(TypeError("Yield waitForDeferred(d), not d!"))
            return deferred

        if isinstance(result, waitForDeferred):
            # a waitForDeferred was yielded, get the result.
            # Pass result in so it doesn't get changed going around the loop.
            # This isn't a problem for waiting, as it's only reused if
            # gotResult has already been executed.
            def gotResult(r, result=result):
                result.result = r
                if waiting[0]:
                    # Fired synchronously inside addBoth below: just record
                    # that, and let the enclosing loop carry on.
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # Fired later: resume driving the generator.
                    _deferGenerator(g, deferred)
            result.d.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                return deferred
            # Reset waiting to initial values for next loop
            waiting[0] = True
            waiting[1] = None

            result = None
626
627
628
def deferredGenerator(f):
    """
    Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}

    Together, deferredGenerator and waitForDeferred let you write
    Deferred-using code that reads like an ordinary sequential function.  If
    your code can require Python 2.5, consider L{inlineCallbacks} instead; it
    does the same job more concisely.

    The two are used as a pair, like this::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            print thing #the result! hooray!
        thingummy = deferredGenerator(thingummy)

    The object returned by waitForDeferred should be yielded immediately.
    When your generator is resumed, thing.getResult() either returns the
    Deferred's result, if it succeeded, or raises an exception, if it
    failed.  Calling C{getResult} is B{absolutely mandatory}.  If you do not
    call it, I{your program will not work}.

    deferredGenerator takes such a waitForDeferred-using generator function
    and turns it into a function that returns a Deferred.  That Deferred
    fires with the last value the generator yielded, unless the last value
    was a waitForDeferred instance, in which case the result is C{None}.  If
    the function raises an unhandled exception, the Deferred errbacks
    instead.  Remember that 'return result' won't work; use
    'yield result; return' in place of that.

    A generator that never yields anything makes the Deferred result in
    None.  Yielding a bare Deferred is an error condition; always yield
    waitForDeferred(d) instead.

    The Deferred returned from your deferred generator may also errback if
    your generator raised an exception.  For example::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                yield 'TWISTED IS GREAT!'
                return
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = deferredGenerator(thingummy)

    Put succinctly, these functions connect deferred-using code with this
    'fake blocking' style in both directions: waitForDeferred converts from
    a Deferred to the 'blocking' style, and deferredGenerator converts from
    the 'blocking' style to a Deferred.
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _deferGenerator(generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
690
691
692## inlineCallbacks
693
# BaseException only exists in Python 2.5 and later.  Where the name is
# missing, alias it to Exception so the class statement below still works.
try:
    BaseException
except NameError:
    BaseException=Exception
699
class _DefGen_Return(BaseException):
    """
    Raised by L{returnValue} to carry a return value out of an
    L{inlineCallbacks} generator.

    @ivar value: The value being returned.
    """
    def __init__(self, value):
        self.value = value
703
def returnValue(val):
    """
    Return val from a L{inlineCallbacks} generator.

    Note: the value is delivered by raising an exception derived from
    BaseException.  Any bare 'except:' clause between this call and the
    generator's caller will intercept it, so you might want to change such
    clauses to 'except Exception:'.

    Also: although this currently works when called from an arbitrary
    function that was itself called from the generator, do not rely upon
    that behavior.
    """
    raise _DefGen_Return(val)
718
def _inlineCallbacks(result, g, deferred):
    """
    Drive the generator C{g} until it finishes or yields a L{Deferred} that
    has not fired yet.

    See L{inlineCallbacks}.

    @param result: The value to send into the generator first, or a
        L{failure.Failure} whose exception is thrown into it instead.
    @param g: The generator to drive.
    @param deferred: The L{Deferred} fired with the generator's outcome.

    @return: C{deferred}.
    """
    # This function is complicated by the need to prevent unbounded recursion
    # arising from repeatedly yielding immediately ready deferreds.  This while
    # loop and the waiting variable solve that by manually unfolding the
    # recursion.

    waiting = [True, # waiting for result?
               None] # result

    while 1:
        try:
            # Send the last result back as the result of the yield expression.
            if isinstance(result, failure.Failure):
                result = result.throwExceptionIntoGenerator(g)
            else:
                result = g.send(result)
        except StopIteration:
            # fell off the end, or "return" statement
            deferred.callback(None)
            return deferred
        except _DefGen_Return, e:
            # returnValue call
            deferred.callback(e.value)
            return deferred
        except:
            # Any other exception: errback with the exception currently
            # being handled.
            deferred.errback()
            return deferred

        # A yielded value that is not a Deferred is sent straight back into
        # the generator on the next pass through the loop.
        if isinstance(result, Deferred):
            # a deferred was yielded, get the result.
            def gotResult(r):
                if waiting[0]:
                    # Fired synchronously inside addBoth below: stash the
                    # result for the enclosing loop.
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # Fired later: resume driving the generator.
                    _inlineCallbacks(r, g, deferred)

            result.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                return deferred

            result = waiting[1]
            # Reset waiting to initial values for next loop.  gotResult uses
            # waiting, but this isn't a problem because gotResult is only
            # executed once, and if it hasn't been executed yet, the return
            # branch above would have been taken.


            waiting[0] = True
            waiting[1] = None


    # NOTE(review): the loop above has no break, so this statement looks
    # unreachable.
    return deferred
778
def inlineCallbacks(f):
    """
    Maintainer: U{Christopher Armstrong<mailto:radix@twistedmatrix.com>}

    WARNING: this function will not work in Python 2.4 and earlier!

    inlineCallbacks lets you write Deferred-using code that reads like an
    ordinary sequential function.  It relies on features of Python 2.5
    generators.  If you must stay compatible with Python 2.4 or earlier, use
    L{deferredGenerator} instead; it achieves the same thing with somewhat
    more boilerplate.  For example::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            print thing #the result! hooray!
        thingummy = inlineCallbacks(thingummy)

    Whenever you call something that results in a Deferred, simply yield it;
    your generator is resumed automatically once the Deferred's result is
    available.  The result is delivered with the generator's 'send' method,
    or with 'throw' if the result was a failure.

    Your inlineCallbacks-enabled generator will return a Deferred object,
    which will result in the return value of the generator (or will fail
    with a failure object if your generator raises an unhandled exception).
    Note that you can't use 'return result' to return a value; use
    'returnValue(result)' instead.  Falling off the end of the generator, or
    simply using 'return', will cause the Deferred to have a result of None.

    The Deferred returned from your deferred generator may errback if your
    generator raised an exception::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                returnValue('TWISTED IS GREAT!')
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = inlineCallbacks(thingummy)
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _inlineCallbacks(None, generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
824
825
826## DeferredLock/DeferredQueue
827
828class _ConcurrencyPrimitive(object):
829    def __init__(self):
830        self.waiting = []
831
832    def _releaseAndReturn(self, r):
833        self.release()
834        return r
835
836    def run(*args, **kwargs):
837        """Acquire, run, release.
838
839        This function takes a callable as its first argument and any
840        number of other positional and keyword arguments.  When the
841        lock or semaphore is acquired, the callable will be invoked
842        with those arguments.
843
844        The callable may return a Deferred; if it does, the lock or
845        semaphore won't be released until that Deferred fires.
846
847        @return: Deferred of function result.
848        """
849        if len(args) < 2:
850            if not args:
851                raise TypeError("run() takes at least 2 arguments, none given.")
852            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
853                args[0].__class__.__name__,))
854        self, f = args[:2]
855        args = args[2:]
856
857        def execute(ignoredResult):
858            d = maybeDeferred(f, *args, **kwargs)
859            d.addBoth(self._releaseAndReturn)
860            return d
861
862        d = self.acquire()
863        d.addCallback(execute)
864        return d
865
866
class DeferredLock(_ConcurrencyPrimitive):
    """
    A mutual-exclusion lock for event driven systems.

    @ivar locked: True while this lock is held, false otherwise.  Treat it as
        read-only; inspecting it is the way to perform the equivalent of a
        "non-blocking" acquisition.
    """

    locked = 0

    def acquire(self):
        """
        Attempt to acquire the lock.

        @return: a Deferred which fires (with this lock) on lock acquisition.
        """
        d = Deferred()
        if not self.locked:
            # Nobody holds the lock: take it and fire straight away.
            self.locked = 1
            d.callback(self)
        else:
            # Held by someone else: queue up behind the other waiters.
            self.waiting.append(d)
        return d

    def release(self):
        """
        Release the lock.

        Should be called by whomever did the acquire() when the shared
        resource is free.
        """
        assert self.locked, "Tried to release an unlocked lock"
        if not self.waiting:
            self.locked = 0
            return
        # Someone is waiting: the lock stays held and passes directly to
        # the waiter that has been queued the longest.
        self.locked = 1
        nextHolder = self.waiting.pop(0)
        nextHolder.callback(self)
904
class DeferredSemaphore(_ConcurrencyPrimitive):
    """
    A semaphore for event driven systems.

    @ivar limit: At most this many users may hold the semaphore at once.

    @ivar tokens: The number of acquisitions still available, i.e. C{limit}
        minus the number of users currently holding the semaphore.
    """

    def __init__(self, tokens):
        """
        @param tokens: the initial number of tokens, which is also the
            maximum number of concurrent holders.

        @raise ValueError: if C{tokens} is less than 1.
        """
        _ConcurrencyPrimitive.__init__(self)
        # A semaphore without tokens can never be acquired, and every
        # release() on it would trip the "too many tokens" assertion.
        # Reject it up front instead of handing out Deferreds that never
        # fire.
        if tokens < 1:
            raise ValueError("DeferredSemaphore requires tokens >= 1")
        self.tokens = tokens
        self.limit = tokens

    def acquire(self):
        """
        Attempt to acquire the token.

        @return: a Deferred which fires (with this semaphore) on token
            acquisition.
        """
        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
        d = Deferred()
        if not self.tokens:
            # Every token is taken: wait until one is released.
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """
        Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
        self.tokens = self.tokens + 1
        if self.waiting:
            # Someone is waiting to acquire a token: hand the one that was
            # just returned to the waiter queued the longest.
            self.tokens = self.tokens - 1
            d = self.waiting.pop(0)
            d.callback(self)
942
class QueueOverflow(Exception):
    """
    Raised by L{DeferredQueue.put} when the queue already holds its maximum
    number of objects.
    """
945
class QueueUnderflow(Exception):
    """
    Raised by L{DeferredQueue.get} when the maximum number of Deferreds is
    already waiting for an object.
    """
948
949
class DeferredQueue(object):
    """
    A queue for event driven systems.

    Objects are added in the usual way.  Retrieving an object always yields a
    Deferred: one that has already fired if an object is available, otherwise
    one that fires as soon as an object is put into the queue.

    @ivar size: The maximum number of objects to allow into the queue
    at a time.  When an attempt to add a new object would exceed this
    limit, QueueOverflow is raised synchronously.  None for no limit.

    @ivar backlog: The maximum number of Deferred gets to allow at
    one time.  When an attempt is made to get an object which would
    exceed this limit, QueueUnderflow is raised synchronously.  None
    for no limit.

    @ivar waiting: Deferreds returned by L{get} which are still waiting for
    an object, oldest first.

    @ivar pending: objects which have been put but not retrieved yet, oldest
    first.
    """

    def __init__(self, size=None, backlog=None):
        self.size = size
        self.backlog = backlog
        self.pending = []
        self.waiting = []

    def put(self, obj):
        """
        Add an object to this queue.

        @raise QueueOverflow: Too many objects are in this queue.
        """
        if self.waiting:
            # A consumer is already waiting: deliver directly, bypassing
            # the list of pending objects.
            consumer = self.waiting.pop(0)
            consumer.callback(obj)
            return

        hasRoom = self.size is None or len(self.pending) < self.size
        if not hasRoom:
            raise QueueOverflow()
        self.pending.append(obj)

    def get(self):
        """
        Attempt to retrieve and remove an object from the queue.

        @return: a Deferred which fires with the next object available in the
        queue.

        @raise QueueUnderflow: Too many (more than C{backlog})
        Deferreds are already waiting for an object from this queue.
        """
        if self.pending:
            return succeed(self.pending.pop(0))

        mayWait = self.backlog is None or len(self.waiting) < self.backlog
        if not mayWait:
            raise QueueUnderflow()
        d = Deferred()
        self.waiting.append(d)
        return d
1002
1003
class AlreadyTryingToLockError(Exception):
    """
    Reports a second call to DeferredFilesystemLock.deferUntilLocked on a
    DeferredFilesystemLock which is still busy with an earlier call.
    """
1009
1010
class DeferredFilesystemLock(lockfile.FilesystemLock):
    """
    A FilesystemLock that allows for a deferred to be fired when the lock is
    acquired.

    @ivar _scheduler: The object in charge of scheduling retries. In this
        implementation this is parameterized for testing.

    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.

    @ivar _tryLockCall: A L{DelayedCall} based on _interval that will manage
        the next retry for acquiring the lock.

    @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout
        argument.  This is in charge of timing out our attempt to acquire the
        lock.
    """
    _interval = 1
    _tryLockCall = None
    _timeoutCall = None

    def __init__(self, name, scheduler=None):
        """
        @param name: The name of the lock to acquire
        @param scheduler: An object which provides L{IReactorTime}.  When
            omitted, the global reactor is used.
        """
        lockfile.FilesystemLock.__init__(self, name)

        if scheduler is None:
            # Imported here so that the reactor is only pulled in when no
            # scheduler was supplied.
            from twisted.internet import reactor
            scheduler = reactor

        self._scheduler = scheduler

    def deferUntilLocked(self, timeout=None):
        """
        Wait until we acquire this lock.  This method is not safe for
        concurrent use.

        @type timeout: C{float} or C{int}
        @param timeout: the number of seconds after which to time out if the
            lock has not been acquired.

        @return: a deferred which will callback when the lock is acquired, or
            errback with a L{TimeoutError} after timing out or an
            L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
            been called and not successfully locked the file.
        """
        # A pending retry means an earlier call is still in progress.
        if self._tryLockCall is not None:
            return fail(
                AlreadyTryingToLockError(
                    "deferUntilLocked isn't safe for concurrent use."))

        d = Deferred()

        def _cancelLock():
            # The timeout expired: stop retrying and forget both calls.
            self._tryLockCall.cancel()
            self._tryLockCall = None
            self._timeoutCall = None

            # Make one final attempt before reporting the timeout.
            if self.lock():
                d.callback(None)
            else:
                # TimeoutError here is the exception class defined in this
                # module.
                d.errback(failure.Failure(
                        TimeoutError("Timed out acquiring lock: %s after %fs" % (
                                self.name,
                                timeout))))

        def _tryLock():
            if self.lock():
                # Success: the timeout (if one was scheduled) is no longer
                # needed, and clearing _tryLockCall allows a later call to
                # deferUntilLocked.
                if self._timeoutCall is not None:
                    self._timeoutCall.cancel()
                    self._timeoutCall = None

                self._tryLockCall = None

                d.callback(None)
            else:
                # Schedule the timeout only once, on the first failed
                # attempt, so that it is measured from the start rather
                # than from each retry.
                if timeout is not None and self._timeoutCall is None:
                    self._timeoutCall = self._scheduler.callLater(
                        timeout, _cancelLock)

                self._tryLockCall = self._scheduler.callLater(
                    self._interval, _tryLock)

        # The first attempt happens synchronously.
        _tryLock()

        return d
1099
1100
# Names exported by "from twisted.internet.defer import *".
__all__ = [
    # Deferred itself and helpers for creating Deferreds
    "Deferred",
    "DeferredList",
    "succeed",
    "fail",
    "FAILURE",
    "SUCCESS",
    "AlreadyCalledError",
    "TimeoutError",
    "gatherResults",
    "maybeDeferred",
    # generator based helpers
    "waitForDeferred",
    "deferredGenerator",
    "inlineCallbacks",
    # concurrency primitives
    "DeferredLock",
    "DeferredSemaphore",
    "DeferredQueue",
    "DeferredFilesystemLock",
    "AlreadyTryingToLockError",
]
Note: See TracBrowser for help on using the browser.