Ticket #4181: 4181-nlst-globbing-2.diff

File 4181-nlst-globbing-2.diff, 6.3 KB (added by adiroiban, 23 months ago)
  • twisted/test/test_ftp.py

     
    825825                ["425 Can't open data connection."]) 
    826826        return d.addCallback(gotPortNum) 
    827827 
     828    def test_NLSTGlobbing(self): 
     829        """ 
     830        When Unix shell globbing is used with NLST only files matching 
     831        the pattern will be returned. 
     832        """ 
     833        self.dirPath.child('test.txt').touch() 
     834        self.dirPath.child('ceva.txt').touch() 
     835        self.dirPath.child('no.match').touch() 
     836        d = self._anonymousLogin() 
    828837 
     838        self._download('NLST *.txt', chainDeferred=d) 
    829839 
     840        def checkDownload(download): 
     841            filenames = download[:-2].split('\r\n') 
     842            filenames.sort() 
     843            self.assertEqual(['ceva.txt', 'test.txt'], filenames) 
     844 
     845        return d.addCallback(checkDownload) 
     846 
     847 
    830848class DTPFactoryTests(unittest.TestCase): 
    831849    """ 
    832850    Tests for L{ftp.DTPFactory}. 
     
    23302348            self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp) 
    23312349 
    23322350 
     2351 
     2352class IsGlobbingExpressionTests(unittest.TestCase): 
     2353    """ 
     2354    Tests for _isGlobbingExpression utility function. 
     2355    """ 
     2356 
     2357    def test_isGlobbingExpressionEmptySegments(self): 
     2358        """ 
     2359        _isGlobbingExpression will return False for None, or empty 
     2360        segments. 
     2361        """ 
     2362        self.assertFalse(ftp._isGlobbingExpression()) 
     2363        self.assertFalse(ftp._isGlobbingExpression([])) 
     2364        self.assertFalse(ftp._isGlobbingExpression(None)) 
     2365 
     2366 
     2367    def test_isGlobbingExpressionNoGlob(self): 
     2368        """ 
     2369        _isGlobbingExpression will return False for plain segments. 
     2370 
     2371        Also, it only checks the last segment part (filename) and will not 
     2372        check the path name. 
     2373        """ 
     2374        self.assertFalse(ftp._isGlobbingExpression(['ignore', 'expr'])) 
     2375        self.assertFalse(ftp._isGlobbingExpression(['*.txt', 'expr'])) 
     2376 
     2377 
     2378    def test_isGlobbingExpressionGlob(self): 
     2379        """ 
     2380        _isGlobbingExpression will return True for segments which contains 
     2381        globbing characters in the last segment part (filename). 
     2382        """ 
     2383        self.assertTrue(ftp._isGlobbingExpression(['ignore', '*.txt'])) 
     2384        self.assertTrue(ftp._isGlobbingExpression(['ignore', '[a-b].txt'])) 
     2385        self.assertTrue(ftp._isGlobbingExpression(['ignore', 'fil?.txt'])) 
     2386 
     2387 
     2388 
    23332389class BaseFTPRealmTests(unittest.TestCase): 
    23342390    """ 
    23352391    Tests for L{ftp.BaseFTPRealm}, a base class to help define L{IFTPShell} 
  • twisted/protocols/ftp.py

     
    224224        return defer.fail() 
    225225 
    226226 
     227def _isGlobbingExpression(segments=None): 
     228    """ 
     229    Helper for checking if a FTPShell `segments` contains a wildcard Unix 
     230    expression. 
    227231 
     232    Only filename globbing is supported. 
     233    This means that wildcards can only be presents in the last element of 
     234    `segments`. 
     235 
     236    @type  segments: C{list} 
     237    @param segments: List of path elements as used by the FTP server protocol. 
     238 
     239    @rtype: Boolean 
     240    @return: True if `segments` contains a globbing expression. 
     241    """ 
     242    if not segments: 
     243        return False 
     244 
     245    # To check that something is a glob expression, we convert it to 
     246    # Regular Expression. If the result is the same as the original expression 
     247    # then it contains no globbing expression. 
     248    globCandidate = segments[-1] 
     249    # A set of default regex rules is added to all strings. 
     250    emtpyTranslations = fnmatch.translate('') 
     251    globTranslations = fnmatch.translate(globCandidate) 
     252 
     253    if globCandidate + emtpyTranslations == globTranslations: 
     254        return False 
     255    else: 
     256        return True 
     257 
     258 
    228259class FTPCmdError(Exception): 
    229260    """ 
    230261    Generic exception for FTP commands. 
     
    962993        except InvalidPath: 
    963994            return defer.fail(FileNotFoundError(path)) 
    964995 
    965         def cbList(results): 
     996        def cbList(results, glob=None): 
    966997            """ 
    967998            Send, line by line, each file in the directory listing, and then 
    968999            close the connection. 
    9691000 
     1001            If `glob` is not None, the result will be filtered using 
     1002            Unix shell-style wildcards 
     1003            (http://docs.python.org/2/library/fnmatch.html). 
     1004 
    9701005            @type results: A C{list} of C{tuple}. The first element of each 
    9711006                C{tuple} is a C{str} and the second element is a C{list}. 
    9721007            @param results: The names of the files in the directory. 
     
    9771012            """ 
    9781013            self.reply(DATA_CNX_ALREADY_OPEN_START_XFR) 
    9791014            for (name, ignored) in results: 
    980                 self.dtpInstance.sendLine(name) 
    981             self.dtpInstance.transport.loseConnection() 
    982             return (TXFR_COMPLETE_OK,) 
    983  
    984         def cbGlob(results): 
    985             self.reply(DATA_CNX_ALREADY_OPEN_START_XFR) 
    986             for (name, ignored) in results: 
    987                 if fnmatch.fnmatch(name, segments[-1]): 
     1015                if not glob or (glob and fnmatch.fnmatch(name, glob)): 
    9881016                    self.dtpInstance.sendLine(name) 
    9891017            self.dtpInstance.transport.loseConnection() 
    9901018            return (TXFR_COMPLETE_OK,) 
     
    10061034            self.dtpInstance.transport.loseConnection() 
    10071035            return (TXFR_COMPLETE_OK,) 
    10081036 
    1009         # XXX This globbing may be incomplete: see #4181 
    1010         if segments and ( 
    1011             '*' in segments[-1] or '?' in segments[-1] or 
    1012             ('[' in segments[-1] and ']' in segments[-1])): 
    1013             d = self.shell.list(segments[:-1]) 
    1014             d.addCallback(cbGlob) 
     1037        if _isGlobbingExpression(segments): 
     1038            # Remove globbing expression from path 
     1039            # and keep to be used for filtering. 
     1040            glob = segments.pop() 
    10151041        else: 
    1016             d = self.shell.list(segments) 
    1017             d.addCallback(cbList) 
    1018             # self.shell.list will generate an error if the path is invalid 
    1019             d.addErrback(listErr) 
     1042            glob = None 
     1043 
     1044        d = self.shell.list(segments) 
     1045        d.addCallback(cbList, glob) 
     1046        # self.shell.list will generate an error if the path is invalid 
     1047        d.addErrback(listErr) 
    10201048        return d 
    10211049 
    10221050