Ticket #4181: 4181-nlst-globbing-1.diff

File 4181-nlst-globbing-1.diff, 5.5 KB (added by adiroiban, 2 years ago)
  • twisted/protocols/ftp.py

    diff --git a/twisted/protocols/ftp.py b/twisted/protocols/ftp.py
    index b035f04..997b8d3 100644
    a b def errnoToFailure(e, path): 
    220220        return defer.fail() 
    221221 
    222222 
     223def isGlobbingExpression(segments=None): 
     224    """ 
     225    Returns True if `expression` is a globbing expression. 
     226 
     227    It will try to translate the globabing expression in regex and 
     228    checks that the resulting regular expression is the same 
     229    as the original text. 
     230    """ 
     231    if not segments: 
     232        return False 
     233 
     234    globCandidate = segments[-1] 
     235    emtpyTranslations = fnmatch.translate('') 
     236    globTranslations = fnmatch.translate(globCandidate) 
     237    if globCandidate + emtpyTranslations == globTranslations: 
     238        return False 
     239    return True 
     240 
    223241 
    224242class FTPCmdError(Exception): 
    225243    """ 
    class FTP(object, basic.LineReceiver, policies.TimeoutMixin): 
    946964        except InvalidPath: 
    947965            return defer.fail(FileNotFoundError(path)) 
    948966 
    949         def cbList(results): 
     967        def cbList(results, glob=None): 
    950968            """ 
    951969            Send, line by line, each file in the directory listing, and then 
    952970            close the connection. 
    class FTP(object, basic.LineReceiver, policies.TimeoutMixin): 
    961979            """ 
    962980            self.reply(DATA_CNX_ALREADY_OPEN_START_XFR) 
    963981            for (name, ignored) in results: 
    964                 self.dtpInstance.sendLine(name) 
    965             self.dtpInstance.transport.loseConnection() 
    966             return (TXFR_COMPLETE_OK,) 
    967  
    968         def cbGlob(results): 
    969             self.reply(DATA_CNX_ALREADY_OPEN_START_XFR) 
    970             for (name, ignored) in results: 
    971                 if fnmatch.fnmatch(name, segments[-1]): 
     982                if not glob or (glob and fnmatch.fnmatch(name, glob)): 
    972983                    self.dtpInstance.sendLine(name) 
    973984            self.dtpInstance.transport.loseConnection() 
    974985            return (TXFR_COMPLETE_OK,) 
    975986 
     987 
    976988        def listErr(results): 
    977989            """ 
    978990            RFC 959 specifies that an NLST request may only return directory 
    class FTP(object, basic.LineReceiver, policies.TimeoutMixin): 
    9901002            self.dtpInstance.transport.loseConnection() 
    9911003            return (TXFR_COMPLETE_OK,) 
    9921004 
    993         # XXX This globbing may be incomplete: see #4181 
    994         if segments and ( 
    995             '*' in segments[-1] or '?' in segments[-1] or 
    996             ('[' in segments[-1] and ']' in segments[-1])): 
    997             d = self.shell.list(segments[:-1]) 
    998             d.addCallback(cbGlob) 
    999         else: 
    1000             d = self.shell.list(segments) 
    1001             d.addCallback(cbList) 
    1002             # self.shell.list will generate an error if the path is invalid 
    1003             d.addErrback(listErr) 
     1005        glob = None 
     1006        if isGlobbingExpression(segments): 
     1007            glob = segments.pop() 
     1008 
     1009        d = self.shell.list(segments) 
     1010        d.addCallback(cbList, glob) 
     1011        # self.shell.list will generate an error if the path is invalid 
     1012        d.addErrback(listErr) 
    10041013        return d 
    10051014 
    10061015 
  • twisted/test/test_ftp.py

    diff --git a/twisted/test/test_ftp.py b/twisted/test/test_ftp.py
    index 23ffcba..bcc2446 100644
    a b class FTPServerPortDataConnectionTestCase(FTPServerPasvDataConnectionTestCase): 
    776776                ["425 Can't open data connection."]) 
    777777        return d.addCallback(gotPortNum) 
    778778 
     779    def test_NLSTGlobbing(self): 
     780        """ 
     781        NLST can use Unix shell globbing for matching file patterns. 
     782        """ 
     783        self.dirPath.child('test.txt').touch() 
     784        self.dirPath.child('ceva.txt').touch() 
     785        d = self._anonymousLogin() 
     786 
     787        self._download('NLST *.txt', chainDeferred=d) 
     788 
     789        def checkDownload(download): 
     790            filenames = download[:-2].split('\r\n') 
     791            filenames.sort() 
     792            self.assertEqual(['ceva.txt', 'test.txt'], filenames) 
     793 
     794        return d.addCallback(checkDownload) 
    779795 
    780796 
    781797class DTPFactoryTests(unittest.TestCase): 
    class PathHandling(unittest.TestCase): 
    22392255            self.assertRaises(ftp.InvalidPath, ftp.toSegments, ['x'], inp) 
    22402256 
    22412257 
     2258 
     2259class IsGlobbingExpressionTests(unittest.TestCase): 
     2260    """ 
     2261    Tests for isGlobbingExpression utility function. 
     2262    """ 
     2263 
     2264    def test_isGlobbingExpressionEmptySegments(self): 
     2265        """ 
     2266        isGlobbingExpression will return False for None, or empty 
     2267        segments. 
     2268        """ 
     2269        self.assertFalse(ftp.isGlobbingExpression()) 
     2270        self.assertFalse(ftp.isGlobbingExpression([])) 
     2271        self.assertFalse(ftp.isGlobbingExpression(None)) 
     2272 
     2273 
     2274    def test_isGlobbingExpressionNoGlob(self): 
     2275        """ 
     2276        isGlobbingExpression will return False for plain segments. 
     2277 
     2278        Also, it only checks the last segment part (filename) and will not 
     2279        check the path name. 
     2280        """ 
     2281        self.assertFalse(ftp.isGlobbingExpression(['ignore', 'expression'])) 
     2282        self.assertFalse(ftp.isGlobbingExpression(['*.txt', 'expression'])) 
     2283 
     2284 
     2285    def test_isGlobbingExpressionGlob(self): 
     2286        """ 
     2287        isGlobbingExpression will return True for segments which contains 
     2288        globbing characters in the last segment part (filename). 
     2289        """ 
     2290        self.assertTrue(ftp.isGlobbingExpression(['ignore', '*.txt'])) 
     2291        self.assertTrue(ftp.isGlobbingExpression(['ignore', '[a-b].txt'])) 
     2292        self.assertTrue(ftp.isGlobbingExpression(['ignore', 'fil?.txt'])) 
     2293 
     2294 
     2295 
    22422296class BaseFTPRealmTests(unittest.TestCase): 
    22432297    """ 
    22442298    Tests for L{ftp.BaseFTPRealm}, a base class to help define L{IFTPShell}