Ticket #5981: 5981_0.patch

File 5981_0.patch, 4.2 KB (added by ephess, 3 years ago)
  • twisted/test/test_threadpool.py

     
    472472            tp.stop()
    473473
    474474
     475    def test_workerState(self):
     476        """
     477        As the worker receives and completes work it should transition between
     478        the working/waiting states.
     479        """
     480        pool = threadpool.ThreadPool(0,1)
     481        pool.start()
     482        self.addCleanup(pool.stop)
    475483
     484        # sanity check
     485        self.assertEqual(pool.workers, 0)
     486        self.assertEqual(len(pool.waiters), 0)
     487        self.assertEqual(len(pool.working), 0)
     488
     489        # fire up a worker and give it some 'work'
     490        thread_working = threading.Event()
     491        thread_finish = threading.Event()
     492
     493        def _thread():
     494            thread_working.set()
     495            thread_finish.wait()
     496
     497        pool.callInThread(_thread)
     498        thread_working.wait()
     499        self.assertEqual(pool.workers, 1)
     500        self.assertEqual(len(pool.waiters), 0)
     501        self.assertEqual(len(pool.working), 1)
     502
     503        # finish work, and spin until state changes
     504        thread_finish.set()
     505        while not len(pool.waiters):
     506            time.sleep(0.0005)
     507
     508        # make sure state changed correctly
     509        self.assertEqual(len(pool.waiters), 1)
     510        self.assertEqual(len(pool.working), 0)
     511
     512
    476513class RaceConditionTestCase(unittest.SynchronousTestCase):
    477514
    478515    def getTimeout(self):
  • twisted/python/threadpool.py

     
    1515    from Queue import Queue
    1616except ImportError:
    1717    from queue import Queue
     18import contextlib
    1819import threading
    1920import copy
    2021
     
    155156            self._startSomeWorkers()
    156157
    157158
     159    @contextlib.contextmanager
     160    def _workerState(self, state_list):
     161        """
     162        Manages adding and removing this worker from a list of workers
     163        in a particular state.
     164
     165        @param state_list: the list managing workers in this state
     166        """
     167        ct = self.currentThread()
     168        state_list.append(ct)
     169        yield
     170        state_list.remove(ct)
     171
     172
    158173    def _worker(self):
    159174        """
    160175        Method used as target of the created threads: retrieve a task to run
    161176        from the threadpool, run it, and proceed to the next task until
    162177        threadpool is stopped.
    163178        """
    164         ct = self.currentThread()
    165179        o = self.q.get()
    166180        while o is not WorkerStop:
    167             self.working.append(ct)
    168             ctx, function, args, kwargs, onResult = o
    169             del o
     181            with self._workerState(self.working):
     182                ctx, function, args, kwargs, onResult = o
     183                del o
    170184
    171             try:
    172                 result = context.call(ctx, function, *args, **kwargs)
    173                 success = True
    174             except:
    175                 success = False
    176                 if onResult is None:
    177                     context.call(ctx, log.err)
    178                     result = None
    179                 else:
    180                     result = failure.Failure()
     185                try:
     186                    result = context.call(ctx, function, *args, **kwargs)
     187                    success = True
     188                except:
     189                    success = False
     190                    if onResult is None:
     191                        context.call(ctx, log.err)
     192                        result = None
     193                    else:
     194                        result = failure.Failure()
    181195
    182             del function, args, kwargs
     196                del function, args, kwargs
    183197
    184             self.working.remove(ct)
    185 
    186198            if onResult is not None:
    187199                try:
    188200                    context.call(ctx, onResult, success, result)
     
    191203
    192204            del ctx, onResult, result
    193205
    194             self.waiters.append(ct)
    195             o = self.q.get()
    196             self.waiters.remove(ct)
     206            with self._workerState(self.waiters):
     207                o = self.q.get()
    197208
     209        ct = self.currentThread()
    198210        self.threads.remove(ct)
    199211
    200212