Ticket #5981: 5981_1.patch

File 5981_1.patch, 4.0 KB (added by ephess, 4 years ago)

Patch addresses issues raised by review.

  • twisted/python/threadpool.py

     
    157157
    158158
    159159    @contextlib.contextmanager
    160     def _workerState(self, state_list):
     160    def _workerState(self, stateList, workerThread):
    161161        """
    162162        Manages adding and removing this worker from a list of workers
    163163        in a particular state.
    164164
    165         @param state_list: the list managing workers in this state
     165        @param stateList: the list managing workers in this state
     166
     167        @param workerThread: the thread the worker is running in, used to
     168            represent the worker in stateList
    166169        """
    167         ct = self.currentThread()
    168         state_list.append(ct)
    169         yield
    170         state_list.remove(ct)
     170        stateList.append(workerThread)
     171        try:
     172            yield
     173        finally:
     174            stateList.remove(workerThread)
    171175
    172176
    173177    def _worker(self):
     
    176180        from the threadpool, run it, and proceed to the next task until
    177181        threadpool is stopped.
    178182        """
     183        ct = self.currentThread()
    179184        o = self.q.get()
    180185        while o is not WorkerStop:
    181             with self._workerState(self.working):
     186            with self._workerState(self.working, ct):
    182187                ctx, function, args, kwargs, onResult = o
    183188                del o
    184189
     
    203208
    204209            del ctx, onResult, result
    205210
    206             with self._workerState(self.waiters):
     211            with self._workerState(self.waiters, ct):
    207212                o = self.q.get()
    208213
    209         ct = self.currentThread()
    210214        self.threads.remove(ct)
    211215
    212216
  • twisted/test/test_threadpool.py

     
    472472            tp.stop()
    473473
    474474
    475     def test_workerState(self):
     475    def test_workerStateTransition(self):
    476476        """
    477477        As the worker receives and completes work it should transition between
    478478        the working/waiting states.
     
    487487        self.assertEqual(len(pool.working), 0)
    488488
    489489        # fire up a worker and give it some 'work'
    490         thread_working = threading.Event()
    491         thread_finish = threading.Event()
     490        threadWorking = threading.Event()
     491        threadFinish = threading.Event()
    492492
    493493        def _thread():
    494             thread_working.set()
    495             thread_finish.wait()
     494            threadWorking.set()
     495            threadFinish.wait()
    496496
    497497        pool.callInThread(_thread)
    498         thread_working.wait()
     498        threadWorking.wait()
    499499        self.assertEqual(pool.workers, 1)
    500500        self.assertEqual(len(pool.waiters), 0)
    501501        self.assertEqual(len(pool.working), 1)
    502502
    503503        # finish work, and spin until state changes
    504         thread_finish.set()
     504        threadFinish.set()
    505505        while not len(pool.waiters):
    506506            time.sleep(0.0005)
    507507
     
    510510        self.assertEqual(len(pool.working), 0)
    511511
    512512
     513    def test_workerState(self):
     514        """
     515        Upon entering a _workerState block the threads unique identifier
     516        should be added to a stateList, and removed upon exiting the block.
     517        """
     518        pool = threadpool.ThreadPool()
     519        workerThread = object()
     520        stateList = []
     521        with pool._workerState(stateList, workerThread):
     522            self.assertIn(workerThread, stateList)
     523        self.assertNotIn(workerThread, stateList)
     524
     525        # raise an exception instead of running out to test exception state
     526        try:
     527            with pool._workerState(stateList, workerThread):
     528                self.assertIn(workerThread, stateList)
     529                raise Exception()
     530        except:
     531            pass
     532        else:
     533            self.fail("_workerState shouldn't be consuming exceptions")
     534        self.assertNotIn(workerThread, stateList)
     535
     536
    513537class RaceConditionTestCase(unittest.SynchronousTestCase):
    514538
    515539    def getTimeout(self):