diff options
author | José Padilla <jpadilla@webapplicate.com> | 2019-10-21 22:38:34 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-21 22:38:34 -0400 |
commit | 11ac89474b1179925c76450fcc4b3d2042c45f19 (patch) | |
tree | dda6b15326cab750f5e52ee57e3125bb5d0a7eee | |
parent | ae080f472c913ad94456fd9e10b05ec2d038b7cc (diff) | |
download | pyjwt-11ac89474b1179925c76450fcc4b3d2042c45f19.tar.gz |
DX Tweaks (#450)
* Setup pre-commit hooks
* Run initial `tox -e lint`
* Fix package name
* Fix .travis.yml
36 files changed, 1737 insertions, 1395 deletions
diff --git a/.coveragerc b/.coveragerc index 886e322..0ad0bd9 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,4 +4,3 @@ omit = .tox/* setup.py *.egg/* - @@ -0,0 +1,6 @@ +[flake8] +ignore = E203, E266, E501, W503 +max-line-length = 80 +max-complexity = 18 +select = B,C,E,F,W,T4,B9 +exclude = docs/conf.py,.tox diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..2617e46 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,31 @@ +repos: + - repo: https://github.com/psf/black + rev: 19.3b0 + hooks: + - id: black + language_version: python3.7 + + - repo: https://gitlab.com/pycqa/flake8 + rev: 3.7.8 + hooks: + - id: flake8 + language_version: python3.7 + + - repo: https://github.com/asottile/seed-isort-config + rev: v1.9.3 + hooks: + - id: seed-isort-config + + - repo: https://github.com/pre-commit/mirrors-isort + rev: v4.3.21 + hooks: + - id: isort + additional_dependencies: [toml] + language_version: python3.7 + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: debug-statements diff --git a/.travis.yml b/.travis.yml index 3edf125..d0123bb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,15 +2,15 @@ language: python matrix: include: - python: 2.7 - env: TOXENV=flake8,py27-crypto,py27-nocrypto,py27-contrib_crypto + env: TOXENV=py27-crypto,py27-nocrypto,py27-contrib_crypto - python: 3.4 - env: TOXENV=flake8,py34-crypto,py34-nocrypto + env: TOXENV=py34-crypto,py34-nocrypto - python: 3.5 - env: TOXENV=flake8,mypy,py35-crypto,py35-nocrypto,py35-contrib_crypto + env: TOXENV=py35-crypto,py35-nocrypto,py35-contrib_crypto - python: 3.6 - env: TOXENV=flake8,mypy,py36-crypto,py36-nocrypto,py36-contrib_crypto + env: TOXENV=py36-crypto,py36-nocrypto,py36-contrib_crypto - python: 3.7 - env: TOXENV=flake8,mypy,py37-crypto,py37-nocrypto,py37-contrib_crypto + env: TOXENV=lint,typing,py37-crypto,py37-nocrypto,py37-contrib_crypto dist: xenial install: - pip install -U pip @@ -25,5 +25,5 @@ Patches and Suggestions - Michael Davis <mike.philip.davis@gmail.com> <mike.davis@workiva.com> - Vinod Gupta <codervinod@gmail.com> - + - Derek Weitzel <djw8605@gmail.com> diff --git a/docs/conf.py b/docs/conf.py index 57236d2..7c897dd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,20 +12,24 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys import os -import shlex import re +import shlex +import sys + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +import sphinx_rtd_theme # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -#sys.path.insert(0, os.path.abspath('.')) +# sys.path.insert(0, os.path.abspath('.')) # -- General configuration ------------------------------------------------ # If your documentation needs a minimal Sphinx version, state it here. -#needs_sphinx = '1.0' +# needs_sphinx = '1.0' # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom @@ -33,23 +37,23 @@ import re extensions = [] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. -#source_encoding = 'utf-8-sig' +# source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = u'PyJWT' -copyright = u'2015, José Padilla' -author = u'José Padilla' +project = u"PyJWT" +copyright = u"2015, José Padilla" +author = u"José Padilla" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -60,11 +64,12 @@ def get_version(package): """ Return package version as listed in `__version__` in `init.py`. """ - with open(os.path.join('..', package, '__init__.py'), 'rb') as init_py: - src = init_py.read().decode('utf-8') + with open(os.path.join("..", package, "__init__.py"), "rb") as init_py: + src = init_py.read().decode("utf-8") return re.search("__version__ = ['\"]([^'\"]+)['\"]", src).group(1) -version = get_version('jwt') + +version = get_version("jwt") # The full version, including alpha/beta/rc tags. release = version @@ -77,37 +82,37 @@ language = None # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: -#today = '' +# today = '' # Else, today_fmt is used as the format for a strftime call. -#today_fmt = '%B %d, %Y' +# today_fmt = '%B %d, %Y' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -exclude_patterns = ['_build'] +exclude_patterns = ["_build"] # The reST default role (used for this markup: `text`) to use for all # documents. -#default_role = None +# default_role = None # If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True +# add_function_parentheses = True # If true, the current module name will be prepended to all description # unit titles (such as .. function::). -#add_module_names = True +# add_module_names = True # If true, sectionauthor and moduleauthor directives will be shown in the # output. They are ignored by default. -#show_authors = False +# show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # A list of ignored prefixes for module index sorting. -#modindex_common_prefix = [] +# modindex_common_prefix = [] # If true, keep warnings as "system message" paragraphs in the built documents. -#keep_warnings = False +# keep_warnings = False # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = False @@ -115,9 +120,6 @@ todo_include_todos = False # -- Options for HTML output ---------------------------------------------- -# The theme to use for HTML and HTML Help pages. See the documentation for -# a list of builtin themes. -import sphinx_rtd_theme html_theme = "sphinx_rtd_theme" @@ -126,158 +128,157 @@ html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the # documentation. -#html_theme_options = {} +# html_theme_options = {} # Add any paths that contain custom themes here, relative to this directory. -#html_theme_path = [] +# html_theme_path = [] # The name for this set of Sphinx documents. If None, it defaults to # "<project> v<release> documentation". -#html_title = None +# html_title = None # A shorter title for the navigation bar. Default is the same as html_title. -#html_short_title = None +# html_short_title = None # The name of an image file (relative to this directory) to place at the top # of the sidebar. -#html_logo = None +# html_logo = None # The name of an image file (within the static path) to use as favicon of the # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. -#html_favicon = None +# html_favicon = None # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] html_context = { - 'extra_css_files': [ + "extra_css_files": [ # override wide tables in RTD theme - '_static/theme_overrides.css', - ], + "_static/theme_overrides.css" + ] } # Add any extra paths that contain custom files (such as robots.txt or # .htaccess) here, relative to this directory. These files are copied # directly to the root of the documentation. -#html_extra_path = [] +# html_extra_path = [] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. -#html_last_updated_fmt = '%b %d, %Y' +# html_last_updated_fmt = '%b %d, %Y' # If true, SmartyPants will be used to convert quotes and dashes to # typographically correct entities. -#html_use_smartypants = True +# html_use_smartypants = True # Custom sidebar templates, maps document names to template names. -#html_sidebars = {} +# html_sidebars = {} # Additional templates that should be rendered to pages, maps page names to # template names. -#html_additional_pages = {} +# html_additional_pages = {} # If false, no module index is generated. -#html_domain_indices = True +# html_domain_indices = True # If false, no index is generated. -#html_use_index = True +# html_use_index = True # If true, the index is split into individual pages for each letter. -#html_split_index = False +# html_split_index = False # If true, links to the reST sources are added to the pages. -#html_show_sourcelink = True +# html_show_sourcelink = True # If true, "Created using Sphinx" is shown in the HTML footer. Default is True. -#html_show_sphinx = True +# html_show_sphinx = True # If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. -#html_show_copyright = True +# html_show_copyright = True # If true, an OpenSearch description file will be output, and all pages will # contain a <link> tag referring to it. The value of this option must be the # base URL from which the finished HTML is served. -#html_use_opensearch = '' +# html_use_opensearch = '' # This is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = None +# html_file_suffix = None # Language to be used for generating the HTML full-text search index. # Sphinx supports the following languages: # 'da', 'de', 'en', 'es', 'fi', 'fr', 'hu', 'it', 'ja' # 'nl', 'no', 'pt', 'ro', 'ru', 'sv', 'tr' -#html_search_language = 'en' +# html_search_language = 'en' # A dictionary with options for the search language support, empty by default. # Now only 'ja' uses this config value -#html_search_options = {'type': 'default'} +# html_search_options = {'type': 'default'} # The name of a javascript file (relative to the configuration directory) that # implements a search results scorer. If empty, the default will be used. -#html_search_scorer = 'scorer.js' +# html_search_scorer = 'scorer.js' # Output file base name for HTML help builder. -htmlhelp_basename = 'PyJWTdoc' +htmlhelp_basename = "PyJWTdoc" # -- Options for LaTeX output --------------------------------------------- latex_elements = { -# The paper size ('letterpaper' or 'a4paper'). -#'papersize': 'letterpaper', - -# The font size ('10pt', '11pt' or '12pt'). -#'pointsize': '10pt', - -# Additional stuff for the LaTeX preamble. -#'preamble': '', - -# Latex figure (float) alignment -#'figure_align': 'htbp', + # The paper size ('letterpaper' or 'a4paper'). + #'papersize': 'letterpaper', + # The font size ('10pt', '11pt' or '12pt'). + #'pointsize': '10pt', + # Additional stuff for the LaTeX preamble. + #'preamble': '', + # Latex figure (float) alignment + #'figure_align': 'htbp', } # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'PyJWT.tex', u'PyJWT Documentation', - u'José Padilla', 'manual'), + ( + master_doc, + "PyJWT.tex", + u"PyJWT Documentation", + u"José Padilla", + "manual", + ) ] # The name of an image file (relative to this directory) to place at the top of # the title page. -#latex_logo = None +# latex_logo = None # For "manual" documents, if this is true, then toplevel headings are parts, # not chapters. -#latex_use_parts = False +# latex_use_parts = False # If true, show page references after internal links. -#latex_show_pagerefs = False +# latex_show_pagerefs = False # If true, show URL addresses after external links. -#latex_show_urls = False +# latex_show_urls = False # Documents to append as an appendix to all manuals. -#latex_appendices = [] +# latex_appendices = [] # If false, no module index is generated. -#latex_domain_indices = True +# latex_domain_indices = True # -- Options for manual page output --------------------------------------- # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'pyjwt', u'PyJWT Documentation', - [author], 1) -] +man_pages = [(master_doc, "pyjwt", u"PyJWT Documentation", [author], 1)] # If true, show URL addresses after external links. -#man_show_urls = False +# man_show_urls = False # -- Options for Texinfo output ------------------------------------------- @@ -286,19 +287,25 @@ man_pages = [ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'PyJWT', u'PyJWT Documentation', - author, 'PyJWT', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "PyJWT", + u"PyJWT Documentation", + author, + "PyJWT", + "One line description of project.", + "Miscellaneous", + ) ] # Documents to append as an appendix to all manuals. -#texinfo_appendices = [] +# texinfo_appendices = [] # If false, no module index is generated. -#texinfo_domain_indices = True +# texinfo_domain_indices = True # How to display URL addresses: 'footnote', 'no', or 'inline'. -#texinfo_show_urls = 'footnote' +# texinfo_show_urls = 'footnote' # If true, do not generate a @detailmenu in the "Top" node's menu. -#texinfo_no_detailmenu = False +# texinfo_no_detailmenu = False diff --git a/docs/installation.rst b/docs/installation.rst index 7e9c220..e423cfb 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -53,7 +53,7 @@ for RSA with SHA256 and EC with SHA256 signatures. jwt.unregister_algorithm('RS256') jwt.unregister_algorithm('ES256') - + jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256)) jwt.register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256)) diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index fedd390..8213302 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -1,3 +1,2 @@ sphinx sphinx_rtd_theme - diff --git a/jwt/__init__.py b/jwt/__init__.py index 077b450..e8e1f47 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -9,23 +9,35 @@ https://self-issued.info/docs/draft-jones-json-web-token-01.html """ -__title__ = 'pyjwt' -__version__ = '1.7.1' -__author__ = 'José Padilla' -__license__ = 'MIT' -__copyright__ = 'Copyright 2015-2018 José Padilla' +__title__ = "pyjwt" +__version__ = "1.7.1" +__author__ = "José Padilla" +__license__ = "MIT" +__copyright__ = "Copyright 2015-2018 José Padilla" +from .api_jws import PyJWS from .api_jwt import ( - encode, decode, register_algorithm, unregister_algorithm, - get_unverified_header, PyJWT + PyJWT, + decode, + encode, + get_unverified_header, + register_algorithm, + unregister_algorithm, ) -from .api_jws import PyJWS from .exceptions import ( - InvalidTokenError, DecodeError, InvalidAlgorithmError, - InvalidAudienceError, ExpiredSignatureError, ImmatureSignatureError, - InvalidIssuedAtError, InvalidIssuerError, ExpiredSignature, - InvalidAudience, InvalidIssuer, MissingRequiredClaimError, + DecodeError, + ExpiredSignature, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAlgorithmError, + InvalidAudience, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuer, + InvalidIssuerError, InvalidSignatureError, + InvalidTokenError, + MissingRequiredClaimError, PyJWTError, ) diff --git a/jwt/__main__.py b/jwt/__main__.py index bf50aab..c20a41f 100644 --- a/jwt/__main__.py +++ b/jwt/__main__.py @@ -13,17 +13,19 @@ from . import DecodeError, __version__, decode, encode def encode_payload(args): # Try to encode if args.key is None: - raise ValueError('Key is required when encoding. See --help for usage.') + raise ValueError( + "Key is required when encoding. See --help for usage." + ) # Build payload object to encode payload = {} for arg in args.payload: - k, v = arg.split('=', 1) + k, v = arg.split("=", 1) # exp +offset special case? - if k == 'exp' and v[0] == '+' and len(v) > 1: - v = str(int(time.time()+int(v[1:]))) + if k == "exp" and v[0] == "+" and len(v) > 1: + v = str(int(time.time() + int(v[1:]))) # Cast to integer? if v.isdigit(): @@ -36,20 +38,16 @@ def encode_payload(args): pass # Cast to true, false, or null? - constants = {'true': True, 'false': False, 'null': None} + constants = {"true": True, "false": False, "null": None} if v in constants: v = constants[v] payload[k] = v - token = encode( - payload, - key=args.key, - algorithm=args.algorithm - ) + token = encode(payload, key=args.key, algorithm=args.algorithm) - return token.decode('utf-8') + return token.decode("utf-8") def decode_payload(args): @@ -60,20 +58,20 @@ def decode_payload(args): if sys.stdin.isatty(): token = sys.stdin.readline().strip() else: - raise IOError('Cannot read from stdin: terminal not a TTY') + raise IOError("Cannot read from stdin: terminal not a TTY") - token = token.encode('utf-8') + token = token.encode("utf-8") data = decode(token, key=args.key, verify=args.verify) return json.dumps(data) except DecodeError as e: - raise DecodeError('There was an error decoding the token: %s' % e) + raise DecodeError("There was an error decoding the token: %s" % e) def build_argparser(): - usage = ''' + usage = """ Encodes or decodes JSON Web Tokens based on input. %(prog)s [options] <command> [options] input @@ -90,63 +88,62 @@ def build_argparser(): %(prog)s --key=secret encode foo=bar exp=+10 The exp key is special and can take an offset to current Unix time. - ''' + """ - arg_parser = argparse.ArgumentParser( - prog='pyjwt', - usage=usage - ) + arg_parser = argparse.ArgumentParser(prog="pyjwt", usage=usage) arg_parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s ' + __version__ + "-v", "--version", action="version", version="%(prog)s " + __version__ ) arg_parser.add_argument( - '--key', - dest='key', - metavar='KEY', + "--key", + dest="key", + metavar="KEY", default=None, - help='set the secret key to sign with' + help="set the secret key to sign with", ) arg_parser.add_argument( - '--alg', - dest='algorithm', - metavar='ALG', - default='HS256', - help='set crypto algorithm to sign with. default=HS256' + "--alg", + dest="algorithm", + metavar="ALG", + default="HS256", + help="set crypto algorithm to sign with. default=HS256", ) subparsers = arg_parser.add_subparsers( - title='PyJWT subcommands', - description='valid subcommands', - help='additional help' + title="PyJWT subcommands", + description="valid subcommands", + help="additional help", ) # Encode subcommand - encode_parser = subparsers.add_parser('encode', help='use to encode a supplied payload') + encode_parser = subparsers.add_parser( + "encode", help="use to encode a supplied payload" + ) payload_help = """Payload to encode. Must be a space separated list of key/value pairs separated by equals (=) sign.""" - encode_parser.add_argument('payload', nargs='+', help=payload_help) + encode_parser.add_argument("payload", nargs="+", help=payload_help) encode_parser.set_defaults(func=encode_payload) # Decode subcommand - decode_parser = subparsers.add_parser('decode', help='use to decode a supplied JSON web token') + decode_parser = subparsers.add_parser( + "decode", help="use to decode a supplied JSON web token" + ) decode_parser.add_argument( - 'token', - help='JSON web token to decode.', - nargs='?') + "token", help="JSON web token to decode.", nargs="?" + ) decode_parser.add_argument( - '-n', '--no-verify', - action='store_false', - dest='verify', + "-n", + "--no-verify", + action="store_false", + dest="verify", default=True, - help='ignore signature and claims verification on decode' + help="ignore signature and claims verification on decode", ) decode_parser.set_defaults(func=decode_payload) @@ -164,5 +161,5 @@ def main(): print(output) except Exception as e: - print('There was an unforseen error: ', e) + print("There was an unforseen error: ", e) arg_parser.print_help() diff --git a/jwt/algorithms.py b/jwt/algorithms.py index 1343688..293a470 100644 --- a/jwt/algorithms.py +++ b/jwt/algorithms.py @@ -2,26 +2,39 @@ import hashlib import hmac import json - from .compat import constant_time_compare, string_types from .exceptions import InvalidKeyError from .utils import ( - base64url_decode, base64url_encode, der_to_raw_signature, - force_bytes, force_unicode, from_base64url_uint, raw_to_der_signature, - to_base64url_uint + base64url_decode, + base64url_encode, + der_to_raw_signature, + force_bytes, + force_unicode, + from_base64url_uint, + raw_to_der_signature, + to_base64url_uint, ) try: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.serialization import ( - load_pem_private_key, load_pem_public_key, load_ssh_public_key + load_pem_private_key, + load_pem_public_key, + load_ssh_public_key, ) from cryptography.hazmat.primitives.asymmetric.rsa import ( - RSAPrivateKey, RSAPublicKey, RSAPrivateNumbers, RSAPublicNumbers, - rsa_recover_prime_factors, rsa_crt_dmp1, rsa_crt_dmq1, rsa_crt_iqmp + RSAPrivateKey, + RSAPublicKey, + RSAPrivateNumbers, + RSAPublicNumbers, + rsa_recover_prime_factors, + rsa_crt_dmp1, + rsa_crt_dmq1, + rsa_crt_iqmp, ) from cryptography.hazmat.primitives.asymmetric.ec import ( - EllipticCurvePrivateKey, EllipticCurvePublicKey + EllipticCurvePrivateKey, + EllipticCurvePublicKey, ) from cryptography.hazmat.primitives.asymmetric import ec, padding from cryptography.hazmat.backends import default_backend @@ -31,8 +44,20 @@ try: except ImportError: has_crypto = False -requires_cryptography = set(['RS256', 'RS384', 'RS512', 'ES256', 'ES384', - 'ES521', 'ES512', 'PS256', 'PS384', 'PS512']) +requires_cryptography = set( + [ + "RS256", + "RS384", + "RS512", + "ES256", + "ES384", + "ES521", + "ES512", + "PS256", + "PS384", + "PS512", + ] +) def get_default_algorithms(): @@ -40,25 +65,29 @@ def get_default_algorithms(): Returns the algorithms that are implemented by the library. """ default_algorithms = { - 'none': NoneAlgorithm(), - 'HS256': HMACAlgorithm(HMACAlgorithm.SHA256), - 'HS384': HMACAlgorithm(HMACAlgorithm.SHA384), - 'HS512': HMACAlgorithm(HMACAlgorithm.SHA512) + "none": NoneAlgorithm(), + "HS256": HMACAlgorithm(HMACAlgorithm.SHA256), + "HS384": HMACAlgorithm(HMACAlgorithm.SHA384), + "HS512": HMACAlgorithm(HMACAlgorithm.SHA512), } if has_crypto: - default_algorithms.update({ - 'RS256': RSAAlgorithm(RSAAlgorithm.SHA256), - 'RS384': RSAAlgorithm(RSAAlgorithm.SHA384), - 'RS512': RSAAlgorithm(RSAAlgorithm.SHA512), - 'ES256': ECAlgorithm(ECAlgorithm.SHA256), - 'ES384': ECAlgorithm(ECAlgorithm.SHA384), - 'ES521': ECAlgorithm(ECAlgorithm.SHA512), - 'ES512': ECAlgorithm(ECAlgorithm.SHA512), # Backward compat for #219 fix - 'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256), - 'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384), - 'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512) - }) + default_algorithms.update( + { + "RS256": RSAAlgorithm(RSAAlgorithm.SHA256), + "RS384": RSAAlgorithm(RSAAlgorithm.SHA384), + "RS512": RSAAlgorithm(RSAAlgorithm.SHA512), + "ES256": ECAlgorithm(ECAlgorithm.SHA256), + "ES384": ECAlgorithm(ECAlgorithm.SHA384), + "ES521": ECAlgorithm(ECAlgorithm.SHA512), + "ES512": ECAlgorithm( + ECAlgorithm.SHA512 + ), # Backward compat for #219 fix + "PS256": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256), + "PS384": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384), + "PS512": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512), + } + ) return default_algorithms @@ -67,6 +96,7 @@ class Algorithm(object): """ The interface for an algorithm used to sign and verify tokens. """ + def prepare_key(self, key): """ Performs necessary validation and conversions on the key and returns @@ -108,8 +138,9 @@ class NoneAlgorithm(Algorithm): Placeholder for use when no signing or verification operations are required. """ + def prepare_key(self, key): - if key == '': + if key == "": key = None if key is not None: @@ -118,7 +149,7 @@ class NoneAlgorithm(Algorithm): return key def sign(self, msg, key): - return b'' + return b"" def verify(self, msg, key, sig): return False @@ -129,6 +160,7 @@ class HMACAlgorithm(Algorithm): Performs signing and verification operations using HMAC and the specified hash function. """ + SHA256 = hashlib.sha256 SHA384 = hashlib.sha384 SHA512 = hashlib.sha512 @@ -140,34 +172,37 @@ class HMACAlgorithm(Algorithm): key = force_bytes(key) invalid_strings = [ - b'-----BEGIN PUBLIC KEY-----', - b'-----BEGIN CERTIFICATE-----', - b'-----BEGIN RSA PUBLIC KEY-----', - b'ssh-rsa' + b"-----BEGIN PUBLIC KEY-----", + b"-----BEGIN CERTIFICATE-----", + b"-----BEGIN RSA PUBLIC KEY-----", + b"ssh-rsa", ] if any([string_value in key for string_value in invalid_strings]): raise InvalidKeyError( - 'The specified key is an asymmetric key or x509 certificate and' - ' should not be used as an HMAC secret.') + "The specified key is an asymmetric key or x509 certificate and" + " should not be used as an HMAC secret." + ) return key @staticmethod def to_jwk(key_obj): - return json.dumps({ - 'k': force_unicode(base64url_encode(force_bytes(key_obj))), - 'kty': 'oct' - }) + return json.dumps( + { + "k": force_unicode(base64url_encode(force_bytes(key_obj))), + "kty": "oct", + } + ) @staticmethod def from_jwk(jwk): obj = json.loads(jwk) - if obj.get('kty') != 'oct': - raise InvalidKeyError('Not an HMAC key') + if obj.get("kty") != "oct": + raise InvalidKeyError("Not an HMAC key") - return base64url_decode(obj['k']) + return base64url_decode(obj["k"]) def sign(self, msg, key): return hmac.new(key, msg, self.hash_alg).digest() @@ -176,13 +211,14 @@ class HMACAlgorithm(Algorithm): return constant_time_compare(sig, self.sign(msg, key)) -if has_crypto: +if has_crypto: # noqa: C901 class RSAAlgorithm(Algorithm): """ Performs signing and verification operations using RSASSA-PKCS-v1_5 and the specified hash function. """ + SHA256 = hashes.SHA256 SHA384 = hashes.SHA384 SHA512 = hashes.SHA512 @@ -191,22 +227,25 @@ if has_crypto: self.hash_alg = hash_alg def prepare_key(self, key): - if isinstance(key, RSAPrivateKey) or \ - isinstance(key, RSAPublicKey): + if isinstance(key, RSAPrivateKey) or isinstance(key, RSAPublicKey): return key if isinstance(key, string_types): key = force_bytes(key) try: - if key.startswith(b'ssh-rsa'): - key = load_ssh_public_key(key, backend=default_backend()) + if key.startswith(b"ssh-rsa"): + key = load_ssh_public_key( + key, backend=default_backend() + ) else: - key = load_pem_private_key(key, password=None, backend=default_backend()) + key = load_pem_private_key( + key, password=None, backend=default_backend() + ) except ValueError: key = load_pem_public_key(key, backend=default_backend()) else: - raise TypeError('Expecting a PEM-formatted key.') + raise TypeError("Expecting a PEM-formatted key.") return key @@ -214,35 +253,39 @@ if has_crypto: def to_jwk(key_obj): obj = None - if getattr(key_obj, 'private_numbers', None): + if getattr(key_obj, "private_numbers", None): # Private key numbers = key_obj.private_numbers() obj = { - 'kty': 'RSA', - 'key_ops': ['sign'], - 'n': force_unicode(to_base64url_uint(numbers.public_numbers.n)), - 'e': force_unicode(to_base64url_uint(numbers.public_numbers.e)), - 'd': force_unicode(to_base64url_uint(numbers.d)), - 'p': force_unicode(to_base64url_uint(numbers.p)), - 'q': force_unicode(to_base64url_uint(numbers.q)), - 'dp': force_unicode(to_base64url_uint(numbers.dmp1)), - 'dq': force_unicode(to_base64url_uint(numbers.dmq1)), - 'qi': force_unicode(to_base64url_uint(numbers.iqmp)) + "kty": "RSA", + "key_ops": ["sign"], + "n": force_unicode( + to_base64url_uint(numbers.public_numbers.n) + ), + "e": force_unicode( + to_base64url_uint(numbers.public_numbers.e) + ), + "d": force_unicode(to_base64url_uint(numbers.d)), + "p": force_unicode(to_base64url_uint(numbers.p)), + "q": force_unicode(to_base64url_uint(numbers.q)), + "dp": force_unicode(to_base64url_uint(numbers.dmp1)), + "dq": force_unicode(to_base64url_uint(numbers.dmq1)), + "qi": force_unicode(to_base64url_uint(numbers.iqmp)), } - elif getattr(key_obj, 'verify', None): + elif getattr(key_obj, "verify", None): # Public key numbers = key_obj.public_numbers() obj = { - 'kty': 'RSA', - 'key_ops': ['verify'], - 'n': force_unicode(to_base64url_uint(numbers.n)), - 'e': force_unicode(to_base64url_uint(numbers.e)) + "kty": "RSA", + "key_ops": ["verify"], + "n": force_unicode(to_base64url_uint(numbers.n)), + "e": force_unicode(to_base64url_uint(numbers.e)), } else: - raise InvalidKeyError('Not a public or private key') + raise InvalidKeyError("Not a public or private key") return json.dumps(obj) @@ -251,39 +294,44 @@ if has_crypto: try: obj = json.loads(jwk) except ValueError: - raise InvalidKeyError('Key is not valid JSON') + raise InvalidKeyError("Key is not valid JSON") - if obj.get('kty') != 'RSA': - raise InvalidKeyError('Not an RSA key') + if obj.get("kty") != "RSA": + raise InvalidKeyError("Not an RSA key") - if 'd' in obj and 'e' in obj and 'n' in obj: + if "d" in obj and "e" in obj and "n" in obj: # Private key - if 'oth' in obj: - raise InvalidKeyError('Unsupported RSA private key: > 2 primes not supported') + if "oth" in obj: + raise InvalidKeyError( + "Unsupported RSA private key: > 2 primes not supported" + ) - other_props = ['p', 'q', 'dp', 'dq', 'qi'] + other_props = ["p", "q", "dp", "dq", "qi"] props_found = [prop in obj for prop in other_props] any_props_found = any(props_found) if any_props_found and not all(props_found): - raise InvalidKeyError('RSA key must include all parameters if any are present besides d') + raise InvalidKeyError( + "RSA key must include all parameters if any are present besides d" + ) public_numbers = RSAPublicNumbers( - from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) + from_base64url_uint(obj["e"]), + from_base64url_uint(obj["n"]), ) if any_props_found: numbers = RSAPrivateNumbers( - d=from_base64url_uint(obj['d']), - p=from_base64url_uint(obj['p']), - q=from_base64url_uint(obj['q']), - dmp1=from_base64url_uint(obj['dp']), - dmq1=from_base64url_uint(obj['dq']), - iqmp=from_base64url_uint(obj['qi']), - public_numbers=public_numbers + d=from_base64url_uint(obj["d"]), + p=from_base64url_uint(obj["p"]), + q=from_base64url_uint(obj["q"]), + dmp1=from_base64url_uint(obj["dp"]), + dmq1=from_base64url_uint(obj["dq"]), + iqmp=from_base64url_uint(obj["qi"]), + public_numbers=public_numbers, ) else: - d = from_base64url_uint(obj['d']) + d = from_base64url_uint(obj["d"]) p, q = rsa_recover_prime_factors( public_numbers.n, d, public_numbers.e ) @@ -295,19 +343,20 @@ if has_crypto: dmp1=rsa_crt_dmp1(d, p), dmq1=rsa_crt_dmq1(d, q), iqmp=rsa_crt_iqmp(p, q), - public_numbers=public_numbers + public_numbers=public_numbers, ) return numbers.private_key(default_backend()) - elif 'n' in obj and 'e' in obj: + elif "n" in obj and "e" in obj: # Public key numbers = RSAPublicNumbers( - from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) + from_base64url_uint(obj["e"]), + from_base64url_uint(obj["n"]), ) return numbers.public_key(default_backend()) else: - raise InvalidKeyError('Not a public or private key') + raise InvalidKeyError("Not a public or private key") def sign(self, msg, key): return key.sign(msg, padding.PKCS1v15(), self.hash_alg()) @@ -324,6 +373,7 @@ if has_crypto: Performs signing and verification operations using ECDSA and the specified hash function """ + SHA256 = hashes.SHA256 SHA384 = hashes.SHA384 SHA512 = hashes.SHA512 @@ -332,8 +382,9 @@ if has_crypto: self.hash_alg = hash_alg def prepare_key(self, key): - if isinstance(key, EllipticCurvePrivateKey) or \ - isinstance(key, EllipticCurvePublicKey): + if isinstance(key, EllipticCurvePrivateKey) or isinstance( + key, EllipticCurvePublicKey + ): return key if isinstance(key, string_types): @@ -343,15 +394,21 @@ if has_crypto: # a Signing Key or a Verifying Key, so we try # the Verifying Key first. try: - if key.startswith(b'ecdsa-sha2-'): - key = load_ssh_public_key(key, backend=default_backend()) + if key.startswith(b"ecdsa-sha2-"): + key = load_ssh_public_key( + key, backend=default_backend() + ) else: - key = load_pem_public_key(key, backend=default_backend()) + key = load_pem_public_key( + key, backend=default_backend() + ) except ValueError: - key = load_pem_private_key(key, password=None, backend=default_backend()) + key = load_pem_private_key( + key, password=None, backend=default_backend() + ) else: - raise TypeError('Expecting a PEM-formatted key.') + raise TypeError("Expecting a PEM-formatted key.") return key @@ -382,9 +439,9 @@ if has_crypto: msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), - salt_length=self.hash_alg.digest_size + salt_length=self.hash_alg.digest_size, ), - self.hash_alg() + self.hash_alg(), ) def verify(self, msg, key, sig): @@ -394,9 +451,9 @@ if has_crypto: msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), - salt_length=self.hash_alg.digest_size + salt_length=self.hash_alg.digest_size, ), - self.hash_alg() + self.hash_alg(), ) return True except InvalidSignature: diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 19f58d0..9504c9f 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -1,30 +1,35 @@ import binascii import json import warnings -try: - # import required by mypy to perform type checking, not used for normal execution - from typing import Callable, Dict, List, Optional, Type, Union # NOQA -except ImportError: - pass -from .algorithms import ( - Algorithm, get_default_algorithms, has_crypto, requires_cryptography # NOQA -) +from .algorithms import requires_cryptography # NOQA +from .algorithms import Algorithm, get_default_algorithms, has_crypto from .compat import Mapping, binary_type, string_types, text_type from .exceptions import ( - DecodeError, InvalidAlgorithmError, InvalidSignatureError, - InvalidTokenError + DecodeError, + InvalidAlgorithmError, + InvalidSignatureError, + InvalidTokenError, ) from .utils import base64url_decode, base64url_encode, force_bytes, merge_dict +try: + # import required by mypy to perform type checking, not used for normal execution + from typing import Callable, Dict, List, Optional, Type, Union # NOQA +except ImportError: + pass + class PyJWS(object): - header_typ = 'JWT' + header_typ = "JWT" def __init__(self, algorithms=None, options=None): self._algorithms = get_default_algorithms() - self._valid_algs = (set(algorithms) if algorithms is not None - else set(self._algorithms)) + self._valid_algs = ( + set(algorithms) + if algorithms is not None + else set(self._algorithms) + ) # Remove algorithms that aren't on the whitelist for key in list(self._algorithms.keys()): @@ -38,19 +43,17 @@ class PyJWS(object): @staticmethod def _get_default_options(): - return { - 'verify_signature': True - } + return {"verify_signature": True} def register_algorithm(self, alg_id, alg_obj): """ Registers a new Algorithm for use when creating and verifying tokens. """ if alg_id in self._algorithms: - raise ValueError('Algorithm already has a handler.') + raise ValueError("Algorithm already has a handler.") if not isinstance(alg_obj, Algorithm): - raise TypeError('Object is not of type `Algorithm`') + raise TypeError("Object is not of type `Algorithm`") self._algorithms[alg_id] = alg_obj self._valid_algs.add(alg_id) @@ -61,8 +64,10 @@ class PyJWS(object): Throws KeyError if algorithm is not registered. """ if alg_id not in self._algorithms: - raise KeyError('The specified algorithm could not be removed' - ' because it is not registered.') + raise KeyError( + "The specified algorithm could not be removed" + " because it is not registered." + ) del self._algorithms[alg_id] self._valid_algs.remove(alg_id) @@ -73,41 +78,38 @@ class PyJWS(object): """ return list(self._valid_algs) - def encode(self, - payload, # type: Union[Dict, bytes] - key, # type: str - algorithm='HS256', # type: str - headers=None, # type: Optional[Dict] - json_encoder=None # type: Optional[Type[json.JSONEncoder]] - ): + def encode( + self, + payload, # type: Union[Dict, bytes] + key, # type: str + algorithm="HS256", # type: str + headers=None, # type: Optional[Dict] + json_encoder=None, # type: Optional[Type[json.JSONEncoder]] + ): segments = [] if algorithm is None: - algorithm = 'none' + algorithm = "none" if algorithm not in self._valid_algs: pass # Header - header = {'typ': self.header_typ, 'alg': algorithm} + header = {"typ": self.header_typ, "alg": algorithm} if headers: self._validate_headers(headers) header.update(headers) json_header = force_bytes( - json.dumps( - header, - separators=(',', ':'), - cls=json_encoder - ) + json.dumps(header, separators=(",", ":"), cls=json_encoder) ) segments.append(base64url_encode(json_header)) segments.append(base64url_encode(payload)) # Segments - signing_input = b'.'.join(segments) + signing_input = b".".join(segments) try: alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) @@ -120,40 +122,46 @@ class PyJWS(object): "installed?" % algorithm ) else: - raise NotImplementedError('Algorithm not supported') + raise NotImplementedError("Algorithm not supported") segments.append(base64url_encode(signature)) - return b'.'.join(segments) + return b".".join(segments) - def decode(self, - jwt, # type: str - key='', # type: str - verify=True, # type: bool - algorithms=None, # type: List[str] - options=None, # type: Dict - **kwargs): + def decode( + self, + jwt, # type: str + key="", # type: str + verify=True, # type: bool + algorithms=None, # type: List[str] + options=None, # type: Dict + **kwargs + ): merged_options = merge_dict(self.options, options) - verify_signature = merged_options['verify_signature'] + verify_signature = merged_options["verify_signature"] if verify_signature and not algorithms: warnings.warn( - 'It is strongly recommended that you pass in a ' + - 'value for the "algorithms" argument when calling decode(). ' + - 'This argument will be mandatory in a future version.', - DeprecationWarning + "It is strongly recommended that you pass in a " + + 'value for the "algorithms" argument when calling decode(). ' + + "This argument will be mandatory in a future version.", + DeprecationWarning, ) payload, signing_input, header, signature = self._load(jwt) if not verify: - warnings.warn('The verify parameter is deprecated. ' - 'Please use verify_signature in options instead.', - DeprecationWarning, stacklevel=2) + warnings.warn( + "The verify parameter is deprecated. " + "Please use verify_signature in options instead.", + DeprecationWarning, + stacklevel=2, + ) elif verify_signature: - self._verify_signature(payload, signing_input, header, signature, - key, algorithms) + self._verify_signature( + payload, signing_input, header, signature, key, algorithms + ) return payload @@ -170,68 +178,78 @@ class PyJWS(object): def _load(self, jwt): if isinstance(jwt, text_type): - jwt = jwt.encode('utf-8') + jwt = jwt.encode("utf-8") if not issubclass(type(jwt), binary_type): - raise DecodeError("Invalid token type. Token must be a {0}".format( - binary_type)) + raise DecodeError( + "Invalid token type. Token must be a {0}".format(binary_type) + ) try: - signing_input, crypto_segment = jwt.rsplit(b'.', 1) - header_segment, payload_segment = signing_input.split(b'.', 1) + signing_input, crypto_segment = jwt.rsplit(b".", 1) + header_segment, payload_segment = signing_input.split(b".", 1) except ValueError: - raise DecodeError('Not enough segments') + raise DecodeError("Not enough segments") try: header_data = base64url_decode(header_segment) except (TypeError, binascii.Error): - raise DecodeError('Invalid header padding') + raise DecodeError("Invalid header padding") try: - header = json.loads(header_data.decode('utf-8')) + header = json.loads(header_data.decode("utf-8")) except ValueError as e: - raise DecodeError('Invalid header string: %s' % e) + raise DecodeError("Invalid header string: %s" % e) if not isinstance(header, Mapping): - raise DecodeError('Invalid header string: must be a json object') + raise DecodeError("Invalid header string: must be a json object") try: payload = base64url_decode(payload_segment) except (TypeError, binascii.Error): - raise DecodeError('Invalid payload padding') + raise DecodeError("Invalid payload padding") try: signature = base64url_decode(crypto_segment) except (TypeError, binascii.Error): - raise DecodeError('Invalid crypto padding') + raise DecodeError("Invalid crypto padding") return (payload, signing_input, header, signature) - def _verify_signature(self, payload, signing_input, header, signature, - key='', algorithms=None): + def _verify_signature( + self, + payload, + signing_input, + header, + signature, + key="", + algorithms=None, + ): - alg = header.get('alg') + alg = header.get("alg") if algorithms is not None and alg not in algorithms: - raise InvalidAlgorithmError('The specified alg value is not allowed') + raise InvalidAlgorithmError( + "The specified alg value is not allowed" + ) try: alg_obj = self._algorithms[alg] key = alg_obj.prepare_key(key) if not alg_obj.verify(signing_input, key, signature): - raise InvalidSignatureError('Signature verification failed') + raise InvalidSignatureError("Signature verification failed") except KeyError: - raise InvalidAlgorithmError('Algorithm not supported') + raise InvalidAlgorithmError("Algorithm not supported") def _validate_headers(self, headers): - if 'kid' in headers: - self._validate_kid(headers['kid']) + if "kid" in headers: + self._validate_kid(headers["kid"]) def _validate_kid(self, kid): if not isinstance(kid, string_types): - raise InvalidTokenError('Key ID header parameter must be a string') + raise InvalidTokenError("Key ID header parameter must be a string") _jws_global_obj = PyJWS() diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py index 3e280bc..22bfc6a 100644 --- a/jwt/api_jwt.py +++ b/jwt/api_jwt.py @@ -2,103 +2,113 @@ import json import warnings from calendar import timegm from datetime import datetime, timedelta -try: - # import required by mypy to perform type checking, not used for normal execution - from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA -except ImportError: - pass -from .api_jws import PyJWS from .algorithms import Algorithm, get_default_algorithms # NOQA +from .api_jws import PyJWS from .compat import Iterable, Mapping, string_types from .exceptions import ( - DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAudienceError, InvalidIssuedAtError, - InvalidIssuerError, MissingRequiredClaimError + DecodeError, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuerError, + MissingRequiredClaimError, ) from .utils import merge_dict +try: + # import required by mypy to perform type checking, not used for normal execution + from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA +except ImportError: + pass + class PyJWT(PyJWS): - header_type = 'JWT' + header_type = "JWT" @staticmethod def _get_default_options(): # type: () -> Dict[str, bool] return { - 'verify_signature': True, - 'verify_exp': True, - 'verify_nbf': True, - 'verify_iat': True, - 'verify_aud': True, - 'verify_iss': True, - 'require_exp': False, - 'require_iat': False, - 'require_nbf': False + "verify_signature": True, + "verify_exp": True, + "verify_nbf": True, + "verify_iat": True, + "verify_aud": True, + "verify_iss": True, + "require_exp": False, + "require_iat": False, + "require_nbf": False, } - def encode(self, - payload, # type: Union[Dict, bytes] - key, # type: str - algorithm='HS256', # type: str - headers=None, # type: Optional[Dict] - json_encoder=None # type: Optional[Type[json.JSONEncoder]] - ): + def encode( + self, + payload, # type: Union[Dict, bytes] + key, # type: str + algorithm="HS256", # type: str + headers=None, # type: Optional[Dict] + json_encoder=None, # type: Optional[Type[json.JSONEncoder]] + ): # Check that we get a mapping if not isinstance(payload, Mapping): - raise TypeError('Expecting a mapping object, as JWT only supports ' - 'JSON objects as payloads.') + raise TypeError( + "Expecting a mapping object, as JWT only supports " + "JSON objects as payloads." + ) # Payload - for time_claim in ['exp', 'iat', 'nbf']: + for time_claim in ["exp", "iat", "nbf"]: # Convert datetime to a intDate value in known time-format claims if isinstance(payload.get(time_claim), datetime): - payload[time_claim] = timegm(payload[time_claim].utctimetuple()) # type: ignore + payload[time_claim] = timegm( + payload[time_claim].utctimetuple() + ) # type: ignore json_payload = json.dumps( - payload, - separators=(',', ':'), - cls=json_encoder - ).encode('utf-8') + payload, separators=(",", ":"), cls=json_encoder + ).encode("utf-8") return super(PyJWT, self).encode( json_payload, key, algorithm, headers, json_encoder ) - def decode(self, - jwt, # type: str - key='', # type: str - verify=True, # type: bool - algorithms=None, # type: List[str] - options=None, # type: Dict - **kwargs): + def decode( + self, + jwt, # type: str + key="", # type: str + verify=True, # type: bool + algorithms=None, # type: List[str] + options=None, # type: Dict + **kwargs + ): # type: (...) -> Dict[str, Any] if verify and not algorithms: warnings.warn( - 'It is strongly recommended that you pass in a ' + - 'value for the "algorithms" argument when calling decode(). ' + - 'This argument will be mandatory in a future version.', - DeprecationWarning + "It is strongly recommended that you pass in a " + + 'value for the "algorithms" argument when calling decode(). ' + + "This argument will be mandatory in a future version.", + DeprecationWarning, ) payload, _, _, _ = self._load(jwt) if options is None: - options = {'verify_signature': verify} + options = {"verify_signature": verify} else: - options.setdefault('verify_signature', verify) + options.setdefault("verify_signature", verify) decoded = super(PyJWT, self).decode( jwt, key=key, algorithms=algorithms, options=options, **kwargs ) try: - payload = json.loads(decoded.decode('utf-8')) + payload = json.loads(decoded.decode("utf-8")) except ValueError as e: - raise DecodeError('Invalid payload string: %s' % e) + raise DecodeError("Invalid payload string: %s" % e) if not isinstance(payload, dict): - raise DecodeError('Invalid payload string: must be a json object') + raise DecodeError("Invalid payload string: must be a json object") if verify: merged_options = merge_dict(self.options, options) @@ -106,113 +116,119 @@ class PyJWT(PyJWS): return payload - def _validate_claims(self, payload, options, audience=None, issuer=None, - leeway=0, **kwargs): + def _validate_claims( + self, payload, options, audience=None, issuer=None, leeway=0, **kwargs + ): - if 'verify_expiration' in kwargs: - options['verify_exp'] = kwargs.get('verify_expiration', True) - warnings.warn('The verify_expiration parameter is deprecated. ' - 'Please use verify_exp in options instead.', - DeprecationWarning) + if "verify_expiration" in kwargs: + options["verify_exp"] = kwargs.get("verify_expiration", True) + warnings.warn( + "The verify_expiration parameter is deprecated. " + "Please use verify_exp in options instead.", + DeprecationWarning, + ) if isinstance(leeway, timedelta): leeway = leeway.total_seconds() if not isinstance(audience, (string_types, type(None), Iterable)): - raise TypeError('audience must be a string, iterable, or None') + raise TypeError("audience must be a string, iterable, or None") self._validate_required_claims(payload, options) now = timegm(datetime.utcnow().utctimetuple()) - if 'iat' in payload and options.get('verify_iat'): + if "iat" in payload and options.get("verify_iat"): self._validate_iat(payload, now, leeway) - if 'nbf' in payload and options.get('verify_nbf'): + if "nbf" in payload and options.get("verify_nbf"): self._validate_nbf(payload, now, leeway) - if 'exp' in payload and options.get('verify_exp'): + if "exp" in payload and options.get("verify_exp"): self._validate_exp(payload, now, leeway) - if options.get('verify_iss'): + if options.get("verify_iss"): self._validate_iss(payload, issuer) - if options.get('verify_aud'): + if options.get("verify_aud"): self._validate_aud(payload, audience) def _validate_required_claims(self, payload, options): - if options.get('require_exp') and payload.get('exp') is None: - raise MissingRequiredClaimError('exp') + if options.get("require_exp") and payload.get("exp") is None: + raise MissingRequiredClaimError("exp") - if options.get('require_iat') and payload.get('iat') is None: - raise MissingRequiredClaimError('iat') + if options.get("require_iat") and payload.get("iat") is None: + raise MissingRequiredClaimError("iat") - if options.get('require_nbf') and payload.get('nbf') is None: - raise MissingRequiredClaimError('nbf') + if options.get("require_nbf") and payload.get("nbf") is None: + raise MissingRequiredClaimError("nbf") def _validate_iat(self, payload, now, leeway): try: - int(payload['iat']) + int(payload["iat"]) except ValueError: - raise InvalidIssuedAtError('Issued At claim (iat) must be an integer.') + raise InvalidIssuedAtError( + "Issued At claim (iat) must be an integer." + ) def _validate_nbf(self, payload, now, leeway): try: - nbf = int(payload['nbf']) + nbf = int(payload["nbf"]) except ValueError: - raise DecodeError('Not Before claim (nbf) must be an integer.') + raise DecodeError("Not Before claim (nbf) must be an integer.") if nbf > (now + leeway): - raise ImmatureSignatureError('The token is not yet valid (nbf)') + raise ImmatureSignatureError("The token is not yet valid (nbf)") def _validate_exp(self, payload, now, leeway): try: - exp = int(payload['exp']) + exp = int(payload["exp"]) except ValueError: - raise DecodeError('Expiration Time claim (exp) must be an' - ' integer.') + raise DecodeError( + "Expiration Time claim (exp) must be an" " integer." + ) if exp < (now - leeway): - raise ExpiredSignatureError('Signature has expired') + raise ExpiredSignatureError("Signature has expired") def _validate_aud(self, payload, audience): - if audience is None and 'aud' not in payload: + if audience is None and "aud" not in payload: return - if audience is not None and 'aud' not in payload: + if audience is not None and "aud" not in payload: # Application specified an audience, but it could not be # verified since the token does not contain a claim. - raise MissingRequiredClaimError('aud') + raise MissingRequiredClaimError("aud") - if audience is None and 'aud' in payload: + if audience is None and "aud" in payload: # Application did not specify an audience, but # the token has the 'aud' claim - raise InvalidAudienceError('Invalid audience') + raise InvalidAudienceError("Invalid audience") - audience_claims = payload['aud'] + audience_claims = payload["aud"] if isinstance(audience_claims, string_types): audience_claims = [audience_claims] if not isinstance(audience_claims, list): - raise InvalidAudienceError('Invalid claim format in token') + raise InvalidAudienceError("Invalid claim format in token") if any(not isinstance(c, string_types) for c in audience_claims): - raise InvalidAudienceError('Invalid claim format in token') + raise InvalidAudienceError("Invalid claim format in token") if isinstance(audience, string_types): audience = [audience] if not any(aud in audience_claims for aud in audience): - raise InvalidAudienceError('Invalid audience') + raise InvalidAudienceError("Invalid audience") def _validate_iss(self, payload, issuer): if issuer is None: return - if 'iss' not in payload: - raise MissingRequiredClaimError('iss') + if "iss" not in payload: + raise MissingRequiredClaimError("iss") - if payload['iss'] != issuer: - raise InvalidIssuerError('Invalid issuer') + if payload["iss"] != issuer: + raise InvalidIssuerError("Invalid issuer") _jwt_global_obj = PyJWT() diff --git a/jwt/compat.py b/jwt/compat.py index e79e258..918ff4a 100644 --- a/jwt/compat.py +++ b/jwt/compat.py @@ -7,7 +7,6 @@ import hmac import struct import sys - PY3 = sys.version_info[0] == 3 @@ -46,8 +45,10 @@ except AttributeError: return result == 0 + # Use int.to_bytes if it exists (Python 3) -if getattr(int, 'to_bytes', None): +if getattr(int, "to_bytes", None): + def bytes_from_int(val): remaining = val byte_length = 0 @@ -56,8 +57,11 @@ if getattr(int, 'to_bytes', None): remaining = remaining >> 8 byte_length += 1 - return val.to_bytes(byte_length, 'big', signed=False) + return val.to_bytes(byte_length, "big", signed=False) + + else: + def bytes_from_int(val): buf = [] while val: @@ -65,4 +69,4 @@ else: buf.append(remainder) buf.reverse() - return struct.pack('%sB' % len(buf), *buf) + return struct.pack("%sB" % len(buf), *buf) diff --git a/jwt/contrib/algorithms/py_ecdsa.py b/jwt/contrib/algorithms/py_ecdsa.py index f1170a6..5b878f5 100644 --- a/jwt/contrib/algorithms/py_ecdsa.py +++ b/jwt/contrib/algorithms/py_ecdsa.py @@ -18,6 +18,7 @@ class ECAlgorithm(Algorithm): This is based off of the implementation in PyJWT 0.3.2 """ + SHA256 = hashlib.sha256 SHA384 = hashlib.sha384 SHA512 = hashlib.sha512 @@ -27,13 +28,14 @@ class ECAlgorithm(Algorithm): def prepare_key(self, key): - if isinstance(key, ecdsa.SigningKey) or \ - isinstance(key, ecdsa.VerifyingKey): + if isinstance(key, ecdsa.SigningKey) or isinstance( + key, ecdsa.VerifyingKey + ): return key if isinstance(key, string_types): if isinstance(key, text_type): - key = key.encode('utf-8') + key = key.encode("utf-8") # Attempt to load key. We don't know if it's # a Signing Key or a Verifying Key, so we try @@ -44,18 +46,23 @@ class ECAlgorithm(Algorithm): key = ecdsa.SigningKey.from_pem(key) else: - raise TypeError('Expecting a PEM-formatted key.') + raise TypeError("Expecting a PEM-formatted key.") return key def sign(self, msg, key): - return key.sign(msg, hashfunc=self.hash_alg, - sigencode=ecdsa.util.sigencode_string) + return key.sign( + msg, hashfunc=self.hash_alg, sigencode=ecdsa.util.sigencode_string + ) def verify(self, msg, key, sig): try: - return key.verify(sig, msg, hashfunc=self.hash_alg, - sigdecode=ecdsa.util.sigdecode_string) + return key.verify( + sig, + msg, + hashfunc=self.hash_alg, + sigdecode=ecdsa.util.sigdecode_string, + ) # ecdsa <= 0.13.2 raises AssertionError on too long signatures, # ecdsa >= 0.13.3 raises BadSignatureError for verification errors. except (AssertionError, ecdsa.BadSignatureError): diff --git a/jwt/contrib/algorithms/pycrypto.py b/jwt/contrib/algorithms/pycrypto.py index e49cdbf..d58e907 100644 --- a/jwt/contrib/algorithms/pycrypto.py +++ b/jwt/contrib/algorithms/pycrypto.py @@ -17,6 +17,7 @@ class RSAAlgorithm(Algorithm): This is based off of the implementation in PyJWT 0.3.2 """ + SHA256 = Crypto.Hash.SHA256 SHA384 = Crypto.Hash.SHA384 SHA512 = Crypto.Hash.SHA512 @@ -31,11 +32,11 @@ class RSAAlgorithm(Algorithm): if isinstance(key, string_types): if isinstance(key, text_type): - key = key.encode('utf-8') + key = key.encode("utf-8") key = RSA.importKey(key) else: - raise TypeError('Expecting a PEM- or RSA-formatted key.') + raise TypeError("Expecting a PEM- or RSA-formatted key.") return key diff --git a/jwt/exceptions.py b/jwt/exceptions.py index 2a6aa59..cd2ca2a 100644 --- a/jwt/exceptions.py +++ b/jwt/exceptions.py @@ -2,6 +2,7 @@ class PyJWTError(Exception): """ Base class for all exceptions """ + pass diff --git a/jwt/help.py b/jwt/help.py index 573088d..0639cb6 100644 --- a/jwt/help.py +++ b/jwt/help.py @@ -23,7 +23,10 @@ def info(): Based on the requests package help utility module. """ try: - platform_info = {"system": platform.system(), "release": platform.release()} + platform_info = { + "system": platform.system(), + "release": platform.release(), + } except IOError: platform_info = {"system": "Unknown", "release": "Unknown"} @@ -46,7 +49,10 @@ def info(): return { "platform": platform_info, - "implementation": {"name": implementation, "version": implementation_version}, + "implementation": { + "name": implementation, + "version": implementation_version, + }, "cryptography": {"version": getattr(cryptography, "__version__", "")}, "pyjwt": {"version": pyjwt_version}, } diff --git a/jwt/utils.py b/jwt/utils.py index b33c7a2..cc7f56c 100644 --- a/jwt/utils.py +++ b/jwt/utils.py @@ -6,7 +6,8 @@ from .compat import binary_type, bytes_from_int, text_type try: from cryptography.hazmat.primitives.asymmetric.utils import ( - decode_dss_signature, encode_dss_signature + decode_dss_signature, + encode_dss_signature, ) except ImportError: pass @@ -14,58 +15,58 @@ except ImportError: def force_unicode(value): if isinstance(value, binary_type): - return value.decode('utf-8') + return value.decode("utf-8") elif isinstance(value, text_type): return value else: - raise TypeError('Expected a string value') + raise TypeError("Expected a string value") def force_bytes(value): if isinstance(value, text_type): - return value.encode('utf-8') + return value.encode("utf-8") elif isinstance(value, binary_type): return value else: - raise TypeError('Expected a string value') + raise TypeError("Expected a string value") def base64url_decode(input): if isinstance(input, text_type): - input = input.encode('ascii') + input = input.encode("ascii") rem = len(input) % 4 if rem > 0: - input += b'=' * (4 - rem) + input += b"=" * (4 - rem) return base64.urlsafe_b64decode(input) def base64url_encode(input): - return base64.urlsafe_b64encode(input).replace(b'=', b'') + return base64.urlsafe_b64encode(input).replace(b"=", b"") def to_base64url_uint(val): if val < 0: - raise ValueError('Must be a positive integer') + raise ValueError("Must be a positive integer") int_bytes = bytes_from_int(val) if len(int_bytes) == 0: - int_bytes = b'\x00' + int_bytes = b"\x00" return base64url_encode(int_bytes) def from_base64url_uint(val): if isinstance(val, text_type): - val = val.encode('ascii') + val = val.encode("ascii") data = base64url_decode(val) - buf = struct.unpack('%sB' % len(data), data) - return int(''.join(["%02x" % byte for byte in buf]), 16) + buf = struct.unpack("%sB" % len(data), data) + return int("".join(["%02x" % byte for byte in buf]), 16) def merge_dict(original, updates): @@ -76,14 +77,14 @@ def merge_dict(original, updates): merged_options = original.copy() merged_options.update(updates) except (AttributeError, ValueError) as e: - raise TypeError('original and updates must be a dictionary: %s' % e) + raise TypeError("original and updates must be a dictionary: %s" % e) return merged_options def number_to_bytes(num, num_bytes): - padded_hex = '%0*x' % (2 * num_bytes, num) - big_endian = binascii.a2b_hex(padded_hex.encode('ascii')) + padded_hex = "%0*x" % (2 * num_bytes, num) + big_endian = binascii.a2b_hex(padded_hex.encode("ascii")) return big_endian @@ -105,7 +106,7 @@ def raw_to_der_signature(raw_sig, curve): num_bytes = (num_bits + 7) // 8 if len(raw_sig) != 2 * num_bytes: - raise ValueError('Invalid signature') + raise ValueError("Invalid signature") r = bytes_to_number(raw_sig[:num_bytes]) s = bytes_to_number(raw_sig[num_bytes:]) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..82c7969 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,14 @@ +[tool.black] +line-length = 79 + + +[tool.isort] +atomic=true +force_grid_wrap=0 +include_trailing_comma=true +multi_line_output=3 +use_parentheses=true +combine_as_imports=true + +known_first_party="jwt" +known_third_party=["Crypto", "ecdsa", "pytest", "setuptools", "sphinx_rtd_theme"] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..fb1850e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[tool:pytest] +addopts = --cov-report term-missing --cov-config=.coveragerc --cov . diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 8823146..0000000 --- a/setup.cfg +++ /dev/null @@ -1,11 +0,0 @@ -[flake8] -max-line-length = 119 -exclude = - docs/, - .tox/ - -[bdist_wheel] -universal = 1 - -[tool:pytest] -addopts = --cov-report term-missing --cov-config=.coveragerc --cov . @@ -11,72 +11,64 @@ def get_version(package): """ Return package version as listed in `__version__` in `init.py`. """ - with open(os.path.join(package, '__init__.py'), 'rb') as init_py: - src = init_py.read().decode('utf-8') + with open(os.path.join(package, "__init__.py"), "rb") as init_py: + src = init_py.read().decode("utf-8") return re.search("__version__ = ['\"]([^'\"]+)['\"]", src).group(1) -version = get_version('jwt') +version = get_version("jwt") -with open(os.path.join(os.path.dirname(__file__), 'README.rst')) as readme: +with open(os.path.join(os.path.dirname(__file__), "README.rst")) as readme: long_description = readme.read() -if sys.argv[-1] == 'publish': +if sys.argv[-1] == "publish": if os.system("pip freeze | grep twine"): print("twine not installed.\nUse `pip install twine`.\nExiting.") sys.exit() os.system("python setup.py sdist bdist_wheel") os.system("twine upload dist/*") - print('You probably want to also tag the version now:') + print("You probably want to also tag the version now:") print(" git tag -a {0} -m 'version {0}'".format(version)) - print(' git push --tags') + print(" git push --tags") sys.exit() +EXTRAS_REQUIRE = { + "tests": ["pytest>=4.0.1,<5.0.0", "pytest-cov>=2.6.0,<3.0.0"], + "crypto": ["cryptography >= 1.4"], +} + +EXTRAS_REQUIRE["dev"] = ( + EXTRAS_REQUIRE["tests"] + EXTRAS_REQUIRE["crypto"] + ["mypy", "pre-commit"] +) + setup( - name='PyJWT', + name="PyJWT", version=version, - author='Jose Padilla', - author_email='hello@jpadilla.com', - description='JSON Web Token implementation in Python', - license='MIT', - keywords='jwt json web token security signing', - url='https://github.com/jpadilla/pyjwt', + author="Jose Padilla", + author_email="hello@jpadilla.com", + description="JSON Web Token implementation in Python", + license="MIT", + keywords="jwt json web token security signing", + url="https://github.com/jpadilla/pyjwt", packages=find_packages( exclude=["*.tests", "*.tests.*", "tests.*", "tests"] ), long_description=long_description, classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Natural Language :: English', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Utilities', + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Natural Language :: English", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Topic :: Utilities", ], python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*", - extras_require=dict( - test=[ - 'pytest>=4.0.1,<5.0.0', - 'pytest-cov>=2.6.0,<3.0.0', - ], - crypto=['cryptography >= 1.4'], - flake8=[ - 'flake8', - 'flake8-import-order', - 'pep8-naming' - ], - mypy=[ - 'mypy' - ] - ), - entry_points={ - 'console_scripts': [ - 'pyjwt = jwt.__main__:main' - ] - } + extras_require=EXTRAS_REQUIRE, + entry_points={"console_scripts": ["pyjwt = jwt.__main__:main"]}, + options={"bdist_wheel": {"universal": "1"}}, ) diff --git a/tests/compat.py b/tests/compat.py index 12d6ec0..6d9dec9 100644 --- a/tests/compat.py +++ b/tests/compat.py @@ -5,8 +5,8 @@ import sys PY3 = sys.version_info[0] == 3 if PY3: - string_types = str, + string_types = (str,) text_type = str else: - string_types = basestring, + string_types = (basestring,) text_type = unicode diff --git a/tests/contrib/test_algorithms.py b/tests/contrib/test_algorithms.py index 6d5ca75..4a1550b 100644 --- a/tests/contrib/test_algorithms.py +++ b/tests/contrib/test_algorithms.py @@ -1,36 +1,40 @@ import base64 -from jwt.utils import force_bytes, force_unicode - import pytest +from jwt.utils import force_bytes, force_unicode + from ..utils import key_path try: from jwt.contrib.algorithms.pycrypto import RSAAlgorithm + has_pycrypto = True except ImportError: has_pycrypto = False try: from jwt.contrib.algorithms.py_ecdsa import ECAlgorithm + has_ecdsa = True except ImportError: has_ecdsa = False -@pytest.mark.skipif(not has_pycrypto, reason='Not supported without PyCrypto library') +@pytest.mark.skipif( + not has_pycrypto, reason="Not supported without PyCrypto library" +) class TestPycryptoAlgorithms: def test_rsa_should_parse_pem_public_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey2_rsa.pub.pem'), 'r') as pem_key: + with open(key_path("testkey2_rsa.pub.pem"), "r") as pem_key: algo.prepare_key(pem_key.read()) def test_rsa_should_accept_unicode_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa'), 'r') as rsa_key: + with open(key_path("testkey_rsa"), "r") as rsa_key: algo.prepare_key(force_unicode(rsa_key.read())) def test_rsa_should_reject_non_string_key(self): @@ -42,20 +46,23 @@ class TestPycryptoAlgorithms: def test_rsa_sign_should_generate_correct_signature_value(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - expected_sig = base64.b64decode(force_bytes( - 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' - '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' - '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' - 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX' - 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' - 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) + expected_sig = base64.b64decode( + force_bytes( + "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp" + "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl" + "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix" + "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX" + "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA" + "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==" + ) + ) - with open(key_path('testkey_rsa'), 'r') as keyfile: + with open(key_path("testkey_rsa"), "r") as keyfile: jwt_key = algo.prepare_key(keyfile.read()) - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) algo.sign(jwt_message, jwt_key) @@ -65,19 +72,22 @@ class TestPycryptoAlgorithms: def test_rsa_verify_should_return_false_if_signature_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - jwt_sig = base64.b64decode(force_bytes( - 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' - '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' - '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' - 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX' - 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' - 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) + jwt_sig = base64.b64decode( + force_bytes( + "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp" + "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl" + "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix" + "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX" + "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA" + "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==" + ) + ) - jwt_sig += force_bytes('123') # Signature is now invalid + jwt_sig += force_bytes("123") # Signature is now invalid - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -86,17 +96,20 @@ class TestPycryptoAlgorithms: def test_rsa_verify_should_return_true_if_signature_valid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - jwt_sig = base64.b64decode(force_bytes( - 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' - '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' - '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' - 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX' - 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' - 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) + jwt_sig = base64.b64decode( + force_bytes( + "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp" + "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl" + "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix" + "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX" + "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA" + "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==" + ) + ) - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -105,14 +118,16 @@ class TestPycryptoAlgorithms: def test_rsa_prepare_key_should_be_idempotent(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key_first = algo.prepare_key(keyfile.read()) jwt_pub_key_second = algo.prepare_key(jwt_pub_key_first) assert jwt_pub_key_first == jwt_pub_key_second -@pytest.mark.skipif(not has_ecdsa, reason='Not supported without ecdsa library') +@pytest.mark.skipif( + not has_ecdsa, reason="Not supported without ecdsa library" +) class TestEcdsaAlgorithms: def test_ec_should_reject_non_string_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) @@ -123,23 +138,26 @@ class TestEcdsaAlgorithms: def test_ec_should_accept_unicode_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path('testkey_ec'), 'r') as ec_key: + with open(key_path("testkey_ec"), "r") as ec_key: algo.prepare_key(force_unicode(ec_key.read())) def test_ec_sign_should_generate_correct_signature_value(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - expected_sig = base64.b64decode(force_bytes( - 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' - 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' - 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) + expected_sig = base64.b64decode( + force_bytes( + "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M" + "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw" + "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65" + ) + ) - with open(key_path('testkey_ec'), 'r') as keyfile: + with open(key_path("testkey_ec"), "r") as keyfile: jwt_key = algo.prepare_key(keyfile.read()) - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) algo.sign(jwt_message, jwt_key) @@ -149,16 +167,19 @@ class TestEcdsaAlgorithms: def test_ec_verify_should_return_false_if_signature_invalid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - jwt_sig = base64.b64decode(force_bytes( - 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' - 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' - 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) + jwt_sig = base64.b64decode( + force_bytes( + "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M" + "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw" + "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65" + ) + ) - jwt_sig += force_bytes('123') # Signature is now invalid + jwt_sig += force_bytes("123") # Signature is now invalid - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -167,14 +188,17 @@ class TestEcdsaAlgorithms: def test_ec_verify_should_return_true_if_signature_valid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') + jwt_message = force_bytes("Hello World!") - jwt_sig = base64.b64decode(force_bytes( - 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' - 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' - 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65')) + jwt_sig = base64.b64decode( + force_bytes( + "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M" + "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw" + "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65" + ) + ) - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -183,7 +207,7 @@ class TestEcdsaAlgorithms: def test_ec_prepare_key_should_be_idempotent(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: jwt_pub_key_first = algo.prepare_key(keyfile.read()) jwt_pub_key_second = algo.prepare_key(jwt_pub_key_first) diff --git a/tests/keys/__init__.py b/tests/keys/__init__.py index 3727378..6b61caa 100644 --- a/tests/keys/__init__.py +++ b/tests/keys/__init__.py @@ -2,7 +2,6 @@ import json import os from jwt.utils import base64url_decode, force_bytes - from tests.utils import int_from_bytes BASE_PATH = os.path.dirname(os.path.abspath(__file__)) @@ -10,48 +9,50 @@ BASE_PATH = os.path.dirname(os.path.abspath(__file__)) def decode_value(val): decoded = base64url_decode(force_bytes(val)) - return int_from_bytes(decoded, 'big') + return int_from_bytes(decoded, "big") def load_hmac_key(): - with open(os.path.join(BASE_PATH, 'jwk_hmac.json'), 'r') as infile: + with open(os.path.join(BASE_PATH, "jwk_hmac.json"), "r") as infile: keyobj = json.load(infile) - return base64url_decode(force_bytes(keyobj['k'])) + return base64url_decode(force_bytes(keyobj["k"])) try: from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.backends import default_backend from jwt.algorithms import RSAAlgorithm + has_crypto = True except ImportError: has_crypto = False if has_crypto: + def load_rsa_key(): - with open(os.path.join(BASE_PATH, 'jwk_rsa_key.json'), 'r') as infile: + with open(os.path.join(BASE_PATH, "jwk_rsa_key.json"), "r") as infile: return RSAAlgorithm.from_jwk(infile.read()) def load_rsa_pub_key(): - with open(os.path.join(BASE_PATH, 'jwk_rsa_pub.json'), 'r') as infile: + with open(os.path.join(BASE_PATH, "jwk_rsa_pub.json"), "r") as infile: return RSAAlgorithm.from_jwk(infile.read()) def load_ec_key(): - with open(os.path.join(BASE_PATH, 'jwk_ec_key.json'), 'r') as infile: + with open(os.path.join(BASE_PATH, "jwk_ec_key.json"), "r") as infile: keyobj = json.load(infile) return ec.EllipticCurvePrivateNumbers( - private_value=decode_value(keyobj['d']), - public_numbers=load_ec_pub_key().public_numbers() + private_value=decode_value(keyobj["d"]), + public_numbers=load_ec_pub_key().public_numbers(), ) def load_ec_pub_key(): - with open(os.path.join(BASE_PATH, 'jwk_ec_pub.json'), 'r') as infile: + with open(os.path.join(BASE_PATH, "jwk_ec_pub.json"), "r") as infile: keyobj = json.load(infile) return ec.EllipticCurvePublicNumbers( - x=decode_value(keyobj['x']), - y=decode_value(keyobj['y']), - curve=ec.SECP521R1() + x=decode_value(keyobj["x"]), + y=decode_value(keyobj["y"]), + curve=ec.SECP521R1(), ).public_key(default_backend()) diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index 317bf3a..79d1b8e 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -1,18 +1,19 @@ import base64 import json +import pytest + from jwt.algorithms import Algorithm, HMACAlgorithm, NoneAlgorithm from jwt.exceptions import InvalidKeyError from jwt.utils import base64url_decode, force_bytes, force_unicode -import pytest - from .keys import load_hmac_key from .utils import key_path try: from jwt.algorithms import RSAAlgorithm, ECAlgorithm, RSAPSSAlgorithm from .keys import load_rsa_pub_key, load_ec_pub_key + has_crypto = True except ImportError: has_crypto = False @@ -23,37 +24,37 @@ class TestAlgorithms: algo = Algorithm() with pytest.raises(NotImplementedError): - algo.prepare_key('test') + algo.prepare_key("test") def test_algorithm_should_throw_exception_if_sign_not_impl(self): algo = Algorithm() with pytest.raises(NotImplementedError): - algo.sign('message', 'key') + algo.sign("message", "key") def test_algorithm_should_throw_exception_if_verify_not_impl(self): algo = Algorithm() with pytest.raises(NotImplementedError): - algo.verify('message', 'key', 'signature') + algo.verify("message", "key", "signature") def test_algorithm_should_throw_exception_if_to_jwk_not_impl(self): algo = Algorithm() with pytest.raises(NotImplementedError): - algo.from_jwk('value') + algo.from_jwk("value") def test_algorithm_should_throw_exception_if_from_jwk_not_impl(self): algo = Algorithm() with pytest.raises(NotImplementedError): - algo.to_jwk('value') + algo.to_jwk("value") def test_none_algorithm_should_throw_exception_if_key_is_not_none(self): algo = NoneAlgorithm() with pytest.raises(InvalidKeyError): - algo.prepare_key('123') + algo.prepare_key("123") def test_hmac_should_reject_nonstring_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) @@ -62,186 +63,214 @@ class TestAlgorithms: algo.prepare_key(object()) exception = context.value - assert str(exception) == 'Expected a string value' + assert str(exception) == "Expected a string value" def test_hmac_should_accept_unicode_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - algo.prepare_key(force_unicode('awesome')) + algo.prepare_key(force_unicode("awesome")) def test_hmac_should_throw_exception_if_key_is_pem_public_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path('testkey2_rsa.pub.pem'), 'r') as keyfile: + with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_x509_certificate(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path('testkey_rsa.cer'), 'r') as keyfile: + with open(key_path("testkey_rsa.cer"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_ssh_public_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_x509_cert(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path('testkey2_rsa.pub.pem'), 'r') as keyfile: + with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_pkcs1_pem_public(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path('testkey_pkcs1.pub.pem'), 'r') as keyfile: + with open(key_path("testkey_pkcs1.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_jwk_should_parse_and_verify(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - with open(key_path('jwk_hmac.json'), 'r') as keyfile: + with open(key_path("jwk_hmac.json"), "r") as keyfile: key = algo.from_jwk(keyfile.read()) - signature = algo.sign(b'Hello World!', key) - assert algo.verify(b'Hello World!', key, signature) + signature = algo.sign(b"Hello World!", key) + assert algo.verify(b"Hello World!", key, signature) def test_hmac_to_jwk_returns_correct_values(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - key = algo.to_jwk('secret') + key = algo.to_jwk("secret") - assert json.loads(key) == {'kty': 'oct', 'k': 'c2VjcmV0'} + assert json.loads(key) == {"kty": "oct", "k": "c2VjcmV0"} def test_hmac_from_jwk_should_raise_exception_if_not_hmac_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - with open(key_path('jwk_rsa_pub.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_pub.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): algo.from_jwk(keyfile.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_should_parse_pem_public_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey2_rsa.pub.pem'), 'r') as pem_key: + with open(key_path("testkey2_rsa.pub.pem"), "r") as pem_key: algo.prepare_key(pem_key.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_should_accept_pem_private_key_bytes(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa'), 'rb') as pem_key: + with open(key_path("testkey_rsa"), "rb") as pem_key: algo.prepare_key(pem_key.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_should_accept_unicode_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa'), 'r') as rsa_key: + with open(key_path("testkey_rsa"), "r") as rsa_key: algo.prepare_key(force_unicode(rsa_key.read())) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_should_reject_non_string_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) with pytest.raises(TypeError): algo.prepare_key(None) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_verify_should_return_false_if_signature_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - message = force_bytes('Hello World!') - - sig = base64.b64decode(force_bytes( - 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp' - '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl' - '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix' - 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX' - 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA' - 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==')) + message = force_bytes("Hello World!") + + sig = base64.b64decode( + force_bytes( + "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp" + "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl" + "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix" + "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX" + "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA" + "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA==" + ) + ) - sig += force_bytes('123') # Signature is now invalid + sig += force_bytes("123") # Signature is now invalid - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) assert not result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_rsa_pub.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_pub.json"), "r") as keyfile: pub_key = algo.from_jwk(keyfile.read()) - with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: priv_key = algo.from_jwk(keyfile.read()) - signature = algo.sign(force_bytes('Hello World!'), priv_key) - assert algo.verify(force_bytes('Hello World!'), pub_key, signature) + signature = algo.sign(force_bytes("Hello World!"), priv_key) + assert algo.verify(force_bytes("Hello World!"), pub_key, signature) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_private_key_to_jwk_works_with_from_jwk(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa'), 'r') as rsa_key: + with open(key_path("testkey_rsa"), "r") as rsa_key: orig_key = algo.prepare_key(force_unicode(rsa_key.read())) parsed_key = algo.from_jwk(algo.to_jwk(orig_key)) assert parsed_key.private_numbers() == orig_key.private_numbers() - assert parsed_key.private_numbers().public_numbers == orig_key.private_numbers().public_numbers + assert ( + parsed_key.private_numbers().public_numbers + == orig_key.private_numbers().public_numbers + ) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_public_key_to_jwk_works_with_from_jwk(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa.pub'), 'r') as rsa_key: + with open(key_path("testkey_rsa.pub"), "r") as rsa_key: orig_key = algo.prepare_key(force_unicode(rsa_key.read())) parsed_key = algo.from_jwk(algo.to_jwk(orig_key)) assert parsed_key.public_numbers() == orig_key.public_numbers() - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_private_key_with_other_primes_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) - keydata['oth'] = [] + keydata["oth"] = [] algo.from_jwk(json.dumps(keydata)) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_private_key_with_missing_values_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) - del keydata['p'] + del keydata["p"] algo.from_jwk(json.dumps(keydata)) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_private_key_can_recover_prime_factors(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: keybytes = keyfile.read() control_key = algo.from_jwk(keybytes).private_numbers() keydata = json.loads(keybytes) - delete_these = ['p', 'q', 'dp', 'dq', 'qi'] + delete_these = ["p", "q", "dp", "dq", "qi"] for field in delete_these: del keydata[field] @@ -254,206 +283,254 @@ class TestAlgorithms: assert control_key.dmq1 == parsed_key.dmq1 assert control_key.iqmp == parsed_key.iqmp - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_private_key_with_missing_required_values_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_rsa_key.json'), 'r') as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) - del keydata['p'] + del keydata["p"] algo.from_jwk(json.dumps(keydata)) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_jwk_raises_exception_if_not_a_valid_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) # Invalid JSON with pytest.raises(InvalidKeyError): - algo.from_jwk('{not-a-real-key') + algo.from_jwk("{not-a-real-key") # Missing key parts with pytest.raises(InvalidKeyError): algo.from_jwk('{"kty": "RSA"}') - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_to_jwk_returns_correct_values_for_public_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) key = algo.to_jwk(pub_key) expected = { - 'e': 'AQAB', - 'key_ops': ['verify'], - 'kty': 'RSA', - 'n': ( - '1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-' - 'OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms' - '1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg' - '82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r' - 'rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo' - 'sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ' + "e": "AQAB", + "key_ops": ["verify"], + "kty": "RSA", + "n": ( + "1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-" + "OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms" + "1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg" + "82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r" + "rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo" + "sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ" ), } assert json.loads(key) == expected - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_to_jwk_returns_correct_values_for_private_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('testkey_rsa'), 'r') as keyfile: + with open(key_path("testkey_rsa"), "r") as keyfile: priv_key = algo.prepare_key(keyfile.read()) key = algo.to_jwk(priv_key) expected = { - 'key_ops': [u'sign'], - 'kty': 'RSA', - 'e': 'AQAB', - 'n': ( - '1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-' - 'OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms' - '1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg' - '82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r' - 'rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo' - 'sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ' + "key_ops": [u"sign"], + "kty": "RSA", + "e": "AQAB", + "n": ( + "1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-" + "OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms" + "1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg" + "82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r" + "rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo" + "sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ" + ), + "d": ( + "rfbs8AWdB1RkLJRlC51LukrAvYl5UfU1TE6XRa4o-DTg2-03OXLNEMyVpMr" + "a47weEnu14StypzC8qXL7vxXOyd30SSFTffLfleaTg-qxgMZSDw-Fb_M-pU" + "HMPMEDYG-lgGma4l4fd1yTX2ATtoUo9BVOQgWS1LMZqi0ASEOkUfzlBgL04" + "UoaLhPSuDdLygdlDzgruVPnec0t1uOEObmrcWIkhwU2CGQzeLtuzX6OVgPh" + "k7xcnjbDurTTVpWH0R0gbZ5ukmQ2P-YuCX8T9iWNMGjPNSkb7h02s2Oe9ZR" + "zP007xQ0VF-Z7xyLuxk6ASmoX1S39ujSbk2WF0eXNPRgFwQ" + ), + "q": ( + "47hlW2f1ARuWYJf9Dl6MieXjdj2dGx9PL2UH0unVzJYInd56nqXNPrQrc5k" + "ZU65KApC9n9oKUwIxuqwAAbh8oGNEQDqnuTj-powCkdC6bwA8KH1Y-wotpq" + "_GSjxkNzjWRm2GArJSzZc6Fb8EuObOrAavKJ285-zMPCEfus1WZG0" + ), + "p": ( + "7tr0z929Lp4OHIRJjIKM_rDrWMPtRgnV-51pgWsN6qdpDzns_PgFwrHcoyY" + "sWIO-4yCdVWPxFOgEZ8xXTM_uwOe4VEmdZhw55Tx7axYZtmZYZbO_RIP4CG" + "mlJlOFTiYnxpr-2Cx6kIeQmd-hf7fA3tL018aEzwYMbFMcnAGnEg0" + ), + "qi": ( + "djo95mB0LVYikNPa-NgyDwLotLqrueb9IviMmn6zKHCwiOXReqXDX9slB8" + "RA15uv56bmN04O__NyVFcgJ2ef169GZHiRFIgIy0Pl8LYkMhCYKKhyqM7g" + "xN-SqGqDTKDC22j00S7jcvCaa1qadn1qbdfukZ4NXv7E2d_LO0Y2Kkc" + ), + "dp": ( + "tgZ2-tJpEdWxu1m1EzeKa644LHVjpTRptk7H0LDc8i6SieADEuWQvkb9df" + "fpY6tDFaQNQr3fQ6dtdAztmsP7l1b_ynwvT1nDZUcqZvl4ruBgDWFmKbjI" + "lOCt0v9jX6MEPP5xqBx9axdkw18BnGtUuHrbzHSlUX-yh_rumpVH1SE" + ), + "dq": ( + "xxCIuhD0YlWFbUcwFgGdBWcLIm_WCMGj7SB6aGu1VDTLr4Wu10TFWM0TNu" + "hc9YPker2gpj5qzAmdAzwcfWSSvXpJTYR43jfulBTMoj8-2o3wCM0anclW" + "AuKhin-kc4mh9ssDXRQZwlMymZP0QtaxUDw_nlfVrUCZgO7L1_ZsUTk" ), - 'd': ('rfbs8AWdB1RkLJRlC51LukrAvYl5UfU1TE6XRa4o-DTg2-03OXLNEMyVpMr' - 'a47weEnu14StypzC8qXL7vxXOyd30SSFTffLfleaTg-qxgMZSDw-Fb_M-pU' - 'HMPMEDYG-lgGma4l4fd1yTX2ATtoUo9BVOQgWS1LMZqi0ASEOkUfzlBgL04' - 'UoaLhPSuDdLygdlDzgruVPnec0t1uOEObmrcWIkhwU2CGQzeLtuzX6OVgPh' - 'k7xcnjbDurTTVpWH0R0gbZ5ukmQ2P-YuCX8T9iWNMGjPNSkb7h02s2Oe9ZR' - 'zP007xQ0VF-Z7xyLuxk6ASmoX1S39ujSbk2WF0eXNPRgFwQ'), - 'q': ('47hlW2f1ARuWYJf9Dl6MieXjdj2dGx9PL2UH0unVzJYInd56nqXNPrQrc5k' - 'ZU65KApC9n9oKUwIxuqwAAbh8oGNEQDqnuTj-powCkdC6bwA8KH1Y-wotpq' - '_GSjxkNzjWRm2GArJSzZc6Fb8EuObOrAavKJ285-zMPCEfus1WZG0'), - 'p': ('7tr0z929Lp4OHIRJjIKM_rDrWMPtRgnV-51pgWsN6qdpDzns_PgFwrHcoyY' - 'sWIO-4yCdVWPxFOgEZ8xXTM_uwOe4VEmdZhw55Tx7axYZtmZYZbO_RIP4CG' - 'mlJlOFTiYnxpr-2Cx6kIeQmd-hf7fA3tL018aEzwYMbFMcnAGnEg0'), - 'qi': ('djo95mB0LVYikNPa-NgyDwLotLqrueb9IviMmn6zKHCwiOXReqXDX9slB8' - 'RA15uv56bmN04O__NyVFcgJ2ef169GZHiRFIgIy0Pl8LYkMhCYKKhyqM7g' - 'xN-SqGqDTKDC22j00S7jcvCaa1qadn1qbdfukZ4NXv7E2d_LO0Y2Kkc'), - 'dp': ('tgZ2-tJpEdWxu1m1EzeKa644LHVjpTRptk7H0LDc8i6SieADEuWQvkb9df' - 'fpY6tDFaQNQr3fQ6dtdAztmsP7l1b_ynwvT1nDZUcqZvl4ruBgDWFmKbjI' - 'lOCt0v9jX6MEPP5xqBx9axdkw18BnGtUuHrbzHSlUX-yh_rumpVH1SE'), - 'dq': ('xxCIuhD0YlWFbUcwFgGdBWcLIm_WCMGj7SB6aGu1VDTLr4Wu10TFWM0TNu' - 'hc9YPker2gpj5qzAmdAzwcfWSSvXpJTYR43jfulBTMoj8-2o3wCM0anclW' - 'AuKhin-kc4mh9ssDXRQZwlMymZP0QtaxUDw_nlfVrUCZgO7L1_ZsUTk') } assert json.loads(key) == expected - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_to_jwk_raises_exception_on_invalid_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - algo.to_jwk({'not': 'a valid key'}) + algo.to_jwk({"not": "a valid key"}) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_from_jwk_raises_exception_on_invalid_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path('jwk_hmac.json'), 'r') as keyfile: + with open(key_path("jwk_hmac.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): algo.from_jwk(keyfile.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_should_reject_non_string_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) with pytest.raises(TypeError): algo.prepare_key(None) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_should_accept_unicode_key(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path('testkey_ec'), 'r') as ec_key: + with open(key_path("testkey_ec"), "r") as ec_key: algo.prepare_key(force_unicode(ec_key.read())) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_should_accept_pem_private_key_bytes(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path('testkey_ec'), 'rb') as ec_key: + with open(key_path("testkey_ec"), "rb") as ec_key: algo.prepare_key(ec_key.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_should_accept_ssh_public_key_bytes(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path('testkey_ec_ssh.pub'), 'r') as ec_key: + with open(key_path("testkey_ec_ssh.pub"), "r") as ec_key: algo.prepare_key(ec_key.read()) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_verify_should_return_false_if_signature_invalid(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - message = force_bytes('Hello World!') + message = force_bytes("Hello World!") # Mess up the signature by replacing a known byte - sig = base64.b64decode(force_bytes( - 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M' - 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw' - 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'.replace('r', 's'))) + sig = base64.b64decode( + force_bytes( + "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M" + "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw" + "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65".replace( + "r", "s" + ) + ) + ) - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) assert not result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_verify_should_return_false_if_signature_wrong_length(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - message = force_bytes('Hello World!') + message = force_bytes("Hello World!") - sig = base64.b64decode(force_bytes('AC+m4Jf/xI3guAC6w0w3')) + sig = base64.b64decode(force_bytes("AC+m4Jf/xI3guAC6w0w3")) - with open(key_path('testkey_ec.pub'), 'r') as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) assert not result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_pss_sign_then_verify_should_return_true(self): algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256) - message = force_bytes('Hello World!') + message = force_bytes("Hello World!") - with open(key_path('testkey_rsa'), 'r') as keyfile: + with open(key_path("testkey_rsa"), "r") as keyfile: priv_key = algo.prepare_key(keyfile.read()) sig = algo.sign(message, priv_key) - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) assert result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_pss_verify_should_return_false_if_signature_invalid(self): algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256) - jwt_message = force_bytes('Hello World!') - - jwt_sig = base64.b64decode(force_bytes( - 'ywKAUGRIDC//6X+tjvZA96yEtMqpOrSppCNfYI7NKyon3P7doud5v65oWNu' - 'vQsz0fzPGfF7mQFGo9Cm9Vn0nljm4G6PtqZRbz5fXNQBH9k10gq34AtM02c' - '/cveqACQ8gF3zxWh6qr9jVqIpeMEaEBIkvqG954E0HT9s9ybHShgHX9mlWk' - '186/LopP4xe5c/hxOQjwhv6yDlTiwJFiqjNCvj0GyBKsc4iECLGIIO+4mC4' - 'daOCWqbpZDuLb1imKpmm8Nsm56kAxijMLZnpCcnPgyb7CqG+B93W9GHglA5' - 'drUeR1gRtO7vqbZMsCAQ4bpjXxwbYyjQlEVuMl73UL6sOWg==')) + jwt_message = force_bytes("Hello World!") + + jwt_sig = base64.b64decode( + force_bytes( + "ywKAUGRIDC//6X+tjvZA96yEtMqpOrSppCNfYI7NKyon3P7doud5v65oWNu" + "vQsz0fzPGfF7mQFGo9Cm9Vn0nljm4G6PtqZRbz5fXNQBH9k10gq34AtM02c" + "/cveqACQ8gF3zxWh6qr9jVqIpeMEaEBIkvqG954E0HT9s9ybHShgHX9mlWk" + "186/LopP4xe5c/hxOQjwhv6yDlTiwJFiqjNCvj0GyBKsc4iECLGIIO+4mC4" + "daOCWqbpZDuLb1imKpmm8Nsm56kAxijMLZnpCcnPgyb7CqG+B93W9GHglA5" + "drUeR1gRtO7vqbZMsCAQ4bpjXxwbYyjQlEVuMl73UL6sOWg==" + ) + ) - jwt_sig += force_bytes('123') # Signature is now invalid + jwt_sig += force_bytes("123") # Signature is now invalid - with open(key_path('testkey_rsa.pub'), 'r') as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -474,16 +551,16 @@ class TestAlgorithmsRFC7520: Reference: https://tools.ietf.org/html/rfc7520#section-4.4 """ signing_input = force_bytes( - 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjAxOGMwYWU1LTRkOWItNDcxYi1iZmQ2LWVlZ' - 'jMxNGJjNzAzNyJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ' - '29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIG' - 'lmIHlvdSBkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmc' - 'gd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' + "eyJhbGciOiJIUzI1NiIsImtpZCI6IjAxOGMwYWU1LTRkOWItNDcxYi1iZmQ2LWVlZ" + "jMxNGJjNzAzNyJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ" + "29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIG" + "lmIHlvdSBkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmc" + "gd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4" ) - signature = base64url_decode(force_bytes( - 's0h6KThzkfBBBkLspW1h84VsJZFTsPPqMDA7g1Md7p0' - )) + signature = base64url_decode( + force_bytes("s0h6KThzkfBBBkLspW1h84VsJZFTsPPqMDA7g1Md7p0") + ) algo = HMACAlgorithm(HMACAlgorithm.SHA256) key = algo.prepare_key(load_hmac_key()) @@ -491,7 +568,9 @@ class TestAlgorithmsRFC7520: result = algo.verify(signing_input, key, signature) assert result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsa_verify_should_return_true_for_test_vector(self): """ This test verifies that RSA PKCS v1.5 verification works with a known @@ -500,21 +579,23 @@ class TestAlgorithmsRFC7520: Reference: https://tools.ietf.org/html/rfc7520#section-4.1 """ signing_input = force_bytes( - 'eyJhbGciOiJSUzI1NiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' - 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' - '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' - 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU' - 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' + "eyJhbGciOiJSUzI1NiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb" + "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb" + "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS" + "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU" + "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4" ) - signature = base64url_decode(force_bytes( - 'MRjdkly7_-oTPTS3AXP41iQIGKa80A0ZmTuV5MEaHoxnW2e5CZ5NlKtainoFmKZop' - 'dHM1O2U4mwzJdQx996ivp83xuglII7PNDi84wnB-BDkoBwA78185hX-Es4JIwmDLJ' - 'K3lfWRa-XtL0RnltuYv746iYTh_qHRD68BNt1uSNCrUCTJDt5aAE6x8wW1Kt9eRo4' - 'QPocSadnHXFxnt8Is9UzpERV0ePPQdLuW3IS_de3xyIrDaLGdjluPxUAhb6L2aXic' - '1U12podGU0KLUQSE_oI-ZnmKJ3F4uOZDnd6QZWJushZ41Axf_fcIe8u9ipH84ogor' - 'ee7vjbU5y18kDquDg' - )) + signature = base64url_decode( + force_bytes( + "MRjdkly7_-oTPTS3AXP41iQIGKa80A0ZmTuV5MEaHoxnW2e5CZ5NlKtainoFmKZop" + "dHM1O2U4mwzJdQx996ivp83xuglII7PNDi84wnB-BDkoBwA78185hX-Es4JIwmDLJ" + "K3lfWRa-XtL0RnltuYv746iYTh_qHRD68BNt1uSNCrUCTJDt5aAE6x8wW1Kt9eRo4" + "QPocSadnHXFxnt8Is9UzpERV0ePPQdLuW3IS_de3xyIrDaLGdjluPxUAhb6L2aXic" + "1U12podGU0KLUQSE_oI-ZnmKJ3F4uOZDnd6QZWJushZ41Axf_fcIe8u9ipH84ogor" + "ee7vjbU5y18kDquDg" + ) + ) algo = RSAAlgorithm(RSAAlgorithm.SHA256) key = algo.prepare_key(load_rsa_pub_key()) @@ -522,7 +603,9 @@ class TestAlgorithmsRFC7520: result = algo.verify(signing_input, key, signature) assert result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_rsapss_verify_should_return_true_for_test_vector(self): """ This test verifies that RSA-PSS verification works with a known good @@ -531,21 +614,23 @@ class TestAlgorithmsRFC7520: Reference: https://tools.ietf.org/html/rfc7520#section-4.2 """ signing_input = force_bytes( - 'eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' - 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' - '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' - 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU' - 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' + "eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb" + "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb" + "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS" + "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU" + "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4" ) - signature = base64url_decode(force_bytes( - 'cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6' - '-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz' - 'g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p' - '8Vdz0TTrxUeT3lm8d9shnr2lfJT8ImUjvAA2Xez2Mlp8cBE5awDzT0qI0n6uiP1aC' - 'N_2_jLAeQTlqRHtfa64QQSUmFAAjVKPbByi7xho0uTOcbH510a6GYmJUAfmWjwZ6o' - 'D4ifKo8DYM-X72Eaw' - )) + signature = base64url_decode( + force_bytes( + "cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6" + "-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz" + "g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p" + "8Vdz0TTrxUeT3lm8d9shnr2lfJT8ImUjvAA2Xez2Mlp8cBE5awDzT0qI0n6uiP1aC" + "N_2_jLAeQTlqRHtfa64QQSUmFAAjVKPbByi7xho0uTOcbH510a6GYmJUAfmWjwZ6o" + "D4ifKo8DYM-X72Eaw" + ) + ) algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384) key = algo.prepare_key(load_rsa_pub_key()) @@ -553,7 +638,9 @@ class TestAlgorithmsRFC7520: result = algo.verify(signing_input, key, signature) assert result - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_ec_verify_should_return_true_for_test_vector(self): """ This test verifies that ECDSA verification works with a known good @@ -562,18 +649,20 @@ class TestAlgorithmsRFC7520: Reference: https://tools.ietf.org/html/rfc7520#section-4.3 """ signing_input = force_bytes( - 'eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb' - 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb' - '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS' - 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU' - 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4' + "eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb" + "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb" + "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS" + "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU" + "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4" ) - signature = base64url_decode(force_bytes( - 'AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9P' - 'lon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890j' - 'l8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2' - )) + signature = base64url_decode( + force_bytes( + "AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9P" + "lon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890j" + "l8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2" + ) + ) algo = ECAlgorithm(ECAlgorithm.SHA512) key = algo.prepare_key(load_ec_pub_key()) diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py index 4f70b56..b502124 100644 --- a/tests/test_api_jws.py +++ b/tests/test_api_jws.py @@ -1,23 +1,26 @@ - import json from decimal import Decimal +import pytest + from jwt.algorithms import Algorithm from jwt.api_jws import PyJWS from jwt.exceptions import ( - DecodeError, InvalidAlgorithmError, InvalidSignatureError, - InvalidTokenError + DecodeError, + InvalidAlgorithmError, + InvalidSignatureError, + InvalidTokenError, ) from jwt.utils import base64url_decode, force_bytes, force_unicode -import pytest - from .compat import string_types, text_type try: from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import ( - load_pem_private_key, load_pem_public_key, load_ssh_public_key + load_pem_private_key, + load_pem_public_key, + load_ssh_public_key, ) has_crypto = True @@ -33,167 +36,179 @@ def jws(): @pytest.fixture def payload(): """ Creates a sample jws claimset for use as a payload during tests """ - return force_bytes('hello world') + return force_bytes("hello world") class TestJWS: def test_register_algo_does_not_allow_duplicate_registration(self, jws): - jws.register_algorithm('AAA', Algorithm()) + jws.register_algorithm("AAA", Algorithm()) with pytest.raises(ValueError): - jws.register_algorithm('AAA', Algorithm()) + jws.register_algorithm("AAA", Algorithm()) def test_register_algo_rejects_non_algorithm_obj(self, jws): with pytest.raises(TypeError): - jws.register_algorithm('AAA123', {}) + jws.register_algorithm("AAA123", {}) def test_unregister_algo_removes_algorithm(self, jws): supported = jws.get_algorithms() - assert 'none' in supported - assert 'HS256' in supported + assert "none" in supported + assert "HS256" in supported - jws.unregister_algorithm('HS256') + jws.unregister_algorithm("HS256") supported = jws.get_algorithms() - assert 'HS256' not in supported + assert "HS256" not in supported def test_unregister_algo_throws_error_if_not_registered(self, jws): with pytest.raises(KeyError): - jws.unregister_algorithm('AAA') + jws.unregister_algorithm("AAA") def test_algo_parameter_removes_alg_from_algorithms_list(self, jws): - assert 'none' in jws.get_algorithms() - assert 'HS256' in jws.get_algorithms() + assert "none" in jws.get_algorithms() + assert "HS256" in jws.get_algorithms() - jws = PyJWS(algorithms=['HS256']) - assert 'none' not in jws.get_algorithms() - assert 'HS256' in jws.get_algorithms() + jws = PyJWS(algorithms=["HS256"]) + assert "none" not in jws.get_algorithms() + assert "HS256" in jws.get_algorithms() def test_override_options(self): - jws = PyJWS(options={'verify_signature': False}) + jws = PyJWS(options={"verify_signature": False}) - assert not jws.options['verify_signature'] + assert not jws.options["verify_signature"] def test_non_object_options_dont_persist(self, jws, payload): - token = jws.encode(payload, 'secret') + token = jws.encode(payload, "secret") - jws.decode(token, 'secret', options={'verify_signature': False}) + jws.decode(token, "secret", options={"verify_signature": False}) - assert jws.options['verify_signature'] + assert jws.options["verify_signature"] def test_options_must_be_dict(self, jws): pytest.raises(TypeError, PyJWS, options=object()) - pytest.raises(TypeError, PyJWS, options=('something')) + pytest.raises(TypeError, PyJWS, options=("something")) def test_encode_decode(self, jws, payload): - secret = 'secret' + secret = "secret" jws_message = jws.encode(payload, secret) decoded_payload = jws.decode(jws_message, secret) assert decoded_payload == payload - def test_decode_fails_when_alg_is_not_on_method_algorithms_param(self, jws, payload): - secret = 'secret' - jws_token = jws.encode(payload, secret, algorithm='HS256') + def test_decode_fails_when_alg_is_not_on_method_algorithms_param( + self, jws, payload + ): + secret = "secret" + jws_token = jws.encode(payload, secret, algorithm="HS256") jws.decode(jws_token, secret) with pytest.raises(InvalidAlgorithmError): - jws.decode(jws_token, secret, algorithms=['HS384']) + jws.decode(jws_token, secret, algorithms=["HS384"]) def test_decode_works_with_unicode_token(self, jws): - secret = 'secret' + secret = "secret" unicode_jws = text_type( - 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) jws.decode(unicode_jws, secret) def test_decode_missing_segments_throws_exception(self, jws): - secret = 'secret' - example_jws = ('eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '') # Missing segment + secret = "secret" + example_jws = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + "" + ) # Missing segment with pytest.raises(DecodeError) as context: jws.decode(example_jws, secret) exception = context.value - assert str(exception) == 'Not enough segments' + assert str(exception) == "Not enough segments" def test_decode_invalid_token_type_is_none(self, jws): example_jws = None - example_secret = 'secret' + example_secret = "secret" with pytest.raises(DecodeError) as context: jws.decode(example_jws, example_secret) exception = context.value - assert 'Invalid token type' in str(exception) + assert "Invalid token type" in str(exception) def test_decode_invalid_token_type_is_int(self, jws): example_jws = 123 - example_secret = 'secret' + example_secret = "secret" with pytest.raises(DecodeError) as context: jws.decode(example_jws, example_secret) exception = context.value - assert 'Invalid token type' in str(exception) + assert "Invalid token type" in str(exception) def test_decode_with_non_mapping_header_throws_exception(self, jws): - secret = 'secret' - example_jws = ('MQ' # == 1 - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + secret = "secret" + example_jws = ( + "MQ" # == 1 + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) with pytest.raises(DecodeError) as context: jws.decode(example_jws, secret) exception = context.value - assert str(exception) == 'Invalid header string: must be a json object' + assert str(exception) == "Invalid header string: must be a json object" - def test_encode_algorithm_param_should_be_case_sensitive(self, jws, payload): + def test_encode_algorithm_param_should_be_case_sensitive( + self, jws, payload + ): - jws.encode(payload, 'secret', algorithm='HS256') + jws.encode(payload, "secret", algorithm="HS256") with pytest.raises(NotImplementedError) as context: - jws.encode(payload, None, algorithm='hs256') + jws.encode(payload, None, algorithm="hs256") exception = context.value - assert str(exception) == 'Algorithm not supported' + assert str(exception) == "Algorithm not supported" def test_decode_algorithm_param_should_be_case_sensitive(self, jws): - example_jws = ('eyJhbGciOiJoczI1NiIsInR5cCI6IkpXVCJ9' # alg = hs256 - '.eyJoZWxsbyI6IndvcmxkIn0' - '.5R_FEPE7SW2dT9GgIxPgZATjFGXfUDOSwo7TtO_Kd_g') + example_jws = ( + "eyJhbGciOiJoczI1NiIsInR5cCI6IkpXVCJ9" # alg = hs256 + ".eyJoZWxsbyI6IndvcmxkIn0" + ".5R_FEPE7SW2dT9GgIxPgZATjFGXfUDOSwo7TtO_Kd_g" + ) with pytest.raises(InvalidAlgorithmError) as context: - jws.decode(example_jws, 'secret') + jws.decode(example_jws, "secret") exception = context.value - assert str(exception) == 'Algorithm not supported' + assert str(exception) == "Algorithm not supported" def test_bad_secret(self, jws, payload): - right_secret = 'foo' - bad_secret = 'bar' + right_secret = "foo" + bad_secret = "bar" jws_message = jws.encode(payload, right_secret) with pytest.raises(DecodeError) as excinfo: # Backward compat for ticket #315 jws.decode(jws_message, bad_secret) - assert 'Signature verification failed' == str(excinfo.value) + assert "Signature verification failed" == str(excinfo.value) with pytest.raises(InvalidSignatureError) as excinfo: jws.decode(jws_message, bad_secret) - assert 'Signature verification failed' == str(excinfo.value) + assert "Signature verification failed" == str(excinfo.value) def test_decodes_valid_jws(self, jws, payload): - example_secret = 'secret' + example_secret = "secret" example_jws = ( - b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.' - b'aGVsbG8gd29ybGQ.' - b'gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM') + b"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9." + b"aGVsbG8gd29ybGQ." + b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM" + ) decoded_payload = jws.decode(example_jws, example_secret) @@ -203,18 +218,21 @@ class TestJWS: # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_decodes_valid_es384_jws(self, jws): - example_payload = {'hello': 'world'} - with open('tests/keys/testkey_ec.pub', 'r') as fp: + example_payload = {"hello": "world"} + with open("tests/keys/testkey_ec.pub", "r") as fp: example_pubkey = fp.read() example_jws = ( - b'eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9' - b'.eyJoZWxsbyI6IndvcmxkIn0' - b'.AGtlemKghaIaYh1yeeekFH9fRuNY7hCaw5hUgZ5aG1N' - b'2F8FIbiKLaZKr8SiFdTimXFVTEmxpBQ9sRmdsDsnrM-1' - b'HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw' - b'PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik') + b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9" + b".eyJoZWxsbyI6IndvcmxkIn0" + b".AGtlemKghaIaYh1yeeekFH9fRuNY7hCaw5hUgZ5aG1N" + b"2F8FIbiKLaZKr8SiFdTimXFVTEmxpBQ9sRmdsDsnrM-1" + b"HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw" + b"PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik" + ) decoded_payload = jws.decode(example_jws, example_pubkey) json_payload = json.loads(force_unicode(decoded_payload)) @@ -224,40 +242,43 @@ class TestJWS: # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_decodes_valid_rs384_jws(self, jws): - example_payload = {'hello': 'world'} - with open('tests/keys/testkey_rsa.pub', 'r') as fp: + example_payload = {"hello": "world"} + with open("tests/keys/testkey_rsa.pub", "r") as fp: example_pubkey = fp.read() example_jws = ( - b'eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9' - b'.eyJoZWxsbyI6IndvcmxkIn0' - b'.yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X' - b'lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju' - b'O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457' - b'W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx' - b'mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ' - b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t' - b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr' - b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A') + b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9" + b".eyJoZWxsbyI6IndvcmxkIn0" + b".yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X" + b"lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju" + b"O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457" + b"W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx" + b"mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ" + b"urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t" + b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr" + b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A" + ) decoded_payload = jws.decode(example_jws, example_pubkey) json_payload = json.loads(force_unicode(decoded_payload)) assert json_payload == example_payload def test_load_verify_valid_jws(self, jws, payload): - example_secret = 'secret' + example_secret = "secret" example_jws = ( - b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - b'aGVsbG8gd29ybGQ.' - b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI' + b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + b"aGVsbG8gd29ybGQ." + b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) decoded_payload = jws.decode(example_jws, key=example_secret) assert decoded_payload == payload def test_allow_skip_verification(self, jws, payload): - right_secret = 'foo' + right_secret = "foo" jws_message = jws.encode(payload, right_secret) decoded_payload = jws.decode(jws_message, verify=False) @@ -265,34 +286,37 @@ class TestJWS: def test_verify_false_deprecated(self, jws, recwarn): example_jws = ( - b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' - b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + b".eyJoZWxsbyI6ICJ3b3JsZCJ9" + b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) pytest.deprecated_call(jws.decode, example_jws, verify=False) def test_decode_with_optional_algorithms(self, jws): - example_secret = 'secret' + example_secret = "secret" example_jws = ( - b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - b'aGVsbG8gd29ybGQ.' - b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI' + b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + b"aGVsbG8gd29ybGQ." + b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) pytest.deprecated_call(jws.decode, example_jws, key=example_secret) def test_decode_no_algorithms_verify_signature_false(self, jws): - example_secret = 'secret' + example_secret = "secret" example_jws = ( - b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - b'aGVsbG8gd29ybGQ.' - b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI' + b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + b"aGVsbG8gd29ybGQ." + b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) try: pytest.deprecated_call( - jws.decode, example_jws, key=example_secret, - options={'verify_signature': False}, + jws.decode, + example_jws, + key=example_secret, + options={"verify_signature": False}, ) except pytest.fail.Exception: pass @@ -300,7 +324,7 @@ class TestJWS: assert False, "Unexpected DeprecationWarning raised." def test_load_no_verification(self, jws, payload): - right_secret = 'foo' + right_secret = "foo" jws_message = jws.encode(payload, right_secret) decoded_payload = jws.decode(jws_message, key=None, verify=False) @@ -308,50 +332,54 @@ class TestJWS: assert decoded_payload == payload def test_no_secret(self, jws, payload): - right_secret = 'foo' + right_secret = "foo" jws_message = jws.encode(payload, right_secret) with pytest.raises(DecodeError): jws.decode(jws_message) def test_verify_signature_with_no_secret(self, jws, payload): - right_secret = 'foo' + right_secret = "foo" jws_message = jws.encode(payload, right_secret) with pytest.raises(DecodeError) as exc: jws.decode(jws_message) - assert 'Signature verification' in str(exc.value) + assert "Signature verification" in str(exc.value) - def test_verify_signature_with_no_algo_header_throws_exception(self, jws, payload): + def test_verify_signature_with_no_algo_header_throws_exception( + self, jws, payload + ): example_jws = ( - b'e30' - b'.eyJhIjo1fQ' - b'.KEh186CjVw_Q8FadjJcaVnE7hO5Z9nHBbU8TgbhHcBY' + b"e30" + b".eyJhIjo1fQ" + b".KEh186CjVw_Q8FadjJcaVnE7hO5Z9nHBbU8TgbhHcBY" ) with pytest.raises(InvalidAlgorithmError): - jws.decode(example_jws, 'secret') + jws.decode(example_jws, "secret") def test_invalid_crypto_alg(self, jws, payload): with pytest.raises(NotImplementedError): - jws.encode(payload, 'secret', algorithm='HS1024') + jws.encode(payload, "secret", algorithm="HS1024") - @pytest.mark.skipif(has_crypto, reason='Scenario requires cryptography to not be installed') + @pytest.mark.skipif( + has_crypto, reason="Scenario requires cryptography to not be installed" + ) def test_missing_crypto_library_better_error_messages(self, jws, payload): with pytest.raises(NotImplementedError) as excinfo: - jws.encode(payload, 'secret', algorithm='RS256') - assert 'cryptography' in str(excinfo.value) + jws.encode(payload, "secret", algorithm="RS256") + assert "cryptography" in str(excinfo.value) def test_unicode_secret(self, jws, payload): - secret = '\xc2' + secret = "\xc2" jws_message = jws.encode(payload, secret) decoded_payload = jws.decode(jws_message, secret) assert decoded_payload == payload def test_nonascii_secret(self, jws, payload): - secret = '\xc2' # char value that ascii codec cannot decode + secret = "\xc2" # char value that ascii codec cannot decode jws_message = jws.encode(payload, secret) decoded_payload = jws.decode(jws_message, secret) @@ -359,7 +387,7 @@ class TestJWS: assert decoded_payload == payload def test_bytes_secret(self, jws, payload): - secret = b'\xc2' # char value that ascii codec cannot decode + secret = b"\xc2" # char value that ascii codec cannot decode jws_message = jws.encode(payload, secret) decoded_payload = jws.decode(jws_message, secret) @@ -368,51 +396,55 @@ class TestJWS: def test_decode_invalid_header_padding(self, jws): example_jws = ( - 'aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') - example_secret = 'secret' + "aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) + example_secret = "secret" with pytest.raises(DecodeError) as exc: jws.decode(example_jws, example_secret) - assert 'header padding' in str(exc.value) + assert "header padding" in str(exc.value) def test_decode_invalid_header_string(self, jws): example_jws = ( - 'eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ==' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') - example_secret = 'secret' + "eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ==" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) + example_secret = "secret" with pytest.raises(DecodeError) as exc: jws.decode(example_jws, example_secret) - assert 'Invalid header' in str(exc.value) + assert "Invalid header" in str(exc.value) def test_decode_invalid_payload_padding(self, jws): example_jws = ( - 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.aeyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') - example_secret = 'secret' + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".aeyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) + example_secret = "secret" with pytest.raises(DecodeError) as exc: jws.decode(example_jws, example_secret) - assert 'Invalid payload padding' in str(exc.value) + assert "Invalid payload padding" in str(exc.value) def test_decode_invalid_crypto_padding(self, jws): example_jws = ( - 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') - example_secret = 'secret' + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) + example_secret = "secret" with pytest.raises(DecodeError) as exc: jws.decode(example_jws, example_secret) - assert 'Invalid crypto padding' in str(exc.value) + assert "Invalid crypto padding" in str(exc.value) def test_decode_with_algo_none_should_fail(self, jws, payload): jws_message = jws.encode(payload, key=None, algorithm=None) @@ -420,95 +452,122 @@ class TestJWS: with pytest.raises(DecodeError): jws.decode(jws_message) - def test_decode_with_algo_none_and_verify_false_should_pass(self, jws, payload): + def test_decode_with_algo_none_and_verify_false_should_pass( + self, jws, payload + ): jws_message = jws.encode(payload, key=None, algorithm=None) jws.decode(jws_message, verify=False) def test_get_unverified_header_returns_header_values(self, jws, payload): - jws_message = jws.encode(payload, key='secret', algorithm='HS256', - headers={'kid': 'toomanysecrets'}) + jws_message = jws.encode( + payload, + key="secret", + algorithm="HS256", + headers={"kid": "toomanysecrets"}, + ) header = jws.get_unverified_header(jws_message) - assert 'kid' in header - assert header['kid'] == 'toomanysecrets' + assert "kid" in header + assert header["kid"] == "toomanysecrets" - def test_get_unverified_header_fails_on_bad_header_types(self, jws, payload): + def test_get_unverified_header_fails_on_bad_header_types( + self, jws, payload + ): # Contains a bad kid value (int 123 instead of string) example_jws = ( - 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6MTIzfQ' - '.eyJzdWIiOiIxMjM0NTY3ODkwIn0' - '.vs2WY54jfpKP3JGC73Vq5YlMsqM5oTZ1ZydT77SiZSk') + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6MTIzfQ" + ".eyJzdWIiOiIxMjM0NTY3ODkwIn0" + ".vs2WY54jfpKP3JGC73Vq5YlMsqM5oTZ1ZydT77SiZSk" + ) with pytest.raises(InvalidTokenError) as exc: jws.get_unverified_header(example_jws) - assert 'Key ID header parameter must be a string' == str(exc.value) + assert "Key ID header parameter must be a string" == str(exc.value) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_encode_decode_with_rsa_sha256(self, jws, payload): # PEM-formatted RSA key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS256') + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: + priv_rsakey = load_pem_private_key( + force_bytes(rsa_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256") - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), - backend=default_backend()) + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: + pub_rsakey = load_ssh_public_key( + force_bytes(rsa_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_rsakey) # string-formatted key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS256') + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256") - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() jws.decode(jws_message, pub_rsakey) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_encode_decode_with_rsa_sha384(self, jws, payload): # PEM-formatted RSA key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS384') - - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), - backend=default_backend()) + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: + priv_rsakey = load_pem_private_key( + force_bytes(rsa_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384") + + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: + pub_rsakey = load_ssh_public_key( + force_bytes(rsa_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_rsakey) # string-formatted key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS384') + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384") - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() jws.decode(jws_message, pub_rsakey) - @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') + @pytest.mark.skipif( + not has_crypto, reason="Not supported without cryptography library" + ) def test_encode_decode_with_rsa_sha512(self, jws, payload): # PEM-formatted RSA key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: - priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS512') - - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: - pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()), - backend=default_backend()) + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: + priv_rsakey = load_pem_private_key( + force_bytes(rsa_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512") + + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: + pub_rsakey = load_ssh_public_key( + force_bytes(rsa_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_rsakey) # string-formatted key - with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file: + with open("tests/keys/testkey_rsa", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() - jws_message = jws.encode(payload, priv_rsakey, algorithm='RS512') + jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512") - with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() jws.decode(jws_message, pub_rsakey) @@ -517,84 +576,103 @@ class TestJWS: jws_algorithms = jws.get_algorithms() if has_crypto: - assert 'RS256' in jws_algorithms - assert 'RS384' in jws_algorithms - assert 'RS512' in jws_algorithms - assert 'PS256' in jws_algorithms - assert 'PS384' in jws_algorithms - assert 'PS512' in jws_algorithms + assert "RS256" in jws_algorithms + assert "RS384" in jws_algorithms + assert "RS512" in jws_algorithms + assert "PS256" in jws_algorithms + assert "PS384" in jws_algorithms + assert "PS512" in jws_algorithms else: - assert 'RS256' not in jws_algorithms - assert 'RS384' not in jws_algorithms - assert 'RS512' not in jws_algorithms - assert 'PS256' not in jws_algorithms - assert 'PS384' not in jws_algorithms - assert 'PS512' not in jws_algorithms - - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + assert "RS256" not in jws_algorithms + assert "RS384" not in jws_algorithms + assert "RS512" not in jws_algorithms + assert "PS256" not in jws_algorithms + assert "PS384" not in jws_algorithms + assert "PS512" not in jws_algorithms + + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_encode_decode_with_ecdsa_sha256(self, jws, payload): # PEM-formatted EC key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_eckey, algorithm='ES256') - - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), - backend=default_backend()) + with open("tests/keys/testkey_ec", "r") as ec_priv_file: + priv_eckey = load_pem_private_key( + force_bytes(ec_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_eckey, algorithm="ES256") + + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: + pub_eckey = load_pem_public_key( + force_bytes(ec_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_eckey) # string-formatted key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: + with open("tests/keys/testkey_ec", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() - jws_message = jws.encode(payload, priv_eckey, algorithm='ES256') + jws_message = jws.encode(payload, priv_eckey, algorithm="ES256") - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() jws.decode(jws_message, pub_eckey) - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_encode_decode_with_ecdsa_sha384(self, jws, payload): # PEM-formatted EC key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_eckey, algorithm='ES384') - - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), - backend=default_backend()) + with open("tests/keys/testkey_ec", "r") as ec_priv_file: + priv_eckey = load_pem_private_key( + force_bytes(ec_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_eckey, algorithm="ES384") + + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: + pub_eckey = load_pem_public_key( + force_bytes(ec_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_eckey) # string-formatted key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: + with open("tests/keys/testkey_ec", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() - jws_message = jws.encode(payload, priv_eckey, algorithm='ES384') + jws_message = jws.encode(payload, priv_eckey, algorithm="ES384") - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() jws.decode(jws_message, pub_eckey) - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_encode_decode_with_ecdsa_sha512(self, jws, payload): # PEM-formatted EC key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: - priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()), - password=None, backend=default_backend()) - jws_message = jws.encode(payload, priv_eckey, algorithm='ES521') + with open("tests/keys/testkey_ec", "r") as ec_priv_file: + priv_eckey = load_pem_private_key( + force_bytes(ec_priv_file.read()), + password=None, + backend=default_backend(), + ) + jws_message = jws.encode(payload, priv_eckey, algorithm="ES521") - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: - pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), backend=default_backend()) + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: + pub_eckey = load_pem_public_key( + force_bytes(ec_pub_file.read()), backend=default_backend() + ) jws.decode(jws_message, pub_eckey) # string-formatted key - with open('tests/keys/testkey_ec', 'r') as ec_priv_file: + with open("tests/keys/testkey_ec", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() - jws_message = jws.encode(payload, priv_eckey, algorithm='ES521') + jws_message = jws.encode(payload, priv_eckey, algorithm="ES521") - with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() jws.decode(jws_message, pub_eckey) @@ -603,62 +681,61 @@ class TestJWS: jws_algorithms = jws.get_algorithms() if has_crypto: - assert 'ES256' in jws_algorithms - assert 'ES384' in jws_algorithms - assert 'ES521' in jws_algorithms + assert "ES256" in jws_algorithms + assert "ES384" in jws_algorithms + assert "ES521" in jws_algorithms else: - assert 'ES256' not in jws_algorithms - assert 'ES384' not in jws_algorithms - assert 'ES521' not in jws_algorithms + assert "ES256" not in jws_algorithms + assert "ES384" not in jws_algorithms + assert "ES521" not in jws_algorithms def test_skip_check_signature(self, jws): - token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" - ".eyJzb21lIjoicGF5bG9hZCJ9" - ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA") - jws.decode(token, 'secret', options={'verify_signature': False}) + token = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + ".eyJzb21lIjoicGF5bG9hZCJ9" + ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA" + ) + jws.decode(token, "secret", options={"verify_signature": False}) def test_decode_options_must_be_dict(self, jws, payload): - token = jws.encode(payload, 'secret') + token = jws.encode(payload, "secret") with pytest.raises(TypeError): - jws.decode(token, 'secret', options=object()) + jws.decode(token, "secret", options=object()) with pytest.raises(TypeError): - jws.decode(token, 'secret', options='something') + jws.decode(token, "secret", options="something") def test_custom_json_encoder(self, jws, payload): - class CustomJSONEncoder(json.JSONEncoder): - def default(self, o): if isinstance(o, Decimal): - return 'it worked' + return "it worked" return super(CustomJSONEncoder, self).default(o) - data = { - 'some_decimal': Decimal('2.2') - } + data = {"some_decimal": Decimal("2.2")} with pytest.raises(TypeError): - jws.encode(payload, 'secret', headers=data) + jws.encode(payload, "secret", headers=data) - token = jws.encode(payload, 'secret', headers=data, - json_encoder=CustomJSONEncoder) + token = jws.encode( + payload, "secret", headers=data, json_encoder=CustomJSONEncoder + ) - header = force_bytes(force_unicode(token).split('.')[0]) + header = force_bytes(force_unicode(token).split(".")[0]) header = json.loads(force_unicode(base64url_decode(header))) - assert 'some_decimal' in header - assert header['some_decimal'] == 'it worked' + assert "some_decimal" in header + assert header["some_decimal"] == "it worked" def test_encode_headers_parameter_adds_headers(self, jws, payload): - headers = {'testheader': True} - token = jws.encode(payload, 'secret', headers=headers) + headers = {"testheader": True} + token = jws.encode(payload, "secret", headers=headers) if not isinstance(token, string_types): token = token.decode() - header = token[0:token.index('.')].encode() + header = token[0 : token.index(".")].encode() header = base64url_decode(header) if not isinstance(header, text_type): @@ -666,16 +743,16 @@ class TestJWS: header_obj = json.loads(header) - assert 'testheader' in header_obj - assert header_obj['testheader'] == headers['testheader'] + assert "testheader" in header_obj + assert header_obj["testheader"] == headers["testheader"] def test_encode_fails_on_invalid_kid_types(self, jws, payload): with pytest.raises(InvalidTokenError) as exc: - jws.encode(payload, 'secret', headers={'kid': 123}) + jws.encode(payload, "secret", headers={"kid": 123}) - assert 'Key ID header parameter must be a string' == str(exc.value) + assert "Key ID header parameter must be a string" == str(exc.value) with pytest.raises(InvalidTokenError) as exc: - jws.encode(payload, 'secret', headers={'kid': None}) + jws.encode(payload, "secret", headers={"kid": None}) - assert 'Key ID header parameter must be a string' == str(exc.value) + assert "Key ID header parameter must be a string" == str(exc.value) diff --git a/tests/test_api_jwt.py b/tests/test_api_jwt.py index 8d07f3f..e4065c6 100644 --- a/tests/test_api_jwt.py +++ b/tests/test_api_jwt.py @@ -1,19 +1,22 @@ - import json import time from calendar import timegm from datetime import datetime, timedelta from decimal import Decimal +import pytest + from jwt.api_jwt import PyJWT from jwt.exceptions import ( - DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError, - MissingRequiredClaimError + DecodeError, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuerError, + MissingRequiredClaimError, ) -import pytest - from .test_api_jws import has_crypto from .utils import utc_timestamp @@ -26,32 +29,30 @@ def jwt(): @pytest.fixture def payload(): """ Creates a sample JWT claimset for use as a payload during tests """ - return { - 'iss': 'jeff', - 'exp': utc_timestamp() + 15, - 'claim': 'insanity' - } + return {"iss": "jeff", "exp": utc_timestamp() + 15, "claim": "insanity"} class TestJWT: def test_decodes_valid_jwt(self, jwt): - example_payload = {'hello': 'world'} - example_secret = 'secret' + example_payload = {"hello": "world"} + example_secret = "secret" example_jwt = ( - b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' - b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + b".eyJoZWxsbyI6ICJ3b3JsZCJ9" + b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) decoded_payload = jwt.decode(example_jwt, example_secret) assert decoded_payload == example_payload def test_load_verify_valid_jwt(self, jwt): - example_payload = {'hello': 'world'} - example_secret = 'secret' + example_payload = {"hello": "world"} + example_secret = "secret" example_jwt = ( - b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - b'.eyJoZWxsbyI6ICJ3b3JsZCJ9' - b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + b".eyJoZWxsbyI6ICJ3b3JsZCJ9" + b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) decoded_payload = jwt.decode(example_jwt, key=example_secret) @@ -59,134 +60,157 @@ class TestJWT: def test_decode_invalid_payload_string(self, jwt): example_jwt = ( - 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.aGVsb' - 'G8gd29ybGQ.SIr03zM64awWRdPrAM_61QWsZchAtgDV' - '3pphfHPPWkI') - example_secret = 'secret' + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.aGVsb" + "G8gd29ybGQ.SIr03zM64awWRdPrAM_61QWsZchAtgDV" + "3pphfHPPWkI" + ) + example_secret = "secret" with pytest.raises(DecodeError) as exc: jwt.decode(example_jwt, example_secret) - assert 'Invalid payload string' in str(exc.value) + assert "Invalid payload string" in str(exc.value) def test_decode_with_non_mapping_payload_throws_exception(self, jwt): - secret = 'secret' - example_jwt = ('eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.' - 'MQ.' # == 1 - 'AbcSR3DWum91KOgfKxUHm78rLs_DrrZ1CrDgpUFFzls') + secret = "secret" + example_jwt = ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9." + "MQ." # == 1 + "AbcSR3DWum91KOgfKxUHm78rLs_DrrZ1CrDgpUFFzls" + ) with pytest.raises(DecodeError) as context: jwt.decode(example_jwt, secret) exception = context.value - assert str(exception) == 'Invalid payload string: must be a json object' + assert ( + str(exception) == "Invalid payload string: must be a json object" + ) def test_decode_with_invalid_audience_param_throws_exception(self, jwt): - secret = 'secret' - example_jwt = ('eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9' - '.eyJoZWxsbyI6ICJ3b3JsZCJ9' - '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8') + secret = "secret" + example_jwt = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" + ".eyJoZWxsbyI6ICJ3b3JsZCJ9" + ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + ) with pytest.raises(TypeError) as context: jwt.decode(example_jwt, secret, audience=1) exception = context.value - assert str(exception) == 'audience must be a string, iterable, or None' + assert str(exception) == "audience must be a string, iterable, or None" def test_decode_with_nonlist_aud_claim_throws_exception(self, jwt): - secret = 'secret' - example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9' - '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjoxfQ' # aud = 1 - '.Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc') + secret = "secret" + example_jwt = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + ".eyJoZWxsbyI6IndvcmxkIiwiYXVkIjoxfQ" # aud = 1 + ".Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc" + ) with pytest.raises(InvalidAudienceError) as context: - jwt.decode(example_jwt, secret, audience='my_audience') + jwt.decode(example_jwt, secret, audience="my_audience") exception = context.value - assert str(exception) == 'Invalid claim format in token' + assert str(exception) == "Invalid claim format in token" def test_decode_with_invalid_aud_list_member_throws_exception(self, jwt): - secret = 'secret' - example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9' - '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjpbMV19' - '.iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A') + secret = "secret" + example_jwt = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + ".eyJoZWxsbyI6IndvcmxkIiwiYXVkIjpbMV19" + ".iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A" + ) with pytest.raises(InvalidAudienceError) as context: - jwt.decode(example_jwt, secret, audience='my_audience') + jwt.decode(example_jwt, secret, audience="my_audience") exception = context.value - assert str(exception) == 'Invalid claim format in token' + assert str(exception) == "Invalid claim format in token" def test_encode_bad_type(self, jwt): - types = ['string', tuple(), list(), 42, set()] + types = ["string", tuple(), list(), 42, set()] for t in types: - pytest.raises(TypeError, lambda: jwt.encode(t, 'secret')) + pytest.raises(TypeError, lambda: jwt.encode(t, "secret")) def test_decode_raises_exception_if_exp_is_not_int(self, jwt): # >>> jwt.encode({'exp': 'not-an-int'}, 'secret') - example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - 'eyJleHAiOiJub3QtYW4taW50In0.' - 'P65iYgoHtBqB07PMtBSuKNUEIPPPfmjfJG217cEE66s') + example_jwt = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + "eyJleHAiOiJub3QtYW4taW50In0." + "P65iYgoHtBqB07PMtBSuKNUEIPPPfmjfJG217cEE66s" + ) with pytest.raises(DecodeError) as exc: - jwt.decode(example_jwt, 'secret') + jwt.decode(example_jwt, "secret") - assert 'exp' in str(exc.value) + assert "exp" in str(exc.value) def test_decode_raises_exception_if_iat_is_not_int(self, jwt): # >>> jwt.encode({'iat': 'not-an-int'}, 'secret') - example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - 'eyJpYXQiOiJub3QtYW4taW50In0.' - 'H1GmcQgSySa5LOKYbzGm--b1OmRbHFkyk8pq811FzZM') + example_jwt = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + "eyJpYXQiOiJub3QtYW4taW50In0." + "H1GmcQgSySa5LOKYbzGm--b1OmRbHFkyk8pq811FzZM" + ) with pytest.raises(InvalidIssuedAtError): - jwt.decode(example_jwt, 'secret') + jwt.decode(example_jwt, "secret") def test_decode_raises_exception_if_nbf_is_not_int(self, jwt): # >>> jwt.encode({'nbf': 'not-an-int'}, 'secret') - example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.' - 'eyJuYmYiOiJub3QtYW4taW50In0.' - 'c25hldC8G2ZamC8uKpax9sYMTgdZo3cxrmzFHaAAluw') + example_jwt = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." + "eyJuYmYiOiJub3QtYW4taW50In0." + "c25hldC8G2ZamC8uKpax9sYMTgdZo3cxrmzFHaAAluw" + ) with pytest.raises(DecodeError): - jwt.decode(example_jwt, 'secret') + jwt.decode(example_jwt, "secret") def test_encode_datetime(self, jwt): - secret = 'secret' + secret = "secret" current_datetime = datetime.utcnow() payload = { - 'exp': current_datetime, - 'iat': current_datetime, - 'nbf': current_datetime + "exp": current_datetime, + "iat": current_datetime, + "nbf": current_datetime, } jwt_message = jwt.encode(payload, secret) decoded_payload = jwt.decode(jwt_message, secret, leeway=1) - assert (decoded_payload['exp'] == - timegm(current_datetime.utctimetuple())) - assert (decoded_payload['iat'] == - timegm(current_datetime.utctimetuple())) - assert (decoded_payload['nbf'] == - timegm(current_datetime.utctimetuple())) + assert decoded_payload["exp"] == timegm( + current_datetime.utctimetuple() + ) + assert decoded_payload["iat"] == timegm( + current_datetime.utctimetuple() + ) + assert decoded_payload["nbf"] == timegm( + current_datetime.utctimetuple() + ) # 'Control' Elliptic Curve JWT created by another library. # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_decodes_valid_es384_jwt(self, jwt): - example_payload = {'hello': 'world'} - with open('tests/keys/testkey_ec.pub', 'r') as fp: + example_payload = {"hello": "world"} + with open("tests/keys/testkey_ec.pub", "r") as fp: example_pubkey = fp.read() example_jwt = ( - b'eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9' - b'.eyJoZWxsbyI6IndvcmxkIn0' - b'.AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD' - b'NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK' - b'pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb' - b'zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj') + b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9" + b".eyJoZWxsbyI6IndvcmxkIn0" + b".AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD" + b"NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK" + b"pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb" + b"zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj" + ) decoded_payload = jwt.decode(example_jwt, example_pubkey) assert decoded_payload == example_payload @@ -195,59 +219,62 @@ class TestJWT: # Used to test for regressions that could affect both # encoding / decoding operations equally (causing tests # to still pass). - @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library") + @pytest.mark.skipif( + not has_crypto, reason="Can't run without cryptography library" + ) def test_decodes_valid_rs384_jwt(self, jwt): - example_payload = {'hello': 'world'} - with open('tests/keys/testkey_rsa.pub', 'r') as fp: + example_payload = {"hello": "world"} + with open("tests/keys/testkey_rsa.pub", "r") as fp: example_pubkey = fp.read() example_jwt = ( - b'eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9' - b'.eyJoZWxsbyI6IndvcmxkIn0' - b'.yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X' - b'lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju' - b'O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457' - b'W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx' - b'mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ' - b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t' - b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr' - b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A') + b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9" + b".eyJoZWxsbyI6IndvcmxkIn0" + b".yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X" + b"lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju" + b"O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457" + b"W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx" + b"mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ" + b"urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t" + b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr" + b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A" + ) decoded_payload = jwt.decode(example_jwt, example_pubkey) assert decoded_payload == example_payload def test_decode_with_expiration(self, jwt, payload): - payload['exp'] = utc_timestamp() - 1 - secret = 'secret' + payload["exp"] = utc_timestamp() - 1 + secret = "secret" jwt_message = jwt.encode(payload, secret) with pytest.raises(ExpiredSignatureError): jwt.decode(jwt_message, secret) def test_decode_with_notbefore(self, jwt, payload): - payload['nbf'] = utc_timestamp() + 10 - secret = 'secret' + payload["nbf"] = utc_timestamp() + 10 + secret = "secret" jwt_message = jwt.encode(payload, secret) with pytest.raises(ImmatureSignatureError): jwt.decode(jwt_message, secret) def test_decode_skip_expiration_verification(self, jwt, payload): - payload['exp'] = time.time() - 1 - secret = 'secret' + payload["exp"] = time.time() - 1 + secret = "secret" jwt_message = jwt.encode(payload, secret) - jwt.decode(jwt_message, secret, options={'verify_exp': False}) + jwt.decode(jwt_message, secret, options={"verify_exp": False}) def test_decode_skip_notbefore_verification(self, jwt, payload): - payload['nbf'] = time.time() + 10 - secret = 'secret' + payload["nbf"] = time.time() + 10 + secret = "secret" jwt_message = jwt.encode(payload, secret) - jwt.decode(jwt_message, secret, options={'verify_nbf': False}) + jwt.decode(jwt_message, secret, options={"verify_nbf": False}) def test_decode_with_expiration_with_leeway(self, jwt, payload): - payload['exp'] = utc_timestamp() - 2 - secret = 'secret' + payload["exp"] = utc_timestamp() - 2 + secret = "secret" jwt_message = jwt.encode(payload, secret) decoded_payload, signing, header, signature = jwt._load(jwt_message) @@ -262,8 +289,8 @@ class TestJWT: jwt.decode(jwt_message, secret, leeway=leeway) def test_decode_with_notbefore_with_leeway(self, jwt, payload): - payload['nbf'] = utc_timestamp() + 10 - secret = 'secret' + payload["nbf"] = utc_timestamp() + 10 + secret = "secret" jwt_message = jwt.encode(payload, secret) # With 13 seconds leeway, should be ok @@ -273,248 +300,211 @@ class TestJWT: jwt.decode(jwt_message, secret, leeway=1) def test_check_audience_when_valid(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:me' - } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', audience='urn:me') + payload = {"some": "payload", "aud": "urn:me"} + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", audience="urn:me") def test_check_audience_list_when_valid(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:me' - } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', audience=['urn:you', 'urn:me']) + payload = {"some": "payload", "aud": "urn:me"} + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", audience=["urn:you", "urn:me"]) def test_check_audience_none_specified(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:me' - } - token = jwt.encode(payload, 'secret') + payload = {"some": "payload", "aud": "urn:me"} + token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, 'secret') + jwt.decode(token, "secret") def test_raise_exception_invalid_audience_list(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:me' - } - token = jwt.encode(payload, 'secret') + payload = {"some": "payload", "aud": "urn:me"} + token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, 'secret', audience=['urn:you', 'urn:him']) + jwt.decode(token, "secret", audience=["urn:you", "urn:him"]) def test_check_audience_in_array_when_valid(self, jwt): - payload = { - 'some': 'payload', - 'aud': ['urn:me', 'urn:someone-else'] - } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', audience='urn:me') + payload = {"some": "payload", "aud": ["urn:me", "urn:someone-else"]} + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", audience="urn:me") def test_raise_exception_invalid_audience(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:someone-else' - } + payload = {"some": "payload", "aud": "urn:someone-else"} - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, 'secret', audience='urn-me') + jwt.decode(token, "secret", audience="urn-me") def test_raise_exception_invalid_audience_in_array(self, jwt): payload = { - 'some': 'payload', - 'aud': ['urn:someone', 'urn:someone-else'] + "some": "payload", + "aud": ["urn:someone", "urn:someone-else"], } - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, 'secret', audience='urn:me') + jwt.decode(token, "secret", audience="urn:me") def test_raise_exception_token_without_issuer(self, jwt): - issuer = 'urn:wrong' + issuer = "urn:wrong" - payload = { - 'some': 'payload' - } + payload = {"some": "payload"} - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, 'secret', issuer=issuer) + jwt.decode(token, "secret", issuer=issuer) - assert exc.value.claim == 'iss' + assert exc.value.claim == "iss" def test_raise_exception_token_without_audience(self, jwt): - payload = { - 'some': 'payload', - } - token = jwt.encode(payload, 'secret') + payload = {"some": "payload"} + token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, 'secret', audience='urn:me') + jwt.decode(token, "secret", audience="urn:me") - assert exc.value.claim == 'aud' + assert exc.value.claim == "aud" def test_check_issuer_when_valid(self, jwt): - issuer = 'urn:foo' - payload = { - 'some': 'payload', - 'iss': 'urn:foo' - } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', issuer=issuer) + issuer = "urn:foo" + payload = {"some": "payload", "iss": "urn:foo"} + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", issuer=issuer) def test_raise_exception_invalid_issuer(self, jwt): - issuer = 'urn:wrong' + issuer = "urn:wrong" - payload = { - 'some': 'payload', - 'iss': 'urn:foo' - } + payload = {"some": "payload", "iss": "urn:foo"} - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(InvalidIssuerError): - jwt.decode(token, 'secret', issuer=issuer) + jwt.decode(token, "secret", issuer=issuer) def test_skip_check_audience(self, jwt): - payload = { - 'some': 'payload', - 'aud': 'urn:me', - } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_aud': False}) + payload = {"some": "payload", "aud": "urn:me"} + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", options={"verify_aud": False}) def test_skip_check_exp(self, jwt): payload = { - 'some': 'payload', - 'exp': datetime.utcnow() - timedelta(days=1) + "some": "payload", + "exp": datetime.utcnow() - timedelta(days=1), } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_exp': False}) + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", options={"verify_exp": False}) - def test_decode_should_raise_error_if_exp_required_but_not_present(self, jwt): + def test_decode_should_raise_error_if_exp_required_but_not_present( + self, jwt + ): payload = { - 'some': 'payload', + "some": "payload", # exp not present } - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, 'secret', options={'require_exp': True}) + jwt.decode(token, "secret", options={"require_exp": True}) - assert exc.value.claim == 'exp' + assert exc.value.claim == "exp" - def test_decode_should_raise_error_if_iat_required_but_not_present(self, jwt): + def test_decode_should_raise_error_if_iat_required_but_not_present( + self, jwt + ): payload = { - 'some': 'payload', + "some": "payload", # iat not present } - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, 'secret', options={'require_iat': True}) + jwt.decode(token, "secret", options={"require_iat": True}) - assert exc.value.claim == 'iat' + assert exc.value.claim == "iat" - def test_decode_should_raise_error_if_nbf_required_but_not_present(self, jwt): + def test_decode_should_raise_error_if_nbf_required_but_not_present( + self, jwt + ): payload = { - 'some': 'payload', + "some": "payload", # nbf not present } - token = jwt.encode(payload, 'secret') + token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, 'secret', options={'require_nbf': True}) + jwt.decode(token, "secret", options={"require_nbf": True}) - assert exc.value.claim == 'nbf' + assert exc.value.claim == "nbf" def test_skip_check_signature(self, jwt): - token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" - ".eyJzb21lIjoicGF5bG9hZCJ9" - ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA") - jwt.decode(token, 'secret', options={'verify_signature': False}) + token = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" + ".eyJzb21lIjoicGF5bG9hZCJ9" + ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA" + ) + jwt.decode(token, "secret", options={"verify_signature": False}) def test_skip_check_iat(self, jwt): payload = { - 'some': 'payload', - 'iat': datetime.utcnow() + timedelta(days=1) + "some": "payload", + "iat": datetime.utcnow() + timedelta(days=1), } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_iat': False}) + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", options={"verify_iat": False}) def test_skip_check_nbf(self, jwt): payload = { - 'some': 'payload', - 'nbf': datetime.utcnow() + timedelta(days=1) + "some": "payload", + "nbf": datetime.utcnow() + timedelta(days=1), } - token = jwt.encode(payload, 'secret') - jwt.decode(token, 'secret', options={'verify_nbf': False}) + token = jwt.encode(payload, "secret") + jwt.decode(token, "secret", options={"verify_nbf": False}) def test_custom_json_encoder(self, jwt): - class CustomJSONEncoder(json.JSONEncoder): - def default(self, o): if isinstance(o, Decimal): - return 'it worked' + return "it worked" return super(CustomJSONEncoder, self).default(o) - data = { - 'some_decimal': Decimal('2.2') - } + data = {"some_decimal": Decimal("2.2")} with pytest.raises(TypeError): - jwt.encode(data, 'secret') + jwt.encode(data, "secret") - token = jwt.encode(data, 'secret', json_encoder=CustomJSONEncoder) - payload = jwt.decode(token, 'secret') + token = jwt.encode(data, "secret", json_encoder=CustomJSONEncoder) + payload = jwt.decode(token, "secret") - assert payload == {'some_decimal': 'it worked'} + assert payload == {"some_decimal": "it worked"} def test_decode_with_verify_expiration_kwarg(self, jwt, payload): - payload['exp'] = utc_timestamp() - 1 - secret = 'secret' + payload["exp"] = utc_timestamp() - 1 + secret = "secret" jwt_message = jwt.encode(payload, secret) pytest.deprecated_call( - jwt.decode, - jwt_message, - secret, - verify_expiration=False + jwt.decode, jwt_message, secret, verify_expiration=False ) with pytest.raises(ExpiredSignatureError): pytest.deprecated_call( - jwt.decode, - jwt_message, - secret, - verify_expiration=True + jwt.decode, jwt_message, secret, verify_expiration=True ) def test_decode_with_optional_algorithms(self, jwt, payload): - secret = 'secret' + secret = "secret" jwt_message = jwt.encode(payload, secret) - pytest.deprecated_call( - jwt.decode, - jwt_message, - secret - ) + pytest.deprecated_call(jwt.decode, jwt_message, secret) def test_decode_no_algorithms_verify_false(self, jwt, payload): - secret = 'secret' + secret = "secret" jwt_message = jwt.encode(payload, secret) try: pytest.deprecated_call( - jwt.decode, jwt_message, secret, verify=False, + jwt.decode, jwt_message, secret, verify=False ) except pytest.fail.Exception: pass diff --git a/tests/test_cli.py b/tests/test_cli.py index a89597d..6246c9a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,25 +1,23 @@ - import argparse import json import sys +import pytest + import jwt from jwt.__main__ import build_argparser, decode_payload, encode_payload, main -import pytest - class TestCli: - def test_build_argparse(self): - args = ['--key', '1234', 'encode', 'name=Vader'] + args = ["--key", "1234", "encode", "name=Vader"] parser = build_argparser() parsed_args = parser.parse_args(args) - assert parsed_args.key == '1234' + assert parsed_args.key == "1234" def test_encode_payload_raises_value_error_key_is_required(self): - encode_args = ['encode', 'name=Vader', 'job=Sith'] + encode_args = ["encode", "name=Vader", "job=Sith"] parser = build_argparser() args = parser.parse_args(encode_args) @@ -27,10 +25,10 @@ class TestCli: with pytest.raises(ValueError) as excinfo: encode_payload(args) - assert 'Key is required when encoding' in str(excinfo.value) + assert "Key is required when encoding" in str(excinfo.value) def test_decode_payload_raises_decoded_error(self): - decode_args = ['--key', '1234', 'decode', 'wrong-token'] + decode_args = ["--key", "1234", "decode", "wrong-token"] parser = build_argparser() args = parser.parse_args(decode_args) @@ -38,121 +36,119 @@ class TestCli: with pytest.raises(jwt.DecodeError) as excinfo: decode_payload(args) - assert 'There was an error decoding the token' in str(excinfo.value) + assert "There was an error decoding the token" in str(excinfo.value) def test_decode_payload_raises_decoded_error_isatty(self, monkeypatch): def patched_sys_stdin_read(): raise jwt.DecodeError() - decode_args = ['--key', '1234', 'decode', 'wrong-token'] + decode_args = ["--key", "1234", "decode", "wrong-token"] parser = build_argparser() args = parser.parse_args(decode_args) - monkeypatch.setattr(sys.stdin, 'isatty', lambda: True) - monkeypatch.setattr(sys.stdin, 'read', patched_sys_stdin_read) + monkeypatch.setattr(sys.stdin, "isatty", lambda: True) + monkeypatch.setattr(sys.stdin, "read", patched_sys_stdin_read) with pytest.raises(jwt.DecodeError) as excinfo: decode_payload(args) - assert 'There was an error decoding the token' in str(excinfo.value) + assert "There was an error decoding the token" in str(excinfo.value) def test_decode_payload_terminal_tty(self, monkeypatch): - encode_args = [ - '--key=secret-key', - 'encode', - 'name=hello-world', - ] + encode_args = ["--key=secret-key", "encode", "name=hello-world"] parser = build_argparser() parsed_encode_args = parser.parse_args(encode_args) token = encode_payload(parsed_encode_args) - decode_args = ['--key=secret-key', 'decode'] + decode_args = ["--key=secret-key", "decode"] parsed_decode_args = parser.parse_args(decode_args) - monkeypatch.setattr(sys.stdin, 'isatty', lambda: True) - monkeypatch.setattr(sys.stdin, 'readline', lambda: token) + monkeypatch.setattr(sys.stdin, "isatty", lambda: True) + monkeypatch.setattr(sys.stdin, "readline", lambda: token) actual = json.loads(decode_payload(parsed_decode_args)) - assert actual['name'] == 'hello-world' + assert actual["name"] == "hello-world" def test_decode_payload_raises_terminal_not_a_tty(self, monkeypatch): - decode_args = ['--key', '1234', 'decode'] + decode_args = ["--key", "1234", "decode"] parser = build_argparser() args = parser.parse_args(decode_args) - monkeypatch.setattr(sys.stdin, 'isatty', lambda: False) + monkeypatch.setattr(sys.stdin, "isatty", lambda: False) with pytest.raises(IOError) as excinfo: decode_payload(args) - assert 'Cannot read from stdin: terminal not a TTY' \ - in str(excinfo.value) - - @pytest.mark.parametrize('key,name,job,exp,verify', [ - ('1234', 'Vader', 'Sith', None, None), - ('4567', 'Anakin', 'Jedi', '+1', None), - ('4321', 'Padme', 'Queen', '4070926800', 'true'), - ]) + assert "Cannot read from stdin: terminal not a TTY" in str( + excinfo.value + ) + + @pytest.mark.parametrize( + "key,name,job,exp,verify", + [ + ("1234", "Vader", "Sith", None, None), + ("4567", "Anakin", "Jedi", "+1", None), + ("4321", "Padme", "Queen", "4070926800", "true"), + ], + ) def test_encode_decode(self, key, name, job, exp, verify): encode_args = [ - '--key={0}'.format(key), - 'encode', - 'name={0}'.format(name), - 'job={0}'.format(job), + "--key={0}".format(key), + "encode", + "name={0}".format(name), + "job={0}".format(job), ] if exp: - encode_args.append('exp={0}'.format(exp)) + encode_args.append("exp={0}".format(exp)) if verify: - encode_args.append('verify={0}'.format(verify)) + encode_args.append("verify={0}".format(verify)) parser = build_argparser() parsed_encode_args = parser.parse_args(encode_args) token = encode_payload(parsed_encode_args) assert token is not None - assert token != '' + assert token != "" - decode_args = [ - '--key={0}'.format(key), - 'decode', - token - ] + decode_args = ["--key={0}".format(key), "decode", token] parser = build_argparser() parsed_decode_args = parser.parse_args(decode_args) actual = json.loads(decode_payload(parsed_decode_args)) - expected = { - 'job': job, - 'name': name, - } - assert actual['name'] == expected['name'] - assert actual['job'] == expected['job'] - - @pytest.mark.parametrize('key,name,job,exp,verify', [ - ('1234', 'Vader', 'Sith', None, None), - ('4567', 'Anakin', 'Jedi', '+1', None), - ('4321', 'Padme', 'Queen', '4070926800', 'true'), - ]) + expected = {"job": job, "name": name} + assert actual["name"] == expected["name"] + assert actual["job"] == expected["job"] + + @pytest.mark.parametrize( + "key,name,job,exp,verify", + [ + ("1234", "Vader", "Sith", None, None), + ("4567", "Anakin", "Jedi", "+1", None), + ("4321", "Padme", "Queen", "4070926800", "true"), + ], + ) def test_main(self, monkeypatch, key, name, job, exp, verify): args = [ - 'test_cli.py', - '--key={0}'.format(key), - 'encode', - 'name={0}'.format(name), - 'job={0}'.format(job), + "test_cli.py", + "--key={0}".format(key), + "encode", + "name={0}".format(name), + "job={0}".format(job), ] if exp: - args.append('exp={0}'.format(exp)) + args.append("exp={0}".format(exp)) if verify: - args.append('verify={0}'.format(verify)) - monkeypatch.setattr(sys, 'argv', args) + args.append("verify={0}".format(verify)) + monkeypatch.setattr(sys, "argv", args) main() def test_main_throw_exception(self, monkeypatch, capsys): def patched_argparser_parse_args(self, args): - raise Exception('NOOOOOOOOOOO!') + raise Exception("NOOOOOOOOOOO!") - monkeypatch.setattr(argparse.ArgumentParser, 'parse_args', patched_argparser_parse_args) + monkeypatch.setattr( + argparse.ArgumentParser, "parse_args", patched_argparser_parse_args + ) main() out, _ = capsys.readouterr() - assert 'NOOOOOOOOOOO!' in out + assert "NOOOOOOOOOOO!" in out diff --git a/tests/test_compat.py b/tests/test_compat.py index 10beb94..7a7516c 100644 --- a/tests/test_compat.py +++ b/tests/test_compat.py @@ -4,16 +4,14 @@ from jwt.utils import force_bytes class TestCompat: def test_constant_time_compare_returns_true_if_same(self): - assert constant_time_compare( - force_bytes('abc'), force_bytes('abc') - ) + assert constant_time_compare(force_bytes("abc"), force_bytes("abc")) def test_constant_time_compare_returns_false_if_diff_lengths(self): assert not constant_time_compare( - force_bytes('abc'), force_bytes('abcd') + force_bytes("abc"), force_bytes("abcd") ) def test_constant_time_compare_returns_false_if_totally_different(self): assert not constant_time_compare( - force_bytes('abcd'), force_bytes('efgh') + force_bytes("abcd"), force_bytes("efgh") ) diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 9e7f91e..90030a7 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -2,6 +2,6 @@ from jwt.exceptions import MissingRequiredClaimError def test_missing_required_claim_error_has_proper_str(): - exc = MissingRequiredClaimError('abc') + exc = MissingRequiredClaimError("abc") assert str(exc) == 'Token is missing the "abc" claim' diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 95ea5de..db96f46 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -10,13 +10,9 @@ def test_encode_decode(): tests. This is primarily a sanity check to make sure we don't break the public global functions. """ - payload = { - 'iss': 'jeff', - 'exp': utc_timestamp() + 15, - 'claim': 'insanity' - } + payload = {"iss": "jeff", "exp": utc_timestamp() + 15, "claim": "insanity"} - secret = 'secret' + secret = "secret" jwt_message = jwt.encode(payload, secret) decoded_payload = jwt.decode(jwt_message, secret) diff --git a/tests/test_utils.py b/tests/test_utils.py index 1408af2..cb73c52 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,30 +1,39 @@ +import pytest + from jwt.utils import ( - force_bytes, force_unicode, from_base64url_uint, to_base64url_uint + force_bytes, + force_unicode, + from_base64url_uint, + to_base64url_uint, ) -import pytest - -@pytest.mark.parametrize("inputval,expected", [ - (0, b'AA'), - (1, b'AQ'), - (255, b'_w'), - (65537, b'AQAB'), - (123456789, b'B1vNFQ'), - pytest.param(-1, '', marks=pytest.mark.xfail(raises=ValueError)) -]) +@pytest.mark.parametrize( + "inputval,expected", + [ + (0, b"AA"), + (1, b"AQ"), + (255, b"_w"), + (65537, b"AQAB"), + (123456789, b"B1vNFQ"), + pytest.param(-1, "", marks=pytest.mark.xfail(raises=ValueError)), + ], +) def test_to_base64url_uint(inputval, expected): actual = to_base64url_uint(inputval) assert actual == expected -@pytest.mark.parametrize("inputval,expected", [ - (b'AA', 0), - (b'AQ', 1), - (b'_w', 255), - (b'AQAB', 65537), - (b'B1vNFQ', 123456789, ), -]) +@pytest.mark.parametrize( + "inputval,expected", + [ + (b"AA", 0), + (b"AQ", 1), + (b"_w", 255), + (b"AQAB", 65537), + (b"B1vNFQ", 123456789), + ], +) def test_from_base64url_uint(inputval, expected): actual = from_base64url_uint(inputval) assert actual == expected diff --git a/tests/utils.py b/tests/utils.py index be189f2..ad39f75 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -9,25 +9,27 @@ def utc_timestamp(): def key_path(key_name): - return os.path.join(os.path.dirname(os.path.realpath(__file__)), - 'keys', key_name) + return os.path.join( + os.path.dirname(os.path.realpath(__file__)), "keys", key_name + ) # Borrowed from `cryptography` if hasattr(int, "from_bytes"): int_from_bytes = int.from_bytes else: + def int_from_bytes(data, byteorder, signed=False): - assert byteorder == 'big' + assert byteorder == "big" assert not signed if len(data) % 4 != 0: - data = (b'\x00' * (4 - (len(data) % 4))) + data + data = (b"\x00" * (4 - (len(data) % 4))) + data result = 0 while len(data) > 0: - digit, = struct.unpack('>I', data[:4]) + digit, = struct.unpack(">I", data[:4]) result = (result << 32) + digit data = data[4:] @@ -1,21 +1,22 @@ [tox] -envlist = py{27,34,35,36,37}-crypto, py{27,35,36,37}-contrib_crypto, py{27,35,36,37}-nocrypto, flake8, mypy +envlist = lint, typing, py{27,34,35,36,37}-crypto, py{27,35,36,37}-contrib_crypto, py{27,35,36,37}-nocrypto + [testenv] -commands = - pytest +extras = tests +commands = pytest deps = crypto: cryptography contrib_crypto: pycrypto contrib_crypto: ecdsa -extras = test -[testenv:flake8] -commands = - flake8 -extras = flake8 -[testenv:mypy] -commands = - mypy --ignore-missing-imports jwt -extras = mypy +[testenv:typing] +extras = dev +commands = mypy --ignore-missing-imports jwt + + +[testenv:lint] +extras = dev +passenv = HOMEPATH # needed on Windows +commands = pre-commit run --all-files |