[Twisted-Python] RE: tracking spawned jobs in a twisted application (code review request)

Schneider, Michael michael.l.schneider at ugsplm.com
Mon Mar 1 11:32:02 EST 2004


I hit the send button too soon,

Michael Schneider
Senior Software Engineering Consultant
UGS PLM Solutions - an EDS Company
"The Greatest Performance Improvement Is the transitioning from a non-working state to the working state"

> -----Original Message-----
> From: Schneider, Michael 
> Sent: Monday, March 01, 2004 11:28 AM
> To: 'twisted-python at twistedmatrix.com'
> Subject: tracking spawned jobs in a twisted application (code review
> request)
> Hello All,
> I have been pushing some code around.
> My problem is that I need to execute OS command line
> jobs with twisted.
> I started by morphing the twisted tutorial app into a
> job runner.
> My first path (It was wrong!!) was to morph the database connection
> pool to a compute pool using popen2.
> This group suggested that I use reactor.spawnProcess instead of popen2.
> Attached is the resulting code.  
> Basic Approach for Command protocol.ProcessProtocol:
> 1) Create a JobPool object to manage the list of running jobs
> 2) Create a Job Runner object to wrap individual process info,
>    and capture process output.
>    (start time, stop time ...)
> 3) Create CmdProcProtocol to manage interaction with the reactor
> I morphed the twisted application: (Coded in comment section below)
> 	1) Create JobPoolService
> 	2) Create JobRunnerProtocol
> 	3) Create XmlRpc Job Runner Interface that dispatches 
> to job runner service.
> My questions are:
> 	- Is this the Twisted way?
> 	- What are some alternative approaches?
> 	- Is there a better way?
> Thank you very much,
> Mike
> ----------------------------------------------------------------
> Michael Schneider
> Senior Software Engineering Consultant
> UGS PLM Solutions - an EDS Company
> "The Greatest Performance Improvement Is the transitioning 
> from a non-working state to the working state"

# Twisted, the Framework of Your Internet

Manage execution of multiple processes

Usage Model:
1) Create Job Runner Service in App
2) Create Job Runner Protocol in App
3) Create an XML-RPC interface to call the service

    ----- from service code in twisted app
    m = RunTaskPool()
    cmdLine = "dir d:\\"
    d = m.runJob(cmdLine)

class JobRunnerService(service.Service):
    """Service that forwards commands to a lazily created RunTaskPool."""

    __implements__ = service.Service.__implements__, IJobRunnerService

    def __init__(self):
        # The pool is created on first use, see verifyRunTaskPool().
        self.taskPool = None

    def runJob(self, command):
        """Run ``command`` through the task pool.

        Returns whatever ``RunTaskPool.runJob`` returns for the command.
        """
        print("In Twisted runJob, command = " + command)
        # taskPool starts out as None; without this call the first runJob()
        # fails with AttributeError instead of running the command.
        self.verifyRunTaskPool()
        return self.taskPool.runJob(command)

    def verifyRunTaskPool(self):
        """Create the RunTaskPool if it does not exist yet."""
        if self.taskPool is None:
            self.taskPool = RunTaskPool()

class JobRunnerProtocol(basic.LineReceiver):
    # Subclass of basic.LineReceiver that passes each received line to the
    # factory's runJob().

    def lineReceived(self, command):
        # The value returned by the factory's runJob() is bound to d.
        d = self.factory.runJob(command)
        # NOTE(review): the body of writeValue, and whatever attaches it to d,
        # is missing from this posting -- restore it from the original source
        # before using this class.
        def writeValue(value):

class JobStatusXR(xmlrpc.XMLRPC):
    """XML-RPC resource exposing the job runner service.

    ``service`` is the object whose ``runJob`` is called for each
    ``runJob`` request.
    """

    def __init__(self, service):
        # Run the base-class initialiser so the state it sets up exists.
        xmlrpc.XMLRPC.__init__(self)
        self.service = service

    def xmlrpc_runJob(self, command):
        """Run ``command`` via the service and return the service's result."""
        # NOTE(review): whatever the service returns (or its Deferred fires
        # with) has to be XML-RPC serialisable -- confirm what
        # RunTaskPool.runJob finally yields.
        return self.service.runJob(command)

    def xmlrpc_isAlive(self):
        """Liveness probe; always returns 1."""
        return 1


from twisted.internet import protocol
from twisted.internet import reactor
from twisted.spread import pb
from twisted.python import reflect, log
from twisted.internet import  defer

from time import sleep
from time import time
import os

