Ticket #5981: 5981_1.patch
| File 5981_1.patch, 4.0 KB (added by ephess, 5 months ago) |
|---|
-
twisted/python/threadpool.py
157 157 158 158 159 159 @contextlib.contextmanager 160 def _workerState(self, state _list):160 def _workerState(self, stateList, workerThread): 161 161 """ 162 162 Manages adding and removing this worker from a list of workers 163 163 in a particular state. 164 164 165 @param state_list: the list managing workers in this state 165 @param stateList: the list managing workers in this state 166 167 @param workerThread: the thread the worker is running in, used to 168 represent the worker in stateList 166 169 """ 167 ct = self.currentThread() 168 state_list.append(ct) 169 yield 170 state_list.remove(ct) 170 stateList.append(workerThread) 171 try: 172 yield 173 finally: 174 stateList.remove(workerThread) 171 175 172 176 173 177 def _worker(self): … … 176 180 from the threadpool, run it, and proceed to the next task until 177 181 threadpool is stopped. 178 182 """ 183 ct = self.currentThread() 179 184 o = self.q.get() 180 185 while o is not WorkerStop: 181 with self._workerState(self.working ):186 with self._workerState(self.working, ct): 182 187 ctx, function, args, kwargs, onResult = o 183 188 del o 184 189 … … 203 208 204 209 del ctx, onResult, result 205 210 206 with self._workerState(self.waiters ):211 with self._workerState(self.waiters, ct): 207 212 o = self.q.get() 208 213 209 ct = self.currentThread()210 214 self.threads.remove(ct) 211 215 212 216 -
twisted/test/test_threadpool.py
472 472 tp.stop() 473 473 474 474 475 def test_workerState (self):475 def test_workerStateTransition(self): 476 476 """ 477 477 As the worker receives and completes work it should transition between 478 478 the working/waiting states. … … 487 487 self.assertEqual(len(pool.working), 0) 488 488 489 489 # fire up a worker and give it some 'work' 490 thread _working = threading.Event()491 thread _finish = threading.Event()490 threadWorking = threading.Event() 491 threadFinish = threading.Event() 492 492 493 493 def _thread(): 494 thread _working.set()495 thread _finish.wait()494 threadWorking.set() 495 threadFinish.wait() 496 496 497 497 pool.callInThread(_thread) 498 thread _working.wait()498 threadWorking.wait() 499 499 self.assertEqual(pool.workers, 1) 500 500 self.assertEqual(len(pool.waiters), 0) 501 501 self.assertEqual(len(pool.working), 1) 502 502 503 503 # finish work, and spin until state changes 504 thread _finish.set()504 threadFinish.set() 505 505 while not len(pool.waiters): 506 506 time.sleep(0.0005) 507 507 … … 510 510 self.assertEqual(len(pool.working), 0) 511 511 512 512 513 def test_workerState(self): 514 """ 515 Upon entering a _workerState block the threads unique identifier 516 should be added to a stateList, and removed upon exiting the block. 517 """ 518 pool = threadpool.ThreadPool() 519 workerThread = object() 520 stateList = [] 521 with pool._workerState(stateList, workerThread): 522 self.assertIn(workerThread, stateList) 523 self.assertNotIn(workerThread, stateList) 524 525 # raise an exception instead of running out to test exception state 526 try: 527 with pool._workerState(stateList, workerThread): 528 self.assertIn(workerThread, stateList) 529 raise Exception() 530 except: 531 pass 532 else: 533 self.fail("_workerState shouldn't be consuming exceptions") 534 self.assertNotIn(workerThread, stateList) 535 536 513 537 class RaceConditionTestCase(unittest.SynchronousTestCase): 514 538 515 539 def getTimeout(self):
