[Twisted-Python] Slow Performance of Twisted Script

Thomas Greenwood tg.mufcnotforsale at gmail.com
Fri Feb 13 07:58:52 MST 2009


Hi everyone,

I have been developing my first python script and jumped straight into the
world of twisted. I like the concept but I seem to be doing something very
wrong. The script takes an NZB file and parses it (using HellaNZB's parser)
and then makes a specified number of connections to the news server and
checks that all articles are there using the STAT command. I also know
nothing about newsservers or nntp so this script is a complete shot in the
dark. First time with python, first time with twisted and first time with
nntp seems to equal disaster!

The problem is that the script is very slow, even with 20 simultaneous
connections to the internet I only produce 111k or so of traffic and only
achieve 2 article checks per connection per second. On a 24meg connection I
can download the whole thing quicker than checking it with my script! Any
ideas why it is so slow?

I have attached the script.

Thanks for any help.

Tom Greenwood
-------------- next part --------------
An HTML attachment was scrubbed...
URL: </pipermail/twisted-python/attachments/20090213/d5c5fac5/attachment.html>
-------------- next part --------------
#!/usr/bin/python
from twisted.internet import reactor    
from twisted.internet import protocol
from NZBHellaParser import NZBParser,  parseNZB
from twisted.news.nntp import NNTPClient
import sys,traceback,time

results = None
allClients = None

class NNTPStatCheck(NNTPClient):
    "Class to check if the server has all the articles required using as little bandwidth as possible."
    nextMessageID = None
    def __init__(self, _username, _password,  _newsgroup):
        NNTPClient.__init__(self)
        self.username = _username
        self.password = _password
        self.newsgroup = _newsgroup
        self.nextMessageID = 0
        self.currMessageID = None
        #self.lastTime = 0
        
    def getStat(self):
        #print "Stating for " + self.currMessageID
        #self.startTime = time.time()
        self.sendLine('STAT <%s>' % (self.currMessageID, ))
        self._newState(None, self.getStatFailed, self._stateStat)
        
    def connectionLost(self, error):
        NNTPClient.connectionLost(self)
        if self.nextMessageID >= len(results):
            self.factory.readyToStop = True;
            for client in allClients:
                if client.readyToStop == False:
                    return
            reactor.stop()
        else:
            print "Unexpected Connection Loss, Should we try again?? " + str(self.nextMessageID) + "/" + str(len(results))
                 
    def connectionMade(self):
        NNTPClient.connectionMade(self)
        print 'Connection made, logging in..'
        self.authInfo()
    
    def gotGroup(self, group):
        #print "GOT GROUP "  + str(group) + "\n"
        self.statNext()
        
    def gotStat(self, stat):
        #print "GOT STAT %s\n" % str(stat)
        results[self.currMessageID][self.newsgroup] = True
        #FIXME: Don't need to check the rest of the groups if we have it
        
    def _stateStat(self, (code, message)):
        #print str(allClients.index(self.factory)) + " " + str(time.time() - self.startTime) + " " + str(time.time() - self.lastTime)
        #self.lastTime = time.time()
        if code == 223:
            self.gotStat((code, message))
            self._endState()
            self.tryNext()
        else:
            self.getStatFailed("%s %s" % (str(code) , str(message)))
            
    def statNext(self):
        if self.nextMessageID < len(results):
            self.currMessageID = results.keys()[self.nextMessageID]
            self.nextMessageID = (self.nextMessageID + 1)
            if results[self.currMessageID] [self.newsgroup] is None:
                self.getStat()
	    else:
                self.nextMessageID = (self.nextMessageID + 1)
                self.statNext()
        else:
	    # FIXME: Should now be fixed but sometimes the client got into an unknown state
            print "Finished, disconnecting from server."
            self.quit()
            
    def tryNext(self):
        if self.nextMessageID < len(results):
            self.fetchGroup(self.newsgroup)
        else:
            print "Finished, disconnecting from server."
            self.quit()
                
    def getStatFailed(self, error):
        print str(time.time() - self.startTime)
        try:
            (code, message) = error.split(" ", 1)
        except AttributeError:
            print 'Unknown reply structure: %s\n' % str(error)
        if code == "430":
             self.messageDoesNotExist(error)
        else:
            print 'Error occured after stat: %s\n' % str(error)
        results[self.currMessageID][self.newsgroup] = False
        self.tryNext()
        
    def messageDoesNotExist(self, error):
        print 'Message ' + self.currMessageID + ' does not exist in group ' + self.newsgroup + '\n'
    
    def authInfo(self):
        self.sendLine('AUTHINFO USER ' + self.username)
        self._newState(None, self.authInfoFailed, self._authInfoUserResponse)

    def _authInfoUserResponse(self, (code, message)):
        if code == 381:
            self.sendLine('AUTHINFO PASS ' + self.password)
            self._newState(None, self.authInfoFailed, self._authInfoPassResponse)
        else:
            self.authInfoFailed('%d %s' % (code, message))
        self._endState()

    def _authInfoPassResponse(self, (code, message)):
        if code == 281:
            self.gotauthInfoOk('%d %s' % (code, message))
        else:
            self.authInfoFailed('%d %s' % (code, message))
        self._endState()

    def gotauthInfoOk(self, message):
        print 'Logged in, starting stat checks..'
        self.fetchGroup(self.newsgroup)

    def authInfoFailed(self, error):
        # FIXME: We use reactor.stop so we dont try and auth incorrectly for ever
        print 'Error occured whilst attempting auth: %s\n' % error
        reactor.stop()

        
class NNTPVerifyFactory(protocol.ClientFactory):
    readyToStop = False
    
    def __init__(self, _username, _password, _newsgroup):
        self.username = _username
        self.password = _password
        self.newsgroup = _newsgroup
        
    def clientConnectionFailed(self, connector,  reason):
        print 'Failed to connect using connector: ' + str(connector) + " because " + str(reason)
        reactor.stop()
        
    def buildProtocol(self, addr):
        p = NNTPStatCheck(self.username, self.password, self.newsgroup)
        p.factory = self
        return p
        
def checkFoundArticle(groups):
    "If Article found returns group found, if not found returns false, if failed returns None"
    for group, value in groups.iteritems():
        if value == True:
          return group
        elif value == False:
            return False;
    return None
    
def createDictionaryFromList(list):
    retVal = {}
    for item in list:
        retVal[item] = None
    return retVal

def calcConnections(numGroups,maxConn):
    if numGroups == 0:
        return 0
    if maxConn == 0:
        return 0

    if maxConn < numGroups:
        return 0        
    else:
        if (maxConn / numGroups) >= 1:
            return (maxConn / numGroups)
	else:
            return 1
	    
def printUsage():
        print "Usage: NZBVerify.py maxConnections nzbFile"
        print "  e.g: NZBVerify.py 20 test.nzb"
        print "\n"
        print "NZBVerify will distribute the connections between the groups specified in the NZB"
        print "Your usenet provider will be able to tell you how many maximum connection you can use"
        print "The value of maxConnections must be at least equal to the number of groups"
        print "There is very little error handling so if you think its got stuck, it probably has!"

def main():
    global results,  allClients
    results = {}
    allClients = []

    if len(sys.argv) < 3:
        printUsage()
        return

    try:
        maxConns = int(sys.argv[1])
    except ValueError:
        print "Please enter a valid integer for maxConnections"
        printUsage()
        return

    print "Max connections set to " + str(maxConns)    

    print "Loading NZB File from " + sys.argv[2]
    try:
        groups, messageIDs = parseNZB(sys.argv[2])
    except IOError:
        print "There was an IO error loading the NZB File. The error follows:\n"
        print traceback.print_exc()
        return

    print "Parsed NZB File with " + str(len(messageIDs)) + " articles from " + str(len(groups)) + " groups."

    connectionsPerGroup = calcConnections(len(groups),maxConns)

    print "Using " + str(connectionsPerGroup) + " connections per group."
    
    if connectionsPerGroup == 0:
        print "Finished. Can't do anything with no connections per group!"
        return

    for currMessageID in messageIDs:
        # FIXME: Need to check if we need to create a new Dictionary instance for each message or if python is clever enough to work it out itself.
        results[currMessageID] = createDictionaryFromList(groups)

    #FIXME: Can only have as many groups as connections                
    for group in groups:
	for i in range(connectionsPerGroup):
            verifier = NNTPVerifyFactory("tgreenwood at pp.newsgroups.user", "tomwap1986", str(group))
            allClients.append(verifier)
            reactor.connectTCP("us.usenet-news.net", 119, verifier)
        
    print "Connecting to server.."
    reactor.run()
    print "Got required data from server, analysing.."
    totalArticles = len(results)
    totalFound = 0
    totalNotChecked = 0
    for currMessageID,  _groups in results.iteritems():
        result = checkFoundArticle(_groups)
        if result is None:
            print "Article Not Checked: " + currMessageID
            totalNotChecked = (totalNotChecked + 1)
        elif result is False:
            print "Article Not Found: " + currMessageID
        else:
            print "Found Article: " + currMessageID + " in " + result
            totalFound = (totalFound + 1)
    print "Don't take the groups listed above as gospel, it only means that you newsserver said it had it on the server - it may not have cared that we asked for the article in a particular group!"
    print "All Finished."
    print "Found " + str(totalFound) + ", Not Found " + str(totalArticles - (totalFound+totalNotChecked)) + ", Not checked " + str(totalNotChecked)
    sys.exit(0)
  
if __name__ == '__main__':
    main()
  
-------------- next part --------------
from xml.sax import make_parser
from xml.sax.handler import ContentHandler, feature_external_ges, feature_namespaces

def parseNZB(fileName):
    """ Initialize the queue from the specified nzb file """
    # Create a parser
    parser = make_parser()
    
    # No XML namespaces here
    parser.setFeature(feature_namespaces, 0)
    parser.setFeature(feature_external_ges, 0)
    
    # Create the handler
    dh = NZBParser()
    
    # Tell the parser to use it
    parser.setContentHandler(dh)

    # Parse the input
    parser.parse(fileName)

    return (dh.groups, dh.queue)
        
class NZBParser(ContentHandler):
    """ Parse an NZB 1.0 file into a list of msgids
    http://www.newzbin.com/DTD/nzb/nzb-1.0.dtd """
    def __init__(self):
        # downloading queue to add NZB segments to
        self.queue = []

        # nzb file to parse
        #self.nzb = nzb
        self.groups = []

        # parsing variables
        self.file = None
        self.bytes = None
        self.number = None
        self.chars = None
        self.fileNeedsDownload = None
        
        self.fileCount = 0
        self.segmentCount = 0
        
    def startElement(self, name, attrs):
        if name == 'file':
            subject = self.parseUnicode(attrs.get('subject'))
            poster = self.parseUnicode(attrs.get('poster'))

            self.fileCount += 1
                
        elif name == 'group':
            self.chars = []
                        
        elif name == 'segment':
            self.bytes = int(attrs.get('bytes'))
            self.number = int(attrs.get('number'))
                        
            self.chars = []
        
    def characters(self, content):
        if self.chars is not None:
            self.chars.append(content)
        
    def endElement(self, name):
        if name == 'file':
            self.file = None
            self.fileNeedsDownload = None
                
        elif name == 'group':
            newsgroup = self.parseUnicode(''.join(self.chars))
            
            if newsgroup not in self.groups:
                self.groups.append(newsgroup)
                        
            self.chars = None
                
        elif name == 'segment':
            self.segmentCount += 1

            messageId = self.parseUnicode(''.join(self.chars))
            self.queue.append(messageId)

            self.chars = None
            self.number = None
            self.bytes = None    

    def parseUnicode(self, unicodeOrStr):
        if isinstance(unicodeOrStr, unicode):
            return unicodeOrStr.encode('latin-1')
        return unicodeOrStr


More information about the Twisted-Python mailing list