[Twisted-Python] RE: tracking spawned jobs in a twisted application (code review request)

Schneider, Michael michael.l.schneider at ugsplm.com
Mon Mar 1 11:32:02 EST 2004


Sorry,

I hit the send button too soon,
Mike


 
----------------------------------------------------------------
Michael Schneider
Senior Software Engineering Consultant
UGS PLM Solutions - an EDS Company
 
"The Greatest Performance Improvement Is the transitioning from a non-working state to the working state"


> -----Original Message-----
> From: Schneider, Michael 
> Sent: Monday, March 01, 2004 11:28 AM
> To: 'twisted-python at twistedmatrix.com'
> Subject: tracking spawned jobs in a twisted application (code review
> request)
> 
> 
> Hello All,
> 
> I have been pushing some code around.
> 
> My problem is that I need to execute OS command line
> jobs with twisted.
> 
> I started by morphing the twisted tutorial app into a
> job runner.
> 
> 
> My first path (It was wrong!!) was to morph the database connection
> pool to a compute pool using popen2.
> 
> This group suggested that I use reactor.spawnProcess instead of popen2.
> 
> Attached is the resulting code.  
> 
> Basic Approach for Command protocol.ProcessProtocol:
> 
> 1) Create a JobPool object to manage the list of running jobs
> 2) Create Job Runner Object to wrap individual process info,
>    and capture process output.
>    (start time, stop time ...)
> 3) Create CmdProcProtocol to manage interaction with the reactor
> 
> 
> I morphed the twisted application: (Coded in comment section below)
> 	1) Create JobPoolService
> 	2) Create JobRunnerProtocol
> 	3) Create XmlRpc Job Runner Interface that dispatches 
> to job runner service.
> 
> 
> 
> My questions are:
> 	- Is this the Twisted way?
>       - What are some alternative approaches?
> 	- Is there a better way?
> 
> Thank you very much,
> Mike
>  
> ----------------------------------------------------------------
> Michael Schneider
> Senior Software Engineering Consultant
> UGS PLM Solutions - an EDS Company
>  
> "The Greatest Performance Improvement Is the transitioning 
> from a non-working state to the working state"
> 

# Twisted, the Framework of Your Internet

"""
Manage execution of multiple command-line processes.

Usage Model:
1) Create a Job Runner Service in the app.
2) Create a Job Runner Protocol in the app.
3) Create an XML-RPC resource that calls the service.

    ----- from service code in a Twisted app
    m = RunTaskPool()
    cmdLine = "dir d:\\\\"
    d = m.runJob(cmdLine)


class JobRunnerService(service.Service):

    __implements__ = service.Service.__implements__, IJobRunnerService

    def __init__(self):
        self.taskPool = None


    def runJob(self, command):
        print "In Twisted runJob, command = " + command
        self.verifyRunTaskPool()
        return self.taskPool.runJob(command)

    def verifyRunTaskPool(self):
        if(self.taskPool == None):
            self.taskPool = RunTaskPool()

class JobRunnerProtocol(basic.LineReceiver):

    def lineReceived(self, command):
        d = self.factory.runJob(command)
        d.addErrback(catchError)
        def writeValue(value):
            self.transport.write(value)
            self.transport.write('\n\n')
            self.transport.loseConnection()
        d.addCallback(writeValue)


class JobStatusXR(xmlrpc.XMLRPC):

    def __init__(self, service):
        xmlrpc.XMLRPC.__init__(self)
        self.service = service

    def xmlrpc_runJob(self, command):
        return self.service.runJob(command)


    def xmlrpc_isAlive(self):
        return 1


   
"""


from twisted.internet import protocol
from twisted.internet import reactor
from twisted.spread import pb
from twisted.python import reflect, log
from twisted.internet import  defer

from time import sleep
from time import time
import os



