summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Padilla <jpadilla@webapplicate.com>2015-03-17 13:07:22 -0400
committerJosé Padilla <jpadilla@webapplicate.com>2015-03-17 13:10:25 -0400
commit5a2bcf4b9da6b66c9469e597151d62f661fd3710 (patch)
tree3c2796a64c98ea533daca465069d6638c9a3bf19
parentd47163117bef52392f314406ad0a4177e2a65e16 (diff)
downloadpyjwt-fix_alg_vuln_on_verify.tar.gz
Work in progress fix for vulnerabilityfix_alg_vuln_on_verify
-rw-r--r--jwt/api.py25
-rw-r--r--tests/keys/invalid_pub.pem19
-rw-r--r--tests/test_jwt.py39
3 files changed, 70 insertions, 13 deletions
diff --git a/jwt/api.py b/jwt/api.py
index e509e23..e1b4672 100644
--- a/jwt/api.py
+++ b/jwt/api.py
@@ -89,8 +89,10 @@ def decode(jwt, key='', verify=True, **kwargs):
payload, signing_input, header, signature = load(jwt)
if verify:
- verify_signature(payload, signing_input, header, signature, key,
- **kwargs)
+ verify_signature(
+ payload, signing_input, header,
+ signature, key, **kwargs
+ )
return payload
@@ -136,7 +138,20 @@ def load(jwt):
def verify_signature(payload, signing_input, header, signature, key='',
verify_expiration=True, leeway=0, audience=None,
- issuer=None):
+ issuer=None, algorithms=None):
+
+ if not algorithms and isinstance(key, string_types):
+ secret = key
+ if isinstance(key, text_type):
+ secret = key.encode('utf-8')
+
+ if b'BEGIN CERTIFICATE' in secret or b'BEGIN PUBLIC KEY' in secret:
+ algorithms = ['RS256', 'RS384', 'RS512', 'ES256', 'ES384', 'ES512']
+ else:
+ algorithms = ['HS256', 'HS384', 'HS512']
+
+ if not algorithms:
+ algorithms = _algorithms.keys()
if isinstance(leeway, timedelta):
leeway = timedelta_total_seconds(leeway)
@@ -146,6 +161,10 @@ def verify_signature(payload, signing_input, header, signature, key='',
try:
alg_obj = _algorithms[header['alg']]
+
+ if header['alg'] not in algorithms:
+ raise DecodeError('Signature verification failed')
+
key = alg_obj.prepare_key(key)
if not alg_obj.verify(signing_input, key, signature):
diff --git a/tests/keys/invalid_pub.pem b/tests/keys/invalid_pub.pem
new file mode 100644
index 0000000..2482abb
--- /dev/null
+++ b/tests/keys/invalid_pub.pem
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDJjCCAg6gAwIBAgIJAMyz3mSPlaW4MA0GCSqGSIb3DQEBBQUAMBYxFDASBgNV
+BAMUCyouYXV0aDAuY29tMB4XDTEzMDQxODE3MDE1MFoXDTI2MTIyNjE3MDE1MFow
+FjEUMBIGA1UEAxQLKi5hdXRoMC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
+ggEKAoIBAQDZq1Ua0/BGm+TaBFoftKWeYMWrQG9Fx3g7ikErxljmyOvlwqkiat3q
+ixX+Dxw9TFb5gbBjNJ+L3nt4YefJgLsYvsHqkOUxWsB+HM/ulJRVnVrZm1tI3Nbg
+xO1BQ7DrGfBpq2KCxtQCaQFRlQJw1+qS5LwrdIvihB7Kc142VElCFFHJ6+09eMUy
+jy00Z5pfQr4Am6W6eEOS9ObDbNs4XgKOcWe5khWXj3UStou+VgbAg40XcYht2IbY
+gMfKF+VUZOy3+e+aRTqPOBU3MAeb0tvCCPUQJbNAUHgSKVhAvNf8mRwttVsOLT70
+anjjeCOd7RKS8fVKBwc2KtgNkghYdPY9AgMBAAGjdzB1MB0GA1UdDgQWBBSi4+X0
++MvCKDdd375mDhx/ZBbJ4DBGBgNVHSMEPzA9gBSi4+X0+MvCKDdd375mDhx/ZBbJ
+4KEapBgwFjEUMBIGA1UEAxQLKi5hdXRoMC5jb22CCQDMs95kj5WluDAMBgNVHRME
+BTADAQH/MA0GCSqGSIb3DQEBBQUAA4IBAQBi0qPe0DzlPSufq+Gdk2Fwf1pGEtjA
+D34IxxJ9SX6r1DS/NIP7IOLUnNU8cP8BQWl7i413v29jJsNV457pjdmqf8J7OE9O
+eF5Yz1x91gY/27561Iga/TQeIVOlFQAgx66eLfUFFoAig3hz2srZo5TzYBixMJsS
+fYMXHPiU7KoLUqYXvpSXIllstQCu51KCC6t9H7wZ92lTES1v76hFY4edQ30sftPo
+kjAYWGEhMjPo/r4THcdSMqKXoRtCGEun4pTXid7MJcTgdGDrAJddLWi6SxKecEVB
+MhMu4XfUCdxCwqQPjHeJ+zE49A1CUdBB2FN3BNLbmTTwEBgmuwyGRzhj
+-----END CERTIFICATE-----
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 4eeef24..007472e 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -248,7 +248,7 @@ class TestJWT(unittest.TestCase):
b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t'
b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr'
b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A')
- decoded_payload = jwt.decode(example_jwt, example_pubkey)
+ decoded_payload = jwt.decode(example_jwt, example_pubkey, algorithms=['RS384'])
self.assertEqual(decoded_payload, example_payload)
@@ -594,10 +594,10 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS256'])
load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ jwt_verify_signature(key=pub_rsakey, algorithms=['RS256'], *load_output)
@unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
def test_encode_decode_with_rsa_sha384(self):
@@ -611,7 +611,7 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS384'])
# string-formatted key
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
@@ -621,10 +621,10 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS384'])
load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ jwt_verify_signature(key=pub_rsakey, algorithms=['RS384'], *load_output)
@unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
def test_encode_decode_with_rsa_sha512(self):
@@ -638,10 +638,10 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS512'])
load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ jwt_verify_signature(key=pub_rsakey, algorithms=['RS512'], *load_output)
# string-formatted key
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
@@ -651,10 +651,10 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert jwt.decode(jwt_message, pub_rsakey, algorithms=['RS512'])
load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ jwt_verify_signature(key=pub_rsakey, algorithms=['RS512'], *load_output)
def test_rsa_related_algorithms(self):
if has_crypto:
@@ -872,6 +872,25 @@ class TestJWT(unittest.TestCase):
payload = jwt.decode(token, 'secret')
self.assertEqual(payload, {'some_decimal': 'it worked'})
+ @unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
+ def test_verification_with_an_asymmetric_key_of_a_token_signed_with_a_symmetric_key(self):
+ with open('tests/keys/invalid_pub.pem', 'r') as rsa_pub_file:
+ pub_rsakey = rsa_pub_file.read()
+
+ # Normally, we would generate a token using RSA private key
+ # token = jwt.encode({'superuser': False}, rsaPrivateKey, 'RS256')
+
+ # But an attacker can instead generate an HMAC token using RSA public key
+ hmac_key = pub_rsakey
+ token = jwt.encode({'superuser': True}, hmac_key, 'HS256')
+
+ # It decodes and validates successfully, even though the attacker had no
+ # knowledge of the private key!
+ with self.assertRaises(DecodeError) as context:
+ jwt.decode(token, pub_rsakey)
+
+ exception = context.exception
+ self.assertEqual(str(exception), 'Signature verification failed')
if __name__ == '__main__':
unittest.main()