[Twisted-Python] newbie question about using deferreds and threads

Martin Waite martin at datacash.com
Thu Mar 6 04:31:49 EST 2003


On Wed, 2003-03-05 at 10:04, Christopher Armstrong wrote:
> 
> Mapping of RID:Deferred is quite sane, I think. It could probably
> even be called an idiom, by now.
> 
> "somehow activates the Deferred" is just
> self.requests[rid].callback(val), where self.requests is your mapping,
> rid is the request ID, and val is the value that was recently made
> available.
> 
Thanks for the advice Chris.

I've hacked away for a few hours and nearly have something working.

I have two database threads, one running write_db_request and the
other running read_db_response.  The listener posts completed
requests (end of request is a line containing only a '.') to
write_db_request via a queue, and also allocates a RID and pops
a deferred into the request hash.

The read_db_response thread polls the database for 'new' responses,
pulls the RID and response data, and activates the deferred
via the request hash.  The deferred calls printData which in turn
invokes Frontd.Respond, which prints the response and closes the 
connection.

I still have a way to go on this.  Problems are

1) I need a mutex around the request hash. I can use the 
   lock mechanism from the thread module for this.

2) If i do use a mutex (and I already am indirectly by
   using a Queue), then aren't I running the risk of blocking ?

3) When using telnet to test the program, I enter some data
   followed by the terminator (".\n") - and I get a response.
   However the connection remains open until I enter a newline.

4) The database threads continue to run after the program has stopped.

I include my code below for your entertainment. Any pointers on how 
I can improve on this are welcomed.

My next line of attack is to try and get rid of the database threads
and also bring in the Application object.  If I get rid of the threads, 
I get rid of the mutex issues.  Will adbapi allow me to do this ?

regards,
Martin

========================================================================

from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
from twisted.internet import defer

from Queue import Queue
from sys import stdout
import MySQLdb
import re

q = Queue(0)
request = {}

rid = 0

class Frontd(Protocol):

    def connectionMade(self):
        self.data = ''
        self.active = 1

    def dataReceived(self,data):
        stdout.write( data )
        if self.active == 1:
            self.data = self.data + data
            if ( re.search("^\s*\.\s*$", self.data, re.M ) ):
                global rid;  
                rid = rid + 1 
                q.put( { 'rid' : rid, 'msg' : self.data } )
                d = defer.Deferred()  
                d.addCallback(printData, self)
                
                request[rid] = d   # need a mutex around this !!!!
                self.active = 0
                #self.transport.stopReading()

    def respond(self,data):
        self.transport.write(data + "\n")
        self.transport.loseConnection()

class FrontdFactory(Factory):

    protocol = Frontd

def printData(d, frontd):
    frontd.respond(d)
    
def read_db_response():
    import time
    cursor = conn_in.cursor()
    while (1) :
        time.sleep(0.5)
        cursor.execute( "update msg set status = 'wip' where status = 'new'" )
        cursor.execute( "select rid, response from msg where status = 'wip'" )
        rows = cursor.fetchall()
        for row in rows:
            stdout.write( "rid %d response '%s'\n" % (row[0], row[1]))
            request[row[0]].callback( row[1] )
        cursor.execute( "update msg set status = 'done' where status = 'wip'" )

def write_db_request():
    import time
    cursor = conn_out.cursor()
    while (1) :
        req = q.get()
        cursor.execute( "insert into msg set rid=%s, msg=%s, response='help'", (req['rid'], req['msg']) )

conn_in  = MySQLdb.connect(  host = 'localhost', user = 'martin', passwd = 'password', db = 'test' );
conn_out = MySQLdb.connect(  host = 'localhost', user = 'martin', passwd = 'password', db = 'test' );

# run method in thread
reactor.callInThread( read_db_response )
reactor.callInThread( write_db_request )

reactor.listenTCP(8007, FrontdFactory())
reactor.run()






More information about the Twisted-Python mailing list