diff options
author | José Padilla <jpadilla@webapplicate.com> | 2015-03-17 13:07:22 -0400 |
---|---|---|
committer | José Padilla <jpadilla@webapplicate.com> | 2015-03-17 13:10:25 -0400 |
commit | 5a2bcf4b9da6b66c9469e597151d62f661fd3710 (patch) | |
tree | 3c2796a64c98ea533daca465069d6638c9a3bf19 | |
parent | d47163117bef52392f314406ad0a4177e2a65e16 (diff) | |
download | pyjwt-fix_alg_vuln_on_verify.tar.gz |
Work in progress fix for vulnerabilityfix_alg_vuln_on_verify
-rw-r--r-- | jwt/api.py | 25 | ||||
-rw-r--r-- | tests/keys/invalid_pub.pem | 19 | ||||
-rw-r--r-- | tests/test_jwt.py | 39 |
3 files changed, 70 insertions, 13 deletions
@@ -89,8 +89,10 @@ def decode(jwt, key='', verify=True, **kwargs): payload, signing_input, header, signature = load(jwt) if verify: - verify_signature(payload, signing_input, header, signature, key, - **kwargs) + verify_signature( + payload, signing_input, header, + signature, key, **kwargs + ) return payload @@ -136,7 +138,20 @@ def load(jwt): def verify_signature(payload, signing_input, header, signature, key='', verify_expiration=True, leeway=0, audience=None, - issuer=None): + issuer=None, algorithms=None): + + if not algorithms and isinstance(key, string_types): + secret = key + if isinstance(key, text_type): + secret = key.encode('utf-8') + + if b'BEGIN CERTIFICATE' in secret or b'BEGIN PUBLIC KEY' in secret: + algorithms = ['RS256', 'RS384', 'RS512', 'ES256', 'ES384', 'ES512'] + else: + algorithms = ['HS256', 'HS384', 'HS512'] + + if not algorithms: + algorithms = _algorithms.keys() if isinstance(leeway, timedelta): leeway = timedelta_total_seconds(leeway) @@ -146,6 +161,10 @@ def verify_signature(payload, signing_input, header, signature, key='', try: alg_obj = _algorithms[header['alg']] + + if header['alg'] not in algorithms: + raise DecodeError('Signature verification failed') + key = alg_obj.prepare_key(key) if not alg_obj.verify(signing_input, key, signature): diff --git a/tests/keys/invalid_pub.pem b/tests/keys/invalid_pub.pem new file mode 100644 index 0000000..2482abb --- /dev/null +++ b/tests/keys/invalid_pub.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDJjCCAg6gAwIBAgIJAMyz3mSPlaW4MA0GCSqGSIb3DQEBBQUAMBYxFDASBgNV +BAMUCyouYXV0aDAuY29tMB4XDTEzMDQxODE3MDE1MFoXDTI2MTIyNjE3MDE1MFow +FjEUMBIGA1UEAxQLKi5hdXRoMC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQDZq1Ua0/BGm+TaBFoftKWeYMWrQG9Fx3g7ikErxljmyOvlwqkiat3q +ixX+Dxw9TFb5gbBjNJ+L3nt4YefJgLsYvsHqkOUxWsB+HM/ulJRVnVrZm1tI3Nbg +xO1BQ7DrGfBpq2KCxtQCaQFRlQJw1+qS5LwrdIvihB7Kc142VElCFFHJ6+09eMUy +jy00Z5pfQr4Am6W6eEOS9ObDbNs4XgKOcWe5khWXj3UStou+VgbAg40XcYht2IbY +gMfKF+VUZOy3+e+aRTqPOBU3MAeb0tvCCPUQJbNAUHgSKVhAvNf8mRwttVsOLT70 +anjjeCOd7RKS8fVKBwc2KtgNkghYdPY9AgMBAAGjdzB1MB0GA1UdDgQWBBSi4+X0 ++MvCKDdd375mDhx/ZBbJ4DBGBgNVHSMEPzA9gBSi4+X0+MvCKDdd375mDhx/ZBbJ +4KEapBgwFjEUMBIGA1UEAxQLKi5hdXRoMC5jb22CCQDMs95kj5WluDAMBgNVHRME +BTADAQH/MA0GCSqGSIb3DQEBBQUAA4IBAQBi0qPe0DzlPSufq+Gdk2Fwf1pGEtjA +D34IxxJ9SX6r1DS/NIP7IOLUnNU8cP8BQWl7i413v29jJsNV457pjdmqf8J7OE9O +eF5Yz1x91gY/27561Iga/TQeIVOlFQAgx66eLfUFFoAig3hz2srZo5TzYBixMJsS +fYMXHPiU7KoLUqYXvpSXIllstQCu51KCC6t9H7wZ92lTES1v76hFY4edQ30sftPo +kjAYWGEhMjPo/r4THcdSMqKXoRtCGEun4pTXid7MJcTgdGDrAJddLWi6SxKecEVB +MhMu4XfUCdxCwqQPjHeJ+zE49A1CUdBB2FN3BNLbmTTwEBgmuwyGRzhj +-----END CERTIFICATE----- diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 4eeef24..007472e 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -248,7 +248,7 @@ class TestJWT(unittest.TestCase): b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t' b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr' b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A') - decoded_payload = jwt.decode(example_jwt, example_pubkey) + decoded_payload = jwt.decode(example_jwt, example_pubkey, algorithms=['RS384']) self.assertEqual(decoded_payload, example_payload) @@ -594,10 +594,10 @@ class TestJWT(unittest.TestCase): with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - assert jwt.decode(jwt_message, pub_rsakey) + assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS256']) load_output = jwt_load(jwt_message) - jwt_verify_signature(key=pub_rsakey, *load_output) + jwt_verify_signature(key=pub_rsakey, algorithms=['RS256'], *load_output) @unittest.skipIf(not has_crypto, 'Not supported without cryptography library') def test_encode_decode_with_rsa_sha384(self): @@ -611,7 +611,7 @@ class TestJWT(unittest.TestCase): with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), backend=default_backend()) - assert jwt.decode(jwt_message, pub_rsakey) + assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS384']) # string-formatted key with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: @@ -621,10 +621,10 @@ class TestJWT(unittest.TestCase): with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - assert jwt.decode(jwt_message, pub_rsakey) + assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS384']) load_output = jwt_load(jwt_message) - jwt_verify_signature(key=pub_rsakey, *load_output) + jwt_verify_signature(key=pub_rsakey, algorithms=['RS384'], *load_output) @unittest.skipIf(not has_crypto, 'Not supported without cryptography library') def test_encode_decode_with_rsa_sha512(self): @@ -638,10 +638,10 @@ class TestJWT(unittest.TestCase): with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()), backend=default_backend()) - assert jwt.decode(jwt_message, pub_rsakey) + assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS512']) load_output = jwt_load(jwt_message) - jwt_verify_signature(key=pub_rsakey, *load_output) + jwt_verify_signature(key=pub_rsakey, algorithms=['RS512'], *load_output) # string-formatted key with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: @@ -651,10 +651,10 @@ class TestJWT(unittest.TestCase): with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - assert jwt.decode(jwt_message, pub_rsakey) + assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS512']) load_output = jwt_load(jwt_message) - jwt_verify_signature(key=pub_rsakey, *load_output) + jwt_verify_signature(key=pub_rsakey, algorithms=['RS512'], *load_output) def test_rsa_related_algorithms(self): if has_crypto: @@ -872,6 +872,25 @@ class TestJWT(unittest.TestCase): payload = jwt.decode(token, 'secret') self.assertEqual(payload, {'some_decimal': 'it worked'}) + @unittest.skipIf(not has_crypto, 'Not supported without cryptography library') + def test_verification_with_an_asymmetric_key_of_a_token_signed_with_a_symmetric_key(self): + with open('tests/keys/invalid_pub.pem', 'r') as rsa_pub_file: + pub_rsakey = rsa_pub_file.read() + + # Normally, we would generate a token using RSA private key + # token = jwt.encode({'superuser': False}, rsaPrivateKey, 'RS256') + + # But an attacker can instead generate an HMAC token using RSA public key + hmac_key = pub_rsakey + token = jwt.encode({'superuser': True}, hmac_key, 'HS256') + + # It decodes and validates successfully, even though the attacker had no + # knowledge of the private key! + with self.assertRaises(DecodeError) as context: + jwt.decode(token, pub_rsakey) + + exception = context.exception + self.assertEqual(str(exception), 'Signature verification failed') if __name__ == '__main__': unittest.main() |