root / trunk / twisted / internet / _pollingfile.py

Revision 16524, 7.8 kB (checked in by glyph, 3 years ago)

Further enhance win32 portability of subprocess support.

Author: glyph

Reviewer: jerub

Fixes #1553 -- finally.

twisted.internet.stdio previously did not support Windows properly: it did not
provide IConsumer, IHalfCloseableXXX or IProducer. This merge fixes those
problems, making it much easier to spawn subprocesses which themselves use
Twisted. Much better test coverage of stdio for all platforms is also
included.

Line 
1 # -*- test-case-name: twisted.web2.test -*-
2 """
3
4 Implements a simple polling interface for file descriptors that don't work with
5 select() - this is pretty much only useful on Windows.
6
7 """
8
9 from zope.interface import implements
10
11 from twisted.internet.interfaces import IConsumer, IPushProducer
12
# Bounds for the delay that _PollingTimer passes to reactor.callLater()
# between two polls.  The delay is divided down towards MIN_TIMEOUT while
# polls find work to do, and doubled back up towards MAX_TIMEOUT while they
# find none.
MIN_TIMEOUT = 0.000000001
MAX_TIMEOUT = 0.1
15
class _PollableResource:
    """
    Base class for objects polled by L{_PollingTimer}.

    The timer only consults the C{active} flag: a resource whose flag is
    false is skipped when polling.
    """
    # Resources are eligible for polling from the moment they are created.
    active = True

    def activate(self):
        """
        Mark this resource as wanting to be polled.
        """
        self.active = True

    def deactivate(self):
        """
        Mark this resource as not wanting to be polled.
        """
        self.active = False
24
class _PollingTimer:
    """
    Poll a collection of resources from a timer driven by C{reactor.callLater}.

    Every registered resource must expose an C{active} attribute and a
    C{checkWork()} method returning a number describing how much work the
    poll performed.  The delay between polls adapts to that number: it is
    divided by C{workUnits + 1} (but never drops below C{MIN_TIMEOUT}) when
    work was done, and doubled (but never exceeds C{MAX_TIMEOUT}) otherwise.
    """
    # Everything is private here because it is really an implementation detail.

    def __init__(self, reactor):
        """
        @param reactor: object providing C{callLater(delay, callable)}; the
            value it returns must provide C{cancel()}.
        """
        self.reactor = reactor
        self._resources = []
        # The delayed call for the next poll, or None when none is pending.
        self._pollTimer = None
        self._currentTimeout = MAX_TIMEOUT
        self._paused = False

    def _addPollableResource(self, res):
        """
        Register C{res} for polling and start the timer if it needs polling.
        """
        self._resources.append(res)
        self._checkPollingState()

    def _checkPollingState(self):
        """
        Start polling if any registered resource is active, otherwise stop.
        """
        for resource in self._resources:
            if resource.active:
                self._startPolling()
                break
        else:
            self._stopPolling()

    def _startPolling(self):
        """
        Schedule a poll unless one is already pending.
        """
        if self._pollTimer is None:
            self._pollTimer = self._reschedule()

    def _stopPolling(self):
        """
        Cancel the pending poll, if there is one.
        """
        if self._pollTimer is not None:
            self._pollTimer.cancel()
            self._pollTimer = None

    def _pause(self):
        """
        Prevent further polls from being scheduled.  A poll which is already
        pending is not cancelled.
        """
        self._paused = True

    def _unpause(self):
        """
        Allow polls to be scheduled again and restart polling if required.
        """
        self._paused = False
        self._checkPollingState()

    def _reschedule(self):
        """
        Schedule L{_pollEvent} after the current timeout.

        @return: whatever C{reactor.callLater} returned, or C{None} when
            polling is paused and nothing was scheduled.
        """
        if not self._paused:
            return self.reactor.callLater(self._currentTimeout, self._pollEvent)
        return None

    def _pollEvent(self):
        """
        Poll every active resource once, adapt the timeout to the amount of
        work done, and schedule the next poll if any resource is still active.
        """
        # The delayed call which invoked us has already fired: forget it now,
        # so that nobody tries to cancel() it and so that _startPolling() can
        # schedule a fresh one later.  Leaving the stale reference in place
        # would make _startPolling() a permanent no-op once every resource
        # had gone inactive.
        self._pollTimer = None

        workUnits = 0.
        anyActive = False
        for resource in self._resources:
            if resource.active:
                workUnits += resource.checkWork()
                # Check AFTER work has been done
                if resource.active:
                    anyActive = True

        if workUnits:
            newTimeout = max(self._currentTimeout / (workUnits + 1.),
                             MIN_TIMEOUT)
        else:
            newTimeout = min(self._currentTimeout * 2., MAX_TIMEOUT)
        self._currentTimeout = newTimeout

        # checkWork() may have re-entered _startPolling() (for example by
        # registering another resource); do not schedule a second call then.
        if anyActive and self._pollTimer is None:
            self._pollTimer = self._reschedule()
