root / trunk / twisted / web2 / stream.py

Revision 18460, 33.9 kB (checked in by foom, 3 years ago)

Fix wsgi.input's readline, readlines, and __iter__ methods. In detail:

  • Remove the maxLength argument of BufferedStream.readline.
  • Add a size argument to wsgi.input's readline(). This is not part of the WSGI spec, but code
    such as the Python stdlib's cgi.py requires it anyway. (#1451)
  • Add a size argument to BufferedStream.readline, to enable the above.
  • Fix wsgi.input's readlines() and readline() to not add extraneous delimiters. (#2170)
  • Make BufferedStream.readline return the delimiter as part of the result, to enable the above.
  • Add an __iter__ method to wsgi.input, which is required by the spec. (#2166)
  • Change fileupload's use of BufferedStream.readline to conform with the above changes.

Also, add some docstrings.

Author: mkerrin, jknight
Reviewer: exarkun
Merges branch: readline-1451
Fixes #1451
Fixes #2166
Fixes #2170

Line 
1 # -*- test-case-name: twisted.web2.test.test_stream -*-
2
3 """
4 The stream module provides a simple abstraction of streaming
5 data. While Twisted already has some provisions for handling this in
6 its Producer/Consumer model, the rather complex interactions between
7 producer and consumer makes it difficult to implement something like
8 the CompoundStream object. Thus, this API.
9
10 The IStream interface is very simple. It consists of two methods:
11 read, and close. The read method should either return some data, None
12 if there is no data left to read, or a Deferred. Close frees up any
13 underlying resources and causes read to return None forevermore.
14
15 IByteStream adds a bit more to the API:
16 1) read is required to return objects conforming to the buffer interface. 
17 2) .length, which may be either an integer number of bytes remaining, or
18 None if unknown
19 3) .split(position). Split takes a position, and splits the
20 stream in two pieces, returning the two new streams. Using the
21 original stream after calling split is not allowed.
22
23 There are two builtin source stream classes: FileStream and
24 MemoryStream. The first produces data from a file object, the second
25 from a buffer in memory. Any number of these can be combined into one
26 stream with the CompoundStream object. Then, to interface with other
27 parts of Twisted, there are two transceivers: StreamProducer and
28 ProducerStream. The first takes a stream and turns it into an
29 IPushProducer, which will write to a consumer. The second is a
30 consumer which is a stream, so that other producers can write to it.
31 """
32
33 from __future__ import generators
34
35 import copy, os, types, sys
36 from zope.interface import Interface, Attribute, implements
37 from twisted.internet.defer import Deferred
38 from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error
39 from twisted.python import components, log
40 from twisted.python.failure import Failure
41
42 # Python 2.4.2 (only) has a broken mmap that leaks a fd every time you call it.
43 if sys.version_info[0:3] != (2,4,2):
44     try:
45         import mmap
46     except ImportError:
47         mmap = None
48 else:
49     mmap = None
50    
51 ##############################
52 ####      Interfaces      ####
53 ##############################
54
class IStream(Interface):
    """A stream of arbitrary data."""
    
    def read():
        """Read the next piece of data from the stream.

        Returns some object representing the data, or None when there is
        no more data available. May instead return a Deferred which fires
        with one of the above.

        Errors may be indicated by raising an exception or by returning a
        Deferred which fires with a Failure.
        """
        
    def close():
        """Close the stream prematurely.

        Should also cause any further call to read to return None.
        """
71
class IByteStream(IStream):
    """A stream whose data is made of bytes."""
    
    length = Attribute("""How much data is in this stream. Can be None if unknown.""")
    
    def read():
        """Read the next piece of data from the stream.
        
        Returns an object conforming to the buffer interface, or None
        when there is no more data available. May instead return a
        Deferred which fires with one of the above.

        Errors may be indicated by raising an exception or by returning a
        Deferred which fires with a Failure.
        """
    def split(point):
        """Split this stream into two, at byte position 'point'.

        Returns a tuple of (before, after). Once split has been called,
        no other method should be called on this stream; doing so has
        undefined behavior.

        If you cannot implement split easily, you may implement it as::

            return fallbackSplit(self, point)
        """

    def close():
        """Close this stream prematurely.

        Should also cause any further call to read to return None.
        Additionally, .length should be set to 0.
        """
102
class ISendfileableStream(Interface):
    """A stream whose read method can optionally hand back a SendfileBuffer."""

    def read(sendfile=False):
        """
        Read some data.

        If sendfile == False, returns an object conforming to the buffer
        interface, or else a Deferred.

        If sendfile == True, returns either the above, or a SendfileBuffer.
        """
112        
class SimpleStream(object):
    """Base class for simple streams that are backed by a single buffer and
    described by an offset (start) and a length into that buffer."""
    implements(IByteStream)

    length = None
    start = None

    def read(self):
        """The base class has nothing to offer: always returns None."""
        return None

    def close(self):
        """Mark the stream as exhausted by zeroing its length."""
        self.length = 0

    def split(self, point):
        """Split the stream at byte position 'point'.

        This object is truncated to the first 'point' bytes; a shallow
        copy of it, advanced by 'point', covers the remainder.

        Returns the tuple (self, copy).
        Raises ValueError if the length is known and 'point' exceeds it.
        """
        lengthKnown = self.length is not None
        if lengthKnown and point > self.length:
            raise ValueError("split point (%d) > length (%d)" % (point, self.length))

        # Copy before truncating so that the tail still sees the full length.
        tail = copy.copy(self)
        self.length = point

        if tail.length is not None:
            tail.length -= point
        tail.start += point
        return (self, tail)
137
138 ##############################
139 ####      FileStream      ####
140 ##############################
141     
# maximum mmap size: FileStream.read maps at most this many bytes per call
MMAP_LIMIT = 4*1024*1024
# minimum mmap size: FileStream.read only tries mmap when more than this
# many bytes remain
MMAP_THRESHOLD = 8*1024

# maximum sendfile length: FileStream.read hands out at most this many
# bytes per SendfileBuffer
SENDFILE_LIMIT = 16777216
# minimum sendfile size: FileStream.read only uses sendfile when more than
# this many bytes remain
SENDFILE_THRESHOLD = 256
151
def mmapwrapper(*args, **kwargs):
    """
    Call mmap.mmap while tolerating an "offset" keyword argument.

    The wrapped mmap call is made without any offset, so only a missing,
    None or zero offset is accepted (and silently dropped). Any other
    offset raises mmap.error. All remaining arguments are passed through
    unchanged.
    """
    requested = kwargs.get('offset', None)
    if requested not in [None, 0]:
        raise mmap.error("mmap: Python sucks and does not support offset.")

    # A harmless offset must not reach mmap.mmap itself.
    kwargs.pop('offset', None)
    return mmap.mmap(*args, **kwargs)
165
class FileStream(SimpleStream):
    """A stream that reads data from a file. File must be a normal
    file that supports seek, (e.g. not a pipe or device or socket)."""
    # The docstring has to be the first statement of the class body;
    # placed after implements() it would just be a discarded expression.
    implements(ISendfileableStream)

    # 65K, minus some slack
    CHUNK_SIZE = 2 ** 2 ** 2 ** 2 - 32

    f = None

    def __init__(self, f, start=0, length=None, useMMap=bool(mmap)):
        """
        Create the stream from file f. If you specify start and length,
        use only that portion of the file.

        When length is None, the size reported by fstat on the file is
        used as the length. useMMap enables the mmap-based read path; it
        defaults to whether the mmap module is available.
        """
        self.f = f
        self.start = start
        if length is None:
            self.length = os.fstat(f.fileno()).st_size
        else:
            self.length = length
        self.useMMap = useMMap

    def read(self, sendfile=False):
        """
        Return the next chunk of the file, or None when the stream is
        exhausted or closed.

        Depending on the amount of data left, the chunk is a
        SendfileBuffer (only if sendfile is true), an mmap object, or a
        string obtained with a plain seek + read.

        Raises RuntimeError if the file yields no data although the
        stream still expects some.
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            self.f = None
            return None

        if sendfile and length > SENDFILE_THRESHOLD:
            # XXX: Yay using non-existent sendfile support!
            # FIXME: if we return a SendfileBuffer, and then sendfile
            #        fails, then what? Or, what if file is too short?
            # NOTE(review): SendfileBuffer is not defined in the visible
            # part of this module - confirm where it comes from.
            readSize = min(length, SENDFILE_LIMIT)
            res = SendfileBuffer(self.f, self.start, readSize)
            self.length -= readSize
            self.start += readSize
            return res

        if self.useMMap and length > MMAP_THRESHOLD:
            readSize = min(length, MMAP_LIMIT)
            try:
                res = mmapwrapper(self.f.fileno(), readSize,
                                  access=mmap.ACCESS_READ, offset=self.start)
                #madvise(res, MADV_SEQUENTIAL)
                self.length -= readSize
                self.start += readSize
                return res
            except mmap.error:
                # mmap is only an optimization; fall through to read().
                pass

        # Fall back to standard read.
        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
        else:
            self.length -= bytesRead
            self.start += bytesRead
            return b

    def close(self):
        """Drop the file reference and mark the stream as empty."""
        self.f = None
        SimpleStream.close(self)

components.registerAdapter(FileStream, file, IByteStream)
236
237 ##############################
238 ####     MemoryStream     ####
239 ##############################
240
class MemoryStream(SimpleStream):
    """A stream that reads data from a buffer object."""

    def __init__(self, mem, start=0, length=None):
        """
        Create the stream from buffer object mem. If you specify start and length,
        use only that portion of the buffer.

        When length is None, everything from start to the end of mem is
        used. Raises ValueError if the requested portion extends past the
        end of mem.
        """
        self.mem = mem
        self.start = start
        if length is None:
            self.length = len(mem) - start
        else:
            # The requested window is [start, start + length), so start
            # has to take part in the bounds check.
            if len(mem) < start + length:
                raise ValueError("len(mem) < start + length")
            self.length = length

    def read(self):
        """
        Return the whole remaining portion as a single buffer object.

        Returns None if the stream is empty, was already read, or was
        closed. The stream is exhausted after the first call.
        """
        if self.mem is None:
            return None
        if self.length == 0:
            result = None
        else:
            result = buffer(self.mem, self.start, self.length)
        self.mem = None
        self.length = 0
        return result

    def close(self):
        """Drop the buffer reference and mark the stream as empty."""
        self.mem = None
        SimpleStream.close(self)

components.registerAdapter(MemoryStream, str, IByteStream)
components.registerAdapter(MemoryStream, types.BufferType, IByteStream)
274
275 ##############################
276 ####    CompoundStream    ####
277 ##############################
278
class CompoundStream(object):
    """A stream which is composed of many other streams.

    Call addStream to add substreams.
    """

    implements(IByteStream, ISendfileableStream)
    deferred = None
    length = 0

    def __init__(self, buckets=()):
        """Create the stream, optionally from an iterable of substreams.

        Every initial substream goes through addStream so that self.length
        accounts for it.
        """
        self.buckets = []
        for bucket in buckets:
            self.addStream(bucket)

    def addStream(self, bucket):
        """Add a stream to the output"""
        bucket = IByteStream(bucket)
        self.buckets.append(bucket)
        if self.length is not None:
            if bucket.length is None:
                # One bucket of unknown length makes the total unknown.
                self.length = None
            else:
                self.length += bucket.length

    def read(self, sendfile=False):
        """Read from the first substream, moving on to the next one when
        it is exhausted.

        Returns data, None when all substreams are exhausted, or a
        Deferred. Raises RuntimeError if a previous read is still pending.
        """
        if self.deferred is not None:
            raise RuntimeError("Call to read while read is already outstanding")

        if not self.buckets:
            return None

        if sendfile and ISendfileableStream.providedBy(self.buckets[0]):
            try:
                result = self.buckets[0].read(sendfile)
            except:
                return self._gotFailure(Failure())
        else:
            try:
                result = self.buckets[0].read()
            except:
                return self._gotFailure(Failure())

        if isinstance(result, Deferred):
            self.deferred = result
            result.addCallbacks(self._gotRead, self._gotFailure, (sendfile,))
            return result

        return self._gotRead(result, sendfile)

    def _gotFailure(self, f):
        """Discard the failing substream, close the rest, pass f along."""
        self.deferred = None
        del self.buckets[0]
        self.close()
        return f

    def _gotRead(self, result, sendfile):
        """Handle the result of reading the first substream."""
        self.deferred = None
        if result is None:
            del self.buckets[0]
            # Next bucket
            return self.read(sendfile)

        if self.length is not None:
            self.length -= len(result)
        return result

    def split(self, point):
        """Split this stream into two, at byte position 'point'.

        Returns a tuple (before, after), where before is this object. If a
        substream of unknown length is met before the split point,
        fallbackSplit is used instead.

        Raises ValueError if 'point' lies beyond the end of the stream.
        """
        origPoint = point
        index = 0
        for bucket in self.buckets:
            if point == 0:
                # The split falls exactly on the boundary before this
                # bucket: it and everything after it go to the tail.
                tailBuckets = self.buckets[index:]
                del self.buckets[index:]
                return self._finishSplit(origPoint, tailBuckets)

            if bucket.length is None:
                # Indeterminate length bucket.
                # give up and use fallback splitter.
                return fallbackSplit(self, origPoint)

            if point < bucket.length:
                # The split falls inside this bucket.
                before, after = bucket.split(point)
                tailBuckets = [after] + self.buckets[index + 1:]
                self.buckets[index:] = [before]
                return self._finishSplit(origPoint, tailBuckets)

            point -= bucket.length
            index += 1

        if point == 0:
            # Splitting exactly at the end: the tail is empty.
            return self._finishSplit(origPoint, [])
        raise ValueError("split point (%d) > length (%d)" % (origPoint, origPoint - point))

    def _finishSplit(self, point, tailBuckets):
        """Build the tail stream of a split and fix up both lengths."""
        tail = CompoundStream()
        # Assigned directly: the halves of a split substream are not
        # necessarily adaptable to IByteStream.
        tail.buckets = tailBuckets
        tailLength = 0
        for bucket in tailBuckets:
            if bucket.length is None:
                tailLength = None
                break
            tailLength += bucket.length
        tail.length = tailLength
        # Every bucket left in front of the split has a known length, and
        # together they hold exactly 'point' bytes.
        self.length = point
        return self, tail

    def close(self):
        """Close every remaining substream and mark the stream as empty."""
        for bucket in self.buckets:
            bucket.close()
        self.buckets = []
        self.length = 0
378
379
380 ##############################
381 ####      readStream      ####
382 ##############################
383
class _StreamReader(object):
    """Drive a stream to its end, handing every chunk of data to a callback
    and reporting completion through a Deferred."""

    def __init__(self, stream, gotDataCallback):
        self.stream = stream
        self.gotDataCallback = gotDataCallback
        self.result = Deferred()

    def run(self):
        """Start reading; returns the Deferred that signals completion."""
        # Fetch the Deferred first: _read() may complete synchronously and
        # delete self.result before we get to return it.
        finished = self.result
        self._read()
        return finished

    def _read(self):
        """Request the next chunk and route it to _gotData or _gotError."""
        try:
            chunk = self.stream.read()
        except:
            self._gotError(Failure())
        else:
            if isinstance(chunk, Deferred):
                chunk.addCallbacks(self._gotData, self._gotError)
            else:
                self._gotData(chunk)

    def _detach(self):
        """Drop all references held by the reader; returns the result Deferred."""
        finished = self.result
        del self.result, self.gotDataCallback, self.stream
        return finished

    def _gotError(self, failure):
        """Stop reading and errback the result Deferred."""
        self._detach().errback(failure)

    def _gotData(self, data):
        """Handle one chunk of data; None marks the end of the stream."""
        if data is None:
            self._detach().callback(None)
            return
        try:
            self.gotDataCallback(data)
        except:
            self._gotError(Failure())
            return
        # Schedule the next read through the reactor instead of recursing.
        reactor.callLater(0, self._read)
426
def readStream(stream, gotDataCallback):
    """Feed every chunk of data read from a stream to a callback.

    Returns a Deferred which fires once the stream is finished. Errors
    raised while reading the stream or while running the callback are
    delivered through that Deferred.
    """
    reader = _StreamReader(stream, gotDataCallback)
    return reader.run()
435
436
def readAndDiscard(stream):
    """Consume the given stream completely, ignoring its data.

    Returns a Deferred which fires once the stream is finished.
    """
    def discard(_):
        return None
    return readStream(stream, discard)
443
def readIntoFile(stream, outFile):
    """Write everything read from a stream into a file.

    The file is closed when reading ends, whether it succeeded or failed.

    Returns a Deferred which fires once the stream is finished.
    """
    def closeFile(result):
        outFile.close()
        return result

    finished = readStream(stream, outFile.write)
    return finished.addBoth(closeFile)
453
def connectStream(inputStream, factory):
    """Hook a protocol built by the given factory up to a stream.

    Data read from inputStream is delivered to the protocol, and whatever
    the protocol writes to its transport becomes the returned output
    stream.

    The protocol's transport will have a finish() method it should
    call when done writing.
    """
    # XXX deal better with addresses
    proto = factory.buildProtocol(None)
    out = ProducerStream()
    out.disconnecting = False # XXX for LineReceiver suckage
    proto.makeConnection(out)

    def deliver(data):
        return proto.dataReceived(data)

    def inputDone(_):
        return proto.connectionLost(ti_error.ConnectionDone())

    def inputFailed(reason):
        return proto.connectionLost(reason)

    readStream(inputStream, deliver).addCallbacks(inputDone, inputFailed)
    return out
470
471 ##############################
472 ####     fallbackSplit    ####
473 ##############################
474
def fallbackSplit(stream, point):
    """Generic split built from a TruncaterStream / PostTruncaterStream pair.

    Returns the tuple (before, after) of streams wrapping 'stream'.
    """
    tail = PostTruncaterStream(stream, point)
    head = TruncaterStream(stream, point, tail)
    return (head, tail)
479
class TruncaterStream(object):
    """The first half of a fallback split: yields the first 'point' bytes
    of the wrapped stream, then hands whatever follows to a
    PostTruncaterStream."""

    def __init__(self, stream, point, postTruncater):
        self.stream = stream
        self.length = point
        self.postTruncater = postTruncater

    def read(self):
        """Read from the wrapped stream until 'length' bytes were returned.

        Once the length is used up, the next read of the wrapped stream
        is passed on to the post-truncater and None is returned.
        """
        if self.length == 0:
            if self.postTruncater is not None:
                postTruncater = self.postTruncater
                self.postTruncater = None
                postTruncater.sendInitialSegment(self.stream.read())
            self.stream = None
            return None

        result = self.stream.read()
        if isinstance(result, Deferred):
            return result.addCallback(self._gotRead)
        else:
            return self._gotRead(result)

    def _gotRead(self, data):
        """Account for a chunk of data, cutting it at the split point.

        Raises ValueError if the wrapped stream ended before the split
        point was reached.
        """
        if data is None:
            raise ValueError("Ran out of data for a split of a indeterminate length source")
        if self.length >= len(data):
            self.length -= len(data)
            return data
        else:
            # The chunk straddles the split point: keep the front part and
            # hand the rest over to the post-truncater.
            before = buffer(data, 0, self.length)
            after = buffer(data, self.length)
            self.length = 0
            if self.postTruncater is not None:
                postTruncater = self.postTruncater
                self.postTruncater = None
                postTruncater.sendInitialSegment(after)
                self.stream = None
            return before

    def split(self, point):
        """Split this stream again, at byte position 'point'.

        Returns a tuple (self, after). Raises ValueError if 'point' is
        greater than the remaining length.
        """
        if point > self.length:
            raise ValueError("split point (%d) > length (%d)" % (point, self.length))

        post = PostTruncaterStream(self.stream, point)
        trunc = TruncaterStream(post, self.length - point, self.postTruncater)
        self.length = point
        self.postTruncater = post
        return self, trunc

    def close(self):
        """Close this half of the split.

        While a post-truncater is still attached, it is only notified, as
        it may still want the rest of the wrapped stream. Otherwise the
        wrapped stream is closed too.
        """
        if self.postTruncater is not None:
            self.postTruncater.notifyClosed(self)
        else:
            # Nothing cares about the rest of the stream
            # self.stream is already None once reading reached the split
            # point; in that case there is nothing left to close here.
            if self.stream is not None:
                self.stream.close()
                self.stream = None
            self.length = 0
536            
537
class PostTruncaterStream(object):
    """The second half of a fallback split: yields the data of the wrapped
    stream that follows the split point, once the TruncaterStream has
    delivered it through sendInitialSegment."""

    # Deferred returned by the first read(); fired by sendInitialSegment.
    deferred = None
    # Set to True by the first call to read().
    sentInitialSegment = False
    # A truncater that was closed before this stream was read or closed.
    truncaterClosed = None
    # Set to True by close().
    closed = False
    
    length = None
    def __init__(self, stream, point):
        self.stream = stream
        self.deferred = Deferred()
        # The length is only known if the wrapped stream's length is known.
        if stream.length is not None:
            self.length = stream.length - point

    def read(self):
        """Return the Deferred for the initial segment on the first call,
        and read directly from the wrapped stream afterwards."""
        if not self.sentInitialSegment:
            self.sentInitialSegment = True
            if self.truncaterClosed is not None:
                # The first half was closed early; read and discard it so
                # that the initial segment gets delivered.
                readAndDiscard(self.truncaterClosed)
                self.truncaterClosed = None
            return self.deferred
        
        return self.stream.read()
    
    def split(self, point):
        """Split this stream using the generic fallback splitter."""
        return fallbackSplit(self, point)
        
    def close(self):
        """Close this half of the split, closing the first half or the
        wrapped stream as well when nothing else needs them."""
        self.closed = True
        if self.truncaterClosed is not None:
            # have first half close itself
            self.truncaterClosed.postTruncater = None
            self.truncaterClosed.close()
        elif self.sentInitialSegment:
            # first half already finished up
            self.stream.close()
            
        self.deferred = None
    
    # Callbacks from TruncaterStream
    def sendInitialSegment(self, data):
        """Receive the data (or a Deferred) that follows the split point."""
        if self.closed:
            # First half finished, we don't want data.
            self.stream.close()
            self.stream = None
        # close() clears self.deferred, so nothing is delivered once closed.
        if self.deferred is not None:
            if isinstance(data, Deferred):
                data.chainDeferred(self.deferred)
            else:
                self.deferred.callback(data)
        
    def notifyClosed(self, truncater):
        """Be told that the first half of the split was closed."""
        if self.closed:
            # we are closed, have first half really close
            truncater.postTruncater = None
            truncater.close()
        elif self.sentInitialSegment:
            # We are trying to read, read up first half
            readAndDiscard(truncater)
        else:
            # Idle, store closed info.
            self.truncaterClosed = truncater
599
600 ########################################
601 #### ProducerStream/StreamProducer  ####
602 ########################################
603             
class ProducerStream(object):
    """Turns producers into a IByteStream.
    Thus, implements IConsumer and IByteStream."""

    implements(IByteStream, ti_interfaces.IConsumer)
    length = None
    closed = False
    failed = False
    # Failure passed to finish() which no read() has reported yet.
    failure = None
    producer = None
    producerPaused = False
    deferred = None

    bufferSize = 5

    def __init__(self, length=None):
        self.buffer = []
        self.length = length

    # IByteStream implementation
    def read(self):
        """Return buffered data, None at the end of the stream, or a
        Deferred that fires with the next data written.

        A failure given to finish() while no read was pending is reported
        once, through a failed Deferred; later reads return None.
        """
        if self.buffer:
            return self.buffer.pop(0)
        elif self.closed:
            self.length = 0
            if self.failure is not None:
                # Report the stored failure exactly once.
                f = self.failure
                self.failure = None
                return defer.fail(f)
            return None
        else:
            deferred = self.deferred = Deferred()
            if self.producer is not None and (not self.streamingProducer
                                              or self.producerPaused):
                self.producerPaused = False
                self.producer.resumeProducing()

            return deferred

    def split(self, point):
        """Split this stream using the generic fallback splitter."""
        return fallbackSplit(self, point)

    def close(self):
        """Called by reader of stream when it is done reading."""
        self.buffer=[]
        self.closed = True
        # IByteStream.close asks for .length to be set to 0.
        self.length = 0
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.deferred = None

    # IConsumer implementation
    def write(self, data):
        """Deliver data to a pending read, or buffer it.

        Data written after the stream was closed is dropped. A streaming
        producer is paused once more than bufferSize chunks are buffered.
        """
        if self.closed:
            return

        if self.deferred:
            deferred = self.deferred
            self.deferred = None
            deferred.callback(data)
        else:
            self.buffer.append(data)
            if(self.producer is not None and self.streamingProducer
               and len(self.buffer) > self.bufferSize):
                self.producer.pauseProducing()
                self.producerPaused = True

    def finish(self, failure=None):
        """Called by producer when it is done.

        If the optional failure argument is passed a Failure instance,
        the stream will return it as errback on next Deferred.
        """
        self.closed = True
        if not self.buffer:
            self.length = 0
        if self.deferred is not None:
            deferred = self.deferred
            self.deferred = None
            if failure is not None:
                # Reported right away, so there is nothing to store.
                self.failed = True
                deferred.errback(failure)
            else:
                deferred.callback(None)
        else:
            if failure is not None:
               self.failed = True
               self.failure = failure

    def registerProducer(self, producer, streaming):
        """Register the producer feeding this stream.

        Raises RuntimeError if a producer is already registered. If the
        stream is already closed, the producer is stopped immediately.
        """
        if self.producer is not None:
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))

        if self.closed:
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        """Forget the registered producer."""
        self.producer = None
706        
class StreamProducer(object):
    """A push producer which gets its data by reading a stream."""
    implements(ti_interfaces.IPushProducer)

    deferred = None
    finishedCallback = None
    paused = False
    consumer = None

    def __init__(self, stream, enforceStr=True):
        self.stream = stream
        self.enforceStr = enforceStr

    def beginProducing(self, consumer):
        """Register with the consumer and start writing the stream to it.

        Returns a Deferred which fires when the stream is finished, or
        fails when producing is stopped with a failure.
        """
        if self.stream is None:
            return defer.succeed(None)

        self.consumer = consumer
        finishedCallback = self.finishedCallback = Deferred()
        self.consumer.registerProducer(self, True)
        self.resumeProducing()
        return finishedCallback

    def resumeProducing(self):
        """Read the next chunk from the stream, unless a read is pending."""
        self.paused = False
        if self.deferred is not None:
            return

        try:
            data = self.stream.read()
        except:
            self.stopProducing(Failure())
            return

        if isinstance(data, Deferred):
            # Record the pending read *before* adding the callbacks: if the
            # Deferred has already fired they run right away and reset
            # self.deferred, which must not be overwritten afterwards with
            # a stale, already-fired Deferred.
            self.deferred = data
            data.addCallbacks(self._doWrite, self.stopProducing)
        else:
            self._doWrite(data)

    def _doWrite(self, data):
        """Write one chunk to the consumer; None marks the end of the stream."""
        if self.consumer is None:
            return
        if data is None:
            # The end.
            if self.consumer is not None:
                self.consumer.unregisterProducer()
            if self.finishedCallback is not None:
                self.finishedCallback.callback(None)
            self.finishedCallback = self.deferred = self.consumer = self.stream = None
            return

        self.deferred = None
        if self.enforceStr:
            # XXX: sucks that we have to do this. make transport.write(buffer) work!
            data = str(buffer(data))
        self.consumer.write(data)

        if not self.paused:
            self.resumeProducing()

    def pauseProducing(self):
        """Stop reading from the stream until resumeProducing is called."""
        self.paused = True

    def stopProducing(self, failure=ti_error.ConnectionLost()):
        """Stop producing for good: unregister, report the outcome through
        the Deferred returned by beginProducing, and close the stream.

        A failure of None reports success instead of an error.
        """
        if self.consumer is not None:
            self.consumer.unregisterProducer()
        if self.finishedCallback is not None:
            if failure is not None:
                self.finishedCallback.errback(failure)
            else:
                self.finishedCallback.callback(None)
            self.finishedCallback = None
        self.paused = True
        if self.stream is not None:
            self.stream.close()

        self.finishedCallback = self.deferred = self.consumer = self.stream = None
784
785 ##############################
786 ####    ProcessStreamer   ####
787 ##############################
788
class _ProcessStreamerProtocol(protocol.ProcessProtocol):
    """Process protocol which feeds a stream to the process' stdin and
    passes its stdout and stderr on to two output streams."""

    def __init__(self, inputStream, outStream, errStream):
        self.inputStream = inputStream
        self.outStream = outStream
        self.errStream = errStream
        self.resultDeferred = defer.Deferred()

    def connectionMade(self):
        """Start writing the input stream to the process."""
        producer = StreamProducer(self.inputStream)

        # if the process stopped reading from the input stream,
        # this is not an error condition, so it oughtn't result
        # in a ConnectionLost() from the input stream:
        def stopQuietly(err=None):
            return StreamProducer.stopProducing(producer, err)
        producer.stopProducing = stopQuietly

        def inputFinished(_):
            return self.transport.closeStdin()

        finished = producer.beginProducing(self.transport)
        finished.addCallbacks(inputFinished, self._inputError)

    def _inputError(self, f):
        """Log a failure of the input stream, then close stdin."""
        log.msg("Error in input stream for %r" % self.transport)
        log.err(f)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.outStream.write(data)

    def errReceived(self, data):
        self.errStream.write(data)

    def outConnectionLost(self):
        self.outStream.finish()

    def errConnectionLost(self):
        self.errStream.finish()

    def processEnded(self, reason):
        """Report the exit reason through resultDeferred, as an errback."""
        self.resultDeferred.errback(reason)
        del self.resultDeferred
828
829
class ProcessStreamer(object):
    """Runs a process hooked up to streams.

    The process reads its stdin from the given input stream. Its stdout
    and stderr are available as the public stream attributes 'outStream'
    and 'errStream'.
    """

    def __init__(self, inputStream, program, args, env={}):
        self.outStream = ProducerStream()
        self.errStream = ProducerStream()
        stdin = IByteStream(inputStream)
        self._protocol = _ProcessStreamerProtocol(stdin, self.outStream, self.errStream)
        self._program = program
        self._args = args
        self._env = env

    def run(self):
        """Run the process.

        Returns the protocol's result Deferred with an errback added that
        traps ProcessDone, so that only other exit reasons (such as
        ProcessTerminated for a non-clean exit) reach the caller as a
        failure.
        """
        # XXX what happens if spawn fails?
        reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env)
        del self._env

        def trapCleanExit(reason):
            return reason.trap(ti_error.ProcessDone)

        return self._protocol.resultDeferred.addErrback(trapCleanExit)
858
859 ##############################
860 ####   generatorToStream  ####
861 ##############################
862
863 class _StreamIterator(object):
864     done=False
865
866     def __iter__(self):
867         return self
868     def next(self):
869         if self.done:
870             raise StopIteration
871         return self.value
872     wait=object()
873
class _IteratorStream(object):
    """Stream whose data is produced by a generator, which in turn gets
    its input from another stream through a _StreamIterator."""
    length = None

    def __init__(self, fun, stream, args, kwargs):
        self._stream=stream
        self._streamIterator = _StreamIterator()
        self._gen = fun(self._streamIterator, *args, **kwargs)

    def read(self):
        """Return the next value yielded by the generator, or None once
        the generator is exhausted.

        When the generator yields the magic 'wait' value, the input stream
        is read first, so this may also return a Deferred.
        """
        try:
            val = self._gen.next()
        except StopIteration:
            return None
        else:
            if val is _StreamIterator.wait:
                newdata = self._stream.read()
                if isinstance(newdata, defer.Deferred):
                    return newdata.addCallback(self._gotRead)
                else:
                    return self._gotRead(newdata)
            return val

    def _gotRead(self, data):
        """Make new input available to the generator, then resume reading."""
        if data is None:
            self._streamIterator.done=True
        else:
            self._streamIterator.value=data
        return self.read()

    def close(self):
        """Close the input stream and drop all references."""
        self._stream.close()
        del self._gen, self._stream, self._streamIterator

    def split(self, point):
        """Split this stream into two, at byte position 'point', using the
        generic fallback splitter.

        The split point is required by fallbackSplit; without it this
        method could only ever raise TypeError.
        """
        return fallbackSplit(self, point)
909    
def generatorToStream(fun):
    """Wrap a generator function so that calling it builds a stream.

    The returned callable takes an input stream followed by any extra
    positional and keyword arguments. It returns a stream object whose
    'read' results are the items yielded by fun. fun itself is called
    with an iterator as its first argument (fed from the input stream),
    followed by the extra arguments.

    Rule for writing fun: before every call to input.next() (which
    includes each step of a 'for' loop over input), it *MUST* first do
    a "yield input.wait". Yielding this magic value makes the wrapper
    fetch the next chunk of input, waiting on a deferred if necessary,
    so that the generator never sees a deferred.

    >>> from twisted.web2 import stream
    >>> from string import maketrans
    >>> alphabet = 'abcdefghijklmnopqrstuvwxyz'
    >>>
    >>> def encrypt(input, key):
    ...     code = alphabet[key:] + alphabet[:key]
    ...     translator = maketrans(alphabet+alphabet.upper(), code+code.upper())
    ...     yield input.wait
    ...     for s in input:
    ...         yield str(s).translate(translator)
    ...         yield input.wait
    ...
    >>> encrypt = stream.generatorToStream(encrypt)
    >>>
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
    >>> encryptedStream = encrypt(plaintextStream, 13)
    >>> encryptedStream.read()
    'FnzcyrFnzcyrFnzcyr'
    >>>
    >>> plaintextStream = stream.MemoryStream('SampleSampleSample')
    >>> encryptedStream = encrypt(plaintextStream, 13)
    >>> evenMoreEncryptedStream = encrypt(encryptedStream, 13)
    >>> evenMoreEncryptedStream.read()
    'SampleSampleSample'

    """
    def generatorToStream_inner(stream, *args, **kwargs):
        # Extra arguments are stored as-is and applied when the generator
        # is created.
        wrapped = _IteratorStream(fun, stream, args, kwargs)
        return wrapped
    return generatorToStream_inner
952
953
954 ##############################
955 ####    BufferedStream    ####
956 ##############################
957
class BufferedStream(object):
    """A stream which buffers its data to provide operations like
    readline and readExactly."""

    # Data already read from the wrapped stream (or pushed back) that has
    # not yet been handed out to a caller.
    data = ""

    def __init__(self, stream):
        """
        @param stream: the underlying stream to read from.
        """
        self.stream = stream

    def _readUntil(self, f):
        """Internal helper function which repeatedly calls f each time
        after more data has been received, until it returns non-None.

        If the underlying stream ends first, whatever is left in the
        buffer is produced as the result and the buffer is emptied."""
        while True:
            r = f()
            if r is not None:
                yield r; return

            newdata = self.stream.read()
            if isinstance(newdata, defer.Deferred):
                newdata = defer.waitForDeferred(newdata)
                yield newdata; newdata = newdata.getResult()

            if newdata is None:
                # End Of File
                newdata = self.data
                self.data = ''
                yield newdata; return
            self.data += str(newdata)
    _readUntil = defer.deferredGenerator(_readUntil)

    def readExactly(self, size=None):
        """Read exactly size bytes of data, or, if size is None, read
        the entire stream into a string.

        If the stream ends before size bytes are available, the data
        that is left is returned instead.

        @raise ValueError: if size is negative.
        """
        if size is not None and size < 0:
            raise ValueError("readExactly: size cannot be negative: %s" % (size, ))

        def gotdata():
            data = self.data
            if size is not None and len(data) >= size:
                pre,post = data[:size], data[size:]
                self.data = post
                return pre
        return self._readUntil(gotdata)


    def readline(self, delimiter='\r\n', size=None):
        """
        Read a line of data from the stream, bounded by
        delimiter. The delimiter is included in the return value.

        If size is specified, read and return at most that many bytes,
        even if the delimiter has not yet been reached. If the size
        limit falls within a delimiter, the rest of the delimiter, and
        the next line will be returned together.

        @raise ValueError: if size is negative.
        """
        if size is not None and size < 0:
            raise ValueError("readline: size cannot be negative: %s" % (size, ))

        def gotdata():
            data = self.data
            if size is not None:
                # Only accept a delimiter that ends within the size limit.
                splitpoint = data.find(delimiter, 0, size)
                if splitpoint == -1:
                    if len(data) >= size:
                        # No delimiter in range: cut at the size limit.
                        splitpoint = size
                else:
                    splitpoint += len(delimiter)
            else:
                splitpoint = data.find(delimiter)
                if splitpoint != -1:
                    splitpoint += len(delimiter)

            if splitpoint != -1:
                pre = data[:splitpoint]
                self.data = data[splitpoint:]
                return pre
        return self._readUntil(gotdata)

    def pushback(self, pushed):
        """Push data back into the buffer."""

        self.data = pushed + self.data

    def read(self):
        """Return the buffered data if there is any, emptying the buffer;
        otherwise return the result of reading the underlying stream."""
        data = self.data
        if data:
            self.data = ""
            return data
        return self.stream.read()

    def _len(self):
        """Return the underlying stream's length plus the buffered
        byte count, or None if the underlying length is None."""
        l = self.stream.length
        if l is None:
            return None
        return l + len(self.data)

    length = property(_len)

    def split(self, offset):
        """Split into two BufferedStreams at offset.

        offset counts from the start of the buffered data, so the buffer
        is divided between the two halves when the split point falls
        inside it.
        """
        # Position of the split point relative to the underlying stream;
        # negative when the split point lies inside the buffered data.
        off = offset - len(self.data)

        pre, post = self.stream.split(max(0, off))
        pre = BufferedStream(pre)
        post = BufferedStream(post)
        if off < 0:
            # The first `offset` buffered bytes belong to the first half,
            # the rest of the buffer precedes the second half's stream.
            pre.data = self.data[:offset]
            post.data = self.data[offset:]
        else:
            pre.data = self.data

        return pre, post
1068
1069        
def substream(stream, start, end):
    """Return the portion of stream between positions start and end.

    The stream is split at start, and the part after that point is split
    again end - start further along; the first piece of the second split
    is returned.

    Raises ValueError when start is greater than end.
    """
    if not start <= end:
        message = ("start position must be less than end position %r"
                   % ((start, end),))
        raise ValueError(message)
    # Everything from start onwards.
    remainder = stream.split(start)[1]
    # Keep only the first (end - start) units of that remainder.
    return remainder.split(end - start)[0]
1076
1077
1078
# Names exported by "from twisted.web2.stream import *".
# NOTE(review): substream is defined above but is not listed here --
# confirm whether that omission is intentional.
__all__ = ['IStream', 'IByteStream', 'FileStream', 'MemoryStream', 'CompoundStream',
           'readAndDiscard', 'fallbackSplit', 'ProducerStream', 'StreamProducer',
           'BufferedStream', 'readStream', 'ProcessStreamer', 'readIntoFile',
           'generatorToStream']
1083
Note: See TracBrowser for help on using the browser.