Ticket #2611: faster-string-receivers-2611.patch
File faster-string-receivers-2611.patch, 19.1 KB (added by , 11 years ago) |
---|
-
doc/core/benchmarks/receivers_fragmentation.py
=== added file 'doc/core/benchmarks/receivers_fragmentation.py'
1 #!/usr/bin/env python 2 3 """ 4 Benchmark to test the performance of 5 basic.(LineOnlyReceiver|LineReceiver|IntNStringReceiver) when receiving 6 very fragmented packets. 7 8 This benchmark was created to verify that the patch reducing the time 9 complexity of these protocols from O(n^2) to O(n) worked correctly, and 10 to make sure that the non-pathological cases didn't result in any 11 performance degradation. 12 13 This benchmark makes each tested protocol receive N messages, each split 14 into X packets of M bytes, with X varying. 15 16 Run with args: <N> <M> <min(X)> <max(X)> <step>. 17 18 The output is: <X> <runtime(LineOnlyReceiver)> <runtime(LineReceiver)> <runtime(IntNStringReceiver)> 19 """ 20 21 import time 22 import sys 23 import struct 24 25 from twisted.protocols import basic 26 from twisted.internet import protocol 27 from twisted.test.test_protocols import StringIOWithoutClosing 28 29 class NoopLineOnlyReceiver(basic.LineOnlyReceiver): 30 MAX_LENGTH = 100000 31 delimiter = '\n' 32 33 def lineReceived(self, _): 34 pass 35 36 class NoopLineReceiver(basic.LineReceiver): 37 MAX_LENGTH = 100000 38 delimiter = '\n' 39 40 def lineReceived(self, _): 41 pass 42 43 class NoopInt32Receiver(basic.Int32StringReceiver): 44 MAX_LENGTH = 100000 45 46 def stringReceived(self, _): 47 pass 48 49 def run_lineonly_iteration(num_lines, pkts_per_line, packet_size): 50 packet = 'a'*packet_size 51 packet_with_delimiter = packet + '\n' 52 53 t = StringIOWithoutClosing() 54 a = NoopLineOnlyReceiver() 55 a.makeConnection(protocol.FileWrapper(t)) 56 57 start = time.time() 58 for _ in xrange(num_lines): 59 for _ in xrange(pkts_per_line-1): 60 a.dataReceived(packet) 61 a.dataReceived(packet_with_delimiter) 62 stop = time.time() 63 return stop - start 64 65 def run_line_iteration(num_lines, pkts_per_line, packet_size): 66 packet = 'a'*packet_size 67 packet_with_delimiter = packet + '\n' 68 69 t = StringIOWithoutClosing() 70 a = NoopLineReceiver() 71 a.makeConnection(protocol.FileWrapper(t)) 72 73 start = time.time() 74 for _ in xrange(num_lines): 75 for _ in xrange(pkts_per_line-1): 76 a.dataReceived(packet) 77 a.dataReceived(packet_with_delimiter) 78 stop = time.time() 79 return stop - start 80 81 def run_int32_iteration(num_lines, pkts_per_msg, packet_size): 82 packet = 'a'*packet_size 83 packet_with_prefix = struct.pack('!I', pkts_per_msg*packet_size) + packet 84 85 t = StringIOWithoutClosing() 86 a = NoopInt32Receiver() 87 a.makeConnection(protocol.FileWrapper(t)) 88 89 start = time.time() 90 for _ in xrange(num_lines): 91 for c in packet_with_prefix: 92 a.dataReceived(c) 93 for _ in xrange(pkts_per_msg-1): 94 a.dataReceived(packet) 95 stop = time.time() 96 return stop - start 97 98 def run_over_range_of_num_pkts(num_lines, pkt_size, min, max, step): 99 for num_pkts in xrange(min, max, step): 100 t = run_lineonly_iteration(num_lines, num_pkts, pkt_size) 101 t2 = run_line_iteration(num_lines, num_pkts, pkt_size) 102 t3 = run_int32_iteration(num_lines, num_pkts, pkt_size) 103 print "%d %f %f %f" % (num_pkts, t, t2, t3) 104 sys.stdout.flush() 105 106 def main(): 107 run_over_range_of_num_pkts(*[int(x) for x in sys.argv[1:]]) 108 109 if __name__ == '__main__': 110 main() -
twisted/protocols/basic.py
=== modified file 'twisted/protocols/basic.py'
428 428 429 429 @cvar delimiter: The line-ending delimiter to use. By default this is 430 430 '\\r\\n'. 431 @type delimiter: C{str} 432 431 433 @cvar MAX_LENGTH: The maximum length of a line to allow (If a 432 434 sent line is longer than this, the connection is dropped). 433 435 Default is 16384. 436 @type MAX_LENGTH: C{int} 437 438 @ivar _buffer: A list of data pieces received and buffered, waiting 439 for a delimiter. 440 @type _buffer: C{list} of C{str} 441 442 @ivar _buffer_size: The amount of data stored in L{_buffer}. 443 @type _buffer_size: C{int} 434 444 """ 435 _buffer = '' 445 _buffer = None 446 _buffer_size = 0 436 447 delimiter = '\r\n' 437 448 MAX_LENGTH = 16384 438 449 … … 440 451 """ 441 452 Translates bytes into lines, and calls lineReceived. 442 453 """ 443 lines = (self._buffer+data).split(self.delimiter) 444 self._buffer = lines.pop(-1) 454 if self._buffer is None: 455 self._buffer = [] 456 lines = data.split(self.delimiter) 457 trailing = lines.pop() 458 445 459 for line in lines: 446 460 if self.transport.disconnecting: 447 461 # this is necessary because the transport may be told to lose … … 449 463 # important to disregard all the lines in that packet following 450 464 # the one that told it to close. 451 465 return 466 self._buffer.append(line) 467 line = ''.join(self._buffer) 468 self._buffer, self._buffer_size = [], 0 469 452 470 if len(line) > self.MAX_LENGTH: 453 471 return self.lineLengthExceeded(line) 454 472 else: 455 473 self.lineReceived(line) 456 if len(self._buffer) > self.MAX_LENGTH: 457 return self.lineLengthExceeded(self._buffer) 474 475 self._buffer.append(trailing) 476 self._buffer_size += len(trailing) 477 if self._buffer_size > self.MAX_LENGTH: 478 return self.lineLengthExceeded(''.join(self._buffer)) 458 479 459 480 460 481 def lineReceived(self, line): … … 519 540 520 541 @cvar delimiter: The line-ending delimiter to use. By default this is 521 542 '\\r\\n'. 543 @type delimiter: C{str} 544 522 545 @cvar MAX_LENGTH: The maximum length of a line to allow (If a 523 546 sent line is longer than this, the connection is dropped). 524 547 Default is 16384. 548 @type MAX_LENGTH: C{int} 549 550 @ivar _buffer: A list of data pieces received and buffered. Either 551 an incomplete line, or raw data when the protocol is 552 paused. 553 @type _buffer: C{list} of C{str} 554 555 @ivar _buffer_size: The amount of data stored in L{_buffer}. 556 @type _buffer_size: C{int} 557 558 @ivar _was_paused: True if the protocol was paused the last time 559 L{dataReceived} was called. This is needed to 560 detect unpausing and trigger processing of 561 buffered data. 562 @type _was_paused: C{bool} 525 563 """ 526 564 line_mode = 1 527 __buffer = '' 565 _buffer = None 566 _buffer_size = 0 567 _was_paused = False 528 568 delimiter = '\r\n' 529 569 MAX_LENGTH = 16384 530 570 571 def _appendToLineBuffer(self, data): 572 self._buffer.append(data) 573 self._buffer_size += len(data) 574 531 575 def clearLineBuffer(self): 532 576 """ 533 577 Clear buffered data. … … 535 579 @return: All of the cleared buffered data. 536 580 @rtype: C{str} 537 581 """ 538 b = self._ _buffer539 self._ _buffer = ""540 return b541 582 b = self._buffer 583 self._buffer = [] 584 self._buffer_size = 0 585 return b # XXX this probably isn't a string. 542 586 543 587 def dataReceived(self, data): 544 588 """ … … 546 590 Translates bytes into lines, and calls lineReceived (or 547 591 rawDataReceived, depending on mode.) 548 592 """ 549 self.__buffer = self.__buffer+data 593 if self._buffer is None: 594 self._buffer = [] 595 596 if self.paused: 597 self._was_paused = True 598 self._appendToLineBuffer(data) 599 return 600 elif self._was_paused: 601 # This is the call of dataReceived that follows an 602 # unpausing. The buffer has been accumulating data without 603 # processing it, so we need to send it all back through the 604 # pipes. 605 self._was_paused = False 606 self._buffer.append(data) 607 buf = self._buffer 608 self.clearLineBuffer() 609 for chunk in buf: 610 why = self.dataReceived(chunk) 611 if why or self.transport and self.transport.disconnecting: 612 return why 613 return 614 self._was_paused = False 615 550 616 while self.line_mode and not self.paused: 551 617 try: 552 line, self.__buffer = self.__buffer.split(self.delimiter, 1)618 line, data = data.split(self.delimiter, 1) 553 619 except ValueError: 554 if len(self.__buffer) > self.MAX_LENGTH: 555 line, self.__buffer = self.__buffer, '' 620 self._appendToLineBuffer(data) 621 if self._buffer_size > self.MAX_LENGTH: 622 line = ''.join(self._buffer) 623 self.clearLineBuffer() 556 624 return self.lineLengthExceeded(line) 557 625 break 558 626 else: 559 linelength = len(line)560 if linelength > self.MAX_LENGTH:561 exceeded = line + self.__buffer562 self.__buffer = ''563 return self.lineLengthExceeded( exceeded)627 self._buffer.append(line) 628 line = ''.join(self._buffer) 629 self.clearLineBuffer() 630 if len(line) > self.MAX_LENGTH: 631 return self.lineLengthExceeded(line) 564 632 why = self.lineReceived(line) 565 633 if why or self.transport and self.transport.disconnecting: 566 634 return why 635 elif self.paused: 636 self._was_paused = True 637 self._appendToLineBuffer(data) 567 638 else: 568 639 if not self.paused: 569 data=self.__buffer 570 self.__buffer='' 640 self._buffer.append(data) 641 data = ''.join(self._buffer) 642 self.clearLineBuffer() 571 643 if data: 572 644 return self.rawDataReceived(data) 573 645 … … 622 694 @param line: The line to send, not including the delimiter. 623 695 @type line: C{str} 624 696 """ 625 return self.transport.write (line + self.delimiter)697 return self.transport.writeSequence((line, self.delimiter)) 626 698 627 699 628 700 def lineLengthExceeded(self, line): … … 651 723 """ 652 724 Generic class for length prefixed protocols. 653 725 654 @ivar recvd: buffer holding received data when splitted.655 @type recvd: C{str}656 657 726 @ivar structFormat: format used for struct packing/unpacking. Define it in 658 727 subclass. 659 728 @type structFormat: C{str} … … 661 730 @ivar prefixLength: length of the prefix, in bytes. Define it in subclass, 662 731 using C{struct.calcsize(structFormat)} 663 732 @type prefixLength: C{int} 733 734 @ivar _buffer: A list of data pieces received and buffered. 735 @type _buffer: C{list} of C{str} 736 737 @ivar _buffer_size: The amount of data stored in L{_buffer}. 738 @type _buffer_size: C{int} 739 740 @ivar _packet_length: The total length of the packet currently being 741 buffered up, including the prefix. The length 742 will be None until a full prefix can be 743 decoded. 744 @type _packet_length: C{int} or C{None} 745 746 @ivar recvd: A string representation of C{_buffer}. This is a 747 deprecated attribute, and will fire a 748 DeprecationWarning when accessed. Be very careful if 749 you write to this attribute, no sanity checking is 750 performed on the contents being injected into the 751 buffer. 752 @type recvd: C{str} 664 753 """ 665 754 MAX_LENGTH = 99999 666 recvd = "" 755 756 _buffer = None 757 _buffer_size = 0 758 _packet_length = None 759 760 def _appendToBuffer(self, data): 761 self._buffer.append(data) 762 self._buffer_size += len(data) 763 764 765 def _clearBuffer(self): 766 """Clear buffered data.""" 767 self._buffer, self._buffer_size = [], 0 768 667 769 668 770 def stringReceived(self, string): 669 771 """ … … 688 790 self.transport.loseConnection() 689 791 690 792 691 def dataReceived(self, recd):793 def dataReceived(self, data): 692 794 """ 693 795 Convert int prefixed strings into calls to stringReceived. 694 796 """ 695 self.recvd = self.recvd + recd 696 while len(self.recvd) >= self.prefixLength and not self.paused: 697 length ,= struct.unpack( 698 self.structFormat, self.recvd[:self.prefixLength]) 699 if length > self.MAX_LENGTH: 700 self.lengthLimitExceeded(length) 701 return 702 if len(self.recvd) < length + self.prefixLength: 703 break 704 packet = self.recvd[self.prefixLength:length + self.prefixLength] 705 self.recvd = self.recvd[length + self.prefixLength:] 706 self.stringReceived(packet) 797 if self._buffer is None: 798 self._buffer = [] 799 self._appendToBuffer(data) 800 801 while not self.paused: 802 if self._packet_length is None: 803 if self._buffer_size < self.prefixLength: 804 return 805 data = ''.join(self._buffer) 806 self._packet_length ,= struct.unpack( 807 self.structFormat, data[:self.prefixLength]) 808 if self._packet_length > self.MAX_LENGTH: 809 self._clearBuffer() 810 return self.lengthLimitExceeded(self._packet_length) 811 self._packet_length += self.prefixLength 812 self._buffer = [data] 813 814 if self._packet_length is not None: 815 if self._buffer_size < self._packet_length: 816 return 817 data = ''.join(self._buffer) 818 packet = data[self.prefixLength:self._packet_length] 819 self._clearBuffer() 820 self._appendToBuffer(data[self._packet_length:]) 821 self._packet_length = None 822 self.stringReceived(packet) 823 707 824 708 825 709 826 def sendString(self, string): … … 717 834 if len(string) >= 2 ** (8 * self.prefixLength): 718 835 raise StringTooLongError( 719 836 "Try to send %s bytes whereas maximum is %s" % ( 720 len(string), 2 ** (8 * self.prefixLength))) 721 self.transport.write( 722 struct.pack(self.structFormat, len(string)) + string) 837 len(data), 2 ** (8 * self.prefixLength))) 838 self.transport.writeSequence( 839 (struct.pack(self.structFormat, len(data)), data)) 840 841 def _getRecvd(self): 842 warnings.warn( 843 "IntNStringReceiver.recvd deprecated since Twisted 8.3", 844 DeprecationWarning, 2) 845 if not self._buffer: # None or empty 846 return '' 847 elif len(self._buffer) > 1: 848 self._buffer = ''.join(self._buffer) 849 return self._buffer[0] 850 851 def _setRecvd(self, data): 852 warnings.warn("The recvd attribute of IntNStringReceiver is deprecated", 853 DeprecationWarning) 854 self._buffer = [data] 855 self._buffer_size = len(data) 856 857 recvd = property(_getRecvd, _setRecvd) 723 858 724 859 725 860 -
twisted/test/test_protocols.py
=== modified file 'twisted/test/test_protocols.py'
195 195 'len 20', 'foo 123', '0123456789\n012345678', 196 196 'len 0', 'foo 5', '', '67890', 'len 1', 'a'] 197 197 198 def _iotest(self, delimiter): 199 data = delimiter.join(self.output) + delimiter 200 for packetSize in range(1, 11): 201 transport = proto_helpers.StringTransport() 202 protocol = LineTester() 203 protocol.delimiter = delimiter 204 protocol.makeConnection(transport) 205 for i in range(len(data) / packetSize + 1): 206 bytes = self.buffer[i * packetSize:(i + 1) * packetSize] 207 protocol.dataReceived(bytes) 208 self.assertEqual(protocol.received, self.output) 209 210 198 211 def testBuffer(self): 199 212 """ 200 213 Test buffering for different packet size, checking received matches … … 210 223 self.assertEqual(self.output, a.received) 211 224 212 225 226 def test_multibyteDelimiter(self): 227 """ 228 L{LineReceiver.delimiter} may be multiple bytes. 229 """ 230 self._iotest('\1\2\3') 231 232 213 233 pause_buf = 'twiddle1\ntwiddle2\npause\ntwiddle3\n' 214 234 215 235 pause_output1 = ['twiddle1', 'twiddle2', 'pause'] … … 750 770 self.assertEqual(r.received, []) 751 771 752 772 773 def _verifyRecvdDeprecation(self, warnings): 774 self.assertEqual(warnings[0]['category'], DeprecationWarning) 775 self.assertEqual( 776 warnings[0]['message'], 777 "IntNStringReceiver.recvd deprecated since Twisted 8.3") 778 self.assertEqual(len(warnings), 1) 779 780 781 def test_recvdInStringReceived(self): 782 """ 783 During a call to C{stringReceived}, the protocol's C{recvd} attribute 784 is a C{str} containing any bytes already received for the next string. 785 """ 786 buffered = [] 787 protocol = self.getProtocol() 788 def invasiveStringReceived(string): 789 buffered.append(protocol.recvd) 790 protocol.stringReceived = invasiveStringReceived 791 protocol.dataReceived( 792 struct.pack(protocol.structFormat, 1) + 'x' + 'abc') 793 self.assertEqual(buffered, ['abc']) 794 warnings = self.flushWarnings( 795 offendingFunctions=[invasiveStringReceived]) 796 self._verifyRecvdDeprecation(warnings) 797 798 799 def test_recvdBeforeDataReceived(self): 800 """ 801 Before any calls to C{dataReceived}, the protocol's C{recvd} attribute 802 is C{""}. 803 """ 804 protocol = self.getProtocol() 805 self.assertEqual(protocol.recvd, "") 806 warnings = self.flushWarnings( 807 offendingFunctions=[self.test_recvdBeforeDataReceived]) 808 self._verifyRecvdDeprecation(warnings) 809 810 811 def test_recvdSetInStringReceived(self): 812 """ 813 If C{recvd} is changed during a call to the protocol's 814 C{stringReceived}, the previous value is forgotten and the new value is 815 parsed instead. 816 """ 817 received = [] 818 protocol = self.getProtocol() 819 def invasiveStringReceived(string): 820 received.append(string) 821 protocol.recvd = nextValues.pop(0) + 'garbage' 822 protocol.stringReceived = invasiveStringReceived 823 nextValues = [ 824 struct.pack(protocol.structFormat, 1) + '1', 825 struct.pack(protocol.structFormat, 3) + 'abc', 826 struct.pack(protocol.structFormat, 5) + 'hello', 827 ''] 828 protocol.dataReceived(nextValues.pop(0) + 'garbage') 829 self.assertEqual(received, ['1', 'abc', 'hello']) 830 warnings = self.flushWarnings() 831 self.assertEqual(warnings[0]['category'], DeprecationWarning) 832 self.assertEqual( 833 warnings[0]['message'], 834 "Setting IntNStringReceiver.recvd deprecated since Twisted 8.3") 835 753 836 754 837 class TestInt32(TestMixin, basic.Int32StringReceiver): 755 838 """