root/trunk/twisted/internet/defer.py

Revision 33281, 54.3 KB (checked in by jesstess, 6 months ago)

Merge addcallbacks-ret-5399

Author: zooko
Reviewer: jesstess
Fixes: #5399

Document the return values of addCallbacks and chainDeferred in
twisted.internet.defer.Deferred.

Line 
1# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Support for results that aren't immediately available.
7
8Maintainer: Glyph Lefkowitz
9
10@var _NO_RESULT: The result used to represent the fact that there is no
11    result. B{Never ever ever use this as an actual result for a Deferred}.  You
12    have been warned.
13
14@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred
15    chain.  Always accompanied by a Deferred instance in the args tuple pointing
16    at the Deferred which is chained to the Deferred which has this marker.
17"""
18
19import traceback
20import types
21import warnings
22from sys import exc_info
23
24# Twisted imports
25from twisted.python import log, failure, lockfile
26from twisted.python.util import unsignedID, mergeFunctionMetadata
27
28
29
30class AlreadyCalledError(Exception):
31    pass
32
33
34class CancelledError(Exception):
35    """
36    This error is raised by default when a L{Deferred} is cancelled.
37    """
38
39
40class TimeoutError(Exception):
41    """
42    This exception is deprecated.  It is used only by the deprecated
43    L{Deferred.setTimeout} method.
44    """
45
46
47
48def logError(err):
49    log.err(err)
50    return err
51
52
53
54def succeed(result):
55    """
56    Return a L{Deferred} that has already had C{.callback(result)} called.
57
58    This is useful when you're writing synchronous code to an
59    asynchronous interface: i.e., some code is calling you expecting a
60    L{Deferred} result, but you don't actually need to do anything
61    asynchronous. Just return C{defer.succeed(theResult)}.
62
63    See L{fail} for a version of this function that uses a failing
64    L{Deferred} rather than a successful one.
65
66    @param result: The result to give to the Deferred's 'callback'
67           method.
68
69    @rtype: L{Deferred}
70    """
71    d = Deferred()
72    d.callback(result)
73    return d
74
75
76
77def fail(result=None):
78    """
79    Return a L{Deferred} that has already had C{.errback(result)} called.
80
81    See L{succeed}'s docstring for rationale.
82
83    @param result: The same argument that L{Deferred.errback} takes.
84
85    @raise NoCurrentExceptionError: If C{result} is C{None} but there is no
86        current exception state.
87
88    @rtype: L{Deferred}
89    """
90    d = Deferred()
91    d.errback(result)
92    return d
93
94
95
96def execute(callable, *args, **kw):
97    """
98    Create a L{Deferred} from a callable and arguments.
99
100    Call the given function with the given arguments.  Return a L{Deferred}
101    which has been fired with its callback as the result of that invocation
102    or its C{errback} with a L{Failure} for the exception thrown.
103    """
104    try:
105        result = callable(*args, **kw)
106    except:
107        return fail()
108    else:
109        return succeed(result)
110
111
112
113def maybeDeferred(f, *args, **kw):
114    """
115    Invoke a function that may or may not return a L{Deferred}.
116
117    Call the given function with the given arguments.  If the returned
118    object is a L{Deferred}, return it.  If the returned object is a L{Failure},
119    wrap it with L{fail} and return it.  Otherwise, wrap it in L{succeed} and
120    return it.  If an exception is raised, convert it to a L{Failure}, wrap it
121    in L{fail}, and then return it.
122
123    @type f: Any callable
124    @param f: The callable to invoke
125
126    @param args: The arguments to pass to C{f}
127    @param kw: The keyword arguments to pass to C{f}
128
129    @rtype: L{Deferred}
130    @return: The result of the function call, wrapped in a L{Deferred} if
131    necessary.
132    """
133    try:
134        result = f(*args, **kw)
135    except:
136        return fail(failure.Failure(captureVars=Deferred.debug))
137
138    if isinstance(result, Deferred):
139        return result
140    elif isinstance(result, failure.Failure):
141        return fail(result)
142    else:
143        return succeed(result)
144
145
146
147def timeout(deferred):
148    deferred.errback(failure.Failure(TimeoutError("Callback timed out")))
149
150
151
152def passthru(arg):
153    return arg
154
155
156
157def setDebugging(on):
158    """
159    Enable or disable L{Deferred} debugging.
160
161    When debugging is on, the call stacks from creation and invocation are
162    recorded, and added to any L{AlreadyCalledErrors} we raise.
163    """
164    Deferred.debug=bool(on)
165
166
167
168def getDebugging():
169    """
170    Determine whether L{Deferred} debugging is enabled.
171    """
172    return Deferred.debug
173
174
175# See module docstring.
176_NO_RESULT = object()
177_CONTINUE = object()
178
179
180
181class Deferred:
182    """
183    This is a callback which will be put off until later.
184
185    Why do we want this? Well, in cases where a function in a threaded
186    program would block until it gets a result, for Twisted it should
187    not block. Instead, it should return a L{Deferred}.
188
189    This can be implemented for protocols that run over the network by
190    writing an asynchronous protocol for L{twisted.internet}. For methods
191    that come from outside packages that are not under our control, we use
192    threads (see for example L{twisted.enterprise.adbapi}).
193
194    For more information about Deferreds, see doc/core/howto/defer.html or
195    U{http://twistedmatrix.com/documents/current/core/howto/defer.html}
196
197    When creating a Deferred, you may provide a canceller function, which
198    will be called by d.cancel() to let you do any clean-up necessary if the
199    user decides not to wait for the deferred to complete.
200
201    @ivar called: A flag which is C{False} until either C{callback} or
202        C{errback} is called and afterwards always C{True}.
203    @type called: C{bool}
204
205    @ivar paused: A counter of how many unmatched C{pause} calls have been made
206        on this instance.
207    @type paused: C{int}
208
209    @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
210        which is C{True} if the Deferred has no canceller and has been
211        cancelled, C{False} otherwise.  If C{True}, it can be expected that
212        C{callback} or C{errback} will eventually be called and the result
213        should be silently discarded.
214    @type _suppressAlreadyCalled: C{bool}
215
216    @ivar _runningCallbacks: A flag which is C{True} while this instance is
217        executing its callback chain, used to stop recursive execution of
218        L{_runCallbacks}
219    @type _runningCallbacks: C{bool}
220
221    @ivar _chainedTo: If this Deferred is waiting for the result of another
222        Deferred, this is a reference to the other Deferred.  Otherwise, C{None}.
223    """
224
225    called = False
226    paused = 0
227    _debugInfo = None
228    _suppressAlreadyCalled = False
229
230    # Are we currently running a user-installed callback?  Meant to prevent
231    # recursive running of callbacks when a reentrant call to add a callback is
232    # used.
233    _runningCallbacks = False
234
235    # Keep this class attribute for now, for compatibility with code that
236    # sets it directly.
237    debug = False
238
239    _chainedTo = None
240
241    def __init__(self, canceller=None):
242        """
243        Initialize a L{Deferred}.
244
245        @param canceller: a callable used to stop the pending operation
246            scheduled by this L{Deferred} when L{Deferred.cancel} is
247            invoked. The canceller will be passed the deferred whose
248            cancelation is requested (i.e., self).
249
250            If a canceller is not given, or does not invoke its argument's
251            C{callback} or C{errback} method, L{Deferred.cancel} will
252            invoke L{Deferred.errback} with a L{CancelledError}.
253
254            Note that if a canceller is not given, C{callback} or
255            C{errback} may still be invoked exactly once, even though
256            defer.py will have already invoked C{errback}, as described
257            above.  This allows clients of code which returns a L{Deferred}
258            to cancel it without requiring the L{Deferred} instantiator to
259            provide any specific implementation support for cancellation.
260            New in 10.1.
261
262        @type canceller: a 1-argument callable which takes a L{Deferred}. The
263            return result is ignored.
264        """
265        self.callbacks = []
266        self._canceller = canceller
267        if self.debug:
268            self._debugInfo = DebugInfo()
269            self._debugInfo.creator = traceback.format_stack()[:-1]
270
271
272    def addCallbacks(self, callback, errback=None,
273                     callbackArgs=None, callbackKeywords=None,
274                     errbackArgs=None, errbackKeywords=None):
275        """
276        Add a pair of callbacks (success and error) to this L{Deferred}.
277
278        These will be executed when the 'master' callback is run.
279
280        @return: C{self}.
281        @rtype: a L{Deferred}
282        """
283        assert callable(callback)
284        assert errback == None or callable(errback)
285        cbs = ((callback, callbackArgs, callbackKeywords),
286               (errback or (passthru), errbackArgs, errbackKeywords))
287        self.callbacks.append(cbs)
288
289        if self.called:
290            self._runCallbacks()
291        return self
292
293
294    def addCallback(self, callback, *args, **kw):
295        """
296        Convenience method for adding just a callback.
297
298        See L{addCallbacks}.
299        """
300        return self.addCallbacks(callback, callbackArgs=args,
301                                 callbackKeywords=kw)
302
303
304    def addErrback(self, errback, *args, **kw):
305        """
306        Convenience method for adding just an errback.
307
308        See L{addCallbacks}.
309        """
310        return self.addCallbacks(passthru, errback,
311                                 errbackArgs=args,
312                                 errbackKeywords=kw)
313
314
315    def addBoth(self, callback, *args, **kw):
316        """
317        Convenience method for adding a single callable as both a callback
318        and an errback.
319
320        See L{addCallbacks}.
321        """
322        return self.addCallbacks(callback, callback,
323                                 callbackArgs=args, errbackArgs=args,
324                                 callbackKeywords=kw, errbackKeywords=kw)
325
326
327    def chainDeferred(self, d):
328        """
329        Chain another L{Deferred} to this L{Deferred}.
330
331        This method adds callbacks to this L{Deferred} to call C{d}'s callback
332        or errback, as appropriate. It is merely a shorthand way of performing
333        the following::
334
335            self.addCallbacks(d.callback, d.errback)
336
337        When you chain a deferred d2 to another deferred d1 with
338        d1.chainDeferred(d2), you are making d2 participate in the callback
339        chain of d1. Thus any event that fires d1 will also fire d2.
340        However, the converse is B{not} true; if d2 is fired d1 will not be
341        affected.
342
343        Note that unlike the case where chaining is caused by a L{Deferred}
344        being returned from a callback, it is possible to cause the call
345        stack size limit to be exceeded by chaining many L{Deferred}s
346        together with C{chainDeferred}.
347
348        @return: C{self}.
349        @rtype: a L{Deferred}
350        """
351        d._chainedTo = self
352        return self.addCallbacks(d.callback, d.errback)
353
354
355    def callback(self, result):
356        """
357        Run all success callbacks that have been added to this L{Deferred}.
358
359        Each callback will have its result passed as the first argument to
360        the next; this way, the callbacks act as a 'processing chain'.  If
361        the success-callback returns a L{Failure} or raises an L{Exception},
362        processing will continue on the *error* callback chain.  If a
363        callback (or errback) returns another L{Deferred}, this L{Deferred}
364        will be chained to it (and further callbacks will not run until that
365        L{Deferred} has a result).
366        """
367        assert not isinstance(result, Deferred)
368        self._startRunCallbacks(result)
369
370
371    def errback(self, fail=None):
372        """
373        Run all error callbacks that have been added to this L{Deferred}.
374
375        Each callback will have its result passed as the first
376        argument to the next; this way, the callbacks act as a
377        'processing chain'. Also, if the error-callback returns a non-Failure
378        or doesn't raise an L{Exception}, processing will continue on the
379        *success*-callback chain.
380
381        If the argument that's passed to me is not a L{failure.Failure} instance,
382        it will be embedded in one. If no argument is passed, a
383        L{failure.Failure} instance will be created based on the current
384        traceback stack.
385
386        Passing a string as `fail' is deprecated, and will be punished with
387        a warning message.
388
389        @raise NoCurrentExceptionError: If C{fail} is C{None} but there is
390            no current exception state.
391        """
392        if fail is None:
393            fail = failure.Failure(captureVars=self.debug)
394        elif not isinstance(fail, failure.Failure):
395            fail = failure.Failure(fail)
396
397        self._startRunCallbacks(fail)
398
399
400    def pause(self):
401        """
402        Stop processing on a L{Deferred} until L{unpause}() is called.
403        """
404        self.paused = self.paused + 1
405
406
407    def unpause(self):
408        """
409        Process all callbacks made since L{pause}() was called.
410        """
411        self.paused = self.paused - 1
412        if self.paused:
413            return
414        if self.called:
415            self._runCallbacks()
416
417
418    def cancel(self):
419        """
420        Cancel this L{Deferred}.
421
422        If the L{Deferred} has not yet had its C{errback} or C{callback} method
423        invoked, call the canceller function provided to the constructor. If
424        that function does not invoke C{callback} or C{errback}, or if no
425        canceller function was provided, errback with L{CancelledError}.
426
427        If this L{Deferred} is waiting on another L{Deferred}, forward the
428        cancellation to the other L{Deferred}.
429        """
430        if not self.called:
431            canceller = self._canceller
432            if canceller:
433                canceller(self)
434            else:
435                # Arrange to eat the callback that will eventually be fired
436                # since there was no real canceller.
437                self._suppressAlreadyCalled = True
438            if not self.called:
439                # There was no canceller, or the canceller didn't call
440                # callback or errback.
441                self.errback(failure.Failure(CancelledError()))
442        elif isinstance(self.result, Deferred):
443            # Waiting for another deferred -- cancel it instead.
444            self.result.cancel()
445
446
447    def _startRunCallbacks(self, result):
448        if self.called:
449            if self._suppressAlreadyCalled:
450                self._suppressAlreadyCalled = False
451                return
452            if self.debug:
453                if self._debugInfo is None:
454                    self._debugInfo = DebugInfo()
455                extra = "\n" + self._debugInfo._getDebugTracebacks()
456                raise AlreadyCalledError(extra)
457            raise AlreadyCalledError
458        if self.debug:
459            if self._debugInfo is None:
460                self._debugInfo = DebugInfo()
461            self._debugInfo.invoker = traceback.format_stack()[:-2]
462        self.called = True
463        self.result = result
464        self._runCallbacks()
465
466
467    def _continuation(self):
468        """
469        Build a tuple of callback and errback with L{_continue} to be used by
470        L{_addContinue} and L{_removeContinue} on another Deferred.
471        """
472        return ((_CONTINUE, (self,), None),
473                (_CONTINUE, (self,), None))
474
475
476    def _runCallbacks(self):
477        """
478        Run the chain of callbacks once a result is available.
479
480        This consists of a simple loop over all of the callbacks, calling each
481        with the current result and making the current result equal to the
482        return value (or raised exception) of that call.
483
484        If C{self._runningCallbacks} is true, this loop won't run at all, since
485        it is already running above us on the call stack.  If C{self.paused} is
486        true, the loop also won't run, because that's what it means to be
487        paused.
488
489        The loop will terminate before processing all of the callbacks if a
490        C{Deferred} without a result is encountered.
491
492        If a C{Deferred} I{with} a result is encountered, that result is taken
493        and the loop proceeds.
494
495        @note: The implementation is complicated slightly by the fact that
496            chaining (associating two Deferreds with each other such that one
497            will wait for the result of the other, as happens when a Deferred is
498            returned from a callback on another Deferred) is supported
499            iteratively rather than recursively, to avoid running out of stack
500            frames when processing long chains.
501        """
502        if self._runningCallbacks:
503            # Don't recursively run callbacks
504            return
505
506        # Keep track of all the Deferreds encountered while propagating results
507        # up a chain.  The way a Deferred gets onto this stack is by having
508        # added its _continuation() to the callbacks list of a second Deferred
509        # and then that second Deferred being fired.  ie, if ever had _chainedTo
510        # set to something other than None, you might end up on this stack.
511        chain = [self]
512
513        while chain:
514            current = chain[-1]
515
516            if current.paused:
517                # This Deferred isn't going to produce a result at all.  All the
518                # Deferreds up the chain waiting on it will just have to...
519                # wait.
520                return
521
522            finished = True
523            current._chainedTo = None
524            while current.callbacks:
525                item = current.callbacks.pop(0)
526                callback, args, kw = item[
527                    isinstance(current.result, failure.Failure)]
528                args = args or ()
529                kw = kw or {}
530
531                # Avoid recursion if we can.
532                if callback is _CONTINUE:
533                    # Give the waiting Deferred our current result and then
534                    # forget about that result ourselves.
535                    chainee = args[0]
536                    chainee.result = current.result
537                    current.result = None
538                    # Making sure to update _debugInfo
539                    if current._debugInfo is not None:
540                        current._debugInfo.failResult = None
541                    chainee.paused -= 1
542                    chain.append(chainee)
543                    # Delay cleaning this Deferred and popping it from the chain
544                    # until after we've dealt with chainee.
545                    finished = False
546                    break
547
548                try:
549                    current._runningCallbacks = True
550                    try:
551                        current.result = callback(current.result, *args, **kw)
552                    finally:
553                        current._runningCallbacks = False
554                except:
555                    # Including full frame information in the Failure is quite
556                    # expensive, so we avoid it unless self.debug is set.
557                    current.result = failure.Failure(captureVars=self.debug)
558                else:
559                    if isinstance(current.result, Deferred):
560                        # The result is another Deferred.  If it has a result,
561                        # we can take it and keep going.
562                        resultResult = getattr(current.result, 'result', _NO_RESULT)
563                        if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
564                            # Nope, it didn't.  Pause and chain.
565                            current.pause()
566                            current._chainedTo = current.result
567                            # Note: current.result has no result, so it's not
568                            # running its callbacks right now.  Therefore we can
569                            # append to the callbacks list directly instead of
570                            # using addCallbacks.
571                            current.result.callbacks.append(current._continuation())
572                            break
573                        else:
574                            # Yep, it did.  Steal it.
575                            current.result.result = None
576                            # Make sure _debugInfo's failure state is updated.
577                            if current.result._debugInfo is not None:
578                                current.result._debugInfo.failResult = None
579                            current.result = resultResult
580
581            if finished:
582                # As much of the callback chain - perhaps all of it - as can be
583                # processed right now has been.  The current Deferred is waiting on
584                # another Deferred or for more callbacks.  Before finishing with it,
585                # make sure its _debugInfo is in the proper state.
586                if isinstance(current.result, failure.Failure):
587                    # Stash the Failure in the _debugInfo for unhandled error
588                    # reporting.
589                    current.result.cleanFailure()
590                    if current._debugInfo is None:
591                        current._debugInfo = DebugInfo()
592                    current._debugInfo.failResult = current.result
593                else:
594                    # Clear out any Failure in the _debugInfo, since the result
595                    # is no longer a Failure.
596                    if current._debugInfo is not None:
597                        current._debugInfo.failResult = None
598
599                # This Deferred is done, pop it from the chain and move back up
600                # to the Deferred which supplied us with our result.
601                chain.pop()
602
603
604    def __str__(self):
605        """
606        Return a string representation of this C{Deferred}.
607        """
608        cname = self.__class__.__name__
609        result = getattr(self, 'result', _NO_RESULT)
610        myID = hex(unsignedID(self))
611        if self._chainedTo is not None:
612            result = ' waiting on Deferred at %s' % (hex(unsignedID(self._chainedTo)),)
613        elif result is _NO_RESULT:
614            result = ''
615        else:
616            result = ' current result: %r' % (result,)
617        return "<%s at %s%s>" % (cname, myID, result)
618    __repr__ = __str__
619
620
621
622class DebugInfo:
623    """
624    Deferred debug helper.
625    """
626
627    failResult = None
628
629    def _getDebugTracebacks(self):
630        info = ''
631        if hasattr(self, "creator"):
632            info += " C: Deferred was created:\n C:"
633            info += "".join(self.creator).rstrip().replace("\n","\n C:")
634            info += "\n"
635        if hasattr(self, "invoker"):
636            info += " I: First Invoker was:\n I:"
637            info += "".join(self.invoker).rstrip().replace("\n","\n I:")
638            info += "\n"
639        return info
640
641
642    def __del__(self):
643        """
644        Print tracebacks and die.
645
646        If the *last* (and I do mean *last*) callback leaves me in an error
647        state, print a traceback (if said errback is a L{Failure}).
648        """
649        if self.failResult is not None:
650            log.msg("Unhandled error in Deferred:", isError=True)
651            debugInfo = self._getDebugTracebacks()
652            if debugInfo != '':
653                log.msg("(debug: " + debugInfo + ")", isError=True)
654            log.err(self.failResult)
655
656
657
658class FirstError(Exception):
659    """
660    First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set.
661
662    @ivar subFailure: The L{Failure} that occurred.
663    @type subFailure: L{Failure}
664
665    @ivar index: The index of the L{Deferred} in the L{DeferredList} where
666        it happened.
667    @type index: C{int}
668    """
669    def __init__(self, failure, index):
670        Exception.__init__(self, failure, index)
671        self.subFailure = failure
672        self.index = index
673
674
675    def __repr__(self):
676        """
677        The I{repr} of L{FirstError} instances includes the repr of the
678        wrapped failure's exception and the index of the L{FirstError}.
679        """
680        return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value)
681
682
683    def __str__(self):
684        """
685        The I{str} of L{FirstError} instances includes the I{str} of the
686        entire wrapped failure (including its traceback and exception) and
687        the index of the L{FirstError}.
688        """
689        return 'FirstError[#%d, %s]' % (self.index, self.subFailure)
690
691
692    def __cmp__(self, other):
693        """
694        Comparison between L{FirstError} and other L{FirstError} instances
695        is defined as the comparison of the index and sub-failure of each
696        instance.  L{FirstError} instances don't compare equal to anything
697        that isn't a L{FirstError} instance.
698
699        @since: 8.2
700        """
701        if isinstance(other, FirstError):
702            return cmp(
703                (self.index, self.subFailure),
704                (other.index, other.subFailure))
705        return -1
706
707
708
709class DeferredList(Deferred):
710    """
711    L{DeferredList} is a tool for collecting the results of several Deferreds.
712
713    This tracks a list of L{Deferred}s for their results, and makes a single
714    callback when they have all completed.  By default, the ultimate result is a
715    list of (success, result) tuples, 'success' being a boolean.
716    L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and
717    errbacks can be added to it in the same way.
718
719    L{DeferredList} is implemented by adding callbacks and errbacks to each
720    L{Deferred} in the list passed to it.  This means callbacks and errbacks
721    added to the Deferreds before they are passed to L{DeferredList} will change
722    the result that L{DeferredList} sees (i.e., L{DeferredList} is not special).
723    Callbacks and errbacks can also be added to the Deferreds after they are
724    passed to L{DeferredList} and L{DeferredList} may change the result that
725    they see.
726
727    See the documentation for the C{__init__} arguments for more information.
728    """
729
730    fireOnOneCallback = False
731    fireOnOneErrback = False
732
733    def __init__(self, deferredList, fireOnOneCallback=False,
734                 fireOnOneErrback=False, consumeErrors=False):
735        """
736        Initialize a DeferredList.
737
738        @param deferredList: The list of deferreds to track.
739        @type deferredList:  C{list} of L{Deferred}s
740
741        @param fireOnOneCallback: (keyword param) a flag indicating that this
742            L{DeferredList} will fire when the first L{Deferred} in
743            C{deferredList} fires with a non-failure result without waiting for
744            any of the other Deferreds.  When this flag is set, the DeferredList
745            will fire with a two-tuple: the first element is the result of the
746            Deferred which fired; the second element is the index in
747            C{deferredList} of that Deferred.
748        @type fireOnOneCallback: C{bool}
749
750        @param fireOnOneErrback: (keyword param) a flag indicating that this
751            L{DeferredList} will fire when the first L{Deferred} in
752            C{deferredList} fires with a failure result without waiting for any
753            of the other Deferreds.  When this flag is set, if a Deferred in the
754            list errbacks, the DeferredList will errback with a L{FirstError}
755            failure wrapping the failure of that Deferred.
756        @type fireOnOneErrback: C{bool}
757
758        @param consumeErrors: (keyword param) a flag indicating that failures in
759            any of the included L{Deferreds} should not be propagated to
760            errbacks added to the individual L{Deferreds} after this
761            L{DeferredList} is constructed.  After constructing the
762            L{DeferredList}, any errors in the individual L{Deferred}s will be
763            converted to a callback result of C{None}.  This is useful to
764            prevent spurious 'Unhandled error in Deferred' messages from being
765            logged.  This does not prevent C{fireOnOneErrback} from working.
766        @type consumeErrors: C{bool}
767        """
768        self.resultList = [None] * len(deferredList)
769        Deferred.__init__(self)
770        if len(deferredList) == 0 and not fireOnOneCallback:
771            self.callback(self.resultList)
772
773        # These flags need to be set *before* attaching callbacks to the
774        # deferreds, because the callbacks use these flags, and will run
775        # synchronously if any of the deferreds are already fired.
776        self.fireOnOneCallback = fireOnOneCallback
777        self.fireOnOneErrback = fireOnOneErrback
778        self.consumeErrors = consumeErrors
779        self.finishedCount = 0
780
781        index = 0
782        for deferred in deferredList:
783            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
784                                  callbackArgs=(index,SUCCESS),
785                                  errbackArgs=(index,FAILURE))
786            index = index + 1
787
788
789    def _cbDeferred(self, result, index, succeeded):
790        """
791        (internal) Callback for when one of my deferreds fires.
792        """
793        self.resultList[index] = (succeeded, result)
794
795        self.finishedCount += 1
796        if not self.called:
797            if succeeded == SUCCESS and self.fireOnOneCallback:
798                self.callback((result, index))
799            elif succeeded == FAILURE and self.fireOnOneErrback:
800                self.errback(failure.Failure(FirstError(result, index)))
801            elif self.finishedCount == len(self.resultList):
802                self.callback(self.resultList)
803
804        if succeeded == FAILURE and self.consumeErrors:
805            result = None
806
807        return result
808
809
810
811def _parseDListResult(l, fireOnOneErrback=False):
812    if __debug__:
813        for success, value in l:
814            assert success
815    return [x[1] for x in l]
816
817
818
819def gatherResults(deferredList, consumeErrors=False):
820    """
821    Returns, via a L{Deferred}, a list with the results of the given
822    L{Deferred}s - in effect, a "join" of multiple deferred operations.
823
824    The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s
825    have fired, or when any one of them has failed.
826
827    This differs from L{DeferredList} in that you don't need to parse
828    the result for success/failure.
829
830    @type deferredList:  C{list} of L{Deferred}s
831
832    @param consumeErrors: (keyword param) a flag, defaulting to False,
833        indicating that failures in any of the given L{Deferreds} should not be
834        propagated to errbacks added to the individual L{Deferreds} after this
835        L{gatherResults} invocation.  Any such errors in the individual
836        L{Deferred}s will be converted to a callback result of C{None}.  This
837        is useful to prevent spurious 'Unhandled error in Deferred' messages
838        from being logged.  This parameter is available since 11.1.0.
839    @type consumeErrors: C{bool}
840    """
841    d = DeferredList(deferredList, fireOnOneErrback=True,
842                                   consumeErrors=consumeErrors)
843    d.addCallback(_parseDListResult)
844    return d
845
846
847
848# Constants for use with DeferredList
849
850SUCCESS = True
851FAILURE = False
852
853
854
855## deferredGenerator
856
857class waitForDeferred:
858    """
859    See L{deferredGenerator}.
860    """
861
862    def __init__(self, d):
863        if not isinstance(d, Deferred):
864            raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,))
865        self.d = d
866
867
868    def getResult(self):
869        if isinstance(self.result, failure.Failure):
870            self.result.raiseException()
871        return self.result
872
873
874
875def _deferGenerator(g, deferred):
876    """
877    See L{deferredGenerator}.
878    """
879    result = None
880
881    # This function is complicated by the need to prevent unbounded recursion
882    # arising from repeatedly yielding immediately ready deferreds.  This while
883    # loop and the waiting variable solve that by manually unfolding the
884    # recursion.
885
886    waiting = [True, # defgen is waiting for result?
887               None] # result
888
889    while 1:
890        try:
891            result = g.next()
892        except StopIteration:
893            deferred.callback(result)
894            return deferred
895        except:
896            deferred.errback()
897            return deferred
898
899        # Deferred.callback(Deferred) raises an error; we catch this case
900        # early here and give a nicer error message to the user in case
901        # they yield a Deferred.
902        if isinstance(result, Deferred):
903            return fail(TypeError("Yield waitForDeferred(d), not d!"))
904
905        if isinstance(result, waitForDeferred):
906            # a waitForDeferred was yielded, get the result.
907            # Pass result in so it don't get changed going around the loop
908            # This isn't a problem for waiting, as it's only reused if
909            # gotResult has already been executed.
910            def gotResult(r, result=result):
911                result.result = r
912                if waiting[0]:
913                    waiting[0] = False
914                    waiting[1] = r
915                else:
916                    _deferGenerator(g, deferred)
917            result.d.addBoth(gotResult)
918            if waiting[0]:
919                # Haven't called back yet, set flag so that we get reinvoked
920                # and return from the loop
921                waiting[0] = False
922                return deferred
923            # Reset waiting to initial values for next loop
924            waiting[0] = True
925            waiting[1] = None
926
927            result = None
928
929
930
931def deferredGenerator(f):
932    """
933    deferredGenerator and waitForDeferred help you write L{Deferred}-using code
934    that looks like a regular sequential function. If your code has a minimum
935    requirement of Python 2.5, consider the use of L{inlineCallbacks} instead,
936    which can accomplish the same thing in a more concise manner.
937
938    There are two important functions involved: L{waitForDeferred}, and
939    L{deferredGenerator}.  They are used together, like this::
940
941        @deferredGenerator
942        def thingummy():
943            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
944            yield thing
945            thing = thing.getResult()
946            print thing #the result! hoorj!
947
948    L{waitForDeferred} returns something that you should immediately yield; when
949    your generator is resumed, calling C{thing.getResult()} will either give you
950    the result of the L{Deferred} if it was a success, or raise an exception if it
951    was a failure.  Calling C{getResult} is B{absolutely mandatory}.  If you do
952    not call it, I{your program will not work}.
953
954    L{deferredGenerator} takes one of these waitForDeferred-using generator
955    functions and converts it into a function that returns a L{Deferred}. The
956    result of the L{Deferred} will be the last value that your generator yielded
957    unless the last value is a L{waitForDeferred} instance, in which case the
958    result will be C{None}.  If the function raises an unhandled exception, the
959    L{Deferred} will errback instead.  Remember that C{return result} won't work;
960    use C{yield result; return} in place of that.
961
962    Note that not yielding anything from your generator will make the L{Deferred}
963    result in C{None}. Yielding a L{Deferred} from your generator is also an error
964    condition; always yield C{waitForDeferred(d)} instead.
965
966    The L{Deferred} returned from your deferred generator may also errback if your
967    generator raised an exception.  For example::
968
969        @deferredGenerator
970        def thingummy():
971            thing = waitForDeferred(makeSomeRequestResultingInDeferred())
972            yield thing
973            thing = thing.getResult()
974            if thing == 'I love Twisted':
975                # will become the result of the Deferred
976                yield 'TWISTED IS GREAT!'
977                return
978            else:
979                # will trigger an errback
980                raise Exception('DESTROY ALL LIFE')
981
982    Put succinctly, these functions connect deferred-using code with this 'fake
983    blocking' style in both directions: L{waitForDeferred} converts from a
984    L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the
985    'blocking' style to a L{Deferred}.
986    """
987
988    def unwindGenerator(*args, **kwargs):
989        return _deferGenerator(f(*args, **kwargs), Deferred())
990    return mergeFunctionMetadata(f, unwindGenerator)
991
992
993## inlineCallbacks
994
995# BaseException is only in Py 2.5.
996try:
997    BaseException
998except NameError:
999    BaseException=Exception
1000
1001
1002
1003class _DefGen_Return(BaseException):
1004    def __init__(self, value):
1005        self.value = value
1006
1007
1008
1009def returnValue(val):
1010    """
1011    Return val from a L{inlineCallbacks} generator.
1012
1013    Note: this is currently implemented by raising an exception
1014    derived from L{BaseException}.  You might want to change any
1015    'except:' clauses to an 'except Exception:' clause so as not to
1016    catch this exception.
1017
1018    Also: while this function currently will work when called from
1019    within arbitrary functions called from within the generator, do
1020    not rely upon this behavior.
1021    """
1022    raise _DefGen_Return(val)
1023
1024
1025
1026def _inlineCallbacks(result, g, deferred):
1027    """
1028    See L{inlineCallbacks}.
1029    """
1030    # This function is complicated by the need to prevent unbounded recursion
1031    # arising from repeatedly yielding immediately ready deferreds.  This while
1032    # loop and the waiting variable solve that by manually unfolding the
1033    # recursion.
1034
1035    waiting = [True, # waiting for result?
1036               None] # result
1037
1038    while 1:
1039        try:
1040            # Send the last result back as the result of the yield expression.
1041            isFailure = isinstance(result, failure.Failure)
1042            if isFailure:
1043                result = result.throwExceptionIntoGenerator(g)
1044            else:
1045                result = g.send(result)
1046        except StopIteration:
1047            # fell off the end, or "return" statement
1048            deferred.callback(None)
1049            return deferred
1050        except _DefGen_Return, e:
1051            # returnValue() was called; time to give a result to the original
1052            # Deferred.  First though, let's try to identify the potentially
1053            # confusing situation which results when returnValue() is
1054            # accidentally invoked from a different function, one that wasn't
1055            # decorated with @inlineCallbacks.
1056
1057            # The traceback starts in this frame (the one for
1058            # _inlineCallbacks); the next one down should be the application
1059            # code.
1060            appCodeTrace = exc_info()[2].tb_next
1061            if isFailure:
1062                # If we invoked this generator frame by throwing an exception
1063                # into it, then throwExceptionIntoGenerator will consume an
1064                # additional stack frame itself, so we need to skip that too.
1065                appCodeTrace = appCodeTrace.tb_next
1066            # Now that we've identified the frame being exited by the
1067            # exception, let's figure out if returnValue was called from it
1068            # directly.  returnValue itself consumes a stack frame, so the
1069            # application code will have a tb_next, but it will *not* have a
1070            # second tb_next.
1071            if appCodeTrace.tb_next.tb_next:
1072                # If returnValue was invoked non-local to the frame which it is
1073                # exiting, identify the frame that ultimately invoked
1074                # returnValue so that we can warn the user, as this behavior is
1075                # confusing.
1076                ultimateTrace = appCodeTrace
1077                while ultimateTrace.tb_next.tb_next:
1078                    ultimateTrace = ultimateTrace.tb_next
1079                filename = ultimateTrace.tb_frame.f_code.co_filename
1080                lineno = ultimateTrace.tb_lineno
1081                warnings.warn_explicit(
1082                    "returnValue() in %r causing %r to exit: "
1083                    "returnValue should only be invoked by functions decorated "
1084                    "with inlineCallbacks" % (
1085                        ultimateTrace.tb_frame.f_code.co_name,
1086                        appCodeTrace.tb_frame.f_code.co_name),
1087                    DeprecationWarning, filename, lineno)
1088            deferred.callback(e.value)
1089            return deferred
1090        except:
1091            deferred.errback()
1092            return deferred
1093
1094        if isinstance(result, Deferred):
1095            # a deferred was yielded, get the result.
1096            def gotResult(r):
1097                if waiting[0]:
1098                    waiting[0] = False
1099                    waiting[1] = r
1100                else:
1101                    _inlineCallbacks(r, g, deferred)
1102
1103            result.addBoth(gotResult)
1104            if waiting[0]:
1105                # Haven't called back yet, set flag so that we get reinvoked
1106                # and return from the loop
1107                waiting[0] = False
1108                return deferred
1109
1110            result = waiting[1]
1111            # Reset waiting to initial values for next loop.  gotResult uses
1112            # waiting, but this isn't a problem because gotResult is only
1113            # executed once, and if it hasn't been executed yet, the return
1114            # branch above would have been taken.
1115
1116
1117            waiting[0] = True
1118            waiting[1] = None
1119
1120
1121    return deferred
1122
1123
1124
1125def inlineCallbacks(f):
1126    """
1127    WARNING: this function will not work in Python 2.4 and earlier!
1128
1129    inlineCallbacks helps you write Deferred-using code that looks like a
1130    regular sequential function. This function uses features of Python 2.5
1131    generators.  If you need to be compatible with Python 2.4 or before, use
1132    the L{deferredGenerator} function instead, which accomplishes the same
1133    thing, but with somewhat more boilerplate.  For example::
1134
1135        @inlineCallBacks
1136        def thingummy():
1137            thing = yield makeSomeRequestResultingInDeferred()
1138            print thing #the result! hoorj!
1139
1140    When you call anything that results in a L{Deferred}, you can simply yield it;
1141    your generator will automatically be resumed when the Deferred's result is
1142    available. The generator will be sent the result of the L{Deferred} with the
1143    'send' method on generators, or if the result was a failure, 'throw'.
1144
1145    Things that are not L{Deferred}s may also be yielded, and your generator
1146    will be resumed with the same object sent back. This means C{yield}
1147    performs an operation roughly equivalent to L{maybeDeferred}.
1148
1149    Your inlineCallbacks-enabled generator will return a L{Deferred} object, which
1150    will result in the return value of the generator (or will fail with a
1151    failure object if your generator raises an unhandled exception). Note that
1152    you can't use C{return result} to return a value; use C{returnValue(result)}
1153    instead. Falling off the end of the generator, or simply using C{return}
1154    will cause the L{Deferred} to have a result of C{None}.
1155
1156    Be aware that L{returnValue} will not accept a L{Deferred} as a parameter.
1157    If you believe the thing you'd like to return could be a L{Deferred}, do
1158    this::
1159
1160        result = yield result
1161        returnValue(result)
1162
1163    The L{Deferred} returned from your deferred generator may errback if your
1164    generator raised an exception::
1165
1166        @inlineCallbacks
1167        def thingummy():
1168            thing = yield makeSomeRequestResultingInDeferred()
1169            if thing == 'I love Twisted':
1170                # will become the result of the Deferred
1171                returnValue('TWISTED IS GREAT!')
1172            else:
1173                # will trigger an errback
1174                raise Exception('DESTROY ALL LIFE')
1175    """
1176    def unwindGenerator(*args, **kwargs):
1177        try:
1178            gen = f(*args, **kwargs)
1179        except _DefGen_Return:
1180            raise TypeError(
1181                "inlineCallbacks requires %r to produce a generator; instead"
1182                "caught returnValue being used in a non-generator" % (f,))
1183        if not isinstance(gen, types.GeneratorType):
1184            raise TypeError(
1185                "inlineCallbacks requires %r to produce a generator; "
1186                "instead got %r" % (f, gen))
1187        return _inlineCallbacks(None, gen, Deferred())
1188    return mergeFunctionMetadata(f, unwindGenerator)
1189
1190
1191## DeferredLock/DeferredQueue
1192
1193class _ConcurrencyPrimitive(object):
1194    def __init__(self):
1195        self.waiting = []
1196
1197
1198    def _releaseAndReturn(self, r):
1199        self.release()
1200        return r
1201
1202
1203    def run(*args, **kwargs):
1204        """
1205        Acquire, run, release.
1206
1207        This function takes a callable as its first argument and any
1208        number of other positional and keyword arguments.  When the
1209        lock or semaphore is acquired, the callable will be invoked
1210        with those arguments.
1211
1212        The callable may return a L{Deferred}; if it does, the lock or
1213        semaphore won't be released until that L{Deferred} fires.
1214
1215        @return: L{Deferred} of function result.
1216        """
1217        if len(args) < 2:
1218            if not args:
1219                raise TypeError("run() takes at least 2 arguments, none given.")
1220            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
1221                args[0].__class__.__name__,))
1222        self, f = args[:2]
1223        args = args[2:]
1224
1225        def execute(ignoredResult):
1226            d = maybeDeferred(f, *args, **kwargs)
1227            d.addBoth(self._releaseAndReturn)
1228            return d
1229
1230        d = self.acquire()
1231        d.addCallback(execute)
1232        return d
1233
1234
1235
1236class DeferredLock(_ConcurrencyPrimitive):
1237    """
1238    A lock for event driven systems.
1239
1240    @ivar locked: C{True} when this Lock has been acquired, false at all other
1241        times.  Do not change this value, but it is useful to examine for the
1242        equivalent of a "non-blocking" acquisition.
1243    """
1244
1245    locked = False
1246
1247
1248    def _cancelAcquire(self, d):
1249        """
1250        Remove a deferred d from our waiting list, as the deferred has been
1251        canceled.
1252
1253        Note: We do not need to wrap this in a try/except to catch d not
1254        being in self.waiting because this canceller will not be called if
1255        d has fired. release() pops a deferred out of self.waiting and
1256        calls it, so the canceller will no longer be called.
1257
1258        @param d: The deferred that has been canceled.
1259        """
1260        self.waiting.remove(d)
1261
1262
1263    def acquire(self):
1264        """
1265        Attempt to acquire the lock.  Returns a L{Deferred} that fires on
1266        lock acquisition with the L{DeferredLock} as the value.  If the lock
1267        is locked, then the Deferred is placed at the end of a waiting list.
1268
1269        @return: a L{Deferred} which fires on lock acquisition.
1270        @rtype: a L{Deferred}
1271        """
1272        d = Deferred(canceller=self._cancelAcquire)
1273        if self.locked:
1274            self.waiting.append(d)
1275        else:
1276            self.locked = True
1277            d.callback(self)
1278        return d
1279
1280
1281    def release(self):
1282        """
1283        Release the lock.  If there is a waiting list, then the first
1284        L{Deferred} in that waiting list will be called back.
1285
1286        Should be called by whomever did the L{acquire}() when the shared
1287        resource is free.
1288        """
1289        assert self.locked, "Tried to release an unlocked lock"
1290        self.locked = False
1291        if self.waiting:
1292            # someone is waiting to acquire lock
1293            self.locked = True
1294            d = self.waiting.pop(0)
1295            d.callback(self)
1296
1297
1298
1299class DeferredSemaphore(_ConcurrencyPrimitive):
1300    """
1301    A semaphore for event driven systems.
1302
1303    @ivar tokens: At most this many users may acquire this semaphore at
1304        once.
1305    @type tokens: C{int}
1306
1307    @ivar limit: The difference between C{tokens} and the number of users
1308        which have currently acquired this semaphore.
1309    @type limit: C{int}
1310    """
1311
1312    def __init__(self, tokens):
1313        _ConcurrencyPrimitive.__init__(self)
1314        if tokens < 1:
1315            raise ValueError("DeferredSemaphore requires tokens >= 1")
1316        self.tokens = tokens
1317        self.limit = tokens
1318
1319
1320    def _cancelAcquire(self, d):
1321        """
1322        Remove a deferred d from our waiting list, as the deferred has been
1323        canceled.
1324
1325        Note: We do not need to wrap this in a try/except to catch d not
1326        being in self.waiting because this canceller will not be called if
1327        d has fired. release() pops a deferred out of self.waiting and
1328        calls it, so the canceller will no longer be called.
1329
1330        @param d: The deferred that has been canceled.
1331        """
1332        self.waiting.remove(d)
1333
1334
1335    def acquire(self):
1336        """
1337        Attempt to acquire the token.
1338
1339        @return: a L{Deferred} which fires on token acquisition.
1340        """
1341        assert self.tokens >= 0, "Internal inconsistency??  tokens should never be negative"
1342        d = Deferred(canceller=self._cancelAcquire)
1343        if not self.tokens:
1344            self.waiting.append(d)
1345        else:
1346            self.tokens = self.tokens - 1
1347            d.callback(self)
1348        return d
1349
1350
1351    def release(self):
1352        """
1353        Release the token.
1354
1355        Should be called by whoever did the L{acquire}() when the shared
1356        resource is free.
1357        """
1358        assert self.tokens < self.limit, "Someone released me too many times: too many tokens!"
1359        self.tokens = self.tokens + 1
1360        if self.waiting:
1361            # someone is waiting to acquire token
1362            self.tokens = self.tokens - 1
1363            d = self.waiting.pop(0)
1364            d.callback(self)
1365
1366
1367
1368class QueueOverflow(Exception):
1369    pass
1370
1371
1372
1373class QueueUnderflow(Exception):
1374    pass
1375
1376
1377
1378class DeferredQueue(object):
1379    """
1380    An event driven queue.
1381
1382    Objects may be added as usual to this queue.  When an attempt is
1383    made to retrieve an object when the queue is empty, a L{Deferred} is
1384    returned which will fire when an object becomes available.
1385
1386    @ivar size: The maximum number of objects to allow into the queue
1387    at a time.  When an attempt to add a new object would exceed this
1388    limit, L{QueueOverflow} is raised synchronously.  C{None} for no limit.
1389
1390    @ivar backlog: The maximum number of L{Deferred} gets to allow at
1391    one time.  When an attempt is made to get an object which would
1392    exceed this limit, L{QueueUnderflow} is raised synchronously.  C{None}
1393    for no limit.
1394    """
1395
1396    def __init__(self, size=None, backlog=None):
1397        self.waiting = []
1398        self.pending = []
1399        self.size = size
1400        self.backlog = backlog
1401
1402
1403    def _cancelGet(self, d):
1404        """
1405        Remove a deferred d from our waiting list, as the deferred has been
1406        canceled.
1407
1408        Note: We do not need to wrap this in a try/except to catch d not
1409        being in self.waiting because this canceller will not be called if
1410        d has fired. put() pops a deferred out of self.waiting and calls
1411        it, so the canceller will no longer be called.
1412
1413        @param d: The deferred that has been canceled.
1414        """
1415        self.waiting.remove(d)
1416
1417
1418    def put(self, obj):
1419        """
1420        Add an object to this queue.
1421
1422        @raise QueueOverflow: Too many objects are in this queue.
1423        """
1424        if self.waiting:
1425            self.waiting.pop(0).callback(obj)
1426        elif self.size is None or len(self.pending) < self.size:
1427            self.pending.append(obj)
1428        else:
1429            raise QueueOverflow()
1430
1431
1432    def get(self):
1433        """
1434        Attempt to retrieve and remove an object from the queue.
1435
1436        @return: a L{Deferred} which fires with the next object available in
1437        the queue.
1438
1439        @raise QueueUnderflow: Too many (more than C{backlog})
1440        L{Deferred}s are already waiting for an object from this queue.
1441        """
1442        if self.pending:
1443            return succeed(self.pending.pop(0))
1444        elif self.backlog is None or len(self.waiting) < self.backlog:
1445            d = Deferred(canceller=self._cancelGet)
1446            self.waiting.append(d)
1447            return d
1448        else:
1449            raise QueueUnderflow()
1450
1451
1452
1453class AlreadyTryingToLockError(Exception):
1454    """
1455    Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a
1456    single L{DeferredFilesystemLock}.
1457    """
1458
1459
1460
1461class DeferredFilesystemLock(lockfile.FilesystemLock):
1462    """
1463    A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is
1464    acquired.
1465
1466    @ivar _scheduler: The object in charge of scheduling retries. In this
1467        implementation this is parameterized for testing.
1468
1469    @ivar _interval: The retry interval for an L{IReactorTime} based scheduler.
1470
1471    @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage
1472        the next retry for aquiring the lock.
1473
1474    @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout
1475        argument.  This is in charge of timing out our attempt to acquire the
1476        lock.
1477    """
1478    _interval = 1
1479    _tryLockCall = None
1480    _timeoutCall = None
1481
1482
1483    def __init__(self, name, scheduler=None):
1484        """
1485        @param name: The name of the lock to acquire
1486        @param scheduler: An object which provides L{IReactorTime}
1487        """
1488        lockfile.FilesystemLock.__init__(self, name)
1489
1490        if scheduler is None:
1491            from twisted.internet import reactor
1492            scheduler = reactor
1493
1494        self._scheduler = scheduler
1495
1496
1497    def deferUntilLocked(self, timeout=None):
1498        """
1499        Wait until we acquire this lock.  This method is not safe for
1500        concurrent use.
1501
1502        @type timeout: C{float} or C{int}
1503        @param timeout: the number of seconds after which to time out if the
1504            lock has not been acquired.
1505
1506        @return: a L{Deferred} which will callback when the lock is acquired, or
1507            errback with a L{TimeoutError} after timing out or an
1508            L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already
1509            been called and not successfully locked the file.
1510        """
1511        if self._tryLockCall is not None:
1512            return fail(
1513                AlreadyTryingToLockError(
1514                    "deferUntilLocked isn't safe for concurrent use."))
1515
1516        d = Deferred()
1517
1518        def _cancelLock():
1519            self._tryLockCall.cancel()
1520            self._tryLockCall = None
1521            self._timeoutCall = None
1522
1523            if self.lock():
1524                d.callback(None)
1525            else:
1526                d.errback(failure.Failure(
1527                        TimeoutError("Timed out aquiring lock: %s after %fs" % (
1528                                self.name,
1529                                timeout))))
1530
1531        def _tryLock():
1532            if self.lock():
1533                if self._timeoutCall is not None:
1534                    self._timeoutCall.cancel()
1535                    self._timeoutCall = None
1536
1537                self._tryLockCall = None
1538
1539                d.callback(None)
1540            else:
1541                if timeout is not None and self._timeoutCall is None:
1542                    self._timeoutCall = self._scheduler.callLater(
1543                        timeout, _cancelLock)
1544
1545                self._tryLockCall = self._scheduler.callLater(
1546                    self._interval, _tryLock)
1547
1548        _tryLock()
1549
1550        return d
1551
1552
1553
1554__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS",
1555           "AlreadyCalledError", "TimeoutError", "gatherResults",
1556           "maybeDeferred",
1557           "waitForDeferred", "deferredGenerator", "inlineCallbacks",
1558           "returnValue",
1559           "DeferredLock", "DeferredSemaphore", "DeferredQueue",
1560           "DeferredFilesystemLock", "AlreadyTryingToLockError",
1561          ]
Note: See TracBrowser for help on using the browser.