Ticket #2611: faster-string-receivers-2611.2.patch

File faster-string-receivers-2611.2.patch, 19.0 KB (added by indigo, 10 years ago)

fixed a few errors introduced by merge conflicts

  • doc/core/benchmarks/receivers_fragmentation.py

    === added file 'doc/core/benchmarks/receivers_fragmentation.py'
     
     1#!/usr/bin/env python
     2
     3"""
     4Benchmark to test the performance of
     5basic.(LineOnlyReceiver|LineReceiver|IntNStringReceiver) when receiving
     6very fragmented packets.
     7
     8This benchmark was created to verify that the patch reducing the time
     9complexity of these protocols from O(n^2) to O(n) worked correctly, and
     10to make sure that the non-pathological cases didn't result in any
     11performance degradation.
     12
     13This benchmark makes each tested protocol receive N messages, each split
     14into X packets of M bytes, with X varying.
     15
     16Run with args: <N> <M> <min(X)> <max(X)> <step>.
     17
     18The output is: <X> <runtime(LineOnlyReceiver)> <runtime(LineReceiver)> <runtime(IntNStringReceiver)>
     19"""
     20
     21import time
     22import sys
     23import struct
     24
     25from twisted.protocols import basic
     26from twisted.internet import protocol
     27from twisted.test.test_protocols import StringIOWithoutClosing
     28
     29class NoopLineOnlyReceiver(basic.LineOnlyReceiver):
     30    MAX_LENGTH = 100000
     31    delimiter = '\n'
     32
     33    def lineReceived(self, _):
     34        pass
     35
     36class NoopLineReceiver(basic.LineReceiver):
     37    MAX_LENGTH = 100000
     38    delimiter = '\n'
     39
     40    def lineReceived(self, _):
     41        pass
     42
     43class NoopInt32Receiver(basic.Int32StringReceiver):
     44    MAX_LENGTH = 100000
     45
     46    def stringReceived(self, _):
     47        pass
     48
     49def run_lineonly_iteration(num_lines, pkts_per_line, packet_size):
     50    packet = 'a'*packet_size
     51    packet_with_delimiter = packet + '\n'
     52
     53    t = StringIOWithoutClosing()
     54    a = NoopLineOnlyReceiver()
     55    a.makeConnection(protocol.FileWrapper(t))
     56
     57    start = time.time()
     58    for _ in xrange(num_lines):
     59        for _ in xrange(pkts_per_line-1):
     60            a.dataReceived(packet)
     61        a.dataReceived(packet_with_delimiter)
     62    stop = time.time()
     63    return stop - start
     64
     65def run_line_iteration(num_lines, pkts_per_line, packet_size):
     66    packet = 'a'*packet_size
     67    packet_with_delimiter = packet + '\n'
     68
     69    t = StringIOWithoutClosing()
     70    a = NoopLineReceiver()
     71    a.makeConnection(protocol.FileWrapper(t))
     72
     73    start = time.time()
     74    for _ in xrange(num_lines):
     75        for _ in xrange(pkts_per_line-1):
     76            a.dataReceived(packet)
     77        a.dataReceived(packet_with_delimiter)
     78    stop = time.time()
     79    return stop - start
     80
     81def run_int32_iteration(num_lines, pkts_per_msg, packet_size):
     82    packet = 'a'*packet_size
     83    packet_with_prefix = struct.pack('!I', pkts_per_msg*packet_size) + packet
     84
     85    t = StringIOWithoutClosing()
     86    a = NoopInt32Receiver()
     87    a.makeConnection(protocol.FileWrapper(t))
     88
     89    start = time.time()
     90    for _ in xrange(num_lines):
     91        for c in packet_with_prefix:
     92            a.dataReceived(c)
     93        for _ in xrange(pkts_per_msg-1):
     94            a.dataReceived(packet)
     95    stop = time.time()
     96    return stop - start
     97
     98def run_over_range_of_num_pkts(num_lines, pkt_size, min, max, step):
     99    for num_pkts in xrange(min, max, step):
     100        t = run_lineonly_iteration(num_lines, num_pkts, pkt_size)
     101        t2 = run_line_iteration(num_lines, num_pkts, pkt_size)
     102        t3 = run_int32_iteration(num_lines, num_pkts, pkt_size)
     103        print "%d %f %f %f" % (num_pkts, t, t2, t3)
     104        sys.stdout.flush()
     105
     106def main():
     107    run_over_range_of_num_pkts(*[int(x) for x in sys.argv[1:]])
     108
     109if __name__ == '__main__':
     110    main()
  • twisted/protocols/basic.py

    === modified file 'twisted/protocols/basic.py'
     
    428428
    429429    @cvar delimiter: The line-ending delimiter to use. By default this is
    430430                     '\\r\\n'.
     431    @type delimiter: C{str}
     432
    431433    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
    432434                      sent line is longer than this, the connection is dropped).
    433435                      Default is 16384.
     436    @type MAX_LENGTH: C{int}
     437
     438    @ivar _buffer: A list of data pieces received and buffered, waiting
     439                   for a delimiter.
     440    @type _buffer: C{list} of C{str}
     441
     442    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     443    @type _buffer_size: C{int}
    434444    """
    435     _buffer = ''
     445    _buffer = None
     446    _buffer_size = 0
    436447    delimiter = '\r\n'
    437448    MAX_LENGTH = 16384
    438449
     
    440451        """
    441452        Translates bytes into lines, and calls lineReceived.
    442453        """
    443         lines  = (self._buffer+data).split(self.delimiter)
    444         self._buffer = lines.pop(-1)
     454        if self._buffer is None:
     455            self._buffer = []
     456        lines = data.split(self.delimiter)
     457        trailing = lines.pop()
     458
    445459        for line in lines:
    446460            if self.transport.disconnecting:
    447461                # this is necessary because the transport may be told to lose
     
    449463                # important to disregard all the lines in that packet following
    450464                # the one that told it to close.
    451465                return
     466            self._buffer.append(line)
     467            line = ''.join(self._buffer)
     468            self._buffer, self._buffer_size = [], 0
     469
    452470            if len(line) > self.MAX_LENGTH:
    453471                return self.lineLengthExceeded(line)
    454472            else:
    455473                self.lineReceived(line)
    456         if len(self._buffer) > self.MAX_LENGTH:
    457             return self.lineLengthExceeded(self._buffer)
     474
     475        self._buffer.append(trailing)
     476        self._buffer_size += len(trailing)
     477        if self._buffer_size > self.MAX_LENGTH:
     478            return self.lineLengthExceeded(''.join(self._buffer))
    458479
    459480
    460481    def lineReceived(self, line):
     
    519540
    520541    @cvar delimiter: The line-ending delimiter to use. By default this is
    521542                     '\\r\\n'.
     543    @type delimiter: C{str}
     544
    522545    @cvar MAX_LENGTH: The maximum length of a line to allow (If a
    523546                      sent line is longer than this, the connection is dropped).
    524547                      Default is 16384.
     548    @type MAX_LENGTH: C{int}
     549
     550    @ivar _buffer: A list of data pieces received and buffered. Either
     551                   an incomplete line, or raw data when the protocol is
     552                   paused.
     553    @type _buffer: C{list} of C{str}
     554
     555    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     556    @type _buffer_size: C{int}
     557
     558    @ivar _was_paused: True if the protocol was paused the last time
     559                       L{dataReceived} was called. This is needed to
     560                       detect unpausing and trigger processing of
     561                       buffered data.
     562    @type _was_paused: C{bool}
    525563    """
    526564    line_mode = 1
    527     __buffer = ''
     565    _buffer = None
     566    _buffer_size = 0
     567    _was_paused = False
    528568    delimiter = '\r\n'
    529569    MAX_LENGTH = 16384
    530570
     571    def _appendToLineBuffer(self, data):
     572        self._buffer.append(data)
     573        self._buffer_size += len(data)
     574
    531575    def clearLineBuffer(self):
    532576        """
    533577        Clear buffered data.
     
    535579        @return: All of the cleared buffered data.
    536580        @rtype: C{str}
    537581        """
    538         b = self.__buffer
    539         self.__buffer = ""
    540         return b
    541 
     582        b = self._buffer
     583        self._buffer = []
     584        self._buffer_size = 0
     585        return b        # XXX this probably isn't a string.
    542586
    543587    def dataReceived(self, data):
    544588        """
     
    546590        Translates bytes into lines, and calls lineReceived (or
    547591        rawDataReceived, depending on mode.)
    548592        """
    549         self.__buffer = self.__buffer+data
     593        if self._buffer is None:
     594            self._buffer = []
     595
     596        if self.paused:
     597            self._was_paused = True
     598            self._appendToLineBuffer(data)
     599            return
     600        elif self._was_paused:
     601            # This is the call of dataReceived that follows an
     602            # unpausing. The buffer has been accumulating data without
     603            # processing it, so we need to send it all back through the
     604            # pipes.
     605            self._was_paused = False
     606            self._buffer.append(data)
     607            buf = self._buffer
     608            self.clearLineBuffer()
     609            for chunk in buf:
     610                why = self.dataReceived(chunk)
     611                if why or self.transport and self.transport.disconnecting:
     612                    return why
     613            return
     614        self._was_paused = False
     615
    550616        while self.line_mode and not self.paused:
    551617            try:
    552                 line, self.__buffer = self.__buffer.split(self.delimiter, 1)
     618                line, data = data.split(self.delimiter, 1)
    553619            except ValueError:
    554                 if len(self.__buffer) > self.MAX_LENGTH:
    555                     line, self.__buffer = self.__buffer, ''
     620                self._appendToLineBuffer(data)
     621                if self._buffer_size > self.MAX_LENGTH:
     622                    line = ''.join(self._buffer)
     623                    self.clearLineBuffer()
    556624                    return self.lineLengthExceeded(line)
    557625                break
    558626            else:
    559                 linelength = len(line)
    560                 if linelength > self.MAX_LENGTH:
    561                     exceeded = line + self.__buffer
    562                     self.__buffer = ''
    563                     return self.lineLengthExceeded(exceeded)
     627                self._buffer.append(line)
     628                line = ''.join(self._buffer)
     629                self.clearLineBuffer()
     630                if len(line) > self.MAX_LENGTH:
     631                    return self.lineLengthExceeded(line)
    564632                why = self.lineReceived(line)
    565633                if why or self.transport and self.transport.disconnecting:
    566634                    return why
     635                elif self.paused:
     636                    self._was_paused = True
     637                    self._appendToLineBuffer(data)
    567638        else:
    568639            if not self.paused:
    569                 data=self.__buffer
    570                 self.__buffer=''
     640                self._buffer.append(data)
     641                data = ''.join(self._buffer)
     642                self.clearLineBuffer()
    571643                if data:
    572644                    return self.rawDataReceived(data)
    573645
     
    622694        @param line: The line to send, not including the delimiter.
    623695        @type line: C{str}
    624696        """
    625         return self.transport.write(line + self.delimiter)
     697        return self.transport.writeSequence((line, self.delimiter))
    626698
    627699
    628700    def lineLengthExceeded(self, line):
     
    651723    """
    652724    Generic class for length prefixed protocols.
    653725
    654     @ivar recvd: buffer holding received data when splitted.
    655     @type recvd: C{str}
    656 
    657726    @ivar structFormat: format used for struct packing/unpacking. Define it in
    658727        subclass.
    659728    @type structFormat: C{str}
     
    661730    @ivar prefixLength: length of the prefix, in bytes. Define it in subclass,
    662731        using C{struct.calcsize(structFormat)}
    663732    @type prefixLength: C{int}
     733
     734    @ivar _buffer: A list of data pieces received and buffered.
     735    @type _buffer: C{list} of C{str}
     736
     737    @ivar _buffer_size: The amount of data stored in L{_buffer}.
     738    @type _buffer_size: C{int}
     739
     740    @ivar _packet_length: The total length of the packet currently being
     741                          buffered up, including the prefix. The length
     742                          will be None until a full prefix can be
     743                          decoded.
     744    @type _packet_length: C{int} or C{None}
     745
     746    @ivar recvd: A string representation of C{_buffer}. This is a
     747                 deprecated attribute, and will fire a
     748                 DeprecationWarning when accessed. Be very careful if
     749                 you write to this attribute, no sanity checking is
     750                 performed on the contents being injected into the
     751                 buffer.
     752    @type recvd: C{str}
    664753    """
    665754    MAX_LENGTH = 99999
    666     recvd = ""
     755
     756    _buffer = None
     757    _buffer_size = 0
     758    _packet_length = None
     759
     760    def _appendToBuffer(self, data):
     761        self._buffer.append(data)
     762        self._buffer_size += len(data)
     763
     764
     765    def _clearBuffer(self):
     766        """Clear buffered data."""
     767        self._buffer, self._buffer_size = [], 0
     768
    667769
    668770    def stringReceived(self, string):
    669771        """
     
    688790        self.transport.loseConnection()
    689791
    690792
    691     def dataReceived(self, recd):
     793    def dataReceived(self, data):
    692794        """
    693795        Convert int prefixed strings into calls to stringReceived.
    694796        """
    695         self.recvd = self.recvd + recd
    696         while len(self.recvd) >= self.prefixLength and not self.paused:
    697             length ,= struct.unpack(
    698                 self.structFormat, self.recvd[:self.prefixLength])
    699             if length > self.MAX_LENGTH:
    700                 self.lengthLimitExceeded(length)
    701                 return
    702             if len(self.recvd) < length + self.prefixLength:
    703                 break
    704             packet = self.recvd[self.prefixLength:length + self.prefixLength]
    705             self.recvd = self.recvd[length + self.prefixLength:]
    706             self.stringReceived(packet)
     797        if self._buffer is None:
     798            self._buffer = []
     799        self._appendToBuffer(data)
     800
     801        while not self.paused:
     802            if self._packet_length is None:
     803                if self._buffer_size < self.prefixLength:
     804                    return
     805                data = ''.join(self._buffer)
     806                self._packet_length ,= struct.unpack(
     807                    self.structFormat, data[:self.prefixLength])
     808                if self._packet_length > self.MAX_LENGTH:
     809                    self._clearBuffer()
     810                    return self.lengthLimitExceeded(self._packet_length)
     811                self._packet_length += self.prefixLength
     812                self._buffer = [data]
     813
     814            if self._packet_length is not None:
     815                if self._buffer_size < self._packet_length:
     816                    return
     817                data = ''.join(self._buffer)
     818                packet = data[self.prefixLength:self._packet_length]
     819                self._clearBuffer()
     820                self._appendToBuffer(data[self._packet_length:])
     821                self._packet_length = None
     822                self.stringReceived(packet)
     823
    707824
    708825
    709826    def sendString(self, string):
     
    718835            raise StringTooLongError(
    719836                "Try to send %s bytes whereas maximum is %s" % (
    720837                len(string), 2 ** (8 * self.prefixLength)))
    721         self.transport.write(
    722             struct.pack(self.structFormat, len(string)) + string)
     838        self.transport.writeSequence(
     839            (struct.pack(self.structFormat, len(string)), string))
     840
     841    def _getRecvd(self):
     842        warnings.warn(
     843            "IntNStringReceiver.recvd deprecated since Twisted 8.3",
     844            DeprecationWarning, 2)
     845        if not self._buffer: # None or empty
     846            return ''
     847        elif len(self._buffer) > 1:
     848            self._buffer = ''.join(self._buffer)
     849        return self._buffer[0]
     850
     851    def _setRecvd(self, data):
     852        warnings.warn("The recvd attribute of IntNStringReceiver is deprecated",
     853                      DeprecationWarning)
     854        self._buffer = [data]
     855        self._buffer_size = len(data)
     856
     857    recvd = property(_getRecvd, _setRecvd)
    723858
    724859
    725860
  • twisted/test/test_protocols.py

    === modified file 'twisted/test/test_protocols.py'
     
    195195              'len 20', 'foo 123', '0123456789\n012345678',
    196196              'len 0', 'foo 5', '', '67890', 'len 1', 'a']
    197197
     198    def _iotest(self, delimiter):
     199        data = delimiter.join(self.output) + delimiter
     200        for packetSize in range(1, 11):
     201            transport = proto_helpers.StringTransport()
     202            protocol = LineTester()
     203            protocol.delimiter = delimiter
     204            protocol.makeConnection(transport)
     205            for i in range(len(data) / packetSize + 1):
     206                bytes = self.buffer[i * packetSize:(i + 1) * packetSize]
     207                protocol.dataReceived(bytes)
     208            self.assertEqual(protocol.received, self.output)
     209
     210
    198211    def testBuffer(self):
    199212        """
    200213        Test buffering for different packet size, checking received matches
     
    210223            self.assertEqual(self.output, a.received)
    211224
    212225
     226    def test_multibyteDelimiter(self):
     227        """
     228        L{LineReceiver.delimiter} may be multiple bytes.
     229        """
     230        self._iotest('\1\2\3')
     231
     232
    213233    pause_buf = 'twiddle1\ntwiddle2\npause\ntwiddle3\n'
    214234
    215235    pause_output1 = ['twiddle1', 'twiddle2', 'pause']
     
    750770        self.assertEqual(r.received, [])
    751771
    752772
     773    def _verifyRecvdDeprecation(self, warnings):
     774        self.assertEqual(warnings[0]['category'], DeprecationWarning)
     775        self.assertEqual(
     776            warnings[0]['message'],
     777            "IntNStringReceiver.recvd deprecated since Twisted 8.3")
     778        self.assertEqual(len(warnings), 1)
     779
     780
     781    def test_recvdInStringReceived(self):
     782        """
     783        During a call to C{stringReceived}, the protocol's C{recvd} attribute
     784        is a C{str} containing any bytes already received for the next string.
     785        """
     786        buffered = []
     787        protocol = self.getProtocol()
     788        def invasiveStringReceived(string):
     789            buffered.append(protocol.recvd)
     790        protocol.stringReceived = invasiveStringReceived
     791        protocol.dataReceived(
     792            struct.pack(protocol.structFormat, 1) + 'x' + 'abc')
     793        self.assertEqual(buffered, ['abc'])
     794        warnings = self.flushWarnings(
     795            offendingFunctions=[invasiveStringReceived])
     796        self._verifyRecvdDeprecation(warnings)
     797
     798
     799    def test_recvdBeforeDataReceived(self):
     800        """
     801        Before any calls to C{dataReceived}, the protocol's C{recvd} attribute
     802        is C{""}.
     803        """
     804        protocol = self.getProtocol()
     805        self.assertEqual(protocol.recvd, "")
     806        warnings = self.flushWarnings(
     807            offendingFunctions=[self.test_recvdBeforeDataReceived])
     808        self._verifyRecvdDeprecation(warnings)
     809
     810
     811    def test_recvdSetInStringReceived(self):
     812        """
     813        If C{recvd} is changed during a call to the protocol's
     814        C{stringReceived}, the previous value is forgotten and the new value is
     815        parsed instead.
     816        """
     817        received = []
     818        protocol = self.getProtocol()
     819        def invasiveStringReceived(string):
     820            received.append(string)
     821            protocol.recvd = nextValues.pop(0) + 'garbage'
     822        protocol.stringReceived = invasiveStringReceived
     823        nextValues = [
     824            struct.pack(protocol.structFormat, 1) + '1',
     825            struct.pack(protocol.structFormat, 3) + 'abc',
     826            struct.pack(protocol.structFormat, 5) + 'hello',
     827            '']
     828        protocol.dataReceived(nextValues.pop(0) + 'garbage')
     829        self.assertEqual(received, ['1', 'abc', 'hello'])
     830        warnings = self.flushWarnings()
     831        self.assertEqual(warnings[0]['category'], DeprecationWarning)
     832        self.assertEqual(
     833            warnings[0]['message'],
     834            "Setting IntNStringReceiver.recvd deprecated since Twisted 8.3")
     835
    753836
    754837class TestInt32(TestMixin, basic.Int32StringReceiver):
    755838    """