Opened 6 years ago

Last modified 3 years ago

#3682 enhancement new

twisted.internet.abstract.FileDescriptor accumulates _tempData and throws MemoryError

Reported by: gthomas
Owned by: gthomas
Priority: low
Milestone:
Component: core
Keywords: MemoryError, FileDescriptor
Cc:
Branch:
Author:
Launchpad Bug:

Description

The twisted.internet.abstract.FileDescriptor.doWrite method sends some data, but pays no attention to _tempDataBuffer's size.

In the write method, it tries to pause the producer, but only behind several ifs (self.producer exists and it is not a StreamProducer).

So when the producer produces too much data, the

        "".join(self._tempDataBuffer) #line 101

throws a MemoryError.
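For illustration, a minimal sketch of the failure mode (the sizes are made up, chosen to exceed a process's memory limit): str.join must build the whole result alongside the chunks it joins, so peak memory use roughly doubles.

chunks = ["x" * 65536 for _ in range(20000)]  # ~1.2 GiB of pending chunks
data = "".join(chunks)  # needs another ~1.2 GiB for the joined result -> MemoryError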

My suggestions:

  1. if self.dataBuffer is small, only a few chunks should be appended to it from self._tempDataBuffer
  2. the doWrite method should try to drain self._tempDataBuffer at least down to self.bufferSize (so excess data simply cannot accumulate in self.dataBuffer and self._tempDataBuffer).

A possible solution (twisted.internet.abstract.FileDescriptor.doWrite):

def doWrite(self):
    """Called when data can be written.

    A result that is true (which will be a negative number) implies the
    connection was lost. A false result implies the connection is still
    there; a result of 0 implies no write was done, and a result of None
    indicates that a write was done.
    """
    #LOG.debug('%s.doWrite: db=%d tdb=%d'
    #          % (self, len(self.dataBuffer)-self.offset, self._tempDataLen))
    while 1:
        # always try to do something
        # quit when dataBuffer size is acceptably low (see end)
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
            # If there is currently less than SEND_LIMIT bytes left to send
            # in the string, extend it with the array data.
            buf = []
            accum = len(self.dataBuffer)-self.offset
            m = 2 * max(self.SEND_LIMIT, self.bufferSize)
            chunk = None
            while self._tempDataBuffer and accum < m:
                # don't want to join too much data at once
                chunk = self._tempDataBuffer.pop(0)
                buf.append(chunk)
                n = len(chunk)
                accum += n
                self._tempDataLen -= n
            #LOG.debug('%s.doWrite: accum=%d tdb=%d' % (self, accum, self._tempDataLen,))
            del chunk, accum, m
            self.dataBuffer = (buffer(self.dataBuffer, self.offset) + "".join(buf))
            del buf
            self.offset = 0

        # Send as much data as you can.
        if self.offset:
            l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
        else:
            l = self.writeSomeData(self.dataBuffer)

        # There is no writeSomeData implementation in Twisted which returns
        # 0, but the documentation for writeSomeData used to claim negative
        # integers meant connection lost.  Keep supporting this here,
        # although it may be worth deprecating and removing at some point.
        if l < 0 or isinstance(l, Exception):
            return l
        if l == 0 and self.dataBuffer:
            # Couldn't send data
            result = 0
            break
        else:
            result = None
        self.offset += l

        if len(self.dataBuffer) - self.offset + self._tempDataLen < self.bufferSize:
            # quit, because dataBuffer AND _tempDataBuffer are small enough
            break
    #
    # If there is nothing left to send,
    if self.offset == len(self.dataBuffer) and not self._tempDataLen:
        self.dataBuffer = ""
        self.offset = 0
        # stop writing.
        self.stopWriting()
        # If I've got a producer who is supposed to supply me with data,
        if self.producer is not None and ((not self.streamingProducer)
                                          or self.producerPaused):
            # tell them to supply some more.
            self.producerPaused = 0
            self.producer.resumeProducing()
        elif self.disconnecting:
            # But if I was previously asked to let the connection die, do
            # so.
            return self._postLoseConnection()
        elif self._writeDisconnecting:
        # I was previously asked to half-close the connection.
            result = self._closeWriteConnection()
            self._writeDisconnected = True
            return result
    return result

thanks, GThomas

Attachments (2)

3682-svn-diff.patch (4.2 KB) - added by gthomas 6 years ago.
The modification as "svn diff"
test_proxy.py (52.2 KB) - added by gthomas 6 years ago.
copy of test_internet.py with test_dontPausePullConsumerOnWriteAlot added to TestProducer


Change History (9)

comment:1 Changed 6 years ago by exarkun

  • Author gthomas deleted
  • Milestone regular-releases deleted

The regular-releases milestone is for release automation.

Also, please suggest changes in the form of patches generated by svn diff from the latest version of trunk.

comment:2 follow-up: ↓ 4 Changed 6 years ago by glyph

  • Owner changed from glyph to zseil
  • Priority changed from normal to low

It would be really cool for Twisted to be resilient to this type of error, but the real way to deal with large buffers of data is to write a producer. So assuming I understand the problem being described, this is low priority.
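For reference, a minimal sketch of the producer approach (the class and its names are illustrative, not part of Twisted; the pattern is Twisted's IPullProducer, registered with streaming=False so the transport asks for each chunk):

from zope.interface import implementer

from twisted.internet.interfaces import IPullProducer

@implementer(IPullProducer)
class ChunkedProducer(object):
    """Feed a file-like object to a consumer one chunk at a time, so the
    transport never buffers more than chunkSize unsent bytes at once."""

    def __init__(self, source, consumer, chunkSize=2 ** 16):
        self._source = source
        self._consumer = consumer
        self._chunkSize = chunkSize
        # streaming=False registers a pull producer: the transport calls
        # resumeProducing() whenever its buffer has room for more data.
        consumer.registerProducer(self, streaming=False)

    def resumeProducing(self):
        data = self._source.read(self._chunkSize)
        if data:
            self._consumer.write(data)
        else:
            self._consumer.unregisterProducer()

    def stopProducing(self):
        self._source.close()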

comment:3 Changed 6 years ago by glyph

  • Owner changed from zseil to gthomas

Whoops, wrong assignee.

Changed 6 years ago by gthomas

The modification as "svn diff"

comment:4 in reply to: ↑ 2 Changed 6 years ago by gthomas

Replying to glyph:

It would be really cool for Twisted to be resilient to this type of error, but the real way to deal with large buffers of data is to write a producer. So assuming I understand the problem being described, this is low priority.

  1. attached the diff
  2. the application is a simple port-forwarding proxy using twisted.web2.proxy.Proxy, so maybe the real problem lies there...

Anyway, I think consuming a little faster does no harm.

comment:5 follow-up: ↓ 6 Changed 6 years ago by exarkun

It'd be good if we had some way to tell if this were actually faster. From a quick look at the patch, it seems it's replacing a relatively fast str.join with a for loop. I definitely wouldn't assume that this kind of change would make anything faster.

It'd also be good if there were a unit test which demonstrated that this prevented some failure (ie, really fixed a bug). Actually writing such a unit test may be impossible, though.

comment:6 in reply to: ↑ 5 Changed 6 years ago by gthomas

Replying to exarkun:

It'd be good if we had some way to tell if this were actually faster. From a quick look at the patch, it seems it's replacing a relatively fast str.join with a for loop. I definitely wouldn't assume that this kind of change would make anything faster.

It'd also be good if there were a unit test which demonstrated that this prevented some failure (ie, really fixed a bug). Actually writing such a unit test may be impossible, though.

Attached test_proxy.py (a copy of twisted/test/test_internet.py) with test_dontPausePullConsumerOnWriteAlot added to TestProducer; it first uses the new doWrite, then the old (trunk) one.
With a proper ulimit and fill sizes, the first case runs and the second throws a MemoryError.

bash -c 'ulimit -d 1024 -v 102400 ; PYTHONPATH=../..:$PYTHONPATH \
  python ../../bin/trial test_proxy.TestProducer'

str.join IS fast, but if _tempDataBuffer is just too big...
If I could tell WHAT is too big, then we could use the faster str.join when that buffer isn't too big, and the cautious while loop when it is too big.
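A rough sketch of that hybrid, for what it's worth (JOIN_LIMIT is a hypothetical tuning knob, not anything Twisted defines):

JOIN_LIMIT = 4 * 2 ** 20  # below ~4 MiB of pending data, just join it all

def flattenSome(tempDataBuffer, tempDataLen, want):
    """Return up to roughly `want` bytes from the chunk list, using one
    fast join when the pending total is small enough to be safe."""
    if tempDataLen <= JOIN_LIMIT:
        data = "".join(tempDataBuffer)
        del tempDataBuffer[:]
        return data
    # Too much pending data: peel off only enough chunks to cover `want`.
    pieces, accum = [], 0
    while tempDataBuffer and accum < want:
        chunk = tempDataBuffer.pop(0)
        pieces.append(chunk)
        accum += len(chunk)
    return "".join(pieces)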

Changed 6 years ago by gthomas

copy of test_internet.py with test_dontPausePullConsumerOnWriteAlot added to TestProducer

comment:7 Changed 3 years ago by exarkun

Here are the results from the TCP throughput benchmark from lp:twisted-benchmarks.

trunk:

130383872.0 tcp_throughput/sec (130383872 tcp_throughput in 1.0 seconds)
131104768.0 tcp_throughput/sec (131104768 tcp_throughput in 1.0 seconds)
131104768.0 tcp_throughput/sec (131104768 tcp_throughput in 1.0 seconds)
131334144.0 tcp_throughput/sec (131334144 tcp_throughput in 1.0 seconds)
131530752.0 tcp_throughput/sec (131530752 tcp_throughput in 1.0 seconds)
130940928.0 tcp_throughput/sec (130940928 tcp_throughput in 1.0 seconds)
130842624.0 tcp_throughput/sec (130842624 tcp_throughput in 1.0 seconds)
130809856.0 tcp_throughput/sec (130809856 tcp_throughput in 1.0 seconds)
124977152.0 tcp_throughput/sec (124977152 tcp_throughput in 1.0 seconds)

the patch:

123142144.0 tcp_throughput/sec (123142144 tcp_throughput in 1.0 seconds)
122617856.0 tcp_throughput/sec (122617856 tcp_throughput in 1.0 seconds)
122519552.0 tcp_throughput/sec (122519552 tcp_throughput in 1.0 seconds)
123240448.0 tcp_throughput/sec (123240448 tcp_throughput in 1.0 seconds)
123273216.0 tcp_throughput/sec (123273216 tcp_throughput in 1.0 seconds)
123273216.0 tcp_throughput/sec (123273216 tcp_throughput in 1.0 seconds)
122748928.0 tcp_throughput/sec (122748928 tcp_throughput in 1.0 seconds)
123600896.0 tcp_throughput/sec (123600896 tcp_throughput in 1.0 seconds)
123076608.0 tcp_throughput/sec (123076608 tcp_throughput in 1.0 seconds)

Roughly, this is about a 5% decrease in performance. This seems like a high price to pay to avoid this problem. I suggest investigating a change to use writev(2) instead.
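For the record, a sketch of the writev(2) direction (shown with Python 3's os.writev for illustration; nothing like it existed in the Python 2 of this ticket's era): the kernel gathers the chunk list itself, so no joined copy of the pending data is ever built in user space.

import os

def writeSomeDataV(fd, chunks):
    # chunks is a list of bytes objects; writev(2) copies from each in
    # turn and returns the total number of bytes written, which may stop
    # partway through the list on a short write.
    return os.writev(fd, chunks)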
