Ticket #5495: ticket-5495-digestcredentials-nonce-verification-and-pipelining.patch

File ticket-5495-digestcredentials-nonce-verification-and-pipelining.patch, 13.9 KB (added by JohnDoeee, 2 years ago)
  • twisted/test/test_digestauth.py

     
    2626    def __init__(self, *args, **kwargs): 
    2727        super(FakeDigestCredentialFactory, self).__init__(*args, **kwargs) 
    2828        self.privateKey = "0" 
     29        self.fakeTime = 0 
    2930 
    30  
    31     def _generateNonce(self): 
    32         """ 
    33         Generate a static nonce 
    34         """ 
    35         return '178288758716122392881254770685' 
    36  
    37  
    3831    def _getTime(self): 
    3932        """ 
    4033        Return a stable time 
    4134        """ 
    42         return 0 
     35        return self.fakeTime 
    4336 
    4437 
    4538 
     
    355348        self.assertFalse(creds.checkPassword(self.password + 'wrong')) 
    356349 
    357350 
    358     def test_multiResponse(self): 
    359         """ 
    360         L{DigestCredentialFactory.decode} handles multiple responses to a 
    361         single challenge. 
    362         """ 
    363         challenge = self.credentialFactory.getChallenge(self.clientAddress.host) 
    364  
    365         nc = "00000001" 
    366         clientResponse = self.formatResponse( 
    367             nonce=challenge['nonce'], 
    368             response=self.getDigestResponse(challenge, nc), 
    369             nc=nc, 
    370             opaque=challenge['opaque']) 
    371  
    372         creds = self.credentialFactory.decode(clientResponse, self.method, 
    373                                               self.clientAddress.host) 
    374         self.assertTrue(creds.checkPassword(self.password)) 
    375         self.assertFalse(creds.checkPassword(self.password + 'wrong')) 
    376  
    377         nc = "00000002" 
    378         clientResponse = self.formatResponse( 
    379             nonce=challenge['nonce'], 
    380             response=self.getDigestResponse(challenge, nc), 
    381             nc=nc, 
    382             opaque=challenge['opaque']) 
    383  
    384         creds = self.credentialFactory.decode(clientResponse, self.method, 
    385                                               self.clientAddress.host) 
    386         self.assertTrue(creds.checkPassword(self.password)) 
    387         self.assertFalse(creds.checkPassword(self.password + 'wrong')) 
    388  
    389  
    390351    def test_failsWithDifferentMethod(self): 
    391352        """ 
    392353        L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword} 
     
    669630        opaque = self.credentialFactory._generateOpaque( 
    670631            "long nonce " * 10, None) 
    671632        self.assertNotIn('\n', opaque) 
     633 
     634 
     635    def test_reusedNonce(self): 
     636        """ 
     637        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given 
     638        same nonce twice 
     639        """ 
     640        credentialFactory = FakeDigestCredentialFactory(self.algorithm, 
     641                                                        self.realm) 
     642        challenge = credentialFactory.getChallenge(self.clientAddress.host) 
     643 
     644        key = '%s,%s,%s' % (challenge['nonce'], 
     645                            self.clientAddress.host, 
     646                            '0') 
     647        digest = md5(key + credentialFactory.privateKey).hexdigest() 
     648        ekey = b64encode(key) 
     649 
     650        nonceOpaque = '%s-%s' % (digest, ekey.strip('\n')) 
     651         
     652        self.assertEqual(credentialFactory._verifyNonce( 
     653                challenge['nonce']), 
     654            True) 
     655 
     656        self.assertRaises( 
     657            LoginFailed, 
     658            credentialFactory._verifyNonce, 
     659            challenge['nonce']) 
     660 
     661 
     662    def test_nonceTimeoutCleanup(self): 
     663        """ 
     664        L{DigestCredentialFactory._nonceCleanup} cleans up old nonces 
     665        to prevent memory exhaustion. The nonces cannot facilitate replay 
     666        attacks as the original request is timestamped. 
     667        """ 
     668        credentialFactory = FakeDigestCredentialFactory(self.algorithm, 
     669                                                        self.realm) 
     670 
     671        challenge = credentialFactory.getChallenge(self.clientAddress.host) 
     672 
     673        # First verify a nonce, this adds it to known nonces 
     674        self.assertEqual(credentialFactory._verifyNonce( 
     675                challenge['nonce']), 
     676            True) 
     677 
     678        # Turn the clock forward 
     679        credentialFactory.fakeTime = 10000 
     680 
     681        # Cleanup old nonces 
     682        credentialFactory._nonceCleanup() 
     683 
     684        # The nonce tracking mechanisms must now be empty as the only item 
     685        # tracked was cleaned up. 
     686        self.assertEqual(len(credentialFactory._nonces), 0) 
     687        self.assertEqual(len(credentialFactory._nonceCleanupTracker), 0) 
     688 
     689        # Turn clock back again, the same nonce must now work again 
     690        # as it is no longer known 
     691        self.assertEqual(credentialFactory._verifyNonce( 
     692                challenge['nonce']), 
     693            True) 
     694 
     695    def test_nonceCounter(self): 
     696        """ 
     697        L{DigestCredentialFactory._verifyNonce} keeps track of nonce counter 
     698        and allows reuse if the nonces are in order 
     699        """ 
     700        credentialFactory = FakeDigestCredentialFactory(self.algorithm, 
     701                                                        self.realm) 
     702 
     703        challenge = credentialFactory.getChallenge(self.clientAddress.host) 
     704 
     705        self.assertEqual(credentialFactory._verifyNonce( 
     706                challenge['nonce'], 
     707                '00000001'), 
     708            True) 
     709 
     710        self.assertEqual(credentialFactory._verifyNonce( 
     711                challenge['nonce'], 
     712                '00000002'), 
     713            True) 
     714 
     715    def test_nonceCounterWrongInitValue(self): 
     716        """ 
     717        L{DigestCredentialFactory._verifyNonce} keeps track of nonce counter 
     718        and allows reuse if the nonces are in order 
     719        """ 
     720        credentialFactory = FakeDigestCredentialFactory(self.algorithm, 
     721                                                        self.realm) 
     722 
     723        challenge = credentialFactory.getChallenge(self.clientAddress.host) 
     724 
     725        self.assertRaises( 
     726            LoginFailed, 
     727            credentialFactory._verifyNonce, 
     728            challenge['nonce'], '00000002') 
     729 
     730    def test_nonceCounterOutOfOrder(self): 
     731        """ 
     732        FIXME 
     733        """ 
     734        credentialFactory = FakeDigestCredentialFactory(self.algorithm, 
     735                                                        self.realm) 
     736 
     737        challenge = credentialFactory.getChallenge(self.clientAddress.host) 
     738 
     739        self.assertEqual(credentialFactory._verifyNonce( 
     740                challenge['nonce'], 
     741                '00000001'), 
     742            True) 
     743 
     744        self.assertRaises( 
     745            LoginFailed, 
     746            credentialFactory._verifyNonce, 
     747            challenge['nonce'], '00000003') 
     748 
     749        self.assertEqual(credentialFactory._verifyNonce( 
     750                challenge['nonce'], 
     751                '00000002'), 
     752            True) 
     753 
     754    def test_withQopWithoutCnonceOrNc(self): 
     755        """ 
     756        L{DigestCredentialFactory.decode} must fail when provided with qop but 
     757        without cnonce or nc. 
     758        """ 
     759        # Test for nc 
     760        challenge = self.credentialFactory.getChallenge(self.clientAddress.host) 
     761 
     762        nc = None 
     763        clientResponse = self.formatResponse( 
     764            nonce=challenge['nonce'], 
     765            response=self.getDigestResponse(challenge, nc), 
     766            nc=nc, 
     767            opaque=challenge['opaque']) 
     768        self.assertRaises(LoginFailed, self.credentialFactory.decode, 
     769            clientResponse, self.method, self.clientAddress.host) 
     770 
     771        # Test for cnonce 
     772        challenge = self.credentialFactory.getChallenge(self.clientAddress.host) 
     773 
     774        nc = "00000001" 
     775        clientResponse = self.formatResponse( 
     776            nonce=challenge['nonce'], 
     777            response=self.getDigestResponse(challenge, nc), 
     778            cnonce=None, 
     779            opaque=challenge['opaque']) 
     780        self.assertRaises(LoginFailed, self.credentialFactory.decode, 
     781            clientResponse, self.method, self.clientAddress.host) 
     782 
     783    def test_withoutQopWithCnonceAndNc(self): 
     784        """ 
     785        L{DigestCredentialFactory.decode} must fail when not provided with qop but 
     786        with cnonce or nc. 
     787        """ 
     788        challenge = self.credentialFactory.getChallenge(self.clientAddress.host) 
     789 
     790        nc = "00000001" 
     791        clientResponse = self.formatResponse( 
     792            nonce=challenge['nonce'], 
     793            response=self.getDigestResponse(challenge, nc), 
     794            nc=nc, 
     795            qop=None, 
     796            opaque=challenge['opaque']) 
     797        self.assertRaises(LoginFailed, self.credentialFactory.decode, 
     798            clientResponse, self.method, self.clientAddress.host) 
     799 
     800    def test_withoutQop(self): 
     801        """ 
     802        L{DigestCredentialFactory.decode} must work when no qop is provided to be backward 
     803        compatible. 
     804        """ 
     805        challenge = self.credentialFactory.getChallenge(self.clientAddress.host) 
     806 
     807        nc = None 
     808        clientResponse = self.formatResponse( 
     809            nonce=challenge['nonce'], 
     810            response=self.getDigestResponse(challenge, nc), 
     811            nc=nc, 
     812            cnonce=None, 
     813            qop=None, 
     814            opaque=challenge['opaque']) 
     815        creds = self.credentialFactory.decode( 
     816            clientResponse, self.method, self.clientAddress.host) 
     817        self.assertTrue(creds.checkPassword(self.password)) 
     818        self.assertFalse(creds.checkPassword(self.password + 'wrong')) 
     819 No newline at end of file 
  • twisted/cred/credentials.py

     
    66 
    77from zope.interface import implements, Interface 
    88 
    9 import hmac, time, random 
     9import hmac, time, random, struct 
     10from collections import deque 
    1011from twisted.python.hashlib import md5 
    1112from twisted.python.randbytes import secureRandom 
    1213from twisted.cred._digest import calcResponse, calcHA1, calcHA2 
     
    187188    @type authenticationRealm: C{str} 
    188189    @param authenticationRealm: case sensitive string that specifies the realm 
    189190        portion of the challenge 
     191 
     192    @type _nonces: C{dict} 
     193    @param _nonces: a collection of all previously used nonce, each key contains 
     194        a tuple as follows: (nonce_last_use, none_counter) 
     195 
     196    @type _nonceCleanupTracker: C{deque} 
     197    @param _nonceCleanupTracker: a queue of all currently known nonces, used to 
     198        cycle over in an orderly fashion to remove old nonces 
    190199    """ 
    191200 
    192201    CHALLENGE_LIFETIME_SECS = 15 * 60    # 15 minutes 
     
    197206        self.algorithm = algorithm 
    198207        self.authenticationRealm = authenticationRealm 
    199208        self.privateKey = secureRandom(12) 
     209        self._nonces = {} 
     210        self._nonceCleanupTracker = deque() 
    200211 
    201212 
    202213    def getChallenge(self, address): 
     
    313324        return True 
    314325 
    315326 
     327    def _verifyNonce(self, nonce, nc=None): 
     328        """ 
     329        Given the nonce and a nonce counter from the request, verify that 
     330        the request is not a replay of an already handled request. 
     331         
     332        This implementation accepts unknown nonces even though the nonces 
     333        are handed out by the server. 
     334        A specific method to handle nonces in relation to this is not mentioned 
     335        in rfc2617, but the method used here is also used elsewhere, e.g. 
     336        https://tools.ietf.org/html/draft-ietf-oauth-v2-http-mac-01 
     337 
     338        @param nonce: The nonce value from the Digest response 
     339        @param nc: The nonce count value from the Digest response 
     340         
     341        @return: C{True} if the nonce does not present as a replay. 
     342 
     343        @raise error.LoginFailed: if C{nonce} is already used, not provided 
     344            by us or counter is not valid. 
     345        """ 
     346 
     347        self._nonceCleanup() 
     348 
     349        if nc is not None: 
     350            nc, = struct.unpack('!I', nc.decode('hex')) 
     351 
     352        if nonce in self._nonces: 
     353            if nc is None: # no counter, then this nonce cannot be reused 
     354                raise error.LoginFailed('Invalid response, nonce already used') 
     355 
     356            _, last_nc = self._nonces[nonce] 
     357 
     358            if nc - last_nc != 1: 
     359                raise error.LoginFailed('Invalid response, nc counts wrong') 
     360 
     361        elif nc is not None and nc != 1: 
     362            raise error.LoginFailed('Invalid Response, nc must start at 1') 
     363 
     364        self._nonces[nonce] = (self._getTime(), nc) 
     365        self._nonceCleanupTracker.append(nonce) 
     366 
     367        return True 
     368 
     369 
     370    def _nonceCleanup(self, checkCount=10): 
     371        """ 
     372        Given a number of nonces to check, cycles C{checkCount} known nonces in a 
     373        lru fashion. Checks age, deletes nonces too old to be used in a replay 
     374        attack and adds any other nonce back into the verification queue. 
     375 
     376        This method is used to be reasonably sure old nonces does not consume 
     377        memory indefinitely while taking performance into consideration. 
     378 
     379        @param checkCount: Number of nonces to verify 
     380        """ 
     381 
     382        for _ in range(min(len(self._nonceCleanupTracker), checkCount)): 
     383            nonce = self._nonceCleanupTracker.popleft() 
     384            last_use, _ = self._nonces[nonce] 
     385            if self._getTime() - last_use > self.CHALLENGE_LIFETIME_SECS: 
     386                del self._nonces[nonce] 
     387            else: 
     388                self._nonceCleanupTracker.append(nonce) 
     389 
     390 
    316391    def decode(self, response, method, host): 
    317392        """ 
    318393        Decode the given response and attempt to generate a 
     
    355430        if 'nonce' not in auth: 
    356431            raise error.LoginFailed('Invalid response, no nonce given.') 
    357432 
     433        for k in ['cnonce', 'nc']: # these MUST/MUST NOT be part of the request if 
     434                                   # qop is/is not part of the request 
     435            if (k in auth) != ('qop' in auth): 
     436                raise error.LoginFailed('Invalid Response, qop mismatch with other parts') 
     437 
    358438        # Now verify the nonce/opaque values for this client 
    359         if self._verifyOpaque(auth.get('opaque'), auth.get('nonce'), host): 
     439        if self._verifyOpaque(auth.get('opaque'), auth.get('nonce'), host) and \ 
     440           self._verifyNonce(auth.get('nonce'), auth.get('nc', None)): 
    360441            return DigestedCredentials(username, 
    361442                                       method, 
    362443                                       self.authenticationRealm,