diff options
author | José Padilla <jpadilla@webapplicate.com> | 2019-10-21 22:38:34 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-21 22:38:34 -0400 |
commit | 11ac89474b1179925c76450fcc4b3d2042c45f19 (patch) | |
tree | dda6b15326cab750f5e52ee57e3125bb5d0a7eee /jwt/api_jwt.py | |
parent | ae080f472c913ad94456fd9e10b05ec2d038b7cc (diff) | |
download | pyjwt-11ac89474b1179925c76450fcc4b3d2042c45f19.tar.gz |
DX Tweaks (#450)
* Setup pre-commit hooks
* Run initial `tox -e lint`
* Fix package name
* Fix .travis.yml
Diffstat (limited to 'jwt/api_jwt.py')
-rw-r--r-- | jwt/api_jwt.py | 198 |
1 files changed, 107 insertions, 91 deletions
diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py index 3e280bc..22bfc6a 100644 --- a/jwt/api_jwt.py +++ b/jwt/api_jwt.py @@ -2,103 +2,113 @@ import json import warnings from calendar import timegm from datetime import datetime, timedelta -try: - # import required by mypy to perform type checking, not used for normal execution - from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA -except ImportError: - pass -from .api_jws import PyJWS from .algorithms import Algorithm, get_default_algorithms # NOQA +from .api_jws import PyJWS from .compat import Iterable, Mapping, string_types from .exceptions import ( - DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAudienceError, InvalidIssuedAtError, - InvalidIssuerError, MissingRequiredClaimError + DecodeError, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuerError, + MissingRequiredClaimError, ) from .utils import merge_dict +try: + # import required by mypy to perform type checking, not used for normal execution + from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA +except ImportError: + pass + class PyJWT(PyJWS): - header_type = 'JWT' + header_type = "JWT" @staticmethod def _get_default_options(): # type: () -> Dict[str, bool] return { - 'verify_signature': True, - 'verify_exp': True, - 'verify_nbf': True, - 'verify_iat': True, - 'verify_aud': True, - 'verify_iss': True, - 'require_exp': False, - 'require_iat': False, - 'require_nbf': False + "verify_signature": True, + "verify_exp": True, + "verify_nbf": True, + "verify_iat": True, + "verify_aud": True, + "verify_iss": True, + "require_exp": False, + "require_iat": False, + "require_nbf": False, } - def encode(self, - payload, # type: Union[Dict, bytes] - key, # type: str - algorithm='HS256', # type: str - headers=None, # type: Optional[Dict] - json_encoder=None # type: Optional[Type[json.JSONEncoder]] - ): + def encode( + self, + payload, # type: Union[Dict, bytes] + key, # type: str + algorithm="HS256", # type: str + headers=None, # type: Optional[Dict] + json_encoder=None, # type: Optional[Type[json.JSONEncoder]] + ): # Check that we get a mapping if not isinstance(payload, Mapping): - raise TypeError('Expecting a mapping object, as JWT only supports ' - 'JSON objects as payloads.') + raise TypeError( + "Expecting a mapping object, as JWT only supports " + "JSON objects as payloads." + ) # Payload - for time_claim in ['exp', 'iat', 'nbf']: + for time_claim in ["exp", "iat", "nbf"]: # Convert datetime to a intDate value in known time-format claims if isinstance(payload.get(time_claim), datetime): - payload[time_claim] = timegm(payload[time_claim].utctimetuple()) # type: ignore + payload[time_claim] = timegm( + payload[time_claim].utctimetuple() + ) # type: ignore json_payload = json.dumps( - payload, - separators=(',', ':'), - cls=json_encoder - ).encode('utf-8') + payload, separators=(",", ":"), cls=json_encoder + ).encode("utf-8") return super(PyJWT, self).encode( json_payload, key, algorithm, headers, json_encoder ) - def decode(self, - jwt, # type: str - key='', # type: str - verify=True, # type: bool - algorithms=None, # type: List[str] - options=None, # type: Dict - **kwargs): + def decode( + self, + jwt, # type: str + key="", # type: str + verify=True, # type: bool + algorithms=None, # type: List[str] + options=None, # type: Dict + **kwargs + ): # type: (...) -> Dict[str, Any] if verify and not algorithms: warnings.warn( - 'It is strongly recommended that you pass in a ' + - 'value for the "algorithms" argument when calling decode(). ' + - 'This argument will be mandatory in a future version.', - DeprecationWarning + "It is strongly recommended that you pass in a " + + 'value for the "algorithms" argument when calling decode(). ' + + "This argument will be mandatory in a future version.", + DeprecationWarning, ) payload, _, _, _ = self._load(jwt) if options is None: - options = {'verify_signature': verify} + options = {"verify_signature": verify} else: - options.setdefault('verify_signature', verify) + options.setdefault("verify_signature", verify) decoded = super(PyJWT, self).decode( jwt, key=key, algorithms=algorithms, options=options, **kwargs ) try: - payload = json.loads(decoded.decode('utf-8')) + payload = json.loads(decoded.decode("utf-8")) except ValueError as e: - raise DecodeError('Invalid payload string: %s' % e) + raise DecodeError("Invalid payload string: %s" % e) if not isinstance(payload, dict): - raise DecodeError('Invalid payload string: must be a json object') + raise DecodeError("Invalid payload string: must be a json object") if verify: merged_options = merge_dict(self.options, options) @@ -106,113 +116,119 @@ class PyJWT(PyJWS): return payload - def _validate_claims(self, payload, options, audience=None, issuer=None, - leeway=0, **kwargs): + def _validate_claims( + self, payload, options, audience=None, issuer=None, leeway=0, **kwargs + ): - if 'verify_expiration' in kwargs: - options['verify_exp'] = kwargs.get('verify_expiration', True) - warnings.warn('The verify_expiration parameter is deprecated. ' - 'Please use verify_exp in options instead.', - DeprecationWarning) + if "verify_expiration" in kwargs: + options["verify_exp"] = kwargs.get("verify_expiration", True) + warnings.warn( + "The verify_expiration parameter is deprecated. " + "Please use verify_exp in options instead.", + DeprecationWarning, + ) if isinstance(leeway, timedelta): leeway = leeway.total_seconds() if not isinstance(audience, (string_types, type(None), Iterable)): - raise TypeError('audience must be a string, iterable, or None') + raise TypeError("audience must be a string, iterable, or None") self._validate_required_claims(payload, options) now = timegm(datetime.utcnow().utctimetuple()) - if 'iat' in payload and options.get('verify_iat'): + if "iat" in payload and options.get("verify_iat"): self._validate_iat(payload, now, leeway) - if 'nbf' in payload and options.get('verify_nbf'): + if "nbf" in payload and options.get("verify_nbf"): self._validate_nbf(payload, now, leeway) - if 'exp' in payload and options.get('verify_exp'): + if "exp" in payload and options.get("verify_exp"): self._validate_exp(payload, now, leeway) - if options.get('verify_iss'): + if options.get("verify_iss"): self._validate_iss(payload, issuer) - if options.get('verify_aud'): + if options.get("verify_aud"): self._validate_aud(payload, audience) def _validate_required_claims(self, payload, options): - if options.get('require_exp') and payload.get('exp') is None: - raise MissingRequiredClaimError('exp') + if options.get("require_exp") and payload.get("exp") is None: + raise MissingRequiredClaimError("exp") - if options.get('require_iat') and payload.get('iat') is None: - raise MissingRequiredClaimError('iat') + if options.get("require_iat") and payload.get("iat") is None: + raise MissingRequiredClaimError("iat") - if options.get('require_nbf') and payload.get('nbf') is None: - raise MissingRequiredClaimError('nbf') + if options.get("require_nbf") and payload.get("nbf") is None: + raise MissingRequiredClaimError("nbf") def _validate_iat(self, payload, now, leeway): try: - int(payload['iat']) + int(payload["iat"]) except ValueError: - raise InvalidIssuedAtError('Issued At claim (iat) must be an integer.') + raise InvalidIssuedAtError( + "Issued At claim (iat) must be an integer." + ) def _validate_nbf(self, payload, now, leeway): try: - nbf = int(payload['nbf']) + nbf = int(payload["nbf"]) except ValueError: - raise DecodeError('Not Before claim (nbf) must be an integer.') + raise DecodeError("Not Before claim (nbf) must be an integer.") if nbf > (now + leeway): - raise ImmatureSignatureError('The token is not yet valid (nbf)') + raise ImmatureSignatureError("The token is not yet valid (nbf)") def _validate_exp(self, payload, now, leeway): try: - exp = int(payload['exp']) + exp = int(payload["exp"]) except ValueError: - raise DecodeError('Expiration Time claim (exp) must be an' - ' integer.') + raise DecodeError( + "Expiration Time claim (exp) must be an" " integer." + ) if exp < (now - leeway): - raise ExpiredSignatureError('Signature has expired') + raise ExpiredSignatureError("Signature has expired") def _validate_aud(self, payload, audience): - if audience is None and 'aud' not in payload: + if audience is None and "aud" not in payload: return - if audience is not None and 'aud' not in payload: + if audience is not None and "aud" not in payload: # Application specified an audience, but it could not be # verified since the token does not contain a claim. - raise MissingRequiredClaimError('aud') + raise MissingRequiredClaimError("aud") - if audience is None and 'aud' in payload: + if audience is None and "aud" in payload: # Application did not specify an audience, but # the token has the 'aud' claim - raise InvalidAudienceError('Invalid audience') + raise InvalidAudienceError("Invalid audience") - audience_claims = payload['aud'] + audience_claims = payload["aud"] if isinstance(audience_claims, string_types): audience_claims = [audience_claims] if not isinstance(audience_claims, list): - raise InvalidAudienceError('Invalid claim format in token') + raise InvalidAudienceError("Invalid claim format in token") if any(not isinstance(c, string_types) for c in audience_claims): - raise InvalidAudienceError('Invalid claim format in token') + raise InvalidAudienceError("Invalid claim format in token") if isinstance(audience, string_types): audience = [audience] if not any(aud in audience_claims for aud in audience): - raise InvalidAudienceError('Invalid audience') + raise InvalidAudienceError("Invalid audience") def _validate_iss(self, payload, issuer): if issuer is None: return - if 'iss' not in payload: - raise MissingRequiredClaimError('iss') + if "iss" not in payload: + raise MissingRequiredClaimError("iss") - if payload['iss'] != issuer: - raise InvalidIssuerError('Invalid issuer') + if payload["iss"] != issuer: + raise InvalidIssuerError("Invalid issuer") _jwt_global_obj = PyJWT() |