Ticket #3062: fastcgi.py

File fastcgi.py, 12.7 KB (added by radix, 7 years ago)

Actually it's not a patch, just a .py file

Line 
"""
Twisted.web2 FastCGI backend support.
"""
4
"""
Okay, FastCGI is a pretty stupid protocol.
Let me count some reasons:

1) Specifies ability to multiplex streams of data over a single
socket, but has no form of flow control. This is fine for multiplexing
stderr, but serving more than one request over a channel with no flow
control is just *asking* for trouble. I avoid this and enforce one
outstanding request per connection. This basically means the whole
"requestId" field is worthless.

2) Has variable length packet padding. If you want padding, just make
it always pad to 8 bytes fercrissake!

3) Why does every packet need to specify the version? How about just
sending it once?

4) Name/value pair format. Come *on*. Is it *possible* to come up with
a more complex format to send them with?? Even if you think you've
gotten it down, you probably forgot that it's a stream, and the
name/values can be split between two packets. (Yes, this means
*you*. Don't even try to pretend you didn't miss this detail.)
"""
28
29from twisted.internet import protocol, tcp, unix
30from twisted.python import log
31from twisted.web2 import responsecode
32from twisted.web2.channel import cgi
33
# Record types: the "type" field of FCGI_Header.
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11

# Lower-case name for every record type.  Used both for display (Record's
# repr) and as the name of the protocol method that handles the record.
typeNames = dict([
    (FCGI_BEGIN_REQUEST, 'fcgi_begin_request'),
    (FCGI_ABORT_REQUEST, 'fcgi_abort_request'),
    (FCGI_END_REQUEST, 'fcgi_end_request'),
    (FCGI_PARAMS, 'fcgi_params'),
    (FCGI_STDIN, 'fcgi_stdin'),
    (FCGI_STDOUT, 'fcgi_stdout'),
    (FCGI_STDERR, 'fcgi_stderr'),
    (FCGI_DATA, 'fcgi_data'),
    (FCGI_GET_VALUES, 'fcgi_get_values'),
    (FCGI_GET_VALUES_RESULT, 'fcgi_get_values_result'),
    (FCGI_UNKNOWN_TYPE, 'fcgi_unknown_type'),
])

# Bit mask for the "flags" field of FCGI_BeginRequestBody.
FCGI_KEEP_CONN = 1

# Roles: the "role" field of FCGI_BeginRequestBody.
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3

# Values for the "protocolStatus" field of FCGI_EndRequestBody.
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3

# File descriptor on which the listening socket is inherited.
FCGI_LISTENSOCK_FILENO = 0

# Largest content length that fits the header's 16-bit length field.
FCGI_MAX_PACKET_LEN = (1 << 16) - 1
79
class Record(object):
    """
    One FastCGI record: an 8-byte header, the content, then padding.

    Attributes: version, type, reqId, content, contentLength,
    paddingLength, reserved, and totalLength (header + content + padding).
    """

    def __init__(self, type, reqId, content, version=1):
        """
        Build an outgoing record.

        @param type: record type (one of the FCGI_* type constants).
        @param reqId: request id, written as 16 bits in the header.
        @param content: payload string, at most FCGI_MAX_PACKET_LEN long.
        @param version: protocol version byte.
        @raise ValueError: if content is longer than FCGI_MAX_PACKET_LEN.
        """
        self.version = version
        self.type = type
        self.reqId = reqId
        self.content = content
        self.contentLength = len(content)
        if self.contentLength > FCGI_MAX_PACKET_LEN:
            raise ValueError("Record length too long: %d > %d" %
                             (self.contentLength, FCGI_MAX_PACKET_LEN))
        # Pad the content up to the next multiple of 8.  "-n & 7" is 0 when
        # the content is already aligned, so no useless 8-byte pad is sent.
        self.paddingLength = -self.contentLength & 7
        self.totalLength = 8 + self.contentLength + self.paddingLength
        self.reserved = 0

    @classmethod
    def fromHeaderString(clz, rec):
        """
        Build a record from the 8 header bytes in C{rec}.

        __init__ is bypassed; C{content} is left as None for the caller to
        fill in once totalLength bytes are available.
        """
        self = object.__new__(clz)
        self.version = ord(rec[0])
        self.type = ord(rec[1])
        self.reqId = (ord(rec[2]) << 8) | ord(rec[3])
        self.contentLength = (ord(rec[4]) << 8) | ord(rec[5])
        self.paddingLength = ord(rec[6])
        self.reserved = ord(rec[7])
        self.content = None
        self.totalLength = 8 + self.contentLength + self.paddingLength
        return self

    def toOutputString(self):
        """Return the wire form: header, content, then NUL padding."""
        header = "%c%c%c%c%c%c%c%c" % (
            self.version, self.type,
            (self.reqId >> 8) & 0xFF, self.reqId & 0xFF,
            (self.contentLength >> 8) & 0xFF, self.contentLength & 0xFF,
            self.paddingLength, self.reserved)
        return header + self.content + '\0' * self.paddingLength

    def __repr__(self):
        return "<FastCGIRecord version=%d type=%d(%s) reqId=%d content=%r>" % (
            self.version, self.type, typeNames.get(self.type), self.reqId,
            self.content)
