diff options
author | Mark Adams <mark@markadams.me> | 2015-06-02 20:18:13 -0500 |
---|---|---|
committer | Mark Adams <mark@markadams.me> | 2015-06-02 21:11:08 -0500 |
commit | 12791c7875dda323835b8e0b9c687d17ba0e641b (patch) | |
tree | a28499defbe581e00dbd4c8ea4c8644f2d4a778c | |
parent | 32a577fc799010c54415bc5884b58a0f9ebf26a8 (diff) | |
download | pyjwt-12791c7875dda323835b8e0b9c687d17ba0e641b.tar.gz |
Added new options for requiring exp, iat, and nbf claims.
Thanks to David Black <dblack@atlassian.com> for the suggestion.
-rw-r--r-- | CHANGELOG.md | 7 | ||||
-rw-r--r-- | jwt/__init__.py | 3 | ||||
-rw-r--r-- | jwt/api_jwt.py | 23 | ||||
-rw-r--r-- | jwt/exceptions.py | 8 | ||||
-rw-r--r-- | tests/test_api_jwt.py | 65 | ||||
-rw-r--r-- | tests/test_exceptions.py | 7 |
6 files changed, 96 insertions, 17 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ccb129..52abcf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - Exclude Python cache files from PyPI releases. +### Added +- Added new options to require certain claims + (require_nbf, require_iat, require_exp) and raise `MissingRequiredClaimError` + if they are not present. +- If `audience=` or `issuer=` is specified but the claim is not present, + `MissingRequiredClaimError` is now raised instead of `InvalidAudienceError` + and `InvalidIssuerError` [v1.3][1.3.0] ------------------------------------------------------------------------- diff --git a/jwt/__init__.py b/jwt/__init__.py index 63b4a53..7f8e65a 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -24,5 +24,6 @@ from .api_jws import PyJWS from .exceptions import ( InvalidTokenError, DecodeError, InvalidAudienceError, ExpiredSignatureError, ImmatureSignatureError, InvalidIssuedAtError, - InvalidIssuerError, ExpiredSignature, InvalidAudience, InvalidIssuer + InvalidIssuerError, ExpiredSignature, InvalidAudience, InvalidIssuer, + MissingRequiredClaimError ) diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py index 09290e1..9703b8d 100644 --- a/jwt/api_jwt.py +++ b/jwt/api_jwt.py @@ -11,7 +11,7 @@ from .compat import string_types, timedelta_total_seconds from .exceptions import ( DecodeError, ExpiredSignatureError, ImmatureSignatureError, InvalidAudienceError, InvalidIssuedAtError, - InvalidIssuerError + InvalidIssuerError, MissingRequiredClaimError ) from .utils import merge_dict @@ -27,7 +27,10 @@ class PyJWT(PyJWS): 'verify_nbf': True, 'verify_iat': True, 'verify_aud': True, - 'verify_iss': True + 'verify_iss': True, + 'require_exp': False, + 'require_iat': False, + 'require_nbf': False } def encode(self, payload, key, algorithm='HS256', headers=None, @@ -87,6 +90,8 @@ class PyJWT(PyJWS): if not isinstance(audience, (string_types, type(None))): raise TypeError('audience must be a string or None') + self._validate_required_claims(payload, options) + now = timegm(datetime.utcnow().utctimetuple()) if 'iat' in payload and options.get('verify_iat'): @@ -104,6 +109,16 @@ class PyJWT(PyJWS): if options.get('verify_aud'): self._validate_aud(payload, audience) + def _validate_required_claims(self, payload, options): + if options.get('require_exp') and payload.get('exp') is None: + raise MissingRequiredClaimError('exp') + + if options.get('require_iat') and payload.get('iat') is None: + raise MissingRequiredClaimError('iat') + + if options.get('require_nbf') and payload.get('nbf') is None: + raise MissingRequiredClaimError('nbf') + def _validate_iat(self, payload, now, leeway): try: iat = int(payload['iat']) @@ -140,7 +155,7 @@ class PyJWT(PyJWS): if audience is not None and 'aud' not in payload: # Application specified an audience, but it could not be # verified since the token does not contain a claim. - raise InvalidAudienceError('No audience claim in token') + raise MissingRequiredClaimError('aud') audience_claims = payload['aud'] @@ -158,7 +173,7 @@ class PyJWT(PyJWS): return if 'iss' not in payload: - raise InvalidIssuerError('Token does not contain an iss claim') + raise MissingRequiredClaimError('iss') if payload['iss'] != issuer: raise InvalidIssuerError('Invalid issuer') diff --git a/jwt/exceptions.py b/jwt/exceptions.py index 0e82e6f..31177a0 100644 --- a/jwt/exceptions.py +++ b/jwt/exceptions.py @@ -34,6 +34,14 @@ class InvalidAlgorithmError(InvalidTokenError): pass +class MissingRequiredClaimError(InvalidTokenError): + def __init__(self, claim): + self.claim = claim + + def __str__(self): + return 'Token is missing the "%s" claim' % self.claim + + # Compatibility aliases (deprecated) ExpiredSignature = ExpiredSignatureError InvalidAudience = InvalidAudienceError diff --git a/tests/test_api_jwt.py b/tests/test_api_jwt.py index 7d33a9e..211f0df 100644 --- a/tests/test_api_jwt.py +++ b/tests/test_api_jwt.py @@ -9,7 +9,8 @@ from decimal import Decimal from jwt.api_jwt import PyJWT from jwt.exceptions import ( DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError + InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError, + MissingRequiredClaimError ) import pytest @@ -317,15 +318,31 @@ class TestJWT: with pytest.raises(InvalidAudienceError): jwt.decode(token, 'secret', audience='urn:me') + def test_raise_exception_token_without_issuer(self, jwt): + issuer = 'urn:wrong' + + payload = { + 'some': 'payload' + } + + token = jwt.encode(payload, 'secret') + + with pytest.raises(MissingRequiredClaimError) as exc: + jwt.decode(token, 'secret', issuer=issuer) + + assert exc.value.claim == 'iss' + def test_raise_exception_token_without_audience(self, jwt): payload = { 'some': 'payload', } token = jwt.encode(payload, 'secret') - with pytest.raises(InvalidAudienceError): + with pytest.raises(MissingRequiredClaimError) as exc: jwt.decode(token, 'secret', audience='urn:me') + assert exc.value.claim == 'aud' + def test_check_issuer_when_valid(self, jwt): issuer = 'urn:foo' payload = { @@ -348,33 +365,57 @@ class TestJWT: with pytest.raises(InvalidIssuerError): jwt.decode(token, 'secret', issuer=issuer) - def test_raise_exception_token_without_issuer(self, jwt): - issuer = 'urn:wrong' + def test_skip_check_audience(self, jwt): + payload = { + 'some': 'payload', + 'aud': 'urn:me', + } + token = jwt.encode(payload, 'secret') + jwt.decode(token, 'secret', options={'verify_aud': False}) + def test_skip_check_exp(self, jwt): payload = { 'some': 'payload', + 'exp': datetime.utcnow() - timedelta(days=1) } + token = jwt.encode(payload, 'secret') + jwt.decode(token, 'secret', options={'verify_exp': False}) + def test_decode_should_raise_error_if_exp_required_but_not_present(self, jwt): + payload = { + 'some': 'payload', + # exp not present + } token = jwt.encode(payload, 'secret') - with pytest.raises(InvalidIssuerError): - jwt.decode(token, 'secret', issuer=issuer) + with pytest.raises(MissingRequiredClaimError) as exc: + jwt.decode(token, 'secret', options={'require_exp': True}) - def test_skip_check_audience(self, jwt): + assert exc.value.claim == 'exp' + + def test_decode_should_raise_error_if_iat_required_but_not_present(self, jwt): payload = { 'some': 'payload', - 'aud': 'urn:me', + # iat not present } token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_aud': False}) - def test_skip_check_exp(self, jwt): + with pytest.raises(MissingRequiredClaimError) as exc: + jwt.decode(token, 'secret', options={'require_iat': True}) + + assert exc.value.claim == 'iat' + + def test_decode_should_raise_error_if_nbf_required_but_not_present(self, jwt): payload = { 'some': 'payload', - 'exp': datetime.utcnow() - timedelta(days=1) + # nbf not present } token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_exp': False}) + + with pytest.raises(MissingRequiredClaimError) as exc: + jwt.decode(token, 'secret', options={'require_nbf': True}) + + assert exc.value.claim == 'nbf' def test_skip_check_signature(self, jwt): token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100644 index 0000000..9e7f91e --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,7 @@ +from jwt.exceptions import MissingRequiredClaimError + + +def test_missing_required_claim_error_has_proper_str(): + exc = MissingRequiredClaimError('abc') + + assert str(exc) == 'Token is missing the "abc" claim' |