89
90
91 # If we ever (let's hope not) need the above functionality on UNIX, this could
92 # be factored into a different module.
93
94 import win32pipe
95 import win32file
96 import win32api
97 import pywintypes
98
class _PollableReadPipe(_PollableResource):
    """
    Pollable resource which, each time it is polled, reads whatever is
    available on a pipe handle and passes it to a callback.
    """

    implements(IPushProducer)

    def __init__(self, pipe, receivedCallback, lostCallback):
        """
        @param pipe: the handle given to C{PeekNamedPipe}, C{ReadFile} and
            C{CloseHandle}.
        @param receivedCallback: one-argument callable, invoked with the data
            gathered by a poll whenever that data is non-empty.
        @param lostCallback: no-argument callable, invoked when a poll fails
            with C{win32api.error}.
        """
        self.pipe = pipe
        self.receivedCallback = receivedCallback
        self.lostCallback = lostCallback

    def checkWork(self):
        """
        Read from the pipe for as long as C{PeekNamedPipe} reports bytes to
        read, then deliver everything read as a single string.

        @return: the number of bytes delivered to C{receivedCallback}.
        """
        finished = 0
        fullDataRead = []

        while 1:
            try:
                # Peek first and only ask ReadFile for the amount reported as
                # available (presumably so the read cannot block -- confirm).
                peeked, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
                if not bytesToRead:
                    break
                hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
                fullDataRead.append(data)
            except win32api.error:
                # Treated as the end of the pipe; data read before the
                # failure is still delivered below.
                finished = 1
                break

        dataBuf = ''.join(fullDataRead)
        if dataBuf:
            self.receivedCallback(dataBuf)
        if finished:
            self.cleanup()
        return len(dataBuf)

    def cleanup(self):
        """
        Stop being polled and report that the pipe has been lost.
        """
        self.deactivate()
        self.lostCallback()

    def close(self):
        """
        Close the pipe handle, ignoring failures to do so.
        """
        try:
            win32api.CloseHandle(self.pipe)
        except pywintypes.error:
            # You can't close std handles...?
            pass

    def stopProducing(self):
        """
        Close the pipe handle.

        NOTE(review): the resource stays active, so the next poll presumably
        fails on the closed handle and reports the loss through cleanup() --
        confirm.
        """
        self.close()

    def pauseProducing(self):
        """
        Stop being polled until L{resumeProducing} is called.
        """
        self.deactivate()

    def resumeProducing(self):
        """
        Become eligible for polling again.
        """
        self.activate()
