Index: twisted/test/test_threadpool.py
===================================================================
--- twisted/test/test_threadpool.py	(revision 36602)
+++ twisted/test/test_threadpool.py	(working copy)
@@ -472,7 +472,44 @@
             tp.stop()
 
 
+    def test_workerState(self):
+        """
+        As the worker receives and completes work it should transition between
+        the working/waiting states.
+        """
+        pool = threadpool.ThreadPool(0,1)
+        pool.start()
+        self.addCleanup(pool.stop)
 
+        # sanity check
+        self.assertEqual(pool.workers, 0)
+        self.assertEqual(len(pool.waiters), 0)
+        self.assertEqual(len(pool.working), 0)
+
+        # fire up a worker and give it some 'work'
+        thread_working = threading.Event()
+        thread_finish = threading.Event()
+
+        def _thread():
+            thread_working.set()
+            thread_finish.wait()
+
+        pool.callInThread(_thread)
+        thread_working.wait()
+        self.assertEqual(pool.workers, 1)
+        self.assertEqual(len(pool.waiters), 0)
+        self.assertEqual(len(pool.working), 1)
+
+        # finish work, and spin until state changes
+        thread_finish.set()
+        while not len(pool.waiters):
+            time.sleep(0.0005)
+
+        # make sure state changed correctly
+        self.assertEqual(len(pool.waiters), 1)
+        self.assertEqual(len(pool.working), 0)
+
+
 class RaceConditionTestCase(unittest.SynchronousTestCase):
 
     def getTimeout(self):
Index: twisted/python/threadpool.py
===================================================================
--- twisted/python/threadpool.py	(revision 36602)
+++ twisted/python/threadpool.py	(working copy)
@@ -15,6 +15,7 @@
     from Queue import Queue
 except ImportError:
     from queue import Queue
+import contextlib
 import threading
 import copy
 
@@ -155,34 +156,45 @@
             self._startSomeWorkers()
 
 
+    @contextlib.contextmanager
+    def _workerState(self, state_list):
+        """
+        Manages adding and removing this worker from a list of workers
+        in a particular state.
+
+        @param state_list: the list managing workers in this state
+        """
+        ct = self.currentThread()
+        state_list.append(ct)
+        yield
+        state_list.remove(ct)
+
+
     def _worker(self):
         """
         Method used as target of the created threads: retrieve a task to run
         from the threadpool, run it, and proceed to the next task until
         threadpool is stopped.
         """
-        ct = self.currentThread()
         o = self.q.get()
         while o is not WorkerStop:
-            self.working.append(ct)
-            ctx, function, args, kwargs, onResult = o
-            del o
+            with self._workerState(self.working):
+                ctx, function, args, kwargs, onResult = o
+                del o
 
-            try:
-                result = context.call(ctx, function, *args, **kwargs)
-                success = True
-            except:
-                success = False
-                if onResult is None:
-                    context.call(ctx, log.err)
-                    result = None
-                else:
-                    result = failure.Failure()
+                try:
+                    result = context.call(ctx, function, *args, **kwargs)
+                    success = True
+                except:
+                    success = False
+                    if onResult is None:
+                        context.call(ctx, log.err)
+                        result = None
+                    else:
+                        result = failure.Failure()
 
-            del function, args, kwargs
+                del function, args, kwargs
 
-            self.working.remove(ct)
-
             if onResult is not None:
                 try:
                     context.call(ctx, onResult, success, result)
@@ -191,10 +203,10 @@
 
             del ctx, onResult, result
 
-            self.waiters.append(ct)
-            o = self.q.get()
-            self.waiters.remove(ct)
+            with self._workerState(self.waiters):
+                o = self.q.get()
 
+        ct = self.currentThread()
         self.threads.remove(ct)
 
 
