<div><span class="gmail_quote">On 7/11/05, <b class="gmail_sendername">Pavel Pergamenshchik</b> &lt;<a href="mailto:pp64@codelock.com">pp64@codelock.com</a>&gt; wrote:</span>
<blockquote class="gmail_quote" style="PADDING-LEFT: 1ex; MARGIN: 0px 0px 0px 0.8ex; BORDER-LEFT: #ccc 1px solid">On Mon, 11 Jul 2005 10:52:14 -0500<br>Justin Johnson &lt;<a href="mailto:justinjohnson@gmail.com">justinjohnson@gmail.com
</a>&gt; wrote:<br><br>&gt; I am attempting to add spawnProcess to iocpreactor. In order to begin<br>this<br>&gt; task I've had to do a lot of reading on Windows network programming,<br>&gt; specifically the various Windows I/O methods, to attempt to understand
<br>what<br>&gt; win32eventreactor and iocpreactor are doing, and also just increase my<br><br>&gt; understanding of how reactors work in general. To understand the<br>various<br>&gt; Winsock 2 methods that both of these reactors rely upon, I read
<br>chapters 1-5<br>&gt; of Network Programming for Microsoft Windows[1].<br>&gt;&nbsp;&nbsp;Before actually attempting to add spawnProcess, I would like to<br>present how<br>&gt; I think iocpreactor works and how I think I should add spawnProcess,
<br>and<br>&gt; hopefully be corrected or confirmed in my understanding. If I'm too<br>vague<br>&gt; there's a good chance it's because I don't understand it very well.<br>Please<br>&gt; feel free to point out things that you might think are obvious but
<br>aren't<br>&gt; sure I understand.<br>&gt;&nbsp;&nbsp;How iocpreactor works<br>&gt; ---------------------------------<br>&gt;<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;1. Create an IO Completion Port.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;2. Create a socket and associate it with the IOCP. This is the
<br>socket<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;we will call AcceptEx (a non-blocking accept) on. The association<br>with the<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;IOCP is made via CreateIoCompletionPort.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;3. Setup any scheduled tasks on the reactor.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;4. Call AcceptEx (which doesn't block) on the socket. AcceptEx
<br>takes<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;an overlapped structure as a parameter. Before making the call, we<br>set two<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;attributes of the struct: the callback and callback_args which will<br>be<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;called when an accept event completes on the socket. The Winsock 2
<br>methods<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;don't actually call the callback. The Winsock 2 methods handle<br>copying data<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;related to the network event that occurred on the socket into the<br>overlapped<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;structure and making that overlapped structure available to
<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;GetQueuedCompletionStatus. So when we handle events on sockets via<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;GetQueuedCompletionStatus from within doIteration, we have access<br>to the<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;data related to the event as well as the callback and callback_args
<br>we call<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;to handle that event. The callbacks are setup in the xxxOp classes<br>in<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;ops.py and always result in some transport method getting called<br>(such<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;as readDone, connectionDone, etc).
<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;5. From within doIteration, call GetQueuedCompletionStatus (which<br>does<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;block) with a timeout of the time until the next scheduled task<br>needs to be<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;run. If any event occurs on the sockets currently associated with
<br>the IOCP<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;before that time expires, GetQueuedCompletionStatus will return<br>(stop<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;blocking). Now we have access to the overlapped structure<br>containing data<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;associated with the event which was copied into the overlapped
<br>structure's<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;buffer, such as data received from WSARecv calls, as well as the<br>callback<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;and callback_args. From within doIteration we call the callbacks<br>passing in<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;the data related to the event. Depending on the events we are
<br>handling, we<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;may create new sockets (e.g. end point sockets in TCP connections)<br>and<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;associate them with the IOCP as well. All Winsock 2 API calls made<br>are<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;non-blocking accept for GetQueuedCompletionStatus.
<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;6. Step 5 continues until the reactor stops.<br><br>This sounds about right. Note how this is different from the usual<br>reactor thing -- iocp notifies you when the operation is _finished_,<br>not when it can success without blocking.
</blockquote>
<div>&nbsp;</div>
<div>Right.&nbsp; Understood.</div><br>
<blockquote class="gmail_quote" style="PADDING-LEFT: 1ex; MARGIN: 0px 0px 0px 0.8ex; BORDER-LEFT: #ccc 1px solid">&gt;&nbsp;&nbsp; How to add spawnProcess<br>&gt; ---------------------------------------<br>&gt;<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;1. Create the processes via Windows APIs and associate their
<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;stdout/err with with the IOCP via CreateIoCompletionPort calls.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;2. Close stdin.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;3. Notify the ProcessProtocol via protocol.makeConnection (not sure<br><br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;why, looking at win32eventreactor)
<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;4. Receive data from stdout/err via the completion port by calling<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;GetQueuedCompletionStatus from within doIteration. Is this really<br>possible?<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;ProcessProtocol's methods won't get called appropriately by letting
<br>the<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;existing callbacks in ops.py make calls to the transport (e.g.<br>&gt;&nbsp;&nbsp;&nbsp;&nbsp;connectionDone, readDone)?<br><br>Hrm. Not quite. In iocp, you always have a read call pending<br>(ReadFileEx, for stdout/err handles). When it completes, you get a
<br>notification in GetQueuedCompletionStatus, pass the data to your<br>Protocol and schedule the read again.<br>Do that for stdout and stderr.<br>ops.py already has a wrapper for ReadFile, but it always calls readDone<br>
and readErr on your transport. You'll need to fix that.</blockquote>
<div>&nbsp;</div>
<div>I think we're on the same page here.&nbsp; See my previous emails correcting my original idea on how this would work.&nbsp; At this point I've defined custom xxxOp classes as follows.</div>
<div><br>class ReadOutOp(OverlappedOp):<br>&nbsp;&nbsp;&nbsp; def ovDone(self, ret, bytes, (handle, buffer)):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if ret or not bytes:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.readErr(ret, bytes)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.outConnectionLost
()<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; else:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.readDone(bytes)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.protocol.outReceived(bytes)</div>
<div>
<p>&nbsp;&nbsp;&nbsp; def initiateOp(self, handle, buffer):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))</p>
<p>class ReadErrOp(OverlappedOp):<br>&nbsp;&nbsp;&nbsp; def ovDone(self, ret, bytes, (handle, buffer)):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if ret or not bytes:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.readErr(ret, bytes)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.errConnectionLost()
<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; else:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.readDone(bytes)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.protocol.errReceived(bytes)</p>
<p>&nbsp;&nbsp;&nbsp; def initiateOp(self, handle, buffer):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))</p>
<p>class WriteInOp(OverlappedOp):<br>&nbsp;&nbsp;&nbsp; def ovDone(self, ret, bytes, (handle, buffer)):<br>#&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; log.msg(&quot;WriteFileOp.ovDone&quot;, time.time())<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if ret or not bytes:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.writeErr(ret, bytes)
<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.transport.inConnectionLost()<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; else:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #self.transport.writeDone(bytes)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pass</p>
<p>&nbsp;&nbsp;&nbsp; def initiateOp(self, handle, buffer):<br>#&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; log.msg(&quot;WriteFileOp.initiateOp&quot;, time.time())<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))</p></div><br>&nbsp;</div>