Ticket #1008: the_patch3.txt

File the_patch3.txt, 33.5 KB (added by justinj, 15 years ago)
Line 
1Index: twisted/test/process_tester.py
2===================================================================
3--- twisted/test/process_tester.py      (revision 14622)
4+++ twisted/test/process_tester.py      (working copy)
5@@ -2,7 +2,8 @@
6 
7 import sys, os
8 
9-test_file = "process_test.log"
10+test_file_match = "process_test.log.*"
11+test_file = "process_test.log.%d" % os.getpid()
12 
13 def main():
14     f = open(test_file, 'wb')
15Index: twisted/test/test_process.py
16===================================================================
17--- twisted/test/test_process.py        (revision 14622)
18+++ twisted/test/test_process.py        (working copy)
19@@ -36,7 +36,6 @@
20         self.finished = 1
21         self.reason = reason
22 
23-
24 class TestProcessProtocol(protocol.ProcessProtocol):
25 
26     finished = 0
27@@ -164,6 +163,16 @@
28             log.msg("Uninstalled SIGCHLD signal handler.")
29             signal.signal(signal.SIGCHLD, self.sigchldHandler)
30 
31+class TestManyProcessProtocol(TestProcessProtocol):
32+    def __init__(self):
33+        self.deferred = defer.Deferred()
34+
35+    def processEnded(self, reason):
36+        TestProcessProtocol.processEnded(self, reason)
37+        if reason.check(error.ProcessDone):
38+            self.deferred.callback(None)
39+        else:
40+            self.deferred.errback(reason)
41 
42 class ProcessTestCase(SignalMixin, unittest.TestCase):
43     """Test running a process."""
44@@ -186,11 +195,39 @@
45         #self.assertEquals(f.value.signal, None)
46 
47         try:
48-            import process_tester
49-            os.remove(process_tester.test_file)
50+            import process_tester, glob
51+            for f in glob.glob(process_tester.test_file_match):
52+                os.remove(f)
53         except:
54             pass
55 
56+    def testManyProcesses(self):
57+       
58+        def _check(results, protocols):
59+            for p in protocols:
60+                self.failUnless(p.finished)
61+                self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
62+                # test status code
63+                f = p.reason
64+                f.trap(error.ProcessTerminated)
65+                self.assertEquals(f.value.exitCode, 23)
66+       
67+        exe = sys.executable
68+        scriptPath = util.sibpath(__file__, "process_tester.py")
69+        args = [exe, "-u", scriptPath]
70+        protocols = []
71+        deferreds = []
72+                   
73+        for i in xrange(50):
74+            p = TestManyProcessProtocol()
75+            protocols.append(p)
76+            reactor.spawnProcess(p, exe, args, env=None)
77+            deferreds.append(p.deferred)
78+       
79+        deferredList = defer.DeferredList(deferreds, consumeErrors=True)
80+        deferredList.addCallback(_check, protocols)
81+        return deferredList
82+           
83     def testEcho(self):
84         finished = defer.Deferred()
85         p = EchoProtocol(finished)
86@@ -215,14 +252,14 @@
87         pyExe = sys.executable
88         scriptPath = util.sibpath(__file__, "process_cmdline.py")
89         p = Accumulator()
90-        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
91+        reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None ,
92                              path=None)
93 
94         spinUntil(lambda :p.closed)
95         self.assertEquals(p.errF.getvalue(), "")
96         recvdArgs = p.outF.getvalue().splitlines()
97         self.assertEquals(recvdArgs, args)
98-       
99+
100     testEcho.timeout = 60
101 
102 class TwoProcessProtocol(protocol.ProcessProtocol):
103@@ -740,8 +777,9 @@
104         """ProcessProtocol.transport.closeStdout actually closes the pipe."""
105         d = self.doit(1)
106         def _check(errput):
107-            unittest.failIfEqual(errput.index('OSError'), -1)
108-            unittest.failIfEqual(errput.index('Broken pipe'), -1)
109+            unittest.failIfEqual(errput.find('OSError'), -1)
110+            if runtime.platform.getType() != 'win32':
111+                unittest.failIfEqual(errput.find('Broken pipe'), -1)
112         d.addCallback(_check)
113         return d
114 
115Index: twisted/internet/iocpreactor/proactor.py
116===================================================================
117--- twisted/internet/iocpreactor/proactor.py    (revision 14622)
118+++ twisted/internet/iocpreactor/proactor.py    (working copy)
119@@ -3,7 +3,8 @@
120 
121 
122 from twisted.internet import defer, base, main
123-from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorArbitrary
124+from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorArbitrary, IReactorProcess
125+from twisted.internet.iocpreactor import process
126 from twisted.python import threadable, log
127 from zope.interface import implements, implementsOnly
128 
129@@ -14,7 +15,7 @@
130     # TODO: IReactorArbitrary, IReactorUDP, IReactorMulticast,
131     # IReactorSSL (or leave it until exarkun finishes TLS)
132     # IReactorProcess, IReactorCore (cleanup)
133-    implementsOnly(IReactorTCP, IReactorUDP, IReactorArbitrary)
134+    implementsOnly(IReactorTCP, IReactorUDP, IReactorArbitrary, IReactorProcess)
135     handles = None
136     iocp = None
137 
138@@ -92,6 +93,11 @@
139         c = connectorType(*args, **kw)
140         c.connect()
141         return c
142+           
143+    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, usePTY=0):
144+        """Spawn a process."""
145+        return process.Process(self, processProtocol, executable, args, env, path)
146+       
147 
148 def install():
149     from twisted.python import threadable
150Index: twisted/internet/iocpreactor/process.py
151===================================================================
152--- twisted/internet/iocpreactor/process.py     (revision 0)
153+++ twisted/internet/iocpreactor/process.py     (revision 0)
154@@ -0,0 +1,542 @@
155+# Win32 imports
156+import win32api
157+import win32gui
158+import win32con
159+import win32file
160+import win32pipe
161+import win32process
162+import win32security
163+from win32event import CreateEvent, SetEvent, WaitForSingleObject, MsgWaitForMultipleObjects, \
164+                       WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE, QS_ALLINPUT, QS_POSTMESSAGE, \
165+                       QS_ALLEVENTS
166+
167+# Zope & Twisted imports
168+from zope.interface import implements
169+from twisted.internet import error
170+from twisted.python import failure, components
171+from twisted.internet.interfaces import IProcessTransport
172+
173+# sibling imports
174+import ops
175+
176+# System imports
177+import os
178+import sys
179+import re
180+import time
181+import threading
182+import itertools
183+
184+# Counter for uniquely identifying pipes
185+counter = itertools.count(1)
186+
187+# Message ID must be greater than WM_USER or the system will do
188+# marshalling automatically.
189+#WM_NEW_PHANDLE = win32con.WM_USER + 1
190+#WM_CLOSE_THREAD = win32con.WM_USER + 2
191+WM_NEW_PHANDLE = win32con.WM_APP + 1
192+WM_CLOSE_THREAD = win32con.WM_APP + 2
193+
194+_cmdLineQuoteRe = re.compile(r'(\\*)"')
195+def _cmdLineQuote(s):
196+    return '"' + _cmdLineQuoteRe.sub(r'\1\1\\"', s) + '"'
197+
198+
199+
200+class Process(object):
201+    """A process that integrates with the Twisted event loop.
202+
203+    See http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dllproc/base/creating_a_child_process_with_redirected_input_and_output.asp
204+    for more info on how to create processes in Windows and access their
205+    stdout/err/in.  Another good source is http://www.informit.com/articles/article.asp?p=362660&seqNum=2.
206+   
207+    Issues:
208+
209+    If your subprocess is a python program, you need to:
210+
211+     - Run python.exe with the '-u' command line option - this turns on
212+       unbuffered I/O. Buffering stdout/err/in can cause problems, see e.g.
213+       http://support.microsoft.com/default.aspx?scid=kb;EN-US;q1903
214+
215+     - (is this still true?) If you don't want Windows messing with data passed over
216+       stdin/out/err, set the pipes to be in binary mode::
217+
218+        import os, sys, mscvrt
219+        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
220+        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
221+        msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
222+
223+    """
224+    implements(IProcessTransport)
225+   
226+    # I used this size because abstract.ConnectedSocket did.  I don't
227+    # know why though.
228+    bufferSize = 2**2**2**2
229+    # Per http://www-128.ibm.com/developerworks/linux/library/l-rt4/,
230+    # an extra 24 bytes are needed to handle write header.  I haven't seen
231+    # any problems not having the extra 24 bytes though, so I'm not
232+    # adding it to the size.  I comment here just in case it is discovered
233+    # to be necessary down the road.
234+    pipeBufferSize = bufferSize
235+   
236+    usedThreads = []  # threads waiting on 63 process handles
237+    availableThreads = [] # threads waiting on less than 63 process handles
238+
239+    threadToNumProcessHandles = {} # to track when a thread can handle more process handles
240+    threadToMsgWindowCreationEvent = {} # event signalled when msg queue created
241+    threadToMsgWindowCreated = {} # boolean indicated msg queue is created
242+    needWaiting = {} # used to pass process handles to new WaitForMultipleObjects thread
243+    phandleToTransport = {} # so we can call methods on the transport when a proc handle is signalled
244+    threadToMsgWindow = {} # since we need the window to call PostMessage
245+    phandleKeyToThreadHandle = {} # proc handle keys passed to PostThreadMessage to tell thread to wait on new proc handle
246+    threadToNumEnded = {}
247+
248+    def __init__(self, reactor, protocol, command, args, environment, path):
249+        self.reactor = reactor
250+        self.protocol = protocol
251+        self.outBuffer = reactor.AllocateReadBuffer(self.bufferSize)
252+        self.errBuffer = reactor.AllocateReadBuffer(self.bufferSize)
253+        # This is the buffer for *reading* stdin, which is only done to
254+        # determine if the other end of the pipe was closed.
255+        self.inBuffer = reactor.AllocateReadBuffer(self.bufferSize)
256+        # IO operation classes
257+        self.readOutOp = ops.ReadOutOp(self)
258+        self.readErrOp = ops.ReadErrOp(self)
259+        self.readInOp = ops.ReadInOp(self)
260+        self.writeInOp = ops.WriteInOp(self)
261+       
262+        self.writeBuffer = ""
263+        self.writing = False
264+        self.finished = False
265+        self.offset = 0
266+        self.writeBufferedSize = 0
267+        self.closingStdin = False
268+        self.closedStdin = False
269+        self.closedStdout = False
270+        self.closedStderr = False
271+        # Stdio handles
272+        self.hChildStdinRd = None
273+        self.hChildStdinWr = None
274+        self.hChildStdinWrDup = None
275+        self.hChildStdoutRd = None
276+        self.hChildStdoutWr = None
277+        self.hChildStdoutRdDup = None
278+        self.hChildStderrRd = None
279+        self.hChildStderrWr = None
280+        self.hChildStderrRdDup = None
281+        # handle of thread calling WaitForMultipleObjects on this process'es handle
282+        self.wfmoThread = None
283+       
284+        self.closedNotifies = 0  # increments to 3 (for stdin, stdout, stderr)
285+        self.closed = False # set to true when all 3 handles close
286+        self.exited = False # set to true when WFMO thread gets signalled proc handle.  See doWaitForProcessExit.
287+
288+        # Set the bInheritHandle flag so pipe handles are inherited.
289+        saAttr = win32security.SECURITY_ATTRIBUTES()
290+        saAttr.bInheritHandle = 1
291+       
292+        pid = win32api.GetCurrentProcess() # -1 which stands for current process
293+        self.realPid = os.getpid() # unique pid for pipe naming
294+       
295+        # Create a pipe for the child process's STDOUT.
296+        self.stdoutPipeName = r"\\.\pipe\twisted-iocp-stdout-%d-%d-%d" % (self.realPid, counter.next(), time.time())
297+        self.hChildStdoutRd = \
298+            win32pipe.CreateNamedPipe(self.stdoutPipeName,
299+                                      win32con.PIPE_ACCESS_INBOUND | win32con.FILE_FLAG_OVERLAPPED, # open mode
300+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
301+                                      1, # max instances
302+                                      self.pipeBufferSize, # out buffer size
303+                                      self.pipeBufferSize, # in buffer size
304+                                      0, # timeout
305+                                      saAttr)
306+
307+        self.hChildStdoutWr = win32file.CreateFile(self.stdoutPipeName,
308+                                         win32con.GENERIC_WRITE,
309+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
310+                                         saAttr,
311+                                         win32con.OPEN_EXISTING,
312+                                         win32con.FILE_FLAG_OVERLAPPED,
313+                                         0);
314+
315+        # Create noninheritable read handle and close the inheritable read
316+        # handle.
317+        self.hChildStdoutRdDup = win32api.DuplicateHandle(
318+                                     pid, self.hChildStdoutRd,
319+                                     pid, 0,
320+                                     0,
321+                                     win32con.DUPLICATE_SAME_ACCESS)
322+        win32api.CloseHandle(self.hChildStdoutRd);
323+        self.hChildStdoutRd = self.hChildStdoutRdDup
324+       
325+        # Create a pipe for the child process's STDERR.
326+        self.stderrPipeName = r"\\.\pipe\twisted-iocp-stderr-%d-%d-%d" % (self.realPid, counter.next(), time.time())
327+        self.hChildStderrRd = \
328+            win32pipe.CreateNamedPipe(self.stderrPipeName,
329+                                      win32con.PIPE_ACCESS_INBOUND | win32con.FILE_FLAG_OVERLAPPED, # open mode
330+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
331+                                      1, # max instances
332+                                      self.pipeBufferSize, # out buffer size
333+                                      self.pipeBufferSize, # in buffer size
334+                                      0, # timeout
335+                                      saAttr)
336+        self.hChildStderrWr = win32file.CreateFile(self.stderrPipeName,
337+                                         win32con.GENERIC_WRITE,
338+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
339+                                         saAttr,
340+                                         win32con.OPEN_EXISTING,
341+                                         win32con.FILE_FLAG_OVERLAPPED,
342+                                         0);
343+
344+        # Create noninheritable read handle and close the inheritable read
345+        # handle.
346+        self.hChildStderrRdDup = win32api.DuplicateHandle(
347+                                     pid, self.hChildStderrRd,
348+                                     pid, 0,
349+                                     0,
350+                                     win32con.DUPLICATE_SAME_ACCESS)
351+        win32api.CloseHandle(self.hChildStderrRd)
352+        self.hChildStderrRd = self.hChildStderrRdDup
353+       
354+       
355+        # Create a pipe for the child process's STDIN. This one is opened
356+        # in duplex mode so we can read from it too in order to detect when
357+        # the child closes their end of the pipe.
358+        self.stdinPipeName = r"\\.\pipe\twisted-iocp-stdin-%d-%d-%d" % (self.realPid, counter.next(), time.time())
359+        self.hChildStdinWr = \
360+            win32pipe.CreateNamedPipe(self.stdinPipeName,
361+                                      win32con.PIPE_ACCESS_DUPLEX | win32con.FILE_FLAG_OVERLAPPED, # open mode
362+                                      win32con.PIPE_TYPE_BYTE, # pipe mode
363+                                      1, # max instances
364+                                      self.pipeBufferSize, # out buffer size
365+                                      self.pipeBufferSize, # in buffer size
366+                                      0, # timeout
367+                                      saAttr)
368+
369+        self.hChildStdinRd = win32file.CreateFile(self.stdinPipeName,
370+                                         win32con.GENERIC_READ,
371+                                         win32con.FILE_SHARE_READ|win32con.FILE_SHARE_WRITE,
372+                                         saAttr,
373+                                         win32con.OPEN_EXISTING,
374+                                         win32con.FILE_FLAG_OVERLAPPED,
375+                                         0);
376+       
377+       
378+        # Duplicate the write handle to the pipe so it is not inherited.
379+        self.hChildStdinWrDup = win32api.DuplicateHandle(
380+                                    pid, self.hChildStdinWr,
381+                                    pid, 0,
382+                                    0,
383+                                    win32con.DUPLICATE_SAME_ACCESS)
384+        win32api.CloseHandle(self.hChildStdinWr)
385+        self.hChildStdinWr = self.hChildStdinWrDup
386+       
387+        # set the info structure for the new process.  This is where
388+        # we tell the process to use the pipes for stdout/err/in.
389+        StartupInfo = win32process.STARTUPINFO()
390+        StartupInfo.hStdOutput = self.hChildStdoutWr
391+        StartupInfo.hStdError  = self.hChildStderrWr
392+        StartupInfo.hStdInput  = self.hChildStdinRd
393+        StartupInfo.dwFlags = win32process.STARTF_USESTDHANDLES
394+       
395+        # create the process
396+        cmdline = ' '.join([_cmdLineQuote(a) for a in args])
397+        self.hProcess, hThread, dwPid, dwTid = \
398+            win32process.CreateProcess(command,     # name
399+                                       cmdline,     # command line
400+                                       None,        # process security attributes
401+                                       None,        # primary thread security attributes
402+                                       1,           # handles are inherited
403+                                       0,           # creation flags
404+                                       environment, # if NULL, use parent environment
405+                                       path,        # current directory
406+                                       StartupInfo) # STARTUPINFO pointer
407+
408+       
409+
410+        # close handles which only the child will use
411+        win32file.CloseHandle(self.hChildStderrWr)
412+        win32file.CloseHandle(self.hChildStdoutWr)
413+        win32file.CloseHandle(self.hChildStdinRd)
414+       
415+        # Begin reading on stdout and stderr, before we have output on them.
416+        self.readOutOp.initiateOp(self.hChildStdoutRd, self.outBuffer)
417+        self.readErrOp.initiateOp(self.hChildStderrRd, self.errBuffer)
418+        # Read stdin which was opened in duplex mode so we can detect when
419+        # the child closed their end of the pipe.
420+        self.readInOp.initiateOp(self.hChildStdinWr, self.inBuffer)
421+
422+        # When the process is done, call connectionLost().
423+        # This function returns right away.  Note I call this after
424+        # protocol.makeConnection to ensure that the protocol doesn't
425+        # have processEnded called before protocol.makeConnection.
426+        self.notifyOnExit()
427+       
428+        # notify protocol by calling protocol.makeConnection and specifying
429+        # ourself as the transport.
430+        self.protocol.makeConnection(self)
431+       
432+       
433+       
434+    def notifyOnExit(self):
435+        # Pass the process handle to the thread via a key into a global dict
436+        self.processHandleKey = counter.next()
437+        self.needWaiting[self.processHandleKey] = self.hProcess
438+        self.phandleToTransport[self.hProcess] = self
439+           
440+        # If there are available threads, use one of them
441+        if len(self.availableThreads) > 0:
442+            self.wfmoThread = self.availableThreads[0]
443+            self.threadToNumProcessHandles[self.wfmoThread] += 1
444+            # Update used/available thread lists
445+            if self.threadToNumProcessHandles[self.wfmoThread] == 63:
446+                self.usedThreads.append(self.wfmoThread)
447+                self.availableThreads.remove(self.wfmoThread)
448+           
449+            if self.threadToMsgWindowCreated[self.wfmoThread] is False:
450+                # Make sure the window has been created so we can send messages to it
451+                val = WaitForSingleObject(self.threadToMsgWindowCreationEvent[self.wfmoThread], INFINITE)
452+                if val != WAIT_OBJECT_0:
453+                    raise RuntimeError("WaitForSingleObject returned %d.  It should only return %d" % (val, WAIT_OBJECT_0))
454+            # Notify the thread that it should wait on the process handle.
455+            if win32api.PostMessage(
456+                                self.threadToMsgWindow[self.wfmoThread],
457+                                WM_NEW_PHANDLE, # message
458+                                self.processHandleKey, # wParam
459+                                0 # lParam
460+                                ) == 0:
461+                raise Exception("Failed to post thread message!")
462+        else:
463+            # Create a new thread and wait on the proc handle
464+            self.wfmoThread = threading.Thread(
465+                target=self.doWaitForProcessExit,
466+                name="iocpreactor.process.Process.waitForProcessExit pid=%d" % self.realPid)
467+            # Create a window creation event that will be triggered from the thread
468+            self.threadToMsgWindowCreationEvent[self.wfmoThread] = CreateEvent(None, 0, 0, None)
469+            self.threadToMsgWindowCreated[self.wfmoThread] = False
470+            self.threadToNumProcessHandles[self.wfmoThread] = 1
471+            self.availableThreads.append(self.wfmoThread)
472+            self.phandleKeyToThreadHandle[self.processHandleKey] = self.wfmoThread
473+            self.wfmoThread.setDaemon(True)
474+            self.wfmoThread.start()
475+   
476+    def doWaitForProcessExit(self):       
477+        # Create a hidden window that will receive messages for things
478+        # like adding new handles to wait on or quitting the thread.
479+        # I use the Button class because I'm too lazy to register my own.
480+        theWindow = win32gui.CreateWindow("Button", # lpClassName
481+                                          "",       # lpWindowName
482+                                          0,        # dwStyle
483+                                          0,        # x
484+                                          0,        # y
485+                                          0,        # width
486+                                          0,        # height
487+                                          0,        # parent
488+                                          0,        # menu
489+                                          0,        # hInstance
490+                                          None      # lParam
491+                                          )
492+        # list of process handles to wait for                     
493+        handles = []
494+        # First time through add first process handle to list
495+        handles.append(self.needWaiting[self.processHandleKey])
496+        # Save window so IO thread can wake us up with it
497+        threadHandle = self.phandleKeyToThreadHandle[self.processHandleKey]
498+        self.threadToMsgWindow[threadHandle] = theWindow
499+        self.threadToNumEnded[self.wfmoThread] = 0
500+
501+        # Signal an event so IO thread knows that window
502+        # is successfully created so it can call PostMessage.
503+        # Note that this line is intentionally placed after setting
504+        # threadToMsgWindow so that we don't attempt to lookup a msg window
505+        # in the IO thread before defining it here.
506+        self.threadToMsgWindowCreated[self.wfmoThread] = True
507+        SetEvent(self.threadToMsgWindowCreationEvent[self.wfmoThread])
508+       
509+        while True:
510+            val = MsgWaitForMultipleObjects(handles, 0, INFINITE, QS_POSTMESSAGE | QS_ALLEVENTS)
511+            if val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
512+                phandle = handles[val - WAIT_OBJECT_0]
513+                # Remove process handle from wait list
514+                handles.remove(phandle)
515+                # Tell transport process ended
516+                transport = self.phandleToTransport[phandle]
517+                self.reactor.callFromThread(transport.processEnded)
518+            elif val == WAIT_OBJECT_0 + len(handles):
519+                # We were interrupted by the IO thread calling PostMessage
520+                status, msg = win32gui.PeekMessage(theWindow,
521+                                                   0,
522+                                                   0,
523+                                                   win32con.PM_REMOVE)
524+                while status != 0:
525+                    if msg[1] == WM_NEW_PHANDLE:
526+                        # Add a process handle to wait list
527+                        phandleKey = msg[2]
528+                        handles.append(self.needWaiting[phandleKey])
529+                    elif msg[1] == WM_CLOSE_THREAD:
530+                        # Break out of while loop so thread will exit
531+                        break
532+                    else:
533+                        # Drop all other messages, since we receive all messages, not
534+                        # just WM_NEW_PHANDLE and WM_CLOSE_THREAD.
535+                        pass
536+                   
537+                    status, msg = win32gui.PeekMessage(theWindow,
538+                                                       0,
539+                                                       0,
540+                                                       win32con.PM_REMOVE)
541+            else:
542+                raise Exception("MsgWaitForMultipleObjects returned unknown value: %s" % str(val))
543+
544+    def signalProcess(self, signalID):
545+        if signalID in ("INT", "TERM", "KILL"):
546+            win32process.TerminateProcess(self.hProcess, 1)
547+
548+    def startWriting(self):
549+        if not self.writing:
550+            self.writing = True
551+            b = buffer(self.writeBuffer, self.offset, self.offset + self.bufferSize)
552+            self.writeInOp.initiateOp(self.hChildStdinWr, b)
553+
554+    def stopWriting(self):
555+        self.writing = False
556+
557+    def writeDone(self, bytes):
558+        self.writing = False
559+        self.offset += bytes
560+        self.writeBufferedSize -= bytes
561+        if self.offset == len(self.writeBuffer):
562+            self.writeBuffer = ""
563+            self.offset = 0
564+        if self.writeBuffer == "":
565+            self.writing = False
566+            # If there's nothing else to write and we're closing,
567+            # do it now.
568+            if self.closingStdin:
569+                self._closeStdin()
570+                self.connectionLostNotify()
571+        else:
572+            self.startWriting()
573+           
574+    def write(self, data):
575+        """Write data to the process' stdin."""
576+        self.writeBuffer += data
577+        self.writeBufferedSize += len(data)
578+        if not self.writing:
579+            self.startWriting()
580+
581+    def writeSequence(self, seq):
582+        """Write a list of strings to the physical connection.
583+
584+        If possible, make sure that all of the data is written to
585+        the socket at once, without first copying it all into a
586+        single string.
587+        """
588+        self.write("".join(seq))
589+
590+    def closeStdin(self):
591+        """Close the process' stdin."""
592+        if not self.closingStdin:
593+            self.closingStdin = True
594+            if not self.writing:
595+                self._closeStdin()
596+                self.connectionLostNotify()
597+
598+    def _closeStdin(self):
599+        if hasattr(self, "hChildStdinWr"):
600+            win32file.CloseHandle(self.hChildStdinWr)
601+            del self.hChildStdinWr
602+            self.closingStdin = False
603+            self.closedStdin = True
604+
605+    def closeStderr(self):
606+        if hasattr(self, "hChildStderrRd"):
607+            win32file.CloseHandle(self.hChildStderrRd)
608+            del self.hChildStderrRd
609+            self.closedStderr = True
610+            self.connectionLostNotify()
611+
612+    def closeStdout(self):
613+        if hasattr(self, "hChildStdoutRd"):
614+            win32file.CloseHandle(self.hChildStdoutRd)
615+            del self.hChildStdoutRd
616+            self.closedStdout = True
617+            self.connectionLostNotify()
618+
619+    def loseConnection(self):
620+        """Close the process' stdout, in and err."""
621+        self.closeStdin()
622+        self.closeStdout()
623+        self.closeStderr()
624+
625+    def outConnectionLost(self):
626+        self.closeStdout() # in case process closed it, not us
627+        self.protocol.outConnectionLost()
628+
629+    def errConnectionLost(self):
630+        self.closeStderr() # in case process closed it
631+        self.protocol.errConnectionLost()
632+
633+    def inConnectionLost(self):
634+        self._closeStdin()
635+        self.protocol.inConnectionLost()
636+        self.connectionLostNotify()
637+
638+    def connectionLostNotify(self):
639+        """Will be called 3 times, for stdout/err/in."""
640+        self.closedNotifies = self.closedNotifies + 1
641+        if self.closedNotifies == 3:
642+            self.closed = 1
643+            if self.exited:
644+                self.connectionLost()
645+       
646+    def processEnded(self):
647+        self.threadToNumEnded[self.wfmoThread] += 1
648+        # Decrement proc handle count for thread
649+        self.threadToNumProcessHandles[self.wfmoThread] -= 1
650+        # If we go from 63 to 62 phandles for the thread, mark it available.
651+        if self.threadToNumProcessHandles[self.wfmoThread] == 62:
652+            self.availableThreads.append(self.wfmoThread)
653+            self.usedThreads.remove(self.wfmoThread)
654+        # If we go to 0 phandles, end the thread
655+        elif self.threadToNumProcessHandles[self.wfmoThread] == 0:
656+            # Mark thread as unavailable
657+            self.availableThreads.remove(self.wfmoThread)
658+            # Notify the thread that it should exit.
659+            if not self.threadToMsgWindowCreated[self.wfmoThread]:
660+                val = WaitForSingleObject(self.threadToMsgWindowCreationEvent[self.wfmoThread], INFINITE)
661+                if val != WAIT_OBJECT_0:
662+                    raise RuntimeError("WaitForSingleObject returned %d.  It should only return %d" % (val, WAIT_OBJECT_0))
663+            # Notify the thread that it should wait on the process handle.
664+            win32api.PostMessage(
665+                                self.threadToMsgWindow[self.wfmoThread], # thread id
666+                                WM_CLOSE_THREAD, # message
667+                                self.processHandleKey, # wParam
668+                                0 # lParam
669+                                )
670+           
671+            # Cleanup thread resources
672+            del self.threadToNumProcessHandles[self.wfmoThread]
673+            del self.threadToMsgWindowCreated[self.wfmoThread]
674+            del self.wfmoThread
675+       
676+        # Cleanup process handle resources
677+        del self.needWaiting[self.processHandleKey]
678+        del self.phandleToTransport[self.hProcess]
679+       
680+        self.exited = True   
681+        # If all 3 stdio handles are closed, call connectionLost
682+        if self.closed:
683+            self.connectionLost()
684+           
685+    def connectionLost(self, reason=None):
686+        """Shut down resources."""
687+        # Get the exit status and notify the protocol
688+        exitCode = win32process.GetExitCodeProcess(self.hProcess)
689+        if exitCode == 0:
690+            err = error.ProcessDone(exitCode)
691+        else:
692+            err = error.ProcessTerminated(exitCode)
693+        self.protocol.processEnded(failure.Failure(err))
694+
695+
696+components.backwardsCompatImplements(Process)
697Index: twisted/internet/iocpreactor/ops.py
698===================================================================
699--- twisted/internet/iocpreactor/ops.py (revision 14622)
700+++ twisted/internet/iocpreactor/ops.py (working copy)
701@@ -128,3 +128,70 @@
702         if res:
703             raise error.getConnectError((res, os.strerror(res)))
704 
705+## Define custom xxxOp classes to handle IO operations related
706+## to stdout/err/in for the process transport.
707+class ReadOutOp(OverlappedOp):
708+    def ovDone(self, ret, bytes, (handle, buffer)):
709+        if ret or not bytes:
710+            self.transport.outConnectionLost()
711+        else:
712+            self.transport.protocol.outReceived(buffer[:bytes])
713+            # Keep reading
714+            try:
715+                self.initiateOp(handle, buffer)
716+            except WindowsError, e:
717+                if e.strerror == "The pipe has been ended":
718+                    self.transport.outConnectionLost()
719+                else:
720+                    raise e
721+
722+    def initiateOp(self, handle, buffer):
723+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
724+
725+class ReadErrOp(OverlappedOp):
726+    def ovDone(self, ret, bytes, (handle, buffer)):
727+        if ret or not bytes:
728+            self.transport.errConnectionLost()
729+        else:
730+            self.transport.protocol.errReceived(buffer[:bytes])
731+            # Keep reading
732+            try:
733+                self.initiateOp(handle, buffer)
734+            except WindowsError, e:
735+                if e.strerror == "The pipe has been ended":
736+                    self.transport.errConnectionLost()
737+                else:
738+                    raise e
739+
740+    def initiateOp(self, handle, buffer):
741+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))
742+
743+class WriteInOp(OverlappedOp):
744+    def ovDone(self, ret, bytes, (handle, buffer)):
745+        if ret or not bytes:
746+            self.transport.inConnectionLost()
747+        else:
748+            self.transport.writeDone(bytes)
749+
750+    def initiateOp(self, handle, buffer):
751+        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
752+
753+class ReadInOp(OverlappedOp):
754+    """Stdin pipe will be opened in duplex mode.  The parent will read
755+    stdin to detect when the child closes it so we can close our end.
756+    """
757+    def ovDone(self, ret, bytes, (handle, buffer)):
758+        if ret or not bytes:
759+            self.transport.inConnectionLost()
760+        else:
761+            # Keep reading
762+            try:
763+                self.initiateOp(handle, buffer)
764+            except WindowsError, e:
765+                if e.strerror == "The pipe has been ended":
766+                    self.transport.inConnectionLost()
767+                else:
768+                    raise e
769+                   
770+    def initiateOp(self, handle, buffer):
771+        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))