<div><span class="gmail_quote">On 7/11/05, <b class="gmail_sendername">Pavel Pergamenshchik</b> <<a href="mailto:pp64@codelock.com">pp64@codelock.com</a>> wrote:</span>
<blockquote class="gmail_quote" style="PADDING-LEFT: 1ex; MARGIN: 0px 0px 0px 0.8ex; BORDER-LEFT: #ccc 1px solid">On Mon, 11 Jul 2005 10:52:14 -0500<br>Justin Johnson <<a href="mailto:justinjohnson@gmail.com">justinjohnson@gmail.com</a>> wrote:<br><br>
> I am attempting to add spawnProcess to iocpreactor. In order to begin this task I've had to do a lot of reading on Windows network programming, specifically the various Windows I/O methods, to attempt to understand what win32eventreactor and iocpreactor are doing, and also just increase my understanding of how reactors work in general. To understand the various Winsock 2 methods that both of these reactors rely upon, I read chapters 1-5 of Network Programming for Microsoft Windows[1].<br>
> Before actually attempting to add spawnProcess, I would like to present how I think iocpreactor works and how I think I should add spawnProcess, and hopefully be corrected or confirmed in my understanding. If I'm too vague there's a good chance it's because I don't understand it very well. Please feel free to point out things that you might think are obvious but that you aren't sure I understand.<br>
> How iocpreactor works<br>
> ---------------------------------<br>
><br>
> 1. Create an IO Completion Port.<br>
> 2. Create a socket and associate it with the IOCP. This is the socket we will call AcceptEx (a non-blocking accept) on. The association with the IOCP is made via CreateIoCompletionPort.<br>
> 3. Set up any scheduled tasks on the reactor.<br>
> 4. Call AcceptEx (which doesn't block) on the socket. AcceptEx takes an overlapped structure as a parameter. Before making the call, we set two attributes of the struct: the callback and the callback_args it will be called with when an accept completes on the socket. The Winsock 2 methods don't actually call the callback. They handle copying data related to the network event that occurred on the socket into the overlapped structure and making that overlapped structure available to GetQueuedCompletionStatus. So when we handle events on sockets via GetQueuedCompletionStatus from within doIteration, we have access to the data related to the event as well as the callback and callback_args we call to handle that event. The callbacks are set up in the xxxOp classes in ops.py and always result in some transport method getting called (such as readDone, connectionDone, etc).<br>
> 5. From within doIteration, call GetQueuedCompletionStatus (which does block) with a timeout of the time until the next scheduled task needs to be run. If any event occurs on the sockets currently associated with the IOCP before that time expires, GetQueuedCompletionStatus will return (stop blocking). Now we have access to the overlapped structure containing data associated with the event which was copied into the overlapped structure's buffer, such as data received from WSARecv calls, as well as the callback and callback_args. From within doIteration we call the callbacks, passing in the data related to the event. Depending on the events we are handling, we may create new sockets (e.g. end point sockets in TCP connections) and associate them with the IOCP as well. All Winsock 2 API calls made are non-blocking except for GetQueuedCompletionStatus.<br>
> 6. Step 5 continues until the reactor stops.<br><br>
This sounds about right. Note how this is different from the usual reactor thing -- iocp notifies you when the operation is _finished_, not when it can succeed without blocking.
</blockquote>
<div> </div>
<div>Right. Understood.</div><br>
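<div>To check my own understanding of the completion model, here is a minimal pure-Python sketch of steps 4 to 6. It does not touch any Windows API: <code>FakeCompletionPort</code>, <code>ToyReactor</code>, <code>post</code>, <code>call_later</code> and <code>do_iteration</code> are hypothetical names of mine, and a <code>queue.Queue</code> merely stands in for the IOCP. The only point is that the loop is handed operations that have already finished, together with the callback and callback_args stored when the operation was issued.</div>

```python
import queue
import time


class FakeCompletionPort:
    """Hypothetical stand-in for an IOCP: a queue of already-finished operations."""

    def __init__(self):
        self._completions = queue.Queue()

    def post(self, callback, callback_args, ret, nbytes):
        # In the real reactor the OS queues this when an overlapped call finishes.
        self._completions.put((callback, callback_args, ret, nbytes))

    def get_queued_completion_status(self, timeout):
        # Blocks for at most `timeout` seconds, like GetQueuedCompletionStatus.
        try:
            return self._completions.get(timeout=timeout)
        except queue.Empty:
            return None


class ToyReactor:
    """Hypothetical reactor showing only the shape of doIteration."""

    def __init__(self):
        self.port = FakeCompletionPort()
        self.scheduled = []  # list of (due_time, function)

    def call_later(self, delay, func):
        self.scheduled.append((time.monotonic() + delay, func))

    def do_iteration(self, idle_timeout=0.01):
        # Run the scheduled tasks that are due.
        now = time.monotonic()
        due = [func for when, func in self.scheduled if when <= now]
        self.scheduled = [(w, f) for w, f in self.scheduled if w > now]
        for func in due:
            func()
        # Wait for a completion, but no longer than the next scheduled task.
        if self.scheduled:
            next_due = min(when for when, _ in self.scheduled)
            timeout = max(0.0, next_due - time.monotonic())
        else:
            timeout = idle_timeout
        completion = self.port.get_queued_completion_status(timeout)
        if completion is not None:
            callback, callback_args, ret, nbytes = completion
            # The operation has already happened; we only deliver its result.
            callback(ret, nbytes, callback_args)


received = []
reactor = ToyReactor()
buf = bytearray(b"hello world")
# Pretend a 5-byte read into `buf` has just completed successfully (ret == 0).
reactor.port.post(lambda ret, n, args: received.append(bytes(args[0][:n])), (buf,), 0, 5)
reactor.do_iteration()
```

<div>After the single iteration, <code>received</code> holds the five bytes the simulated read produced. A select-style reactor would instead be told that the socket is readable and would then perform the read itself.</div>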
<blockquote class="gmail_quote" style="PADDING-LEFT: 1ex; MARGIN: 0px 0px 0px 0.8ex; BORDER-LEFT: #ccc 1px solid">> How to add spawnProcess<br>
> ---------------------------------------<br>
><br>
> 1. Create the processes via Windows APIs and associate their stdout/err with the IOCP via CreateIoCompletionPort calls.<br>
> 2. Close stdin.<br>
> 3. Notify the ProcessProtocol via protocol.makeConnection (not sure why, looking at win32eventreactor)<br>
> 4. Receive data from stdout/err via the completion port by calling GetQueuedCompletionStatus from within doIteration. Is this really possible? ProcessProtocol's methods won't get called appropriately by letting the existing callbacks in ops.py make calls to the transport (e.g. connectionDone, readDone)?<br><br>
Hrm. Not quite. In iocp, you always have a read call pending (an overlapped ReadFile, for stdout/err handles). When it completes, you get a notification in GetQueuedCompletionStatus, pass the data to your Protocol and schedule the read again.<br>
Do that for stdout and stderr.<br>
ops.py already has a wrapper for ReadFile, but it always calls readDone and readErr on your transport. You'll need to fix that.</blockquote>
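<div>The "always keep a read pending" pattern described above can be sketched without any Windows calls. This is only an illustration under my own assumptions: <code>FakePipeReactor</code>, <code>RearmingRead</code> and <code>RecordingProtocol</code> are hypothetical names, the fake reactor completes reads from canned data, and only <code>issueReadFile</code> and the ProcessProtocol method names (<code>outReceived</code>, <code>errReceived</code>, <code>outConnectionLost</code>, <code>errConnectionLost</code>) are taken from this thread.</div>

```python
class FakePipeReactor:
    """Hypothetical reactor: queues reads and completes them from canned chunks."""

    def __init__(self, chunks_by_handle):
        self.chunks = {h: list(c) for h, c in chunks_by_handle.items()}
        self.pending = []

    def issueReadFile(self, handle, buffer, callback, args):
        # The real call would start an overlapped read; here we just queue it.
        self.pending.append((handle, buffer, callback, args))

    def run(self):
        while self.pending:
            handle, buffer, callback, args = self.pending.pop(0)
            chunks = self.chunks[handle]
            data = chunks.pop(0) if chunks else b""  # b"" simulates EOF
            buffer[:len(data)] = data
            callback(0, len(data), args)  # ret == 0 means success


class RearmingRead:
    """Issues a read, delivers the data on completion, then issues the next read."""

    def __init__(self, reactor, handle, on_data, on_lost, bufsize=64):
        self.reactor = reactor
        self.handle = handle
        self.on_data = on_data
        self.on_lost = on_lost
        self.buffer = bytearray(bufsize)

    def start(self):
        self.reactor.issueReadFile(
            self.handle, self.buffer, self.ovDone, (self.handle, self.buffer))

    def ovDone(self, ret, nbytes, args):
        handle, buffer = args
        if ret or not nbytes:
            self.on_lost()  # error or EOF: stop reading from this handle
        else:
            self.on_data(bytes(buffer[:nbytes]))
            self.start()  # schedule the read again


class RecordingProtocol:
    """Hypothetical ProcessProtocol stand-in that records what it is told."""

    def __init__(self):
        self.events = []

    def outReceived(self, data):
        self.events.append(("out", data))

    def errReceived(self, data):
        self.events.append(("err", data))

    def outConnectionLost(self):
        self.events.append(("out_lost", None))

    def errConnectionLost(self):
        self.events.append(("err_lost", None))


reactor = FakePipeReactor({"stdout": [b"hello ", b"world"], "stderr": [b"oops"]})
proto = RecordingProtocol()
RearmingRead(reactor, "stdout", proto.outReceived, proto.outConnectionLost).start()
RearmingRead(reactor, "stderr", proto.errReceived, proto.errConnectionLost).start()
reactor.run()
```

<div>Each handle gets its own pending read, so stdout and stderr are drained independently and each reports its own connection loss once its canned data runs out.</div>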
<div> </div>
<div>I think we're on the same page here. See my previous emails correcting my original idea on how this would work. At this point I've defined custom xxxOp classes as follows.</div>
<div>
<pre>
# OverlappedOp is the base class from iocpreactor's ops.py.

class ReadOutOp(OverlappedOp):
    def ovDone(self, ret, bytes, args):
        handle, buffer = args
        if ret or not bytes:
            # error or EOF on the child's stdout
            #self.transport.readErr(ret, bytes)
            self.transport.outConnectionLost()
        else:
            #self.transport.readDone(bytes)
            # Assuming `bytes` is the number of bytes transferred, pass the
            # data itself (not the count) to the protocol.
            self.transport.protocol.outReceived(buffer[:bytes])

    def initiateOp(self, handle, buffer):
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))

class ReadErrOp(OverlappedOp):
    def ovDone(self, ret, bytes, args):
        handle, buffer = args
        if ret or not bytes:
            # error or EOF on the child's stderr
            #self.transport.readErr(ret, bytes)
            self.transport.errConnectionLost()
        else:
            #self.transport.readDone(bytes)
            # Same assumption as in ReadOutOp: `bytes` is a count.
            self.transport.protocol.errReceived(buffer[:bytes])

    def initiateOp(self, handle, buffer):
        self.reactor.issueReadFile(handle, buffer, self.ovDone, (handle, buffer))

class WriteInOp(OverlappedOp):
    def ovDone(self, ret, bytes, args):
        handle, buffer = args
#        log.msg("WriteFileOp.ovDone", time.time())
        if ret or not bytes:
            #self.transport.writeErr(ret, bytes)
            self.transport.inConnectionLost()
        else:
            #self.transport.writeDone(bytes)
            pass

    def initiateOp(self, handle, buffer):
#        log.msg("WriteFileOp.initiateOp", time.time())
        self.reactor.issueWriteFile(handle, buffer, self.ovDone, (handle, buffer))
</pre></div><br> </div>