| | 517 | |
| | 518 | def test_getEncodedFilename_unicode(self): |
| | 519 | """ |
| | 520 | When Unicode filenames are received from IFTPShell.list it will |
| | 521 | be encoded to UTF-8. |
| | 522 | """ |
| | 523 | unicodeFilename = u'my resum\xe9' |
| | 524 | |
| | 525 | encodedFilename = self.serverProtocol._getEncodedFilename( |
| | 526 | unicodeFilename) |
| | 527 | |
| | 528 | self.assertEqual( |
| | 529 | unicodeFilename.encode('utf-8'), encodedFilename) |
| | 530 | |
| | 531 | |
| | 532 | def test_getEncodedFilename_non_unicode(self): |
| | 533 | """ |
| | 534 | When non-Unicode filenames are received from IFTPShell.list it will |
| | 535 | pass the data without changing it together with raising a warning. |
| | 536 | """ |
| | 537 | alreadyEncodedFilename = u'my resum\xe9'.encode('utf-8') |
| | 538 | |
| | 539 | result = self.assertWarns( |
| | 540 | DeprecationWarning, |
| | 541 | "Non-Unicode date received in %s from IFTPShell.list. " |
| | 542 | "Data transmitted without encoding it. " |
| | 543 | "Please send Unicode." % ( |
| | 544 | qual(self.serverProtocol.__class__)), |
| | 545 | ftp.__file__, |
| | 546 | self.serverProtocol._getEncodedFilename, alreadyEncodedFilename) |
| | 547 | |
| | 548 | self.assertIdentical(alreadyEncodedFilename, result) |
| | 549 | |
| | 550 | |
| | 710 | |
| | 711 | def test_LISTUnicode(self): |
| | 712 | """ |
| | 713 | LIST will receive Unicode filenames from IFTPShell.list, and will |
| | 714 | encode them using UTF-8. |
| | 715 | """ |
| | 716 | d = self._anonymousLogin() |
| | 717 | |
| | 718 | def patchedFTPShellList(me, segments): |
| | 719 | """ |
| | 720 | Mock method that patches the IFTPShell.list. |
| | 721 | """ |
| | 722 | return defer.succeed([( |
| | 723 | u'my resum\xe9', (0, 1, 0777, 0, 0, 'user', 'group'))]) |
| | 724 | |
| | 725 | def patchFTPShellList(result): |
| | 726 | """ |
| | 727 | Patch the IFTPShell.list, once we got an instance. |
| | 728 | """ |
| | 729 | self.patch(self.serverProtocol.shell, 'list', patchedFTPShellList) |
| | 730 | return result |
| | 731 | d.addCallback(patchFTPShellList) |
| | 732 | |
| | 733 | self._download('LIST something', chainDeferred=d) |
| | 734 | |
| | 735 | def checkDownload(download): |
| | 736 | self.assertEqual( |
| | 737 | 'drwxrwxrwx 0 user group ' |
| | 738 | '0 Jan 01 1970 my resum\xc3\xa9\r\n', |
| | 739 | download) |
| | 740 | return d.addCallback(checkDownload) |
| | 741 | |
| | 742 | |
| | 827 | def test_NLSTUnicode(self): |
| | 828 | """ |
| | 829 | NLST will receive Unicode filenames from IFTPShell.list, and will |
| | 830 | encode them using UTF-8. |
| | 831 | """ |
| | 832 | d = self._anonymousLogin() |
| | 833 | |
| | 834 | def patchedFTPShellList(me): |
| | 835 | """ |
| | 836 | Mock method that patches the IFTPShell.list. |
| | 837 | """ |
| | 838 | return defer.succeed([(u'my resum\xe9', None)]) |
| | 839 | |
| | 840 | def patchFTPShellList(result): |
| | 841 | """ |
| | 842 | Patch the IFTPShell.list, once we got an instance. |
| | 843 | """ |
| | 844 | self.patch(self.serverProtocol.shell, 'list', patchedFTPShellList) |
| | 845 | return result |
| | 846 | d.addCallback(patchFTPShellList) |
| | 847 | |
| | 848 | self._download('NLST something', chainDeferred=d) |
| | 849 | |
| | 850 | def checkDownload(download): |
| | 851 | self.assertEqual('my resum\xc3\xa9\r\n', download) |
| | 852 | |
| | 853 | return d.addCallback(checkDownload) |
| | 854 | |
| | 855 | |
| | 1083 | class DTPTests(unittest.TestCase): |
| | 1084 | """ |
| | 1085 | Tests for L{ftp.DTP}. |
| | 1086 | |
| | 1087 | The DTP instances in these tests are generated using |
| | 1088 | DTPFactory.buildProtocol() |
| | 1089 | """ |
| | 1090 | |
| | 1091 | def setUp(self): |
| | 1092 | """ |
| | 1093 | Create a fake protocol interpreter, a L{ftp.DTPFactory} instance, |
| | 1094 | and dummy transport to help with tests. |
| | 1095 | """ |
| | 1096 | self.reactor = task.Clock() |
| | 1097 | |
| | 1098 | class ProtocolInterpreter(object): |
| | 1099 | dtpInstance = None |
| | 1100 | |
| | 1101 | self.protocolInterpreter = ProtocolInterpreter() |
| | 1102 | self.factory = ftp.DTPFactory( |
| | 1103 | self.protocolInterpreter, None, self.reactor) |
| | 1104 | self.transport = proto_helpers.StringTransportWithDisconnection() |
| | 1105 | |
| | 1106 | |
| | 1107 | def test_sendLine_newline(self): |
| | 1108 | """ |
| | 1109 | When sending a line, the newline delimiter will be automatically |
| | 1110 | added. |
| | 1111 | """ |
| | 1112 | dtpInstance = self.factory.buildProtocol(None) |
| | 1113 | dtpInstance.makeConnection(self.transport) |
| | 1114 | lineContent = 'line content' |
| | 1115 | |
| | 1116 | dtpInstance.sendLine(lineContent) |
| | 1117 | |
| | 1118 | dataSent = self.transport.value() |
| | 1119 | self.assertEqual(lineContent + '\r\n', dataSent) |
| | 1120 | |
| | 1121 | |
| | 1122 | def test_sendLine_unicode(self): |
| | 1123 | """ |
| | 1124 | When sending an unicode line, it will be converted to str and |
| | 1125 | a warning is raised. |
| | 1126 | """ |
| | 1127 | from twisted.trial import _synctest |
| | 1128 | dtpInstance = self.factory.buildProtocol(None) |
| | 1129 | dtpInstance.makeConnection(self.transport) |
| | 1130 | lineContent = u'my resum\xe9' |
| | 1131 | |
| | 1132 | self.assertWarns( |
| | 1133 | DeprecationWarning, |
| | 1134 | "Unicode date received in %s. " |
| | 1135 | "Encoded to UTF-8. Please send bytes." % ( |
| | 1136 | qual(dtpInstance.__class__)), |
| | 1137 | _synctest.__file__, |
| | 1138 | dtpInstance.sendLine, lineContent) |
| | 1139 | |
| | 1140 | dataSent = self.transport.value() |
| | 1141 | self.assertTrue(isinstance(dataSent, str)) |
| | 1142 | self.assertEqual(lineContent.encode('utf-8') + '\r\n', dataSent) |
| | 1143 | |
| | 1144 | |