class CmdProcProtocol(protocol.ProcessProtocol):
    """Twisted ProcessProtocol that runs one command line in a child process.

    Everything the child writes to stdout is accumulated in ``self.data``;
    stderr is ignored.  When the child exits, the exit status and the
    collected stdout are passed to ``parentJobRunner.signalRunComplete``.
    """

    def __init__(self, parentJobRunner):
        # Exit status of the child; 0 until processEnded has run.
        self._exitStatus = 0
        # Notified through signalRunComplete(exitStatus, stdout) on exit.
        self.parentJobRunner = parentJobRunner
        # Child's stdout collected so far.
        self.data = ""

    def connectionMade(self):
        """Called when the child process has been started; nothing to do."""
        pass

    def outReceived(self, data):
        """Append a chunk of the child's stdout to ``self.data``."""
        self.data = self.data + data

    def errReceived(self, data):
        """Called with a chunk of the child's stderr; it is discarded."""
        pass

    def inConnectionLost(self):
        """Called when the child's stdin has been closed; nothing to do."""
        pass

    def outConnectionLost(self):
        """Called when the child closes its stdout; nothing to do."""
        pass

    def errConnectionLost(self):
        """Called when the child closes its stderr; nothing to do."""
        pass

    def processEnded(self, status_object):
        """Record the child's exit status and notify the parent job runner.

        ``status_object.value.exitCode`` may be None (Twisted reports no
        exit code when the child was ended by a signal), so the status is
        formatted with %s: %d would raise TypeError here, the parent would
        never be notified, and the job's Deferred would never fire.
        """
        self._exitStatus = status_object.value.exitCode
        print("processEnded, status %s" % (self._exitStatus,))
        print("quitting")
        print(self.data)
        # Hand the collected stdout and the exit status back to the parent.
        self.parentJobRunner.signalRunComplete(self._exitStatus, self.data)

    def kill(self):
        """Forcibly terminate the child process, if one has been started.

        Sends 'KILL' through the process transport.  If the child has
        already exited there is nothing to kill, so that error is ignored.
        """
        from twisted.internet.error import ProcessExitedAlready

        if self.transport is None:
            # Process not started (or protocol never connected).
            return
        try:
            self.transport.signalProcess('KILL')
        except ProcessExitedAlready:
            pass
        
class JobRunner:
    """
    I am a lightweight wrapper around a single command-line job.

    I remember the command, its start and end times, its exit code and the
    stdout it produced.  When the child process ends I fire the Deferred
    given to my constructor with myself.
    """

    def __init__(self, parentJobRunnerPool, jobRunDeferred):
        # Command line run by runCommand (before the "cmd.exe /c" prefix).
        self.command = ""

        # Pool told (via jobCompleteCallback) when this job's log is consumed.
        self.parentJobRunnerPool = parentJobRunnerPool
        # time() timestamps; -1 means "not reached yet".
        self._startTime = -1
        self._endTime = -1
        # Not read anywhere in this class.
        self._timeLimit = -1
        # Exit code reported by signalRunComplete; None until then.
        self._exitcode = None
        # Status text before completion, captured stdout afterwards.
        self._log = "job not yet started"
        # Fired with self once the child process has ended.
        self._jobRunDeferred = jobRunDeferred

    def getLogCallback(self, *args, **kw):
        """Deferred callback: release me from my pool and return my output.

        Any arguments (the Deferred's current result) are ignored.
        """
        # Tell the pool this job is finished so it stops tracking it.
        self.parentJobRunnerPool.jobCompleteCallback(self)
        return self._log

    def getExitCode(self):
        """Return the child's exit code, or None if none has been reported."""
        return self._exitcode

    def getStartTime(self):
        """Return the time the job started, or -1 if it has not started."""
        return self._startTime

    def getEndTime(self):
        """Return the time the job finished, or -1 if it has not finished."""
        return self._endTime

    def getRunTime(self):
        """Return seconds spent running (so far, or in total); -1 if not started."""
        if self._startTime < 0:
            return -1
        if self._endTime < 0:
            # Still running: report the time elapsed so far.
            return time() - self._startTime
        return self._endTime - self._startTime

    def runJob(self, *args, **kw):
        """Start the job.

        Expects to be called as ``runJob(poolArgs, poolKw)`` (as
        RunTaskPool.runJob does): ``args[0]`` is the tuple of positional
        arguments given to the pool, and its first item is the command line.
        No thread is created here; the command goes straight to runCommand.
        """
        self.command = args[0][0]

        print("In Run Job daemon command : " + self.command)

        self._startTime = time()

        print("Start Running Command : " + str(self.command) + "  " + str(self.getStartTime()))
        self._log = "Job is Running: "

        # Ask the reactor to spawn the process; completion is reported
        # asynchronously through signalRunComplete.
        self.runCommand(self.command)

    def runCommand(self, commandStr):
        """Spawn ``cmd.exe /c <commandStr>`` with a CmdProcProtocol bound to me."""
        # NOTE(review): the whole command line is passed as the executable
        # with no args list.  Twisted documents args[0] as the program name,
        # so ('cmd.exe', ['cmd.exe', '/c', commandStr]) may be required on
        # some platforms/versions -- confirm before relying on this.
        commandStr = 'cmd.exe /c ' + commandStr

        pp = CmdProcProtocol(self)
        reactor.spawnProcess(pp,
                             commandStr,
                             env=os.environ)

    def signalRunComplete(self, exitCode, stdoutString):
        """Record the finished job's result and fire my Deferred with self."""
        # Store under the attribute __init__ creates and getExitCode reads;
        # writing a differently-cased name left getExitCode() returning None.
        self._exitcode = exitCode
        self._log = stdoutString

        self._endTime = time()

        print("Done Running Job " + self.command)

        # Hand the callback to the reactor instead of firing it inline.
        reactor.callFromThread(self._jobRunDeferred.callback, self)
        
    

