Let me start off by warning everyone that I am extremely new to Twisted (I've been using it for maybe a week), so it's quite likely that some things don't work the way I think they should, or that I'm just using them incorrectly.<br>
<br>Some background on what I'm trying to accomplish:<br><br>I have a client/server system that reads data values from a file (anywhere between 10k and 1.5M values, every hour) and sends them to a server for processing. Processing each value takes a variable amount of time and, since this runs every hour, it needs to be as fast as possible, which is why I was drawn to asynchronous programming in Python. (I'm using Python because everything we do here is in Python.) I had the app working well with a single server, but I need to scale it out to multiple servers to bring the processing time down. At first I started writing my own distributed-ish app, which more or less worked but had enough niggling issues that I started looking for something else. This morning I discovered Twisted's Perspective Broker, and it seemed to be the solution I needed.<br>
<br>The problems:<br><br>Right now, for development and testing, I have three servers running on localhost on three different ports. When I run my test code, the Perspective Broker servers seem to run sequentially rather than concurrently. For example, if I watch the debugging output from the server, I see the following:<br>
<br>server 0 processing dataval1<br>server 0 processing dataval2<br>server 0 processing dataval3<br>server 0 processing dataval4<br>server 1 processing dataval5<br>server 1 processing dataval6<br>server 1 processing dataval7<br>
server 2 processing dataval8<br>server 2 processing dataval9<br>server 2 processing dataval10<br><br>My understanding is that the Perspective Brokers would work concurrently. Is that incorrect? My guess is that they should, and that I am just doing something wrong in my code, given my very, very limited understanding of how they work (or, really, of how most of Twisted works).<br>
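<br>One common way servers end up working one after another: if all three listen in the same process, they share one reactor, and Twisted's reactor runs in a single thread, so a `process_data` handler that blocks (CPU-bound work, `time.sleep`, synchronous I/O) holds up every other server until it returns. The server code isn't shown, so both "same process" and "blocking handler" are assumptions here. The sketch uses Python's standard-library `asyncio` event loop as a stand-in for the reactor; it is not Twisted code, and `blocking_server`, `cooperative_server` and `run_servers` are hypothetical names.<br>

```python
import asyncio
import time

# asyncio's event loop stands in for Twisted's reactor: both run in a single
# thread and execute one callback at a time.
# Assumption: all three "servers" live in the same process and share one loop.


async def blocking_server(server_id, values, log):
    """Hypothetical handler that blocks, like a CPU-bound process_data."""
    for dv in values:
        log.append((server_id, dv))
        time.sleep(0.01)  # blocks the whole loop; no other server can run


async def cooperative_server(server_id, values, log):
    """Hypothetical handler that hands control back to the loop after each value."""
    for dv in values:
        log.append((server_id, dv))
        await asyncio.sleep(0)  # yield to the loop, standing in for non-blocking I/O


async def run_servers(handler, batches):
    # Start one "server" per batch in the same event loop and wait for all of them.
    log = []
    await asyncio.gather(*(handler(i, batch, log) for i, batch in enumerate(batches)))
    return log


batches = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
blocking_log = asyncio.run(run_servers(blocking_server, batches))
cooperative_log = asyncio.run(run_servers(cooperative_server, batches))

print([dv for _, dv in blocking_log])     # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print([dv for _, dv in cooperative_log])  # [1, 4, 7, 2, 5, 8, 3, 6, 9]
```

The blocking version reproduces the "server 0, then server 1, then server 2" pattern even though all three were started together. In Twisted itself, the usual ways out are to run each server in its own process or to push blocking work onto a thread with `twisted.internet.threads.deferToThread`.<br>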
<br>Below is the relevant client code (I've taken out the code that just prepares data or prints debugging output). Please keep in mind that this is mostly test code, written while I get a better understanding of how Perspective Brokers work, and that it was cobbled together from examples and docs found online.<br>
<br>CLIENT CODE:<br><pre>
import sys, os
from twisted.spread import pb
from twisted.internet import reactor
import Queue

MY_PORT = 49981
SERVERS = 3

class Client:
    def __init__(self, host):
        self.out_count = 0   # number of values sent to the servers
        self.in_count = 0    # number of results received back
        self.total = 0
        self.in_file = open(sys.argv[1], 'rb')
        self.out_file = open(sys.argv[2], 'wb')

        # Create a data queue for each server so that we can send data to each server
        self.queues = []
        for server in range(0, SERVERS):
            self.queues.append(Queue.Queue())

        # Populate the queues round-robin, counting every value so that
        # success() knows when the last result has arrived
        pos = 0
        for dv in self.in_file:
            dv = dv.strip()
            self.queues[pos % len(self.queues)].put(dv)
            pos += 1
        self.out_count = pos
        self.in_file.close()

        # Connect to the servers (one port per server, starting at MY_PORT)
        print 'Connecting to Servers...'
        for server in range(0, SERVERS):
            factory = pb.PBClientFactory()
            reactor.connectTCP(host, server + MY_PORT, factory)
            factory.getRootObject().addCallbacks(self.connected, self.connect_failure,
                                                 callbackArgs=(self.queues[server],))

    def connected(self, perspective, queue):
        print "connected"
        # Send every queued value without waiting for replies; each callRemote
        # returns a Deferred that fires when that server sends its answer back
        while True:
            try:
                dv = queue.get_nowait()
            except Queue.Empty:
                break
            else:
                perspective.callRemote('process_data', dv).addCallbacks(self.success, self.lookup_failure)

    def success(self, result):
        print result
        dv, answer = result
        self.in_count += 1
        self.out_file.write('%s,%s\n' % (dv, answer))
        # Stop once every value that was sent has produced a result
        if self.in_count == self.out_count:
            self.out_file.close()
            reactor.stop()

    def connect_failure(self, _):
        print "Remote connect failure"
        self.out_file.close()
        reactor.stop()

    def lookup_failure(self, _):
        print "Remote lookup failure"
        self.out_file.close()
        reactor.stop()

Client("127.0.0.1")
reactor.run()
</pre>END CLIENT CODE<br>
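<br>The shutdown check in `success` only works if `out_count` equals the number of values dispatched: if it is left at 0, `in_count == out_count` never becomes true after the first reply and `reactor.stop()` is never reached. Counting the values when the queues are filled, rather than as each server connects, also keeps one fast server from making the counts match before the others have connected. Below is a Twisted-free sketch of just that bookkeeping, written for Python 3 with plain lists instead of `Queue.Queue`; `partition` and `Tracker` are hypothetical names and the "answer" is a placeholder.<br>

```python
# Twisted-free sketch of the client's bookkeeping (hypothetical names):
# spread values round-robin over per-server queues, count them up front,
# and report "done" only when every value has produced a reply.


def partition(values, n):
    """Round-robin split, mirroring queues[pos % len(queues)] in the client."""
    queues = [[] for _ in range(n)]
    for pos, dv in enumerate(values):
        queues[pos % n].append(dv)
    return queues


class Tracker:
    def __init__(self, queues):
        # Count every value when the queues are filled, before any server
        # connects, so early replies from one server cannot match the total.
        self.out_count = sum(len(q) for q in queues)
        self.in_count = 0
        self.results = []
        self.done = False

    def success(self, result):
        dv, answer = result
        self.in_count += 1
        self.results.append('%s,%s' % (dv, answer))
        if self.in_count == self.out_count:
            # The real client would close out_file and call reactor.stop() here.
            self.done = True


values = ['dataval%d' % i for i in range(1, 11)]
queues = partition(values, 3)
tracker = Tracker(queues)

# Replies arrive one server at a time, as in the debugging output above.
for queue in queues:
    for dv in queue:
        assert not tracker.done  # never finished before the last reply
        tracker.success((dv, len(dv)))  # len(dv) is a placeholder "answer"

print(tracker.in_count, tracker.out_count, tracker.done)  # 10 10 True
```

Because the total is fixed before any reply can arrive, the order in which servers answer does not matter; only the last reply flips `done`.<br>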
<br>Any help anyone can provide will be greatly appreciated. I have another question, too, but it can wait until this one is solved since, for all I know, whatever is causing this problem may be causing the other one as well.<br>
<br>Sean M Hollingsworth<br>