Let me start off by warning everyone that I am extremely new to twisted (I&#39;ve been using it maybe a week) and it&#39;s highly likely some things don&#39;t work the way I think they should. Or, I&#39;m just using them incorrectly.<br>
<br>Some background on what I&#39;m trying to accomplish:<br><br>I have a client/server system that reads in data values from a file (anywhere between 10k and 1.5M, running every hour) and sends those off to a server for processing. Processing each value takes some variable amount of time and, given that I am doing this every hour, I need it to be as fast as possible, which is why I was drawn to the asynchronous nature of python. I&#39;m using python because everything we do here is in python. I had the app working well with a single server, but I need to scale it out to multiple servers to get the processing time down. At first, I started writing my own distributedish app, which worked (more or less), but had a enough niggly issues that I started looking for something else. This morning I discovered the Perspective Broker and that seemed to be the solution I needed.<br>
<br>The problems:<br><br>Right now, for development and testing, I have three servers running on localhost on three ports. When I run my test code, it seems that each Perspective Broker runs sequentially, rather than concurrently. For example, if i watch the debugging output from the server, I see the following:<br>
<br>server 0 processing dataval1<br>server 0 processing dataval2<br>server 0 processing dataval3<br>server 0 processing dataval4<br>server 1 processing dataval<b>5</b><br>server 1 processing dataval6<br>server 1 processing dataval7<br>
server 2 processing dataval8<br>server 2 processing dataval9<br>server 2 processing dataval10<br><br>My understanding is that the perspective brokers would work concurrently. Is that incorrect? My guess is that they should work concurrently and I am just doing something wrong in my code, due to my very, very limited understanding of how they work. Well, how much of twisted works, really.<br>
<br>Below is my the relevant code for the client (I&#39;ve taken out code that just deals with prepping data or debugging). Please keep in mind that this is mostly testing code, while I get a better understanding of how perspective brokers work and was cobbled together from examples and docs found online.<br>
<br>CLIENT CODE:<br><br>import sys, os<br>from twisted.spread import pb<br>from twisted.internet import reactor<br>import Queue<br><br>MY_PORT = 49981<br>SERVERS = 3<br><br>class Client:<br>    def __init__(self, host):<br>
        self.out_count = 0<br>        self.in_count = 0<br>        self.total = 0<br>        self.in_file = open(sys.argv[1], &#39;rb&#39;)<br>        self.out_file = open(sys.argv[2], &#39;wb&#39;)<br><br>        # Create data queues for each server so that we can send data to each server <br>
        self.queues = []<br>        for server in range(0,SERVERS):<br>            self.queues.append(Queue.Queue())<br><br>        # Populate queues with data<br>        pos = 0<br>        for dv in self.in_file:<br>            dv = dv.strip()<br>
            self.queues[pos % len(self.queues)].put(dv)<br>            pos += 1<br><br>        # Conenct to servers<br>        print &#39;Connecting to Servers...&#39;<br>        for server in range(0, COUNT):<br>            factory = pb.PBClientFactory()<br>
            reactor.connectTCP(host, server + MY_PORT, factory)<br>            factory.getRootObject().addCallbacks(self.connected, self.connect_failure, [self.queues[server]])<br><br>    def connected(self, perspective, queue):<br>
        print &quot;connected&quot;<br>        while 1:<br>            try:<br>                dv = queue.get_nowait()<br>            except Queue.Empty:<br>                break<br>            else:<br>                perspective.callRemote(&#39;process_data&#39;, dv).addCallbacks(self.success, self.lookup_failure)<br>
<br>    def success(self, result):<br>        print result<br>        dv, answer = result<br>        self.in_count += 1<br>        self.out_file.write(&#39;%s,%s\n&#39; % (dv, answer))<br>        if(self.in_count == self.out_count):<br>
            self.out_file.close()<br>            reactor.stop()<br><br>    def connect_failure(self, _):<br>        print &quot;Remote connect failure&quot;<br>        self.out_file.close()<br>        reactor.stop()<br><br>
    def lookup_failure(self, _):<br>        print &quot;Remote lookup failure&quot;<br>        self.out_file.close()<br>        reactor.stop()<br><br>Client(&quot;127.0.0.1&quot;)<br>reactor.run()<br><br>END CLIENT CODE<br>
<br>Any help anyone can provide will be greatly appreciated. I have another question, too, but it can wait till this one is solved, since, for all I know, whatever is causing this problem may be causing the other one.<br>
<br>Sean M Hollingsworth<br>