root / trunk / twisted / internet / threads.py

Revision 26105, 3.4 kB (checked in by exarkun, 5 months ago)

Merge threaded-resolver-reactor-3591

Author: exarkun, itamarst
Reviewer: mwhudson
Fixes: #3591

Add IReactorThreads.getThreadPool and convert ThreadedResolver to use it and
the deferToThreadPool helper so that when doing DNS lookups, only the reactor
passed to ThreadedResolver is used, not the global reactor (in case they happen
to be different).

Line 
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Extended thread dispatching support.
6
7 For basic support see reactor threading API docs.
8
9 Maintainer: Itamar Shtull-Trauring
10 """
11
12 import Queue
13
14 from twisted.python import failure
15 from twisted.internet import defer
16
17
18 def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
19     """
20     Call the function C{f} using a thread from the given threadpool and return
21     the result as a Deferred.
22
23     This function is only used by client code which is maintaining its own
24     threadpool.  To run a function in the reactor's threadpool, use
25     C{deferToThread}.
26
27     @param reactor: The reactor in whose main thread the Deferred will be
28         invoked.
29
30     @param threadpool: An object which supports the C{callInThreadWithCallback}
31         method of C{twisted.python.threadpool.ThreadPool}.
32
33     @param f: The function to call.
34     @param *args: positional arguments to pass to f.
35     @param **kwargs: keyword arguments to pass to f.
36
37     @return: A Deferred which fires a callback with the result of f, or an
38         errback with a L{twisted.python.failure.Failure} if f throws an
39         exception.
40     """
41     d = defer.Deferred()
42
43     def onResult(success, result):
44         if success:
45             reactor.callFromThread(d.callback, result)
46         else:
47             reactor.callFromThread(d.errback, result)
48
49     threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
50
51     return d
52
53
54 def deferToThread(f, *args, **kwargs):
55     """
56     Run a function in a thread and return the result as a Deferred.
57
58     @param f: The function to call.
59     @param *args: positional arguments to pass to f.
60     @param **kwargs: keyword arguments to pass to f.
61
62     @return: A Deferred which fires a callback with the result of f,
63     or an errback with a L{twisted.python.failure.Failure} if f throws
64     an exception.
65     """
66     from twisted.internet import reactor
67     return deferToThreadPool(reactor, reactor.getThreadPool(),
68                              f, *args, **kwargs)
69
70
71 def _runMultiple(tupleList):
72     """
73     Run a list of functions.
74     """
75     for f, args, kwargs in tupleList:
76         f(*args, **kwargs)
77
78
79 def callMultipleInThread(tupleList):
80     """
81     Run a list of functions in the same thread.
82
83     tupleList should be a list of (function, argsList, kwargsDict) tuples.
84     """
85     from twisted.internet import reactor
86     reactor.callInThread(_runMultiple, tupleList)
87
88
89 def blockingCallFromThread(reactor, f, *a, **kw):
90     """
91     Run a function in the reactor from a thread, and wait for the result
92     synchronously, i.e. until the callback chain returned by the function
93     get a result.
94
95     @param reactor: The L{IReactorThreads} provider which will be used to
96         schedule the function call.
97     @param f: the callable to run in the reactor thread
98     @type f: any callable.
99     @param a: the arguments to pass to C{f}.
100     @param kw: the keyword arguments to pass to C{f}.
101
102     @return: the result of the callback chain.
103     @raise: any error raised during the callback chain.
104     """
105     queue = Queue.Queue()
106     def _callFromThread():
107         result = defer.maybeDeferred(f, *a, **kw)
108         result.addBoth(queue.put)
109     reactor.callFromThread(_callFromThread)
110     result = queue.get()
111     if isinstance(result, failure.Failure):
112         result.raiseException()
113     return result
114
115
116 __all__ = ["deferToThread", "callMultipleInThread", "blockingCallFromThread"]
117
Note: See TracBrowser for help on using the browser.