119   
def _parseLength(s, off):
    """
    Decode one name/value length field of C{s} starting at C{off}.

    A first byte below 0x80 is the length itself; otherwise the field is
    four bytes long and the length is its low 31 bits (big-endian).
    Returns a (length, offsetAfterField) tuple.
    """
    length = ord(s[off])
    off += 1
    if length & 0x80:
        length = ((length & 0x7F) << 24 | ord(s[off]) << 16 |
                  ord(s[off + 1]) << 8 | ord(s[off + 2]))
        off += 3
    return length, off


def parseNameValues(s):
    """
    Generate (name, value) pairs from a FastCGI name-value string.

    Each pair is encoded as name length, value length, name, value.

    @param s: the complete, concatenated name-value data.
    @raise IndexError: while iterating, if C{s} ends inside a length field.
        A name or value cut short by the end of C{s} is yielded truncated.
    """
    off = 0
    while off < len(s):
        nameLen, off = _parseLength(s, off)
        # The value length is decoded from its own bytes; mixing in bits of
        # nameLen corrupts this pair and every pair after it.
        valueLen, off = _parseLength(s, off)
        valueStart = off + nameLen
        yield (s[off:valueStart], s[valueStart:valueStart + valueLen])
        off = valueStart + valueLen
135
def getLenBytes(length):
    """
    Encode C{length} as a FastCGI name-value length field.

    Lengths below 0x80 take one byte.  Larger lengths up to 0x7FFFFFFF take
    four big-endian bytes with the top bit of the first byte set.

    @raise ValueError: if C{length} does not fit in 31 bits.
    """
    if length < 0x80:
        return chr(length)
    if not (0 < length <= 0x7FFFFFFF):
        raise ValueError("Name length too long.")
    firstByte = chr(0x80 | (length >> 24) & 0x7F)
    lowBytes = [chr((length >> shift) & 0xFF) for shift in (16, 8, 0)]
    return firstByte + ''.join(lowBytes)
144
def writeNameValue(name, value):
    """
    Encode one name-value pair: name length, value length, name, value.
    """
    nameLenField = getLenBytes(len(name))
    valueLenField = getLenBytes(len(value))
    return nameLenField + valueLenField + name + value
