Ticket #2611: reduce_basic_protocols_complexity.patch

File reduce_basic_protocols_complexity.patch, 15.5 KB (added by danderson, 14 years ago)

Patch closing this ticket

  • twisted/protocols/basic.py

     
    1515# System imports
    1616import re
    1717import struct
     18import warnings
    1819
    1920# Twisted imports
    2021from twisted.internet import protocol, defer, interfaces, error
     
    125126
    126127    @cvar delimiter: The line-ending delimiter to use. By default this is
    127128                     '\\r\\n'.
     129    @type delimiter: C{str}
     130
    128131    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
    129132                      sent line is longer than this, the connection is dropped).
    130133                      Default is 16384.
     134    @type MAX_LENGTH: C{int}
     135
     136    @ivar _buffer: A list of data pieces received and buffered, waiting
     137                   for a delimiter.
     138    @type _buffer: C{list} of C{str}
     139
     140    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     141    @type _buffer_size: C{int}
    131142    """
    132     _buffer = ''
     143    _buffer = None
     144    _buffer_size = 0
    133145    delimiter = '\r\n'
    134146    MAX_LENGTH = 16384
    135147
    136148    def dataReceived(self, data):
    137149        """Translates bytes into lines, and calls lineReceived."""
    138         lines  = (self._buffer+data).split(self.delimiter)
    139         self._buffer = lines.pop(-1)
     150        if self._buffer is None:
     151            self._buffer = []
     152        lines = data.split(self.delimiter)
     153        trailing = lines.pop()
     154
    140155        for line in lines:
    141156            if self.transport.disconnecting:
    142157                # this is necessary because the transport may be told to lose
     
    144159                # important to disregard all the lines in that packet following
    145160                # the one that told it to close.
    146161                return
     162            self._buffer.append(line)
     163            line = ''.join(self._buffer)
     164            self._buffer, self._buffer_size = [], 0
     165
    147166            if len(line) > self.MAX_LENGTH:
    148167                return self.lineLengthExceeded(line)
    149168            else:
    150169                self.lineReceived(line)
    151         if len(self._buffer) > self.MAX_LENGTH:
    152             return self.lineLengthExceeded(self._buffer)
    153170
     171        self._buffer.append(trailing)
     172        self._buffer_size += len(trailing)
     173        if self._buffer_size > self.MAX_LENGTH:
     174            return self.lineLengthExceeded(''.join(self._buffer))
     175
    154176    def lineReceived(self, line):
    155177        """Override this for when each line is received.
    156178        """
     
    197219
    198220    @cvar delimiter: The line-ending delimiter to use. By default this is
    199221                     '\\r\\n'.
     222    @type delimiter: C{str}
     223
    200224    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
    201225                      sent line is longer than this, the connection is dropped).
    202226                      Default is 16384.
     227    @type MAX_LENGTH: C{int}
     228
     229    @ivar _buffer: A list of data pieces received and buffered. Either
     230                   an incomplete line, or raw data when the protocol is
     231                   paused.
     232    @type _buffer: C{list} of C{str}
     233
     234    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     235    @type _buffer_size: C{int}
     236
     237    @ivar _was_paused: True if the protocol was paused the last time
     238                       L{dataReceived} was called. This is needed to
     239                       detect unpausing and trigger processing of
     240                       buffered data.
     241    @type _was_paused: C{bool}
    203242    """
    204243    line_mode = 1
    205     __buffer = ''
     244    _buffer = None
     245    _buffer_size = 0
     246    _was_paused = False
    206247    delimiter = '\r\n'
    207248    MAX_LENGTH = 16384
    208249
     250    def _appendToLineBuffer(self, data):  # store one received chunk in the piece list
     251        self._buffer.append(data)
     252        self._buffer_size += len(data)  # running byte count, compared against MAX_LENGTH without joining the pieces
     253
    209254    def clearLineBuffer(self):
    210255        """Clear buffered data."""
    211         self.__buffer = ""
     256        self._buffer, self._buffer_size = [], 0
    212257
    213258    def dataReceived(self, data):
    214259        """Protocol.dataReceived.
    215260        Translates bytes into lines, and calls lineReceived (or
    216261        rawDataReceived, depending on mode.)
    217262        """
    218         self.__buffer = self.__buffer+data
     263        if self._buffer is None:
     264            self._buffer = []
     265
     266        if self.paused:
     267            self._was_paused = True
     268            self._appendToLineBuffer(data)
     269            return
     270        elif self._was_paused:
     271            # This is the call of dataReceived that follows an
     272            # unpausing. The buffer has been accumulating data without
     273            # processing it, so we need to send it all back through the
     274            # pipes.
     275            self._was_paused = False
     276            self._buffer.append(data)
     277            buf = self._buffer
     278            self.clearLineBuffer()
     279            for chunk in buf:
     280                why = self.dataReceived(chunk)
     281                if why or self.transport and self.transport.disconnecting:
     282                    return why
     283            return
     284        self._was_paused = False
     285
    219286        while self.line_mode and not self.paused:
    220287            try:
    221                 line, self.__buffer = self.__buffer.split(self.delimiter, 1)
     288                line, data = data.split(self.delimiter, 1)
    222289            except ValueError:
    223                 if len(self.__buffer) > self.MAX_LENGTH:
    224                     line, self.__buffer = self.__buffer, ''
     290                self._appendToLineBuffer(data)
     291                if self._buffer_size > self.MAX_LENGTH:
     292                    line = ''.join(self._buffer)
     293                    self.clearLineBuffer()
    225294                    return self.lineLengthExceeded(line)
    226295                break
    227296            else:
    228                 linelength = len(line)
    229                 if linelength > self.MAX_LENGTH:
    230                     exceeded = line + self.__buffer
    231                     self.__buffer = ''
    232                     return self.lineLengthExceeded(exceeded)
     297                self._buffer.append(line)
     298                line = ''.join(self._buffer)
     299                self.clearLineBuffer()
     300                if len(line) > self.MAX_LENGTH:
     301                    return self.lineLengthExceeded(line)
    233302                why = self.lineReceived(line)
    234303                if why or self.transport and self.transport.disconnecting:
    235304                    return why
     305                elif self.paused:
     306                    self._was_paused = True
     307                    self._appendToLineBuffer(data)
    236308        else:
    237309            if not self.paused:
    238                 data=self.__buffer
    239                 self.__buffer=''
     310                self._buffer.append(data)
     311                data = ''.join(self._buffer)
     312                self.clearLineBuffer()
    240313                if data:
    241314                    return self.rawDataReceived(data)
    242315
     
    275348    def sendLine(self, line):
    276349        """Sends a line to the other end of the connection.
    277350        """
    278         return self.transport.write(line + self.delimiter)
     351        return self.transport.writeSequence((line, self.delimiter))
    279352
    280353    def lineLengthExceeded(self, line):
    281354        """Called when the maximum line length has been reached.
     
    300373    """
    301374    Generic class for length prefixed protocols.
    302375
    303     @ivar recvd: buffer holding received data when splitted.
    304     @type recvd: C{str}
    305 
    306376    @ivar structFormat: format used for struct packing/unpacking. Define it in
    307377        subclass.
    308378    @type structFormat: C{str}
     
    310380    @ivar prefixLength: length of the prefix, in bytes. Define it in subclass,
    311381        using C{struct.calcSize(structFormat)}
    312382    @type prefixLength: C{int}
     383
     384    @ivar _buffer: A list of data pieces received and buffered.
     385    @type _buffer: C{list} of C{str}
     386
     387    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     388    @type _buffer_size: C{int}
     389
     390    @ivar _packet_length: The total length of the packet currently being
     391                          buffered up, including the prefix. The length
     392                          will be None until a full prefix can be
     393                          decoded.
     394    @type _packet_length: C{int} or C{None}
     395
     396    @ivar recvd: A string representation of C{_buffer}. This is a
     397                 deprecated attribute, and will fire a
     398                 DeprecationWarning when accessed. Be very careful if
     399                 you write to this attribute, no sanity checking is
     400                 performed on the contents being injected into the
     401                 buffer.
     402    @type recvd: C{str}
    313403    """
    314404    MAX_LENGTH = 99999
    315     recvd = ""
     405    _buffer = None
     406    _buffer_size = 0
     407    _packet_length = None
    316408
     409    def _appendToBuffer(self, data):  # store one received chunk in the piece list
     410        self._buffer.append(data)
     411        self._buffer_size += len(data)  # running byte count, compared against prefixLength / _packet_length in dataReceived
     412
     413    def _clearBuffer(self):
     414        """Clear buffered data."""
     415        self._buffer, self._buffer_size = [], 0  # reset the pieces and their byte count together so they cannot drift apart
     416
    317417    def stringReceived(self, msg):
    318418        """
    319419        Override this.
    320420        """
    321421        raise NotImplementedError
    322422
    323     def dataReceived(self, recd):
     423    def dataReceived(self, data):
    324424        """
    325425        Convert int prefixed strings into calls to stringReceived.
    326426        """
    327         self.recvd = self.recvd + recd
    328         while len(self.recvd) >= self.prefixLength and not self.paused:
    329             length ,= struct.unpack(
    330                 self.structFormat, self.recvd[:self.prefixLength])
    331             if length > self.MAX_LENGTH:
    332                 self.transport.loseConnection()
    333                 return
    334             if len(self.recvd) < length + self.prefixLength:
    335                 break
    336             packet = self.recvd[self.prefixLength:length + self.prefixLength]
    337             self.recvd = self.recvd[length + self.prefixLength:]
    338             self.stringReceived(packet)
     427        if self._buffer is None:
     428            self._buffer = []
     429        self._appendToBuffer(data)
    339430
     431        while not self.paused:
     432            if self._packet_length is None:
     433                if self._buffer_size < self.prefixLength:
     434                    return
     435                data = ''.join(self._buffer)
     436                self._packet_length ,= struct.unpack(
     437                    self.structFormat, data[:self.prefixLength])
     438                if self._packet_length > self.MAX_LENGTH:
     439                    self._clearBuffer()
     440                    self.transport.loseConnection()
     441                    return
     442                self._packet_length += self.prefixLength
     443                self._buffer = [data]
     444
     445            if self._packet_length is not None:
     446                if self._buffer_size < self._packet_length:
     447                    return
     448                data = ''.join(self._buffer)
     449                packet = data[self.prefixLength:self._packet_length]
     450                self._clearBuffer()
     451                self._appendToBuffer(data[self._packet_length:])
     452                self._packet_length = None
     453                self.stringReceived(packet)
     454
    340455    def sendString(self, data):
    341456        """
    342457        Send a prefixed string to the other end of the connection.
     
    347462            raise StringTooLongError(
    348463                "Try to send %s bytes whereas maximum is %s" % (
    349464                len(data), 2 ** (8 * self.prefixLength)))
    350         self.transport.write(struct.pack(self.structFormat, len(data)) + data)
     465        self.transport.writeSequence(
     466            (struct.pack(self.structFormat, len(data)), data))
    351467
     468    def _getRecvd(self):
     469        warnings.warn("The recvd attribute of IntNStringReceiver is deprecated",
     470                      DeprecationWarning)
     471        if len(self._buffer) == 0:
     472            return ''
     473        elif len(self._buffer) > 1:
     474            self._buffer = ''.join(self._buffer)
     475        return self._buffer[0]
    352476
     477    def _setRecvd(self, data):  # deprecated setter: replace all buffered data with the given str
     478        warnings.warn("The recvd attribute of IntNStringReceiver is deprecated",
     479                      DeprecationWarning)
     480        self._buffer = [data]
     481        self._buffer_size = len(data)  # keep the cached size in step with the new contents; _packet_length is left untouched
     482
     483    recvd = property(_getRecvd, _setRecvd)
     484
    353485class Int32StringReceiver(IntNStringReceiver):
    354486    """
    355487    A receiver for int32-prefixed strings.
  • doc/core/benchmarks/receivers_fragmentation.py

     
     1#!/usr/bin/env python
     2
     3"""
     4Benchmark to test the performance of
     5basic.(LineOnlyReceiver|LineReceiver|IntNStringReceiver) when receiving
     6very fragmented packets.
     7
     8This benchmark was created to verify that the patch reducing the time
     9complexity of these protocols from O(n^2) to O(n) worked correctly, and
     10to make sure that the non-pathological cases didn't result in any
     11performance degradation.
     12
     13This benchmark makes each tested protocol receive N messages, each split
     14into X packets of M bytes, with X varying.
     15
     16Run with args: <N> <M> <min(X)> <max(X)> <step>.
     17
     18The output is: <X> <runtime(LineOnlyReceiver)> <runtime(LineReceiver)> <runtime(IntNStringReceiver)>
     19"""
     20
     21import time
     22import sys
     23import struct
     24
     25from twisted.protocols import basic
     26from twisted.internet import protocol
     27from twisted.test.test_protocols import StringIOWithoutClosing
     28
     29class NoopLineOnlyReceiver(basic.LineOnlyReceiver):  # LineOnlyReceiver whose line callback does nothing
     30    MAX_LENGTH = 100000  # overrides the 16384 default of the base class
     31    delimiter = '\n'
     32
     33    def lineReceived(self, _):  # discard the line
     34        pass
     35
     36class NoopLineReceiver(basic.LineReceiver):  # LineReceiver whose line callback does nothing
     37    MAX_LENGTH = 100000  # overrides the 16384 default of the base class
     38    delimiter = '\n'
     39
     40    def lineReceived(self, _):  # discard the line
     41        pass
     42
     43class NoopInt32Receiver(basic.Int32StringReceiver):  # Int32StringReceiver whose string callback does nothing
     44    MAX_LENGTH = 100000
     45
     46    def stringReceived(self, _):  # discard the message
     47        pass
     48
     49def run_lineonly_iteration(num_lines, pkts_per_line, packet_size):  # time num_lines lines, each fed as pkts_per_line fragments; returns seconds
     50    packet = 'a'*packet_size
     51    packet_with_delimiter = packet + '\n'
     52
     53    t = StringIOWithoutClosing()
     54    a = NoopLineOnlyReceiver()
     55    a.makeConnection(protocol.FileWrapper(t))  # t, wrapped in FileWrapper, serves as the transport
     56
     57    start = time.time()
     58    for _ in xrange(num_lines):
     59        for _ in xrange(pkts_per_line-1):  # all fragments but the last carry no delimiter
     60            a.dataReceived(packet)
     61        a.dataReceived(packet_with_delimiter)  # last fragment ends with '\n' and completes the line
     62    stop = time.time()
     63    return stop - start  # wall-clock time of the delivery loop only; setup is excluded
     64
     65def run_line_iteration(num_lines, pkts_per_line, packet_size):  # same measurement as run_lineonly_iteration, against NoopLineReceiver
     66    packet = 'a'*packet_size
     67    packet_with_delimiter = packet + '\n'
     68
     69    t = StringIOWithoutClosing()
     70    a = NoopLineReceiver()
     71    a.makeConnection(protocol.FileWrapper(t))  # t, wrapped in FileWrapper, serves as the transport
     72
     73    start = time.time()
     74    for _ in xrange(num_lines):
     75        for _ in xrange(pkts_per_line-1):  # all fragments but the last carry no delimiter
     76            a.dataReceived(packet)
     77        a.dataReceived(packet_with_delimiter)  # last fragment ends with '\n' and completes the line
     78    stop = time.time()
     79    return stop - start  # wall-clock time of the delivery loop only; setup is excluded
     80
     81def run_int32_iteration(num_lines, pkts_per_msg, packet_size):  # time num_lines length-prefixed messages of pkts_per_msg*packet_size bytes; returns seconds
     82    packet = 'a'*packet_size
     83    packet_with_prefix = struct.pack('!I', pkts_per_msg*packet_size) + packet  # prefix announces the whole message; only the first fragment is attached
     84
     85    t = StringIOWithoutClosing()
     86    a = NoopInt32Receiver()
     87    a.makeConnection(protocol.FileWrapper(t))  # t, wrapped in FileWrapper, serves as the transport
     88
     89    start = time.time()
     90    for _ in xrange(num_lines):
     91        for c in packet_with_prefix:  # prefix and first fragment are delivered one byte per dataReceived call
     92            a.dataReceived(c)
     93        for _ in xrange(pkts_per_msg-1):  # remaining fragments are delivered whole
     94            a.dataReceived(packet)
     95    stop = time.time()
     96    return stop - start  # wall-clock time of the delivery loop only; setup is excluded
     97
     98def run_over_range_of_num_pkts(num_lines, pkt_size, min, max, step):  # NOTE(review): min/max shadow the builtins inside this function
     99    for num_pkts in xrange(min, max, step):  # max is exclusive, as with any xrange stop value
     100        t = run_lineonly_iteration(num_lines, num_pkts, pkt_size)
     101        t2 = run_line_iteration(num_lines, num_pkts, pkt_size)
     102        t3 = run_int32_iteration(num_lines, num_pkts, pkt_size)
     103        print "%d %f %f %f" % (num_pkts, t, t2, t3)  # one row per fragment count: X, then the three runtimes in seconds
     104        sys.stdout.flush()  # flush after every row
     105
     106def main():  # command-line entry point
     107    run_over_range_of_num_pkts(*[int(x) for x in sys.argv[1:]])  # argv[1:] maps positionally to num_lines, pkt_size, min, max, step; each is parsed as int
     108
     109if __name__ == '__main__':
     110    main()