Ticket #5981: 5981_0.patch

File 5981_0.patch, 4.2 KB (added by ephess, 2 years ago)
  • twisted/test/test_threadpool.py

     
    472472            tp.stop() 
    473473 
    474474 
     475    def test_workerState(self): 
     476        """ 
     477        As the worker receives and completes work it should transition between 
     478        the working/waiting states. 
     479        """ 
     480        pool = threadpool.ThreadPool(0,1) 
     481        pool.start() 
     482        self.addCleanup(pool.stop) 
    475483 
     484        # sanity check 
     485        self.assertEqual(pool.workers, 0) 
     486        self.assertEqual(len(pool.waiters), 0) 
     487        self.assertEqual(len(pool.working), 0) 
     488 
     489        # fire up a worker and give it some 'work' 
     490        thread_working = threading.Event() 
     491        thread_finish = threading.Event() 
     492 
     493        def _thread(): 
     494            thread_working.set() 
     495            thread_finish.wait() 
     496 
     497        pool.callInThread(_thread) 
     498        thread_working.wait() 
     499        self.assertEqual(pool.workers, 1) 
     500        self.assertEqual(len(pool.waiters), 0) 
     501        self.assertEqual(len(pool.working), 1) 
     502 
     503        # finish work, and spin until state changes 
     504        thread_finish.set() 
     505        while not len(pool.waiters): 
     506            time.sleep(0.0005) 
     507 
     508        # make sure state changed correctly 
     509        self.assertEqual(len(pool.waiters), 1) 
     510        self.assertEqual(len(pool.working), 0) 
     511 
     512 
    476513class RaceConditionTestCase(unittest.SynchronousTestCase): 
    477514 
    478515    def getTimeout(self): 
  • twisted/python/threadpool.py

     
    1515    from Queue import Queue 
    1616except ImportError: 
    1717    from queue import Queue 
     18import contextlib 
    1819import threading 
    1920import copy 
    2021 
     
    155156            self._startSomeWorkers() 
    156157 
    157158 
     159    @contextlib.contextmanager 
     160    def _workerState(self, state_list): 
     161        """ 
     162        Manages adding and removing this worker from a list of workers 
     163        in a particular state. 
     164 
     165        @param state_list: the list managing workers in this state 
     166        """ 
     167        ct = self.currentThread() 
     168        state_list.append(ct) 
     169        yield 
     170        state_list.remove(ct) 
     171 
     172 
    158173    def _worker(self): 
    159174        """ 
    160175        Method used as target of the created threads: retrieve a task to run 
    161176        from the threadpool, run it, and proceed to the next task until 
    162177        threadpool is stopped. 
    163178        """ 
    164         ct = self.currentThread() 
    165179        o = self.q.get() 
    166180        while o is not WorkerStop: 
    167             self.working.append(ct) 
    168             ctx, function, args, kwargs, onResult = o 
    169             del o 
     181            with self._workerState(self.working): 
     182                ctx, function, args, kwargs, onResult = o 
     183                del o 
    170184 
    171             try: 
    172                 result = context.call(ctx, function, *args, **kwargs) 
    173                 success = True 
    174             except: 
    175                 success = False 
    176                 if onResult is None: 
    177                     context.call(ctx, log.err) 
    178                     result = None 
    179                 else: 
    180                     result = failure.Failure() 
     185                try: 
     186                    result = context.call(ctx, function, *args, **kwargs) 
     187                    success = True 
     188                except: 
     189                    success = False 
     190                    if onResult is None: 
     191                        context.call(ctx, log.err) 
     192                        result = None 
     193                    else: 
     194                        result = failure.Failure() 
    181195 
    182             del function, args, kwargs 
     196                del function, args, kwargs 
    183197 
    184             self.working.remove(ct) 
    185  
    186198            if onResult is not None: 
    187199                try: 
    188200                    context.call(ctx, onResult, success, result) 
     
    191203 
    192204            del ctx, onResult, result 
    193205 
    194             self.waiters.append(ct) 
    195             o = self.q.get() 
    196             self.waiters.remove(ct) 
     206            with self._workerState(self.waiters): 
     207                o = self.q.get() 
    197208 
     209        ct = self.currentThread() 
    198210        self.threads.remove(ct) 
    199211 
    200212