[Twisted-Python] Perspective broker remote calls to multiple servers seem to run sequentially

Sean Hollingsworth smhollingsworth at gmail.com
Thu Jul 2 14:13:42 MDT 2009


Let me start off by warning everyone that I am extremely new to Twisted
(I've been using it for maybe a week), and it's highly likely that some things
don't work the way I think they should, or that I'm just using them incorrectly.

Some background on what I'm trying to accomplish:

I have a client/server system that reads data values from a file
(anywhere between 10k and 1.5M values, every hour) and sends them to a
server for processing. Processing each value takes a variable amount of
time and, given that this runs every hour, I need it to be as fast as
possible, which is why I was drawn to the asynchronous nature of Twisted. I'm
using Python because everything we do here is in Python. I had the app
working well with a single server, but I need to scale it out to multiple
servers to bring the processing time down. At first, I started writing my own
distributed-ish app, which worked (more or less), but had enough niggly
issues that I started looking for something else. This morning I discovered
the Perspective Broker, and it seemed to be the solution I needed.

The problems:

Right now, for development and testing, I have three servers running on
localhost on three ports. When I run my test code, it seems that each
Perspective Broker runs sequentially, rather than concurrently. For example,
if I watch the debugging output from the server, I see the following:

server 0 processing dataval1
server 0 processing dataval2
server 0 processing dataval3
server 0 processing dataval4
server 1 processing dataval5
server 1 processing dataval6
server 1 processing dataval7
server 2 processing dataval8
server 2 processing dataval9
server 2 processing dataval10
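For comparison, the client code deals values out round-robin (`pos % len(self.queues)`), so with three servers each server's share is every third value in file order rather than a contiguous run. A minimal stdlib-only sketch of that split, assuming the input file holds dataval1 through dataval10 in order (the `round_robin` helper and that input are hypothetical, chosen only to mirror the log above):

```python
def round_robin(values, n_servers):
    """Deal values out the way the client's __init__ does: the value at
    position pos goes to queue number pos % n_servers."""
    queues = [[] for _ in range(n_servers)]
    for pos, value in enumerate(values):
        queues[pos % n_servers].append(value)
    return queues

# Hypothetical input: dataval1 .. dataval10, one per line, in file order
values = ["dataval%d" % i for i in range(1, 11)]
for server, queue in enumerate(round_robin(values, 3)):
    print("server %d gets %s" % (server, ", ".join(queue)))
```

Here server 0 receives dataval1, dataval4, dataval7 and dataval10, which is worth keeping in mind when reading the server-side debug output.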

My understanding is that the Perspective Brokers would work concurrently. Is
that incorrect? My guess is that they should, and I am just doing something
wrong in my code due to my very, very limited understanding of how they work
(or of how much of Twisted works, really).
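Since the server code isn't shown, one possible explanation can only be offered as an assumption: Twisted's reactor runs on a single thread, so if the server's `remote_process_data` does its work synchronously, that server handles one request at a time, and if all three servers live in one process (one reactor listening on three ports), each batch runs to completion before the next one starts. The sketch below models this with Python's `asyncio` event loop standing in for the reactor (a swapped-in stand-in, not Twisted itself); `handle`, `run`, the request list and the 10 ms timings are all hypothetical.

```python
import asyncio
import time

async def handle(server, item, log, blocking):
    """Pretend to process one value for one 'server' sharing the loop."""
    log.append(("start", server, item))
    if blocking:
        # Synchronous work: nothing else on the loop runs until this returns
        time.sleep(0.01)
    else:
        # Cooperative wait: the loop can start other requests meanwhile
        await asyncio.sleep(0.01)
    log.append(("end", server, item))

async def run(blocking):
    log = []
    # Requests arrive grouped by server, the way the client sends each batch
    requests = [(0, 1), (0, 2), (1, 3), (1, 4), (2, 5), (2, 6)]
    await asyncio.gather(*(handle(s, i, log, blocking) for s, i in requests))
    return log

blocking_log = asyncio.run(run(blocking=True))
cooperative_log = asyncio.run(run(blocking=False))

# Blocking: each request starts and ends before the next one starts
print([event[:2] for event in blocking_log])
# Cooperative: every request has started before any of them ends
print([event[:2] for event in cooperative_log])
```

If this is what is happening, running each server as its own process is the most direct way to get real parallelism for CPU-bound work; Twisted's `twisted.internet.threads.deferToThread` moves blocking calls off the reactor thread, though for pure-Python CPU work the GIL limits how much that helps.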

Below is the relevant client code (I've taken out code that just deals with
prepping data or debugging). Please keep in mind that this is mostly test
code, written while I get a better understanding of how Perspective Brokers
work, and cobbled together from examples and docs found online.

CLIENT CODE:

import sys, os
from twisted.spread import pb
from twisted.internet import reactor
import Queue

MY_PORT = 49981
SERVERS = 3

class Client:
    def __init__(self, host):
        self.out_count = 0   # remote calls sent
        self.in_count = 0    # results received
        self.total = 0       # data values read from the input file
        self.in_file = open(sys.argv[1], 'rb')
        self.out_file = open(sys.argv[2], 'wb')

        # Create one data queue per server so that each server gets its
        # own share of the data
        self.queues = []
        for server in range(0, SERVERS):
            self.queues.append(Queue.Queue())

        # Populate queues with data, dealing values out round-robin
        pos = 0
        for dv in self.in_file:
            dv = dv.strip()
            self.queues[pos % len(self.queues)].put(dv)
            pos += 1
        self.in_file.close()
        # Remember how many results to expect before stopping the reactor
        self.total = pos

        # Connect to servers
        print 'Connecting to Servers...'
        for server in range(0, SERVERS):
            factory = pb.PBClientFactory()
            reactor.connectTCP(host, server + MY_PORT, factory)
            factory.getRootObject().addCallbacks(
                self.connected, self.connect_failure,
                callbackArgs=(self.queues[server],))

    def connected(self, perspective, queue):
        print "connected"
        # Send every queued value for this server; callRemote returns a
        # Deferred immediately, so this loop does not wait for answers
        while True:
            try:
                dv = queue.get_nowait()
            except Queue.Empty:
                break
            else:
                self.out_count += 1
                perspective.callRemote('process_data', dv).addCallbacks(
                    self.success, self.lookup_failure)

    def success(self, result):
        print result
        dv, answer = result
        self.in_count += 1
        self.out_file.write('%s,%s\n' % (dv, answer))
        # Stop only once every value read from the file has been answered;
        # comparing against out_count could stop early if one server
        # answers before another server has connected and sent its batch
        if self.in_count == self.total:
            self.out_file.close()
            reactor.stop()

    def connect_failure(self, _):
        print "Remote connect failure"
        self.out_file.close()
        reactor.stop()

    def lookup_failure(self, _):
        print "Remote lookup failure"
        self.out_file.close()
        reactor.stop()

Client("127.0.0.1")
reactor.run()

END CLIENT CODE
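Comparing `in_count` against a running count of calls sent (`out_count`) is fragile as a stop condition: if one server's results all come back before another server has connected and sent anything, the two counts match early and the reactor stops with work still outstanding. A minimal stdlib-only replay of that race, comparing against the number of values read instead (the `first_stop` helper and the event timing are hypothetical):

```python
def first_stop(events, total, use_total):
    """Replay 'send'/'result' events and return how many results had been
    received when the stop condition first became true, or None.

    use_total=False mimics comparing in_count with out_count;
    use_total=True compares in_count with the number of values read."""
    out_count = in_count = 0
    for event in events:
        if event == "send":
            out_count += 1
        else:
            in_count += 1
            target = total if use_total else out_count
            if in_count == target:
                return in_count
    return None

# Hypothetical timing: server 0's four values are sent and answered before
# servers 1 and 2 connect and send their three values each
events = ["send"] * 4 + ["result"] * 4 + ["send"] * 6 + ["result"] * 6
print(first_stop(events, total=10, use_total=False))  # stops after 4 of 10
print(first_stop(events, total=10, use_total=True))   # stops after 10 of 10
```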

Any help anyone can provide will be greatly appreciated. I have another
question, too, but it can wait till this one is solved, since, for all I
know, whatever is causing this problem may be causing the other one.

Sean M Hollingsworth