| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
""" |
|---|
| 6 |
A win32event based implementation of the Twisted main loop. |
|---|
| 7 |
|
|---|
| 8 |
This requires win32all or ActivePython to be installed. |
|---|
| 9 |
|
|---|
| 10 |
Maintainer: Itamar Shtull-Trauring |
|---|
| 11 |
|
|---|
| 12 |
|
|---|
| 13 |
LIMITATIONS: |
|---|
| 14 |
1. WaitForMultipleObjects and thus the event loop can only handle 64 objects. |
|---|
| 15 |
2. Process running has some problems (see Process docstring). |
|---|
| 16 |
|
|---|
| 17 |
|
|---|
| 18 |
TODO: |
|---|
| 19 |
1. Event loop handling of writes is *very* problematic (this is causing failed tests). |
|---|
| 20 |
Switch to doing it the correct way, whatever that means (see below). |
|---|
| 21 |
2. Replace icky socket loopback waker with event based waker (use dummyEvent object) |
|---|
| 22 |
3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs. |
|---|
| 23 |
|
|---|
| 24 |
|
|---|
| 25 |
ALTERNATIVE SOLUTIONS: |
|---|
| 26 |
- IIRC, sockets can only be registered once. So we switch to a structure |
|---|
| 27 |
like the poll() reactor, thus allowing us to deal with write events in |
|---|
| 28 |
a decent fashion. This should allow us to pass tests, but we're still |
|---|
| 29 |
limited to 64 events. |
|---|
| 30 |
|
|---|
| 31 |
Or: |
|---|
| 32 |
|
|---|
| 33 |
- Instead of doing a reactor, we make this an addon to the select reactor. |
|---|
| 34 |
The WFMO event loop runs in a separate thread. This means no need to maintain |
|---|
| 35 |
separate code for networking, 64 event limit doesn't apply to sockets, |
|---|
| 36 |
we can run processes and other win32 stuff in default event loop. The |
|---|
| 37 |
only problem is that we're stuck with the icky socket based waker. |
|---|
| 38 |
Another benefit is that this could be extended to support >64 events |
|---|
| 39 |
in a simpler manner than the previous solution. |
|---|
| 40 |
|
|---|
| 41 |
The 2nd solution is probably what will get implemented. |
|---|
| 42 |
""" |
|---|
| 43 |
|
|---|
| 44 |
|
|---|
| 45 |
import time |
|---|
| 46 |
import sys |
|---|
| 47 |
|
|---|
| 48 |
from zope.interface import implements |
|---|
| 49 |
|
|---|
| 50 |
|
|---|
| 51 |
from win32file import WSAEventSelect, FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT |
|---|
| 52 |
from win32event import CreateEvent, MsgWaitForMultipleObjects |
|---|
| 53 |
from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS |
|---|
| 54 |
|
|---|
| 55 |
import win32gui |
|---|
| 56 |
|
|---|
| 57 |
|
|---|
| 58 |
from twisted.internet import posixbase |
|---|
| 59 |
from twisted.python import log, threadable, failure |
|---|
| 60 |
from twisted.internet.interfaces import IReactorFDSet, IReactorProcess |
|---|
| 61 |
|
|---|
| 62 |
from twisted.internet._dumbwin32proc import Process |
|---|
| 63 |
|
|---|
| 64 |
|
|---|
| 65 |
class Win32Reactor(posixbase.PosixReactorBase): |
|---|
| 66 |
""" |
|---|
| 67 |
Reactor that uses Win32 event APIs. |
|---|
| 68 |
|
|---|
| 69 |
@ivar _reads: A dictionary mapping L{FileDescriptor} instances to a |
|---|
| 70 |
win32 event object used to check for read events for that descriptor. |
|---|
| 71 |
|
|---|
| 72 |
@ivar _writes: A dictionary mapping L{FileDescriptor} instances to a |
|---|
| 73 |
arbitrary value. Keys in this dictionary will be given a chance to |
|---|
| 74 |
write out their data. |
|---|
| 75 |
|
|---|
| 76 |
@ivar _events: A dictionary mapping win32 event object to tuples of |
|---|
| 77 |
L{FileDescriptor} instances and event masks. |
|---|
| 78 |
""" |
|---|
| 79 |
implements(IReactorFDSet, IReactorProcess) |
|---|
| 80 |
|
|---|
| 81 |
dummyEvent = CreateEvent(None, 0, 0, None) |
|---|
| 82 |
|
|---|
| 83 |
def __init__(self): |
|---|
| 84 |
self._reads = {} |
|---|
| 85 |
self._writes = {} |
|---|
| 86 |
self._events = {} |
|---|
| 87 |
posixbase.PosixReactorBase.__init__(self) |
|---|
| 88 |
|
|---|
| 89 |
|
|---|
| 90 |
def _makeSocketEvent(self, fd, action, why): |
|---|
| 91 |
""" |
|---|
| 92 |
Make a win32 event object for a socket. |
|---|
| 93 |
""" |
|---|
| 94 |
event = CreateEvent(None, 0, 0, None) |
|---|
| 95 |
WSAEventSelect(fd, event, why) |
|---|
| 96 |
self._events[event] = (fd, action) |
|---|
| 97 |
return event |
|---|
| 98 |
|
|---|
| 99 |
|
|---|
| 100 |
def addEvent(self, event, fd, action): |
|---|
| 101 |
""" |
|---|
| 102 |
Add a new win32 event to the event loop. |
|---|
| 103 |
""" |
|---|
| 104 |
self._events[event] = (fd, action) |
|---|
| 105 |
|
|---|
| 106 |
|
|---|
| 107 |
def removeEvent(self, event): |
|---|
| 108 |
""" |
|---|
| 109 |
Remove an event. |
|---|
| 110 |
""" |
|---|
| 111 |
del self._events[event] |
|---|
| 112 |
|
|---|
| 113 |
|
|---|
| 114 |
def addReader(self, reader): |
|---|
| 115 |
""" |
|---|
| 116 |
Add a socket FileDescriptor for notification of data available to read. |
|---|
| 117 |
""" |
|---|
| 118 |
if reader not in self._reads: |
|---|
| 119 |
self._reads[reader] = self._makeSocketEvent( |
|---|
| 120 |
reader, 'doRead', FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE) |
|---|
| 121 |
|
|---|
| 122 |
def addWriter(self, writer): |
|---|
| 123 |
""" |
|---|
| 124 |
Add a socket FileDescriptor for notification of data available to write. |
|---|
| 125 |
""" |
|---|
| 126 |
if writer not in self._writes: |
|---|
| 127 |
self._writes[writer] = 1 |
|---|
| 128 |
|
|---|
| 129 |
def removeReader(self, reader): |
|---|
| 130 |
"""Remove a Selectable for notification of data available to read. |
|---|
| 131 |
""" |
|---|
| 132 |
if reader in self._reads: |
|---|
| 133 |
del self._events[self._reads[reader]] |
|---|
| 134 |
del self._reads[reader] |
|---|
| 135 |
|
|---|
| 136 |
def removeWriter(self, writer): |
|---|
| 137 |
"""Remove a Selectable for notification of data available to write. |
|---|
| 138 |
""" |
|---|
| 139 |
if writer in self._writes: |
|---|
| 140 |
del self._writes[writer] |
|---|
| 141 |
|
|---|
| 142 |
def removeAll(self): |
|---|
| 143 |
""" |
|---|
| 144 |
Remove all selectables, and return a list of them. |
|---|
| 145 |
""" |
|---|
| 146 |
return self._removeAll(self._reads, self._writes) |
|---|
| 147 |
|
|---|
| 148 |
|
|---|
| 149 |
def getReaders(self): |
|---|
| 150 |
return self._reads.keys() |
|---|
| 151 |
|
|---|
| 152 |
|
|---|
| 153 |
def getWriters(self): |
|---|
| 154 |
return self._writes.keys() |
|---|
| 155 |
|
|---|
| 156 |
|
|---|
| 157 |
def doWaitForMultipleEvents(self, timeout): |
|---|
| 158 |
log.msg(channel='system', event='iteration', reactor=self) |
|---|
| 159 |
if timeout is None: |
|---|
| 160 |
|
|---|
| 161 |
timeout = 100 |
|---|
| 162 |
else: |
|---|
| 163 |
timeout = int(timeout * 1000) |
|---|
| 164 |
|
|---|
| 165 |
if not (self._events or self._writes): |
|---|
| 166 |
|
|---|
| 167 |
time.sleep(timeout / 1000.0) |
|---|
| 168 |
return |
|---|
| 169 |
|
|---|
| 170 |
canDoMoreWrites = 0 |
|---|
| 171 |
for fd in self._writes.keys(): |
|---|
| 172 |
if log.callWithLogger(fd, self._runWrite, fd): |
|---|
| 173 |
canDoMoreWrites = 1 |
|---|
| 174 |
|
|---|
| 175 |
if canDoMoreWrites: |
|---|
| 176 |
timeout = 0 |
|---|
| 177 |
|
|---|
| 178 |
handles = self._events.keys() or [self.dummyEvent] |
|---|
| 179 |
val = MsgWaitForMultipleObjects(handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS) |
|---|
| 180 |
if val == WAIT_TIMEOUT: |
|---|
| 181 |
return |
|---|
| 182 |
elif val == WAIT_OBJECT_0 + len(handles): |
|---|
| 183 |
exit = win32gui.PumpWaitingMessages() |
|---|
| 184 |
if exit: |
|---|
| 185 |
self.callLater(0, self.stop) |
|---|
| 186 |
return |
|---|
| 187 |
elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles): |
|---|
| 188 |
fd, action = self._events[handles[val - WAIT_OBJECT_0]] |
|---|
| 189 |
log.callWithLogger(fd, self._runAction, action, fd) |
|---|
| 190 |
|
|---|
| 191 |
def _runWrite(self, fd): |
|---|
| 192 |
closed = 0 |
|---|
| 193 |
try: |
|---|
| 194 |
closed = fd.doWrite() |
|---|
| 195 |
except: |
|---|
| 196 |
closed = sys.exc_info()[1] |
|---|
| 197 |
log.deferr() |
|---|
| 198 |
|
|---|
| 199 |
if closed: |
|---|
| 200 |
self.removeReader(fd) |
|---|
| 201 |
self.removeWriter(fd) |
|---|
| 202 |
try: |
|---|
| 203 |
fd.connectionLost(failure.Failure(closed)) |
|---|
| 204 |
except: |
|---|
| 205 |
log.deferr() |
|---|
| 206 |
elif closed is None: |
|---|
| 207 |
return 1 |
|---|
| 208 |
|
|---|
| 209 |
def _runAction(self, action, fd): |
|---|
| 210 |
try: |
|---|
| 211 |
closed = getattr(fd, action)() |
|---|
| 212 |
except: |
|---|
| 213 |
closed = sys.exc_info()[1] |
|---|
| 214 |
log.deferr() |
|---|
| 215 |
|
|---|
| 216 |
if closed: |
|---|
| 217 |
self._disconnectSelectable(fd, closed, action == 'doRead') |
|---|
| 218 |
|
|---|
| 219 |
doIteration = doWaitForMultipleEvents |
|---|
| 220 |
|
|---|
| 221 |
def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None): |
|---|
| 222 |
"""Spawn a process.""" |
|---|
| 223 |
if uid is not None: |
|---|
| 224 |
raise ValueError("Setting UID is unsupported on this platform.") |
|---|
| 225 |
if gid is not None: |
|---|
| 226 |
raise ValueError("Setting GID is unsupported on this platform.") |
|---|
| 227 |
if usePTY: |
|---|
| 228 |
raise ValueError("PTYs are unsupported on this platform.") |
|---|
| 229 |
if childFDs is not None: |
|---|
| 230 |
raise ValueError( |
|---|
| 231 |
"Custom child file descriptor mappings are unsupported on " |
|---|
| 232 |
"this platform.") |
|---|
| 233 |
args, env = self._checkProcessArgs(args, env) |
|---|
| 234 |
return Process(self, processProtocol, executable, args, env, path) |
|---|
| 235 |
|
|---|
| 236 |
|
|---|
| 237 |
def install(): |
|---|
| 238 |
threadable.init(1) |
|---|
| 239 |
r = Win32Reactor() |
|---|
| 240 |
import main |
|---|
| 241 |
main.installReactor(r) |
|---|
| 242 |
|
|---|
| 243 |
|
|---|
| 244 |
__all__ = ["Win32Reactor", "install"] |
|---|