| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
""" |
|---|
| 4 |
The stream module provides a simple abstraction of streaming |
|---|
| 5 |
data. While Twisted already has some provisions for handling this in |
|---|
| 6 |
its Producer/Consumer model, the rather complex interactions between |
|---|
| 7 |
producer and consumer makes it difficult to implement something like |
|---|
| 8 |
the CompoundStream object. Thus, this API. |
|---|
| 9 |
|
|---|
| 10 |
The IStream interface is very simple. It consists of two methods: |
|---|
| 11 |
read, and close. The read method should either return some data, None |
|---|
| 12 |
if there is no data left to read, or a Deferred. Close frees up any |
|---|
| 13 |
underlying resources and causes read to return None forevermore. |
|---|
| 14 |
|
|---|
| 15 |
IByteStream adds a bit more to the API: |
|---|
| 16 |
1) read is required to return objects conforming to the buffer interface. |
|---|
| 17 |
2) .length, which may either an integer number of bytes remaining, or |
|---|
| 18 |
None if unknown |
|---|
| 19 |
3) .split(position). Split takes a position, and splits the |
|---|
| 20 |
stream in two pieces, returning the two new streams. Using the |
|---|
| 21 |
original stream after calling split is not allowed. |
|---|
| 22 |
|
|---|
| 23 |
There are two builtin source stream classes: FileStream and |
|---|
| 24 |
MemoryStream. The first produces data from a file object, the second |
|---|
| 25 |
from a buffer in memory. Any number of these can be combined into one |
|---|
| 26 |
stream with the CompoundStream object. Then, to interface with other |
|---|
| 27 |
parts of Twisted, there are two transcievers: StreamProducer and |
|---|
| 28 |
ProducerStream. The first takes a stream and turns it into an |
|---|
| 29 |
IPushProducer, which will write to a consumer. The second is a |
|---|
| 30 |
consumer which is a stream, so that other producers can write to it. |
|---|
| 31 |
""" |
|---|
| 32 |
|
|---|
| 33 |
from __future__ import generators |
|---|
| 34 |
|
|---|
| 35 |
import copy, os, types, sys |
|---|
| 36 |
from zope.interface import Interface, Attribute, implements |
|---|
| 37 |
from twisted.internet.defer import Deferred |
|---|
| 38 |
from twisted.internet import interfaces as ti_interfaces, defer, reactor, protocol, error as ti_error |
|---|
| 39 |
from twisted.python import components, log |
|---|
| 40 |
from twisted.python.failure import Failure |
|---|
| 41 |
|
|---|
| 42 |
|
|---|
| 43 |
if sys.version_info[0:3] != (2,4,2): |
|---|
| 44 |
try: |
|---|
| 45 |
import mmap |
|---|
| 46 |
except ImportError: |
|---|
| 47 |
mmap = None |
|---|
| 48 |
else: |
|---|
| 49 |
mmap = None |
|---|
| 50 |
|
|---|
| 51 |
|
|---|
| 52 |
|
|---|
| 53 |
|
|---|
| 54 |
|
|---|
| 55 |
class IStream(Interface): |
|---|
| 56 |
"""A stream of arbitrary data.""" |
|---|
| 57 |
|
|---|
| 58 |
def read(): |
|---|
| 59 |
"""Read some data. |
|---|
| 60 |
|
|---|
| 61 |
Returns some object representing the data. |
|---|
| 62 |
If there is no more data available, returns None. |
|---|
| 63 |
Can also return a Deferred resulting in one of the above. |
|---|
| 64 |
|
|---|
| 65 |
Errors may be indicated by exception or by a Deferred of a Failure. |
|---|
| 66 |
""" |
|---|
| 67 |
|
|---|
| 68 |
def close(): |
|---|
| 69 |
"""Prematurely close. Should also cause further reads to |
|---|
| 70 |
return None.""" |
|---|
| 71 |
|
|---|
| 72 |
class IByteStream(IStream): |
|---|
| 73 |
"""A stream which is of bytes.""" |
|---|
| 74 |
|
|---|
| 75 |
length = Attribute("""How much data is in this stream. Can be None if unknown.""") |
|---|
| 76 |
|
|---|
| 77 |
def read(): |
|---|
| 78 |
"""Read some data. |
|---|
| 79 |
|
|---|
| 80 |
Returns an object conforming to the buffer interface, or |
|---|
| 81 |
if there is no more data available, returns None. |
|---|
| 82 |
Can also return a Deferred resulting in one of the above. |
|---|
| 83 |
|
|---|
| 84 |
Errors may be indicated by exception or by a Deferred of a Failure. |
|---|
| 85 |
""" |
|---|
| 86 |
def split(point): |
|---|
| 87 |
"""Split this stream into two, at byte position 'point'. |
|---|
| 88 |
|
|---|
| 89 |
Returns a tuple of (before, after). After calling split, no other |
|---|
| 90 |
methods should be called on this stream. Doing so will have undefined |
|---|
| 91 |
behavior. |
|---|
| 92 |
|
|---|
| 93 |
If you cannot implement split easily, you may implement it as:: |
|---|
| 94 |
|
|---|
| 95 |
return fallbackSplit(self, point) |
|---|
| 96 |
""" |
|---|
| 97 |
|
|---|
| 98 |
def close(): |
|---|
| 99 |
"""Prematurely close this stream. Should also cause further reads to |
|---|
| 100 |
return None. Additionally, .length should be set to 0. |
|---|
| 101 |
""" |
|---|
| 102 |
|
|---|
| 103 |
class ISendfileableStream(Interface): |
|---|
| 104 |
def read(sendfile=False): |
|---|
| 105 |
""" |
|---|
| 106 |
Read some data. |
|---|
| 107 |
If sendfile == False, returns an object conforming to the buffer |
|---|
| 108 |
interface, or else a Deferred. |
|---|
| 109 |
|
|---|
| 110 |
If sendfile == True, returns either the above, or a SendfileBuffer. |
|---|
| 111 |
""" |
|---|
| 112 |
|
|---|
| 113 |
class SimpleStream(object): |
|---|
| 114 |
"""Superclass of simple streams with a single buffer and a offset and length |
|---|
| 115 |
into that buffer.""" |
|---|
| 116 |
implements(IByteStream) |
|---|
| 117 |
|
|---|
| 118 |
length = None |
|---|
| 119 |
start = None |
|---|
| 120 |
|
|---|
| 121 |
def read(self): |
|---|
| 122 |
return None |
|---|
| 123 |
|
|---|
| 124 |
def close(self): |
|---|
| 125 |
self.length = 0 |
|---|
| 126 |
|
|---|
| 127 |
def split(self, point): |
|---|
| 128 |
if self.length is not None: |
|---|
| 129 |
if point > self.length: |
|---|
| 130 |
raise ValueError("split point (%d) > length (%d)" % (point, self.length)) |
|---|
| 131 |
b = copy.copy(self) |
|---|
| 132 |
self.length = point |
|---|
| 133 |
if b.length is not None: |
|---|
| 134 |
b.length -= point |
|---|
| 135 |
b.start += point |
|---|
| 136 |
return (self, b) |
|---|
| 137 |
|
|---|
| 138 |
|
|---|
| 139 |
|
|---|
| 140 |
|
|---|
| 141 |
|
|---|
| 142 |
|
|---|
| 143 |
MMAP_LIMIT = 4*1024*1024 |
|---|
| 144 |
|
|---|
| 145 |
MMAP_THRESHOLD = 8*1024 |
|---|
| 146 |
|
|---|
| 147 |
|
|---|
| 148 |
SENDFILE_LIMIT = 16777216 |
|---|
| 149 |
|
|---|
| 150 |
SENDFILE_THRESHOLD = 256 |
|---|
| 151 |
|
|---|
| 152 |
def mmapwrapper(*args, **kwargs): |
|---|
| 153 |
""" |
|---|
| 154 |
Python's mmap call sucks and ommitted the "offset" argument for no |
|---|
| 155 |
discernable reason. Replace this with a mmap module that has offset. |
|---|
| 156 |
""" |
|---|
| 157 |
|
|---|
| 158 |
offset = kwargs.get('offset', None) |
|---|
| 159 |
if offset in [None, 0]: |
|---|
| 160 |
if 'offset' in kwargs: |
|---|
| 161 |
del kwargs['offset'] |
|---|
| 162 |
else: |
|---|
| 163 |
raise mmap.error("mmap: Python sucks and does not support offset.") |
|---|
| 164 |
return mmap.mmap(*args, **kwargs) |
|---|
| 165 |
|
|---|
| 166 |
class FileStream(SimpleStream): |
|---|
| 167 |
implements(ISendfileableStream) |
|---|
| 168 |
"""A stream that reads data from a file. File must be a normal |
|---|
| 169 |
file that supports seek, (e.g. not a pipe or device or socket).""" |
|---|
| 170 |
|
|---|
| 171 |
CHUNK_SIZE = 2 ** 2 ** 2 ** 2 - 32 |
|---|
| 172 |
|
|---|
| 173 |
f = None |
|---|
| 174 |
def __init__(self, f, start=0, length=None, useMMap=bool(mmap)): |
|---|
| 175 |
""" |
|---|
| 176 |
Create the stream from file f. If you specify start and length, |
|---|
| 177 |
use only that portion of the file. |
|---|
| 178 |
""" |
|---|
| 179 |
self.f = f |
|---|
| 180 |
self.start = start |
|---|
| 181 |
if length is None: |
|---|
| 182 |
self.length = os.fstat(f.fileno()).st_size |
|---|
| 183 |
else: |
|---|
| 184 |
self.length = length |
|---|
| 185 |
self.useMMap = useMMap |
|---|
| 186 |
|
|---|
| 187 |
def read(self, sendfile=False): |
|---|
| 188 |
if self.f is None: |
|---|
| 189 |
return None |
|---|
| 190 |
|
|---|
| 191 |
length = self.length |
|---|
| 192 |
if length == 0: |
|---|
| 193 |
self.f = None |
|---|
| 194 |
return None |
|---|
| 195 |
|
|---|
| 196 |
if sendfile and length > SENDFILE_THRESHOLD: |
|---|
| 197 |
|
|---|
| 198 |
|
|---|
| 199 |
|
|---|
| 200 |
readSize = min(length, SENDFILE_LIMIT) |
|---|
| 201 |
res = SendfileBuffer(self.f, self.start, readSize) |
|---|
| 202 |
self.length -= readSize |
|---|
| 203 |
self.start += readSize |
|---|
| 204 |
return res |
|---|
| 205 |
|
|---|
| 206 |
if self.useMMap and length > MMAP_THRESHOLD: |
|---|
| 207 |
readSize = min(length, MMAP_LIMIT) |
|---|
| 208 |
try: |
|---|
| 209 |
res = mmapwrapper(self.f.fileno(), readSize, |
|---|
| 210 |
access=mmap.ACCESS_READ, offset=self.start) |
|---|
| 211 |
|
|---|
| 212 |
self.length -= readSize |
|---|
| 213 |
self.start += readSize |
|---|
| 214 |
return res |
|---|
| 215 |
except mmap.error: |
|---|
| 216 |
pass |
|---|
| 217 |
|
|---|
| 218 |
|
|---|
| 219 |
readSize = min(length, self.CHUNK_SIZE) |
|---|
| 220 |
|
|---|
| 221 |
self.f.seek(self.start) |
|---|
| 222 |
b = self.f.read(readSize) |
|---|
| 223 |
bytesRead = len(b) |
|---|
| 224 |
if not bytesRead: |
|---|
| 225 |
raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length)) |
|---|
| 226 |
else: |
|---|
| 227 |
self.length -= bytesRead |
|---|
| 228 |
self.start += bytesRead |
|---|
| 229 |
return b |
|---|
| 230 |
|
|---|
| 231 |
def close(self): |
|---|
| 232 |
self.f = None |
|---|
| 233 |
SimpleStream.close(self) |
|---|
| 234 |
|
|---|
| 235 |
components.registerAdapter(FileStream, file, IByteStream) |
|---|
| 236 |
|
|---|
| 237 |
|
|---|
| 238 |
|
|---|
| 239 |
|
|---|
| 240 |
|
|---|
| 241 |
class MemoryStream(SimpleStream): |
|---|
| 242 |
"""A stream that reads data from a buffer object.""" |
|---|
| 243 |
def __init__(self, mem, start=0, length=None): |
|---|
| 244 |
""" |
|---|
| 245 |
Create the stream from buffer object mem. If you specify start and length, |
|---|
| 246 |
use only that portion of the buffer. |
|---|
| 247 |
""" |
|---|
| 248 |
self.mem = mem |
|---|
| 249 |
self.start = start |
|---|
| 250 |
if length is None: |
|---|
| 251 |
self.length = len(mem) - start |
|---|
| 252 |
else: |
|---|
| 253 |
if len(mem) < length: |
|---|
| 254 |
raise ValueError("len(mem) < start + length") |
|---|
| 255 |
self.length = length |
|---|
| 256 |
|
|---|
| 257 |
def read(self): |
|---|
| 258 |
if self.mem is None: |
|---|
| 259 |
return None |
|---|
| 260 |
if self.length == 0: |
|---|
| 261 |
result = None |
|---|
| 262 |
else: |
|---|
| 263 |
result = buffer(self.mem, self.start, self.length) |
|---|
| 264 |
self.mem = None |
|---|
| 265 |
self.length = 0 |
|---|
| 266 |
return result |
|---|
| 267 |
|
|---|
| 268 |
def close(self): |
|---|
| 269 |
self.mem = None |
|---|
| 270 |
SimpleStream.close(self) |
|---|
| 271 |
|
|---|
| 272 |
components.registerAdapter(MemoryStream, str, IByteStream) |
|---|
| 273 |
components.registerAdapter(MemoryStream, types.BufferType, IByteStream) |
|---|
| 274 |
|
|---|
| 275 |
|
|---|
| 276 |
|
|---|
| 277 |
|
|---|
| 278 |
|
|---|
| 279 |
class CompoundStream(object): |
|---|
| 280 |
"""A stream which is composed of many other streams. |
|---|
| 281 |
|
|---|
| 282 |
Call addStream to add substreams. |
|---|
| 283 |
""" |
|---|
| 284 |
|
|---|
| 285 |
implements(IByteStream, ISendfileableStream) |
|---|
| 286 |
deferred = None |
|---|
| 287 |
length = 0 |
|---|
| 288 |
|
|---|
| 289 |
def __init__(self, buckets=()): |
|---|
| 290 |
self.buckets = [IByteStream(s) for s in buckets] |
|---|
| 291 |
|
|---|
| 292 |
def addStream(self, bucket): |
|---|
| 293 |
"""Add a stream to the output""" |
|---|
| 294 |
bucket = IByteStream(bucket) |
|---|
| 295 |
self.buckets.append(bucket) |
|---|
| 296 |
if self.length is not None: |
|---|
| 297 |
if bucket.length is None: |
|---|
| 298 |
self.length = None |
|---|
| 299 |
else: |
|---|
| 300 |
self.length += bucket.length |
|---|
| 301 |
|
|---|
| 302 |
def read(self, sendfile=False): |
|---|
| 303 |
if self.deferred is not None: |
|---|
| 304 |
raise RuntimeError("Call to read while read is already outstanding") |
|---|
| 305 |
|
|---|
| 306 |
if not self.buckets: |
|---|
| 307 |
return None |
|---|
| 308 |
|
|---|
| 309 |
if sendfile and ISendfileableStream.providedBy(self.buckets[0]): |
|---|
| 310 |
try: |
|---|
| 311 |
result = self.buckets[0].read(sendfile) |
|---|
| 312 |
except: |
|---|
| 313 |
return self._gotFailure(Failure()) |
|---|
| 314 |
else: |
|---|
| 315 |
try: |
|---|
| 316 |
result = self.buckets[0].read() |
|---|
| 317 |
except: |
|---|
| 318 |
return self._gotFailure(Failure()) |
|---|
| 319 |
|
|---|
| 320 |
if isinstance(result, Deferred): |
|---|
| 321 |
self.deferred = result |
|---|
| 322 |
result.addCallbacks(self._gotRead, self._gotFailure, (sendfile,)) |
|---|
| 323 |
return result |
|---|
| 324 |
|
|---|
| 325 |
return self._gotRead(result, sendfile) |
|---|
| 326 |
|
|---|
| 327 |
def _gotFailure(self, f): |
|---|
| 328 |
self.deferred = None |
|---|
| 329 |
del self.buckets[0] |
|---|
| 330 |
self.close() |
|---|
| 331 |
return f |
|---|
| 332 |
|
|---|
| 333 |
def _gotRead(self, result, sendfile): |
|---|
| 334 |
self.deferred = None |
|---|
| 335 |
if result is None: |
|---|
| 336 |
del self.buckets[0] |
|---|
| 337 |
|
|---|
| 338 |
return self.read(sendfile) |
|---|
| 339 |
|
|---|
| 340 |
if self.length is not None: |
|---|
| 341 |
self.length -= len(result) |
|---|
| 342 |
return result |
|---|
| 343 |
|
|---|
| 344 |
def split(self, point): |
|---|
| 345 |
num = 0 |
|---|
| 346 |
origPoint = point |
|---|
| 347 |
for bucket in self.buckets: |
|---|
| 348 |
num+=1 |
|---|
| 349 |
|
|---|
| 350 |
if point == 0: |
|---|
| 351 |
b = CompoundStream() |
|---|
| 352 |
b.buckets = self.buckets[num:] |
|---|
| 353 |
del self.buckets[num:] |
|---|
| 354 |
return self,b |
|---|
| 355 |
|
|---|
| 356 |
if bucket.length is None: |
|---|
| 357 |
|
|---|
| 358 |
|
|---|
| 359 |
return fallbackSplit(self, origPoint) |
|---|
| 360 |
|
|---|
| 361 |
if point < bucket.length: |
|---|
| 362 |
before,after = bucket.split(point) |
|---|
| 363 |
b = CompoundStream() |
|---|
| 364 |
b.buckets = self.buckets[num:] |
|---|
| 365 |
b.buckets[0] = after |
|---|
| 366 |
|
|---|
| 367 |
del self.buckets[num+1:] |
|---|
| 368 |
self.buckets[num] = before |
|---|
| 369 |
return self,b |
|---|
| 370 |
|
|---|
| 371 |
point -= bucket.length |
|---|
| 372 |
|
|---|
| 373 |
def close(self): |
|---|
| 374 |
for bucket in self.buckets: |
|---|
| 375 |
bucket.close() |
|---|
| 376 |
self.buckets = [] |
|---|
| 377 |
self.length = 0 |
|---|
| 378 |
|
|---|
| 379 |
|
|---|
| 380 |
|
|---|
| 381 |
|
|---|
| 382 |
|
|---|
| 383 |
|
|---|
| 384 |
class _StreamReader(object): |
|---|
| 385 |
"""Process a stream's data using callbacks for data and stream finish.""" |
|---|
| 386 |
|
|---|
| 387 |
def __init__(self, stream, gotDataCallback): |
|---|
| 388 |
self.stream = stream |
|---|
| 389 |
self.gotDataCallback = gotDataCallback |
|---|
| 390 |
self.result = Deferred() |
|---|
| 391 |
|
|---|
| 392 |
def run(self): |
|---|
| 393 |
|
|---|
| 394 |
result = self.result |
|---|
| 395 |
self._read() |
|---|
| 396 |
return result |
|---|
| 397 |
|
|---|
| 398 |
def _read(self): |
|---|
| 399 |
try: |
|---|
| 400 |
result = self.stream.read() |
|---|
| 401 |
except: |
|---|
| 402 |
self._gotError(Failure()) |
|---|
| 403 |
return |
|---|
| 404 |
if isinstance(result, Deferred): |
|---|
| 405 |
result.addCallbacks(self._gotData, self._gotError) |
|---|
| 406 |
else: |
|---|
| 407 |
self._gotData(result) |
|---|
| 408 |
|
|---|
| 409 |
def _gotError(self, failure): |
|---|
| 410 |
result = self.result |
|---|
| 411 |
del self.result, self.gotDataCallback, self.stream |
|---|
| 412 |
result.errback(failure) |
|---|
| 413 |
|
|---|
| 414 |
def _gotData(self, data): |
|---|
| 415 |
if data is None: |
|---|
| 416 |
result = self.result |
|---|
| 417 |
del self.result, self.gotDataCallback, self.stream |
|---|
| 418 |
result.callback(None) |
|---|
| 419 |
return |
|---|
| 420 |
try: |
|---|
| 421 |
self.gotDataCallback(data) |
|---|
| 422 |
except: |
|---|
| 423 |
self._gotError(Failure()) |
|---|
| 424 |
return |
|---|
| 425 |
reactor.callLater(0, self._read) |
|---|
| 426 |
|
|---|
| 427 |
def readStream(stream, gotDataCallback): |
|---|
| 428 |
"""Pass a stream's data to a callback. |
|---|
| 429 |
|
|---|
| 430 |
Returns Deferred which will be triggered on finish. Errors in |
|---|
| 431 |
reading the stream or in processing it will be returned via this |
|---|
| 432 |
Deferred. |
|---|
| 433 |
""" |
|---|
| 434 |
return _StreamReader(stream, gotDataCallback).run() |
|---|
| 435 |
|
|---|
| 436 |
|
|---|
| 437 |
def readAndDiscard(stream): |
|---|
| 438 |
"""Read all the data from the given stream, and throw it out. |
|---|
| 439 |
|
|---|
| 440 |
Returns Deferred which will be triggered on finish. |
|---|
| 441 |
""" |
|---|
| 442 |
return readStream(stream, lambda _: None) |
|---|
| 443 |
|
|---|
| 444 |
def readIntoFile(stream, outFile): |
|---|
| 445 |
"""Read a stream and write it into a file. |
|---|
| 446 |
|
|---|
| 447 |
Returns Deferred which will be triggered on finish. |
|---|
| 448 |
""" |
|---|
| 449 |
def done(_): |
|---|
| 450 |
outFile.close() |
|---|
| 451 |
return _ |
|---|
| 452 |
return readStream(stream, outFile.write).addBoth(done) |
|---|
| 453 |
|
|---|
| 454 |
def connectStream(inputStream, factory): |
|---|
| 455 |
"""Connect a protocol constructed from a factory to stream. |
|---|
| 456 |
|
|---|
| 457 |
Returns an output stream from the protocol. |
|---|
| 458 |
|
|---|
| 459 |
The protocol's transport will have a finish() method it should |
|---|
| 460 |
call when done writing. |
|---|
| 461 |
""" |
|---|
| 462 |
|
|---|
| 463 |
p = factory.buildProtocol(None) |
|---|
| 464 |
out = ProducerStream() |
|---|
| 465 |
out.disconnecting = False |
|---|
| 466 |
p.makeConnection(out) |
|---|
| 467 |
readStream(inputStream, lambda _: p.dataReceived(_)).addCallbacks( |
|---|
| 468 |
lambda _: p.connectionLost(ti_error.ConnectionDone()), lambda _: p.connectionLost(_)) |
|---|
| 469 |
return out |
|---|
| 470 |
|
|---|
| 471 |
|
|---|
| 472 |
|
|---|
| 473 |
|
|---|
| 474 |
|
|---|
| 475 |
def fallbackSplit(stream, point): |
|---|
| 476 |
after = PostTruncaterStream(stream, point) |
|---|
| 477 |
before = TruncaterStream(stream, point, after) |
|---|
| 478 |
return (before, after) |
|---|
| 479 |
|
|---|
| 480 |
class TruncaterStream(object): |
|---|
| 481 |
def __init__(self, stream, point, postTruncater): |
|---|
| 482 |
self.stream = stream |
|---|
| 483 |
self.length = point |
|---|
| 484 |
self.postTruncater = postTruncater |
|---|
| 485 |
|
|---|
| 486 |
def read(self): |
|---|
| 487 |
if self.length == 0: |
|---|
| 488 |
if self.postTruncater is not None: |
|---|
| 489 |
postTruncater = self.postTruncater |
|---|
| 490 |
self.postTruncater = None |
|---|
| 491 |
postTruncater.sendInitialSegment(self.stream.read()) |
|---|
| 492 |
self.stream = None |
|---|
| 493 |
return None |
|---|
| 494 |
|
|---|
| 495 |
result = self.stream.read() |
|---|
| 496 |
if isinstance(result, Deferred): |
|---|
| 497 |
return result.addCallback(self._gotRead) |
|---|
| 498 |
else: |
|---|
| 499 |
return self._gotRead(result) |
|---|
| 500 |
|
|---|
| 501 |
def _gotRead(self, data): |
|---|
| 502 |
if data is None: |
|---|
| 503 |
raise ValueError("Ran out of data for a split of a indeterminate length source") |
|---|
| 504 |
if self.length >= len(data): |
|---|
| 505 |
self.length -= len(data) |
|---|
| 506 |
return data |
|---|
| 507 |
else: |
|---|
| 508 |
before = buffer(data, 0, self.length) |
|---|
| 509 |
after = buffer(data, self.length) |
|---|
| 510 |
self.length = 0 |
|---|
| 511 |
if self.postTruncater is not None: |
|---|
| 512 |
postTruncater = self.postTruncater |
|---|
| 513 |
self.postTruncater = None |
|---|
| 514 |
postTruncater.sendInitialSegment(after) |
|---|
| 515 |
self.stream = None |
|---|
| 516 |
return before |
|---|
| 517 |
|
|---|
| 518 |
def split(self, point): |
|---|
| 519 |
if point > self.length: |
|---|
| 520 |
raise ValueError("split point (%d) > length (%d)" % (point, self.length)) |
|---|
| 521 |
|
|---|
| 522 |
post = PostTruncaterStream(self.stream, point) |
|---|
| 523 |
trunc = TruncaterStream(post, self.length - point, self.postTruncater) |
|---|
| 524 |
self.length = point |
|---|
| 525 |
self.postTruncater = post |
|---|
| 526 |
return self, trunc |
|---|
| 527 |
|
|---|
| 528 |
def close(self): |
|---|
| 529 |
if self.postTruncater is not None: |
|---|
| 530 |
self.postTruncater.notifyClosed(self) |
|---|
| 531 |
else: |
|---|
| 532 |
|
|---|
| 533 |
self.stream.close() |
|---|
| 534 |
self.stream = None |
|---|
| 535 |
self.length = 0 |
|---|
| 536 |
|
|---|
| 537 |
|
|---|
| 538 |
class PostTruncaterStream(object): |
|---|
| 539 |
deferred = None |
|---|
| 540 |
sentInitialSegment = False |
|---|
| 541 |
truncaterClosed = None |
|---|
| 542 |
closed = False |
|---|
| 543 |
|
|---|
| 544 |
length = None |
|---|
| 545 |
def __init__(self, stream, point): |
|---|
| 546 |
self.stream = stream |
|---|
| 547 |
self.deferred = Deferred() |
|---|
| 548 |
if stream.length is not None: |
|---|
| 549 |
self.length = stream.length - point |
|---|
| 550 |
|
|---|
| 551 |
def read(self): |
|---|
| 552 |
if not self.sentInitialSegment: |
|---|
| 553 |
self.sentInitialSegment = True |
|---|
| 554 |
if self.truncaterClosed is not None: |
|---|
| 555 |
readAndDiscard(self.truncaterClosed) |
|---|
| 556 |
self.truncaterClosed = None |
|---|
| 557 |
return self.deferred |
|---|
| 558 |
|
|---|
| 559 |
return self.stream.read() |
|---|
| 560 |
|
|---|
| 561 |
def split(self, point): |
|---|
| 562 |
return fallbackSplit(self, point) |
|---|
| 563 |
|
|---|
| 564 |
def close(self): |
|---|
| 565 |
self.closed = True |
|---|
| 566 |
if self.truncaterClosed is not None: |
|---|
| 567 |
|
|---|
| 568 |
self.truncaterClosed.postTruncater = None |
|---|
| 569 |
self.truncaterClosed.close() |
|---|
| 570 |
elif self.sentInitialSegment: |
|---|
| 571 |
|
|---|
| 572 |
self.stream.close() |
|---|
| 573 |
|
|---|
| 574 |
self.deferred = None |
|---|
| 575 |
|
|---|
| 576 |
|
|---|
| 577 |
def sendInitialSegment(self, data): |
|---|
| 578 |
if self.closed: |
|---|
| 579 |
|
|---|
| 580 |
self.stream.close() |
|---|
| 581 |
self.stream = None |
|---|
| 582 |
if self.deferred is not None: |
|---|
| 583 |
if isinstance(data, Deferred): |
|---|
| 584 |
data.chainDeferred(self.deferred) |
|---|
| 585 |
else: |
|---|
| 586 |
self.deferred.callback(data) |
|---|
| 587 |
|
|---|
| 588 |
def notifyClosed(self, truncater): |
|---|
| 589 |
if self.closed: |
|---|
| 590 |
|
|---|
| 591 |
truncater.postTruncater = None |
|---|
| 592 |
truncater.close() |
|---|
| 593 |
elif self.sentInitialSegment: |
|---|
| 594 |
|
|---|
| 595 |
readAndDiscard(truncater) |
|---|
| 596 |
else: |
|---|
| 597 |
|
|---|
| 598 |
self.truncaterClosed = truncater |
|---|
| 599 |
|
|---|
| 600 |
|
|---|
| 601 |
|
|---|
| 602 |
|
|---|
| 603 |
|
|---|
| 604 |
class ProducerStream(object): |
|---|
| 605 |
"""Turns producers into a IByteStream. |
|---|
| 606 |
Thus, implements IConsumer and IByteStream.""" |
|---|
| 607 |
|
|---|
| 608 |
implements(IByteStream, ti_interfaces.IConsumer) |
|---|
| 609 |
length = None |
|---|
| 610 |
closed = False |
|---|
| 611 |
failed = False |
|---|
| 612 |
producer = None |
|---|
| 613 |
producerPaused = False |
|---|
| 614 |
deferred = None |
|---|
| 615 |
|
|---|
| 616 |
bufferSize = 5 |
|---|
| 617 |
|
|---|
| 618 |
def __init__(self, length=None): |
|---|
| 619 |
self.buffer = [] |
|---|
| 620 |
self.length = length |
|---|
| 621 |
|
|---|
| 622 |
|
|---|
| 623 |
def read(self): |
|---|
| 624 |
if self.buffer: |
|---|
| 625 |
return self.buffer.pop(0) |
|---|
| 626 |
elif self.closed: |
|---|
| 627 |
self.length = 0 |
|---|
| 628 |
if self.failed: |
|---|
| 629 |
f = self.failure |
|---|
| 630 |
del self.failure |
|---|
| 631 |
return defer.fail(f) |
|---|
| 632 |
return None |
|---|
| 633 |
else: |
|---|
| 634 |
deferred = self.deferred = Deferred() |
|---|
| 635 |
if self.producer is not None and (not self.streamingProducer |
|---|
| 636 |
or self.producerPaused): |
|---|
| 637 |
self.producerPaused = False |
|---|
| 638 |
self.producer.resumeProducing() |
|---|
| 639 |
|
|---|
| 640 |
return deferred |
|---|
| 641 |
|
|---|
| 642 |
def split(self, point): |
|---|
| 643 |
return fallbackSplit(self, point) |
|---|
| 644 |
|
|---|
| 645 |
def close(self): |
|---|
| 646 |
"""Called by reader of stream when it is done reading.""" |
|---|
| 647 |
self.buffer=[] |
|---|
| 648 |
self.closed = True |
|---|
| 649 |
if self.producer is not None: |
|---|
| 650 |
self.producer.stopProducing() |
|---|
| 651 |
self.producer = None |
|---|
| 652 |
self.deferred = None |
|---|
| 653 |
|
|---|
| 654 |
|
|---|
| 655 |
def write(self, data): |
|---|
| 656 |
if self.closed: |
|---|
| 657 |
return |
|---|
| 658 |
|
|---|
| 659 |
if self.deferred: |
|---|
| 660 |
deferred = self.deferred |
|---|
| 661 |
self.deferred = None |
|---|
| 662 |
deferred.callback(data) |
|---|
| 663 |
else: |
|---|
| 664 |
self.buffer.append(data) |
|---|
| 665 |
if(self.producer is not None and self.streamingProducer |
|---|
| 666 |
and len(self.buffer) > self.bufferSize): |
|---|
| 667 |
self.producer.pauseProducing() |
|---|
| 668 |
self.producerPaused = True |
|---|
| 669 |
|
|---|
| 670 |
def finish(self, failure=None): |
|---|
| 671 |
"""Called by producer when it is done. |
|---|
| 672 |
|
|---|
| 673 |
If the optional failure argument is passed a Failure instance, |
|---|
| 674 |
the stream will return it as errback on next Deferred. |
|---|
| 675 |
""" |
|---|
| 676 |
self.closed = True |
|---|
| 677 |
if not self.buffer: |
|---|
| 678 |
self.length = 0 |
|---|
| 679 |
if self.deferred is not None: |
|---|
| 680 |
deferred = self.deferred |
|---|
| 681 |
self.deferred = None |
|---|
| 682 |
if failure is not None: |
|---|
| 683 |
self.failed = True |
|---|
| 684 |
deferred.errback(failure) |
|---|
| 685 |
else: |
|---|
| 686 |
deferred.callback(None) |
|---|
| 687 |
else: |
|---|
| 688 |
if failure is not None: |
|---|
| 689 |
self.failed = True |
|---|
| 690 |
self.failure = failure |
|---|
| 691 |
|
|---|
| 692 |
def registerProducer(self, producer, streaming): |
|---|
| 693 |
if self.producer is not None: |
|---|
| 694 |
raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer)) |
|---|
| 695 |
|
|---|
| 696 |
if self.closed: |
|---|
| 697 |
producer.stopProducing() |
|---|
| 698 |
else: |
|---|
| 699 |
self.producer = producer |
|---|
| 700 |
self.streamingProducer = streaming |
|---|
| 701 |
if not streaming: |
|---|
| 702 |
producer.resumeProducing() |
|---|
| 703 |
|
|---|
| 704 |
def unregisterProducer(self): |
|---|
| 705 |
self.producer = None |
|---|
| 706 |
|
|---|
| 707 |
class StreamProducer(object): |
|---|
| 708 |
"""A push producer which gets its data by reading a stream.""" |
|---|
| 709 |
implements(ti_interfaces.IPushProducer) |
|---|
| 710 |
|
|---|
| 711 |
deferred = None |
|---|
| 712 |
finishedCallback = None |
|---|
| 713 |
paused = False |
|---|
| 714 |
consumer = None |
|---|
| 715 |
|
|---|
| 716 |
def __init__(self, stream, enforceStr=True): |
|---|
| 717 |
self.stream = stream |
|---|
| 718 |
self.enforceStr = enforceStr |
|---|
| 719 |
|
|---|
| 720 |
def beginProducing(self, consumer): |
|---|
| 721 |
if self.stream is None: |
|---|
| 722 |
return defer.succeed(None) |
|---|
| 723 |
|
|---|
| 724 |
self.consumer = consumer |
|---|
| 725 |
finishedCallback = self.finishedCallback = Deferred() |
|---|
| 726 |
self.consumer.registerProducer(self, True) |
|---|
| 727 |
self.resumeProducing() |
|---|
| 728 |
return finishedCallback |
|---|
| 729 |
|
|---|
| 730 |
def resumeProducing(self): |
|---|
| 731 |
self.paused = False |
|---|
| 732 |
if self.deferred is not None: |
|---|
| 733 |
return |
|---|
| 734 |
|
|---|
| 735 |
try: |
|---|
| 736 |
data = self.stream.read() |
|---|
| 737 |
except: |
|---|
| 738 |
self.stopProducing(Failure()) |
|---|
| 739 |
return |
|---|
| 740 |
|
|---|
| 741 |
if isinstance(data, Deferred): |
|---|
| 742 |
self.deferred = data.addCallbacks(self._doWrite, self.stopProducing) |
|---|
| 743 |
else: |
|---|
| 744 |
self._doWrite(data) |
|---|
| 745 |
|
|---|
| 746 |
def _doWrite(self, data): |
|---|
| 747 |
if self.consumer is None: |
|---|
| 748 |
return |
|---|
| 749 |
if data is None: |
|---|
| 750 |
|
|---|
| 751 |
if self.consumer is not None: |
|---|
| 752 |
self.consumer.unregisterProducer() |
|---|
| 753 |
if self.finishedCallback is not None: |
|---|
| 754 |
self.finishedCallback.callback(None) |
|---|
| 755 |
self.finishedCallback = self.deferred = self.consumer = self.stream = None |
|---|
| 756 |
return |
|---|
| 757 |
|
|---|
| 758 |
self.deferred = None |
|---|
| 759 |
if self.enforceStr: |
|---|
| 760 |
|
|---|
| 761 |
data = str(buffer(data)) |
|---|
| 762 |
self.consumer.write(data) |
|---|
| 763 |
|
|---|
| 764 |
if not self.paused: |
|---|
| 765 |
self.resumeProducing() |
|---|
| 766 |
|
|---|
| 767 |
def pauseProducing(self): |
|---|
| 768 |
self.paused = True |
|---|
| 769 |
|
|---|
| 770 |
def stopProducing(self, failure=ti_error.ConnectionLost()): |
|---|
| 771 |
if self.consumer is not None: |
|---|
| 772 |
self.consumer.unregisterProducer() |
|---|
| 773 |
if self.finishedCallback is not None: |
|---|
| 774 |
if failure is not None: |
|---|
| 775 |
self.finishedCallback.errback(failure) |
|---|
| 776 |
else: |
|---|
| 777 |
self.finishedCallback.callback(None) |
|---|
| 778 |
self.finishedCallback = None |
|---|
| 779 |
self.paused = True |
|---|
| 780 |
if self.stream is not None: |
|---|
| 781 |
self.stream.close() |
|---|
| 782 |
|
|---|
| 783 |
self.finishedCallback = self.deferred = self.consumer = self.stream = None |
|---|
| 784 |
|
|---|
| 785 |
|
|---|
| 786 |
|
|---|
| 787 |
|
|---|
| 788 |
|
|---|
| 789 |
class _ProcessStreamerProtocol(protocol.ProcessProtocol): |
|---|
| 790 |
|
|---|
| 791 |
def __init__(self, inputStream, outStream, errStream): |
|---|
| 792 |
self.inputStream = inputStream |
|---|
| 793 |
self.outStream = outStream |
|---|
| 794 |
self.errStream = errStream |
|---|
| 795 |
self.resultDeferred = defer.Deferred() |
|---|
| 796 |
|
|---|
| 797 |
def connectionMade(self): |
|---|
| 798 |
p = StreamProducer(self.inputStream) |
|---|
| 799 |
|
|---|
| 800 |
|
|---|
| 801 |
|
|---|
| 802 |
p.stopProducing = lambda err=None: StreamProducer.stopProducing(p, err) |
|---|
| 803 |
|
|---|
| 804 |
d = p.beginProducing(self.transport) |
|---|
| 805 |
d.addCallbacks(lambda _: self.transport.closeStdin(), |
|---|
| 806 |
self._inputError) |
|---|
| 807 |
|
|---|
| 808 |
def _inputError(self, f): |
|---|
| 809 |
log.msg("Error in input stream for %r" % self.transport) |
|---|
| 810 |
log.err(f) |
|---|
| 811 |
self.transport.closeStdin() |
|---|
| 812 |
|
|---|
| 813 |
def outReceived(self, data): |
|---|
| 814 |
self.outStream.write(data) |
|---|
| 815 |
|
|---|
| 816 |
def errReceived(self, data): |
|---|
| 817 |
self.errStream.write(data) |
|---|
| 818 |
|
|---|
| 819 |
def outConnectionLost(self): |
|---|
| 820 |
self.outStream.finish() |
|---|
| 821 |
|
|---|
| 822 |
def errConnectionLost(self): |
|---|
| 823 |
self.errStream.finish() |
|---|
| 824 |
|
|---|
| 825 |
def processEnded(self, reason): |
|---|
| 826 |
self.resultDeferred.errback(reason) |
|---|
| 827 |
del self.resultDeferred |
|---|
| 828 |
|
|---|
| 829 |
|
|---|
| 830 |
class ProcessStreamer(object): |
|---|
| 831 |
"""Runs a process hooked up to streams. |
|---|
| 832 |
|
|---|
| 833 |
Requires an input stream, has attributes 'outStream' and 'errStream' |
|---|
| 834 |
for stdout and stderr. |
|---|
| 835 |
|
|---|
| 836 |
outStream and errStream are public attributes providing streams |
|---|
| 837 |
for stdout and stderr of the process. |
|---|
| 838 |
""" |
|---|
| 839 |
|
|---|
| 840 |
def __init__(self, inputStream, program, args, env={}): |
|---|
| 841 |
self.outStream = ProducerStream() |
|---|
| 842 |
self.errStream = ProducerStream() |
|---|
| 843 |
self._protocol = _ProcessStreamerProtocol(IByteStream(inputStream), self.outStream, self.errStream) |
|---|
| 844 |
self._program = program |
|---|
| 845 |
self._args = args |
|---|
| 846 |
self._env = env |
|---|
| 847 |
|
|---|
| 848 |
def run(self): |
|---|
| 849 |
"""Run the process. |
|---|
| 850 |
|
|---|
| 851 |
Returns Deferred which will eventually have errback for non-clean (exit code > 0) |
|---|
| 852 |
exit, with ProcessTerminated, or callback with None on exit code 0. |
|---|
| 853 |
""" |
|---|
| 854 |
|
|---|
| 855 |
reactor.spawnProcess(self._protocol, self._program, self._args, env=self._env) |
|---|
| 856 |
del self._env |
|---|
| 857 |
return self._protocol.resultDeferred.addErrback(lambda _: _.trap(ti_error.ProcessDone)) |
|---|
| 858 |
|
|---|
| 859 |
|
|---|
| 860 |
|
|---|
| 861 |
|
|---|
| 862 |
|
|---|
| 863 |
class _StreamIterator(object): |
|---|
| 864 |
done=False |
|---|
| 865 |
|
|---|
| 866 |
def __iter__(self): |
|---|
| 867 |
return self |
|---|
| 868 |
def next(self): |
|---|
| 869 |
if self.done: |
|---|
| 870 |
raise StopIteration |
|---|
| 871 |
return self.value |
|---|
| 872 |
wait=object() |
|---|
| 873 |
|
|---|
| 874 |
class _IteratorStream(object): |
|---|
| 875 |
length = None |
|---|
| 876 |
|
|---|
| 877 |
def __init__(self, fun, stream, args, kwargs): |
|---|
| 878 |
self._stream=stream |
|---|
| 879 |
self._streamIterator = _StreamIterator() |
|---|
| 880 |
self._gen = fun(self._streamIterator, *args, **kwargs) |
|---|
| 881 |
|
|---|
| 882 |
def read(self): |
|---|
| 883 |
try: |
|---|
| 884 |
val = self._gen.next() |
|---|
| 885 |
except StopIteration: |
|---|
| 886 |
return None |
|---|
| 887 |
else: |
|---|
| 888 |
if val is _StreamIterator.wait: |
|---|
| 889 |
newdata = self._stream.read() |
|---|
| 890 |
if isinstance(newdata, defer.Deferred): |
|---|
| 891 |
return newdata.addCallback(self._gotRead) |
|---|
| 892 |
else: |
|---|
| 893 |
return self._gotRead(newdata) |
|---|
| 894 |
return val |
|---|
| 895 |
|
|---|
| 896 |
def _gotRead(self, data): |
|---|
| 897 |
if data is None: |
|---|
| 898 |
self._streamIterator.done=True |
|---|
| 899 |
else: |
|---|
| 900 |
self._streamIterator.value=data |
|---|
| 901 |
return self.read() |
|---|
| 902 |
|
|---|
| 903 |
def close(self): |
|---|
| 904 |
self._stream.close() |
|---|
| 905 |
del self._gen, self._stream, self._streamIterator |
|---|
| 906 |
|
|---|
| 907 |
def split(self): |
|---|
| 908 |
return fallbackSplit(self) |
|---|
| 909 |
|
|---|
| 910 |
def generatorToStream(fun): |
|---|
| 911 |
"""Converts a generator function into a stream. |
|---|
| 912 |
|
|---|
| 913 |
The function should take an iterator as its first argument, |
|---|
| 914 |
which will be converted *from* a stream by this wrapper, and |
|---|
| 915 |
yield items which are turned *into* the results from the |
|---|
| 916 |
stream's 'read' call. |
|---|
| 917 |
|
|---|
| 918 |
One important point: before every call to input.next(), you |
|---|
| 919 |
*MUST* do a "yield input.wait" first. Yielding this magic value |
|---|
| 920 |
takes care of ensuring that the input is not a deferred before |
|---|
| 921 |
you see it. |
|---|
| 922 |
|
|---|
| 923 |
>>> from twisted.web2 import stream |
|---|
| 924 |
>>> from string import maketrans |
|---|
| 925 |
>>> alphabet = 'abcdefghijklmnopqrstuvwxyz' |
|---|
| 926 |
>>> |
|---|
| 927 |
>>> def encrypt(input, key): |
|---|
| 928 |
... code = alphabet[key:] + alphabet[:key] |
|---|
| 929 |
... translator = maketrans(alphabet+alphabet.upper(), code+code.upper()) |
|---|
| 930 |
... yield input.wait |
|---|
| 931 |
... for s in input: |
|---|
| 932 |
... yield str(s).translate(translator) |
|---|
| 933 |
... yield input.wait |
|---|
| 934 |
... |
|---|
| 935 |
>>> encrypt = stream.generatorToStream(encrypt) |
|---|
| 936 |
>>> |
|---|
| 937 |
>>> plaintextStream = stream.MemoryStream('SampleSampleSample') |
|---|
| 938 |
>>> encryptedStream = encrypt(plaintextStream, 13) |
|---|
| 939 |
>>> encryptedStream.read() |
|---|
| 940 |
'FnzcyrFnzcyrFnzcyr' |
|---|
| 941 |
>>> |
|---|
| 942 |
>>> plaintextStream = stream.MemoryStream('SampleSampleSample') |
|---|
| 943 |
>>> encryptedStream = encrypt(plaintextStream, 13) |
|---|
| 944 |
>>> evenMoreEncryptedStream = encrypt(encryptedStream, 13) |
|---|
| 945 |
>>> evenMoreEncryptedStream.read() |
|---|
| 946 |
'SampleSampleSample' |
|---|
| 947 |
|
|---|
| 948 |
""" |
|---|
| 949 |
def generatorToStream_inner(stream, *args, **kwargs): |
|---|
| 950 |
return _IteratorStream(fun, stream, args, kwargs) |
|---|
| 951 |
return generatorToStream_inner |
|---|
| 952 |
|
|---|
| 953 |
|
|---|
| 954 |
|
|---|
| 955 |
|
|---|
| 956 |
|
|---|
| 957 |
|
|---|
| 958 |
class BufferedStream(object): |
|---|
| 959 |
"""A stream which buffers its data to provide operations like |
|---|
| 960 |
readline and readExactly.""" |
|---|
| 961 |
|
|---|
| 962 |
data = "" |
|---|
| 963 |
def __init__(self, stream): |
|---|
| 964 |
self.stream = stream |
|---|
| 965 |
|
|---|
| 966 |
def _readUntil(self, f): |
|---|
| 967 |
"""Internal helper function which repeatedly calls f each time |
|---|
| 968 |
after more data has been received, until it returns non-None.""" |
|---|
| 969 |
while True: |
|---|
| 970 |
r = f() |
|---|
| 971 |
if r is not None: |
|---|
| 972 |
yield r; return |
|---|
| 973 |
|
|---|
| 974 |
newdata = self.stream.read() |
|---|
| 975 |
if isinstance(newdata, defer.Deferred): |
|---|
| 976 |
newdata = defer.waitForDeferred(newdata) |
|---|
| 977 |
yield newdata; newdata = newdata.getResult() |
|---|
| 978 |
|
|---|
| 979 |
if newdata is None: |
|---|
| 980 |
|
|---|
| 981 |
newdata = self.data |
|---|
| 982 |
self.data = '' |
|---|
| 983 |
yield newdata; return |
|---|
| 984 |
self.data += str(newdata) |
|---|
| 985 |
_readUntil = defer.deferredGenerator(_readUntil) |
|---|
| 986 |
|
|---|
| 987 |
def readExactly(self, size=None): |
|---|
| 988 |
"""Read exactly size bytes of data, or, if size is None, read |
|---|
| 989 |
the entire stream into a string.""" |
|---|
| 990 |
if size is not None and size < 0: |
|---|
| 991 |
raise ValueError("readExactly: size cannot be negative: %s", size) |
|---|
| 992 |
|
|---|
| 993 |
def gotdata(): |
|---|
| 994 |
data = self.data |
|---|
| 995 |
if size is not None and len(data) >= size: |
|---|
| 996 |
pre,post = data[:size], data[size:] |
|---|
| 997 |
self.data = post |
|---|
| 998 |
return pre |
|---|
| 999 |
return self._readUntil(gotdata) |
|---|
| 1000 |
|
|---|
| 1001 |
|
|---|
| 1002 |
def readline(self, delimiter='\r\n', size=None): |
|---|
| 1003 |
""" |
|---|
| 1004 |
Read a line of data from the string, bounded by |
|---|
| 1005 |
delimiter. The delimiter is included in the return value. |
|---|
| 1006 |
|
|---|
| 1007 |
If size is specified, read and return at most that many bytes, |
|---|
| 1008 |
even if the delimiter has not yet been reached. If the size |
|---|
| 1009 |
limit falls within a delimiter, the rest of the delimiter, and |
|---|
| 1010 |
the next line will be returned together. |
|---|
| 1011 |
""" |
|---|
| 1012 |
if size is not None and size < 0: |
|---|
| 1013 |
raise ValueError("readline: size cannot be negative: %s" % (size, )) |
|---|
| 1014 |
|
|---|
| 1015 |
def gotdata(): |
|---|
| 1016 |
data = self.data |
|---|
| 1017 |
if size is not None: |
|---|
| 1018 |
splitpoint = data.find(delimiter, 0, size) |
|---|
| 1019 |
if splitpoint == -1: |
|---|
| 1020 |
if len(data) >= size: |
|---|
| 1021 |
splitpoint = size |
|---|
| 1022 |
else: |
|---|
| 1023 |
splitpoint += len(delimiter) |
|---|
| 1024 |
else: |
|---|
| 1025 |
splitpoint = data.find(delimiter) |
|---|
| 1026 |
if splitpoint != -1: |
|---|
| 1027 |
splitpoint += len(delimiter) |
|---|
| 1028 |
|
|---|
| 1029 |
if splitpoint != -1: |
|---|
| 1030 |
pre = data[:splitpoint] |
|---|
| 1031 |
self.data = data[splitpoint:] |
|---|
| 1032 |
return pre |
|---|
| 1033 |
return self._readUntil(gotdata) |
|---|
| 1034 |
|
|---|
| 1035 |
def pushback(self, pushed): |
|---|
| 1036 |
"""Push data back into the buffer.""" |
|---|
| 1037 |
|
|---|
| 1038 |
self.data = pushed + self.data |
|---|
| 1039 |
|
|---|
| 1040 |
def read(self): |
|---|
| 1041 |
data = self.data |
|---|
| 1042 |
if data: |
|---|
| 1043 |
self.data = "" |
|---|
| 1044 |
return data |
|---|
| 1045 |
return self.stream.read() |
|---|
| 1046 |
|
|---|
| 1047 |
def _len(self): |
|---|
| 1048 |
l = self.stream.length |
|---|
| 1049 |
if l is None: |
|---|
| 1050 |
return None |
|---|
| 1051 |
return l + len(self.data) |
|---|
| 1052 |
|
|---|
| 1053 |
length = property(_len) |
|---|
| 1054 |
|
|---|
| 1055 |
def split(self, offset): |
|---|
| 1056 |
off = offset - len(self.data) |
|---|
| 1057 |
|
|---|
| 1058 |
pre, post = self.stream.split(max(0, off)) |
|---|
| 1059 |
pre = BufferedStream(pre) |
|---|
| 1060 |
post = BufferedStream(post) |
|---|
| 1061 |
if off < 0: |
|---|
| 1062 |
pre.data = self.data[:-off] |
|---|
| 1063 |
post.data = self.data[-off:] |
|---|
| 1064 |
else: |
|---|
| 1065 |
pre.data = self.data |
|---|
| 1066 |
|
|---|
| 1067 |
return pre, post |
|---|
| 1068 |
|
|---|
| 1069 |
|
|---|
| 1070 |
def substream(stream, start, end): |
|---|
| 1071 |
if start > end: |
|---|
| 1072 |
raise ValueError("start position must be less than end position %r" |
|---|
| 1073 |
% ((start, end),)) |
|---|
| 1074 |
stream = stream.split(start)[1] |
|---|
| 1075 |
return stream.split(end - start)[0] |
|---|
| 1076 |
|
|---|
| 1077 |
|
|---|
| 1078 |
|
|---|
| 1079 |
__all__ = ['IStream', 'IByteStream', 'FileStream', 'MemoryStream', 'CompoundStream', |
|---|
| 1080 |
'readAndDiscard', 'fallbackSplit', 'ProducerStream', 'StreamProducer', |
|---|
| 1081 |
'BufferedStream', 'readStream', 'ProcessStreamer', 'readIntoFile', |
|---|
| 1082 |
'generatorToStream'] |
|---|
| 1083 |
|
|---|