[Twisted-Python] reactor.stop() won't, threads and Queue to blame?
Brett Viren
bv at bnl.gov
Sun Oct 24 20:39:07 EDT 2004
Hi,
I can't seem to make reactor.stop() actually stop the reactor and
allow my program to exit. I hope someone can help.
What I'm actually trying to do is implement an execution pipeline so I
can serialize certain jobs but still run them in a thread so the main
loop can go about its business. I do this with a class, CommandQueue,
that has a Queue.Queue and a method that can be called via
reactor.callInThread() that slurps the queue and runs any callables
sent down it. Each callable's result is obtained from a Deferred.
This thread loops until I set CommandQueue.stop=True. (Maybe someone
knows a better way?)
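For comparison, the same "serialize jobs in one worker thread" idea can be sketched without Twisted. This is only a sketch under stated assumptions: it uses the Python 3 standard library, concurrent.futures.Future stands in for the Deferred, and the names SerialRunner, submit, close and _STOP are hypothetical and do not appear in my script. It stops the worker by putting a sentinel on the queue, so there is no stop flag to poll.

```python
import queue
import threading
from concurrent.futures import Future

_STOP = object()  # hypothetical sentinel marking the end of the queue


class SerialRunner:
    """Run queued callables one at a time in a single worker thread."""

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain)
        self._thread.start()

    def submit(self, func, *args, **kwargs):
        """Queue func(*args, **kwargs); the Future receives its result."""
        future = Future()
        self._queue.put((future, func, args, kwargs))
        return future

    def close(self):
        """Let the worker finish the jobs already queued, then wait for it."""
        self._queue.put(_STOP)
        self._thread.join()

    def _drain(self):
        while True:
            item = self._queue.get()  # blocks; no timeout or polling needed
            if item is _STOP:
                break
            future, func, args, kwargs = item
            try:
                future.set_result(func(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)


runner = SerialRunner()
futures = [runner.submit(pow, 2, n) for n in range(3)]
runner.close()
results = [f.result() for f in futures]
print(results)  # [1, 2, 4]
```

Because the sentinel travels down the same queue as the jobs, everything submitted before close() still runs, in order, before the worker exits.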
To try to trigger the shutdown, I add a callback to the Deferred
associated with the final callable sent down the queue. If I instead
call the shutdown function via reactor.callLater() then the reactor will
actually stop.
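One difference between the two paths may be worth spelling out, offered as a possible factor rather than a confirmed diagnosis: a Deferred runs its callbacks synchronously in whichever thread calls callback(), so a shutdown attached to the final Deferred would run in the drain thread, while a function scheduled with reactor.callLater() runs in the reactor thread. The sketch below shows only that thread-identity point. It assumes Python 3 and uses a hypothetical MiniDeferred stand-in, not Twisted's real Deferred.

```python
import threading


class MiniDeferred:
    """Hypothetical stand-in for a Deferred: stores callbacks, fires them once."""

    def __init__(self):
        self._callbacks = []

    def addCallback(self, func):
        self._callbacks.append(func)
        return self

    def callback(self, result):
        # Callbacks run synchronously, in whichever thread calls callback().
        for func in self._callbacks:
            result = func(result)


seen = []


def record_thread(result):
    # Note which thread this callback is actually running in.
    seen.append(threading.current_thread().name)
    return result


d = MiniDeferred()
d.addCallback(record_thread)

# Fire the deferred from a worker thread, as drain() does in my script.
worker = threading.Thread(target=d.callback, args=(42,), name="drain-thread")
worker.start()
worker.join()

print(seen)  # ['drain-thread']
```

Twisted provides reactor.callFromThread() for handing a call from another thread back to the reactor thread. Whether routing the callback through it resolves the hang here is untested.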
I don't know if it matters, but /bin/ps only shows one instance of
python running. Usually I see one instance per thread.
The code below and its output show the problem.
#!/usr/bin/env python2.3
from twisted.python import threadable
threadable.init(1)
from twisted.internet import reactor,defer
from Queue import Queue,Empty

class CommandQueue:
    '''Queue up commands for serial calling.  One must call the
    drain() method to start reading the internal queue.  Most likely
    one wants to call this in a thread.'''

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        return

    def __call__(self,meth,*a,**k):
        '''Call meth(*a,**k) when it reaches end of queue.  Returns a
        Deferred that will pass the return of meth.'''
        deferred = defer.Deferred()
        self.queue.put((deferred,meth,a,k))
        return deferred

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        while not self.stop:
            try:
                d,meth,a,k = self.queue.get(True,1)
            except Empty:
                print "  queue empty"
                continue
            print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            d.callback(meth(*a,**k))
            print "callback done"
        print "drain closing"
        return 0

def test1():
    import time
    cq = CommandQueue()
    reactor.callInThread(cq.drain)

    def shutdown(x=None):
        print "Stopping CommandQueue"
        cq.stop = True
        print "Stopping reactory"
        reactor.stop()
        print "reactor.stop()'ed"

    def burp(x):
        for n in range(0,x):
            time.sleep(1)
            print x,n
        return x

    def chirp(x):
        print "okay:",x
        return None

    def ouch(x):
        print "bad:",x
        return x

    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        if last-n == 1:
            d.addCallbacks(shutdown,ouch)

if __name__ == '__main__':
    print "running test1"
    test1()
    print "end test1"
    reactor.run()
    print "reactor exitted"
#=-------------- end ------------=#
Running this produces the following:
[i386]bviren at aviator:test> ./test-commandqueue.py
running test1
dispatching 0
dispatching 1
dispatching 2
end test1
calling burp((0,),{})
okay: 0
callback done
calling burp((1,),{})
1 0
okay: 1
callback done
calling burp((2,),{})
2 0
2 1
okay: 2
Stopping CommandQueue
Stopping reactory
reactor.stop()'ed
callback done
drain closing
At which point I have to hit Control-C and finally get:
reactor exitted
Here is the mod to use reactor.callLater():
...
    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        #if last-n == 1:
        #    d.addCallbacks(shutdown,ouch)
    reactor.callLater(10,shutdown)
...
So, any ideas as to what I'm doing wrong?
Thanks,
-Brett.