151
152
# Once more than this many bytes are queued for writing, _PollableWritePipe
# asks its registered producer to pause.
FULL_BUFFER_SIZE = 64 * 1024
154
class _PollableWritePipe(_PollableResource):
    """
    Pollable resource which queues data and writes it to a pipe handle each
    time it is polled, applying back-pressure to a registered producer.
    """

    implements(IConsumer)

    def __init__(self, writePipe, lostCallback):
        """
        @param writePipe: the handle given to C{WriteFile} and C{CloseHandle}.
        @param lostCallback: no-argument callable, invoked from
            L{writeConnectionLost}.
        """
        # Set by close(); the handle itself is closed once the queue drains.
        self.disconnecting = False
        self.producer = None
        # True while we have asked a producer to pause and not yet resumed it.
        self.producerPaused = 0
        self.streamingProducer = 0
        self.outQueue = []
        self.writePipe = writePipe
        self.lostCallback = lostCallback
        try:
            # PIPE_NOWAIT: presumably so WriteFile returns a short write
            # instead of blocking when the pipe is full -- confirm.
            win32pipe.SetNamedPipeHandleState(writePipe,
                                              win32pipe.PIPE_NOWAIT,
                                              None,
                                              None)
        except pywintypes.error:
            # Maybe it's an invalid handle.  Who knows.
            pass

    def close(self):
        """
        Request disconnection.  Later writes are discarded; the handle is
        closed by L{checkWork} once the queue has been written out.
        """
        self.disconnecting = True

    def bufferFull(self):
        """
        Ask the registered producer, if any, to pause.
        """
        if self.producer is not None:
            self.producerPaused = 1
            self.producer.pauseProducing()

    def bufferEmpty(self):
        """
        Ask the registered producer for more data when appropriate: always
        for a non-streaming producer, and for a streaming producer only if we
        paused it earlier.

        @return: C{True} if the producer was resumed, C{False} otherwise.
        """
        if self.producer is not None and ((not self.streamingProducer) or
                                          self.producerPaused):
            # Reset OUR flag (the one bufferFull() sets), not an attribute on
            # the producer; otherwise a streaming producer which was paused
            # once would be resumed again on every poll that empties the
            # queue.
            self.producerPaused = 0
            self.producer.resumeProducing()
            return True
        return False

    # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh

    def registerProducer(self, producer, streaming):
        """Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing(). A producer should implement the IProducer
        interface.

        If this pipe is no longer active the producer is told to
        stopProducing() instead of being registered.

        Raises RuntimeError if a producer is already registered.
        """
        if self.producer is not None:
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
        if not self.active:
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        """Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None

    def writeConnectionLost(self):
        """
        Stop being polled, close the handle (ignoring failures) and report
        the loss through C{lostCallback}.
        """
        self.deactivate()
        try:
            win32api.CloseHandle(self.writePipe)
        except pywintypes.error:
            # OMG what
            pass
        self.lostCallback()

    def writeSequence(self, seq):
        """
        Queue every string in C{seq} for writing.

        Behaves like calling L{write} with the concatenation of C{seq}: the
        data is discarded once L{close} has been called, and the producer is
        paused if the queue grows beyond C{FULL_BUFFER_SIZE}.
        """
        if self.disconnecting:
            return
        self.outQueue.extend(seq)
        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
            self.bufferFull()

    def write(self, data):
        """
        Queue C{data} for writing; it is written by a later L{checkWork}.

        The data is discarded once L{close} has been called, and the producer
        is paused if the queue grows beyond C{FULL_BUFFER_SIZE}.
        """
        if self.disconnecting:
            return
        self.outQueue.append(data)
        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
            self.bufferFull()

    def checkWork(self):
        """
        Write as much queued data as the pipe accepts.

        @return: the number of bytes written during this poll.
        """
        numBytesWritten = 0
        if not self.outQueue:
            if self.disconnecting:
                self.writeConnectionLost()
                return 0
            try:
                # Nothing to send: write an empty string so that an error on
                # the handle is still noticed.
                win32file.WriteFile(self.writePipe, '', None)
            except pywintypes.error:
                self.writeConnectionLost()
                return numBytesWritten
        while self.outQueue:
            data = self.outQueue.pop(0)
            try:
                errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
                                                             data, None)
            except win32api.error:
                self.writeConnectionLost()
                break
            else:
                numBytesWritten += nBytesWritten
                if len(data) > nBytesWritten:
                    # Short write: put the unwritten tail back at the front
                    # of the queue and try again on the next poll.
                    self.outQueue.insert(0, data[nBytesWritten:])
                    break
        else:
            # The queue is empty (it was drained, or was empty to begin
            # with): ask for more data, or finish a pending close().
            resumed = self.bufferEmpty()
            if not resumed and self.disconnecting:
                self.writeConnectionLost()
        return numBytesWritten
269
270
Note: See TracBrowser for help on using the browser.