Index: twisted/python/threadpool.py
===================================================================
--- twisted/python/threadpool.py	(revision 36613)
+++ twisted/python/threadpool.py	(working copy)
@@ -157,17 +157,21 @@
 
 
     @contextlib.contextmanager
-    def _workerState(self, state_list):
+    def _workerState(self, stateList, workerThread):
         """
         Manages adding and removing this worker from a list of workers
         in a particular state.
 
-        @param state_list: the list managing workers in this state
+        @param stateList: the list managing workers in this state
+
+        @param workerThread: the thread the worker is running in, used to
+            represent the worker in stateList
         """
-        ct = self.currentThread()
-        state_list.append(ct)
-        yield
-        state_list.remove(ct)
+        stateList.append(workerThread)
+        try:
+            yield
+        finally:
+            stateList.remove(workerThread)
 
 
     def _worker(self):
@@ -176,9 +180,10 @@
         from the threadpool, run it, and proceed to the next task until
         threadpool is stopped.
         """
+        ct = self.currentThread()
         o = self.q.get()
         while o is not WorkerStop:
-            with self._workerState(self.working):
+            with self._workerState(self.working, ct):
                 ctx, function, args, kwargs, onResult = o
                 del o
 
@@ -203,10 +208,9 @@
 
             del ctx, onResult, result
 
-            with self._workerState(self.waiters):
+            with self._workerState(self.waiters, ct):
                 o = self.q.get()
 
-        ct = self.currentThread()
         self.threads.remove(ct)
 
 
Index: twisted/test/test_threadpool.py
===================================================================
--- twisted/test/test_threadpool.py	(revision 36613)
+++ twisted/test/test_threadpool.py	(working copy)
@@ -472,7 +472,7 @@
             tp.stop()
 
 
-    def test_workerState(self):
+    def test_workerStateTransition(self):
         """
         As the worker receives and completes work it should transition between
         the working/waiting states.
@@ -487,21 +487,21 @@
         self.assertEqual(len(pool.working), 0)
 
         # fire up a worker and give it some 'work'
-        thread_working = threading.Event()
-        thread_finish = threading.Event()
+        threadWorking = threading.Event()
+        threadFinish = threading.Event()
 
         def _thread():
-            thread_working.set()
-            thread_finish.wait()
+            threadWorking.set()
+            threadFinish.wait()
 
         pool.callInThread(_thread)
-        thread_working.wait()
+        threadWorking.wait()
         self.assertEqual(pool.workers, 1)
         self.assertEqual(len(pool.waiters), 0)
         self.assertEqual(len(pool.working), 1)
 
         # finish work, and spin until state changes
-        thread_finish.set()
+        threadFinish.set()
         while not len(pool.waiters):
             time.sleep(0.0005)
 
@@ -510,6 +510,30 @@
         self.assertEqual(len(pool.working), 0)
 
 
+    def test_workerState(self):
+        """
+        Upon entering a _workerState block the threads unique identifier
+        should be added to a stateList, and removed upon exiting the block.
+        """
+        pool = threadpool.ThreadPool()
+        workerThread = object()
+        stateList = []
+        with pool._workerState(stateList, workerThread):
+            self.assertIn(workerThread, stateList)
+        self.assertNotIn(workerThread, stateList)
+
+        # raise an exception instead of running out to test exception state
+        try:
+            with pool._workerState(stateList, workerThread):
+                self.assertIn(workerThread, stateList)
+                raise Exception()
+        except:
+            pass
+        else:
+            self.fail("_workerState shouldn't be consuming exceptions")
+        self.assertNotIn(workerThread, stateList)
+
+
 class RaceConditionTestCase(unittest.SynchronousTestCase):
 
     def getTimeout(self):
