Ticket #4181: 4181-nlst-globbing-2.diff

File 4181-nlst-globbing-2.diff, 6.3 KB (added by adiroiban, 3 years ago)
  • twisted/test/test_ftp.py

     
    825825                ["425 Can't open data connection."])
    826826        return d.addCallback(gotPortNum)
    827827
     828    def test_NLSTGlobbing(self):
     829        """
     830        When Unix shell globbing is used with NLST only files matching
     831        the pattern will be returned.
     832        """
     833        self.dirPath.child('test.txt').touch()
     834        self.dirPath.child('ceva.txt').touch()
     835        self.dirPath.child('no.match').touch()
     836        d = self._anonymousLogin()
    828837
     838        self._download('NLST *.txt', chainDeferred=d)
    829839
     840        def checkDownload(download):
     841            filenames = download[:-2].split('\r\n')
     842            filenames.sort()
     843            self.assertEqual(['ceva.txt', 'test.txt'], filenames)
     844
     845        return d.addCallback(checkDownload)
     846
     847
    830848class DTPFactoryTests(unittest.TestCase):
    831849    """
    832850    Tests for L{ftp.DTPFactory}.
     
    23302348            self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp)
    23312349
    23322350
     2351
     2352class IsGlobbingExpressionTests(unittest.TestCase):
     2353    """
     2354    Tests for _isGlobbingExpression utility function.
     2355    """
     2356
     2357    def test_isGlobbingExpressionEmptySegments(self):
     2358        """
     2359        _isGlobbingExpression will return False for None, or empty
     2360        segments.
     2361        """
     2362        self.assertFalse(ftp._isGlobbingExpression())
     2363        self.assertFalse(ftp._isGlobbingExpression([]))
     2364        self.assertFalse(ftp._isGlobbingExpression(None))
     2365
     2366
     2367    def test_isGlobbingExpressionNoGlob(self):
     2368        """
     2369        _isGlobbingExpression will return False for plain segments.
     2370
     2371        Also, it only checks the last segment part (filename) and will not
     2372        check the path name.
     2373        """
     2374        self.assertFalse(ftp._isGlobbingExpression(['ignore', 'expr']))
     2375        self.assertFalse(ftp._isGlobbingExpression(['*.txt', 'expr']))
     2376
     2377
     2378    def test_isGlobbingExpressionGlob(self):
     2379        """
     2380        _isGlobbingExpression will return True for segments which contains
     2381        globbing characters in the last segment part (filename).
     2382        """
     2383        self.assertTrue(ftp._isGlobbingExpression(['ignore', '*.txt']))
     2384        self.assertTrue(ftp._isGlobbingExpression(['ignore', '[a-b].txt']))
     2385        self.assertTrue(ftp._isGlobbingExpression(['ignore', 'fil?.txt']))
     2386
     2387
     2388
    23332389class BaseFTPRealmTests(unittest.TestCase):
    23342390    """
    23352391    Tests for L{ftp.BaseFTPRealm}, a base class to help define L{IFTPShell}
  • twisted/protocols/ftp.py

     
    224224        return defer.fail()
    225225
    226226
     227def _isGlobbingExpression(segments=None):
     228    """
     229    Helper for checking if a FTPShell `segments` contains a wildcard Unix
     230    expression.
    227231
     232    Only filename globbing is supported.
     233    This means that wildcards can only be presents in the last element of
     234    `segments`.
     235
     236    @type  segments: C{list}
     237    @param segments: List of path elements as used by the FTP server protocol.
     238
     239    @rtype: Boolean
     240    @return: True if `segments` contains a globbing expression.
     241    """
     242    if not segments:
     243        return False
     244
     245    # To check that something is a glob expression, we convert it to
     246    # Regular Expression. If the result is the same as the original expression
     247    # then it contains no globbing expression.
     248    globCandidate = segments[-1]
     249    # A set of default regex rules is added to all strings.
     250    emtpyTranslations = fnmatch.translate('')
     251    globTranslations = fnmatch.translate(globCandidate)
     252
     253    if globCandidate + emtpyTranslations == globTranslations:
     254        return False
     255    else:
     256        return True
     257
     258
    228259class FTPCmdError(Exception):
    229260    """
    230261    Generic exception for FTP commands.
     
    962993        except InvalidPath:
    963994            return defer.fail(FileNotFoundError(path))
    964995
    965         def cbList(results):
     996        def cbList(results, glob=None):
    966997            """
    967998            Send, line by line, each file in the directory listing, and then
    968999            close the connection.
    9691000
     1001            If `glob` is not None, the result will be filtered using
     1002            Unix shell-style wildcards
     1003            (http://docs.python.org/2/library/fnmatch.html).
     1004
    9701005            @type results: A C{list} of C{tuple}. The first element of each
    9711006                C{tuple} is a C{str} and the second element is a C{list}.
    9721007            @param results: The names of the files in the directory.
     
    9771012            """
    9781013            self.reply(DATA_CNX_ALREADY_OPEN_START_XFR)
    9791014            for (name, ignored) in results:
    980                 self.dtpInstance.sendLine(name)
    981             self.dtpInstance.transport.loseConnection()
    982             return (TXFR_COMPLETE_OK,)
    983 
    984         def cbGlob(results):
    985             self.reply(DATA_CNX_ALREADY_OPEN_START_XFR)
    986             for (name, ignored) in results:
    987                 if fnmatch.fnmatch(name, segments[-1]):
     1015                if not glob or (glob and fnmatch.fnmatch(name, glob)):
    9881016                    self.dtpInstance.sendLine(name)
    9891017            self.dtpInstance.transport.loseConnection()
    9901018            return (TXFR_COMPLETE_OK,)
     
    10061034            self.dtpInstance.transport.loseConnection()
    10071035            return (TXFR_COMPLETE_OK,)
    10081036
    1009         # XXX This globbing may be incomplete: see #4181
    1010         if segments and (
    1011             '*' in segments[-1] or '?' in segments[-1] or
    1012             ('[' in segments[-1] and ']' in segments[-1])):
    1013             d = self.shell.list(segments[:-1])
    1014             d.addCallback(cbGlob)
     1037        if _isGlobbingExpression(segments):
     1038            # Remove globbing expression from path
     1039            # and keep to be used for filtering.
     1040            glob = segments.pop()
    10151041        else:
    1016             d = self.shell.list(segments)
    1017             d.addCallback(cbList)
    1018             # self.shell.list will generate an error if the path is invalid
    1019             d.addErrback(listErr)
     1042            glob = None
     1043
     1044        d = self.shell.list(segments)
     1045        d.addCallback(cbList, glob)
     1046        # self.shell.list will generate an error if the path is invalid
     1047        d.addErrback(listErr)
    10201048        return d
    10211049
    10221050