147
class FastCGIChannelRequest(cgi.BaseCGIChannelRequest):
    """
    Channel request for a single FastCGI request id.

    C{params} accumulates the raw FCGI_PARAMS content until the protocol
    has received the whole stream.
    """

    def __init__(self, requestFactory, reqId, keepalive):
        self.params = ""
        self.keepalive = keepalive
        self.reqId = reqId
        self.requestFactory = requestFactory

    def writeHeaders(self, code, headers):
        """
        Write a CGI-style header block to C{self.transport}: a Status line,
        one line per raw header value, then a blank line.

        @param code: numeric response code; its message comes from
            responsecode.RESPONSES, or "Unknown Status" if not listed.
        @param headers: object providing getAllRawHeaders(), or None.
        """
        reason = responsecode.RESPONSES.get(code, "Unknown Status")
        lines = ["Status: %s %s\n" % (code, reason)]
        if headers is not None:
            lines.extend(["%s: %s\n" % (name, value)
                          for name, valuelist in headers.getAllRawHeaders()
                          for value in valuelist])
        lines.append('\n')
        self.transport.write(''.join(lines))
166
class FastCGIRequestTransport:
    """
    Transport given to a channel request; it forwards to the owning
    protocol, tagging every call with this request's id.
    """

    def __init__(self, protocol, reqId):
        self.reqId = reqId
        self.protocol = protocol

    def write(self, data):
        """Pass C{data} to the protocol's writeRequest for this request."""
        owner = self.protocol
        owner.writeRequest(self.reqId, data)

    def loseConnection(self):
        """Finish this request with status FCGI_REQUEST_COMPLETE."""
        owner = self.protocol
        owner.finishRequest(self.reqId, FCGI_REQUEST_COMPLETE)

    def registerProducer(self, producer, streaming):
        """Start C{producer} right away; C{streaming} is not consulted."""
        producer.resumeProducing()

    def unregisterProducer(self):
        """Nothing to undo: registerProducer keeps no reference."""
        return None
