Ticket #5981: 5981_0.patch
| File 5981_0.patch, 4.2 KB (added by ephess, 5 months ago) |
|---|
-
twisted/test/test_threadpool.py
472 472 tp.stop() 473 473 474 474 475 def test_workerState(self): 476 """ 477 As the worker receives and completes work it should transition between 478 the working/waiting states. 479 """ 480 pool = threadpool.ThreadPool(0,1) 481 pool.start() 482 self.addCleanup(pool.stop) 475 483 484 # sanity check 485 self.assertEqual(pool.workers, 0) 486 self.assertEqual(len(pool.waiters), 0) 487 self.assertEqual(len(pool.working), 0) 488 489 # fire up a worker and give it some 'work' 490 thread_working = threading.Event() 491 thread_finish = threading.Event() 492 493 def _thread(): 494 thread_working.set() 495 thread_finish.wait() 496 497 pool.callInThread(_thread) 498 thread_working.wait() 499 self.assertEqual(pool.workers, 1) 500 self.assertEqual(len(pool.waiters), 0) 501 self.assertEqual(len(pool.working), 1) 502 503 # finish work, and spin until state changes 504 thread_finish.set() 505 while not len(pool.waiters): 506 time.sleep(0.0005) 507 508 # make sure state changed correctly 509 self.assertEqual(len(pool.waiters), 1) 510 self.assertEqual(len(pool.working), 0) 511 512 476 513 class RaceConditionTestCase(unittest.SynchronousTestCase): 477 514 478 515 def getTimeout(self): -
twisted/python/threadpool.py
15 15 from Queue import Queue 16 16 except ImportError: 17 17 from queue import Queue 18 import contextlib 18 19 import threading 19 20 import copy 20 21 … … 155 156 self._startSomeWorkers() 156 157 157 158 159 @contextlib.contextmanager 160 def _workerState(self, state_list): 161 """ 162 Manages adding and removing this worker from a list of workers 163 in a particular state. 164 165 @param state_list: the list managing workers in this state 166 """ 167 ct = self.currentThread() 168 state_list.append(ct) 169 yield 170 state_list.remove(ct) 171 172 158 173 def _worker(self): 159 174 """ 160 175 Method used as target of the created threads: retrieve a task to run 161 176 from the threadpool, run it, and proceed to the next task until 162 177 threadpool is stopped. 163 178 """ 164 ct = self.currentThread()165 179 o = self.q.get() 166 180 while o is not WorkerStop: 167 self.working.append(ct)168 ctx, function, args, kwargs, onResult = o169 del o181 with self._workerState(self.working): 182 ctx, function, args, kwargs, onResult = o 183 del o 170 184 171 try:172 result = context.call(ctx, function, *args, **kwargs)173 success = True174 except:175 success = False176 if onResult is None:177 context.call(ctx, log.err)178 result = None179 else:180 result = failure.Failure()185 try: 186 result = context.call(ctx, function, *args, **kwargs) 187 success = True 188 except: 189 success = False 190 if onResult is None: 191 context.call(ctx, log.err) 192 result = None 193 else: 194 result = failure.Failure() 181 195 182 del function, args, kwargs196 del function, args, kwargs 183 197 184 self.working.remove(ct)185 186 198 if onResult is not None: 187 199 try: 188 200 context.call(ctx, onResult, success, result) … … 191 203 192 204 del ctx, onResult, result 193 205 194 self.waiters.append(ct) 195 o = self.q.get() 196 self.waiters.remove(ct) 206 with self._workerState(self.waiters): 207 o = self.q.get() 197 208 209 ct = self.currentThread() 198 210 self.threads.remove(ct) 199 211 200 212
