Changeset 27082
- Timestamp:
- 07/02/2009 12:00:19 PM (14 months ago)
- Location:
- branches/spwd-3242-3/twisted/conch
- Files:
-
- 2 modified
-
checkers.py (modified) (3 diffs)
-
test/test_checkers.py (modified) (15 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/spwd-3242-3/twisted/conch/checkers.py
r26534 r27082 10 10 try: 11 11 import pwd 12 pwd 12 13 except ImportError: 13 14 pwd = None … … 16 17 17 18 try: 18 # get this from http://www.twistedmatrix.com/users/z3p/files/pyshadow-0.2.tar.gz 19 import shadow 20 except: 21 shadow = None 19 # Python 2.5 got spwd to interface with shadow passwords 20 import spwd as shadow 21 shadow 22 except ImportError: 23 try: 24 # get this from http://www.twistedmatrix.com/users/z3p/files/pyshadow-0.2.tar.gz 25 import shadow 26 shadow 27 except ImportError: 28 shadow = None 22 29 23 30 try: 24 31 from twisted.cred import pamauth 32 pamauth 25 33 except ImportError: 26 34 pamauth = None … … 37 45 from twisted.python.util import runAsEffectiveUser 38 46 47 48 39 49 def verifyCryptedPassword(crypted, pw): 40 if crypted[0] == '$': # md5_crypt encrypted 41 salt = '$1$' + crypted.split('$')[2] 42 else: 43 salt = crypted[:2] 44 return crypt.crypt(pw, salt) == crypted 50 return crypt.crypt(pw, crypted) == crypted 51 52 53 54 def getpwnamPasswd(username): 55 """ 56 Look up a user in the /etc/passwd database using the pwd module. If the 57 pwd module is not available, return None. 58 59 @param username: the username of the user to return the passwd database 60 information for. 61 """ 62 if not pwd: 63 return None 64 return pwd.getpwnam(username) 65 66 67 68 def getpwnamShadow(username): 69 """ 70 Look up a user in the /etc/shadow database using the spwd or shadow 71 modules. If neither module is available, return None. 72 73 @param username: the username of the user to return the shadow database 74 information for. 75 """ 76 if not shadow: 77 return None 78 return runAsEffectiveUser(0, 0, shadow.getspnam, username) 79 80 45 81 46 82 class UNIXPasswordDatabase: 83 """ 84 A checker which validates users out of the UNIX password databases, or a 85 databases a compatible format. 86 87 @ivar getpwnamFunctions: a C{tuple} of functions which are called in order 88 to valid a user. The default value is (getpwnamPasswd, getpwnamShadow), 89 which tries the /etc/passwd database first, followed by the /etc/shadow 90 database. 91 """ 47 92 credentialInterfaces = IUsernamePassword, 48 93 implements(ICredentialsChecker) 49 94 95 96 def __init__(self, getpwnamFunctions=(getpwnamPasswd, getpwnamShadow)): 97 self.getpwnamFunctions = getpwnamFunctions 98 99 50 100 def requestAvatarId(self, credentials): 51 if pwd:101 for func in self.getpwnamFunctions: 52 102 try: 53 cryptedPass = pwd.getpwnam(credentials.username)[1]103 pwnam = func(credentials.username) 54 104 except KeyError: 55 105 return defer.fail(UnauthorizedLogin("invalid username")) 56 106 else: 57 if cryptedPass not in ['*', 'x'] and \ 58 verifyCryptedPassword(cryptedPass, credentials.password): 59 return defer.succeed(credentials.username) 60 if shadow: 61 gid = os.getegid() 62 uid = os.geteuid() 63 os.setegid(0) 64 os.seteuid(0) 65 try: 66 shadowPass = shadow.getspnam(credentials.username)[1] 67 except KeyError: 68 os.setegid(gid) 69 os.seteuid(uid) 70 return defer.fail(UnauthorizedLogin("invalid username")) 71 os.setegid(gid) 72 os.seteuid(uid) 73 if verifyCryptedPassword(shadowPass, credentials.password): 74 return defer.succeed(credentials.username) 75 return defer.fail(UnauthorizedLogin("invalid password")) 76 107 if pwnam is not None: 108 cryptedPass = pwnam[1] 109 if verifyCryptedPassword(cryptedPass, 110 credentials.password): 111 return defer.succeed(credentials.username) 112 # fallback 77 113 return defer.fail(UnauthorizedLogin("unable to verify password")) 114 78 115 79 116 -
branches/spwd-3242-3/twisted/conch/test/test_checkers.py
r27079 r27082 5 5 Tests for L{twisted.conch.checkers}. 6 6 """ 7 8 try:9 import pwd10 except ImportError:11 pwd = None12 13 7 import os, base64 14 8 … … 23 17 try: 24 18 import Crypto.Cipher.DES3 19 Crypto 25 20 import pyasn1 26 21 except ImportError: 27 SSHPublicKeyDatabase= None22 Crypto = None 28 23 else: 29 24 from twisted.conch.ssh import keys 30 from twisted.conch .checkers import SSHPublicKeyDatabase, SSHProtocolChecker25 from twisted.conch import checkers 31 26 from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey 32 27 from twisted.conch.test import keydata … … 38 33 """ 39 34 40 if pwd is None:35 if checkers.pwd is None: 41 36 skip = "Cannot run without pwd module" 42 elif SSHPublicKeyDatabaseis None:37 elif Crypto is None: 43 38 skip = "Cannot run without PyCrypto or PyASN1" 44 39 45 40 def setUp(self): 46 self.checker = SSHPublicKeyDatabase()41 self.checker = checkers.SSHPublicKeyDatabase() 47 42 self.sshDir = FilePath(self.mktemp()) 48 43 self.sshDir.makedirs() … … 55 50 self.mockos.path = self.sshDir.path 56 51 self.patch(os.path, "expanduser", self.mockos.expanduser) 57 self.patch( pwd, "getpwnam", self.mockos.getpwnam)52 self.patch(checkers.pwd, "getpwnam", self.mockos.getpwnam) 58 53 self.patch(os, "seteuid", self.mockos.seteuid) 59 54 self.patch(os, "setegid", self.mockos.setegid) … … 122 117 return True 123 118 self.patch(self.checker, 'checkKey', _checkKey) 124 credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, 125 'foo', keys.Key.fromString(keydata.privateRSA_openssh).sign('foo')) 119 credentials = SSHPrivateKey('test', 'ssh-rsa', 120 keydata.publicRSA_openssh, 'foo', 121 keys.Key.fromString( 122 keydata.privateRSA_openssh).sign('foo')) 126 123 d = self.checker.requestAvatarId(credentials) 127 124 def _verify(avatarId): … … 140 137 return True 141 138 self.patch(self.checker, 'checkKey', _checkKey) 142 credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, None, None) 139 credentials = SSHPrivateKey('test', 'ssh-rsa', 140 keydata.publicRSA_openssh, None, None) 143 141 d = self.checker.requestAvatarId(credentials) 144 142 return self.assertFailure(d, ValidPublicKey) … … 166 164 return True 167 165 self.patch(self.checker, 'checkKey', _checkKey) 168 credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, 169 'foo', keys.Key.fromString(keydata.privateDSA_openssh).sign('foo')) 166 credentials = SSHPrivateKey('test', 'ssh-rsa', 167 keydata.publicRSA_openssh, 'foo', 168 keys.Key.fromString( 169 keydata.privateDSA_openssh).sign('foo')) 170 170 d = self.checker.requestAvatarId(credentials) 171 171 return self.assertFailure(d, UnauthorizedLogin) … … 190 190 191 191 192 192 193 class SSHProtocolCheckerTestCase(TestCase): 193 194 """ … … 195 196 """ 196 197 197 if SSHPublicKeyDatabaseis None:198 if Crypto is None: 198 199 skip = "Cannot run without PyCrypto" 199 200 … … 203 204 the list of registered checkers. 204 205 """ 205 checker = SSHProtocolChecker()206 checker = checkers.SSHProtocolChecker() 206 207 self.assertEquals(checker.credentialInterfaces, []) 207 checker.registerChecker( SSHPublicKeyDatabase(), )208 checker.registerChecker(checkers.SSHPublicKeyDatabase(), ) 208 209 self.assertEquals(checker.credentialInterfaces, [ISSHPrivateKey]) 209 210 self.assertIsInstance(checker.checkers[ISSHPrivateKey], 210 SSHPublicKeyDatabase)211 checkers.SSHPublicKeyDatabase) 211 212 212 213 … … 218 219 credentialIntefaces. 219 220 """ 220 checker = SSHProtocolChecker()221 checker = checkers.SSHProtocolChecker() 221 222 self.assertEquals(checker.credentialInterfaces, []) 222 checker.registerChecker(SSHPublicKeyDatabase(), IUsernamePassword) 223 checker.registerChecker(checkers.SSHPublicKeyDatabase(), 224 IUsernamePassword) 223 225 self.assertEquals(checker.credentialInterfaces, [IUsernamePassword]) 224 226 self.assertIsInstance(checker.checkers[IUsernamePassword], 225 SSHPublicKeyDatabase)227 checkers.SSHPublicKeyDatabase) 226 228 227 229 … … 231 233 registered checkers to authenticate a user. 232 234 """ 233 checker = SSHProtocolChecker()235 checker = checkers.SSHProtocolChecker() 234 236 passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse() 235 237 passwordDatabase.addUser('test', 'test') … … 247 249 L{NotEnoughAuthentication}. 248 250 """ 249 checker = SSHProtocolChecker()251 checker = checkers.SSHProtocolChecker() 250 252 def _areDone(avatarId): 251 253 return False … … 264 266 L{SSHProtocolChecker} should raise L{UnhandledCredentials}. 265 267 """ 266 checker = SSHProtocolChecker()268 checker = checkers.SSHProtocolChecker() 267 269 d = checker.requestAvatarId(UsernamePassword('test', 'test')) 268 270 return self.assertFailure(d, UnhandledCredentials) … … 273 275 The default L{SSHProcotolChecker.areDone} should simply return True. 274 276 """ 275 self.assertEquals(SSHProtocolChecker().areDone(None), True) 277 self.assertEquals(checkers.SSHProtocolChecker().areDone(None), True) 278 279 280 281 class HelperFunctionTests(TestCase): 282 """ 283 Tests for helper functions L{verifyCryptedPassword}, L{getpwnamPasswd} and 284 L{getpwnamShadow}. 285 """ 286 if checkers.pwd is None: 287 skip = 'cannot run without crypt module' 288 elif not getattr(os, 'O_NOCTTY', None): 289 skip = 'cannot run without os.O_NOCTTY' 290 291 292 def setUp(self): 293 self.mockos = MockOS() 294 295 296 def test_verifyCryptedPassword(self): 297 """ 298 L{verifyCryptedPassword} returns True if the provided cleartext 299 password matches the provided password hash. 300 """ 301 password = 'password' 302 salt = 'sa' 303 encrypted = checkers.crypt.crypt(password, salt) 304 self.assertTrue(checkers.verifyCryptedPassword( 305 encrypted, 306 password), '%r not valid encrypted password for %s' % ( 307 encrypted, password)) 308 309 310 def test_verifyCryptedPassword_md5(self): 311 """ 312 L{verifyCryptedPassword} returns True if the provided cleartext 313 password matches the provided MD5 password hash. 314 """ 315 password = 'password' 316 salt = '$1$salt' 317 encrypted = checkers.crypt.crypt(password, salt) 318 self.assertTrue(checkers.verifyCryptedPassword( 319 encrypted, 320 password), '%r not valid encrypted password for %s' % ( 321 encrypted, password)) 322 323 324 def test_getpwnamPasswd(self): 325 """ 326 L{getpwnamPasswd} returns a tuple of items from the UNIX /etc/passwd 327 database if the L{pwd} module is present. If that module isn't 328 present, it returns None. 329 """ 330 if checkers.pwd is None: 331 self.assertIdentical(checkers.getpwnamPasswd('user'), None) 332 else: 333 self.patch(checkers.pwd, "getpwnam", self.mockos.getpwnam) 334 user_line = checkers.getpwnamPasswd('user') 335 self.assertEquals(user_line, (0, 0, 1, 2)) # what MockOS.getpwnam 336 # returns 337 338 339 def test_getpwnamShadow(self): 340 """ 341 L{getpwnamShadow} returns a tuple of items from the UNIX /etc/shadow 342 database if the L{spwd} or L{shadow} modules are present. If the 343 modules are not present, it returns None. 344 """ 345 if checkers.shadow is None: 346 self.assertIdentical(checkers.getpwnamShadow('user'), None) 347 else: 348 self.patch(checkers.shadow, 'getspnam', lambda u: (0, 1, 2, 3)) 349 self.mockos.euid = 2345 350 self.mockos.egid = 1234 351 self.patch(os, "geteuid", self.mockos.geteuid) 352 self.patch(os, "getegid", self.mockos.getegid) 353 self.patch(os, "seteuid", self.mockos.seteuid) 354 self.patch(os, "setegid", self.mockos.setegid) 355 user_line = checkers.getpwnamShadow('user') 356 self.assertEquals(user_line, (0, 1, 2, 3)) 357 self.assertEquals(self.mockos.seteuidCalls, [0, 2345]) 358 self.assertEquals(self.mockos.setegidCalls, [0, 1234]) 359 360 361 362 class UNIXPasswordDatabaseTests(TestCase): 363 """ 364 Tests for L{UNIXPasswordDatabase}. 365 """ 366 def test_defaultCheckers(self): 367 """ 368 L{UNIXPasswordDatabase} with no arguments has L{getpwnamPasswd} and 369 L{getpwnamShadow} as the getpwnamFunctions. 370 """ 371 checker = checkers.UNIXPasswordDatabase() 372 self.assertEquals(checker.getpwnamFunctions, 373 (checkers.getpwnamPasswd, 374 checkers.getpwnamShadow)) 375 376 377 def assertLoggedIn(self, d): 378 """ 379 Assert that the L{Deferred} passed in is called back with the value 380 'username'. This represents a valid login for this TestCase. 381 382 NOTE: To work, this method's return value must be returned from the 383 test method, or otherwise hooked up to the test machinery. 384 385 @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 386 @type d: L{Deferred} 387 @rtype: L{Deferred} 388 """ 389 def _cbRequestAvatarId(username): 390 self.assertEquals(username, 'username') 391 392 return d.addCallback(_cbRequestAvatarId) 393 394 395 def assertUnauthorizedLogin(self, d): 396 """ 397 Asserts that the L{Deferred} passed in is erred back with an 398 L{UnauthorizedLogin} L{Failure}. This reprsents an invalid login for 399 this TestCase. 400 401 NOTE: To work, this method's return value must be returned from the 402 test method, or otherwise hooked up to the test machinery. 403 404 @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method. 405 @type d: L{Deferred} 406 @rtype: L{Deferred} 407 """ 408 def _ebRequestAvatarId(reason): 409 reason.trap(checkers.UnauthorizedLogin) 410 return d.addErrback(_ebRequestAvatarId) 411 412 413 def test_passInCheckers(self): 414 """ 415 L{UNIXPasswordDatabase} takes a list of functions to check for UNIX 416 user information. 417 """ 418 def getpwnam(username): pass 419 checker = checkers.UNIXPasswordDatabase([getpwnam]) 420 self.assertEquals(checker.getpwnamFunctions, 421 [getpwnam]) 422 423 424 def test_verifyPassword(self): 425 """ 426 If the encrypted password provided by the getpwnam function is valid 427 (verified by the L{verifyCryptedPassword} function), we callback the 428 C{requestAvatarId} L{Deferred} with the username. 429 """ 430 def verifyCryptedPassword(crypted, pw): 431 return crypted == pw 432 def getpwnam(username): 433 return [username, username] 434 self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 435 checker = checkers.UNIXPasswordDatabase([getpwnam]) 436 credential = UsernamePassword('username', 'username') 437 d = checker.requestAvatarId(credential) 438 return self.assertLoggedIn(d) 439 440 441 def test_failOnKeyError(self): 442 """ 443 If the getpwnam function raises a KeyError, the login fails with an 444 L{UnauthorizedLogin} exception. 445 """ 446 def getpwnam(username): 447 raise KeyError(username) 448 checker = checkers.UNIXPasswordDatabase([getpwnam]) 449 credential = UsernamePassword('username', 'username') 450 d = checker.requestAvatarId(credential) 451 return self.assertUnauthorizedLogin(d) 452 453 454 def test_failOnBadPassword(self): 455 """ 456 If the verifyCryptedPassword function doesn't verify the password, the 457 login fails with an L{UnauthorizedLogin} exception. 458 """ 459 def verifyCryptedPassword(crypted, pw): 460 return False 461 def getpwnam(username): 462 return [username, username] 463 self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 464 checker = checkers.UNIXPasswordDatabase([getpwnam]) 465 credential = UsernamePassword('username', 'username') 466 d = checker.requestAvatarId(credential) 467 return self.assertUnauthorizedLogin(d) 468 469 470 def test_loopThroughFunctions(self): 471 """ 472 UNIXPasswordDatabase.requestAvatarId loops through each getpwnam 473 function associated with it and returns a L{Deferred} which fires with 474 the result of the first one which returns a value other than None. 475 ones do not verify the password. 476 """ 477 def verifyCryptedPassword(crypted, pw): 478 return crypted == pw 479 def getpwnam1(username): 480 return [username, 'not the password'] 481 def getpwnam2(username): 482 return [username, username] 483 self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword) 484 checker = checkers.UNIXPasswordDatabase([getpwnam1, getpwnam2]) 485 credential = UsernamePassword('username', 'username') 486 d = checker.requestAvatarId(credential) 487 return self.assertLoggedIn(d)
