[Twisted-Python] reactor.stop() won't, threads and Queue to blame?

Brett Viren bv at bnl.gov
Sun Oct 24 18:39:07 MDT 2004


Hi,

I can't seem to make reactor.stop() actually stop the reactor and
allow my program to exit.  I hope someone can help.

What I'm actually trying to do is implement an execution pipeline so I
can serialize certain jobs but still run them in a thread so the main
loop can go about its business.  I do this with a class, CommandQueue,
that has a Queue.Queue and a method that can be called via
reactor.callInThread() that slurps the queue and runs any callables
sent down it.  Each callable's result is obtained from a Deferred.
This thread loops until I set CommandQueue.stop=True.  (Maybe someone
knows a better way?)

To try to triger the shutdown, I add a callback to the Deferred
associated with the final callable sent down the queue.  If I instead
call shutdown function via reactor.callLater() then the reactor will
actually stop.

I don't know if it matters, but /bin/ps only shows one instance of
python running.  Usually I see one instance per thread.


The code below and its output shows the problem.


#!/usr/bin/env python2.3

from twisted.python import threadable
threadable.init(1)
from twisted.internet import reactor,defer
from Queue import Queue,Empty


class CommandQueue:

    '''Queue up commands for serial calling.  One must call the
    drain() method to start reading the internal queue.  Most likely
    one wants to call this in a thread.'''

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        return

    def __call__(self,meth,*a,**k):

        '''Call meth(*a,**k) when it reaches end of queue.  Returns a
        Deferred that will pass the return of meth.'''

        deferred = defer.Deferred()
        self.queue.put((deferred,meth,a,k))
        return deferred

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        while not self.stop:
            try:
                d,meth,a,k = self.queue.get(True,1)
            except Empty:
                print "  queue empty"
                continue
            print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            d.callback(meth(*a,**k))
            print "callback done"
        print "drain closing"
        return 0

def test1():
    import time
    cq = CommandQueue()
    reactor.callInThread(cq.drain)

    def shutdown(x=None):
        print "Stopping CommandQueue"
        cq.stop = True
        print "Stopping reactory"
        reactor.stop()
        print "reactor.stop()'ed"

    def burp(x):
        for n in range(0,x):
            time.sleep(1)
            print x,n
        return x
    def chirp(x):
        print "okay:",x
        return None
    def ouch(x):
        print "bad:",x
        return x
    
    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        if last-n == 1:
            d.addCallbacks(shutdown,ouch)


if __name__ == '__main__':
    print "running test1"
    test1()
    print "end test1"
    reactor.run()
    print "reactor exitted"

#=-------------- end ------------=#


Running this produces the following:


[i386]bviren at aviator:test> ./test-commandqueue.py
running test1
dispatching 0
dispatching 1
dispatching 2
end test1
calling burp((0,),{})
okay: 0
callback done
calling burp((1,),{})
1 0
okay: 1
callback done
calling burp((2,),{})
2 0
2 1
okay: 2
Stopping CommandQueue
Stopping reactory
reactor.stop()'ed
callback done
drain closing

At which point I have to hit Control-C and finally get:

reactor exitted

Here is the mod to use reactor.callLater():

    ...
    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        #if last-n == 1:
        #    d.addCallbacks(shutdown,ouch)
    reactor.callLater(10,shutdown)
    ...


So, any ideas as to what I'm doing wrong?  

Thanks,
-Brett.





More information about the Twisted-Python mailing list