184
class FastCGIProtocol(protocol.Protocol):
    """
    Server side of one FastCGI connection.

    Incoming bytes are split into Records, and each record is dispatched to
    the method named after its type in typeNames (for example fcgi_params).
    Record types without such a method are answered with FCGI_UNKNOWN_TYPE.
    """

    # Values reported in reply to FCGI_GET_VALUES queries.
    maxConnections = 100
    maxRequests = 100

    chanRequestFactory = FastCGIChannelRequest
    transportFactory = FastCGIRequestTransport

    producerPaused = False
    # Record whose header has been parsed but whose body is not yet complete.
    pendingRecord = None
    # Received bytes that have not been consumed as complete records.
    dataBuffer = ""

    # Reported as FCGI_MPXS_CONNS in reply to FCGI_GET_VALUES.
    multiplexed = False

    def __init__(self):
        # Maps request id -> channel request for every open request.
        self._chanRequests = {}

    # Packet handling

    def packetReceived(self, packet):
        """Dispatch one complete record to its fcgi_* handler."""
        if packet.version != 1:
            # Nothing after this point can be framed reliably: log it, drop
            # whatever is buffered and close the connection.
            log.msg("FastCGI packet received with version != 1")
            self.dataBuffer = ""
            self.transport.loseConnection()
            return

        # Types missing from typeNames must not reach getattr(), which
        # rejects a non-string attribute name.
        handlerName = typeNames.get(packet.type)
        func = None
        if handlerName is not None:
            func = getattr(self, handlerName, None)
        if func is None:
            self.writePacket(Record(FCGI_UNKNOWN_TYPE, packet.reqId,
                                    chr(packet.type) + "\0\0\0\0\0\0\0"))
        else:
            func(packet)

    def fcgi_get_values(self, packet):
        """
        Answer a management query with the values we know about.

        @raise ValueError: if the record's request id is not 0.
        """
        if packet.reqId != 0:
            raise ValueError("Packet reqId should be 0!")

        content = ""
        for name, value in parseNameValues(packet.content):
            outval = None
            if name == "FCGI_MAX_CONNS":
                outval = str(self.maxConnections)
            elif name == "FCGI_MAX_REQS":
                outval = str(self.maxRequests)
            elif name == "FCGI_MPXS_CONNS":
                if self.multiplexed:
                    outval = "1"
                else:
                    outval = "0"
            if outval is not None:
                content += writeNameValue(name, outval)
        self.writePacket(Record(FCGI_GET_VALUES_RESULT, 0, content))

    def fcgi_begin_request(self, packet):
        """
        Open a channel request for a new request id.

        Only the responder role is served; any other role is refused with
        FCGI_UNKNOWN_ROLE.

        @raise ValueError: if the record's request id is 0.
        """
        role = ord(packet.content[0]) << 8 | ord(packet.content[1])
        flags = ord(packet.content[2])
        keepalive = flags & FCGI_KEEP_CONN
        if packet.reqId == 0:
            raise ValueError("ReqId shouldn't be 0!")
        if role != FCGI_RESPONDER:
            # No channel request exists for this id, so pass the keep-alive
            # flag along explicitly.
            self.finishRequest(packet.reqId, FCGI_UNKNOWN_ROLE, keepalive)
        else:
            chanRequest = self.chanRequestFactory(self.requestFactory,
                                                  packet.reqId,
                                                  keepalive)
            chanRequest.makeConnection(self.transportFactory(self,
                                                             packet.reqId))
            self._chanRequests[packet.reqId] = chanRequest

    def fcgi_abort_request(self, packet):
        """Abort and forget the request, if it is still open."""
        chanRequest = self._chanRequests.get(packet.reqId)
        if not chanRequest:
            return
        chanRequest.abortConnection()
        # pop() rather than del: the entry may already be gone if the abort
        # ended up calling finishRequest.
        self._chanRequests.pop(packet.reqId, None)

    def fcgi_params(self, packet):
        """
        Collect FCGI_PARAMS content; an empty record ends the stream, at
        which point the request is built from the parsed pairs and started.
        """
        chanRequest = self._chanRequests.get(packet.reqId)
        if not chanRequest:
            return
        if packet.content:
            chanRequest.params += packet.content
        else:
            chanRequest.makeRequest(dict(parseNameValues(chanRequest.params)))
            chanRequest.request.process()

    def fcgi_stdin(self, packet):
        """Feed request body data; an empty record marks the end of it."""
        chanRequest = self._chanRequests.get(packet.reqId)
        if not chanRequest:
            return
        if packet.content:
            chanRequest.request.handleContentChunk(packet.content)
        else:
            chanRequest.request.handleContentComplete()

    def fcgi_data(self, packet):
        # For filter roles only, which is currently unsupported.
        pass

    # Methods for FastCGIRequestTransport

    def writeRequest(self, reqId, data):
        """
        Send C{data} as FCGI_STDOUT records of at most FCGI_MAX_PACKET_LEN
        bytes each.

        Empty data writes nothing: a zero-length stream record would tell
        the peer that stdout has ended.
        """
        for start in range(0, len(data), FCGI_MAX_PACKET_LEN):
            self.writePacket(Record(FCGI_STDOUT, reqId,
                                    data[start:start + FCGI_MAX_PACKET_LEN]))

    def finishRequest(self, reqId, status, keepalive=None):
        """
        Send FCGI_END_REQUEST for C{reqId} and forget the request.

        The connection is closed unless the request asked to keep it alive.

        @param status: protocolStatus byte for the end-request body.
        @param keepalive: used only when no channel request is registered
            for C{reqId} (a refused or already-aborted request); otherwise
            the channel request's own keepalive setting wins.
        """
        self.writePacket(Record(FCGI_END_REQUEST, reqId,
                                "\0\0\0\0" + chr(status) + "\0\0\0"))
        chanRequest = self._chanRequests.pop(reqId, None)
        if chanRequest is not None:
            keepalive = chanRequest.keepalive
        if not keepalive:
            self.transport.loseConnection()

    # Raw data handling

    def writePacket(self, packet):
        """Write one Record to the underlying transport."""
        self.transport.write(packet.toOutputString())

    def dataReceived(self, data):
        """
        Buffer C{data} and dispatch every complete record now available.

        A record whose header has arrived but whose body has not is kept in
        pendingRecord until more data comes in.  Nothing is dispatched while
        the producer is paused.
        """
        self.dataBuffer += data
        record = self.pendingRecord
        while len(self.dataBuffer) >= 8 and not self.producerPaused:
            if not record:
                record = Record.fromHeaderString(self.dataBuffer[:8])
            if len(self.dataBuffer) < record.totalLength:
                break
            record.content = self.dataBuffer[8:record.contentLength + 8]
            self.dataBuffer = self.dataBuffer[record.totalLength:]
            self.packetReceived(record)
            record = None
        self.pendingRecord = record

    # Producer interface

    def pauseProducing(self):
        self.producerPaused = True
        self.transport.pauseProducing()

    def resumeProducing(self):
        self.producerPaused = False
        self.transport.resumeProducing()
        # Dispatch anything that was buffered while paused.
        self.dataReceived('')

    def stopProducing(self):
        self.producerPaused = True
        self.transport.stopProducing()
