root / trunk / twisted / internet / iocpreactor / reactor.py

Revision 26919, 8.8 kB (checked in by exarkun, 2 months ago)

Merge iocp-ssl-593-2

Author: exarkun
Reviewer: glyph
Fixes: #593

Add a ProtocolWrapper-based implementation of TLS and use it
to extend the IO Completion Ports reactor with implementations
of IReactorSSL and ITLSTransport.

One of the existing SSL test suites is also changed to remove
one of its base classes (and therefore some of the tests it
was running); these tests had nothing to do with SSL, they were
just redundant TCP tests.

Line 
1 # -*- test-case-name: twisted.internet.test.test_iocp -*-
2 # Copyright (c) 2008-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Reactor that uses IO completion ports
7 """
8
9 import warnings, socket, sys
10
11 from zope.interface import implements
12
13 from twisted.internet import base, interfaces, main, error
14 from twisted.python import log, failure
15 from twisted.internet._dumbwin32proc import Process
16
17 from twisted.internet.iocpreactor import iocpsupport as _iocp
18 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
19 from twisted.internet.iocpreactor import tcp, udp
20
# Defaults used when TLS support cannot be imported: either pyOpenSSL isn't
# installed, or it is too old for this code to work.  In that case the
# reactor won't provide IReactorSSL.
TLSMemoryBIOFactory = None
_extraInterfaces = ()
try:
    from twisted.protocols.tls import TLSMemoryBIOFactory
except ImportError:
    warnings.warn(
        "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
        "It is missing, so the reactor will not support SSL APIs.")
else:
    # The import worked, so the reactor class can declare IReactorSSL.
    _extraInterfaces = (interfaces.IReactorSSL,)
33
34 from twisted.python.compat import set
35
# Longest time, in milliseconds, that doIteration will block waiting for an
# event (2 seconds); see doIteration for explanation.
MAX_TIMEOUT = 2 * 1000

# Maximum number of event callbacks run by one doIteration call.
# XXX: what's a good value here?
EVENTS_PER_LOOP = 1000

# keys to associate with normal and waker events
KEY_NORMAL = 0
KEY_WAKEUP = 1
42
# Shared reason objects assigned in IOCPReactor._callEventCallback when, after
# an event callback has run, the owner either has no getFileHandle method or
# that method returns -1.
_NO_GETHANDLE = error.ConnectionFdescWentAway(
    'Handler has no getFileHandle method')
_NO_FILEDESC = error.ConnectionFdescWentAway(
    'Filedescriptor went away')
46
47
48
class IOCPReactor(base._SignalReactorMixin, base.ReactorBase):
    """
    Reactor that dispatches IO events read from an IO completion port.

    @ivar port: C{None} on the class; set in C{__init__} to the
        C{CompletionPort} every registered handle is associated with.
    @ivar handles: the set of objects added with L{addActiveHandle} and not
        yet removed again.
    """
    implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
               interfaces.IReactorMulticast, interfaces.IReactorProcess,
               *_extraInterfaces)

    port = None

    def __init__(self):
        base.ReactorBase.__init__(self)
        self.port = _iocp.CompletionPort()
        self.handles = set()


    def addActiveHandle(self, handle):
        """
        Remember C{handle} so that L{removeAll} will return it.
        """
        self.handles.add(handle)


    def removeActiveHandle(self, handle):
        """
        Forget C{handle}; it is not an error if it was never added.
        """
        self.handles.discard(handle)


    def doIteration(self, timeout):
        """
        Wait for IO completion events and run their callbacks.

        @param timeout: how long to wait for the first event, in seconds, or
            C{None}.  The wait is always capped at L{MAX_TIMEOUT}
            milliseconds.
        """
        # This function sits and waits for an IO completion event.
        #
        # There are two requirements: process IO events as soon as they arrive
        # and process ctrl-break from the user in a reasonable amount of time.
        #
        # There are three kinds of waiting.
        # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
        # events only.
        # 2) Msg* family of wait functions that can stop waiting when
        # ctrl-break is detected (then, I think, Python converts it into a
        # KeyboardInterrupt)
        # 3) *Ex family of wait functions that put the thread into an
        # "alertable" wait state which is supposedly triggered by IO completion
        #
        # 2) and 3) can be combined. Trouble is, my IO completion is not
        # causing 3) to trigger, possibly because I do not use an IO completion
        # callback. Windows is weird.
        # There are two ways to handle this. I could use MsgWaitForSingleObject
        # here and GetQueuedCompletionStatus in a thread. Or I could poll with
        # a reasonable interval. Guess what! Threads are hard.

        processed_events = 0
        if timeout is None:
            timeout = MAX_TIMEOUT
        else:
            timeout = min(MAX_TIMEOUT, int(1000*timeout))
        # Only the first wait may block; later events are polled with a zero
        # timeout so the loop drains what is already queued.
        rc, numBytes, key, evt = self.port.getEvent(timeout)
        while rc != WAIT_TIMEOUT:
            if key != KEY_WAKEUP:
                assert key == KEY_NORMAL
                if not evt.ignore:
                    log.callWithLogger(evt.owner, self._callEventCallback,
                                       rc, numBytes, evt)
                    processed_events += 1
            # Check the budget *before* dequeuing another event: an event
            # taken off the port but never dispatched would be lost.
            if processed_events >= EVENTS_PER_LOOP:
                break
            rc, numBytes, key, evt = self.port.getEvent(0)


    def _callEventCallback(self, rc, bytes, evt):
        """
        Run C{evt.callback} and drop the owner's connection if it fails.

        @param rc: result code delivered with the event.
        @param bytes: byte count delivered with the event.
        @param evt: the event object; its C{owner} and C{callback}
            attributes are used.
        """
        owner = evt.owner
        why = None
        try:
            evt.callback(rc, bytes, evt)
            handfn = getattr(owner, 'getFileHandle', None)
            if not handfn:
                why = _NO_GETHANDLE
            elif handfn() == -1:
                why = _NO_FILEDESC
            if why:
                return # ignore handles that were closed
        except:
            # Deliberately broad: any error raised while handling the event
            # is logged and then used as the reason for losing the connection.
            why = sys.exc_info()[1]
            log.err()
        if why:
            owner.loseConnection(failure.Failure(why))


    def installWaker(self):
        """
        Do nothing; L{wakeUp} posts directly to the completion port.
        """
        pass


    def wakeUp(self):
        """
        Post a C{KEY_WAKEUP} event to the completion port.
        """
        self.port.postEvent(0, KEY_WAKEUP, None)


    def registerHandle(self, handle):
        """
        Associate C{handle} with the completion port under C{KEY_NORMAL}.
        """
        self.port.addHandle(handle, KEY_NORMAL)


    def createSocket(self, af, stype):
        """
        Create a socket of the given address family and type and register
        its file descriptor with the completion port.

        @return: the new socket object.
        """
        skt = socket.socket(af, stype)
        self.registerHandle(skt.fileno())
        return skt


    def listenTCP(self, port, factory, backlog=50, interface=''):
        """
        @see: twisted.internet.interfaces.IReactorTCP.listenTCP
        """
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p


    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
        """
        @see: twisted.internet.interfaces.IReactorTCP.connectTCP
        """
        c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
        c.connect()
        return c


    if TLSMemoryBIOFactory is not None:
        def listenSSL(self, port, factory, contextFactory, backlog=50,
                      interface=''):
            """
            @see: twisted.internet.interfaces.IReactorSSL.listenSSL
            """
            return self.listenTCP(
                port,
                TLSMemoryBIOFactory(contextFactory, False, factory),
                backlog, interface)


        def connectSSL(self, host, port, factory, contextFactory, timeout=30,
                       bindAddress=None):
            """
            @see: twisted.internet.interfaces.IReactorSSL.connectSSL
            """
            return self.connectTCP(
                host, port,
                TLSMemoryBIOFactory(contextFactory, True, factory),
                timeout, bindAddress)
    else:
        def listenSSL(self, port, factory, contextFactory, backlog=50,
                      interface=''):
            """
            Non-implementation of L{IReactorSSL.listenSSL}.  Some dependency
            is not satisfied.  This implementation always raises
            L{NotImplementedError}.
            """
            raise NotImplementedError(
                "pyOpenSSL 0.10 or newer is required for SSL support in "
                "iocpreactor. It is missing, so the reactor does not support "
                "SSL APIs.")


        def connectSSL(self, host, port, factory, contextFactory, timeout=30,
                       bindAddress=None):
            """
            Non-implementation of L{IReactorSSL.connectSSL}.  Some dependency
            is not satisfied.  This implementation always raises
            L{NotImplementedError}.
            """
            raise NotImplementedError(
                "pyOpenSSL 0.10 or newer is required for SSL support in "
                "iocpreactor. It is missing, so the reactor does not support "
                "SSL APIs.")


    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """
        Connects a given L{DatagramProtocol} to the given numeric UDP port.

        @returns: object conforming to L{IListeningPort}.
        """
        p = udp.Port(port, protocol, interface, maxPacketSize, self)
        p.startListening()
        return p


    def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
                        listenMultiple=False):
        """
        Connects a given DatagramProtocol to the given numeric UDP port.

        EXPERIMENTAL.

        @returns: object conforming to IListeningPort.
        """
        p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
                              listenMultiple)
        p.startListening()
        return p


    def spawnProcess(self, processProtocol, executable, args=(), env={},
                     path=None, uid=None, gid=None, usePTY=0, childFDs=None):
        """
        Spawn a process.

        @raise ValueError: if C{uid}, C{gid}, C{usePTY} or C{childFDs} is
            given; none of them is supported on this platform.
        """
        if uid is not None:
            raise ValueError("Setting UID is unsupported on this platform.")
        if gid is not None:
            raise ValueError("Setting GID is unsupported on this platform.")
        if usePTY:
            raise ValueError("PTYs are unsupported on this platform.")
        if childFDs is not None:
            raise ValueError(
                "Custom child file descriptor mappings are unsupported on "
                "this platform.")
        args, env = self._checkProcessArgs(args, env)
        return Process(self, processProtocol, executable, args, env, path)


    def removeAll(self):
        """
        Forget every active handle.

        @return: a list of the handles that were active before the call.
        """
        res = list(self.handles)
        self.handles.clear()
        return res
258
259
260
def install():
    """
    Create an L{IOCPReactor} and install it with L{main.installReactor}.
    """
    main.installReactor(IOCPReactor())
264
265
# Names exported by "from ... import *": the reactor class and its installer.
__all__ = ['IOCPReactor', 'install']
267
Note: See TracBrowser for help on using the browser.