root / trunk / twisted / python / threadpool.py

Revision 24507, 9.0 kB (checked in by exarkun, 1 year ago)

Merge putresultindeferred-2845-4

Author: davep, exarkun
Reviewer: exarkun, therve
Fixes: #2845

Add deferToThreadPool to twisted.internet.threads. This is similar
to deferToThread, but accepts a ThreadPool instance as a parameter
and uses it as the threadpool to which to dispatch calls.

Line 
1 # -*- test-case-name: twisted.test.test_threadpool -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 twisted.python.threadpool: a pool of threads to which we dispatch tasks.
8
9 In most cases you can just use reactor.callInThread and friends
10 instead of creating a thread pool directly.
11 """
12
13 # System Imports
14 import Queue
15 import threading
16 import copy
17 import sys
18 import warnings
19
20
21 # Twisted Imports
22 from twisted.python import log, runtime, context, failure
23
# Sentinel placed on a ThreadPool's work queue to tell a worker thread to
# exit its loop; workers compare against it by identity.
24 WorkerStop = object()
25
26
class ThreadPool:
    """
    This class (hopefully) generalizes the functionality of a pool of
    threads to which work can be dispatched.

    callInThread() and stop() should only be called from
    a single thread, unless you make a subclass where stop() and
    _startSomeWorkers() are synchronized.

    @ivar min: minimum number of worker threads once the pool is started
    @ivar max: maximum number of worker threads
    @ivar joined: True once stop() has been called; new work is then dropped
    @ivar started: True once start() has been called
    @ivar workers: number of worker threads the pool currently accounts for
    @ivar name: optional pool name, used when naming worker threads
    @ivar q: queue of pending work items and L{WorkerStop} markers
    @ivar threads: every worker thread that has not yet exited
    @ivar working: worker threads currently running a task
    @ivar waiters: worker threads currently blocked waiting for a task
    """
    min = 5
    max = 20
    joined = False
    started = False
    workers = 0
    name = None

    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, minthreads=5, maxthreads=20, name=None):
        """
        Create a new threadpool.

        No threads are created here; call L{start} to do that.

        @param minthreads: minimum number of threads in the pool

        @param maxthreads: maximum number of threads in the pool

        @param name: optional name for the pool, used as part of the
            name of each worker thread
        """
        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
        self.q = Queue.Queue(0)
        self.min = minthreads
        self.max = maxthreads
        self.name = name
        if runtime.platform.getType() != "java":
            self.waiters = []
            self.threads = []
            self.working = []
        else:
            # On Jython the bookkeeping lists are wrapped in a lock.
            self.waiters = ThreadSafeList()
            self.threads = ThreadSafeList()
            self.working = ThreadSafeList()

    def start(self):
        """
        Start the threadpool.
        """
        self.joined = False
        self.started = True
        # Start some threads.
        self.adjustPoolsize()

    def startAWorker(self):
        """
        Create and start one more worker thread running L{_worker}.

        If the thread fails to start, the bookkeeping done for it is undone
        and the exception is re-raised, so that a later L{stop} neither
        counts nor joins a thread that never ran.
        """
        self.workers += 1
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
        newThread = self.threadFactory(target=self._worker, name=name)
        # Register before starting: the worker removes itself from
        # self.threads when it exits.
        self.threads.append(newThread)
        try:
            newThread.start()
        except:
            self.threads.remove(newThread)
            self.workers -= 1
            raise

    def stopAWorker(self):
        """
        Ask one worker to exit by queueing a L{WorkerStop} marker, and
        reduce the worker count accordingly.
        """
        self.q.put(WorkerStop)
        self.workers -= 1

    def __setstate__(self, state):
        """
        Restore a pool from the state produced by L{__getstate__}.

        The pool is re-initialized from the saved limits and name; it is
        not started.
        """
        self.__dict__ = state
        # States saved without a 'name' fall back to the class default.
        ThreadPool.__init__(self, self.min, self.max, self.name)

    def __getstate__(self):
        """
        Return the picklable configuration of the pool: its size limits
        and its name.  Threads and queued work are not included.
        """
        state = {}
        state['min'] = self.min
        state['max'] = self.max
        state['name'] = self.name
        return state

    def _startSomeWorkers(self):
        """
        Start workers until there is one per queued or running item, up to
        the configured maximum.
        """
        neededSize = self.q.qsize() + len(self.working)
        # Create enough, but not too many
        while self.workers < min(self.max, neededSize):
            self.startAWorker()


    def dispatch(self, owner, func, *args, **kw):
        """
        DEPRECATED: use L{callInThread} instead.

        Dispatch a function to be a run in a thread.

        @param owner: ignored
        """
        warnings.warn("dispatch() is deprecated since Twisted 8.0, "
                      "use callInThread() instead",
                      DeprecationWarning, stacklevel=2)
        self.callInThread(func, *args, **kw)


    def callInThread(self, func, *args, **kw):
        """
        Call a callable object in a separate thread.

        @param func: callable object to be called in separate thread

        @param *args: positional arguments to be passed to func

        @param **kw: keyword args to be passed to func
        """
        self.callInThreadWithCallback(None, func, *args, **kw)


    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        """
        Call a callable object in a separate thread and call onResult
        with the return value, or a L{twisted.python.failure.Failure}
        if the callable raises an exception.

        The callable is allowed to block, but the onResult function
        must not block and should perform as little work as possible.

        A typical action for onResult for a threadpool used with a
        Twisted reactor would be to schedule a Deferred to fire in the
        main reactor thread using C{.callFromThread}.  Note that
        onResult is called inside the separate thread, not inside the
        reactor thread.

        If the pool has been stopped the call is silently dropped.  If the
        pool has not been started yet the call is queued and no worker is
        created for it by this method.

        @param onResult: a callable with the signature (success, result).
            If the callable returns normally, onResult is called with
            (True, result) where result is the return value of the callable.
            If the callable throws an exception, onResult is called with
            (False, failure).

            Optionally, onResult may be None, in which case it is not
            called at all.

        @param func: callable object to be called in separate thread

        @param *args: positional arguments to be passed to func

        @param **kw: keyword arguments to be passed to func
        """
        if self.joined:
            return
        # Capture the caller's context so the worker can run func in it.
        ctx = context.theContextTracker.currentContext().contexts[-1]
        o = (ctx, func, args, kw, onResult)
        self.q.put(o)
        if self.started:
            self._startSomeWorkers()


    def _runWithCallback(self, callback, errback, func, args, kwargs):
        """
        Call func and report the outcome: callback gets the return value,
        errback gets the exception instance if func raised.

        @param args: sequence of positional arguments for func

        @param kwargs: dict of keyword arguments for func
        """
        try:
            result = func(*args, **kwargs)
        except:
            errback(sys.exc_info()[1])
        else:
            callback(result)


    def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw):
        """
        DEPRECATED: use L{twisted.internet.threads.deferToThread} instead.

        Dispatch a function, returning the result to a callback function.

        The callback function will be called in the thread - make sure it is
        thread-safe.

        @param owner: ignored
        """
        warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, "
                      "use twisted.internet.threads.deferToThread() instead.",
                      DeprecationWarning, stacklevel=2)
        self.callInThread(
            self._runWithCallback, callback, errback, func, args, kw
        )


    def _worker(self):
        """
        Method used as target of the created threads: retrieve task to run
        from the threadpool, run it, and proceed to the next task until
        threadpool is stopped.
        """
        ct = self.currentThread()
        o = self.q.get()
        while o is not WorkerStop:
            self.working.append(ct)
            ctx, function, args, kwargs, onResult = o
            # Drop references as soon as possible so that task objects are
            # not kept alive while this thread waits for more work.
            del o

            try:
                result = context.call(ctx, function, *args, **kwargs)
                success = True
            except:
                success = False
                if onResult is None:
                    # Nobody to report to: log the error instead.
                    context.call(ctx, log.err)
                    result = None
                else:
                    result = failure.Failure()

            del function, args, kwargs

            self.working.remove(ct)

            if onResult is not None:
                try:
                    context.call(ctx, onResult, success, result)
                except:
                    # A failing onResult must not kill the worker.
                    context.call(ctx, log.err)

            del ctx, onResult, result

            self.waiters.append(ct)
            o = self.q.get()
            self.waiters.remove(ct)

        self.threads.remove(ct)

    def stop(self):
        """
        Shutdown the threads in the threadpool.

        One L{WorkerStop} marker is queued per worker, then every thread
        that was registered when stop() was called is joined.
        """
        self.joined = True
        # Copy first: workers remove themselves from self.threads on exit.
        threads = copy.copy(self.threads)
        while self.workers:
            self.q.put(WorkerStop)
            self.workers -= 1

        # and let's just make sure
        # FIXME: threads that have died before calling stop() are not joined.
        for thread in threads:
            thread.join()

    def adjustPoolsize(self, minthreads=None, maxthreads=None):
        """
        Change the size limits of the pool and, if the pool is started,
        stop or start workers to honour them.

        @param minthreads: new minimum number of threads, or None to keep
            the current value

        @param maxthreads: new maximum number of threads, or None to keep
            the current value
        """
        if minthreads is None:
            minthreads = self.min
        if maxthreads is None:
            maxthreads = self.max

        assert minthreads >= 0, 'minimum is negative'
        assert minthreads <= maxthreads, 'minimum is greater than maximum'

        self.min = minthreads
        self.max = maxthreads
        if not self.started:
            return

        # Kill off some threads if we have too many.
        while self.workers > self.max:
            self.stopAWorker()
        # Start some threads if we have too few.
        while self.workers < self.min:
            self.startAWorker()
        # Start some threads if there is a need.
        self._startSomeWorkers()

    def dumpStats(self):
        """
        Log the pending queue and the waiting, working and total thread
        lists.
        """
        log.msg('queue: %s'   % self.q.queue)
        log.msg('waiters: %s' % self.waiters)
        log.msg('workers: %s' % self.working)
        log.msg('total: %s'   % self.threads)
282
class ThreadSafeList:
    """
    In Jython 2.1 lists aren't thread-safe, so this wraps it.

    Every operation holds C{self.lock} while it touches the wrapped list.

    @ivar lock: lock guarding access to C{l}
    @ivar l: the wrapped list
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.l = []

    def append(self, i):
        """
        Add i to the end of the list.
        """
        self.lock.acquire()
        try:
            self.l.append(i)
        finally:
            self.lock.release()

    def remove(self, i):
        """
        Remove the first occurrence of i; raises ValueError if i is absent.
        """
        self.lock.acquire()
        try:
            self.l.remove(i)
        finally:
            self.lock.release()

    def __len__(self):
        """
        Return the number of items currently in the list.
        """
        self.lock.acquire()
        try:
            return len(self.l)
        finally:
            self.lock.release()

    def __iter__(self):
        """
        Iterate over a snapshot of the list taken under the lock, so that
        items appended or removed during iteration do not affect it.
        """
        self.lock.acquire()
        try:
            snapshot = self.l[:]
        finally:
            self.lock.release()
        return iter(snapshot)
308
Note: See TracBrowser for help on using the browser.