root / trunk / twisted / internet / win32eventreactor.py

Revision 24441, 7.5 kB (checked in by thijs, 1 year ago)

Merge maintainer-email-2438: Get rid of references to maintainer email addresses from code.

Author: thijs
Reviewer: exarkun
Fixes: #2438

Line 
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 A win32event based implementation of the Twisted main loop.
7
8 This requires win32all or ActivePython to be installed.
9
10 Maintainer: Itamar Shtull-Trauring
11
12
13 LIMITATIONS:
14  1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
15  2. Process running has some problems (see Process docstring).
16
17
18 TODO:
19  1. Event loop handling of writes is *very* problematic (this is causing failed tests).
20     Switch to doing it the correct way, whatever that means (see below).
21  2. Replace icky socket loopback waker with event based waker (use dummyEvent object)
22  3. Switch everyone to using Free Software so we don't have to deal with proprietary APIs.
23
24
25 ALTERNATIVE SOLUTIONS:
26  - IIRC, sockets can only be registered once. So we switch to a structure
27    like the poll() reactor, thus allowing us to deal with write events in
28    a decent fashion. This should allow us to pass tests, but we're still
29    limited to 64 events.
30
31 Or:
32
33  - Instead of doing a reactor, we make this an addon to the select reactor.
34    The WFMO event loop runs in a separate thread. This means no need to maintain
35    separate code for networking, 64 event limit doesn't apply to sockets,
36    we can run processes and other win32 stuff in default event loop. The
37    only problem is that we're stuck with the icky socket based waker.
38    Another benefit is that this could be extended to support >64 events
39    in a simpler manner than the previous solution.
40
41 The 2nd solution is probably what will get implemented.
42 """
43
44 # System imports
45 import time
46 import sys
47
48 from zope.interface import implements
49
50 # Win32 imports
51 from win32file import WSAEventSelect, FD_READ, FD_CLOSE, FD_ACCEPT, FD_CONNECT
52 from win32event import CreateEvent, MsgWaitForMultipleObjects
53 from win32event import WAIT_OBJECT_0, WAIT_TIMEOUT, QS_ALLINPUT, QS_ALLEVENTS
54
55 import win32gui
56
57 # Twisted imports
58 from twisted.internet import posixbase
59 from twisted.python import log, threadable, failure
60 from twisted.internet.interfaces import IReactorFDSet, IReactorProcess
61
62 from twisted.internet._dumbwin32proc import Process
63
64
class Win32Reactor(posixbase.PosixReactorBase):
    """
    A reactor whose main loop waits on win32 event objects.

    @ivar _reads: Maps each L{FileDescriptor} registered for reading to the
        win32 event object that L{_makeSocketEvent} created for it.

    @ivar _writes: Maps each L{FileDescriptor} registered for writing to an
        arbitrary value; only the keys are used.  Every key gets its
        C{doWrite} run at the start of each iteration.

    @ivar _events: Maps each win32 event object to a C{(fd, action)} tuple,
        where C{action} is the name of the method to call on C{fd} once the
        event is signalled.
    """
    implements(IReactorFDSet, IReactorProcess)

    # Placeholder handle handed to MsgWaitForMultipleObjects when no real
    # events are registered (see doWaitForMultipleEvents).
    dummyEvent = CreateEvent(None, 0, 0, None)

    def __init__(self):
        # The bookkeeping dictionaries are created before the base class
        # initializer runs, exactly as the ordering below shows.
        self._reads, self._writes, self._events = {}, {}, {}
        posixbase.PosixReactorBase.__init__(self)


    def _makeSocketEvent(self, fd, action, why):
        """
        Create a win32 event for a socket and register it with the loop.

        @param fd: The object passed to C{WSAEventSelect} and stored as the
            target of C{action}.
        @param action: Name of the method to invoke on C{fd} when the event
            fires.
        @param why: The network event mask passed to C{WSAEventSelect}.

        @return: The newly created event object.
        """
        socketEvent = CreateEvent(None, 0, 0, None)
        WSAEventSelect(fd, socketEvent, why)
        self._events[socketEvent] = (fd, action)
        return socketEvent


    def addEvent(self, event, fd, action):
        """
        Register a win32 event with the event loop.

        @param event: The win32 event object to wait on.
        @param fd: The object whose C{action} method is called when
            C{event} is signalled.
        @param action: Name of the method to call on C{fd}.
        """
        self._events[event] = (fd, action)


    def removeEvent(self, event):
        """
        Unregister a win32 event.

        @raise KeyError: If C{event} is not currently registered.
        """
        del self._events[event]


    def addReader(self, reader):
        """
        Start watching a socket FileDescriptor for readable data.

        Does nothing if C{reader} is already registered.
        """
        if reader in self._reads:
            return
        interest = FD_READ | FD_ACCEPT | FD_CONNECT | FD_CLOSE
        self._reads[reader] = self._makeSocketEvent(reader, 'doRead', interest)

    def addWriter(self, writer):
        """
        Start giving a socket FileDescriptor the chance to write.

        Does nothing if C{writer} is already registered.
        """
        self._writes.setdefault(writer, 1)

    def removeReader(self, reader):
        """
        Stop watching a Selectable for readable data.

        Unknown readers are ignored.
        """
        if reader not in self._reads:
            return
        # Drop the event registration first, then the reader itself.
        readEvent = self._reads[reader]
        del self._events[readEvent]
        del self._reads[reader]

    def removeWriter(self, writer):
        """
        Stop giving a Selectable the chance to write.

        Unknown writers are ignored.
        """
        self._writes.pop(writer, None)

    def removeAll(self):
        """
        Remove every registered selectable and return them, as computed by
        the inherited C{_removeAll} helper.
        """
        return self._removeAll(self._reads, self._writes)


    def getReaders(self):
        """
        Return the FileDescriptors currently registered for reading.
        """
        return self._reads.keys()


    def getWriters(self):
        """
        Return the FileDescriptors currently registered for writing.
        """
        return self._writes.keys()


    def doWaitForMultipleEvents(self, timeout):
        """
        Run one iteration of the event loop.

        All registered writers are run first; then the loop waits for one
        registered event (or window-message input) and dispatches it.

        @param timeout: Maximum time to wait, in seconds, or C{None} to use
            the 100 millisecond default.
        """
        log.msg(channel='system', event='iteration', reactor=self)

        # From here on the timeout is expressed in milliseconds.
        if timeout is not None:
            timeout = int(timeout * 1000)
        else:
            #timeout = INFINITE
            timeout = 100

        if not self._events and not self._writes:
            # sleep so we don't suck up CPU time
            time.sleep(timeout / 1000.0)
            return

        # Every writer must be run, so this cannot short-circuit on the
        # first one that reports more pending output.
        morePending = 0
        for writer in self._writes.keys():
            if log.callWithLogger(writer, self._runWrite, writer):
                morePending = 1

        if morePending:
            # Poll only; come straight back to continue writing.
            timeout = 0

        handles = self._events.keys() or [self.dummyEvent]
        result = MsgWaitForMultipleObjects(
            handles, 0, timeout, QS_ALLINPUT | QS_ALLEVENTS)

        if result == WAIT_TIMEOUT:
            return

        # One past the last handle index signals message-queue input.
        messageSlot = WAIT_OBJECT_0 + len(handles)
        if result == messageSlot:
            if win32gui.PumpWaitingMessages():
                # A true result asks the reactor to shut down.
                self.callLater(0, self.stop)
            return

        if WAIT_OBJECT_0 <= result < messageSlot:
            signalled = handles[result - WAIT_OBJECT_0]
            fd, action = self._events[signalled]
            log.callWithLogger(fd, self._runAction, action, fd)

    def _runWrite(self, fd):
        """
        Call C{fd.doWrite()} and disconnect C{fd} if it fails.

        @return: C{1} if C{doWrite} returned C{None}, which the caller
            treats as a reason not to block on the next wait; otherwise
            C{None}.
        """
        outcome = 0
        try:
            outcome = fd.doWrite()
        except:
            outcome = sys.exc_info()[1]
            log.deferr()

        if outcome is None:
            return 1
        if not outcome:
            return

        # A true result (or a raised exception) means the connection is
        # finished: unregister it and tell it why.
        self.removeReader(fd)
        self.removeWriter(fd)
        try:
            fd.connectionLost(failure.Failure(outcome))
        except:
            log.deferr()

    def _runAction(self, action, fd):
        """
        Call the method named C{action} on C{fd}; if it raises or returns
        a true value, disconnect C{fd} via C{_disconnectSelectable}.
        """
        try:
            outcome = getattr(fd, action)()
        except:
            outcome = sys.exc_info()[1]
            log.deferr()

        if not outcome:
            return
        self._disconnectSelectable(fd, outcome, action == 'doRead')

    doIteration = doWaitForMultipleEvents

    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None, uid=None, gid=None, usePTY=0, childFDs=None):
        """
        Spawn a process.

        @raise ValueError: If C{uid}, C{gid}, C{usePTY} or C{childFDs} is
            supplied; none of them is supported here.

        @return: The new L{Process} instance.
        """
        # Checked in this order so the first unsupported option wins.
        unsupported = (
            (uid is not None,
             "Setting UID is unsupported on this platform."),
            (gid is not None,
             "Setting GID is unsupported on this platform."),
            (usePTY,
             "PTYs are unsupported on this platform."),
            (childFDs is not None,
             "Custom child file descriptor mappings are unsupported on "
             "this platform."),
        )
        for requested, message in unsupported:
            if requested:
                raise ValueError(message)

        args, env = self._checkProcessArgs(args, env)
        return Process(self, processProtocol, executable, args, env, path)
235
236
def install():
    """
    Create a L{Win32Reactor} and install it as the reactor.

    Threading support is initialized before the reactor is constructed.
    """
    threadable.init(1)
    reactor = Win32Reactor()
    # Imported here, after the reactor exists, rather than at module level.
    import main
    main.installReactor(reactor)
242
243
# Names exported by "from ... import *": the reactor class and its installer.
__all__ = ["Win32Reactor", "install"]
Note: See TracBrowser for help on using the browser.