root/tags/releases/twisted-8.2.0/twisted/internet/defer.py

Revision 25200, 37.6 KB (checked in by exarkun, 22 months ago)

Merge firsterror-display-3298

Author: exarkun, itamar
Reviewer: thijs, radix
Fixes: #3298

Change the __str__ and __repr__ of FirstError so that they include better
and more complete information about the underlying failure (and therefore the
real cause of the exception). This means that when these errors are logged, the
log will now include information about the exception which caused the FirstError.

The deprecated tuple-like interface is also removed.

Line 
1# -*- test-case-name: twisted.test.test_defer -*-
2#
3# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7Support for results that aren't immediately available.
8
9Maintainer: Glyph Lefkowitz
10"""
11
12from __future__ import nested_scopes, generators
13import traceback
14import warnings
15
16# Twisted imports
17from twisted.python import log, failure, lockfile
18from twisted.python.util import unsignedID, mergeFunctionMetadata
19
class AlreadyCalledError(Exception):
    """
    Raised when a L{Deferred} that already has a result is fired again.
    """
22
class TimeoutError(Exception):
    """
    Error that L{timeout} wraps in a L{failure.Failure} and delivers to a
    L{Deferred}'s errback.
    """
25
def logError(err):
    """
    Report C{err} through L{log.err} and hand it back unchanged.

    Because the argument is returned as-is, this can sit in an errback chain
    without altering the result that later errbacks see.
    """
    log.err(err)
    return err
29
def succeed(result):
    """
    Build a L{Deferred} whose '.callback(result)' has already been invoked.

    Use this when synchronous code sits behind an asynchronous interface:
    the caller expects a Deferred, but the answer is already known.  Just
    return defer.succeed(theResult).

    L{fail} is the counterpart that produces an already-failed Deferred.

    @param result: The value passed to the Deferred's 'callback' method.

    @rtype: L{Deferred}
    """
    fired = Deferred()
    fired.callback(result)
    return fired
50
51
def fail(result=None):
    """
    Build a L{Deferred} whose '.errback(result)' has already been invoked.

    The rationale is the same as for L{succeed}.

    @param result: Whatever L{Deferred.errback} accepts.

    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
        current exception state.

    @rtype: L{Deferred}
    """
    failed = Deferred()
    failed.errback(result)
    return failed
68
69
def execute(callable, *args, **kw):
    """Turn a plain call into an already-fired deferred.

    C{callable} is invoked with the supplied arguments.  Its return value
    becomes the callback result of the returned deferred; if it raises, the
    returned deferred has instead been errbacked with a Failure built from
    the exception in flight.
    """
    try:
        value = callable(*args, **kw)
    except:
        # fail() with no argument captures the exception currently being
        # handled.
        return fail()
    return succeed(value)
83
def maybeDeferred(f, *args, **kw):
    """Invoke a function that may or may not return a deferred.

    Call the given function with the given arguments.  If the returned
    object is a C{Deferred}, return it.  If the returned object is a C{Failure},
    wrap it with C{fail} and return it.  Otherwise, wrap it in C{succeed} and
    return it.  If an exception is raised, convert it to a C{Failure}, wrap it
    in C{fail}, and then return it.

    @type f: Any callable
    @param f: The callable to invoke

    @param args: The arguments to pass to C{f}
    @param kw: The keyword arguments to pass to C{f}

    @rtype: C{Deferred}
    @return: The result of the function call, wrapped in a C{Deferred} if
    necessary.
    """
    try:
        result = f(*args, **kw)
    except:
        # Capture whatever f raised as a Failure so the caller always gets a
        # Deferred back rather than an exception.
        return fail(failure.Failure())

    if isinstance(result, Deferred):
        return result
    if isinstance(result, failure.Failure):
        return fail(result)
    return succeed(result)
117
def timeout(deferred):
    """
    Errback C{deferred} with a Failure wrapping a L{TimeoutError}.

    This is the default C{timeoutFunc} of L{Deferred.setTimeout}.
    """
    timedOut = TimeoutError("Callback timed out")
    deferred.errback(failure.Failure(timedOut))
120
def passthru(arg):
    """
    Identity function: hand C{arg} straight back.

    L{Deferred.addCallbacks} and L{Deferred.addErrback} use it as the
    placeholder for the side of a callback pair the caller did not supply.
    """
    return arg
123
def setDebugging(on):
    """Turn Deferred debugging on or off.

    While debugging is enabled, the call stacks at creation and at
    invocation are recorded and attached to any AlreadyCalledErrors raised.

    @param on: Any value; its truthiness becomes the new setting.
    """
    enabled = bool(on)
    Deferred.debug = enabled
131
def getDebugging():
    """Report the current value of the L{Deferred} debugging flag.
    """
    return Deferred.debug
136
class Deferred:
    """This is a callback which will be put off until later.

    Why do we want this? Well, in cases where a function in a threaded
    program would block until it gets a result, for Twisted it should
    not block. Instead, it should return a Deferred.

    This can be implemented for protocols that run over the network by
    writing an asynchronous protocol for twisted.internet. For methods
    that come from outside packages that are not under our control, we use
    threads (see for example L{twisted.enterprise.adbapi}).

    For more information about Deferreds, see doc/howto/defer.html or
    U{http://twistedmatrix.com/projects/core/documentation/howto/defer.html}

    @ivar called: True once L{callback} or L{errback} has been invoked.
    @ivar paused: Number of outstanding L{pause} calls; callbacks only run
        while this is zero.
    @ivar timeoutCall: The delayed call scheduled by L{setTimeout}, or
        C{None}.
    @ivar callbacks: Pending C{(callbackEntry, errbackEntry)} pairs, where
        each entry is a C{(callable, args, kwargs)} tuple.
    @ivar result: The current result; only set once the Deferred has been
        fired.
    """
    called = 0
    paused = 0
    timeoutCall = None
    _debugInfo = None

    # Are we currently running a user-installed callback?  Meant to prevent
    # recursive running of callbacks when a reentrant call to add a callback is
    # used.
    _runningCallbacks = False

    # Keep this class attribute for now, for compatibility with code that
    # sets it directly.
    debug = False

    def __init__(self):
        self.callbacks = []
        if self.debug:
            # Record where this Deferred was created (minus this frame) so
            # it can be reported alongside AlreadyCalledError.
            self._debugInfo = DebugInfo()
            self._debugInfo.creator = traceback.format_stack()[:-1]

    def addCallbacks(self, callback, errback=None,
                     callbackArgs=None, callbackKeywords=None,
                     errbackArgs=None, errbackKeywords=None):
        """Add a pair of callbacks (success and error) to this Deferred.

        These will be executed when the 'master' callback is run.  If this
        Deferred has already been fired, the callback chain is run
        immediately.

        @param callback: Callable run when the current result is not a
            Failure.
        @param errback: Callable run when the current result is a Failure;
            defaults to L{passthru}.
        @param callbackArgs: Extra positional arguments for C{callback}.
        @param callbackKeywords: Extra keyword arguments for C{callback}.
        @param errbackArgs: Extra positional arguments for C{errback}.
        @param errbackKeywords: Extra keyword arguments for C{errback}.

        @return: This Deferred, so calls can be chained.
        """
        assert callable(callback)
        # Identity test: '==' could invoke an arbitrary __eq__ on the
        # user-supplied errback.
        assert errback is None or callable(errback)
        cbs = ((callback, callbackArgs, callbackKeywords),
               (errback or (passthru), errbackArgs, errbackKeywords))
        self.callbacks.append(cbs)

        if self.called:
            self._runCallbacks()
        return self

    def addCallback(self, callback, *args, **kw):
        """Convenience method for adding just a callback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(callback, callbackArgs=args,
                                 callbackKeywords=kw)

    def addErrback(self, errback, *args, **kw):
        """Convenience method for adding just an errback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(passthru, errback,
                                 errbackArgs=args,
                                 errbackKeywords=kw)

    def addBoth(self, callback, *args, **kw):
        """Convenience method for adding a single callable as both a callback
        and an errback.

        See L{addCallbacks}.
        """
        return self.addCallbacks(callback, callback,
                                 callbackArgs=args, errbackArgs=args,
                                 callbackKeywords=kw, errbackKeywords=kw)

    def chainDeferred(self, d):
        """Chain another Deferred to this Deferred.

        This method adds callbacks to this Deferred to call d's callback or
        errback, as appropriate. It is merely a shorthand way of performing
        the following::

            self.addCallbacks(d.callback, d.errback)

        When you chain a deferred d2 to another deferred d1 with
        d1.chainDeferred(d2), you are making d2 participate in the callback
        chain of d1. Thus any event that fires d1 will also fire d2.
        However, the converse is B{not} true; if d2 is fired d1 will not be
        affected.
        """
        return self.addCallbacks(d.callback, d.errback)

    def callback(self, result):
        """Run all success callbacks that have been added to this Deferred.

        Each callback will have its result passed as the first
        argument to the next; this way, the callbacks act as a
        'processing chain'. Also, if the success-callback returns a Failure
        or raises an Exception, processing will continue on the *error*-
        callback chain.

        @raise AlreadyCalledError: If this Deferred has already been fired.
        """
        assert not isinstance(result, Deferred)
        self._startRunCallbacks(result)


    def errback(self, fail=None):
        """
        Run all error callbacks that have been added to this Deferred.

        Each callback will have its result passed as the first
        argument to the next; this way, the callbacks act as a
        'processing chain'. Also, if the error-callback returns a non-Failure
        or doesn't raise an Exception, processing will continue on the
        *success*-callback chain.

        If the argument that's passed to me is not a failure.Failure instance,
        it will be embedded in one. If no argument is passed, a failure.Failure
        instance will be created based on the current traceback stack.

        Passing a string as `fail' is deprecated, and will be punished with
        a warning message.

        @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
            no current exception state.
        @raise AlreadyCalledError: If this Deferred has already been fired.
        """
        if not isinstance(fail, failure.Failure):
            fail = failure.Failure(fail)

        self._startRunCallbacks(fail)


    def pause(self):
        """Stop processing on a Deferred until L{unpause}() is called.
        """
        self.paused = self.paused + 1


    def unpause(self):
        """Process all callbacks made since L{pause}() was called.

        Pauses nest: callbacks only resume once every L{pause} has been
        matched by an L{unpause}.
        """
        self.paused = self.paused - 1
        if self.paused:
            return
        if self.called:
            self._runCallbacks()

    def _continue(self, result):
        """
        Resume after a Deferred returned by one of our callbacks has fired:
        adopt its result and undo the pause taken in L{_runCallbacks}.
        """
        self.result = result
        self.unpause()

    def _startRunCallbacks(self, result):
        """
        Record C{result} as the initial result and start the callback chain.

        @raise AlreadyCalledError: If this Deferred has already been fired.
            When debugging is enabled the error message carries the recorded
            creation and invocation tracebacks.
        """
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            # Drop this frame and the callback()/errback() frame above it.
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            # Best effort: any error from cancelling the pending timeout is
            # deliberately ignored.
            try:
                self.timeoutCall.cancel()
            except:
                pass

            # Remove the instance attribute so the class-level None shows
            # through again.
            del self.timeoutCall
        self._runCallbacks()

    def _runCallbacks(self):
        """
        Run pending callbacks until the chain is empty, this Deferred is
        paused, or a callback returns another Deferred.

        Does nothing when invoked re-entrantly from inside a callback.
        """
        if self._runningCallbacks:
            # Don't recursively run callbacks
            return
        if not self.paused:
            while self.callbacks:
                item = self.callbacks.pop(0)
                # Index the pair with a bool: 0 selects the callback entry,
                # 1 selects the errback entry.
                callback, args, kw = item[
                    isinstance(self.result, failure.Failure)]
                args = args or ()
                kw = kw or {}
                try:
                    self._runningCallbacks = True
                    try:
                        self.result = callback(self.result, *args, **kw)
                    finally:
                        self._runningCallbacks = False
                    if isinstance(self.result, Deferred):
                        # note: this will cause _runCallbacks to be called
                        # recursively if self.result already has a result.
                        # This shouldn't cause any problems, since there is no
                        # relevant state in this stack frame at this point.
                        # The recursive call will continue to process
                        # self.callbacks until it is empty, then return here,
                        # where there is no more work to be done, so this call
                        # will return as well.
                        self.pause()
                        self.result.addBoth(self._continue)
                        break
                except:
                    # Whatever the callback raised becomes the new result,
                    # switching processing to the errback side.
                    self.result = failure.Failure()

        # Keep the DebugInfo helper in sync with the current result; its
        # __del__ logs a failure that is still stored there.
        if isinstance(self.result, failure.Failure):
            self.result.cleanFailure()
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.failResult = self.result
        else:
            if self._debugInfo is not None:
                self._debugInfo.failResult = None

    def setTimeout(self, seconds, timeoutFunc=timeout, *args, **kw):
        """Set a timeout function to be triggered if I am not called.

        Deprecated: calling this emits a C{DeprecationWarning}.

        @param seconds: How long to wait (from now) before firing the
        timeoutFunc.

        @param timeoutFunc: will receive the Deferred and *args, **kw as its
        arguments.  The default timeoutFunc will call the errback with a
        L{TimeoutError}.

        @return: The delayed call returned by C{reactor.callLater}, or
        C{None} if this Deferred has already been fired.
        """
        warnings.warn(
            "Deferred.setTimeout is deprecated.  Look for timeout "
            "support specific to the API you are using instead.",
            DeprecationWarning, stacklevel=2)

        if self.called:
            return
        assert not self.timeoutCall, "Don't call setTimeout twice on the same Deferred."

        from twisted.internet import reactor
        # The lambda re-checks self.called so that a Deferred fired before
        # the timer expires is left alone.
        self.timeoutCall = reactor.callLater(
            seconds,
            lambda: self.called or timeoutFunc(self, *args, **kw))
        return self.timeoutCall

    def __str__(self):
        """
        Show the class name and id, plus the current result once one has
        been set.
        """
        cname = self.__class__.__name__
        if hasattr(self, 'result'):
            return "<%s at %s  current result: %r>" % (cname, hex(unsignedID(self)),
                                                       self.result)
        return "<%s at %s>" % (cname, hex(unsignedID(self)))
    __repr__ = __str__
387
class DebugInfo:
    """
    Debugging companion object for a L{Deferred}.

    It may carry C{creator} and C{invoker} attributes (lists of formatted
    stack lines) and a C{failResult} that is logged when this object is
    garbage collected.
    """
    failResult = None

    def _getDebugTracebacks(self):
        """
        Render whichever of the C{creator} / C{invoker} stacks have been
        recorded, every line tagged with C{C:} or C{I:} respectively.

        @return: The formatted text, or an empty string if neither stack is
            present.
        """
        pieces = []
        layout = (
            ("creator", " C: Deferred was created:\n C:", "\n C:"),
            ("invoker", " I: First Invoker was:\n I:", "\n I:"),
        )
        for attribute, heading, linePrefix in layout:
            if hasattr(self, attribute):
                stack = "".join(getattr(self, attribute)).rstrip()
                pieces.append(heading)
                pieces.append(stack.replace("\n", linePrefix))
                pieces.append("\n")
        return "".join(pieces)

    def __del__(self):
        """Print tracebacks and die.

        If C{failResult} is still set when this object is collected, log it
        as an unhandled error, preceded by any recorded debug tracebacks.
        """
        if self.failResult is None:
            return
        log.msg("Unhandled error in Deferred:", isError=True)
        tracebacks = self._getDebugTracebacks()
        if tracebacks != '':
            log.msg("(debug: " + tracebacks + ")", isError=True)
        log.err(self.failResult)
416
417
418
class FirstError(Exception):
    """
    The error a L{DeferredList} fails with when C{fireOnOneErrback} is set
    and one of its L{Deferred}s fails.

    @ivar subFailure: The L{Failure} that occurred.
    @type subFailure: L{Failure}

    @ivar index: The position, within the L{DeferredList}, of the
        L{Deferred} that failed.
    @type index: C{int}
    """
    def __init__(self, failure, index):
        Exception.__init__(self, failure, index)
        self.index = index
        self.subFailure = failure


    def __repr__(self):
        """
        The I{repr} shows this error's index together with the I{repr} of
        the exception held by the wrapped failure.
        """
        wrappedException = self.subFailure.value
        return 'FirstError[#%d, %r]' % (self.index, wrappedException)


    def __str__(self):
        """
        The I{str} shows this error's index together with the I{str} of the
        whole wrapped failure (traceback and exception included).
        """
        wrappedFailure = self.subFailure
        return 'FirstError[#%d, %s]' % (self.index, wrappedFailure)


    def __cmp__(self, other):
        """
        Two L{FirstError} instances compare by their (index, sub-failure)
        pairs.  Anything that is not a L{FirstError} never compares equal
        to one.

        @since: 8.2
        """
        if not isinstance(other, FirstError):
            return -1
        mine = (self.index, self.subFailure)
        theirs = (other.index, other.subFailure)
        return cmp(mine, theirs)
467
468
469
class DeferredList(Deferred):
    """I combine a group of deferreds into one callback.

    I watch a list of L{Deferred}s and fire a single callback once all of
    them have completed.  My result is a list of (success, result) tuples,
    'success' being a boolean.

    A L{Deferred} remains usable after it has been put in a DeferredList.
    For example, you can suppress 'Unhandled error in Deferred' messages by
    adding errbacks to the Deferreds *after* putting them in the
    DeferredList, as a DeferredList won't swallow the errors.  (Although a
    more convenient way to do this is simply to set the consumeErrors flag)
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0):
        """Initialize a DeferredList.

        @type deferredList:  C{list} of L{Deferred}s
        @param deferredList: The list of deferreds to track.
        @param fireOnOneCallback: (keyword param) a flag indicating that
                             only one callback needs to be fired for me to call
                             my callback
        @param fireOnOneErrback: (keyword param) a flag indicating that
                            only one errback needs to be fired for me to call
                            my errback
        @param consumeErrors: (keyword param) a flag indicating that any errors
                            raised in the original deferreds should be
                            consumed by this DeferredList.  This is useful to
                            prevent spurious warnings being logged.
        """
        self.resultList = [None] * len(deferredList)
        Deferred.__init__(self)
        # With nothing to wait for, fire right away (unless the caller only
        # wants to hear about a first callback).
        if not self.resultList and not fireOnOneCallback:
            self.callback(self.resultList)

        # The flags must be in place *before* any callbacks are attached:
        # _cbDeferred reads them, and it runs synchronously for deferreds
        # that have already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.finishedCount = 0

        for position, deferred in enumerate(deferredList):
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(position, SUCCESS),
                                  errbackArgs=(position, FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """(internal) Callback for when one of my deferreds fires.

        Stores the outcome at C{index}, fires me if the configured
        condition is now met, and passes C{result} on to the original
        deferred's chain (or C{None} for a consumed failure).
        """
        self.resultList[index] = (succeeded, result)
        self.finishedCount += 1

        if not self.called:
            if succeeded == SUCCESS and self.fireOnOneCallback:
                self.callback((result, index))
            elif succeeded == FAILURE and self.fireOnOneErrback:
                self.errback(failure.Failure(FirstError(result, index)))
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if succeeded == FAILURE and self.consumeErrors:
            return None
        return result
542
543
544def _parseDListResult(l, fireOnOneErrback=0):
545    if __debug__:
546        for success, value in l:
547            assert success
548    return [x[1] for x in l]
549
def gatherResults(deferredList):
    """Returns list with result of given Deferreds.

    This is a thin layer over C{DeferredList} (created with
    C{fireOnOneErrback} set) that saves you from picking the
    success/failure flags out of the result yourself.

    @type deferredList:  C{list} of L{Deferred}s
    """
    gathered = DeferredList(deferredList, fireOnOneErrback=1)
    return gathered.addCallback(_parseDListResult)
561
# Constants for use with DeferredList.  DeferredList passes one of these to
# its per-deferred callback and stores it as the first element of each
# (succeeded, result) tuple in its result list.

SUCCESS = True
FAILURE = False
566
567
568
569## deferredGenerator
570
class waitForDeferred:
    """
    Wrapper around a L{Deferred} that a generator yields to wait for it.

    See L{deferredGenerator}.
    """

    def __init__(self, d):
        if isinstance(d, Deferred):
            self.d = d
        else:
            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))


    def getResult(self):
        """
        Return the stored result, or raise the exception it carries if the
        result is a Failure.
        """
        outcome = self.result
        if isinstance(outcome, failure.Failure):
            outcome.raiseException()
        return outcome
586
587
588
def _deferGenerator(g, deferred):
    """
    Drive the generator C{g} on behalf of L{deferredGenerator}.

    See L{deferredGenerator}.

    @param g: The generator to advance with C{g.next()}.
    @param deferred: The L{Deferred} fired when C{g} finishes: it is called
        back with the last value yielded (C{None} if that was a
        L{waitForDeferred} or nothing was yielded), or errbacked if C{g}
        raises.

    @return: C{deferred}, or a new already-failed L{Deferred} if C{g}
        yielded a bare L{Deferred}.
    """
    result = None

    # This function is complicated by the need to prevent unbounded recursion
    # arising from repeatedly yielding immediately ready deferreds.  This while
    # loop and the waiting variable solve that by manually unfolding the
    # recursion.

    waiting = [True, # defgen is waiting for result?
               None] # result

    while 1:
        try:
            result = g.next()
        except StopIteration:
            # Generator finished: result still holds the value from the
            # previous iteration.
            deferred.callback(result)
            return deferred
        except:
            # errback() with no argument captures the current exception.
            deferred.errback()
            return deferred

        # Deferred.callback(Deferred) raises an error; we catch this case
        # early here and give a nicer error message to the user in case
        # they yield a Deferred.
        if isinstance(result, Deferred):
            return fail(TypeError("Yield waitForDeferred(d), not d!"))

        if isinstance(result, waitForDeferred):
            # a waitForDeferred was yielded, get the result.
            # Pass result in as a default argument so it doesn't get changed
            # going around the loop.
            # This isn't a problem for waiting, as it's only reused if
            # gotResult has already been executed.
            def gotResult(r, result=result):
                result.result = r
                if waiting[0]:
                    # Fired synchronously inside addBoth below: just record
                    # that, and let the loop carry on.
                    waiting[0] = False
                    waiting[1] = r
                else:
                    # Fired later, after the loop returned: restart it.
                    _deferGenerator(g, deferred)
            result.d.addBoth(gotResult)
            if waiting[0]:
                # Haven't called back yet, set flag so that we get reinvoked
                # and return from the loop
                waiting[0] = False
                return deferred
            # Reset waiting to initial values for next loop
            waiting[0] = True
            waiting[1] = None

            # A waitForDeferred never becomes the final result.
            result = None
642
643
644
def deferredGenerator(f):
    """
    Together, deferredGenerator and waitForDeferred let you write
    Deferred-using code that reads like an ordinary sequential function.  If
    you can require Python 2.5 or later, consider L{inlineCallbacks}
    instead, which does the same job more concisely.

    The two pieces are used like this::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            print thing #the result! hoorj!
        thingummy = deferredGenerator(thingummy)

    waitForDeferred gives you an object that you should yield immediately.
    Once your generator is resumed, thing.getResult() either returns the
    Deferred's result (on success) or raises an exception (on failure).
    Calling C{getResult} is B{absolutely mandatory}.  If you do not call it,
    I{your program will not work}.

    deferredGenerator takes such a waitForDeferred-using generator function
    and turns it into a function that returns a Deferred.  That Deferred's
    result is the last value your generator yielded, unless that last value
    is a waitForDeferred instance, in which case the result is C{None}.  If
    the function raises an unhandled exception, the Deferred errbacks
    instead.  Remember that 'return result' won't work; write
    'yield result; return' in its place.

    A generator that never yields anything makes the Deferred result in
    None.  Yielding a bare Deferred is an error; always yield
    waitForDeferred(d) instead.

    The Deferred returned from your deferred generator may also errback if
    your generator raised an exception.  For example::

        def thingummy():
            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
            yield thing
            thing = thing.getResult()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                yield 'TWISTED IS GREAT!'
                return
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = deferredGenerator(thingummy)

    Put succinctly, these functions bridge deferred-using code and this
    'fake blocking' style in both directions: waitForDeferred goes from a
    Deferred to the 'blocking' style, and deferredGenerator goes from the
    'blocking' style back to a Deferred.
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _deferGenerator(generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
704
705
706## inlineCallbacks
707
# BaseException only exists in Python 2.5 and later.  On older versions,
# alias it to Exception so _DefGen_Return can still name it as a base class.
try:
    BaseException
except NameError:
    BaseException=Exception
713
714class _DefGen_Return(BaseException):
715    def __init__(self, value):
716        self.value = value
717
def returnValue(val):
    """
    Return val from a L{inlineCallbacks} generator.

    Note: the current implementation raises an exception derived from
    BaseException.  To avoid catching it by accident, you may want to turn
    any 'except:' clauses into 'except Exception:' clauses.

    Also: calling this from arbitrary functions that are themselves called
    from within the generator currently works, but do not rely upon that
    behavior.
    """
    raise _DefGen_Return(val)
732
733def _inlineCallbacks(result, g, deferred):
734    """
735    See L{inlineCallbacks}.
736    """
737    # This function is complicated by the need to prevent unbounded recursion
738    # arising from repeatedly yielding immediately ready deferreds.  This while
739    # loop and the waiting variable solve that by manually unfolding the
740    # recursion.
741
742    waiting = [True, # waiting for result?
743               None] # result
744
745    while 1:
746        try:
747            # Send the last result back as the result of the yield expression.
748            if isinstance(result, failure.Failure):
749                result = result.throwExceptionIntoGenerator(g)
750            else:
751                result = g.send(result)
752        except StopIteration:
753            # fell off the end, or "return" statement
754            deferred.callback(None)
755            return deferred
756        except _DefGen_Return, e:
757            # returnValue call
758            deferred.callback(e.value)
759            return deferred
760        except:
761            deferred.errback()
762            return deferred
763
764        if isinstance(result, Deferred):
765            # a deferred was yielded, get the result.
766            def gotResult(r):
767                if waiting[0]:
768                    waiting[0] = False
769                    waiting[1] = r
770                else:
771                    _inlineCallbacks(r, g, deferred)
772
773            result.addBoth(gotResult)
774            if waiting[0]:
775                # Haven't called back yet, set flag so that we get reinvoked
776                # and return from the loop
777                waiting[0] = False
778                return deferred
779
780            result = waiting[1]
781            # Reset waiting to initial values for next loop.  gotResult uses
782            # waiting, but this isn't a problem because gotResult is only
783            # executed once, and if it hasn't been executed yet, the return
784            # branch above would have been taken.
785
786
787            waiting[0] = True
788            waiting[1] = None
789
790
791    return deferred
792
def inlineCallbacks(f):
    """
    WARNING: this function will not work in Python 2.4 and earlier!

    inlineCallbacks lets you write Deferred-using code that reads like an
    ordinary sequential function.  It relies on features of Python 2.5
    generators.  If you must stay compatible with Python 2.4 or earlier, use
    L{deferredGenerator} instead, which achieves the same thing with
    somewhat more boilerplate.  For example::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            print thing #the result! hoorj!
        thingummy = inlineCallbacks(thingummy)

    Whenever you call something that results in a Deferred, simply yield it;
    your generator is resumed automatically once the Deferred's result is
    available.  The result is delivered with the generator's 'send' method,
    or, if it was a failure, with 'throw'.

    Your inlineCallbacks-enabled generator will return a Deferred object,
    which results in the return value of the generator (or fails with a
    failure object if your generator raises an unhandled exception).  Note
    that you can't use 'return result' to return a value; use
    'returnValue(result)' instead.  Falling off the end of the generator, or
    a bare 'return', gives the Deferred a result of None.

    The Deferred returned from your deferred generator may errback if your
    generator raised an exception::

        def thingummy():
            thing = yield makeSomeRequestResultingInDeferred()
            if thing == 'I love Twisted':
                # will become the result of the Deferred
                returnValue('TWISTED IS GREAT!')
            else:
                # will trigger an errback
                raise Exception('DESTROY ALL LIFE')
        thingummy = inlineCallbacks(thingummy)
    """
    def unwindGenerator(*args, **kwargs):
        generator = f(*args, **kwargs)
        return _inlineCallbacks(None, generator, Deferred())
    return mergeFunctionMetadata(f, unwindGenerator)
836
837
838## DeferredLock/DeferredQueue
839
840class _ConcurrencyPrimitive(object):
841    def __init__(self):
842        self.waiting = []
843
844    def _releaseAndReturn(self, r):
845        self.release()
846        return r
847
848    def run(*args, **kwargs):
849        """Acquire, run, release.
850
851        This function takes a callable as its first argument and any
852        number of other positional and keyword arguments.  When the
853        lock or semaphore is acquired, the callable will be invoked
854        with those arguments.
855
856        The callable may return a Deferred; if it does, the lock or
857        semaphore won't be released until that Deferred fires.
858
859        @return: Deferred of function result.
860        """
861        if len(args) < 2:
862            if not args:
863                raise TypeError("run() takes at least 2 arguments, none given.")
864            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
865                args[0].__class__.__name__,))
866        self, f = args[:2]
867        args = args[2:]
868
869        def execute(ignoredResult):
870            d = maybeDeferred(f, *args, **kwargs)
871            d.addBoth(self._releaseAndReturn)
872            return d
873
874        d = self.acquire()
875        d.addCallback(execute)
876        return d
877
878
class DeferredLock(_ConcurrencyPrimitive):
    """
    A lock for event driven systems.

    @ivar locked: true (C{1}) while this lock is held and false (C{0}) at
        all other times.  Treat it as read-only; inspecting it gives the
        equivalent of a "non-blocking" acquisition attempt.
    """

    locked = 0

    def acquire(self):
        """Attempt to acquire the lock.

        If the lock is free it is taken at once and the returned Deferred
        has already fired; otherwise the Deferred is queued until the lock
        is released.

        @return: a Deferred which fires (with this lock) on lock acquisition.
        """
        ticket = Deferred()
        if not self.locked:
            self.locked = 1
            ticket.callback(self)
        else:
            self.waiting.append(ticket)
        return ticket

    def release(self):
        """Release the lock.

        Should be called by whomever did the acquire() when the shared
        resource is free.  If other callers are queued, the lock passes
        straight to the one that has waited longest.
        """
        assert self.locked, "Tried to release an unlocked lock"
        self.locked = 0
        if not self.waiting:
            return
        # Someone is waiting: re-take the lock on their behalf before
        # firing their Deferred.
        self.locked = 1
        nextOwner = self.waiting.pop(0)
        nextOwner.callback(self)
916
class DeferredSemaphore(_ConcurrencyPrimitive):
    """
    A semaphore for event driven systems.

    @ivar tokens: the number of tokens currently available to acquire.
    @ivar limit: the number of tokens this semaphore was created with; the
        count of available tokens never exceeds it.
    """

    def __init__(self, tokens):
        """
        @param tokens: the initial (and maximum) number of tokens; must be
            at least 1.

        @raise ValueError: if C{tokens} is less than 1.  Such a semaphore
            could never be used: with zero tokens acquire() would never
            fire, with a negative count it would trip its assertion, and
            release() would trip its assertion in either case.
        """
        _ConcurrencyPrimitive.__init__(self)
        if tokens < 1:
            raise ValueError("DeferredSemaphore requires tokens >= 1")
        self.tokens = tokens
        self.limit = tokens

    def acquire(self):
        """Attempt to acquire the token.

        If a token is available it is taken at once and the returned
        Deferred has already fired; otherwise the Deferred is queued until
        a token is released.

        @return: a Deferred which fires (with this semaphore) on token
            acquisition.
        """
        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
        d = Deferred()
        if not self.tokens:
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.  If other callers are queued, the token passes
        straight to the one that has waited longest.
        """
        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
        self.tokens = self.tokens + 1
        if self.waiting:
            # someone is waiting to acquire token: take it back on their
            # behalf before firing their Deferred
            self.tokens = self.tokens - 1
            d = self.waiting.pop(0)
            d.callback(self)
954
class QueueOverflow(Exception):
    """
    Raised by L{DeferredQueue.put} when the queue cannot accept another
    object because it already holds C{size} of them.
    """
957
class QueueUnderflow(Exception):
    """
    Raised by L{DeferredQueue.get} when the queue is empty and C{backlog}
    Deferreds are already waiting for an object.
    """
960
961
class DeferredQueue(object):
    """
    An event driven queue.

    Objects are added with L{put} and retrieved with L{get}.  Retrieving
    from an empty queue returns a Deferred which fires as soon as an object
    becomes available.

    @ivar size: The maximum number of objects to allow into the queue
    at a time.  When an attempt to add a new object would exceed this
    limit, QueueOverflow is raised synchronously.  None for no limit.

    @ivar backlog: The maximum number of Deferred gets to allow at
    one time.  When an attempt is made to get an object which would
    exceed this limit, QueueUnderflow is raised synchronously.  None
    for no limit.

    @ivar waiting: the Deferreds handed out by L{get} which have not yet
    been given an object, oldest first.

    @ivar pending: the objects added by L{put} which have not yet been
    retrieved, oldest first.
    """

    def __init__(self, size=None, backlog=None):
        self.waiting = []
        self.pending = []
        self.size = size
        self.backlog = backlog

    def put(self, obj):
        """Add an object to this queue.

        If a get() is already waiting, the object goes directly to the
        oldest waiting Deferred instead of being stored.

        @raise QueueOverflow: Too many objects are in this queue.
        """
        if self.waiting:
            consumer = self.waiting.pop(0)
            consumer.callback(obj)
            return

        hasRoom = self.size is None or len(self.pending) < self.size
        if not hasRoom:
            raise QueueOverflow()
        self.pending.append(obj)

    def get(self):
        """Attempt to retrieve and remove an object from the queue.

        @return: a Deferred which fires with the next object available in the queue.

        @raise QueueUnderflow: Too many (more than C{backlog})
        Deferreds are already waiting for an object from this queue.
        """
        if self.pending:
            return succeed(self.pending.pop(0))

        mayWait = self.backlog is None or len(self.waiting) < self.backlog
        if not mayWait:
            raise QueueUnderflow()
        waiter = Deferred()
        self.waiting.append(waiter)
        return waiter
1014
1015
class AlreadyTryingToLockError(Exception):
    """
    Raised when L{DeferredFilesystemLock.deferUntilLocked} is called a second
    time on the same L{DeferredFilesystemLock} while an earlier attempt to
    acquire the lock is still in progress.
    """
1021
1022
class DeferredFilesystemLock(lockfile.FilesystemLock):
    """
    A FilesystemLock which can fire a Deferred once the lock has been
    acquired.

    @ivar _scheduler: The object in charge of scheduling retries. In this
        implementation this is parameterized for testing.

    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.

    @ivar _tryLockCall: A L{DelayedCall} based on _interval that will manage
        the next retry for acquiring the lock.

    @ivar _timeoutCall: A L{DelayedCall} based on deferUntilLocked's timeout
        argument.  This is in charge of timing out our attempt to acquire the
        lock.
    """
    _interval = 1
    _tryLockCall = None
    _timeoutCall = None

    def __init__(self, name, scheduler=None):
        """
        @param name: The name of the lock to acquire
        @param scheduler: An object which provides L{IReactorTime}; when
            omitted, the global reactor is used.
        """
        lockfile.FilesystemLock.__init__(self, name)

        if scheduler is not None:
            self._scheduler = scheduler
        else:
            # Imported lazily so the reactor is only touched when it is
            # actually needed.
            from twisted.internet import reactor
            self._scheduler = reactor

    def deferUntilLocked(self, timeout=None):
        """
        Wait until we acquire this lock.  This method is not safe for
        concurrent use.

        The lock is tried immediately and then again every C{_interval}
        seconds until it is acquired or the timeout expires.

        @type timeout: C{float} or C{int}
        @param timeout: the number of seconds after which to time out if the
            lock has not been acquired.

        @return: a deferred which will callback when the lock is acquired, or
            errback with a L{TimeoutError} after timing out or an
            L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
            been called and not successfully locked the file.
        """
        if self._tryLockCall is not None:
            alreadyTrying = AlreadyTryingToLockError(
                "deferUntilLocked isn't safe for concurrent use.")
            return fail(alreadyTrying)

        lockAcquired = Deferred()

        def _cancelLock():
            # The timeout expired: stop retrying, then make one final
            # attempt before reporting failure.
            self._tryLockCall.cancel()
            self._tryLockCall = None
            self._timeoutCall = None

            if not self.lock():
                message = "Timed out aquiring lock: %s after %fs" % (
                    self.name,
                    timeout)
                lockAcquired.errback(failure.Failure(TimeoutError(message)))
            else:
                lockAcquired.callback(None)

        def _tryLock():
            if not self.lock():
                # Start the timeout clock only once, on the first failed
                # attempt.
                if timeout is not None and self._timeoutCall is None:
                    self._timeoutCall = self._scheduler.callLater(
                        timeout, _cancelLock)

                self._tryLockCall = self._scheduler.callLater(
                    self._interval, _tryLock)
                return

            # Lock acquired: drop any outstanding timeout and clear the
            # retry marker so a later deferUntilLocked call is allowed.
            if self._timeoutCall is not None:
                self._timeoutCall.cancel()
                self._timeoutCall = None

            self._tryLockCall = None

            lockAcquired.callback(None)

        _tryLock()

        return lockAcquired
1111
1112
# Public names exported by a star-import of this module.
__all__ = [
    "Deferred",
    "DeferredList",
    "succeed",
    "fail",
    "FAILURE",
    "SUCCESS",
    "AlreadyCalledError",
    "TimeoutError",
    "gatherResults",
    "maybeDeferred",
    "waitForDeferred",
    "deferredGenerator",
    "inlineCallbacks",
    "DeferredLock",
    "DeferredSemaphore",
    "DeferredQueue",
    "DeferredFilesystemLock",
    "AlreadyTryingToLockError",
]
Note: See TracBrowser for help on using the browser.