root / trunk / twisted / internet / posixbase.py

Revision 26118, 13.8 kB (checked in by exarkun, 5 months ago)

Merge internal-fd-tracking-3602

Author: exarkun, itamarst
Reviewer: therve
Fixes: #3602

Refactor the reactor's internal tracking of file descriptors, particular
with respect to "internal" file descriptors which are created and used by
the reactor itself, not by applications.

Line 
1 # -*- test-case-name: twisted.test.test_internet -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Posix reactor base class
7 """
8
9 import warnings
10 import socket
11 import errno
12 import os
13
14 from zope.interface import implements, classImplements
15
16 from twisted.python.compat import set
17 from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram
18 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary
19 from twisted.internet.interfaces import IReactorProcess, IReactorMulticast
20 from twisted.internet.interfaces import IHalfCloseableDescriptor
21 from twisted.internet import error
22 from twisted.internet import tcp, udp
23
24 from twisted.python import log, failure, util
25 from twisted.persisted import styles
26 from twisted.python.runtime import platformType, platform
27
28 from twisted.internet.base import ReactorBase, _SignalReactorMixin
29
30 try:
31     from twisted.internet import ssl
32     sslEnabled = True
33 except ImportError:
34     sslEnabled = False
35
36 try:
37     from twisted.internet import unix
38     unixEnabled = True
39 except ImportError:
40     unixEnabled = False
41
42 processEnabled = False
43 if platformType == 'posix':
44     from twisted.internet import fdesc
45     import process
46     processEnabled = True
47
48 if platform.isWindows():
49     try:
50         import win32process
51         processEnabled = True
52     except ImportError:
53         win32process = None
54
55
56 class _Win32Waker(log.Logger, styles.Ephemeral):
57     """I am a workaround for the lack of pipes on win32.
58
59     I am a pair of connected sockets which can wake up the main loop
60     from another thread.
61     """
62     disconnected = 0
63
64     def __init__(self, reactor):
65         """Initialize.
66         """
67         self.reactor = reactor
68         # Following select_trigger (from asyncore)'s example;
69         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
70         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
71         client.setsockopt(socket.IPPROTO_TCP, 1, 1)
72         server.bind(('127.0.0.1', 0))
73         server.listen(1)
74         client.connect(server.getsockname())
75         reader, clientaddr = server.accept()
76         client.setblocking(0)
77         reader.setblocking(0)
78         self.r = reader
79         self.w = client
80         self.fileno = self.r.fileno
81
82     def wakeUp(self):
83         """Send a byte to my connection.
84         """
85         try:
86             util.untilConcludes(self.w.send, 'x')
87         except socket.error, (err, msg):
88             if err != errno.WSAEWOULDBLOCK:
89                 raise
90
91     def doRead(self):
92         """Read some data from my connection.
93         """
94         try:
95             self.r.recv(8192)
96         except socket.error:
97             pass
98
99     def connectionLost(self, reason):
100         self.r.close()
101         self.w.close()
102
103
104 class _UnixWaker(log.Logger, styles.Ephemeral):
105     """This class provides a simple interface to wake up the event loop.
106
107     This is used by threads or signals to wake up the event loop.
108     """
109     disconnected = 0
110
111     i = None
112     o = None
113
114     def __init__(self, reactor):
115         """Initialize.
116         """
117         self.reactor = reactor
118         self.i, self.o = os.pipe()
119         fdesc.setNonBlocking(self.i)
120         fdesc._setCloseOnExec(self.i)
121         fdesc.setNonBlocking(self.o)
122         fdesc._setCloseOnExec(self.o)
123         self.fileno = lambda: self.i
124
125     def doRead(self):
126         """Read some bytes from the pipe.
127         """
128         fdesc.readFromFD(self.fileno(), lambda data: None)
129
130     def wakeUp(self):
131         """Write one byte to the pipe, and flush it.
132         """
133         # We don't use fdesc.writeToFD since we need to distinguish
134         # between EINTR (try again) and EAGAIN (do nothing).
135         if self.o is not None:
136             try:
137                 util.untilConcludes(os.write, self.o, 'x')
138             except OSError, e:
139                 if e.errno != errno.EAGAIN:
140                     raise
141
142     def connectionLost(self, reason):
143         """Close both ends of my pipe.
144         """
145         if not hasattr(self, "o"):
146             return
147         for fd in self.i, self.o:
148             try:
149                 os.close(fd)
150             except IOError:
151                 pass
152         del self.i, self.o
153
154
155 if platformType == 'posix':
156     _Waker = _UnixWaker
157 elif platformType == 'win32':
158     _Waker = _Win32Waker
159
160
161 class PosixReactorBase(_SignalReactorMixin, ReactorBase):
162     """
163     A basis for reactors that use file descriptors.
164     """
165     implements(IReactorArbitrary, IReactorTCP, IReactorUDP, IReactorMulticast)
166
167     def __init__(self):
168         ReactorBase.__init__(self)
169         if self.usingThreads or platformType == "posix":
170             self.installWaker()
171
172
173     def _disconnectSelectable(self, selectable, why, isRead, faildict={
174         error.ConnectionDone: failure.Failure(error.ConnectionDone()),
175         error.ConnectionLost: failure.Failure(error.ConnectionLost())
176         }):
177         """
178         Utility function for disconnecting a selectable.
179
180         Supports half-close notification, isRead should be boolean indicating
181         whether error resulted from doRead().
182         """
183         self.removeReader(selectable)
184         f = faildict.get(why.__class__)
185         if f:
186             if (isRead and why.__class__ ==  error.ConnectionDone
187                 and IHalfCloseableDescriptor.providedBy(selectable)):
188                 selectable.readConnectionLost(f)
189             else:
190                 self.removeWriter(selectable)
191                 selectable.connectionLost(f)
192         else:
193             self.removeWriter(selectable)
194             selectable.connectionLost(failure.Failure(why))
195
196     def installWaker(self):
197         """
198         Install a `waker' to allow threads and signals to wake up the IO thread.
199
200         We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake
201         the reactor. On Windows we use a pair of sockets.
202         """
203         if not self.waker:
204             self.waker = _Waker(self)
205             self._internalReaders.add(self.waker)
206             self.addReader(self.waker)
207
208
209     # IReactorProcess
210
211     def spawnProcess(self, processProtocol, executable, args=(),
212                      env={}, path=None,
213                      uid=None, gid=None, usePTY=0, childFDs=None):
214         args, env = self._checkProcessArgs(args, env)
215         if platformType == 'posix':
216             if usePTY:
217                 if childFDs is not None:
218                     raise ValueError("Using childFDs is not supported with usePTY=True.")
219                 return process.PTYProcess(self, executable, args, env, path,
220                                           processProtocol, uid, gid, usePTY)
221             else:
222                 return process.Process(self, executable, args, env, path,
223                                        processProtocol, uid, gid, childFDs)
224         elif platformType == "win32":
225             if uid is not None or gid is not None:
226                 raise ValueError("The uid and gid parameters are not supported on Windows.")
227             if usePTY:
228                 raise ValueError("The usePTY parameter is not supported on Windows.")
229             if childFDs:
230                 raise ValueError("Customizing childFDs is not supported on Windows.")
231
232             if win32process:
233                 from twisted.internet._dumbwin32proc import Process
234                 return Process(self, processProtocol, executable, args, env, path)
235             else:
236                 raise NotImplementedError, "spawnProcess not available since pywin32 is not installed."
237         else:
238             raise NotImplementedError, "spawnProcess only available on Windows or POSIX."
239
240     # IReactorUDP
241
242     def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
243         """Connects a given L{DatagramProtocol} to the given numeric UDP port.
244
245         @returns: object conforming to L{IListeningPort}.
246         """
247         p = udp.Port(port, protocol, interface, maxPacketSize, self)
248         p.startListening()
249         return p
250
251     def connectUDP(self, remotehost, remoteport, protocol, localport=0,
252                   interface='', maxPacketSize=8192):
253         """DEPRECATED.
254
255         Connects a L{ConnectedDatagramProtocol} instance to a UDP port.
256         """
257         warnings.warn("use listenUDP and then transport.connect().", DeprecationWarning, stacklevel=2)
258         p = udp.ConnectedPort((remotehost, remoteport), localport, protocol, interface, maxPacketSize, self)
259         p.startListening()
260         return p
261
262
263     # IReactorMulticast
264
265     def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192, listenMultiple=False):
266         """Connects a given DatagramProtocol to the given numeric UDP port.
267
268         EXPERIMENTAL.
269
270         @returns: object conforming to IListeningPort.
271         """
272         p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, listenMultiple)
273         p.startListening()
274         return p
275
276
277     # IReactorUNIX
278
279     def connectUNIX(self, address, factory, timeout=30, checkPID=0):
280         """@see: twisted.internet.interfaces.IReactorUNIX.connectUNIX
281         """
282         assert unixEnabled, "UNIX support is not present"
283         c = unix.Connector(address, factory, timeout, self, checkPID)
284         c.connect()
285         return c
286
287     _unspecified = object()
288     def _checkMode(self, name, mode):
289         """
290         Check C{mode} to see if a value was specified for it and emit a
291         deprecation warning if so.  Return the default value if none was
292         specified, otherwise return C{mode}.
293         """
294         if mode is not self._unspecified:
295             warnings.warn(
296                 'The mode parameter of %(name)s will be removed.  Do not pass '
297                 'a value for it.  Set permissions on the containing directory '
298                 'before calling %(name)s, instead.' % dict(name=name),
299                 category=DeprecationWarning,
300                 stacklevel=3)
301         else:
302             mode = 0666
303         return mode
304
305
306     def listenUNIX(self, address, factory, backlog=50, mode=_unspecified,
307                    wantPID=0):
308         """
309         @see: twisted.internet.interfaces.IReactorUNIX.listenUNIX
310         """
311         assert unixEnabled, "UNIX support is not present"
312         mode = self._checkMode('IReactorUNIX.listenUNIX', mode)
313         p = unix.Port(address, factory, backlog, mode, self, wantPID)
314         p.startListening()
315         return p
316
317
318     # IReactorUNIXDatagram
319
320     def listenUNIXDatagram(self, address, protocol, maxPacketSize=8192,
321                            mode=_unspecified):
322         """
323         Connects a given L{DatagramProtocol} to the given path.
324
325         EXPERIMENTAL.
326
327         @returns: object conforming to L{IListeningPort}.
328         """
329         assert unixEnabled, "UNIX support is not present"
330         mode = self._checkMode('IReactorUNIXDatagram.listenUNIXDatagram', mode)
331         p = unix.DatagramPort(address, protocol, maxPacketSize, mode, self)
332         p.startListening()
333         return p
334
335     def connectUNIXDatagram(self, address, protocol, maxPacketSize=8192,
336                             mode=_unspecified, bindAddress=None):
337         """
338         Connects a L{ConnectedDatagramProtocol} instance to a path.
339
340         EXPERIMENTAL.
341         """
342         assert unixEnabled, "UNIX support is not present"
343         mopde = self._checkMode('IReactorUNIXDatagram.connectUNIXDatagram', mode)
344         p = unix.ConnectedDatagramPort(address, protocol, maxPacketSize, mode, bindAddress, self)
345         p.startListening()
346         return p
347
348
349     # IReactorTCP
350
351     def listenTCP(self, port, factory, backlog=50, interface=''):
352         """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
353         """
354         p = tcp.Port(port, factory, backlog, interface, self)
355         p.startListening()
356         return p
357
358     def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
359         """@see: twisted.internet.interfaces.IReactorTCP.connectTCP
360         """
361         c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
362         c.connect()
363         return c
364
365     # IReactorSSL (sometimes, not implemented)
366
367     def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
368         """@see: twisted.internet.interfaces.IReactorSSL.connectSSL
369         """
370         assert sslEnabled, "SSL support is not present"
371         c = ssl.Connector(host, port, factory, contextFactory, timeout, bindAddress, self)
372         c.connect()
373         return c
374
375     def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
376         """@see: twisted.internet.interfaces.IReactorSSL.listenSSL
377         """
378         assert sslEnabled, "SSL support is not present"
379         p = ssl.Port(port, factory, contextFactory, backlog, interface, self)
380         p.startListening()
381         return p
382
383     # IReactorArbitrary
384     def listenWith(self, portType, *args, **kw):
385         kw['reactor'] = self
386         p = portType(*args, **kw)
387         p.startListening()
388         return p
389
390     def connectWith(self, connectorType, *args, **kw):
391         kw['reactor'] = self
392         c = connectorType(*args, **kw)
393         c.connect()
394         return c
395
396     def _removeAll(self, readers, writers):
397         """
398         Remove all readers and writers, and list of removed L{IReadDescriptor}s
399         and L{IWriteDescriptor}s.
400
401         Meant for calling from subclasses, to implement removeAll, like::
402
403           def removeAll(self):
404               return self._removeAll(self._reads, self._writes)
405
406         where C{self._reads} and C{self._writes} are iterables.
407         """
408         removedReaders = set(readers) - self._internalReaders
409         for reader in removedReaders:
410             self.removeReader(reader)
411
412         removedWriters = set(writers)
413         for writer in removedWriters:
414             self.removeWriter(writer)
415
416         return list(removedReaders | removedWriters)
417
418
419 if sslEnabled:
420     classImplements(PosixReactorBase, IReactorSSL)
421 if unixEnabled:
422     classImplements(PosixReactorBase, IReactorUNIX, IReactorUNIXDatagram)
423 if processEnabled:
424     classImplements(PosixReactorBase, IReactorProcess)
425
426 __all__ = ["PosixReactorBase"]
Note: See TracBrowser for help on using the browser.