root/trunk/twisted/web2/wsgi.py

Revision 22598, 10.9 KB (checked in by exarkun, 3 years ago)

Merge blockingcallfromthread-arg-3030

Author: exarkun
Reviewer: therve
Fixes #3030

Change the signature of twisted.internet.threads.blockingCallFromThread to take
the reactor into which it will call as the first parameter. Update the docs and
the usage in twisted.web2 as well.

Line 
# -*- test-case-name: twisted.web2.test.test_wsgi -*-
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
An implementation of PEP 333: Python Web Server Gateway Interface (WSGI).
"""
8
import os
import threading

from zope.interface import implements

from twisted.internet import defer, threads
from twisted.internet import reactor
from twisted.python import log, failure
from twisted.web2 import http
from twisted.web2 import iweb
from twisted.web2 import server
from twisted.web2 import stream
from twisted.web2.twcgi import createCGIEnvironment
20
21
class AlreadyStartedResponse(Exception):
    """
    Raised by L{WSGIHandler.startWSGIResponse} when the application calls
    it a second time without passing C{exc_info}.
    """
24
25
# This isn't a subclass of resource.Resource, because it shouldn't do
# any method-specific actions at all. All that stuff is totally up to
# the contained wsgi application
class WSGIResource(object):
    """
    A web2 Resource which wraps the given WSGI application callable.

    The WSGI application will be called in a separate thread (using
    the reactor threadpool) whenever a request for this resource or
    any lower part of the url hierarchy is received.
    """
    implements(iweb.IResource)

    def __init__(self, application):
        """
        @param application: the WSGI application callable to wrap. It is
            invoked as C{application(environ, start_response)} by
            L{WSGIHandler.run}.
        """
        self.application = application

    def renderHTTP(self, req):
        """
        Start handling C{req} with the wrapped application.

        @param req: the web2 request being rendered.
        @return: the handler's C{responseDeferred}; the handler fires it
            with the response, or errbacks it if the application fails
            before any headers were sent.
        """
        # Building the handler (and with it the WSGI environment) happens
        # here, in the IO thread.
        handler = WSGIHandler(self.application, req)
        # Grab the Deferred before the handler starts running: the handler
        # resets its own reference to None once it has fired.
        d = handler.responseDeferred
        # The application itself runs in a thread from the reactor pool.
        reactor.callInThread(handler.run)
        return d

    def locateChild(self, request, segments):
        """
        Claim every child path for this resource.

        @return: C{(self, server.StopTraversal)}, so the remaining
            C{segments} are not looked up any further.
        """
        return self, server.StopTraversal
53
54
class InputStream(object):
    """
    This class implements the 'wsgi.input' object. The methods are
    expected to have the same behavior as the same-named methods for
    python's builtin file object.

    The constructor runs in the IO thread; every other method is called
    from the application thread and blocks that thread until the reactor
    has produced the requested data.
    """

    def __init__(self, newstream):
        """
        @param newstream: the request body stream; it is wrapped in a
            C{stream.BufferedStream}.
        """
        # Called in IO thread
        self.stream = stream.BufferedStream(newstream)

    def read(self, size=None):
        """
        Read at most size bytes from the input, or less if EOF is
        encountered. If size is omitted or negative, read until EOF.
        """
        # Called in application thread
        # Test for None explicitly: ordering None against an int only
        # happens to work on Python 2, where None sorts below every number.
        if size is not None and size < 0:
            size = None
        return threads.blockingCallFromThread(
            reactor, self.stream.readExactly, size)

    def readline(self, size=None):
        """
        Read a line, delimited by a newline. If the stream reaches EOF
        or size bytes have been read before reaching a newline (if
        size is given), the partial line is returned.

        COMPATIBILITY NOTE: the size argument is excluded from the
        WSGI specification, but is provided here anyhow, because
        useful libraries such as python stdlib's cgi.py assume their
        input file-like-object supports readline with a size
        argument. If you use it, be aware your application may not be
        portable to other conformant WSGI servers.
        """
        # Called in application thread
        if size is not None and size < 0:
            # E.g. -1, which is the default readline size for *some*
            # other file-like-objects...
            size = None

        return threads.blockingCallFromThread(
            reactor, self.stream.readline, '\n', size=size)

    def readlines(self, hint=None):
        """
        Read until EOF, collecting all lines in a list, and returns
        that list. The hint argument is ignored (as is allowed in the
        API specification)
        """
        # Called in application thread
        data = self.read()
        lines = data.split('\n')
        # Whatever follows the final newline is either a partial last
        # line or the empty string.
        last = lines.pop()
        # split() dropped the delimiters; put them back.
        lines = [s + '\n' for s in lines]
        if last:
            lines.append(last)
        return lines

    def __iter__(self):
        """
        Returns an iterator, each iteration of which returns the
        result of readline(), and stops when readline() returns an
        empty string.
        """
        while 1:
            line = self.readline()
            if not line:
                return
            yield line
125
126
class ErrorStream(object):
    """
    This class implements the 'wsgi.errors' object.

    Everything written to it is forwarded to the Twisted log as an error
    message. All methods are called in the application thread.
    """

    def flush(self):
        """
        Do nothing: messages are passed to the log as soon as they are
        written, so nothing is held back here.
        """
        # Called in application thread
        return

    def write(self, s):
        """
        Log C{s}, prefixed with C{"WSGI app error: "}, as an error.
        """
        # Called in application thread
        log.msg("WSGI app error: " + s, isError=True)

    def writelines(self, seq):
        """
        Join the strings in C{seq} and log them as one error message.
        """
        # Called in application thread
        self.write(''.join(seq))
143
class WSGIHandler(object):
    """
    Runs one WSGI application call for one request.

    The instance is built in the IO thread; L{run} then executes in an
    application thread. Methods are annotated with the thread they are
    called in. The application thread talks to the IO thread only through
    C{reactor.callFromThread} and the C{unpaused} event.
    """
    # True once the response has been handed over to the IO thread.
    headersSent = False
    # Set by stopProducing() when the consumer wants no more data.
    stopped = False
    # The ProducerStream used for incremental writes, once write() made one.
    stream = None
    # threading.Event created by write(); it is set while the application
    # thread may write and cleared while the consumer has paused us.
    unpaused = None

    def __init__(self, application, request):
        """
        @param application: the WSGI application callable.
        @param request: the web2 request to build the environment from.
        """
        # Called in IO thread
        self.setupEnvironment(request)
        self.application = application
        self.request = request
        self.response = None
        self.responseDeferred = defer.Deferred()

    def setupEnvironment(self, request):
        """
        Build the WSGI C{environ} dictionary for C{request} and store it
        as C{self.environment}.
        """
        # Called in IO thread
        env = createCGIEnvironment(request)
        env['wsgi.version']      = (1, 0)
        env['wsgi.url_scheme']   = env['REQUEST_SCHEME']
        env['wsgi.input']        = InputStream(request.stream)
        env['wsgi.errors']       = ErrorStream()
        env['wsgi.multithread']  = True
        env['wsgi.multiprocess'] = False
        env['wsgi.run_once']     = False
        env['wsgi.file_wrapper'] = FileWrapper
        self.environment = env

    def startWSGIResponse(self, status, response_headers, exc_info=None):
        """
        The C{start_response} callable handed to the application.

        @param status: status string such as C{"200 OK"}; only the leading
            integer code is used.
        @param response_headers: iterable of C{(name, value)} pairs.
        @param exc_info: optional C{sys.exc_info()} tuple. If given after
            the headers have been sent, that exception is re-raised;
            otherwise the previously started response is replaced.
        @return: L{write}.
        @raise AlreadyStartedResponse: if called again without C{exc_info}.
        """
        # Called in application thread
        if exc_info is not None:
            try:
                if self.headersSent:
                    # Re-raise the application's exception with its
                    # original traceback. Going through Failure avoids the
                    # Python-2-only three-argument raise statement.
                    failure.Failure(
                        exc_info[1], exc_info[0], exc_info[2]).raiseException()
            finally:
                # Drop the traceback reference so no cycle is kept alive.
                exc_info = None
        elif self.response is not None:
            raise AlreadyStartedResponse('startWSGIResponse(%r)' % status)
        status = int(status.split(' ')[0])
        self.response = http.Response(status)
        for key, value in response_headers:
            self.response.headers.addRawHeader(key, value)
        return self.write

    def run(self):
        """
        Call the application and deliver its result, routing any failure
        back to the IO thread.
        """
        # Called in application thread
        try:
            result = self.application(self.environment, self.startWSGIResponse)
            self.handleResult(result)
        except:
            # Deliberately catches everything: this is the top of the
            # application thread, and the failure has to reach the IO side.
            f = failure.Failure()
            if not self.headersSent:
                reactor.callFromThread(self.__error, f)
            elif self.stream is not None:
                reactor.callFromThread(self.stream.finish, f)
            else:
                # The response was handed over without a ProducerStream
                # (file or no-data path), so there is no stream to fail.
                # Log the failure rather than crash on None.
                log.err(f)

    def __callback(self):
        # Called in IO thread
        self.responseDeferred.callback(self.response)
        self.responseDeferred = None

    def __error(self, f):
        # Called in IO thread
        self.responseDeferred.errback(f)
        self.responseDeferred = None

    def write(self, output):
        """
        Send C{output} to the client, blocking while the consumer has
        paused this producer. The first call creates the response stream
        and fires the response in the IO thread.

        @raise RuntimeError: if no response has been started yet.
        """
        # Called in application thread
        if self.response is None:
            raise RuntimeError(
                "Application didn't call startResponse before writing data!")
        if not self.headersSent:
            self.stream = self.response.stream = stream.ProducerStream()
            self.headersSent = True

            # threadsafe event object to communicate paused state.
            self.unpaused = threading.Event()

            # After this, we cannot touch self.response from this
            # thread any more
            def _start():
                # Called in IO thread
                self.stream.registerProducer(self, True)
                self.__callback()
                # Notify application thread to start writing
                self.unpaused.set()
            reactor.callFromThread(_start)
        # Wait for unpaused to be true
        self.unpaused.wait()
        reactor.callFromThread(self.stream.write, output)

    def writeAll(self, result):
        """
        Deliver a complete list or tuple of strings in one go.

        If nothing has been sent yet, the response gets a finished stream
        of known length holding all the items. Otherwise the items are
        appended to the stream that is already in use and it is finished.

        @raise RuntimeError: if no response has been started yet.
        """
        # Called in application thread
        if not self.headersSent:
            if self.response is None:
                raise RuntimeError(
                    "Application didn't call startResponse before writing data!")
            total = sum([len(item) for item in result])
            self.response.stream = stream.ProducerStream(length=total)
            self.response.stream.buffer = list(result)
            self.response.stream.finish()
            reactor.callFromThread(self.__callback)
        else:
            # Has already been started, cannot replace the stream
            def _write():
                # Called in IO thread
                for s in result:
                    self.stream.write(s)
                self.stream.finish()
            reactor.callFromThread(_write)

    def handleResult(self, result):
        """
        Deliver the iterable returned by the application.

        A L{FileWrapper} around a real file descriptor is sent as a
        C{stream.FileStream}; a plain list or tuple is sent at once with
        L{writeAll}; anything else is iterated and sent with L{write}.
        C{result.close()} is always called afterwards if it exists.

        @raise RuntimeError: if no response has been started yet.
        """
        # Called in application thread
        try:
            if (isinstance(result, FileWrapper) and
                   hasattr(result.filelike, 'fileno') and
                   not self.headersSent):
                if self.response is None:
                    raise RuntimeError(
                        "Application didn't call startResponse before writing data!")
                self.headersSent = True
                # Make FileStream and output it. We make a new file
                # object from the fd, just in case the original one
                # isn't an actual file object. Open it in binary mode so
                # the bytes are passed through unmodified on every
                # platform.
                self.response.stream = stream.FileStream(
                    os.fdopen(os.dup(result.filelike.fileno()), 'rb'))
                reactor.callFromThread(self.__callback)
                return

            if type(result) in (list, tuple):
                # If it's a list or tuple (exactly, not subtype!),
                # then send the entire thing down to Twisted at once,
                # and free up this thread to do other work.
                self.writeAll(result)
                return

            # Otherwise, this thread has to keep running to provide the
            # data.
            for data in result:
                if self.stopped:
                    return
                self.write(data)

            if not self.headersSent:
                if self.response is None:
                    raise RuntimeError(
                        "Application didn't call startResponse, and didn't send any data!")

                self.headersSent = True
                reactor.callFromThread(self.__callback)
            else:
                reactor.callFromThread(self.stream.finish)

        finally:
            if hasattr(result, 'close'):
                result.close()

    def pauseProducing(self):
        """
        Make the application thread block in L{write} until resumed.
        """
        # Called in IO thread
        self.unpaused.clear()

    def resumeProducing(self):
        """
        Let the application thread continue writing.
        """
        # Called in IO thread
        self.unpaused.set()

    def stopProducing(self):
        """
        Mark the handler stopped so L{handleResult} ends its write loop,
        and wake the application thread if it is blocked in L{write}.
        """
        self.stopped = True
        if self.unpaused is not None:
            # Without this a paused writer would wait forever and never
            # notice that it has been stopped.
            self.unpaused.set()
312
class FileWrapper(object):
    """
    Wrapper to convert file-like objects to iterables, to implement
    the optional 'wsgi.file_wrapper' object.

    Iterating yields successive C{filelike.read(blksize)} results until
    an empty one is returned.
    """

    def __init__(self, filelike, blksize=8192):
        """
        @param filelike: object with a C{read(size)} method.
        @param blksize: number of bytes requested per C{read} call.
        """
        self.filelike = filelike
        self.blksize = blksize
        # Expose close() only when the wrapped object has one, so callers
        # can use hasattr(wrapper, 'close') to decide whether to call it.
        if hasattr(filelike, 'close'):
            self.close = filelike.close

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next block of data.

        @raise StopIteration: when C{read} returns an empty result.
        """
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise StopIteration

    # The same method under the name Python 3's iterator protocol uses.
    __next__ = next
333
# Only the resource class is public; the handler and stream helpers are
# implementation details.
__all__ = ['WSGIResource']
Note: See TracBrowser for help on using the browser.