summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Adams <mark@markadams.me>2015-04-08 21:18:05 -0500
committerMark Adams <mark@markadams.me>2015-04-08 21:18:05 -0500
commit2f4c770d8b1550d9bdafd292c7bae07ff4fe662d (patch)
treecbb7c8facc55484324bd18c924700d5d65cbc356
parenta2601ad46433a99c8777a74abeaf4dfd70630d17 (diff)
parent6e6046cd9f5b0a5b48e37f3a5eb7bfb7efaad958 (diff)
downloadpyjwt-2f4c770d8b1550d9bdafd292c7bae07ff4fe662d.tar.gz
Merge pull request #131 from michaeldavis-wf/options-dict
Added `options=` argument to decode()
-rw-r--r--AUTHORS2
-rw-r--r--CHANGELOG.md1
-rw-r--r--README.md28
-rw-r--r--jwt/api.py52
-rw-r--r--tests/test_api.py71
5 files changed, 141 insertions, 13 deletions
diff --git a/AUTHORS b/AUTHORS
index be65bf8..02fbc3b 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -21,3 +21,5 @@ Patches and Suggestions
- Mark Adams <mark@markadams.me>
- Wouter Bolsterlee <uws@xs4all.nl>
+
+ - Michael Davis <mike.philip.davis@gmail.com> <mike.davis@workiva.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ced0519..1564f5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
-------------------------------------------------------------------------
### Changed
- Added this CHANGELOG.md file
+- Added flexible and complete verification options. #131
### Fixed
- Placeholder
diff --git a/README.md b/README.md
index 167f78e..5ae0b40 100644
--- a/README.md
+++ b/README.md
@@ -62,6 +62,34 @@ except jwt.InvalidTokenError:
pass # do something sensible here, e.g. return HTTP 403 status code
```
+You may also override exception checking via an `options` dictionary. The default
+options are as follows:
+
+```python
+options = {
+ 'verify_signature': True,
+ 'verify_exp': True,
+ 'verify_nbf': True,
+ 'verify_iat': True,
+ 'verify_aud`: True
+}
+```
+
+You can skip individual checks by passing an `options` dictionary with certain keys set to `False`.
+For example, if you want to verify the signature of a JWT that has already expired.
+
+```python
+options = {
+ 'verify_exp': True,
+}
+
+jwt.decode('someJWTstring', 'secret', options=options)
+```
+
+**NOTE**: *Changing the default behavior is done at your own risk, and almost certainly will make your
+application less secure. Doing so should only be done with a very clear understanding of what you
+are doing.*
+
## Tests
You can run tests from the project root after cloning with:
diff --git a/jwt/api.py b/jwt/api.py
index 6d39d8d..68d30f6 100644
--- a/jwt/api.py
+++ b/jwt/api.py
@@ -16,7 +16,7 @@ from .utils import base64url_decode, base64url_encode
class PyJWT(object):
- def __init__(self, algorithms=None):
+ def __init__(self, algorithms=None, options=None):
self._algorithms = get_default_algorithms()
self._valid_algs = set(algorithms) if algorithms is not None else set(self._algorithms)
@@ -25,6 +25,19 @@ class PyJWT(object):
if key not in self._valid_algs:
del self._algorithms[key]
+ if not options:
+ options = {}
+
+ self.default_options = {
+ 'verify_signature': True,
+ 'verify_exp': True,
+ 'verify_nbf': True,
+ 'verify_iat': True,
+ 'verify_aud': True,
+ }
+
+ self.options = self._merge_options(self.default_options, options)
+
def register_algorithm(self, alg_id, alg_obj):
"""
Registers a new Algorithm for use when creating and verifying tokens.
@@ -110,14 +123,16 @@ class PyJWT(object):
return b'.'.join(segments)
- def decode(self, jwt, key='', verify=True, algorithms=None, **kwargs):
+ def decode(self, jwt, key='', verify=True, algorithms=None, options=None, **kwargs):
payload, signing_input, header, signature = self._load(jwt)
if verify:
- self._verify_signature(payload, signing_input, header, signature,
- key, algorithms)
+ merged_options = self._merge_options(override_options=options)
+ if merged_options.get('verify_signature'):
+ self._verify_signature(payload, signing_input, header, signature,
+ key, algorithms)
- self._validate_claims(payload, **kwargs)
+ self._validate_claims(payload, options=merged_options, **kwargs)
return payload
@@ -177,8 +192,8 @@ class PyJWT(object):
except KeyError:
raise InvalidAlgorithmError('Algorithm not supported')
- def _validate_claims(self, payload, verify_expiration=True, leeway=0,
- audience=None, issuer=None):
+ def _validate_claims(self, payload, audience=None, issuer=None, leeway=0,
+ options=None, **kwargs):
if isinstance(leeway, timedelta):
leeway = timedelta_total_seconds(leeway)
@@ -187,7 +202,7 @@ class PyJWT(object):
now = timegm(datetime.utcnow().utctimetuple())
- if 'iat' in payload:
+ if 'iat' in payload and options.get('verify_iat'):
try:
iat = int(payload['iat'])
except ValueError:
@@ -196,7 +211,7 @@ class PyJWT(object):
if iat > (now + leeway):
raise InvalidIssuedAtError('Issued At claim (iat) cannot be in the future.')
- if 'nbf' in payload and verify_expiration:
+ if 'nbf' in payload and options.get('verify_nbf'):
try:
nbf = int(payload['nbf'])
except ValueError:
@@ -205,7 +220,7 @@ class PyJWT(object):
if nbf > (now + leeway):
raise ImmatureSignatureError('The token is not yet valid (nbf)')
- if 'exp' in payload and verify_expiration:
+ if 'exp' in payload and options.get('verify_exp'):
try:
exp = int(payload['exp'])
except ValueError:
@@ -214,7 +229,7 @@ class PyJWT(object):
if exp < (now - leeway):
raise ExpiredSignatureError('Signature has expired')
- if 'aud' in payload:
+ if 'aud' in payload and options.get('verify_aud'):
audience_claims = payload['aud']
if isinstance(audience_claims, string_types):
audience_claims = [audience_claims]
@@ -233,6 +248,21 @@ class PyJWT(object):
if payload.get('iss') != issuer:
raise InvalidIssuerError('Invalid issuer')
+ def _merge_options(self, default_options=None, override_options=None):
+ if not default_options:
+ default_options = {}
+
+ if not override_options:
+ override_options = {}
+
+ try:
+ merged_options = self.default_options.copy()
+ merged_options.update(override_options)
+ except (AttributeError, ValueError) as e:
+ raise TypeError('options must be a dictionary: %s' % e)
+
+ return merged_options
+
_jwt_global_obj = PyJWT()
encode = _jwt_global_obj.encode
diff --git a/tests/test_api.py b/tests/test_api.py
index f1734b4..33ccd51 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -71,6 +71,27 @@ class TestAPI(unittest.TestCase):
self.assertNotIn('none', self.jwt.get_algorithms())
self.assertIn('HS256', self.jwt.get_algorithms())
+ def test_default_options(self):
+ self.assertEqual(self.jwt.default_options, self.jwt.options)
+
+ def test_override_options(self):
+ self.jwt = PyJWT(options={'verify_exp': False, 'verify_nbf': False})
+ expected_options = self.jwt.default_options
+ expected_options['verify_exp'] = False
+ expected_options['verify_nbf'] = False
+ self.assertEqual(expected_options, self.jwt.options)
+
+ def test_non_default_options_persist(self):
+ self.jwt = PyJWT(options={'verify_iat': False, 'foobar': False})
+ expected_options = self.jwt.default_options
+ expected_options['verify_iat'] = False
+ expected_options['foobar'] = False
+ self.assertEqual(expected_options, self.jwt.options)
+
+ def test_options_must_be_dict(self):
+ self.assertRaises(TypeError, PyJWT, options=object())
+ self.assertRaises(TypeError, PyJWT, options=('something'))
+
def test_encode_decode(self):
secret = 'secret'
jwt_message = self.jwt.encode(self.payload, secret)
@@ -467,14 +488,14 @@ class TestAPI(unittest.TestCase):
secret = 'secret'
jwt_message = self.jwt.encode(self.payload, secret)
- self.jwt.decode(jwt_message, secret, verify_expiration=False)
+ self.jwt.decode(jwt_message, secret, options={'verify_exp': False})
def test_decode_skip_notbefore_verification(self):
self.payload['nbf'] = time.time() + 10
secret = 'secret'
jwt_message = self.jwt.encode(self.payload, secret)
- self.jwt.decode(jwt_message, secret, verify_expiration=False)
+ self.jwt.decode(jwt_message, secret, options={'verify_nbf': False})
def test_decode_with_expiration_with_leeway(self):
self.payload['exp'] = utc_timestamp() - 2
@@ -765,6 +786,52 @@ class TestAPI(unittest.TestCase):
with self.assertRaises(InvalidIssuerError):
self.jwt.decode(token, 'secret', issuer=issuer)
+ def test_skip_check_audience(self):
+ payload = {
+ 'some': 'payload',
+ 'aud': 'urn:me',
+ }
+ token = self.jwt.encode(payload, 'secret')
+ self.jwt.decode(token, 'secret', options={'verify_aud': False})
+
+ def test_skip_check_exp(self):
+ payload = {
+ 'some': 'payload',
+ 'exp': datetime.utcnow() - timedelta(days=1)
+ }
+ token = self.jwt.encode(payload, 'secret')
+ self.jwt.decode(token, 'secret', options={'verify_exp': False})
+
+ def test_skip_check_signature(self):
+ token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+ ".eyJzb21lIjoicGF5bG9hZCJ9"
+ ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA")
+ self.jwt.decode(token, 'secret', options={'verify_signature': False})
+
+ def test_skip_check_iat(self):
+ payload = {
+ 'some': 'payload',
+ 'iat': datetime.utcnow() + timedelta(days=1)
+ }
+ token = self.jwt.encode(payload, 'secret')
+ self.jwt.decode(token, 'secret', options={'verify_iat': False})
+
+ def test_skip_check_nbf(self):
+ payload = {
+ 'some': 'payload',
+ 'nbf': datetime.utcnow() + timedelta(days=1)
+ }
+ token = self.jwt.encode(payload, 'secret')
+ self.jwt.decode(token, 'secret', options={'verify_nbf': False})
+
+ def test_decode_options_must_be_dict(self):
+ payload = {
+ 'some': 'payload',
+ }
+ token = self.jwt.encode(payload, 'secret')
+ self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options=object())
+ self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options='something')
+
def test_custom_json_encoder(self):
class CustomJSONEncoder(json.JSONEncoder):