root / trunk / twisted / web2 / wsgi.py

Revision 22598, 10.9 kB (checked in by exarkun, 1 year ago)

Merge blockingcallfromthread-arg-3030

Author: exarkun
Reviewer: therve
Fixes #3030

Change the signature of twisted.internet.threads.blockingCallFromThread to take
the reactor into which it will call as the first parameter. Update the docs and
the usage in twisted.web2 as well.

Line 
1 # -*- test-case-name: twisted.web2.test.test_wsgi -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 An implementation of PEP 333: Python Web Server Gateway Interface (WSGI).
7 """
8
9 import os, threading
10 from zope.interface import implements
11
12 from twisted.internet import defer, threads
13 from twisted.internet import reactor
14 from twisted.python import log, failure
15 from twisted.web2 import http
16 from twisted.web2 import iweb
17 from twisted.web2 import server
18 from twisted.web2 import stream
19 from twisted.web2.twcgi import createCGIEnvironment
20
21
class AlreadyStartedResponse(Exception):
    """
    Raised when the WSGI application calls the start-response callable a
    second time without supplying exc_info.
    """
24
25
26 # This isn't a subclass of resource.Resource, because it shouldn't do
27 # any method-specific actions at all. All that stuff is totally up to
28 # the contained wsgi application
class WSGIResource(object):
    """
    Adapt a WSGI application callable to the web2 resource interface.

    Every request that reaches this resource, or any path below it in
    the url hierarchy, is handed to the wrapped application, which is
    run via the reactor's callInThread.
    """
    implements(iweb.IResource)

    def __init__(self, application):
        """
        @param application: the WSGI application callable to wrap.
        """
        self.application = application

    def locateChild(self, request, segments):
        """
        Stop traversal here: this resource answers for every remaining
        path segment itself.
        """
        return self, server.StopTraversal

    def renderHTTP(self, req):
        """
        Start the application for C{req} on another thread.

        @return: the handler's responseDeferred.
        """
        wsgiHandler = WSGIHandler(self.application, req)
        # Pick up the deferred before the handler starts running.
        pendingResponse = wsgiHandler.responseDeferred
        reactor.callInThread(wsgiHandler.run)
        return pendingResponse
53
54
class InputStream(object):
    """
    This class implements the 'wsgi.input' object. The methods are
    expected to have the same behavior as the same-named methods for
    python's builtin file object.

    The constructor is called in the IO thread; every other method is
    called in the application thread and blocks that thread, via
    threads.blockingCallFromThread, until the reactor has produced the
    requested data.
    """

    def __init__(self, newstream):
        """
        @param newstream: the request body stream; it is wrapped in a
            stream.BufferedStream.
        """
        # Called in IO thread
        self.stream = stream.BufferedStream(newstream)

    def read(self, size=None):
        """
        Read at most size bytes from the input, or less if EOF is
        encountered. If size is omitted or negative, read until EOF.
        """
        # Called in application thread
        # Compare against None explicitly: ordering None against an int
        # is an implementation accident on Python 2 and a TypeError on
        # Python 3.
        if size is not None and size < 0:
            size = None
        return threads.blockingCallFromThread(
            reactor, self.stream.readExactly, size)

    def readline(self, size=None):
        """
        Read a line, delimited by a newline. If the stream reaches EOF
        or size bytes have been read before reaching a newline (if
        size is given), the partial line is returned.

        COMPATIBILITY NOTE: the size argument is excluded from the
        WSGI specification, but is provided here anyhow, because
        useful libraries such as python stdlib's cgi.py assume their
        input file-like-object supports readline with a size
        argument. If you use it, be aware your application may not be
        portable to other conformant WSGI servers.
        """
        # Called in application thread
        if size is not None and size < 0:
            # E.g. -1, which is the default readline size for *some*
            # other file-like-objects...
            size = None

        return threads.blockingCallFromThread(
            reactor, self.stream.readline, '\n', size=size)

    def readlines(self, hint=None):
        """
        Read until EOF, collecting all lines in a list, and returns
        that list. The hint argument is ignored (as is allowed in the
        API specification)
        """
        # Called in application thread
        data = self.read()
        lines = data.split('\n')
        # Whatever follows the final newline is either '' (the data ended
        # with a newline) or an unterminated last line.
        last = lines.pop()
        lines = [s + '\n' for s in lines]
        if last != '':
            lines.append(last)
        return lines

    def __iter__(self):
        """
        Returns an iterator, each iteration of which returns the
        result of readline(), and stops when readline() returns an
        empty string.
        """
        while 1:
            line = self.readline()
            if not line:
                return
            yield line
125
126
class ErrorStream(object):
    """
    File-like object installed in the WSGI environment under the
    'wsgi.errors' key. Everything written to it is forwarded to the
    log as an error message; all methods are called in the application
    thread.
    """

    def flush(self):
        """
        Do nothing: each message is passed to the log as soon as it is
        written.
        """
        pass

    def write(self, s):
        """
        Log the string C{s} as an error.
        """
        message = "WSGI app error: " + s
        log.msg(message, isError=True)

    def writelines(self, seq):
        """
        Concatenate the strings in C{seq} and log them as one error
        message.
        """
        message = "WSGI app error: " + ''.join(seq)
        log.msg(message, isError=True)
143
class WSGIHandler(object):
    """
    Run one WSGI application call for one request and turn its output
    into the response delivered through C{responseDeferred}.

    The object is shared between two threads. Methods marked "IO
    thread" run in the reactor thread; methods marked "application
    thread" run in the thread that executes the WSGI application and
    reach the reactor only through reactor.callFromThread.

    When the response is streamed, this object is registered as the
    producer of the response stream, and C{unpaused} is the event the
    application thread waits on before each write.
    """
    # True once the response has been (or is about to be) handed to
    # responseDeferred; after that the response object must not be
    # touched from the application thread.
    headersSent = False
    # Set by stopProducing; checked between chunks in handleResult.
    stopped = False
    # The ProducerStream used for streamed output, created by write().
    stream = None
    # threading.Event created by write(); set while the application
    # thread is allowed to write, cleared while the stream wants it
    # paused.
    unpaused = None

    def __init__(self, application, request):
        """
        @param application: the WSGI application callable.
        @param request: the web2 request being served.
        """
        # Called in IO thread
        self.setupEnvironment(request)
        self.application = application
        self.request = request
        self.response = None
        self.responseDeferred = defer.Deferred()

    def setupEnvironment(self, request):
        """
        Build the WSGI environ dictionary for C{request} and store it
        as C{self.environment}.
        """
        # Called in IO thread
        env = createCGIEnvironment(request)
        env['wsgi.version']      = (1, 0)
        env['wsgi.url_scheme']   = env['REQUEST_SCHEME']
        env['wsgi.input']        = InputStream(request.stream)
        env['wsgi.errors']       = ErrorStream()
        env['wsgi.multithread']  = True
        env['wsgi.multiprocess'] = False
        env['wsgi.run_once']     = False
        env['wsgi.file_wrapper'] = FileWrapper
        self.environment = env

    def startWSGIResponse(self, status, response_headers, exc_info=None):
        """
        The start-response callable passed to the application.

        @param status: status string; the integer before the first
            space is used as the response code.
        @param response_headers: iterable of (name, value) pairs.
        @param exc_info: optional exception triple. If the headers have
            already been sent the exception is re-raised; otherwise the
            previously prepared response is replaced.
        @raise AlreadyStartedResponse: if a response was already started
            and no exc_info was given.
        @return: the C{write} callable.
        """
        # Called in application thread
        if exc_info is not None:
            try:
                if self.headersSent:
                    # Too late to change the response. Re-raise through
                    # Failure so the original traceback is kept and the
                    # syntax is valid on both Python 2 and Python 3.
                    failure.Failure(
                        exc_info[1], exc_info[0], exc_info[2]
                    ).raiseException()
            finally:
                # Drop the traceback reference to avoid a cycle.
                exc_info = None
        elif self.response is not None:
            raise AlreadyStartedResponse('startWSGIResponse(%r)' % status)
        status = int(status.split(' ')[0])
        self.response = http.Response(status)
        for key, value in response_headers:
            self.response.headers.addRawHeader(key, value)
        return self.write


    def run(self):
        """
        Call the application and process its result. Any exception is
        reported to the IO thread: through responseDeferred if the
        headers have not been sent, otherwise by finishing the stream
        with the failure.
        """
        # Called in application thread
        try:
            result = self.application(self.environment, self.startWSGIResponse)
            self.handleResult(result)
        except:
            # Deliberately catch everything: this is the top of the
            # worker thread and the error must reach the IO thread.
            if not self.headersSent:
                reactor.callFromThread(self.__error, failure.Failure())
            else:
                reactor.callFromThread(self.stream.finish, failure.Failure())

    def __callback(self):
        # Called in IO thread
        self.responseDeferred.callback(self.response)
        self.responseDeferred = None

    def __error(self, f):
        # Called in IO thread
        self.responseDeferred.errback(f)
        self.responseDeferred = None

    def write(self, output):
        """
        Send one chunk of body data. The first call creates the
        response stream and fires responseDeferred; every call blocks
        until the stream accepts more data.

        @raise RuntimeError: if no response has been started.
        """
        # Called in application thread
        if self.response is None:
            raise RuntimeError(
                "Application didn't call startResponse before writing data!")
        if not self.headersSent:
            self.stream = self.response.stream = stream.ProducerStream()
            self.headersSent = True

            # threadsafe event object to communicate paused state.
            self.unpaused = threading.Event()

            # After this, we cannot touch self.response from this
            # thread any more
            def _start():
                # Called in IO thread
                self.stream.registerProducer(self, True)
                self.__callback()
                # Notify application thread to start writing
                self.unpaused.set()
            reactor.callFromThread(_start)
        # Wait for unpaused to be true
        self.unpaused.wait()
        reactor.callFromThread(self.stream.write, output)

    def writeAll(self, result):
        """
        Send a complete list or tuple of chunks in one go, so the
        application thread does not have to stay around.

        @raise RuntimeError: if no response has been started.
        """
        # Called in application thread
        if not self.headersSent:
            if self.response is None:
                raise RuntimeError(
                    "Application didn't call startResponse before writing data!")
            # The total length is known up front, so give the stream a
            # length and a pre-filled buffer.
            total = sum(map(len, result))
            self.response.stream = stream.ProducerStream(length=total)
            self.response.stream.buffer = list(result)
            self.response.stream.finish()
            reactor.callFromThread(self.__callback)
        else:
            # Has already been started, cannot replace the stream
            def _write():
                # Called in IO thread
                for s in result:
                    self.stream.write(s)
                self.stream.finish()
            reactor.callFromThread(_write)


    def handleResult(self, result):
        """
        Deliver the iterable returned by the application, then call its
        C{close} method if it has one.

        @raise RuntimeError: if the application never started a
            response.
        """
        # Called in application thread
        try:
            if (isinstance(result, FileWrapper) and
                   hasattr(result.filelike, 'fileno') and
                   not self.headersSent):
                if self.response is None:
                    raise RuntimeError(
                        "Application didn't call startResponse before writing data!")
                self.headersSent = True
                # Make FileStream and output it. We make a new file
                # object from the fd, just in case the original one
                # isn't an actual file object.
                self.response.stream = stream.FileStream(
                    os.fdopen(os.dup(result.filelike.fileno())))
                reactor.callFromThread(self.__callback)
                return

            if type(result) in (list,tuple):
                # If it's a list or tuple (exactly, not subtype!),
                # then send the entire thing down to Twisted at once,
                # and free up this thread to do other work.
                self.writeAll(result)
                return

            # Otherwise, this thread has to keep running to provide the
            # data.
            for data in result:
                if self.stopped:
                    return
                self.write(data)

            if not self.headersSent:
                # The iterable produced nothing: send the bare response.
                if self.response is None:
                    raise RuntimeError(
                        "Application didn't call startResponse, and didn't send any data!")

                self.headersSent = True
                reactor.callFromThread(self.__callback)
            else:
                reactor.callFromThread(self.stream.finish)

        finally:
            if hasattr(result,'close'):
                result.close()

    def pauseProducing(self):
        """
        Make the application thread block in write() until resumed.
        """
        # Called in IO thread
        self.unpaused.clear()

    def resumeProducing(self):
        """
        Let the application thread continue writing.
        """
        # Called in IO thread
        self.unpaused.set()

    def stopProducing(self):
        """
        Ask the application thread to stop sending data.
        """
        self.stopped = True
        # A writer blocked while paused would never see the flag, so
        # wake it up.
        if self.unpaused is not None:
            self.unpaused.set()
312
class FileWrapper(object):
    """
    Iterable adapter around a file-like object, used as the optional
    'wsgi.file_wrapper' object. Each step of the iteration reads one
    block from the wrapped object.
    """

    def __init__(self, filelike, blksize=8192):
        """
        @param filelike: object with a C{read(size)} method. If it also
            has a C{close} method, that method is exposed as this
            wrapper's own C{close}.
        @param blksize: number of bytes requested per read.
        """
        self.blksize = blksize
        self.filelike = filelike
        if hasattr(filelike, 'close'):
            self.close = filelike.close

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next block, or raise StopIteration once a read
        yields no data.
        """
        chunk = self.filelike.read(self.blksize)
        if not chunk:
            raise StopIteration
        return chunk
333
334 __all__ = ['WSGIResource']
Note: See TracBrowser for help on using the browser.