330
331
class FastCGIFactory(protocol.ServerFactory):
    """
    Server factory whose protocols each receive this factory's
    C{requestFactory}.
    """

    protocol = FastCGIProtocol

    def __init__(self, requestFactory):
        self.requestFactory = requestFactory

    def buildProtocol(self, addr):
        """Build the protocol as usual, then attach the request factory."""
        built = protocol.ServerFactory.buildProtocol(self, addr)
        built.requestFactory = self.requestFactory
        return built
343
344
class FDPortMixIn(object):
    """
    Port mix-in that listens on an already-open file descriptor.

    C{self.port} holds the descriptor number instead of a port to bind.
    """

    def createInternetSocket(self):
        """
        Wrap the descriptor in C{self.port} in a non-blocking socket object,
        marked close-on-exec when fcntl offers FD_CLOEXEC.
        """
        import socket
        import fcntl
        sock = socket.fromfd(self.port, self.addressFamily, self.socketType)
        sock.setblocking(0)
        if not (fcntl and hasattr(fcntl, 'FD_CLOEXEC')):
            return sock
        flags = fcntl.fcntl(sock.fileno(), fcntl.F_GETFD)
        fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
        return sock

    def startListening(self):
        """
        Wrap the descriptor, start the factory and begin reading.  No bind
        or listen call is made here.
        """
        self.socket = self.createInternetSocket()
        self.factory.doStart()
        self.fileno = self.socket.fileno
        self.numberAccepts = 100
        self.connected = 1
        self.startReading()
364
365   
366
class TCPFDPort(FDPortMixIn, tcp.Port):
    """tcp.Port that takes its socket from FDPortMixIn."""
369
class UNIXFDPort(FDPortMixIn, unix.Port):
    """unix.Port that takes its socket from FDPortMixIn."""

    def connectionLost(self, reason):
        # Call tcp.Port's implementation directly so that nothing is
        # unlinked here (presumably unix.Port's own version would remove
        # the socket file -- verify against unix.Port).
        tcp.Port.connectionLost(self, reason)
374
def startFastCGI(site):
    """
    Serve C{site} over FastCGI on the inherited listening socket
    (descriptor FCGI_LISTENSOCK_FILENO) and run the reactor.

    The socket is probed to choose between a UNIX and a TCP port class.
    This call only returns when reactor.run() returns.
    """
    from twisted.internet import reactor
    import socket

    # socket.fromfd() duplicates the descriptor, so the probe must be closed
    # again or the duplicate leaks for the life of the process.
    probe = socket.fromfd(FCGI_LISTENSOCK_FILENO,
                          socket.AF_INET, socket.SOCK_STREAM)
    try:
        # A string socket name is taken to mean a UNIX socket; anything
        # else is treated as TCP.
        isUnixSocket = isinstance(probe.getsockname(), str)
    finally:
        probe.close()

    if isUnixSocket:
        portFactory = UNIXFDPort
    else:
        portFactory = TCPFDPort

    reactor.listenWith(portFactory, FCGI_LISTENSOCK_FILENO,
                       FastCGIFactory(site))
    reactor.run()
389
390