class RunTaskPool(pb.Referenceable):
    """Start command-line jobs and track the ones that are still running.

    Each runJob call starts one job (a JobRunner) and returns a Deferred
    that fires with the job's captured stdout.
    """

    running = 0  # true when the pool is operating
    noisy = 0  # when true, finalClose logs every job it closes
    # Reactor trigger IDs; None means "no pending trigger to remove".
    startID = None
    shutdownID = None

    def __init__(self, *args, **kw):
        """Create an empty pool; ``args`` and ``kw`` are only stored.

        start() is scheduled to run once the reactor is running.
        """
        self.args = args
        self.kw = kw
        # JobRunner objects whose result has not been delivered yet.
        self._jobRunnerJobs = []

        # Closed and cleared by finalClose; nothing in this class adds to it.
        self.jobs = {}

        self.startID = reactor.callWhenRunning(self._start)

    def _start(self):
        """Startup-trigger entry point: forget the (now spent) trigger, then start."""
        # The trigger that called us has fired, so close() must not try to
        # remove it any more.
        self.startID = None
        return self.start()

    def start(self):
        """Register the shutdown trigger and mark the pool as running."""
        if not self.running:
            self.shutdownID = reactor.addSystemEventTrigger('during',
                                                            'shutdown',
                                                            self.finalClose)
            self.running = 1

    def jobCompleteCallback(self, jobRunner):
        """Called by a JobRunner once its result is delivered; stop tracking it."""
        print("Deleting Job Object from running queue")
        if jobRunner in self._jobRunnerJobs:
            self._jobRunnerJobs.remove(jobRunner)

    def runJob(self, *args, **kw):
        """Start a job and return a Deferred that fires with its stdout.

        ``args[0]`` is the command line to run.  If starting the job raises,
        the job is dropped from the running list and the error propagates.
        """
        print("MLS: runJob - setting up to run in thread ")

        d = defer.Deferred()
        jobRunner = JobRunner(self, d)

        # Track the job until its result has been delivered.
        self._jobRunnerJobs.append(jobRunner)

        print("MLS: in _runJobRunner ")

        try:
            # JobRunner.runJob expects this pool's (args, kw) pair.
            jobRunner.runJob(args, kw)
        except Exception:
            # The job never started and will never call back; stop tracking
            # it so it does not linger in _jobRunnerJobs forever.
            self.jobCompleteCallback(jobRunner)
            raise

        # Deliver the job's captured output (and untrack it) on completion.
        d.addCallback(jobRunner.getLogCallback)
        return d

    def close(self):
        """Remove any pending reactor triggers and shut the pool down."""
        if self.shutdownID:
            reactor.removeSystemEventTrigger(self.shutdownID)
            self.shutdownID = None
        if self.startID:
            reactor.removeSystemEventTrigger(self.startID)
            self.startID = None
        self.finalClose()

    def finalClose(self):
        """Mark the pool stopped and close every entry in ``self.jobs``.

        Registered by start() as a 'during shutdown' trigger; close() also
        calls it directly.
        """
        # If we were invoked as the shutdown trigger, that trigger is spent.
        self.shutdownID = None
        self.running = 0
        for job in self.jobs.values():
            if self.noisy:
                log.msg(' closing: %s%s' % (job, self.kw or ''))
            job.close()
        self.jobs.clear()

    def __getstate__(self):
        """Pickle only the configuration needed to rebuild the pool."""
        return {'noisy': self.noisy,
                'args': self.args,
                'kw': self.kw}

    def __setstate__(self, state):
        """Rebuild the pool from __getstate__ output by re-running __init__."""
        self.__dict__ = state
        self.__init__(*state.get('args', ()), **state.get('kw', {}))

def startup(pool=None, command='dir', count=13):
    """Demo driver: start ``count`` jobs running ``command``.

    Each job's Deferred gets printSelf as a callback.  ``pool`` defaults to
    a module-level pool named ``m``, looked up at call time as before (it is
    expected to be defined elsewhere).  Previously no command was passed,
    so every job failed immediately inside JobRunner.runJob.

    Returns the list of job Deferreds.
    """
    if pool is None:
        pool = m
    deferreds = []
    for _ in range(count):
        d = pool.runJob(command)
        d.addCallback(printSelf)
        deferreds.append(d)
    return deferreds
            
    
def printSelf(jobRunner):
    """Deferred callback that announces a finished job.

    The argument is whatever the preceding callback returned and is not
    used; the function returns None.
    """
    message = "Job Runner Done"
    print(message)

    




More information about the Twisted-Python mailing list