Ticket #1939: twisted-process-improvements-2.patch

File twisted-process-improvements-2.patch, 38.8 KB (added by naked, 16 years ago)

second version of the improvement patch

  • twisted/internet/posixbase.py

    ==== Patch <twisted-process-improvements-2> level 1
    Source: 4ed7e550-c718-0410-ac9a-b82090e61d08:/local/twisted-process-idea-dev:11419
    Target: bbbe8e31-12d6-0310-92fd-ac37d47ddeeb:/trunk:17656
            (svn://svn.twistedmatrix.com/svn/Twisted/trunk)
    Log:
     r11396@taby:  naked | 2006-07-19 10:48:04 +0300
     Branched twisted-trunk to for developing the process module.
     r11399@taby:  naked | 2006-07-19 20:56:44 +0300
     Added first helpers for process childFDs.
     r11400@taby:  naked | 2006-07-19 21:23:05 +0300
     First version of rework of childFDs handling. We should be in working state
     again, so let me try testing.
     r11401@taby:  naked | 2006-07-19 21:56:49 +0300
     Implemented ProcessPTY as the reader/writer for PTY processes.
     r11402@taby:  naked | 2006-07-19 22:16:56 +0300
     Bugfixes to PTY process handling.
     r11403@taby:  naked | 2006-07-19 22:27:38 +0300
     Added defaultWriter hack to maybe make Process behave a bit like PTYProces if
     wanted.
     r11404@taby:  naked | 2006-07-19 22:34:55 +0300
     Bugfix.
     r11405@taby:  naked | 2006-07-19 22:36:24 +0300
     Modify posixbase to use the new Process PTY interface.
     r11409@taby:  naked | 2006-07-22 22:51:07 +0300
     Halfway rework of the process improvements. Now implements a factory.
     r11410@taby:  naked | 2006-07-22 22:55:37 +0300
     A tiny rework.
     r11411@taby:  naked | 2006-07-22 22:58:17 +0300
     Reorganize file a bit.
     r11412@taby:  naked | 2006-07-22 23:04:11 +0300
     Small tunes again.
     r11413@taby:  naked | 2006-07-23 00:01:08 +0300
     Severe rework of the process spawning mechanics. Now there's a ChildProcess
     class as well.
     r11414@taby:  naked | 2006-07-23 16:15:08 +0300
     Move changing path before setting up helpers. I'm not sure if it is a good
     thing or a bad thing.
    
    === twisted/internet/posixbase.py
    ==================================================================
     
    272272                     env={}, path=None,
    273273                     uid=None, gid=None, usePTY=0, childFDs=None):
    274274        if platformType == 'posix':
    275             if usePTY:
    276                 if childFDs is not None:
    277                     raise ValueError("Using childFDs is not supported with usePTY=True.")
    278                 return process.PTYProcess(self, executable, args, env, path,
    279                                           processProtocol, uid, gid, usePTY)
    280             else:
    281                 return process.Process(self, executable, args, env, path,
    282                                        processProtocol, uid, gid, childFDs)
     275            factory = process.CompatibilityProcessProtocolFactory(processProtocol, path, uid, gid, usePTY, childFDs)
     276            return process.Process(self, executable, args, env, factory)
    283277        elif platformType == "win32":
    284278            if uid is not None or gid is not None:
    285279                raise ValueError("The uid and gid parameters are not supported on Windows.")
  • twisted/internet/process.py

    === twisted/internet/process.py
    ==================================================================
     
    8282# Call at import time
    8383detectLinuxBrokenPipeBehavior()
    8484
     85
    8586class ProcessWriter(abstract.FileDescriptor):
    8687    """(Internal) Helper class to write into a Process's input pipe.
    8788
     
    233234        self.proc.childConnectionLost(self.name, reason)
    234235
    235236
    236 class Process(styles.Ephemeral):
    237     """An operating-system Process.
     237class ProcessPTY(abstract.FileDescriptor):
     238    connected = 1
    238239
    239     This represents an operating-system process with arbitrary input/output
    240     pipes connected to it. Those pipes may represent standard input,
    241     standard output, and standard error, or any other file descriptor.
     240    def __init__(self, reactor, proc, name, fileno):
     241        abstract.FileDescriptor.__init__(self, reactor)
     242        fdesc.setNonBlocking(fileno)
     243        self.proc = proc
     244        self.name = name
     245        self.fd = fileno
     246        self.startReading()
    242247
    243     On UNIX, this is implemented using fork(), exec(), pipe()
    244     and fcntl(). These calls may not exist elsewhere so this
    245     code is not cross-platform. (also, windows can only select
    246     on sockets...)
    247     """
     248    def fileno(self):
     249        return self.fd
    248250
    249     debug = False
    250     debug_child = False
     251    def writeSomeData(self, data):
     252        try:
     253            return os.write(self.fd, data)
     254        except IOError,io:
     255            if io.args[0] == errno.EAGAIN:
     256                return 0
     257            return CONNECTION_LOST
     258        except OSError, ose:
     259            if ose.errno == errno.EAGAIN: # MacOS-X does this # FIXME: really needed?
     260                return 0
     261            raise
    251262
    252     status = -1
    253     pid = None
     263    def doRead(self):
     264        try:
     265            return fdesc.readFromFD(self.fd, self.dataReceived)
     266        except OSError: # FIXME: really needed?
     267            return CONNECTION_LOST
    254268
    255     def __init__(self,
    256                  reactor, command, args, environment, path, proto,
    257                  uid=None, gid=None, childFDs=None):
    258         """Spawn an operating-system process.
     269    def dataReceived(self, data):
     270        self.proc.childDataReceived(self.name, data)
    259271
    260         This is where the hard work of disconnecting all currently open
    261         files / forking / executing the new process happens.  (This is
    262         executed automatically when a Process is instantiated.)
     272    def connectionLost(self, reason):
     273        abstract.FileDescriptor.connectionLost(self, reason)
     274        self.proc.childConnectionLost(self.name, reason)
    263275
    264         This will also run the subprocess as a given user ID and group ID, if
    265         specified.  (Implementation Note: this doesn't support all the arcane
    266         nuances of setXXuid on UNIX: it will assume that either your effective
    267         or real UID is 0.)
    268         """
    269         if not proto:
    270             assert 'r' not in childFDs.values()
    271             assert 'w' not in childFDs.values()
    272         if not signal.getsignal(signal.SIGCHLD):
    273             log.msg("spawnProcess called, but the SIGCHLD handler is not "
    274                     "installed. This probably means you have not yet "
    275                     "called reactor.run, or called "
    276                     "reactor.run(installSignalHandler=0). You will probably "
    277                     "never see this process finish, and it may become a "
    278                     "zombie process.")
    279             # if you see this message during a unit test, look in
    280             # test-standard.xhtml or twisted.test.test_process.SignalMixin
    281             # for a workaround
    282276
    283         self.lostProcess = False
     277class ProcessHelper:
     278    def setupPrefork(self, proc):
     279        raise NotImplementedError("%s does not implement setup" %
     280                                  reflect.qual(self.__class__))
    284281
    285         settingUID = (uid is not None) or (gid is not None)
    286         if settingUID:
    287             curegid = os.getegid()
    288             currgid = os.getgid()
    289             cureuid = os.geteuid()
    290             curruid = os.getuid()
    291             if uid is None:
    292                 uid = cureuid
    293             if gid is None:
    294                 gid = curegid
    295             # prepare to change UID in subprocess
    296             os.setuid(0)
    297             os.setgid(0)
     282    def setupChild(self, childProc):
     283        raise NotImplementedError("%s does not implement setup" %
     284                                  reflect.qual(self.__class__))
    298285
    299         self.pipes = {}
    300         # keys are childFDs, we can sense them closing
    301         # values are ProcessReader/ProcessWriters
     286    def setupParent(self, proc):
     287        raise NotImplementedError("%s does not implement setup" %
     288                                  reflect.qual(self.__class__))
    302289
    303         helpers = {}
    304         # keys are childFDs
    305         # values are parentFDs
    306290
    307         if childFDs is None:
    308             childFDs = {0: "w", # we write to the child's stdin
    309                         1: "r", # we read from their stdout
    310                         2: "r", # and we read from their stderr
    311                         }
     291class SetuidHelper(ProcessHelper):
     292    def __init__(self, uid=None, gid=None):
     293        self.uid = uid
     294        self.gid = gid
    312295
    313         debug = self.debug
    314         if debug: print "childFDs", childFDs
     296    def setupPrefork(self, proc):
     297        curegid = os.getegid()
     298        currgid = os.getgid()
     299        cureuid = os.geteuid()
     300        curruid = os.getuid()
     301        if uid is None:
     302            uid = cureuid
     303        if gid is None:
     304            gid = curegid
     305        # prepare to change UID in subprocess
     306        os.setuid(0)
     307        os.setgid(0)
    315308
    316         # fdmap.keys() are filenos of pipes that are used by the child.
    317         fdmap = {} # maps childFD to parentFD
    318         for childFD, target in childFDs.items():
    319             if debug: print "[%d]" % childFD, target
    320             if target == "r":
    321                 # we need a pipe that the parent can read from
    322                 readFD, writeFD = os.pipe()
    323                 if debug: print "readFD=%d, writeFD%d" % (readFD, writeFD)
    324                 fdmap[childFD] = writeFD     # child writes to this
    325                 helpers[childFD] = readFD    # parent reads from this
    326             elif target == "w":
    327                 # we need a pipe that the parent can write to
    328                 readFD, writeFD = os.pipe()
    329                 if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
    330                 fdmap[childFD] = readFD      # child reads from this
    331                 helpers[childFD] = writeFD   # parent writes to this
    332             else:
    333                 assert type(target) == int, '%r should be an int' % (target,)
    334                 fdmap[childFD] = target      # parent ignores this
    335         if debug: print "fdmap", fdmap
    336         if debug: print "helpers", helpers
    337         # the child only cares about fdmap.values()
     309    def setupChild(self, childProc):
     310        switchUID(uid, gid)
    338311
    339         self.pid = os.fork()
    340         if self.pid == 0: # pid is 0 in the child process
     312    def setupParent(self, proc):
     313        os.setregid(currgid, curegid)
     314        os.setreuid(curruid, cureuid)
    341315
    342             # do not put *ANY* code outside the try block. The child process
    343             # must either exec or _exit. If it gets outside this block (due
    344             # to an exception that is not handled here, but which might be
    345             # handled higher up), there will be two copies of the parent
    346             # running in parallel, doing all kinds of damage.
    347316
    348             # After each change to this code, review it to make sure there
    349             # are no exit paths.
     317class ChdirHelper(ProcessHelper):
     318    def __init__(self, path):
     319        self.path = path
    350320
     321    def setupPrefork(self, proc):
     322        pass
     323
     324    def setupChild(self, childProc):
     325        os.chdir(self.path)
     326
     327    def setupParent(self, proc):
     328        pass
     329
     330
     331class PassthroughFD(ProcessHelper):
     332    def __init__(self, fd, dstFDs):
     333        self.fd = fd
     334        self.dstFDs = dstFDs
     335
     336    def setupPrefork(self, proc):
     337        pass
     338
     339    def setupChild(self, childProc):
     340        for dstFD in self.dstFDs:
     341            childProc.mapFD(self.fd, dstFD)
     342
     343    def setupParent(self, proc):
     344        pass
     345
     346
     347class ReadPipe(ProcessHelper):
     348    def __init__(self, name, dstFDs):
     349        self.name = name
     350        self.dstFDs = dstFDs
     351   
     352    def setupPrefork(self, proc):
     353        readFD, writeFD = os.pipe()
     354        self.parentFD = readFD
     355        self.childFD = writeFD
     356
     357    def setupChild(self, childProc):
     358        os.close(self.parentFD)
     359        for dstFD in self.dstFDs:
     360            childProc.mapFD(self.childFD, dstFD)
     361
     362    def setupParent(self, proc):
     363        os.close(self.childFD)
     364        reader = ProcessReader(proc.reactor, proc, self.name, self.parentFD)
     365        proc.addChannel(self.name, reader)
     366
     367
     368class WritePipe(ProcessHelper):
     369    def __init__(self, name, dstFDs):
     370        self.name = name
     371        self.dstFDs = dstFDs
     372   
     373    def setupPrefork(self, proc):
     374        readFD, writeFD = os.pipe()
     375        self.parentFD = writeFD
     376        self.childFD = readFD
     377
     378    def setupChild(self, childProc):
     379        os.close(self.parentFD)
     380        for dstFD in self.dstFDs:
     381            childProc.mapFD(self.childFD, dstFD)
     382
     383    def setupParent(self, proc):
     384        os.close(self.childFD)
     385        writer = ProcessWriter(proc.reactor, proc, self.name, self.parentFD)
     386        proc.addChannel(self.name, writer)
     387
     388
     389class PTY(ProcessHelper):
     390    def __init__(self, name, dstFDs, usePTY=None):
     391        self.name = name
     392        self.dstFDs = dstFDs
     393        if not pty and type(usePTY) not in (types.ListType, types.TupleType):
     394            # no pty module and we didn't get a pty to use
     395            raise NotImplementedError, "cannot use PTYProcess on platforms without the pty module."
     396        self.usePTY = usePTY
     397
     398    def setupPrefork(self, proc):
     399        if type(self.usePTY) in (types.ListType, types.TupleType):
     400            masterFD, slaveFD, ttyname = self.usePTY
     401        else:
     402            masterFD, slaveFD = pty.openpty()
     403            ttyname = os.ttyname(slaveFD)
     404        self.parentFD = masterFD
     405        self.childFD = slaveFD
     406        self.ttyname = ttyname
     407
     408    def setupChild(self, childProc):
     409        os.close(self.parentFD)
     410        if hasattr(termios, 'TIOCNOTTY'):
    351411            try:
    352                 # stop debugging, if I am!  I don't care anymore!
    353                 sys.settrace(None)
    354                 # close all parent-side pipes
    355                 self._setupChild(fdmap)
    356                 self._execChild(path, settingUID, uid, gid,
    357                                 command, args, environment)
    358             except:
    359                 # If there are errors, bail and try to write something
    360                 # descriptive to stderr.
    361                 # XXX: The parent's stderr isn't necessarily fd 2 anymore, or
    362                 #      even still available
    363                 # XXXX: however even libc assumes write(2,err) is a useful
    364                 #       thing to attempt
     412                fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
     413            except OSError:
     414                pass
     415            else:
    365416                try:
    366                     stderr = os.fdopen(2,'w')
    367                     stderr.write("Upon execvpe %s %s in environment %s\n:" %
    368                                  (command, str(args),
    369                                   "id %s" % id(environment)))
    370                     traceback.print_exc(file=stderr)
    371                     stderr.flush()
    372                     for fd in range(3):
    373                         os.close(fd)
     417                    fcntl.ioctl(fd, termios.TIOCNOTTY, '')
    374418                except:
    375                     pass # make *sure* the child terminates
    376             # Did you read the comment about not adding code here?
    377             os._exit(1)
     419                    pass
     420                os.close(fd)
     421                   
     422        os.setsid()
     423               
     424        if hasattr(termios, 'TIOCSCTTY'):
     425            fcntl.ioctl(self.childFD, termios.TIOCSCTTY, '')
    378426
    379         # we are the parent
     427        for dstFD in self.dstFDs:
     428            childProc.mapFD(self.childFD, dstFD)
    380429
    381         if settingUID:
    382             os.setregid(currgid, curegid)
    383             os.setreuid(curruid, cureuid)
    384         self.status = -1 # this records the exit status of the child
     430    def setupParent(self, proc):
     431        os.close(self.childFD)
     432        processpty = ProcessPTY(proc.reactor, proc, self.name, self.parentFD)
     433        proc.addChannel(self.name, processpty)
    385434
     435
     436class SocketPair(ProcessHelper):
     437    pass
     438
     439
     440class ProcessProtocolFactory(protocol.Factory):
     441    protocol = protocol.ProcessProtocol
     442
     443    def processSetup(self, proc):
     444        pass
     445
     446
     447class CompatibilityProcessProtocolFactory(ProcessProtocolFactory):
     448    def __init__(self, proto=None, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
     449        if usePTY and childFDs is not None:
     450            raise ValueError("Using childFDs is not supported with usePTY=True.")
     451        if not proto:
     452            assert 'r' not in childFDs.values()
     453            assert 'w' not in childFDs.values()
    386454        self.proto = proto
    387        
    388         # arrange for the parent-side pipes to be read and written
    389         for childFD, parentFD in helpers.items():
    390             os.close(fdmap[childFD])
     455        self.path = path
     456        self.uid = uid
     457        self.gid = gid
     458        self.usePTY = usePTY
     459        self.childFDs = childFDs
     460        self.used = False
     461   
     462    def processSetup(self, proc):
     463        if (self.uid is not None) or (self.gid is not None):
     464            proc.addHelper(SetuidHelper(self.uid, self.gid))
    391465
    392             if childFDs[childFD] == "r":
    393                 reader = ProcessReader(reactor, self, childFD, parentFD)
    394                 self.pipes[childFD] = reader
     466        if self.path is not None:
     467            proc.addHelper(ChdirHelper(self.path))
    395468
    396             if childFDs[childFD] == "w":
    397                 writer = ProcessWriter(reactor, self, childFD, parentFD, forceReadHack=True)
    398                 self.pipes[childFD] = writer
     469        if self.usePTY:
     470            proc.addHelper(PTY(1, [0, 1, 2], self.usePTY))
     471            proc.setDefaultWriter(1)
     472        elif self.childFDs is None:
     473            proc.addHelper(WritePipe(0, [0]))
     474            proc.addHelper(ReadPipe(1, [1]))
     475            proc.addHelper(ReadPipe(2, [2]))
     476        else:
     477            for childFD, target in self.childFDs.items():
     478                if target == "r":
     479                    proc.addHelper(ReadPipe(childFD, [childFD]))
     480                elif target == "w":
     481                    proc.addHelper(WritePipe(childFD, [childFD]))
     482                else:
     483                    assert type(target) == int, '%r should be an int' % (target,)
     484                    proc.addHelper(PassthroughFD(target, [childFD]))
    399485
    400         try:
    401             # the 'transport' is used for some compatibility methods
    402             if self.proto is not None:
    403                 self.proto.makeConnection(self)
    404         except:
    405             log.err()
    406         registerReapProcessHandler(self.pid, self)
     486    def buildProtocol(self, addr):
     487        if self.used:
     488            raise ValueError("Cannot use CompatibilityProcessProtocolFactory more than once.")
     489        self.used = True
     490        if self.proto is None:
     491            return protocol.ProcessProtocol()
     492        else:
     493            return self.proto
    407494
    408     def _setupChild(self, fdmap):
     495
     496class ChildProcess(styles.Ephemeral):
     497    debug = False
     498   
     499    def __init__(self, command, args, environment, helpers):
     500        self.command = command
     501        self.args = args
     502        self.environment = environment
     503        self.helpers = helpers
     504        self.fdmap = {}
     505   
     506    def run(self):
     507        # stop debugging, if I am!  I don't care anymore!
     508        sys.settrace(None)
     509        # call all helpers
     510        for helper in self.helpers:
     511            helper.setupChild(self)
     512        # map all FDs to where they belong
     513        self._setupChild()
     514        self._execChild()
     515
     516    def mapFD(self, src, dst):
     517        # FIXME: check duplicates?
     518        self.fdmap[dst] = src
     519
     520    def _setupChild(self):
    409521        """
    410522        fdmap[childFD] = parentFD
    411523
     
    435547                   original.
    436548        """
    437549
    438         debug = self.debug_child
     550        fdmap = self.fdmap
     551        debug = self.debug
    439552        if debug:
    440553            #errfd = open("/tmp/p.err", "a", 0)
    441554            errfd = sys.stderr
     
    509622        for fd in old:
    510623            os.close(fd)
    511624
    512     def _execChild(self, path, settingUID, uid, gid,
    513                    command, args, environment):
    514         if path:
    515             os.chdir(path)
    516         # set the UID before I actually exec the process
    517         if settingUID:
    518             switchUID(uid, gid)
    519         os.execvpe(command, args, environment)
     625    def _execChild(self):
     626        os.execvpe(self.command, self.args, self.environment)
    520627
     628
     629class Process(styles.Ephemeral):
     630    """An operating-system Process.
     631
     632    This represents an operating-system process with arbitrary input/output
     633    pipes connected to it. Those pipes may represent standard input,
     634    standard output, and standard error, or any other file descriptor.
     635
     636    On UNIX, this is implemented using fork(), exec(), pipe()
     637    and fcntl(). These calls may not exist elsewhere so this
     638    code is not cross-platform. (also, windows can only select
     639    on sockets...)
     640    """
     641
     642    debug = False
     643    debug_child = False
     644
     645    status = -1
     646    pid = None
     647
     648    defaultWriter = 0 # FIXME: hack
     649
     650    def __init__(self,
     651                 reactor, command, args, environment, factory):
     652        """Spawn an operating-system process.
     653
     654        This is where the hard work of disconnecting all currently open
     655        files / forking / executing the new process happens.  (This is
     656        executed automatically when a Process is instantiated.)
     657
     658        This will also run the subprocess as a given user ID and group ID, if
     659        specified.  (Implementation Note: this doesn't support all the arcane
     660        nuances of setXXuid on UNIX: it will assume that either your effective
     661        or real UID is 0.)
     662        """
     663        if not signal.getsignal(signal.SIGCHLD):
     664            log.msg("spawnProcess called, but the SIGCHLD handler is not "
     665                    "installed. This probably means you have not yet "
     666                    "called reactor.run, or called "
     667                    "reactor.run(installSignalHandler=0). You will probably "
     668                    "never see this process finish, and it may become a "
     669                    "zombie process.")
     670            # if you see this message during a unit test, look in
     671            # test-standard.xhtml or twisted.test.test_process.SignalMixin
     672            # for a workaround
     673
     674        self.reactor = reactor # FIXME: how to handle?
     675
     676        self.factory = factory
     677       
     678        self.lostProcess = False
     679
     680        self.pipes = {}
     681        # keys are childFDs, we can sense them closing
     682        # values are ProcessReader/ProcessWriters
     683
     684        self.helpers = []
     685        # list of (helper, name, childFDlist)
     686
     687        factory.processSetup(self)
     688
     689        for helper in self.helpers:
     690            helper.setupPrefork(self)
     691
     692        # FIXME: check for duplicates in childFD lists?
     693
     694        self.pid = os.fork()
     695        if self.pid == 0: # pid is 0 in the child process
     696
     697            # do not put *ANY* code outside the try block. The child process
     698            # must either exec or _exit. If it gets outside this block (due
     699            # to an exception that is not handled here, but which might be
     700            # handled higher up), there will be two copies of the parent
     701            # running in parallel, doing all kinds of damage.
     702
     703            # After each change to this code, review it to make sure there
     704            # are no exit paths.
     705
     706            try:
     707                child = ChildProcess(command, args, environment, self.helpers)
     708                child.run()
     709            except:
     710                # If there are errors, bail and try to write something
     711                # descriptive to stderr.
     712                # XXX: The parent's stderr isn't necessarily fd 2 anymore, or
     713                #      even still available
     714                # XXXX: however even libc assumes write(2,err) is a useful
     715                #       thing to attempt
     716                try:
     717                    stderr = os.fdopen(2,'w')
     718                    stderr.write("Upon execvpe %s %s in environment %s\n:" %
     719                                 (command, str(args),
     720                                  "id %s" % id(environment)))
     721                    traceback.print_exc(file=stderr)
     722                    stderr.flush()
     723                    for fd in range(3):
     724                        os.close(fd)
     725                except:
     726                    pass # make *sure* the child terminates
     727            # Did you read the comment about not adding code here?
     728            os._exit(1)
     729
     730        # we are the parent
     731
     732        self.status = -1 # this records the exit status of the child
     733
     734        # setup the parent side helpers
     735        for helper in self.helpers:
     736            helper.setupParent(self)
     737
     738        try:
     739            self.proto = self.factory.buildProtocol(None)
     740            self.proto.makeConnection(self)
     741        except:
     742            log.err()
     743        registerReapProcessHandler(self.pid, self)
     744
     745    def addHelper(self, helper):
     746        self.helpers.append(helper)
     747
     748    def addChannel(self, name, transport):
     749        self.pipes[name] = transport
     750
     751    def setDefaultWriter(self, name):
     752        self.defaultWriter = name
     753
    521754    def reapProcess(self):
    522755        """Try to reap a process (without blocking) via waitpid.
    523756
     
    580813       
    581814        NOTE: This will silently lose data if there is no standard input.
    582815        """
    583         if self.pipes.has_key(0):
    584             self.pipes[0].write(data)
     816        if self.pipes.has_key(self.defaultWriter):
     817            self.pipes[self.defaultWriter].write(data)
    585818
    586819    def registerProducer(self, producer, streaming):
    587820        """Call this to register producer for standard input.
     
    589822        If there is no standard input producer.stopProducing() will
    590823        be called immediately.
    591824        """
    592         if self.pipes.has_key(0):
    593             self.pipes[0].registerProducer(producer, streaming)
     825        if self.pipes.has_key(self.defaultWriter):
     826            self.pipes[self.defaultWriter].registerProducer(producer, streaming)
    594827        else:
    595828            producer.stopProducing()
    596829
    597830    def unregisterProducer(self):
    598831        """Call this to unregister producer for standard input."""
    599         if self.pipes.has_key(0):
    600             self.pipes[0].unregisterProducer()
     832        if self.pipes.has_key(self.defaultWriter):
     833            self.pipes[self.defaultWriter].unregisterProducer()
    601834   
    602835    def writeSequence(self, seq):
    603836        """Call this to write to standard input on this process.
    604837
    605838        NOTE: This will silently lose data if there is no standard input.
    606839        """
    607         if self.pipes.has_key(0):
    608             self.pipes[0].writeSequence(seq)
     840        if self.pipes.has_key(self.defaultWriter):
     841            self.pipes[self.defaultWriter].writeSequence(seq)
    609842
    610843    def childDataReceived(self, name, data):
    611844        self.proto.childDataReceived(name, data)
     
    673906    def __repr__(self):
    674907        return "<%s pid=%s status=%s>" % (self.__class__.__name__,
    675908                                          self.pid, self.status)
    676 
    677 class PTYProcess(abstract.FileDescriptor, styles.Ephemeral):
    678     """An operating-system Process that uses PTY support."""
    679     status = -1
    680     pid = None
    681    
    682     def __init__(self, reactor, command, args, environment, path, proto,
    683                  uid=None, gid=None, usePTY=None):
    684         """Spawn an operating-system process.
    685 
    686         This is where the hard work of disconnecting all currently open
    687         files / forking / executing the new process happens.  (This is
    688         executed automatically when a Process is instantiated.)
    689 
    690         This will also run the subprocess as a given user ID and group ID, if
    691         specified.  (Implementation Note: this doesn't support all the arcane
    692         nuances of setXXuid on UNIX: it will assume that either your effective
    693         or real UID is 0.)
    694         """
    695         if not pty and type(usePTY) not in (types.ListType, types.TupleType):
    696             # no pty module and we didn't get a pty to use
    697             raise NotImplementedError, "cannot use PTYProcess on platforms without the pty module."
    698         abstract.FileDescriptor.__init__(self, reactor)
    699         settingUID = (uid is not None) or (gid is not None)
    700         if settingUID:
    701             curegid = os.getegid()
    702             currgid = os.getgid()
    703             cureuid = os.geteuid()
    704             curruid = os.getuid()
    705             if uid is None:
    706                 uid = cureuid
    707             if gid is None:
    708                 gid = curegid
    709             # prepare to change UID in subprocess
    710             os.setuid(0)
    711             os.setgid(0)
    712         if type(usePTY) in (types.TupleType, types.ListType):
    713             masterfd, slavefd, ttyname = usePTY
    714         else:
    715             masterfd, slavefd = pty.openpty()
    716             ttyname = os.ttyname(slavefd)
    717         pid = os.fork()
    718         self.pid = pid
    719         if pid == 0: # pid is 0 in the child process
    720             try:
    721                 sys.settrace(None)
    722                 os.close(masterfd)
    723                 if hasattr(termios, 'TIOCNOTTY'):
    724                     try:
    725                         fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    726                     except OSError:
    727                         pass
    728                     else:
    729                         try:
    730                             fcntl.ioctl(fd, termios.TIOCNOTTY, '')
    731                         except:
    732                             pass
    733                         os.close(fd)
    734                    
    735                 os.setsid()
    736                
    737                 if hasattr(termios, 'TIOCSCTTY'):
    738                     fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
    739                
    740                 for fd in range(3):
    741                     if fd != slavefd:
    742                         os.close(fd)
    743 
    744                 os.dup2(slavefd, 0) # stdin
    745                 os.dup2(slavefd, 1) # stdout
    746                 os.dup2(slavefd, 2) # stderr
    747 
    748                 if path:
    749                     os.chdir(path)
    750                 for fd in range(3, 256):
    751                     try:    os.close(fd)
    752                     except: pass
    753 
    754                 # set the UID before I actually exec the process
    755                 if settingUID:
    756                     switchUID(uid, gid)
    757                 os.execvpe(command, args, environment)
    758             except:
    759                 stderr = os.fdopen(1, 'w')
    760                 stderr.write("Upon execvpe %s %s in environment %s:\n" %
    761                              (command, str(args),
    762                               "id %s" % id(environment)))
    763                 traceback.print_exc(file=stderr)
    764                 stderr.flush()
    765             os._exit(1)
    766         assert pid!=0
    767         os.close(slavefd)
    768         fdesc.setNonBlocking(masterfd)
    769         self.fd=masterfd
    770         self.startReading()
    771         self.connected = 1
    772         self.proto = proto
    773         self.lostProcess = 0
    774         self.status = -1
    775         try:
    776             self.proto.makeConnection(self)
    777         except:
    778             log.err()
    779         registerReapProcessHandler(self.pid, self)
    780 
    781     def reapProcess(self):
    782         """Try to reap a process (without blocking) via waitpid.
    783 
    784         This is called when sigchild is caught or a Process object loses its
    785         "connection" (stdout is closed) This ought to result in reaping all
    786         zombie processes, since it will be called twice as often as it needs
    787         to be.
    788 
    789         (Unfortunately, this is a slightly experimental approach, since
    790         UNIX has no way to be really sure that your process is going to
    791         go away w/o blocking.  I don't want to block.)
    792         """
    793         try:
    794             pid, status = os.waitpid(self.pid, os.WNOHANG)
    795         except OSError, e:
    796             if e.errno == errno.ECHILD: # no child process
    797                 pid = None
    798             else:
    799                 raise
    800         except:
    801             log.err()
    802             pid = None
    803         if pid:
    804             self.processEnded(status)
    805             unregisterReapProcessHandler(self.pid, self)
    806 
    807     # PTYs do not have stdin/stdout/stderr. They only have in and out, just
    808     # like sockets. You cannot close one without closing off the entire PTY.
    809     def closeStdin(self):
    810         pass
    811 
    812     def closeStdout(self):
    813         pass
    814 
    815     def closeStderr(self):
    816         pass
    817 
    818     def signalProcess(self, signalID):
    819         if signalID in ('HUP', 'STOP', 'INT', 'KILL'):
    820             signalID = getattr(signal, 'SIG'+signalID)
    821         os.kill(self.pid, signalID)
    822 
    823     def processEnded(self, status):
    824         self.status = status
    825         self.lostProcess += 1
    826         self.maybeCallProcessEnded()
    827 
    828     def doRead(self):
    829         """Called when my standard output stream is ready for reading.
    830         """
    831         try:
    832             return fdesc.readFromFD(self.fd, self.proto.outReceived)
    833         except OSError:
    834             return CONNECTION_LOST
    835 
    836     def fileno(self):
    837         """This returns the file number of standard output on this process.
    838         """
    839         return self.fd
    840 
    841     def maybeCallProcessEnded(self):
    842         # two things must happen before we call the ProcessProtocol's
    843         # processEnded method. 1: the child process must die and be reaped
    844         # (which calls our own processEnded method). 2: the child must close
    845         # their stdin/stdout/stderr fds, causing the pty to close, causing
    846         # our connectionLost method to be called. #2 can also be triggered
    847         # by calling .loseConnection().
    848         if self.lostProcess == 2:
    849             try:
    850                 exitCode = sig = None
    851                 if self.status != -1:
    852                     if os.WIFEXITED(self.status):
    853                         exitCode = os.WEXITSTATUS(self.status)
    854                     else:
    855                         sig = os.WTERMSIG(self.status)
    856                 else:
    857                     pass # wonder when this happens
    858                 if exitCode or sig:
    859                     e = error.ProcessTerminated(exitCode, sig, self.status)
    860                 else:
    861                     e = error.ProcessDone(self.status)
    862                 self.proto.processEnded(failure.Failure(e))
    863                 self.proto = None
    864             except:
    865                 log.err()
    866 
    867     def connectionLost(self, reason):
    868         """I call this to clean up when one or all of my connections has died.
    869         """
    870         abstract.FileDescriptor.connectionLost(self, reason)
    871         os.close(self.fd)
    872         self.lostProcess +=1
    873         self.maybeCallProcessEnded()
    874 
    875     def writeSomeData(self, data):
    876         """Write some data to the open process.
    877         """
    878         try:
    879             return os.write(self.fd, data)
    880         except IOError,io:
    881             if io.args[0] == errno.EAGAIN:
    882                 return 0
    883             return CONNECTION_LOST
    884 
    885     def __repr__(self):
    886         return "<%s pid=%s status=%s>" % (self.__class__.__name__,
    887                                           self.pid, self.status)