[Twisted-Python] Synchronous Code Fishbowl

Matt Goodall matt at pollenation.net
Wed May 31 08:10:43 MDT 2006


Matt Goodall wrote:
> Ed Suominen wrote:
> 
> 
>>Well of course, Glyph's "interesting module" comment was just enough of
>>a table scrap to get me running, tail wagging furiously. The result
>>(unit testing in progress) is a full-fledged SynchronousTasks object
>>that runs a priority queue of synchronous tasks with niceness
>>scheduling. See
>>
>>* http://foss.eepatents.com/sAsync/browser/trunk/sasync/syncbridge.py
>>* http://foss.eepatents.com/sAsync/browser/trunk/test/syncbridge.py
> 
> 
> I was going to post again last night about how the PriorityQueue.get()
> would never block once something had been put() into it, but I see
> you've fixed that bug by clearing the event semaphore. Unfortunately,
> the code now has a race condition.
> 
> If the SynchronousQueue._workOnTasks thread is pre-empted in
> PriorityQueue.get() between "if self.empty():" and "self.event.clear()",
> and another thread calls PriorityQueue.put() then the event semaphore
> set during put() will be cleared when get() continues.
> 
> OK, that was horrible to write so here's a picture instead ;-) ...
> 
> Thread 1                                Thread 2
> 
> # Calls get()
> self.event.wait()
> result = heapq.heappop(self.list)
> if self.empty():
> 
> <------------------ Thread 2 preempts Thread 1 ---------------------->
> 
>                                        # Calls put()
>                                        heapq.heappush(self.list, item)
>                                        self.event.set()
> 
> <---------------------- Thread 1 continues -------------------------->
> 
>     self.event.clear()
> 
> 
> PriorityQueue should probably be using a Condition to protect access to
> the heapq list *and* wait for something to be posted to it. See
> <http://docs.python.org/lib/condition-objects.html>.

I notice you've updated syncbridge to use a Condition now. Looks better
to me, although there's another bug and an improvement suggestion.

First up is that it should be "while self.empty(): self.cv.wait()".

Whenever something is put on the queue, notify() is called to wake a
consumer thread. However, there may already be a consumer thread
tearing around a loop taking items from the queue and, critically, never
calling wait() because it has not yet found the queue empty.

By the time the newly woken consumer thread actually calls heappop to
take an item, the existing consumer thread may have emptied the queue.
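
To make that concrete, here is a rough sketch that forces the bad
interleaving deterministically. It is not the syncbridge code: the
function name, the recheck flag and the inline heap are all made up for
illustration. The main thread plays the busy consumer by taking the item
back before it releases the lock.

```python
import heapq
import threading


def woken_consumer_result(recheck):
    """Simulate a rival consumer emptying the heap before the woken one runs.

    recheck=True guards wait() with "while"; recheck=False uses a single "if".
    (Hypothetical helper, written only to illustrate the race.)
    """
    heap = []
    cv = threading.Condition()
    holding_lock = threading.Event()
    result = []

    def consumer():
        cv.acquire()
        try:
            # Set while holding the lock, so the main thread can only get
            # the lock once this thread is inside wait().
            holding_lock.set()
            if recheck:
                while not heap:
                    cv.wait()
            elif not heap:
                cv.wait()
            result.append(heapq.heappop(heap))
        except IndexError:
            result.append("IndexError")
        finally:
            cv.release()

    worker = threading.Thread(target=consumer)
    worker.start()
    holding_lock.wait()

    # Push one item, notify, then take it back before releasing the lock,
    # standing in for a busy consumer that got there first.
    cv.acquire()
    try:
        heapq.heappush(heap, 1)
        cv.notify()
        heapq.heappop(heap)
    finally:
        cv.release()

    if recheck:
        # The "while" consumer goes back to waiting; give it a real item.
        cv.acquire()
        try:
            heapq.heappush(heap, 2)
            cv.notify()
        finally:
            cv.release()

    worker.join()
    return result[0]
```

With recheck=False the woken thread pops from an empty list and ends up
with "IndexError"; with recheck=True it simply waits again and later
returns 2.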

The improvement suggestion is to put the Condition's release() in a
finally block to ensure it actually happens. (Why isn't the Condition
example in the documentation written expecting exceptions?)
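
A small sketch of why the finally matters, assuming some task that
raises while the lock is held. The helper names (lock_is_free,
run_without_finally, run_with_finally, boom) are invented for the
example. The probe runs in a second thread because a Condition's default
lock is an RLock, which the owning thread could always re-acquire.

```python
import threading


def lock_is_free(cv):
    """Report whether another thread can take cv's lock right now."""
    seen = []

    def probe():
        got = cv.acquire(False)      # non-blocking attempt
        if got:
            cv.release()
        seen.append(got)

    t = threading.Thread(target=probe)
    t.start()
    t.join()
    return seen[0]


def run_without_finally(cv, task):
    cv.acquire()
    task()                           # if this raises, release() is skipped
    cv.release()


def run_with_finally(cv, task):
    cv.acquire()
    try:
        task()
    finally:
        cv.release()                 # runs even when task() raises


def boom():
    raise ValueError("task failed")
```

After run_without_finally(cv, boom) has raised, the lock is still held
and every other thread that touches the queue blocks forever. After
run_with_finally(cv, boom) the lock is free again.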

Anyway, this all makes get() and put() look something like:


    def get(self):
        self.cv.acquire()
        try:
            while self.empty():
                self.cv.wait()
            return heapq.heappop(self.list)
        finally:
            self.cv.release()

    def put(self, item):
        self.cv.acquire()
        try:
            heapq.heappush(self.list, item)
            self.cv.notify()
        finally:
            self.cv.release()


After that, I think the queue implementation is ok.
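
For anyone who wants to poke at it, here is a self-contained sketch
built around those two methods. The __init__, the body of empty(), the
drain() helper, the STOP sentinel and the (priority, name) item tuples
are my assumptions for the sake of a runnable example, not what
syncbridge actually does.

```python
import heapq
import threading


class PriorityQueue:
    """Blocking priority queue guarded by a single Condition."""

    def __init__(self):
        self.list = []                    # heap maintained by heapq
        self.cv = threading.Condition()

    def empty(self):
        return not self.list

    def get(self):
        self.cv.acquire()
        try:
            while self.empty():           # re-check after every wake-up
                self.cv.wait()
            return heapq.heappop(self.list)
        finally:
            self.cv.release()

    def put(self, item):
        self.cv.acquire()
        try:
            heapq.heappush(self.list, item)
            self.cv.notify()
        finally:
            self.cv.release()


# Hypothetical sentinel: sorts after every real (priority, name) item.
STOP = (float("inf"), None)


def drain(queue, items, consumers=3):
    """Start blocked consumers, feed them items, return what they took."""
    taken = []
    taken_lock = threading.Lock()

    def worker():
        while True:
            item = queue.get()
            if item is STOP:
                return
            with taken_lock:
                taken.append(item)

    threads = [threading.Thread(target=worker) for _ in range(consumers)]
    for t in threads:
        t.start()
    for item in items:
        queue.put(item)
    for _ in threads:
        queue.put(STOP)                   # one sentinel per consumer
    for t in threads:
        t.join()
    return taken
```

Calling drain(PriorityQueue(), items) with a list of (priority, name)
tuples should hand back every item exactly once, in whatever order the
consumers happened to grab them.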


And some people insist that threading is easy ;-).


- Matt


-- 
     __
    /  \__     Matt Goodall, Pollenation Internet Ltd
    \__/  \    w: http://www.pollenation.net
  __/  \__/    e: matt at pollenation.net
 /  \__/  \    t: +44 (0)113 2252500
 \__/  \__/
 /  \          Any views expressed are my own and do not necessarily
 \__/          reflect the views of my employer.




