summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jwt/__init__.py2
-rw-r--r--jwt/algorithms.py23
-rw-r--r--jwt/api.py332
-rw-r--r--tests/test_jwt.py353
4 files changed, 359 insertions, 351 deletions
diff --git a/jwt/__init__.py b/jwt/__init__.py
index 892fb18..3a5a2d5 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -16,7 +16,7 @@ __license__ = 'MIT'
__copyright__ = 'Copyright 2015 José Padilla'
-from .api import encode, decode, register_algorithm
+from .api import encode, decode, register_algorithm, PyJWT
from .exceptions import (
InvalidTokenError, DecodeError, ExpiredSignatureError,
InvalidAudienceError, InvalidIssuerError,
diff --git a/jwt/algorithms.py b/jwt/algorithms.py
index 720d675..4616378 100644
--- a/jwt/algorithms.py
+++ b/jwt/algorithms.py
@@ -1,7 +1,6 @@
import hashlib
import hmac
-from .api import register_algorithm
from .compat import constant_time_compare, string_types, text_type
try:
@@ -18,23 +17,23 @@ except ImportError:
has_crypto = False
-def _register_default_algorithms():
+def _register_default_algorithms(pyjwt_obj):
"""
Registers the algorithms that are implemented by the library.
"""
- register_algorithm('none', NoneAlgorithm())
- register_algorithm('HS256', HMACAlgorithm(HMACAlgorithm.SHA256))
- register_algorithm('HS384', HMACAlgorithm(HMACAlgorithm.SHA384))
- register_algorithm('HS512', HMACAlgorithm(HMACAlgorithm.SHA512))
+ pyjwt_obj.register_algorithm('none', NoneAlgorithm())
+ pyjwt_obj.register_algorithm('HS256', HMACAlgorithm(HMACAlgorithm.SHA256))
+ pyjwt_obj.register_algorithm('HS384', HMACAlgorithm(HMACAlgorithm.SHA384))
+ pyjwt_obj.register_algorithm('HS512', HMACAlgorithm(HMACAlgorithm.SHA512))
if has_crypto:
- register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
- register_algorithm('RS384', RSAAlgorithm(RSAAlgorithm.SHA384))
- register_algorithm('RS512', RSAAlgorithm(RSAAlgorithm.SHA512))
+ pyjwt_obj.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256()))
+ pyjwt_obj.register_algorithm('RS384', RSAAlgorithm(RSAAlgorithm.SHA384()))
+ pyjwt_obj.register_algorithm('RS512', RSAAlgorithm(RSAAlgorithm.SHA512()))
- register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256))
- register_algorithm('ES384', ECAlgorithm(ECAlgorithm.SHA384))
- register_algorithm('ES512', ECAlgorithm(ECAlgorithm.SHA512))
+ pyjwt_obj.register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256()))
+ pyjwt_obj.register_algorithm('ES384', ECAlgorithm(ECAlgorithm.SHA384()))
+ pyjwt_obj.register_algorithm('ES512', ECAlgorithm(ECAlgorithm.SHA512()))
class Algorithm(object):
diff --git a/jwt/api.py b/jwt/api.py
index e509e23..3c70e88 100644
--- a/jwt/api.py
+++ b/jwt/api.py
@@ -12,175 +12,179 @@ from .exceptions import (
)
from .utils import base64url_decode, base64url_encode
-
-_algorithms = {}
-
-
-def register_algorithm(alg_id, alg_obj):
- """
- Registers a new Algorithm for use when creating and verifying tokens.
- """
- if alg_id in _algorithms:
- raise ValueError('Algorithm already has a handler.')
-
- if not isinstance(alg_obj, Algorithm):
- raise TypeError('Object is not of type `Algorithm`')
-
- _algorithms[alg_id] = alg_obj
-
from jwt.algorithms import Algorithm, _register_default_algorithms # NOQA
-_register_default_algorithms()
-
-
-def encode(payload, key, algorithm='HS256', headers=None, json_encoder=None):
- segments = []
-
- if algorithm is None:
- algorithm = 'none'
-
- # Check that we get a mapping
- if not isinstance(payload, Mapping):
- raise TypeError('Expecting a mapping object, as json web token only'
- 'support json objects.')
- # Header
- header = {'typ': 'JWT', 'alg': algorithm}
- if headers:
- header.update(headers)
+class PyJWT(object):
+ def __init__(self):
+ self._algorithms = {}
+ _register_default_algorithms(self)
+
+ def register_algorithm(self, alg_id, alg_obj):
+ """
+ Registers a new Algorithm for use when creating and verifying tokens.
+ """
+ if alg_id in self._algorithms:
+ raise ValueError('Algorithm already has a handler.')
+
+ if not isinstance(alg_obj, Algorithm):
+ raise TypeError('Object is not of type `Algorithm`')
+
+ self._algorithms[alg_id] = alg_obj
+
+ def encode(self, payload, key, algorithm='HS256', headers=None, json_encoder=None):
+ segments = []
+
+ if algorithm is None:
+ algorithm = 'none'
+
+ # Check that we get a mapping
+ if not isinstance(payload, Mapping):
+ raise TypeError('Expecting a mapping object, as json web token only'
+ 'support json objects.')
+
+ # Header
+ header = {'typ': 'JWT', 'alg': algorithm}
+ if headers:
+ header.update(headers)
- json_header = json.dumps(
- header,
- separators=(',', ':'),
- cls=json_encoder
- ).encode('utf-8')
+ json_header = json.dumps(
+ header,
+ separators=(',', ':'),
+ cls=json_encoder
+ ).encode('utf-8')
- segments.append(base64url_encode(json_header))
+ segments.append(base64url_encode(json_header))
- # Payload
- for time_claim in ['exp', 'iat', 'nbf']:
- # Convert datetime to a intDate value in known time-format claims
- if isinstance(payload.get(time_claim), datetime):
- payload[time_claim] = timegm(payload[time_claim].utctimetuple())
+ # Payload
+ for time_claim in ['exp', 'iat', 'nbf']:
+ # Convert datetime to a intDate value in known time-format claims
+ if isinstance(payload.get(time_claim), datetime):
+ payload[time_claim] = timegm(payload[time_claim].utctimetuple())
- json_payload = json.dumps(
- payload,
- separators=(',', ':'),
- cls=json_encoder
- ).encode('utf-8')
+ json_payload = json.dumps(
+ payload,
+ separators=(',', ':'),
+ cls=json_encoder
+ ).encode('utf-8')
- segments.append(base64url_encode(json_payload))
+ segments.append(base64url_encode(json_payload))
- # Segments
- signing_input = b'.'.join(segments)
- try:
- alg_obj = _algorithms[algorithm]
- key = alg_obj.prepare_key(key)
- signature = alg_obj.sign(signing_input, key)
-
- except KeyError:
- raise NotImplementedError('Algorithm not supported')
-
- segments.append(base64url_encode(signature))
-
- return b'.'.join(segments)
-
-
-def decode(jwt, key='', verify=True, **kwargs):
- payload, signing_input, header, signature = load(jwt)
-
- if verify:
- verify_signature(payload, signing_input, header, signature, key,
- **kwargs)
-
- return payload
-
-
-def load(jwt):
- if isinstance(jwt, text_type):
- jwt = jwt.encode('utf-8')
- try:
- signing_input, crypto_segment = jwt.rsplit(b'.', 1)
- header_segment, payload_segment = signing_input.split(b'.', 1)
- except ValueError:
- raise DecodeError('Not enough segments')
-
- try:
- header_data = base64url_decode(header_segment)
- except (TypeError, binascii.Error):
- raise DecodeError('Invalid header padding')
- try:
- header = json.loads(header_data.decode('utf-8'))
- except ValueError as e:
- raise DecodeError('Invalid header string: %s' % e)
- if not isinstance(header, Mapping):
- raise DecodeError('Invalid header string: must be a json object')
-
- try:
- payload_data = base64url_decode(payload_segment)
- except (TypeError, binascii.Error):
- raise DecodeError('Invalid payload padding')
- try:
- payload = json.loads(payload_data.decode('utf-8'))
- except ValueError as e:
- raise DecodeError('Invalid payload string: %s' % e)
- if not isinstance(payload, Mapping):
- raise DecodeError('Invalid payload string: must be a json object')
-
- try:
- signature = base64url_decode(crypto_segment)
- except (TypeError, binascii.Error):
- raise DecodeError('Invalid crypto padding')
-
- return (payload, signing_input, header, signature)
-
-
-def verify_signature(payload, signing_input, header, signature, key='',
- verify_expiration=True, leeway=0, audience=None,
- issuer=None):
-
- if isinstance(leeway, timedelta):
- leeway = timedelta_total_seconds(leeway)
-
- if not isinstance(audience, (string_types, type(None))):
- raise TypeError('audience must be a string or None')
-
- try:
- alg_obj = _algorithms[header['alg']]
- key = alg_obj.prepare_key(key)
-
- if not alg_obj.verify(signing_input, key, signature):
- raise DecodeError('Signature verification failed')
-
- except KeyError:
- raise DecodeError('Algorithm not supported')
-
- if 'nbf' in payload and verify_expiration:
- utc_timestamp = timegm(datetime.utcnow().utctimetuple())
-
- if payload['nbf'] > (utc_timestamp + leeway):
- raise ExpiredSignatureError('Signature not yet valid')
-
- if 'exp' in payload and verify_expiration:
- utc_timestamp = timegm(datetime.utcnow().utctimetuple())
-
- if payload['exp'] < (utc_timestamp - leeway):
- raise ExpiredSignatureError('Signature has expired')
-
- if 'aud' in payload:
- audience_claims = payload['aud']
- if isinstance(audience_claims, string_types):
- audience_claims = [audience_claims]
- if not isinstance(audience_claims, list):
- raise InvalidAudienceError('Invalid claim format in token')
- if any(not isinstance(c, string_types) for c in audience_claims):
- raise InvalidAudienceError('Invalid claim format in token')
- if audience not in audience_claims:
- raise InvalidAudienceError('Invalid audience')
- elif audience is not None:
- # Application specified an audience, but it could not be
- # verified since the token does not contain a claim.
- raise InvalidAudienceError('No audience claim in token')
-
- if issuer is not None:
- if payload.get('iss') != issuer:
- raise InvalidIssuerError('Invalid issuer')
+ # Segments
+ signing_input = b'.'.join(segments)
+ try:
+ alg_obj = self._algorithms[algorithm]
+ key = alg_obj.prepare_key(key)
+ signature = alg_obj.sign(signing_input, key)
+
+ except KeyError:
+ raise NotImplementedError('Algorithm not supported')
+
+ segments.append(base64url_encode(signature))
+
+ return b'.'.join(segments)
+
+
+ def decode(self, jwt, key='', verify=True, **kwargs):
+ payload, signing_input, header, signature = self._load(jwt)
+
+ if verify:
+ self._verify_signature(payload, signing_input, header, signature,
+ key, **kwargs)
+
+ return payload
+
+
+ def _load(self, jwt):
+ if isinstance(jwt, text_type):
+ jwt = jwt.encode('utf-8')
+ try:
+ signing_input, crypto_segment = jwt.rsplit(b'.', 1)
+ header_segment, payload_segment = signing_input.split(b'.', 1)
+ except ValueError:
+ raise DecodeError('Not enough segments')
+
+ try:
+ header_data = base64url_decode(header_segment)
+ except (TypeError, binascii.Error):
+ raise DecodeError('Invalid header padding')
+ try:
+ header = json.loads(header_data.decode('utf-8'))
+ except ValueError as e:
+ raise DecodeError('Invalid header string: %s' % e)
+ if not isinstance(header, Mapping):
+ raise DecodeError('Invalid header string: must be a json object')
+
+ try:
+ payload_data = base64url_decode(payload_segment)
+ except (TypeError, binascii.Error):
+ raise DecodeError('Invalid payload padding')
+ try:
+ payload = json.loads(payload_data.decode('utf-8'))
+ except ValueError as e:
+ raise DecodeError('Invalid payload string: %s' % e)
+ if not isinstance(payload, Mapping):
+ raise DecodeError('Invalid payload string: must be a json object')
+
+ try:
+ signature = base64url_decode(crypto_segment)
+ except (TypeError, binascii.Error):
+ raise DecodeError('Invalid crypto padding')
+
+ return (payload, signing_input, header, signature)
+
+
+ def _verify_signature(self, payload, signing_input, header, signature,
+ key='', verify_expiration=True, leeway=0,
+ audience=None, issuer=None):
+
+ if isinstance(leeway, timedelta):
+ leeway = timedelta_total_seconds(leeway)
+
+ if not isinstance(audience, (string_types, type(None))):
+ raise TypeError('audience must be a string or None')
+
+ try:
+ alg_obj = self._algorithms[header['alg']]
+ key = alg_obj.prepare_key(key)
+
+ if not alg_obj.verify(signing_input, key, signature):
+ raise DecodeError('Signature verification failed')
+
+ except KeyError:
+ raise DecodeError('Algorithm not supported')
+
+ if 'nbf' in payload and verify_expiration:
+ utc_timestamp = timegm(datetime.utcnow().utctimetuple())
+
+ if payload['nbf'] > (utc_timestamp + leeway):
+ raise ExpiredSignatureError('Signature not yet valid')
+
+ if 'exp' in payload and verify_expiration:
+ utc_timestamp = timegm(datetime.utcnow().utctimetuple())
+
+ if payload['exp'] < (utc_timestamp - leeway):
+ raise ExpiredSignatureError('Signature has expired')
+
+ if 'aud' in payload:
+ audience_claims = payload['aud']
+ if isinstance(audience_claims, string_types):
+ audience_claims = [audience_claims]
+ if not isinstance(audience_claims, list):
+ raise InvalidAudienceError('Invalid claim format in token')
+ if any(not isinstance(c, string_types) for c in audience_claims):
+ raise InvalidAudienceError('Invalid claim format in token')
+ if audience not in audience_claims:
+ raise InvalidAudienceError('Invalid audience')
+ elif audience is not None:
+ # Application specified an audience, but it could not be
+ # verified since the token does not contain a claim.
+ raise InvalidAudienceError('No audience claim in token')
+
+ if issuer is not None:
+ if payload.get('iss') != issuer:
+ raise InvalidIssuerError('Invalid issuer')
+
+_jwt_global_obj = PyJWT()
+encode = _jwt_global_obj.encode
+decode = _jwt_global_obj.decode
+register_algorithm = _jwt_global_obj.register_algorithm
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 4eeef24..59f5bc9 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -8,11 +8,7 @@ from decimal import Decimal
import jwt
from jwt.algorithms import Algorithm
-from jwt.api import (
- _algorithms as jwt_algorithms,
- load as jwt_load,
- verify_signature as jwt_verify_signature
-)
+from jwt.api import PyJWT
from jwt.exceptions import DecodeError, InvalidAudienceError
from .compat import text_type, unittest
@@ -38,6 +34,7 @@ class TestJWT(unittest.TestCase):
def setUp(self): # noqa
self.payload = {'iss': 'jeff', 'exp': utc_timestamp() + 15,
'claim': 'insanity'}
+ self.jwt = PyJWT()
def test_register_algorithm_does_not_allow_duplicate_registration(self):
jwt.register_algorithm('AAA', Algorithm())
@@ -51,8 +48,8 @@ class TestJWT(unittest.TestCase):
def test_encode_decode(self):
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
- decoded_payload = jwt.decode(jwt_message, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
+ decoded_payload = self.jwt.decode(jwt_message, secret)
self.assertEqual(decoded_payload, self.payload)
@@ -63,7 +60,7 @@ class TestJWT(unittest.TestCase):
'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- jwt.decode(unicode_jwt, secret)
+ self.jwt.decode(unicode_jwt, secret)
def test_decode_missing_segments_throws_exception(self):
secret = 'secret'
@@ -72,7 +69,7 @@ class TestJWT(unittest.TestCase):
'') # Missing segment
with self.assertRaises(DecodeError) as context:
- jwt.decode(example_jwt, secret)
+ self.jwt.decode(example_jwt, secret)
exception = context.exception
self.assertEqual(str(exception), 'Not enough segments')
@@ -84,7 +81,7 @@ class TestJWT(unittest.TestCase):
'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
with self.assertRaises(DecodeError) as context:
- jwt.decode(example_jwt, secret)
+ self.jwt.decode(example_jwt, secret)
exception = context.exception
self.assertEqual(str(exception), 'Invalid header string: must be a json object')
@@ -96,7 +93,7 @@ class TestJWT(unittest.TestCase):
'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
with self.assertRaises(DecodeError) as context:
- jwt.decode(example_jwt, secret)
+ self.jwt.decode(example_jwt, secret)
exception = context.exception
self.assertEqual(str(exception), 'Invalid payload string: must be a json object')
@@ -108,7 +105,7 @@ class TestJWT(unittest.TestCase):
'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
with self.assertRaises(TypeError) as context:
- jwt.decode(example_jwt, secret, audience=1)
+ self.jwt.decode(example_jwt, secret, audience=1)
exception = context.exception
self.assertEqual(str(exception), 'audience must be a string or None')
@@ -120,7 +117,7 @@ class TestJWT(unittest.TestCase):
'.Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc')
with self.assertRaises(InvalidAudienceError) as context:
- jwt.decode(example_jwt, secret, audience='my_audience')
+ self.jwt.decode(example_jwt, secret, audience='my_audience')
exception = context.exception
self.assertEqual(str(exception), 'Invalid claim format in token')
@@ -132,7 +129,7 @@ class TestJWT(unittest.TestCase):
'.iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A')
with self.assertRaises(InvalidAudienceError) as context:
- jwt.decode(example_jwt, secret, audience='my_audience')
+ self.jwt.decode(example_jwt, secret, audience='my_audience')
exception = context.exception
self.assertEqual(str(exception), 'Invalid claim format in token')
@@ -142,15 +139,15 @@ class TestJWT(unittest.TestCase):
types = ['string', tuple(), list(), 42, set()]
for t in types:
- self.assertRaises(TypeError, lambda: jwt.encode(t, 'secret'))
+ self.assertRaises(TypeError, lambda: self.jwt.encode(t, 'secret'))
def test_encode_algorithm_param_should_be_case_sensitive(self):
payload = {'hello': 'world'}
- jwt.encode(payload, 'secret', algorithm='HS256')
+ self.jwt.encode(payload, 'secret', algorithm='HS256')
with self.assertRaises(NotImplementedError) as context:
- jwt.encode(payload, None, algorithm='hs256')
+ self.jwt.encode(payload, None, algorithm='hs256')
exception = context.exception
self.assertEquals(str(exception), 'Algorithm not supported')
@@ -161,7 +158,7 @@ class TestJWT(unittest.TestCase):
'.5R_FEPE7SW2dT9GgIxPgZATjFGXfUDOSwo7TtO_Kd_g')
with self.assertRaises(DecodeError) as context:
- jwt.decode(example_jwt, 'secret')
+ self.jwt.decode(example_jwt, 'secret')
exception = context.exception
self.assertEquals(str(exception), 'Algorithm not supported')
@@ -174,8 +171,8 @@ class TestJWT(unittest.TestCase):
'iat': current_datetime,
'nbf': current_datetime
}
- jwt_message = jwt.encode(payload, secret)
- decoded_payload = jwt.decode(jwt_message, secret, leeway=1)
+ jwt_message = self.jwt.encode(payload, secret)
+ decoded_payload = self.jwt.decode(jwt_message, secret, leeway=1)
self.assertEqual(
decoded_payload['exp'],
@@ -190,11 +187,11 @@ class TestJWT(unittest.TestCase):
def test_bad_secret(self):
right_secret = 'foo'
bad_secret = 'bar'
- jwt_message = jwt.encode(self.payload, right_secret)
+ jwt_message = self.jwt.encode(self.payload, right_secret)
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(jwt_message, bad_secret))
+ DecodeError,
+ lambda: self.jwt.decode(jwt_message, bad_secret))
def test_decodes_valid_jwt(self):
example_payload = {'hello': 'world'}
@@ -203,7 +200,7 @@ class TestJWT(unittest.TestCase):
b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- decoded_payload = jwt.decode(example_jwt, example_secret)
+ decoded_payload = self.jwt.decode(example_jwt, example_secret)
self.assertEqual(decoded_payload, example_payload)
@@ -224,7 +221,7 @@ class TestJWT(unittest.TestCase):
b'KZzxhXAJCAPLPuJoKyAixFnXPBkvkti-UzSIj4s6DePe'
b'uTu7102G_QIXiijY5bx6mdmZa3xUuKeu-zobOIOqR8Zw'
b'FqGjBLZum')
- decoded_payload = jwt.decode(example_jwt, example_pubkey)
+ decoded_payload = self.jwt.decode(example_jwt, example_pubkey)
self.assertEqual(decoded_payload, example_payload)
@@ -248,7 +245,7 @@ class TestJWT(unittest.TestCase):
b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t'
b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr'
b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A')
- decoded_payload = jwt.decode(example_jwt, example_pubkey)
+ decoded_payload = self.jwt.decode(example_jwt, example_pubkey)
self.assertEqual(decoded_payload, example_payload)
@@ -260,99 +257,101 @@ class TestJWT(unittest.TestCase):
b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- decoded_payload, signing, header, signature = jwt_load(example_jwt)
+ decoded_payload, signing, header, signature = self.jwt._load(example_jwt)
- jwt_verify_signature(decoded_payload, signing, header,
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, example_secret)
self.assertEqual(decoded_payload, example_payload)
def test_allow_skip_verification(self):
right_secret = 'foo'
- jwt_message = jwt.encode(self.payload, right_secret)
- decoded_payload = jwt.decode(jwt_message, verify=False)
+ jwt_message = self.jwt.encode(self.payload, right_secret)
+ decoded_payload = self.jwt.decode(jwt_message, verify=False)
self.assertEqual(decoded_payload, self.payload)
def test_load_no_verification(self):
right_secret = 'foo'
- jwt_message = jwt.encode(self.payload, right_secret)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ jwt_message = self.jwt.encode(self.payload, right_secret)
+
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
self.assertEqual(decoded_payload, self.payload)
def test_no_secret(self):
right_secret = 'foo'
- jwt_message = jwt.encode(self.payload, right_secret)
+ jwt_message = self.jwt.encode(self.payload, right_secret)
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(jwt_message))
+ DecodeError,
+ lambda: self.jwt.decode(jwt_message))
def test_verify_signature_no_secret(self):
right_secret = 'foo'
- jwt_message = jwt.encode(self.payload, right_secret)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ jwt_message = self.jwt.encode(self.payload, right_secret)
+
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt_verify_signature(decoded_payload, signing,
+ DecodeError,
+ lambda: self.jwt._verify_signature(decoded_payload, signing,
header, signature))
def test_custom_headers(self):
right_secret = 'foo'
headers = {'foo': 'bar', 'kid': 'test'}
- jwt_message = jwt.encode(self.payload, right_secret, headers=headers)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ jwt_message = self.jwt.encode(self.payload, right_secret, headers=headers)
+
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
for key, value in headers.items():
self.assertEqual(header[key], value)
def test_invalid_crypto_alg(self):
- self.assertRaises(NotImplementedError, jwt.encode, self.payload,
+ self.assertRaises(NotImplementedError, self.jwt.encode, self.payload,
'secret', 'HS1024')
def test_unicode_secret(self):
secret = '\xc2'
- jwt_message = jwt.encode(self.payload, secret)
-
- decoded_payload = jwt.decode(jwt_message, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
+ decoded_payload = self.jwt.decode(jwt_message, secret)
self.assertEqual(decoded_payload, self.payload)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
- jwt_verify_signature(decoded_payload, signing, header,
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, secret)
self.assertEqual(decoded_payload, self.payload)
def test_nonascii_secret(self):
secret = '\xc2' # char value that ascii codec cannot decode
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- decoded_payload = jwt.decode(jwt_message, secret)
+ decoded_payload = self.jwt.decode(jwt_message, secret)
self.assertEqual(decoded_payload, self.payload)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
- jwt_verify_signature(decoded_payload, signing,
+ self.jwt._verify_signature(decoded_payload, signing,
header, signature, secret)
self.assertEqual(decoded_payload, self.payload)
def test_bytes_secret(self):
secret = b'\xc2' # char value that ascii codec cannot decode
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- decoded_payload = jwt.decode(jwt_message, secret)
+ decoded_payload = self.jwt.decode(jwt_message, secret)
self.assertEqual(decoded_payload, self.payload)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
- jwt_verify_signature(decoded_payload, signing,
+ self.jwt._verify_signature(decoded_payload, signing,
header, signature, secret)
self.assertEqual(decoded_payload, self.payload)
@@ -364,10 +363,10 @@ class TestJWT(unittest.TestCase):
'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- decoded_payload = jwt.decode(example_jwt, example_secret)
+ decoded_payload = self.jwt.decode(example_jwt, example_secret)
self.assertEqual(decoded_payload, example_payload)
- decoded_payload, signing, header, signature = jwt_load(example_jwt)
+ decoded_payload, signing, header, signature = self.jwt._load(example_jwt)
self.assertEqual(decoded_payload, example_payload)
def test_decode_invalid_header_padding(self):
@@ -378,12 +377,12 @@ class TestJWT(unittest.TestCase):
example_secret = 'secret'
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt_load(example_jwt))
+ DecodeError,
+ lambda: self.jwt._load(example_jwt))
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(example_jwt, example_secret))
+ DecodeError,
+ lambda: self.jwt.decode(example_jwt, example_secret))
def test_decode_invalid_header_string(self):
example_jwt = (
@@ -393,15 +392,15 @@ class TestJWT(unittest.TestCase):
example_secret = 'secret'
try:
- jwt_load(example_jwt)
- except jwt.DecodeError as e:
+ self.jwt._load(example_jwt)
+ except DecodeError as e:
self.assertTrue('Invalid header string' in str(e))
else:
self.fail('DecodeError not raised')
try:
- jwt.decode(example_jwt, example_secret)
- except jwt.DecodeError as e:
+ self.jwt.decode(example_jwt, example_secret)
+ except DecodeError as e:
self.assertTrue('Invalid header string' in str(e))
else:
self.fail('DecodeError not raised')
@@ -414,12 +413,12 @@ class TestJWT(unittest.TestCase):
example_secret = 'secret'
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt_load(example_jwt))
+ DecodeError,
+ lambda: self.jwt._load(example_jwt))
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(example_jwt, example_secret))
+ DecodeError,
+ lambda: self.jwt.decode(example_jwt, example_secret))
def test_decode_invalid_payload_string(self):
example_jwt = (
@@ -429,15 +428,15 @@ class TestJWT(unittest.TestCase):
example_secret = 'secret'
try:
- jwt_load(example_jwt)
- except jwt.DecodeError as e:
+ self.jwt._load(example_jwt)
+ except DecodeError as e:
self.assertTrue('Invalid payload string' in str(e))
else:
self.fail('DecodeError not raised')
try:
- jwt.decode(example_jwt, example_secret)
- except jwt.DecodeError as e:
+ self.jwt.decode(example_jwt, example_secret)
+ except DecodeError as e:
self.assertTrue('Invalid payload string' in str(e))
else:
self.fail('DecodeError not raised')
@@ -450,124 +449,124 @@ class TestJWT(unittest.TestCase):
example_secret = 'secret'
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt_load(example_jwt))
+ DecodeError,
+ lambda: self.jwt._load(example_jwt))
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(example_jwt, example_secret))
+ DecodeError,
+ lambda: self.jwt.decode(example_jwt, example_secret))
def test_decode_with_expiration(self):
self.payload['exp'] = utc_timestamp() - 1
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt.decode(jwt_message, secret))
+ lambda: self.jwt.decode(jwt_message, secret))
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt_verify_signature(
+ lambda: self.jwt._verify_signature(
decoded_payload, signing, header, signature, secret))
def test_decode_with_notbefore(self):
self.payload['nbf'] = utc_timestamp() + 10
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt.decode(jwt_message, secret))
+ lambda: self.jwt.decode(jwt_message, secret))
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt_verify_signature(
+ lambda: self.jwt._verify_signature(
decoded_payload, signing, header, signature, secret))
def test_decode_skip_expiration_verification(self):
self.payload['exp'] = time.time() - 1
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- jwt.decode(jwt_message, secret, verify_expiration=False)
+ self.jwt.decode(jwt_message, secret, verify_expiration=False)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
- jwt_verify_signature(decoded_payload, signing, header,
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, secret, verify_expiration=False)
def test_decode_skip_notbefore_verification(self):
self.payload['nbf'] = time.time() + 10
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- jwt.decode(jwt_message, secret, verify_expiration=False)
+ self.jwt.decode(jwt_message, secret, verify_expiration=False)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
- jwt_verify_signature(decoded_payload, signing, header,
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, secret, verify_expiration=False)
def test_decode_with_expiration_with_leeway(self):
self.payload['exp'] = utc_timestamp() - 2
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
# With 3 seconds leeway, should be ok
for leeway in (3, timedelta(seconds=3)):
- jwt.decode(jwt_message, secret, leeway=leeway)
+ self.jwt.decode(jwt_message, secret, leeway=leeway)
- jwt_verify_signature(decoded_payload, signing, header,
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, secret, leeway=leeway)
# With 1 seconds, should fail
for leeway in (1, timedelta(seconds=1)):
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt.decode(jwt_message, secret, leeway=leeway))
+ lambda: self.jwt.decode(jwt_message, secret, leeway=leeway))
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt_verify_signature(decoded_payload, signing,
+ lambda: self.jwt._verify_signature(decoded_payload, signing,
header, signature, secret,
leeway=leeway))
def test_decode_with_notbefore_with_leeway(self):
self.payload['nbf'] = utc_timestamp() + 10
secret = 'secret'
- jwt_message = jwt.encode(self.payload, secret)
+ jwt_message = self.jwt.encode(self.payload, secret)
- decoded_payload, signing, header, signature = jwt_load(jwt_message)
+ decoded_payload, signing, header, signature = self.jwt._load(jwt_message)
# With 13 seconds leeway, should be ok
- jwt.decode(jwt_message, secret, leeway=13)
+ self.jwt.decode(jwt_message, secret, leeway=13)
- jwt_verify_signature(decoded_payload, signing, header,
+ self.jwt._verify_signature(decoded_payload, signing, header,
signature, secret, leeway=13)
# With 1 seconds, should fail
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt.decode(jwt_message, secret, leeway=1))
+ lambda: self.jwt.decode(jwt_message, secret, leeway=1))
self.assertRaises(
jwt.ExpiredSignatureError,
- lambda: jwt_verify_signature(decoded_payload, signing,
+ lambda: self.jwt._verify_signature(decoded_payload, signing,
header, signature, secret, leeway=1))
def test_encode_decode_with_algo_none(self):
- jwt_message = jwt.encode(self.payload, key=None, algorithm=None)
+ jwt_message = self.jwt.encode(self.payload, key=None, algorithm=None)
self.assertRaises(
- jwt.DecodeError,
- lambda: jwt.decode(jwt_message))
+ DecodeError,
+ lambda: self.jwt.decode(jwt_message))
- jwt.decode(jwt_message, verify=False)
+ self.jwt.decode(jwt_message, verify=False)
@unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
def test_encode_decode_with_rsa_sha256(self):
@@ -575,29 +574,29 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS256')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_rsakey, *load_output)
# string-formatted key
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS256')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_rsakey, *load_output)
@unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
def test_encode_decode_with_rsa_sha384(self):
@@ -605,26 +604,26 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS384')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
# string-formatted key
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS384')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_rsakey, *load_output)
@unittest.skipIf(not has_crypto, 'Not supported without cryptography library')
def test_encode_decode_with_rsa_sha512(self):
@@ -632,31 +631,34 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = load_pem_private_key(ensure_bytes(rsa_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS512')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = load_ssh_public_key(ensure_bytes(rsa_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_rsakey, *load_output)
# string-formatted key
with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_rsakey,
+ jwt_message = self.jwt.encode(self.payload, priv_rsakey,
algorithm='RS512')
with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
- assert jwt.decode(jwt_message, pub_rsakey)
+ assert self.jwt.decode(jwt_message, pub_rsakey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_rsakey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_rsakey, *load_output)
def test_rsa_related_algorithms(self):
+ self.jwt = PyJWT()
+ jwt_algorithms = self.jwt._algorithms
+
if has_crypto:
self.assertTrue('RS256' in jwt_algorithms)
self.assertTrue('RS384' in jwt_algorithms)
@@ -672,29 +674,29 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES256')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
# string-formatted key
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES256')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = ec_pub_file.read()
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
@unittest.skipIf(not has_crypto, "Can't run without cryptography library")
def test_encode_decode_with_ecdsa_sha384(self):
@@ -703,29 +705,29 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES384')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()),
backend=default_backend())
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
# string-formatted key
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES384')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = ec_pub_file.read()
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
@unittest.skipIf(not has_crypto, "Can't run without cryptography library")
def test_encode_decode_with_ecdsa_sha512(self):
@@ -733,30 +735,33 @@ class TestJWT(unittest.TestCase):
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = load_pem_private_key(ensure_bytes(ec_priv_file.read()),
password=None, backend=default_backend())
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES512')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = load_pem_public_key(ensure_bytes(ec_pub_file.read()), backend=default_backend())
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
# string-formatted key
with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jwt_message = jwt.encode(self.payload, priv_eckey,
+ jwt_message = self.jwt.encode(self.payload, priv_eckey,
algorithm='ES512')
with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
pub_eckey = ec_pub_file.read()
- assert jwt.decode(jwt_message, pub_eckey)
+ assert self.jwt.decode(jwt_message, pub_eckey)
- load_output = jwt_load(jwt_message)
- jwt_verify_signature(key=pub_eckey, *load_output)
+ load_output = self.jwt._load(jwt_message)
+ self.jwt._verify_signature(key=pub_eckey, *load_output)
def test_ecdsa_related_algorithms(self):
+ self.jwt = PyJWT()
+ jwt_algorithms = self.jwt._algorithms
+
if has_crypto:
self.assertTrue('ES256' in jwt_algorithms)
self.assertTrue('ES384' in jwt_algorithms)
@@ -771,8 +776,8 @@ class TestJWT(unittest.TestCase):
'some': 'payload',
'aud': 'urn:me'
}
- token = jwt.encode(payload, 'secret')
- decoded = jwt.decode(token, 'secret', audience='urn:me')
+ token = self.jwt.encode(payload, 'secret')
+ decoded = self.jwt.decode(token, 'secret', audience='urn:me')
self.assertEqual(decoded, payload)
def test_check_audience_in_array(self):
@@ -780,8 +785,8 @@ class TestJWT(unittest.TestCase):
'some': 'payload',
'aud': ['urn:me', 'urn:someone-else']
}
- token = jwt.encode(payload, 'secret')
- decoded = jwt.decode(token, 'secret', audience='urn:me')
+ token = self.jwt.encode(payload, 'secret')
+ decoded = self.jwt.decode(token, 'secret', audience='urn:me')
self.assertEqual(decoded, payload)
def test_raise_exception_invalid_audience(self):
@@ -789,29 +794,29 @@ class TestJWT(unittest.TestCase):
'some': 'payload',
'aud': 'urn:someone-else'
}
- token = jwt.encode(payload, 'secret')
+ token = self.jwt.encode(payload, 'secret')
self.assertRaises(
jwt.InvalidAudienceError,
- lambda: jwt.decode(token, 'secret', audience='urn-me'))
+ lambda: self.jwt.decode(token, 'secret', audience='urn-me'))
def test_raise_exception_invalid_audience_in_array(self):
payload = {
'some': 'payload',
'aud': ['urn:someone', 'urn:someone-else']
}
- token = jwt.encode(payload, 'secret')
+ token = self.jwt.encode(payload, 'secret')
self.assertRaises(
jwt.InvalidAudienceError,
- lambda: jwt.decode(token, 'secret', audience='urn:me'))
+ lambda: self.jwt.decode(token, 'secret', audience='urn:me'))
def test_raise_exception_token_without_audience(self):
payload = {
'some': 'payload',
}
- token = jwt.encode(payload, 'secret')
+ token = self.jwt.encode(payload, 'secret')
self.assertRaises(
jwt.InvalidAudienceError,
- lambda: jwt.decode(token, 'secret', audience='urn:me'))
+ lambda: self.jwt.decode(token, 'secret', audience='urn:me'))
def test_check_issuer(self):
issuer = 'urn:foo'
@@ -819,8 +824,8 @@ class TestJWT(unittest.TestCase):
'some': 'payload',
'iss': 'urn:foo'
}
- token = jwt.encode(payload, 'secret')
- decoded = jwt.decode(token, 'secret', issuer=issuer)
+ token = self.jwt.encode(payload, 'secret')
+ decoded = self.jwt.decode(token, 'secret', issuer=issuer)
self.assertEqual(decoded, payload)
@@ -832,11 +837,11 @@ class TestJWT(unittest.TestCase):
'iss': 'urn:foo'
}
- token = jwt.encode(payload, 'secret')
+ token = self.jwt.encode(payload, 'secret')
self.assertRaises(
jwt.InvalidIssuerError,
- lambda: jwt.decode(token, 'secret', issuer=issuer))
+ lambda: self.jwt.decode(token, 'secret', issuer=issuer))
def test_raise_exception_token_without_issuer(self):
issuer = 'urn:wrong'
@@ -845,11 +850,11 @@ class TestJWT(unittest.TestCase):
'some': 'payload',
}
- token = jwt.encode(payload, 'secret')
+ token = self.jwt.encode(payload, 'secret')
self.assertRaises(
jwt.InvalidIssuerError,
- lambda: jwt.decode(token, 'secret', issuer=issuer))
+ lambda: self.jwt.decode(token, 'secret', issuer=issuer))
def test_custom_json_encoder(self):
@@ -866,10 +871,10 @@ class TestJWT(unittest.TestCase):
self.assertRaises(
TypeError,
- lambda: jwt.encode(data, 'secret'))
+ lambda: self.jwt.encode(data, 'secret'))
- token = jwt.encode(data, 'secret', json_encoder=CustomJSONEncoder)
- payload = jwt.decode(token, 'secret')
+ token = self.jwt.encode(data, 'secret', json_encoder=CustomJSONEncoder)
+ payload = self.jwt.decode(token, 'secret')
self.assertEqual(payload, {'some_decimal': 'it worked'})