class CmdProcProtocol(protocol.ProcessProtocol):
    """Twisted Protocol Class to run command line in a sub process.

    Everything the child writes to stdout is accumulated in ``self.data``.
    When the child exits, the exit status and that output are handed to
    ``parentJobRunner.signalRunComplete``.
    """

    def __init__(self, parentJobRunner):
        self._exitStatus = 0
        self.parentJobRunner = parentJobRunner
        self.data = ""

    def connectionMade(self):
        """Called at the start of the execution"""

    def outReceived(self, data):
        """Called when process writes output"""
        self.data = self.data + data

    def errReceived(self, data):
        """Called when process writes to stderr; the data is discarded."""

    def inConnectionLost(self):
        """Called when the child's stdin has been closed."""

    def outConnectionLost(self):
        """Program has closed stdout (program terminated)"""

    def errConnectionLost(self):
        """Program has closed stderr (program terminated)"""

    def processEnded(self, status_object):
        """Child process exited"""
        self._exitStatus = status_object.value.exitCode
        # %s rather than %d: the exit code is not guaranteed to be an int.
        # NOTE(review): Twisted appears to report None when the child is
        # terminated by a signal -- confirm; %d would raise TypeError then
        # and the parent would never be notified.
        print("processEnded, status %s" % (self._exitStatus,))
        print("quitting")
        print(self.data)
        #notify parent with data returned , and exit status
        self.parentJobRunner.signalRunComplete(self._exitStatus, self.data)

    def kill(self):
        """Kill this process"""
        # NOTE(review): not implemented.  The only attempt in the posting is
        # the commented-out call below, and ``signal`` is not imported.
        #os.kill(self.transport.pid, signal.SIGKILL)
class JobRunner:
    """I am a lightweight wrapper for run job.

    I hold the command line, the start and end times, the exit code and the
    captured output of one job, and I fire ``jobRunDeferred`` with myself
    when the job is complete.
    """

    def __init__(self, parentJobRunnerPool,  jobRunDeferred):
        #command to execute in Command Proc Protocol
        self.command = ""

        self.parentJobRunnerPool = parentJobRunnerPool
        # -1 means "not set yet" for both time stamps
        self._startTime = -1
        self._endTime = -1
        self._timeLimit = -1
        self._exitcode = None
        self._log = "job not yet started"
        self._jobRunDeferred = jobRunDeferred

    def getLogCallback(self, *args, **kw):
        """Return the job's log text; any arguments are ignored."""
        # NOTE(review): the posting has a comment here about notifying the
        # JobRunner pool that the job completed, but no statement follows it.
        return self._log

    def getExitCode(self):
        """Get the exit code stored by signalRunComplete (None until then)"""
        return self._exitcode

    def getStartTime(self):
        """Get Time Job Started Executing """
        return self._startTime

    def getEndTime(self):
        """Get Time Job Stopped Executing """
        return self._endTime

    def getRunTime(self):
        """ get time executing"""
        if self._startTime < 0:
            #job has not started yet, return -1
            return -1
        if self._endTime < 0:
            #job is running, but not yet finished, return time spent so far
            return (time() - self._startTime)
        #job is complete, return run time
        return (self._endTime - self._startTime)

    def runJob(self, *args, **kw):
        """Record the start time and launch the command.

        The first positional argument is a sequence whose first item is the
        command string (RunTaskPool.runJob passes its own ``args`` tuple).
        """
        self.command = (args[0])[0]

        print("In Run Job daemon command : " + self.command)
        #set execution start time
        self._startTime = time()
        print("Start Running Command : " + str(self.command) + "  " + str(self.getStartTime()))
        self._log = "Job is Running: "

        #setup reactor to run job
        # NOTE(review): the statement after the comment above is missing from
        # the posting; calling runCommand() is the reconstruction.
        self.runCommand(self.command)

    def runCommand(self, commandStr):
        """Create Command Line Protocol, and call spawnProcess"""
        commandStr = 'cmd.exe /c ' + commandStr

        pp = CmdProcProtocol(self)
        # NOTE(review): only the ``commandStr ,`` argument of this call
        # survives in the posting; passing the parent environment is an
        # assumption -- confirm the remaining arguments.
        reactor.spawnProcess(pp, commandStr, env=os.environ)

    def signalRunComplete(self, exitCode, stdoutString):
        """Store the job's results and fire the deferred with this object."""
        # Pass Control back to caller to self.runJob(..)
        # by triggering defered callback,
        # this callback will return this object

        # Must be the same attribute getExitCode() reads; the posting wrote
        # to ``_exitCode`` here, so the exit code was never visible.
        self._exitcode = exitCode
        self._log = stdoutString
        # set execution time
        self._endTime = time()
        print("Done Running Job " + self.command)
        reactor.callFromThread(self._jobRunDeferred.callback, self)

