Changeset 27082

Show
Ignore:
Timestamp:
07/02/2009 12:00:19 PM (14 months ago)
Author:
z3p
Message:

merging forward

Location:
branches/spwd-3242-3/twisted/conch
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • branches/spwd-3242-3/twisted/conch/checkers.py

    r26534 r27082  
    1010try: 
    1111    import pwd 
     12    pwd 
    1213except ImportError: 
    1314    pwd = None 
     
    1617 
    1718try: 
    18     # get this from http://www.twistedmatrix.com/users/z3p/files/pyshadow-0.2.tar.gz 
    19     import shadow 
    20 except: 
    21     shadow = None 
     19    # Python 2.5 got spwd to interface with shadow passwords 
     20    import spwd as shadow 
     21    shadow 
     22except ImportError: 
     23    try: 
     24        # get this from http://www.twistedmatrix.com/users/z3p/files/pyshadow-0.2.tar.gz 
     25        import shadow 
     26        shadow 
     27    except ImportError: 
     28        shadow = None 
    2229 
    2330try: 
    2431    from twisted.cred import pamauth 
     32    pamauth 
    2533except ImportError: 
    2634    pamauth = None 
     
    3745from twisted.python.util import runAsEffectiveUser 
    3846 
     47 
     48 
    3949def verifyCryptedPassword(crypted, pw): 
    40     if crypted[0] == '$': # md5_crypt encrypted 
    41         salt = '$1$' + crypted.split('$')[2] 
    42     else: 
    43         salt = crypted[:2] 
    44     return crypt.crypt(pw, salt) == crypted 
     50    return crypt.crypt(pw, crypted) == crypted 
     51 
     52 
     53 
     54def getpwnamPasswd(username): 
     55    """ 
     56    Look up a user in the /etc/passwd database using the pwd module.  If the 
     57    pwd module is not available, return None. 
     58 
     59    @param username: the username of the user to return the passwd database 
     60    information for. 
     61    """ 
     62    if not pwd: 
     63        return None 
     64    return pwd.getpwnam(username) 
     65 
     66 
     67 
     68def getpwnamShadow(username): 
     69    """ 
     70    Look up a user in the /etc/shadow database using the spwd or shadow 
     71    modules.  If neither module is available, return None. 
     72 
     73    @param username: the username of the user to return the shadow database 
     74    information for. 
     75    """ 
     76    if not shadow: 
     77        return None 
     78    return runAsEffectiveUser(0, 0, shadow.getspnam, username) 
     79 
     80 
    4581 
    4682class UNIXPasswordDatabase: 
     83    """ 
     84    A checker which validates users out of the UNIX password databases, or a 
     85    databases a compatible format. 
     86 
     87    @ivar getpwnamFunctions: a C{tuple} of functions which are called in order 
     88    to valid a user.  The default value is (getpwnamPasswd, getpwnamShadow), 
     89    which tries the /etc/passwd database first, followed by the /etc/shadow 
     90    database. 
     91    """ 
    4792    credentialInterfaces = IUsernamePassword, 
    4893    implements(ICredentialsChecker) 
    4994 
     95 
     96    def __init__(self, getpwnamFunctions=(getpwnamPasswd, getpwnamShadow)): 
     97        self.getpwnamFunctions = getpwnamFunctions 
     98 
     99 
    50100    def requestAvatarId(self, credentials): 
    51         if pwd: 
     101        for func in self.getpwnamFunctions: 
    52102            try: 
    53                 cryptedPass = pwd.getpwnam(credentials.username)[1] 
     103                pwnam = func(credentials.username) 
    54104            except KeyError: 
    55105                return defer.fail(UnauthorizedLogin("invalid username")) 
    56106            else: 
    57                 if cryptedPass not in ['*', 'x'] and \ 
    58                     verifyCryptedPassword(cryptedPass, credentials.password): 
    59                     return defer.succeed(credentials.username) 
    60         if shadow: 
    61             gid = os.getegid() 
    62             uid = os.geteuid() 
    63             os.setegid(0) 
    64             os.seteuid(0) 
    65             try: 
    66                 shadowPass = shadow.getspnam(credentials.username)[1] 
    67             except KeyError: 
    68                 os.setegid(gid) 
    69                 os.seteuid(uid) 
    70                 return defer.fail(UnauthorizedLogin("invalid username")) 
    71             os.setegid(gid) 
    72             os.seteuid(uid) 
    73             if verifyCryptedPassword(shadowPass, credentials.password): 
    74                 return defer.succeed(credentials.username) 
    75             return defer.fail(UnauthorizedLogin("invalid password")) 
    76  
     107                if pwnam is not None: 
     108                    cryptedPass = pwnam[1] 
     109                    if verifyCryptedPassword(cryptedPass, 
     110                                             credentials.password): 
     111                        return defer.succeed(credentials.username) 
     112        # fallback 
    77113        return defer.fail(UnauthorizedLogin("unable to verify password")) 
     114 
    78115 
    79116 
  • branches/spwd-3242-3/twisted/conch/test/test_checkers.py

    r27079 r27082  
    55Tests for L{twisted.conch.checkers}. 
    66""" 
    7  
    8 try: 
    9     import pwd 
    10 except ImportError: 
    11     pwd = None 
    12  
    137import os, base64 
    148 
     
    2317try: 
    2418    import Crypto.Cipher.DES3 
     19    Crypto 
    2520    import pyasn1 
    2621except ImportError: 
    27     SSHPublicKeyDatabase = None 
     22    Crypto = None 
    2823else: 
    2924    from twisted.conch.ssh import keys 
    30     from twisted.conch.checkers import SSHPublicKeyDatabase, SSHProtocolChecker 
     25    from twisted.conch import checkers 
    3126    from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey 
    3227    from twisted.conch.test import keydata 
     
    3833    """ 
    3934 
    40     if pwd is None: 
     35    if checkers.pwd is None: 
    4136        skip = "Cannot run without pwd module" 
    42     elif SSHPublicKeyDatabase is None: 
     37    elif Crypto is None: 
    4338        skip = "Cannot run without PyCrypto or PyASN1" 
    4439 
    4540    def setUp(self): 
    46         self.checker = SSHPublicKeyDatabase() 
     41        self.checker = checkers.SSHPublicKeyDatabase() 
    4742        self.sshDir = FilePath(self.mktemp()) 
    4843        self.sshDir.makedirs() 
     
    5550        self.mockos.path = self.sshDir.path 
    5651        self.patch(os.path, "expanduser", self.mockos.expanduser) 
    57         self.patch(pwd, "getpwnam", self.mockos.getpwnam) 
     52        self.patch(checkers.pwd, "getpwnam", self.mockos.getpwnam) 
    5853        self.patch(os, "seteuid", self.mockos.seteuid) 
    5954        self.patch(os, "setegid", self.mockos.setegid) 
     
    122117            return True 
    123118        self.patch(self.checker, 'checkKey', _checkKey) 
    124         credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, 
    125                                     'foo', keys.Key.fromString(keydata.privateRSA_openssh).sign('foo')) 
     119        credentials = SSHPrivateKey('test', 'ssh-rsa', 
     120                                    keydata.publicRSA_openssh, 'foo', 
     121                                    keys.Key.fromString( 
     122                keydata.privateRSA_openssh).sign('foo')) 
    126123        d = self.checker.requestAvatarId(credentials) 
    127124        def _verify(avatarId): 
     
    140137            return True 
    141138        self.patch(self.checker, 'checkKey', _checkKey) 
    142         credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, None, None) 
     139        credentials = SSHPrivateKey('test', 'ssh-rsa', 
     140                                    keydata.publicRSA_openssh, None, None) 
    143141        d = self.checker.requestAvatarId(credentials) 
    144142        return self.assertFailure(d, ValidPublicKey) 
     
    166164            return True 
    167165        self.patch(self.checker, 'checkKey', _checkKey) 
    168         credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, 
    169                                     'foo', keys.Key.fromString(keydata.privateDSA_openssh).sign('foo')) 
     166        credentials = SSHPrivateKey('test', 'ssh-rsa', 
     167                                    keydata.publicRSA_openssh, 'foo', 
     168                                    keys.Key.fromString( 
     169                keydata.privateDSA_openssh).sign('foo')) 
    170170        d = self.checker.requestAvatarId(credentials) 
    171171        return self.assertFailure(d, UnauthorizedLogin) 
     
    190190 
    191191 
     192 
    192193class SSHProtocolCheckerTestCase(TestCase): 
    193194    """ 
     
    195196    """ 
    196197 
    197     if SSHPublicKeyDatabase is None: 
     198    if Crypto is None: 
    198199        skip = "Cannot run without PyCrypto" 
    199200 
     
    203204        the list of registered checkers. 
    204205        """ 
    205         checker = SSHProtocolChecker() 
     206        checker = checkers.SSHProtocolChecker() 
    206207        self.assertEquals(checker.credentialInterfaces, []) 
    207         checker.registerChecker(SSHPublicKeyDatabase(), ) 
     208        checker.registerChecker(checkers.SSHPublicKeyDatabase(), ) 
    208209        self.assertEquals(checker.credentialInterfaces, [ISSHPrivateKey]) 
    209210        self.assertIsInstance(checker.checkers[ISSHPrivateKey], 
    210                               SSHPublicKeyDatabase) 
     211                              checkers.SSHPublicKeyDatabase) 
    211212 
    212213 
     
    218219        credentialIntefaces. 
    219220        """ 
    220         checker = SSHProtocolChecker() 
     221        checker = checkers.SSHProtocolChecker() 
    221222        self.assertEquals(checker.credentialInterfaces, []) 
    222         checker.registerChecker(SSHPublicKeyDatabase(), IUsernamePassword) 
     223        checker.registerChecker(checkers.SSHPublicKeyDatabase(), 
     224                                IUsernamePassword) 
    223225        self.assertEquals(checker.credentialInterfaces, [IUsernamePassword]) 
    224226        self.assertIsInstance(checker.checkers[IUsernamePassword], 
    225                               SSHPublicKeyDatabase) 
     227                              checkers.SSHPublicKeyDatabase) 
    226228 
    227229 
     
    231233        registered checkers to authenticate a user. 
    232234        """ 
    233         checker = SSHProtocolChecker() 
     235        checker = checkers.SSHProtocolChecker() 
    234236        passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse() 
    235237        passwordDatabase.addUser('test', 'test') 
     
    247249        L{NotEnoughAuthentication}. 
    248250        """ 
    249         checker = SSHProtocolChecker() 
     251        checker = checkers.SSHProtocolChecker() 
    250252        def _areDone(avatarId): 
    251253            return False 
     
    264266        L{SSHProtocolChecker} should raise L{UnhandledCredentials}. 
    265267        """ 
    266         checker = SSHProtocolChecker() 
     268        checker = checkers.SSHProtocolChecker() 
    267269        d = checker.requestAvatarId(UsernamePassword('test', 'test')) 
    268270        return self.assertFailure(d, UnhandledCredentials) 
     
    273275        The default L{SSHProcotolChecker.areDone} should simply return True. 
    274276        """ 
    275         self.assertEquals(SSHProtocolChecker().areDone(None), True) 
     277        self.assertEquals(checkers.SSHProtocolChecker().areDone(None), True) 
     278 
     279 
     280 
     281class HelperFunctionTests(TestCase): 
     282    """ 
     283    Tests for helper functions L{verifyCryptedPassword}, L{getpwnamPasswd} and 
     284    L{getpwnamShadow}. 
     285    """ 
     286    if checkers.pwd is None: 
     287        skip = 'cannot run without crypt module' 
     288    elif not getattr(os, 'O_NOCTTY', None): 
     289        skip = 'cannot run without os.O_NOCTTY' 
     290 
     291 
     292    def setUp(self): 
     293        self.mockos = MockOS() 
     294 
     295 
     296    def test_verifyCryptedPassword(self): 
     297        """ 
     298        L{verifyCryptedPassword} returns True if the provided cleartext 
     299        password matches the provided password hash. 
     300        """ 
     301        password = 'password' 
     302        salt = 'sa' 
     303        encrypted = checkers.crypt.crypt(password, salt) 
     304        self.assertTrue(checkers.verifyCryptedPassword( 
     305                encrypted, 
     306                password), '%r not valid encrypted password for %s' % ( 
     307                encrypted, password)) 
     308 
     309 
     310    def test_verifyCryptedPassword_md5(self): 
     311        """ 
     312        L{verifyCryptedPassword} returns True if the provided cleartext 
     313        password matches the provided MD5 password hash. 
     314        """ 
     315        password = 'password' 
     316        salt = '$1$salt' 
     317        encrypted = checkers.crypt.crypt(password, salt) 
     318        self.assertTrue(checkers.verifyCryptedPassword( 
     319                encrypted, 
     320                password), '%r not valid encrypted password for %s' % ( 
     321                encrypted, password)) 
     322 
     323 
     324    def test_getpwnamPasswd(self): 
     325        """ 
     326        L{getpwnamPasswd} returns a tuple of items from the UNIX /etc/passwd 
     327        database if the L{pwd} module is present.  If that module isn't 
     328        present, it returns None. 
     329        """ 
     330        if checkers.pwd is None: 
     331            self.assertIdentical(checkers.getpwnamPasswd('user'), None) 
     332        else: 
     333            self.patch(checkers.pwd, "getpwnam", self.mockos.getpwnam) 
     334            user_line = checkers.getpwnamPasswd('user') 
     335            self.assertEquals(user_line, (0, 0, 1, 2)) # what MockOS.getpwnam 
     336                                                       # returns 
     337 
     338 
     339    def test_getpwnamShadow(self): 
     340        """ 
     341        L{getpwnamShadow} returns a tuple of items from the UNIX /etc/shadow 
     342        database if the L{spwd} or L{shadow} modules are present.  If the 
     343        modules are not present, it returns None. 
     344        """ 
     345        if checkers.shadow is None: 
     346            self.assertIdentical(checkers.getpwnamShadow('user'), None) 
     347        else: 
     348            self.patch(checkers.shadow, 'getspnam', lambda u: (0, 1, 2, 3)) 
     349            self.mockos.euid = 2345 
     350            self.mockos.egid = 1234 
     351            self.patch(os, "geteuid", self.mockos.geteuid) 
     352            self.patch(os, "getegid", self.mockos.getegid) 
     353            self.patch(os, "seteuid", self.mockos.seteuid) 
     354            self.patch(os, "setegid", self.mockos.setegid) 
     355            user_line = checkers.getpwnamShadow('user') 
     356            self.assertEquals(user_line, (0, 1, 2, 3)) 
     357            self.assertEquals(self.mockos.seteuidCalls, [0, 2345]) 
     358            self.assertEquals(self.mockos.setegidCalls, [0, 1234]) 
     359 
     360 
     361 
     362class UNIXPasswordDatabaseTests(TestCase): 
     363    """ 
     364    Tests for L{UNIXPasswordDatabase}. 
     365    """ 
     366    def test_defaultCheckers(self): 
     367        """ 
     368        L{UNIXPasswordDatabase} with no arguments has L{getpwnamPasswd} and 
     369        L{getpwnamShadow} as the getpwnamFunctions. 
     370        """ 
     371        checker = checkers.UNIXPasswordDatabase() 
     372        self.assertEquals(checker.getpwnamFunctions, 
     373                          (checkers.getpwnamPasswd, 
     374                           checkers.getpwnamShadow)) 
     375 
     376 
     377    def assertLoggedIn(self, d): 
     378        """ 
     379        Assert that the L{Deferred} passed in is called back with the value 
     380        'username'.  This represents a valid login for this TestCase. 
     381 
     382        NOTE: To work, this method's return value must be returned from the 
     383        test method, or otherwise hooked up to the test machinery. 
     384 
     385        @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 
     386        @type d: L{Deferred} 
     387        @rtype: L{Deferred} 
     388        """ 
     389        def _cbRequestAvatarId(username): 
     390            self.assertEquals(username, 'username') 
     391 
     392        return d.addCallback(_cbRequestAvatarId) 
     393 
     394 
     395    def assertUnauthorizedLogin(self, d): 
     396        """ 
     397        Asserts that the L{Deferred} passed in is erred back with an 
     398        L{UnauthorizedLogin} L{Failure}.  This reprsents an invalid login for 
     399        this TestCase. 
     400 
     401        NOTE: To work, this method's return value must be returned from the 
     402        test method, or otherwise hooked up to the test machinery. 
     403 
     404        @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 
     405        @type d: L{Deferred} 
     406        @rtype: L{Deferred} 
     407        """ 
     408        def _ebRequestAvatarId(reason): 
     409            reason.trap(checkers.UnauthorizedLogin) 
     410        return d.addErrback(_ebRequestAvatarId) 
     411 
     412 
     413    def test_passInCheckers(self): 
     414        """ 
     415        L{UNIXPasswordDatabase} takes a list of functions to check for UNIX 
     416        user information. 
     417        """ 
     418        def getpwnam(username): pass 
     419        checker = checkers.UNIXPasswordDatabase([getpwnam]) 
     420        self.assertEquals(checker.getpwnamFunctions, 
     421                          [getpwnam]) 
     422 
     423 
     424    def test_verifyPassword(self): 
     425        """ 
     426        If the encrypted password provided by the getpwnam function is valid 
     427        (verified by the L{verifyCryptedPassword} function), we callback the 
     428        C{requestAvatarId} L{Deferred} with the username. 
     429        """ 
     430        def verifyCryptedPassword(crypted, pw): 
     431            return crypted == pw 
     432        def getpwnam(username): 
     433            return [username, username] 
     434        self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 
     435        checker = checkers.UNIXPasswordDatabase([getpwnam]) 
     436        credential = UsernamePassword('username', 'username') 
     437        d = checker.requestAvatarId(credential) 
     438        return self.assertLoggedIn(d) 
     439 
     440 
     441    def test_failOnKeyError(self): 
     442        """ 
     443        If the getpwnam function raises a KeyError, the login fails with an 
     444        L{UnauthorizedLogin} exception. 
     445        """ 
     446        def getpwnam(username): 
     447            raise KeyError(username) 
     448        checker = checkers.UNIXPasswordDatabase([getpwnam]) 
     449        credential = UsernamePassword('username', 'username') 
     450        d = checker.requestAvatarId(credential) 
     451        return self.assertUnauthorizedLogin(d) 
     452 
     453 
     454    def test_failOnBadPassword(self): 
     455        """ 
     456        If the verifyCryptedPassword function doesn't verify the password, the 
     457        login fails with an L{UnauthorizedLogin} exception. 
     458        """ 
     459        def verifyCryptedPassword(crypted, pw): 
     460            return False 
     461        def getpwnam(username): 
     462            return [username, username] 
     463        self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 
     464        checker = checkers.UNIXPasswordDatabase([getpwnam]) 
     465        credential = UsernamePassword('username', 'username') 
     466        d = checker.requestAvatarId(credential) 
     467        return self.assertUnauthorizedLogin(d) 
     468 
     469 
     470    def test_loopThroughFunctions(self): 
     471        """ 
     472        UNIXPasswordDatabase.requestAvatarId loops through each getpwnam 
     473        function associated with it and returns a L{Deferred} which fires with 
     474        the result of the first one which returns a value other than None. 
     475        ones do not verify the password. 
     476        """ 
     477        def verifyCryptedPassword(crypted, pw): 
     478            return crypted == pw 
     479        def getpwnam1(username): 
     480            return [username, 'not the password'] 
     481        def getpwnam2(username): 
     482            return [username, username] 
     483        self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 
     484        checker = checkers.UNIXPasswordDatabase([getpwnam1, getpwnam2]) 
     485        credential = UsernamePassword('username', 'username') 
     486        d = checker.requestAvatarId(credential) 
     487        return self.assertLoggedIn(d)