Ticket #1008: final_patch2.txt

File final_patch2.txt, 33.5 KB (added by justinj, 15 years ago)
Line 
1Index: twisted/test/process_tester.py
2===================================================================
3--- twisted/test/process_tester.py      (revision 14502)
4+++ twisted/test/process_tester.py      (working copy)
5@@ -2,7 +2,8 @@
6 
7 import sys, os
8 
9-test_file = "process_test.log"
10+test_file_match = "process_test.log.*"
11+test_file = "process_test.log.%d" % os.getpid()
12 
13 def main():
14     f = open(test_file, 'wb')
15Index: twisted/test/test_process.py
16===================================================================
17--- twisted/test/test_process.py        (revision 14502)
18+++ twisted/test/test_process.py        (working copy)
19@@ -40,6 +40,10 @@
20 class TestProcessProtocol(protocol.ProcessProtocol):
21 
22     finished = 0
23+   
24+    def __init__(self):
25+       
26+        self.deferred = defer.Deferred()
27 
28     def connectionMade(self):
29         self.stages = [1]
30@@ -73,6 +77,10 @@
31     def processEnded(self, reason):
32         self.finished = 1
33         self.reason = reason
34+        if reason.check(error.ProcessDone):
35+            self.deferred.callback(None)
36+        else:
37+            self.deferred.errback(reason)
38 
39 class EchoProtocol(protocol.ProcessProtocol):
40 
41@@ -186,10 +194,45 @@
42         #self.assertEquals(f.value.signal, None)
43 
44         try:
45-            import process_tester
46-            os.remove(process_tester.test_file)
47+            import process_tester, glob
48+            for f in glob.glob(process_tester.test_file_match):
49+                os.remove(f)
50         except:
51             pass
52+           
53+    def testManyProcesses(self):
54+       
55+        def openProcess(cmd_and_args, proto):
56+            cmd = cmd_and_args[0]
57+            args = cmd_and_args[1:]
58+            proto.cmd = cmd_and_args
59+            reactor.spawnProcess(proto, cmd, cmd_and_args, env=None)
60+            return proto.deferred
61+       
62+        def _check(results, protocols):
63+            for p in protocols:
64+                self.failUnless(p.finished)
65+                self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
66+                # test status code
67+                f = p.reason
68+                f.trap(error.ProcessTerminated)
69+                self.assertEquals(f.value.exitCode, 23)
70+       
71+        exe = sys.executable
72+        scriptPath = util.sibpath(__file__, "process_tester.py")
73+        cmd = [exe, "-u", scriptPath]
74+        protocols = []
75+        deferreds = []
76+                   
77+        for i in xrange(200):
78+            p = TestProcessProtocol()
79+            protocols.append(p)
80+            d = openProcess(cmd, p)
81+            deferreds.append(d)
82+       
83+        deferredList = defer.DeferredList(deferreds)
84+        deferredList.addCallback(_check, protocols)
85+        return deferredList
86 
87     def testEcho(self):
88         finished = defer.Deferred()
89@@ -215,14 +258,14 @@
90         pyExe = sys.executable
91         scriptPath = util.sibpath(__file__, "process_cmdline.py")
92         p = Accumulator()
93-        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
94+        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None ,
95                              path=None)
96 
97         spinUntil(lambda :p.closed)
98         self.assertEquals(p.errF.getvalue(), "")
99         recvdArgs = p.outF.getvalue().splitlines()
100         self.assertEquals(recvdArgs, args)
101-       
102+
103     testEcho.timeout = 60
104 
105 class TwoProcessProtocol(protocol.ProcessProtocol):
106@@ -740,8 +783,9 @@
107         """ProcessProtocol.transport.closeStdout actually closes the pipe."""
108         d = self.doit(1)
109         def _check(errput):
110-            unittest.failIfEqual(errput.index('OSError'), -1)
111-            unittest.failIfEqual(errput.index('Broken pipe'), -1)
112+            unittest.failIfEqual(errput.find('OSError'), -1)
113+            if runtime.platform.getType() != 'win32':
114+                unittest.failIfEqual(errput.find('Broken pipe'), -1)
115         d.addCallback(_check)
116         return d
117 
118Index: twisted/internet/iocpreactor/proactor.py
119===================================================================
120--- twisted/internet/iocpreactor/proactor.py    (revision 14502)
121+++ twisted/internet/iocpreactor/proactor.py    (working copy)
122@@ -3,7 +3,8 @@
123 
124 
125 from twisted.internet import defer, base, main
126-from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorArbitrary
127+from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorArbitrary, IReactorProcess
128+from twisted.internet.iocpreactor import process
129 from twisted.python import threadable, log
130 from zope.interface import implements, implementsOnly
131 
132@@ -14,7 +15,7 @@
133     # TODO: IReactorArbitrary, IReactorUDP, IReactorMulticast,
134     # IReactorSSL (or leave it until exarkun finishes TLS)
135     # IReactorProcess, IReactorCore (cleanup)
136-    implementsOnly(IReactorTCP, IReactorUDP, IReactorArbitrary)
137+    implementsOnly(IReactorTCP, IReactorUDP, IReactorArbitrary, IReactorProcess)
138     handles = None
139     iocp = None
140 
141@@ -92,6 +93,11 @@
142         c = connectorType(*args, **kw)
143         c.connect()
144         return c
145+           
146+    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, usePTY=0):
147+        """Spawn a process."""
148+        return process.Process(self, processProtocol, executable, args, env, path)
149+       
150 
151 def install():
152     from twisted.python import threadable
153Index: twisted/internet/iocpreactor/process.py
154===================================================================
155--- twisted/internet/iocpreactor/process.py     (revision 0)
156+++ twisted/internet/iocpreactor/process.py     (revision 0)
157@@ -0,0 +1,542 @@
158+# Win32 imports
159+import win32api
160+import win32gui
161+import win32con
162+import win32file
163+import win32pipe
164+import win32process
165+import win32security
166+from win32event import CreateEvent, SetEvent, WaitForSingleObject, MsgWaitForMultipleObjects, \
167+                       WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE, QS_ALLINPUT, QS_POSTMESSAGE, \
168+                       QS_ALLEVENTS
169+
170+# Zope & Twisted imports
171+from zope.interface import implements
172+from twisted.internet import error
173+from twisted.python import failure, components
174+from twisted.internet.interfaces import IProcessTransport
175+
176+# sibling imports
177+import ops
178+
179+# System imports
180+import os
181+import sys
182+import re
183+import time
184+import threading
185+import itertools
186+
187+# Counter for uniquely identifying pipes
188+counter = itertools.count(1)
189+
190+# Message ID must be greater than WM_USER or the system will do
191+# marshalling automatically.
192+#WM_NEW_PHANDLE = win32con.WM_USER + 1
193+#WM_CLOSE_THREAD = win32con.WM_USER + 2
194+WM_NEW_PHANDLE = win32con.WM_APP + 1
195+WM_CLOSE_THREAD = win32con.WM_APP + 2
196+
197+_cmdLineQuoteRe = re.compile(r'(\\*)"')
198+def _cmdLineQuote(s):
199+    return '"' + _cmdLineQuoteRe.sub(r'\1\1\\"', s) + '"'
200+
201+
202+
203+class Process(object):
204+    """A process that integrates with the Twisted event loop.
205+
206+    See http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dllproc/base/creating_a_child_process_with_redirected_input_and_output.asp
207+    for more info on how to create processes in Windows and access their
208+    stdout/err/in.  Another good source is http://www.informit.com/articles/article.asp?p=362660&seqNum=2.
209+   
210+    Issues:
211+
212+    If your subprocess is a python program, you need to:
213+
214+     - Run python.exe with the '-u' command line option - this turns on
215+       unbuffered I/O. Buffering stdout/err/in can cause problems, see e.g.
216+       http://support.microsoft.com/default.aspx?scid=kb;EN-US;q1903
217+
218+     - (is this still true?) If you don't want Windows messing with data passed over
219+       stdin/out/err, set the pipes to be in binary mode::
220+
221+        import os, sys, msvcrt
222+        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
223+        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
224+        msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
225+
226+    """
227+    implements(IProcessTransport)
228+   
229+    # I used this size because abstract.ConnectedSocket did.  I don't
230+    # know why though.
231+    bufferSize = 2**2**2**2
232+    # Per http://www-128.ibm.com/developerworks/linux/library/l-rt4/,
233+    # an extra 24 bytes are needed to handle write header.  I haven't seen
234+    # any problems not having the extra 24 bytes though, so I'm not
235+    # adding it to the size.  I comment here just in case it is discovered
236+    # to be necessary down the road.
237+    pipeBufferSize = bufferSize
238+   
239+    usedThreads = []  # threads waiting on 63 process handles
240+    availableThreads = [] # threads waiting on less than 63 process handles
241+
242+    threadToNumProcessHandles = {} # to track when a thread can handle more process handles
243+    threadToMsgWindowCreationEvent = {} # event signalled when msg queue created
244+    threadToMsgWindowCreated = {} # boolean indicating msg queue is created
245+    needWaiting = {} # used to pass process handles to new WaitForMultipleObjects thread
246+    phandleToTransport = {} # so we can call methods on the transport when a proc handle is signalled
247+    threadToMsgWindow = {} # since we need the window to call PostMessage
248+    phandleKeyToThreadHandle = {} # proc handle keys passed to PostThreadMessage to tell thread to wait on new proc handle
249+    threadToNumEnded = {}
250+
251+    def __init__(self, reactor, protocol, command, args, environment, path):
252+        self.reactor = reactor
253+        self.protocol = protocol
254+        self.outBuffer = reactor.AllocateReadBuffer(self.bufferSize)
255+        self.errBuffer = reactor.AllocateReadBuffer(self.bufferSize)
256+        # This is the buffer for *reading* stdin, which is only done to
257+        # determine if the other end of the pipe was closed.
258+        self.inBuffer = reactor.AllocateReadBuffer(self.bufferSize)
259+        # IO operation classes
260+        self.readOutOp = ops.ReadOutOp(self)
261+        self.readErrOp = ops.ReadErrOp(self)
262+        self.readInOp = ops.ReadInOp(self)
263+        self.writeInOp = ops.WriteInOp(self)
264+       
265+        self.writeBuffer = ""
266+        self.writing = False
267+        self.finished = False
268+        self.offset = 0
269+        self.writeBufferedSize = 0
270+        self.closingStdin = False
271+        self.closedStdin = False
272+        self.closedStdout = False
273+        self.closedStderr = False
274+        # Stdio handles
275+        self.hChildStdinRd = None
276+        self.hChildStdinWr = None
277+        self.hChildStdinWrDup = None
278+        self.hChildStdoutRd = None
279+        self.hChildStdoutWr = None
280+        self.hChildStdoutRdDup = None
281+        self.hChildStderrRd = None
282+        self.hChildStderrWr = None
283+        self.hChildStderrRdDup = None
284+        # handle of thread calling WaitForMultipleObjects on this process's handle
285+        self.wfmoThread = None
286+       
287+        self.closedNotifies = 0  # increments to 3 (for stdin, stdout, stderr)
288+        self.closed = False # set to true when all 3 handles close
289+        self.exited = False # set to true when WFMO thread gets signalled proc handle.  See doWaitForProcessExit.
290+
291+        # Set the bInheritHandle flag so pipe handles are inherited.
292+        saAttr = win32security.SECURITY_ATTRIBUTES()
293+        saAttr.bInheritHandle = 1
294+       
295+        pid = win32api.GetCurrentProcess() # -1 which stands for current process
296+        self.realPid = os.getpid() # unique pid for pipe naming
297+       
298+        # Create a pipe for the child process's STDOUT.
299+        self.stdoutPipeName = r"\\.\pipe\twisted-iocp-stdout-%d-%d-%d" % (self.realPid, counter.next(), time.time())
300+        self.hChildStdoutRd = \
301+            win32pipe.CreateNamedPipe(self.stdoutPipeName,
302+                                      win32con.PIPE_ACCESS_INBOUND | win32con.FILE_FLAG_OVERLAPPED, # open mode
303+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
304+                                      1, # max instances
305+                                      self.pipeBufferSize, # out buffer size
306+                                      self.pipeBufferSize, # in buffer size
307+                                      0, # timeout
308+                                      saAttr)
309+
310+        self.hChildStdoutWr = win32file.CreateFile(self.stdoutPipeName,
311+                                         win32con.GENERIC_WRITE,
312+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
313+                                         saAttr,
314+                                         win32con.OPEN_EXISTING,
315+                                         win32con.FILE_FLAG_OVERLAPPED,
316+                                         0);
317+
318+        # Create noninheritable read handle and close the inheritable read
319+        # handle.
320+        self.hChildStdoutRdDup = win32api.DuplicateHandle(
321+                                     pid, self.hChildStdoutRd,
322+                                     pid, 0,
323+                                     0,
324+                                     win32con.DUPLICATE_SAME_ACCESS)
325+        win32api.CloseHandle(self.hChildStdoutRd);
326+        self.hChildStdoutRd = self.hChildStdoutRdDup
327+       
328+        # Create a pipe for the child process's STDERR.
329+        self.stderrPipeName = r"\\.\pipe\twisted-iocp-stderr-%d-%d-%d" % (self.realPid, counter.next(), time.time())
330+        self.hChildStderrRd = \
331+            win32pipe.CreateNamedPipe(self.stderrPipeName,
332+                                      win32con.PIPE_ACCESS_INBOUND | win32con.FILE_FLAG_OVERLAPPED, # open mode
333+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
334+                                      1, # max instances
335+                                      self.pipeBufferSize, # out buffer size
336+                                      self.pipeBufferSize, # in buffer size
337+                                      0, # timeout
338+                                      saAttr)
339+        self.hChildStderrWr = win32file.CreateFile(self.stderrPipeName,
340+                                         win32con.GENERIC_WRITE,
341+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
342+                                         saAttr,
343+                                         win32con.OPEN_EXISTING,
344+                                         win32con.FILE_FLAG_OVERLAPPED,
345+                                         0);
346+
347+        # Create noninheritable read handle and close the inheritable read
348+        # handle.
349+        self.hChildStderrRdDup = win32api.DuplicateHandle(
350+                                     pid, self.hChildStderrRd,
351+                                     pid, 0,
352+                                     0,
353+                                     win32con.DUPLICATE_SAME_ACCESS)
354+        win32api.CloseHandle(self.hChildStderrRd)
355+        self.hChildStderrRd = self.hChildStderrRdDup
356+       
357+       
358+        # Create a pipe for the child process's STDIN. This one is opened
359+        # in duplex mode so we can read from it too in order to detect when
360+        # the child closes their end of the pipe.
361+        self.stdinPipeName = r"\\.\pipe\twisted-iocp-stdin-%d-%d-%d" % (self.realPid, counter.next(), time.time())
362+        self.hChildStdinWr = \
363+            win32pipe.CreateNamedPipe(self.stdinPipeName,
364+                                      win32con.PIPE_ACCESS_DUPLEX | win32con.FILE_FLAG_OVERLAPPED, # open mode
365+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
366+                                      1, # max instances
367+                                      self.pipeBufferSize, # out buffer size
368+                                      self.pipeBufferSize, # in buffer size
369+                                      0, # timeout
370+                                      saAttr)
371+
372+        self.hChildStdinRd = win32file.CreateFile(self.stdinPipeName,
373+                                         win32con.GENERIC_READ,
374+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
375+                                         saAttr,
376+                                         win32con.OPEN_EXISTING,
377+                                         win32con.FILE_FLAG_OVERLAPPED,
378+                                         0);
379+       
380+       
381+        # Duplicate the write handle to the pipe so it is not inherited.
382+        self.hChildStdinWrDup = win32api.DuplicateHandle(
383+                                    pid, self.hChildStdinWr,
384+                                    pid, 0,
385+                                    0,
386+                                    win32con.DUPLICATE_SAME_ACCESS)
387+        win32api.CloseHandle(self.hChildStdinWr)
388+        self.hChildStdinWr = self.hChildStdinWrDup
389+       
390+        # set the info structure for the new process.  This is where
391+        # we tell the process to use the pipes for stdout/err/in.
392+        StartupInfo = win32process.STARTUPINFO()
393+        StartupInfo.hStdOutput = self.hChildStdoutWr
394+        StartupInfo.hStdError  = self.hChildStderrWr
395+        StartupInfo.hStdInput  = self.hChildStdinRd
396+        StartupInfo.dwFlags = win32process.STARTF_USESTDHANDLES
397+       
398+        # create the process
399+        cmdline = ' '.join([_cmdLineQuote(a) for a in args])
400+        self.hProcess, hThread, dwPid, dwTid = \
401+            win32process.CreateProcess(command,     # name
402+                                       cmdline,     # command line
403+                                       None,        # process security attributes
404+                                       None,        # primary thread security attributes
405+                                       1,           # handles are inherited
406+                                       0,           # creation flags
407+                                       environment, # if NULL, use parent environment
408+                                       path,        # current directory
409+                                       StartupInfo) # STARTUPINFO pointer
410+
411+       
412+
413+        # close handles which only the child will use
414+        win32file.CloseHandle(self.hChildStderrWr)
415+        win32file.CloseHandle(self.hChildStdoutWr)
416+        win32file.CloseHandle(self.hChildStdinRd)
417+       
418+        # Begin reading on stdout and stderr, before we have output on them.
419+        self.readOutOp.initiateOp(self.hChildStdoutRd, self.outBuffer)
420+        self.readErrOp.initiateOp(self.hChildStderrRd, self.errBuffer)
421+        # Read stdin which was opened in duplex mode so we can detect when
422+        # the child closed their end of the pipe.
423+        self.readInOp.initiateOp(self.hChildStdinWr, self.inBuffer)
424+
425+        # When the process is done, call connectionLost().
426+        # This function returns right away.  NOTE(review): the original
427+        # comment said this runs after protocol.makeConnection, but the call
428+        # below precedes it; confirm processEnded cannot fire before makeConnection.
429+        self.notifyOnExit()
430+       
431+        # notify protocol by calling protocol.makeConnection and specifying
432+        # ourself as the transport.
433+        self.protocol.makeConnection(self)
434+       
435+       
436+       
437+    def notifyOnExit(self):
438+        # Pass the process handle to the thread via a key into a global dict
439+        self.processHandleKey = counter.next()
440+        self.needWaiting[self.processHandleKey] = self.hProcess
441+        self.phandleToTransport[self.hProcess] = self
442+           
443+        # If there are available threads, use one of them
444+        if len(self.availableThreads) > 0:
445+            self.wfmoThread = self.availableThreads[0]
446+            self.threadToNumProcessHandles[self.wfmoThread] += 1
447+            # Update used/available thread lists
448+            if self.threadToNumProcessHandles[self.wfmoThread] == 63:
449+                self.usedThreads.append(self.wfmoThread)
450+                self.availableThreads.remove(self.wfmoThread)
451+           
452+            if self.threadToMsgWindowCreated[self.wfmoThread] is False:
453+                # Make sure the window has been created so we can send messages to it
454+                val = WaitForSingleObject(self.threadToMsgWindowCreationEvent[self.wfmoThread], INFINITE)
455+                if val != WAIT_OBJECT_0:
456+                    raise RuntimeError("WaitForSingleObject returned %d.  It should only return %d" % (val, WAIT_OBJECT_0))
457+            # Notify the thread that it should wait on the process handle.
458+            if win32api.PostMessage(
459+                                self.threadToMsgWindow[self.wfmoThread],
460+                                WM_NEW_PHANDLE, # message
461+                                self.processHandleKey, # wParam
462+                                0 # lParam
463+                                ) == 0:
464+                raise Exception("Failed to post thread message!")
465+        else:
466+            # Create a new thread and wait on the proc handle
467+            self.wfmoThread = threading.Thread(
468+                target=self.doWaitForProcessExit,
469+                name="iocpreactor.process.Process.waitForProcessExit pid=%d" % self.realPid)
470+            # Create a window creation event that will be triggered from the thread
471+            self.threadToMsgWindowCreationEvent[self.wfmoThread] = CreateEvent(None, 0, 0, None)
472+            self.threadToMsgWindowCreated[self.wfmoThread] = False
473+            self.threadToNumProcessHandles[self.wfmoThread] = 1
474+            self.availableThreads.append(self.wfmoThread)
475+            self.phandleKeyToThreadHandle[self.processHandleKey] = self.wfmoThread
476+            self.wfmoThread.setDaemon(True)
477+            self.wfmoThread.start()
478+   
479+    def doWaitForProcessExit(self):       
480+        # Create a hidden window that will receive messages for things
481+        # like adding new handles to wait on or quitting the thread.
482+        # I use the Button class because I'm too lazy to register my own.
483+        theWindow = win32gui.CreateWindow("Button", # lpClassName
484+                                          "",       # lpWindowName
485+                                          0,        # dwStyle
486+                                          0,        # x
487+                                          0,        # y
488+                                          0,        # width
489+                                          0,        # height
490+                                          0,        # parent
491+                                          0,        # menu
492+                                          0,        # hInstance
493+                                          None      # lParam
494+                                          )
495+        # list of process handles to wait for                     
496+        handles = []
497+        # First time through add first process handle to list
498+        handles.append(self.needWaiting[self.processHandleKey])
499+        # Save window so IO thread can wake us up with it
500+        threadHandle = self.phandleKeyToThreadHandle[self.processHandleKey]
501+        self.threadToMsgWindow[threadHandle] = theWindow
502+        self.threadToNumEnded[self.wfmoThread] = 0
503+
504+        # Signal an event so IO thread knows that window
505+        # is successfully created so it can call PostMessage.
506+        # Note that this line is intentionally placed after setting
507+        # threadToMsgWindow so that we don't attempt to lookup a msg window
508+        # in the IO thread before defining it here.
509+        self.threadToMsgWindowCreated[self.wfmoThread] = True
510+        SetEvent(self.threadToMsgWindowCreationEvent[self.wfmoThread])
511+       
512+        while True:
513+            val = MsgWaitForMultipleObjects(handles, 0, INFINITE, QS_POSTMESSAGE | QS_ALLEVENTS)
514+            if val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
515+                phandle = handles[val - WAIT_OBJECT_0]
516+                # Remove process handle from wait list
517+                handles.remove(phandle)
518+                # Tell transport process ended
519+                transport = self.phandleToTransport[phandle]
520+                self.reactor.callFromThread(transport.processEnded)
521+            elif val == WAIT_OBJECT_0 + len(handles):
522+                # We were interrupted by the IO thread calling PostMessage
523+                status, msg = win32gui.PeekMessage(theWindow,
524+                                                   0,
525+                                                   0,
526+                                                   win32con.PM_REMOVE)
527+                while status != 0:
528+                    if msg[1] == WM_NEW_PHANDLE:
529+                        # Add a process handle to wait list
530+                        phandleKey = msg[2]
531+                        handles.append(self.needWaiting[phandleKey])
532+                    elif msg[1] == WM_CLOSE_THREAD:
533+                        # NOTE(review): this break only leaves the inner PeekMessage loop; the outer "while True" keeps running, so the thread does not exit here
534+                        break
535+                    else:
536+                        # Drop all other messages, since we receive all messages, not
537+                        # just WM_NEW_PHANDLE and WM_CLOSE_THREAD.
538+                        pass
539+                   
540+                    status, msg = win32gui.PeekMessage(theWindow,
541+                                                       0,
542+                                                       0,
543+                                                       win32con.PM_REMOVE)
544+            else:
545+                raise Exception("MsgWaitForMultipleObjects returned unknown value: %s" % str(val))
546+
547+    def signalProcess(self, signalID):
548+        if signalID in ("INT", "TERM", "KILL"):
549+            win32process.TerminateProcess(self.hProcess, 1)
550+
551+    def startWriting(self):
552+        if not self.writing:
553+            self.writing = True
554+            b = buffer(self.writeBuffer, self.offset, self.offset + self.bufferSize)
555+            self.writeInOp.initiateOp(self.hChildStdinWr, b)
556+
557+    def stopWriting(self):
558+        self.writing = False
559+
560+    def writeDone(self, bytes):
561+        self.writing = False
562+        self.offset += bytes
563+        self.writeBufferedSize -= bytes
564+        if self.offset == len(self.writeBuffer):
565+            self.writeBuffer = ""
566+            self.offset = 0
567+        if self.writeBuffer == "":
568+            self.writing = False
569+            # If there's nothing else to write and we're closing,
570+            # do it now.
571+            if self.closingStdin:
572+                self._closeStdin()
573+                self.connectionLostNotify()
574+        else:
575+            self.startWriting()
576+           
577+    def write(self, data):
578+        """Write data to the process' stdin."""
579+        self.writeBuffer += data
580+        self.writeBufferedSize += len(data)
581+        if not self.writing:
582+            self.startWriting()
583+
584+    def writeSequence(self, seq):
585+        """Write a list of strings to the physical connection.
586+
587+        If possible, make sure that all of the data is written to
588+        the socket at once, without first copying it all into a
589+        single string.
590+        """
591+        self.write("".join(seq))
592+
593+    def closeStdin(self):
594+        """Close the process' stdin."""
595+        if not self.closingStdin:
596+            self.closingStdin = True
597+            if not self.writing:
598+                self._closeStdin()
599+                self.connectionLostNotify()
600+
601+    def _closeStdin(self):
602+        if hasattr(self, "hChildStdinWr"):
603+            win32file.CloseHandle(self.hChildStdinWr)
604+            del self.hChildStdinWr
605+            self.closingStdin = False
606+            self.closedStdin = True
607+
608+    def closeStderr(self):
609+        if hasattr(self, "hChildStderrRd"):
610+            win32file.CloseHandle(self.hChildStderrRd)
611+            del self.hChildStderrRd
612+            self.closedStderr = True
613+            self.connectionLostNotify()
614+
615+    def closeStdout(self):
616+        if hasattr(self, "hChildStdoutRd"):
617+            win32file.CloseHandle(self.hChildStdoutRd)
618+            del self.hChildStdoutRd
619+            self.closedStdout = True
620+            self.connectionLostNotify()
621+
622+    def loseConnection(self):
623+        """Close the process' stdout, in and err."""
624+        self.closeStdin()
625+        self.closeStdout()
626+        self.closeStderr()
627+
628+    def outConnectionLost(self):
629+        self.closeStdout() # in case process closed it, not us
630+        self.protocol.outConnectionLost()
631+
632+    def errConnectionLost(self):
633+        self.closeStderr() # in case process closed it
634+        self.protocol.errConnectionLost()
635+
636+    def inConnectionLost(self):
637+        self._closeStdin()
638+        self.protocol.inConnectionLost()
639+        self.connectionLostNotify()
640+
641+    def connectionLostNotify(self):
642+        """Will be called 3 times, for stdout/err/in."""
643+        self.closedNotifies = self.closedNotifies + 1
644+        if self.closedNotifies == 3:
645+            self.closed = 1
646+            if self.exited:
647+                self.connectionLost()
648+       
649+    def processEnded(self):
650+        self.threadToNumEnded[self.wfmoThread] += 1
651+        # Decrement proc handle count for thread
652+        self.threadToNumProcessHandles[self.wfmoThread] -= 1
653+        # If we go from 63 to 62 phandles for the thread, mark it available.
654+        if self.threadToNumProcessHandles[self.wfmoThread] == 62:
655+            self.availableThreads.append(self.wfmoThread)
656+            self.usedThreads.remove(self.wfmoThread)
657+        # If we go to 0 phandles, end the thread
658+        elif self.threadToNumProcessHandles[self.wfmoThread] == 0:
659+            # Mark thread as unavailable
660+            self.availableThreads.remove(self.wfmoThread)
661+            # Notify the thread that it should exit.
662+            if not self.threadToMsgWindowCreated[self.wfmoThread]:
663+                val = WaitForSingleObject(self.threadToMsgWindowCreationEvent[self.wfmoThread], INFINITE)
664+                if val != WAIT_OBJECT_0:
665+                    raise RuntimeError("WaitForSingleObject returned %d.  It should only return %d" % (val, WAIT_OBJECT_0))
666+            # Post WM_CLOSE_THREAD to the thread's message window.
667+            win32api.PostMessage(
668+                                self.threadToMsgWindow[self.wfmoThread], # thread id
669+                                WM_CLOSE_THREAD, # message
670+                                self.processHandleKey, # wParam
671+                                0 # lParam
672+                                )
673+           
674+            # Cleanup thread resources
675+            del self.threadToNumProcessHandles[self.wfmoThread]
676+            del self.threadToMsgWindowCreated[self.wfmoThread]
677+            del self.wfmoThread
678+       
679+        # Cleanup process handle resources
680+        del self.needWaiting[self.processHandleKey]
681+        del self.phandleToTransport[self.hProcess]
682+       
683+        self.exited = True   
684+        # If all 3 stdio handles are closed, call connectionLost
685+        if self.closed:
686+            self.connectionLost()
687+           
688+    def connectionLost(self, reason=None):  # NOTE(review): reason is accepted but never used in this body
689+        """Report the process exit status to the protocol via processEnded()."""
690+        # Get the exit status and notify the protocol
691+        exitCode = win32process.GetExitCodeProcess(self.hProcess)
692+        if exitCode == 0:  # exit code 0 is reported as ProcessDone
693+            err = error.ProcessDone(exitCode)
694+        else:  # any other exit code is reported as ProcessTerminated
695+            err = error.ProcessTerminated(exitCode)
696+        self.protocol.processEnded(failure.Failure(err))
697+
698+
699+components.backwardsCompatImplements(Process)  # NOTE(review): presumably registers Process's declared interfaces with the older components system -- confirm
700Index: twisted/internet/iocpreactor/ops.py
701===================================================================
702--- twisted/internet/iocpreactor/ops.py (revision 14502)
703+++ twisted/internet/iocpreactor/ops.py (working copy)
704@@ -128,3 +128,70 @@
705         if res:
706             raise error.getConnectError((res, os.strerror(res)))
707 
708+## Define custom xxxOp classes to handle IO operations related
709+## to stdout/err/in for the process transport.
710+class ReadOutOp(OverlappedOp):  # read operation that feeds the process protocol's outReceived
711+    def ovDone(self, ret, bytes, (handle, buffer)):  # completion callback; presumably ret is an error code and bytes the count read -- confirm
712+        if ret or not bytes:  # non-zero ret or nothing read: report stdout as lost
713+            self.transport.outConnectionLost()
714+        else:
715+            self.transport.protocol.outReceived(buffer[:bytes])  # pass on only the first `bytes` bytes of the buffer
716+            # Keep reading: re-issue the read with the same handle and buffer
717+            try:
718+                self.initiateOp(handle, buffer)
719+            except WindowsError, e:
720+                if e.strerror == "The pipe has been ended":  # NOTE(review): compares message text, so presumably only matches English system messages -- confirm
721+                    self.transport.outConnectionLost()
722+                else:
723+                    raise e  # NOTE(review): a bare "raise" would keep the original traceback in Python 2
724+
725+    def initiateOp(self, handle, buffer):  # issue a read whose completion calls ovDone with (handle, buffer)
726+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
727+
728+class ReadErrOp(OverlappedOp):  # read operation that feeds the process protocol's errReceived
729+    def ovDone(self, ret, bytes, (handle, buffer)):  # completion callback; presumably ret is an error code and bytes the count read -- confirm
730+        if ret or not bytes:  # non-zero ret or nothing read: report stderr as lost
731+            self.transport.errConnectionLost()
732+        else:
733+            self.transport.protocol.errReceived(buffer[:bytes])  # pass on only the first `bytes` bytes of the buffer
734+            # Keep reading: re-issue the read with the same handle and buffer
735+            try:
736+                self.initiateOp(handle, buffer)
737+            except WindowsError, e:
738+                if e.strerror == "The pipe has been ended":  # NOTE(review): compares message text, so presumably only matches English system messages -- confirm
739+                    self.transport.errConnectionLost()
740+                else:
741+                    raise e  # NOTE(review): a bare "raise" would keep the original traceback in Python 2
742+
743+    def initiateOp(self, handle, buffer):  # issue a read whose completion calls ovDone with (handle, buffer)
744+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
745+
746+class WriteInOp(OverlappedOp):  # write operation for the process transport's stdin
747+    def ovDone(self, ret, bytes, (handle, buffer)):  # completion callback; presumably ret is an error code and bytes the count written -- confirm
748+        if ret or not bytes:  # non-zero ret or nothing written: report stdin as lost
749+            self.transport.inConnectionLost()
750+        else:
751+            self.transport.writeDone(bytes)  # hand the byte count back to the transport
752+
753+    def initiateOp(self, handle, buffer):  # issue a write whose completion calls ovDone with (handle, buffer)
754+        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
755+
756+class ReadInOp(OverlappedOp):
757+    """Stdin pipe will be opened in duplex mode.  The parent will read
758+    stdin to detect when the child closes it so we can close our end.
759+    """
760+    def ovDone(self, ret, bytes, (handle, buffer)):  # completion callback; presumably ret is an error code and bytes the count read -- confirm
761+        if ret or not bytes:  # non-zero ret or nothing read: report stdin as lost
762+            self.transport.inConnectionLost()
763+        else:
764+            # Keep reading; whatever was read into the buffer is not used
765+            try:
766+                self.initiateOp(handle, buffer)
767+            except WindowsError, e:
768+                if e.strerror == "The pipe has been ended":  # NOTE(review): compares message text, so presumably only matches English system messages -- confirm
769+                    self.transport.inConnectionLost()
770+                else:
771+                    raise e  # NOTE(review): a bare "raise" would keep the original traceback in Python 2
772+                   
773+    def initiateOp(self, handle, buffer):  # issue a read whose completion calls ovDone with (handle, buffer)
774+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))