class RunTaskPool(pb.Referenceable):
    """Start command-line jobs and keep track of the ones still running.

    Each runJob() call wraps the command in a JobRunner, remembers that
    runner in ``_jobRunnerJobs`` and returns the Deferred handed to the
    runner.
    """

    running = 0 # true when the pool is operating

    # NOTE(review): finalClose() and __getstate__() read these three names
    # but the posting never assigns them.  They look like leftovers from the
    # connection pool this class was adapted from; the values below are
    # placeholders so those methods no longer raise AttributeError.
    noisy = 1
    min = None
    max = None

    def __init__(self,  *args, **kw ):
        """See RunTaskPool.__doc__"""
        self.args = args
        self.kw = kw
        self._jobRunnerJobs = []

        self.jobs = {}  # running Jobs, hashed on thread id

        # Set by start(); initialised here so close() is safe before start()
        self.shutdownID = None
        self.startID = reactor.callWhenRunning(self._start)

    def _start(self):
        """Startup-trigger entry point: forget the fired trigger, then start."""
        self.startID = None
        return self.start()

    def start(self):
        """Start of execution: register the shutdown trigger once."""
        if not self.running:
            self.shutdownID = reactor.addSystemEventTrigger('during',
                                                            'shutdown',
                                                            self.finalClose)
            self.running = 1

    def jobCompleteCallback(self, jobRunner):
        """Callback triggered by JobRunner when execution of job is complete """
        print("Deleting Job Object from running queue")
        if jobRunner in self._jobRunnerJobs:
            self._jobRunnerJobs.remove(jobRunner)

    def _untrack(self, result, jobRunner):
        """Stop tracking ``jobRunner`` and pass ``result`` through unchanged."""
        self.jobCompleteCallback(jobRunner)
        return result

    def runJob(self, *args, **kw ):
        """run job and return the result as a defered object

        The positional and keyword arguments are handed to JobRunner.runJob
        as a tuple and a dict.
        """
        print("MLS: runJob - setting up to run in thread ")
        d = defer.Deferred()
        jobRunner = JobRunner(self, d)
        #Add to list of JobRunnerJobs  managed
        self._jobRunnerJobs.append(jobRunner)
        # Drop the runner from the list again once the job is done, whatever
        # the outcome, so the list does not grow without bound.
        d.addBoth(self._untrack, jobRunner)

        print("MLS: in _runJobRunner ")
        try:
            jobRunner.runJob(args, kw)
        except Exception:
            # The job never started, so the deferred will never fire.
            self.jobCompleteCallback(jobRunner)
            raise
        # return defered used by jobRunner

        return d

    def close(self):
        """Close all pool connections and shutdown the pool."""
        if self.shutdownID:
            reactor.removeSystemEventTrigger(self.shutdownID)
            self.shutdownID = None
        if self.startID:
            reactor.removeSystemEventTrigger(self.startID)
            self.startID = None
        # The shutdown trigger was just removed, so run its work directly.
        # NOTE(review): this call is not in the posting -- confirm.
        self.finalClose()

    def finalClose(self):
        """This should only be called by the shutdown trigger."""

        self.running = 0
        # NOTE(review): nothing in the posting ever adds to self.jobs, and
        # the first argument of the log message was lost; self.args is an
        # assumption.
        for job in self.jobs.values():
            if self.noisy:
                log.msg(' closing: %s%s' % (self.args or '',
                                            self.kw or ''))

    def __getstate__(self):
        """Return the picklable configuration of the pool."""
        return {'noisy': self.noisy,
                'min': self.min,
                'max': self.max,
                'args': self.args,
                'kw': self.kw}

    def __setstate__(self, state):
        """Restore the configuration and re-run __init__ with it."""
        self.__dict__ = state
        # The posting passed the kw dict as the positional-argument sequence;
        # expand positional and keyword arguments separately instead.
        self.__init__(*state.get('args', ()), **self.kw)

def startup():
    """Call runJob() thirteen times on the global ``m``."""
    # NOTE(review): ``m`` is not defined anywhere in this posting and
    # runJob() is called without a command -- confirm against the original.
    for _ in range(1, 14):
        m.runJob()
def printSelf(jobRunner):
    """Print a completion message; ``jobRunner`` itself is not used."""
    # Parenthesised single-argument form prints the same text on Python 2
    # and is also valid on Python 3.
    print("Job Runner Done")


More information about the Twisted-Python mailing list