diff options
author | Mark Adams <mark@markadams.me> | 2015-04-08 21:18:05 -0500 |
---|---|---|
committer | Mark Adams <mark@markadams.me> | 2015-04-08 21:18:05 -0500 |
commit | 2f4c770d8b1550d9bdafd292c7bae07ff4fe662d (patch) | |
tree | cbb7c8facc55484324bd18c924700d5d65cbc356 | |
parent | a2601ad46433a99c8777a74abeaf4dfd70630d17 (diff) | |
parent | 6e6046cd9f5b0a5b48e37f3a5eb7bfb7efaad958 (diff) | |
download | pyjwt-2f4c770d8b1550d9bdafd292c7bae07ff4fe662d.tar.gz |
Merge pull request #131 from michaeldavis-wf/options-dict
Added `options=` argument to decode()
-rw-r--r-- | AUTHORS | 2 | ||||
-rw-r--r-- | CHANGELOG.md | 1 | ||||
-rw-r--r-- | README.md | 28 | ||||
-rw-r--r-- | jwt/api.py | 52 | ||||
-rw-r--r-- | tests/test_api.py | 71 |
5 files changed, 141 insertions, 13 deletions
@@ -21,3 +21,5 @@ Patches and Suggestions - Mark Adams <mark@markadams.me> - Wouter Bolsterlee <uws@xs4all.nl> + + - Michael Davis <mike.philip.davis@gmail.com> <mike.davis@workiva.com> diff --git a/CHANGELOG.md b/CHANGELOG.md index ced0519..1564f5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](http://semver.org/). ------------------------------------------------------------------------- ### Changed - Added this CHANGELOG.md file +- Added flexible and complete verification options. #131 ### Fixed - Placeholder @@ -62,6 +62,34 @@ except jwt.InvalidTokenError: pass # do something sensible here, e.g. return HTTP 403 status code ``` +You may also override exception checking via an `options` dictionary. The default +options are as follows: + +```python +options = { + 'verify_signature': True, + 'verify_exp': True, + 'verify_nbf': True, + 'verify_iat': True, + 'verify_aud`: True +} +``` + +You can skip individual checks by passing an `options` dictionary with certain keys set to `False`. +For example, if you want to verify the signature of a JWT that has already expired. + +```python +options = { + 'verify_exp': True, +} + +jwt.decode('someJWTstring', 'secret', options=options) +``` + +**NOTE**: *Changing the default behavior is done at your own risk, and almost certainly will make your +application less secure. Doing so should only be done with a very clear understanding of what you +are doing.* + ## Tests You can run tests from the project root after cloning with: @@ -16,7 +16,7 @@ from .utils import base64url_decode, base64url_encode class PyJWT(object): - def __init__(self, algorithms=None): + def __init__(self, algorithms=None, options=None): self._algorithms = get_default_algorithms() self._valid_algs = set(algorithms) if algorithms is not None else set(self._algorithms) @@ -25,6 +25,19 @@ class PyJWT(object): if key not in self._valid_algs: del self._algorithms[key] + if not options: + options = {} + + self.default_options = { + 'verify_signature': True, + 'verify_exp': True, + 'verify_nbf': True, + 'verify_iat': True, + 'verify_aud': True, + } + + self.options = self._merge_options(self.default_options, options) + def register_algorithm(self, alg_id, alg_obj): """ Registers a new Algorithm for use when creating and verifying tokens. @@ -110,14 +123,16 @@ class PyJWT(object): return b'.'.join(segments) - def decode(self, jwt, key='', verify=True, algorithms=None, **kwargs): + def decode(self, jwt, key='', verify=True, algorithms=None, options=None, **kwargs): payload, signing_input, header, signature = self._load(jwt) if verify: - self._verify_signature(payload, signing_input, header, signature, - key, algorithms) + merged_options = self._merge_options(override_options=options) + if merged_options.get('verify_signature'): + self._verify_signature(payload, signing_input, header, signature, + key, algorithms) - self._validate_claims(payload, **kwargs) + self._validate_claims(payload, options=merged_options, **kwargs) return payload @@ -177,8 +192,8 @@ class PyJWT(object): except KeyError: raise InvalidAlgorithmError('Algorithm not supported') - def _validate_claims(self, payload, verify_expiration=True, leeway=0, - audience=None, issuer=None): + def _validate_claims(self, payload, audience=None, issuer=None, leeway=0, + options=None, **kwargs): if isinstance(leeway, timedelta): leeway = timedelta_total_seconds(leeway) @@ -187,7 +202,7 @@ class PyJWT(object): now = timegm(datetime.utcnow().utctimetuple()) - if 'iat' in payload: + if 'iat' in payload and options.get('verify_iat'): try: iat = int(payload['iat']) except ValueError: @@ -196,7 +211,7 @@ class PyJWT(object): if iat > (now + leeway): raise InvalidIssuedAtError('Issued At claim (iat) cannot be in the future.') - if 'nbf' in payload and verify_expiration: + if 'nbf' in payload and options.get('verify_nbf'): try: nbf = int(payload['nbf']) except ValueError: @@ -205,7 +220,7 @@ class PyJWT(object): if nbf > (now + leeway): raise ImmatureSignatureError('The token is not yet valid (nbf)') - if 'exp' in payload and verify_expiration: + if 'exp' in payload and options.get('verify_exp'): try: exp = int(payload['exp']) except ValueError: @@ -214,7 +229,7 @@ class PyJWT(object): if exp < (now - leeway): raise ExpiredSignatureError('Signature has expired') - if 'aud' in payload: + if 'aud' in payload and options.get('verify_aud'): audience_claims = payload['aud'] if isinstance(audience_claims, string_types): audience_claims = [audience_claims] @@ -233,6 +248,21 @@ class PyJWT(object): if payload.get('iss') != issuer: raise InvalidIssuerError('Invalid issuer') + def _merge_options(self, default_options=None, override_options=None): + if not default_options: + default_options = {} + + if not override_options: + override_options = {} + + try: + merged_options = self.default_options.copy() + merged_options.update(override_options) + except (AttributeError, ValueError) as e: + raise TypeError('options must be a dictionary: %s' % e) + + return merged_options + _jwt_global_obj = PyJWT() encode = _jwt_global_obj.encode diff --git a/tests/test_api.py b/tests/test_api.py index f1734b4..33ccd51 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -71,6 +71,27 @@ class TestAPI(unittest.TestCase): self.assertNotIn('none', self.jwt.get_algorithms()) self.assertIn('HS256', self.jwt.get_algorithms()) + def test_default_options(self): + self.assertEqual(self.jwt.default_options, self.jwt.options) + + def test_override_options(self): + self.jwt = PyJWT(options={'verify_exp': False, 'verify_nbf': False}) + expected_options = self.jwt.default_options + expected_options['verify_exp'] = False + expected_options['verify_nbf'] = False + self.assertEqual(expected_options, self.jwt.options) + + def test_non_default_options_persist(self): + self.jwt = PyJWT(options={'verify_iat': False, 'foobar': False}) + expected_options = self.jwt.default_options + expected_options['verify_iat'] = False + expected_options['foobar'] = False + self.assertEqual(expected_options, self.jwt.options) + + def test_options_must_be_dict(self): + self.assertRaises(TypeError, PyJWT, options=object()) + self.assertRaises(TypeError, PyJWT, options=('something')) + def test_encode_decode(self): secret = 'secret' jwt_message = self.jwt.encode(self.payload, secret) @@ -467,14 +488,14 @@ class TestAPI(unittest.TestCase): secret = 'secret' jwt_message = self.jwt.encode(self.payload, secret) - self.jwt.decode(jwt_message, secret, verify_expiration=False) + self.jwt.decode(jwt_message, secret, options={'verify_exp': False}) def test_decode_skip_notbefore_verification(self): self.payload['nbf'] = time.time() + 10 secret = 'secret' jwt_message = self.jwt.encode(self.payload, secret) - self.jwt.decode(jwt_message, secret, verify_expiration=False) + self.jwt.decode(jwt_message, secret, options={'verify_nbf': False}) def test_decode_with_expiration_with_leeway(self): self.payload['exp'] = utc_timestamp() - 2 @@ -765,6 +786,52 @@ class TestAPI(unittest.TestCase): with self.assertRaises(InvalidIssuerError): self.jwt.decode(token, 'secret', issuer=issuer) + def test_skip_check_audience(self): + payload = { + 'some': 'payload', + 'aud': 'urn:me', + } + token = self.jwt.encode(payload, 'secret') + self.jwt.decode(token, 'secret', options={'verify_aud': False}) + + def test_skip_check_exp(self): + payload = { + 'some': 'payload', + 'exp': datetime.utcnow() - timedelta(days=1) + } + token = self.jwt.encode(payload, 'secret') + self.jwt.decode(token, 'secret', options={'verify_exp': False}) + + def test_skip_check_signature(self): + token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + ".eyJzb21lIjoicGF5bG9hZCJ9" + ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA") + self.jwt.decode(token, 'secret', options={'verify_signature': False}) + + def test_skip_check_iat(self): + payload = { + 'some': 'payload', + 'iat': datetime.utcnow() + timedelta(days=1) + } + token = self.jwt.encode(payload, 'secret') + self.jwt.decode(token, 'secret', options={'verify_iat': False}) + + def test_skip_check_nbf(self): + payload = { + 'some': 'payload', + 'nbf': datetime.utcnow() + timedelta(days=1) + } + token = self.jwt.encode(payload, 'secret') + self.jwt.decode(token, 'secret', options={'verify_nbf': False}) + + def test_decode_options_must_be_dict(self): + payload = { + 'some': 'payload', + } + token = self.jwt.encode(payload, 'secret') + self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options=object()) + self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options='something') + def test_custom_json_encoder(self): class CustomJSONEncoder(json.JSONEncoder): |