Ticket #5981: 5981_1.patch

File 5981_1.patch, 4.0 KB (added by ephess, 22 months ago)

Patch addresses issues raised by review.

  • twisted/python/threadpool.py

     
    157157 
    158158 
    159159    @contextlib.contextmanager 
    160     def _workerState(self, state_list): 
     160    def _workerState(self, stateList, workerThread): 
    161161        """ 
    162162        Manages adding and removing this worker from a list of workers 
    163163        in a particular state. 
    164164 
    165         @param state_list: the list managing workers in this state 
     165        @param stateList: the list managing workers in this state 
     166 
     167        @param workerThread: the thread the worker is running in, used to 
     168            represent the worker in stateList 
    166169        """ 
    167         ct = self.currentThread() 
    168         state_list.append(ct) 
    169         yield 
    170         state_list.remove(ct) 
     170        stateList.append(workerThread) 
     171        try: 
     172            yield 
     173        finally: 
     174            stateList.remove(workerThread) 
    171175 
    172176 
    173177    def _worker(self): 
     
    176180        from the threadpool, run it, and proceed to the next task until 
    177181        threadpool is stopped. 
    178182        """ 
     183        ct = self.currentThread() 
    179184        o = self.q.get() 
    180185        while o is not WorkerStop: 
    181             with self._workerState(self.working): 
     186            with self._workerState(self.working, ct): 
    182187                ctx, function, args, kwargs, onResult = o 
    183188                del o 
    184189 
     
    203208 
    204209            del ctx, onResult, result 
    205210 
    206             with self._workerState(self.waiters): 
     211            with self._workerState(self.waiters, ct): 
    207212                o = self.q.get() 
    208213 
    209         ct = self.currentThread() 
    210214        self.threads.remove(ct) 
    211215 
    212216 
  • twisted/test/test_threadpool.py

     
    472472            tp.stop() 
    473473 
    474474 
    475     def test_workerState(self): 
     475    def test_workerStateTransition(self): 
    476476        """ 
    477477        As the worker receives and completes work it should transition between 
    478478        the working/waiting states. 
     
    487487        self.assertEqual(len(pool.working), 0) 
    488488 
    489489        # fire up a worker and give it some 'work' 
    490         thread_working = threading.Event() 
    491         thread_finish = threading.Event() 
     490        threadWorking = threading.Event() 
     491        threadFinish = threading.Event() 
    492492 
    493493        def _thread(): 
    494             thread_working.set() 
    495             thread_finish.wait() 
     494            threadWorking.set() 
     495            threadFinish.wait() 
    496496 
    497497        pool.callInThread(_thread) 
    498         thread_working.wait() 
     498        threadWorking.wait() 
    499499        self.assertEqual(pool.workers, 1) 
    500500        self.assertEqual(len(pool.waiters), 0) 
    501501        self.assertEqual(len(pool.working), 1) 
    502502 
    503503        # finish work, and spin until state changes 
    504         thread_finish.set() 
     504        threadFinish.set() 
    505505        while not len(pool.waiters): 
    506506            time.sleep(0.0005) 
    507507 
     
    510510        self.assertEqual(len(pool.working), 0) 
    511511 
    512512 
     513    def test_workerState(self): 
     514        """ 
     515        Upon entering a _workerState block the threads unique identifier 
     516        should be added to a stateList, and removed upon exiting the block. 
     517        """ 
     518        pool = threadpool.ThreadPool() 
     519        workerThread = object() 
     520        stateList = [] 
     521        with pool._workerState(stateList, workerThread): 
     522            self.assertIn(workerThread, stateList) 
     523        self.assertNotIn(workerThread, stateList) 
     524 
     525        # raise an exception instead of running out to test exception state 
     526        try: 
     527            with pool._workerState(stateList, workerThread): 
     528                self.assertIn(workerThread, stateList) 
     529                raise Exception() 
     530        except: 
     531            pass 
     532        else: 
     533            self.fail("_workerState shouldn't be consuming exceptions") 
     534        self.assertNotIn(workerThread, stateList) 
     535 
     536 
    513537class RaceConditionTestCase(unittest.SynchronousTestCase): 
    514538 
    515539    def getTimeout(self):