summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosé Padilla <jpadilla@webapplicate.com>2019-10-21 22:38:34 -0400
committerGitHub <noreply@github.com>2019-10-21 22:38:34 -0400
commit11ac89474b1179925c76450fcc4b3d2042c45f19 (patch)
treedda6b15326cab750f5e52ee57e3125bb5d0a7eee
parentae080f472c913ad94456fd9e10b05ec2d038b7cc (diff)
downloadpyjwt-11ac89474b1179925c76450fcc4b3d2042c45f19.tar.gz
DX Tweaks (#450)
* Setup pre-commit hooks * Run initial `tox -e lint` * Fix package name * Fix .travis.yml
-rw-r--r--.coveragerc1
-rw-r--r--.flake86
-rw-r--r--.pre-commit-config.yaml31
-rw-r--r--.travis.yml10
-rw-r--r--AUTHORS2
-rw-r--r--docs/conf.py177
-rw-r--r--docs/installation.rst2
-rw-r--r--docs/requirements-docs.txt1
-rw-r--r--jwt/__init__.py36
-rw-r--r--jwt/__main__.py89
-rw-r--r--jwt/algorithms.py251
-rw-r--r--jwt/api_jws.py168
-rw-r--r--jwt/api_jwt.py198
-rw-r--r--jwt/compat.py12
-rw-r--r--jwt/contrib/algorithms/py_ecdsa.py23
-rw-r--r--jwt/contrib/algorithms/pycrypto.py5
-rw-r--r--jwt/exceptions.py1
-rw-r--r--jwt/help.py10
-rw-r--r--jwt/utils.py35
-rw-r--r--pyproject.toml14
-rw-r--r--pytest.ini2
-rw-r--r--setup.cfg11
-rwxr-xr-xsetup.py82
-rw-r--r--tests/compat.py4
-rw-r--r--tests/contrib/test_algorithms.py140
-rw-r--r--tests/keys/__init__.py27
-rw-r--r--tests/test_algorithms.py477
-rw-r--r--tests/test_api_jws.py613
-rw-r--r--tests/test_api_jwt.py472
-rw-r--r--tests/test_cli.py132
-rw-r--r--tests/test_compat.py8
-rw-r--r--tests/test_exceptions.py2
-rw-r--r--tests/test_jwt.py8
-rw-r--r--tests/test_utils.py45
-rw-r--r--tests/utils.py12
-rw-r--r--tox.ini25
36 files changed, 1737 insertions, 1395 deletions
diff --git a/.coveragerc b/.coveragerc
index 886e322..0ad0bd9 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -4,4 +4,3 @@ omit =
.tox/*
setup.py
*.egg/*
-
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..35a1558
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,6 @@
+[flake8]
+ignore = E203, E266, E501, W503
+max-line-length = 80
+max-complexity = 18
+select = B,C,E,F,W,T4,B9
+exclude = docs/conf.py,.tox
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..2617e46
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,31 @@
+repos:
+ - repo: https://github.com/psf/black
+ rev: 19.3b0
+ hooks:
+ - id: black
+ language_version: python3.7
+
+ - repo: https://gitlab.com/pycqa/flake8
+ rev: 3.7.8
+ hooks:
+ - id: flake8
+ language_version: python3.7
+
+ - repo: https://github.com/asottile/seed-isort-config
+ rev: v1.9.3
+ hooks:
+ - id: seed-isort-config
+
+ - repo: https://github.com/pre-commit/mirrors-isort
+ rev: v4.3.21
+ hooks:
+ - id: isort
+ additional_dependencies: [toml]
+ language_version: python3.7
+
+ - repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v2.3.0
+ hooks:
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: debug-statements
diff --git a/.travis.yml b/.travis.yml
index 3edf125..d0123bb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,15 +2,15 @@ language: python
matrix:
include:
- python: 2.7
- env: TOXENV=flake8,py27-crypto,py27-nocrypto,py27-contrib_crypto
+ env: TOXENV=py27-crypto,py27-nocrypto,py27-contrib_crypto
- python: 3.4
- env: TOXENV=flake8,py34-crypto,py34-nocrypto
+ env: TOXENV=py34-crypto,py34-nocrypto
- python: 3.5
- env: TOXENV=flake8,mypy,py35-crypto,py35-nocrypto,py35-contrib_crypto
+ env: TOXENV=py35-crypto,py35-nocrypto,py35-contrib_crypto
- python: 3.6
- env: TOXENV=flake8,mypy,py36-crypto,py36-nocrypto,py36-contrib_crypto
+ env: TOXENV=py36-crypto,py36-nocrypto,py36-contrib_crypto
- python: 3.7
- env: TOXENV=flake8,mypy,py37-crypto,py37-nocrypto,py37-contrib_crypto
+ env: TOXENV=lint,typing,py37-crypto,py37-nocrypto,py37-contrib_crypto
dist: xenial
install:
- pip install -U pip
diff --git a/AUTHORS b/AUTHORS
index 90c7fa4..706dd14 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -25,5 +25,5 @@ Patches and Suggestions
- Michael Davis <mike.philip.davis@gmail.com> <mike.davis@workiva.com>
- Vinod Gupta <codervinod@gmail.com>
-
+
- Derek Weitzel <djw8605@gmail.com>
diff --git a/docs/conf.py b/docs/conf.py
index 57236d2..7c897dd 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -12,20 +12,24 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
-import sys
import os
-import shlex
import re
+import shlex
+import sys
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+import sphinx_rtd_theme
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
-#sys.path.insert(0, os.path.abspath('.'))
+# sys.path.insert(0, os.path.abspath('.'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
-#needs_sphinx = '1.0'
+# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
@@ -33,23 +37,23 @@ import re
extensions = []
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
+templates_path = ["_templates"]
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
# source_suffix = ['.rst', '.md']
-source_suffix = '.rst'
+source_suffix = ".rst"
# The encoding of source files.
-#source_encoding = 'utf-8-sig'
+# source_encoding = 'utf-8-sig'
# The master toctree document.
-master_doc = 'index'
+master_doc = "index"
# General information about the project.
-project = u'PyJWT'
-copyright = u'2015, José Padilla'
-author = u'José Padilla'
+project = u"PyJWT"
+copyright = u"2015, José Padilla"
+author = u"José Padilla"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -60,11 +64,12 @@ def get_version(package):
"""
Return package version as listed in `__version__` in `init.py`.
"""
- with open(os.path.join('..', package, '__init__.py'), 'rb') as init_py:
- src = init_py.read().decode('utf-8')
+ with open(os.path.join("..", package, "__init__.py"), "rb") as init_py:
+ src = init_py.read().decode("utf-8")
return re.search("__version__ = ['\"]([^'\"]+)['\"]", src).group(1)
-version = get_version('jwt')
+
+version = get_version("jwt")
# The full version, including alpha/beta/rc tags.
release = version
@@ -77,37 +82,37 @@ language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
-#today = ''
+# today = ''
# Else, today_fmt is used as the format for a strftime call.
-#today_fmt = '%B %d, %Y'
+# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
-exclude_patterns = ['_build']
+exclude_patterns = ["_build"]
# The reST default role (used for this markup: `text`) to use for all
# documents.
-#default_role = None
+# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
-#add_function_parentheses = True
+# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
-#add_module_names = True
+# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
-#show_authors = False
+# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
+pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
-#modindex_common_prefix = []
+# modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built documents.
-#keep_warnings = False
+# keep_warnings = False
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False
@@ -115,9 +120,6 @@ todo_include_todos = False
# -- Options for HTML output ----------------------------------------------
-# The theme to use for HTML and HTML Help pages. See the documentation for
-# a list of builtin themes.
-import sphinx_rtd_theme
html_theme = "sphinx_rtd_theme"
@@ -126,158 +128,157 @@ html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
-#html_theme_options = {}
+# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
-#html_theme_path = []
+# html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
-#html_title = None
+# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
-#html_short_title = None
+# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
-#html_logo = None
+# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
-#html_favicon = None
+# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+html_static_path = ["_static"]
html_context = {
- 'extra_css_files': [
+ "extra_css_files": [
# override wide tables in RTD theme
- '_static/theme_overrides.css',
- ],
+ "_static/theme_overrides.css"
+ ]
}
# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
-#html_extra_path = []
+# html_extra_path = []
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
-#html_last_updated_fmt = '%b %d, %Y'
+# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
-#html_use_smartypants = True
+# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
-#html_sidebars = {}
+# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
-#html_additional_pages = {}
+# html_additional_pages = {}
# If false, no module index is generated.
-#html_domain_indices = True
+# html_domain_indices = True
# If false, no index is generated.
-#html_use_index = True
+# html_use_index = True
# If true, the index is split into individual pages for each letter.
-#html_split_index = False
+# html_split_index = False
# If true, links to the reST sources are added to the pages.
-#html_show_sourcelink = True
+# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
-#html_show_sphinx = True
+# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
-#html_show_copyright = True
+# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
-#html_use_opensearch = ''
+# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
-#html_file_suffix = None
+# html_file_suffix = None
# Language to be used for generating the HTML full-text search index.
# Sphinx supports the following languages:
# 'da', 'de', 'en', 'es', 'fi', 'fr', 'hu', 'it', 'ja'
# 'nl', 'no', 'pt', 'ro', 'ru', 'sv', 'tr'
-#html_search_language = 'en'
+# html_search_language = 'en'
# A dictionary with options for the search language support, empty by default.
# Now only 'ja' uses this config value
-#html_search_options = {'type': 'default'}
+# html_search_options = {'type': 'default'}
# The name of a javascript file (relative to the configuration directory) that
# implements a search results scorer. If empty, the default will be used.
-#html_search_scorer = 'scorer.js'
+# html_search_scorer = 'scorer.js'
# Output file base name for HTML help builder.
-htmlhelp_basename = 'PyJWTdoc'
+htmlhelp_basename = "PyJWTdoc"
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
-# The paper size ('letterpaper' or 'a4paper').
-#'papersize': 'letterpaper',
-
-# The font size ('10pt', '11pt' or '12pt').
-#'pointsize': '10pt',
-
-# Additional stuff for the LaTeX preamble.
-#'preamble': '',
-
-# Latex figure (float) alignment
-#'figure_align': 'htbp',
+ # The paper size ('letterpaper' or 'a4paper').
+ #'papersize': 'letterpaper',
+ # The font size ('10pt', '11pt' or '12pt').
+ #'pointsize': '10pt',
+ # Additional stuff for the LaTeX preamble.
+ #'preamble': '',
+ # Latex figure (float) alignment
+ #'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
- (master_doc, 'PyJWT.tex', u'PyJWT Documentation',
- u'José Padilla', 'manual'),
+ (
+ master_doc,
+ "PyJWT.tex",
+ u"PyJWT Documentation",
+ u"José Padilla",
+ "manual",
+ )
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
-#latex_logo = None
+# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
-#latex_use_parts = False
+# latex_use_parts = False
# If true, show page references after internal links.
-#latex_show_pagerefs = False
+# latex_show_pagerefs = False
# If true, show URL addresses after external links.
-#latex_show_urls = False
+# latex_show_urls = False
# Documents to append as an appendix to all manuals.
-#latex_appendices = []
+# latex_appendices = []
# If false, no module index is generated.
-#latex_domain_indices = True
+# latex_domain_indices = True
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
-man_pages = [
- (master_doc, 'pyjwt', u'PyJWT Documentation',
- [author], 1)
-]
+man_pages = [(master_doc, "pyjwt", u"PyJWT Documentation", [author], 1)]
# If true, show URL addresses after external links.
-#man_show_urls = False
+# man_show_urls = False
# -- Options for Texinfo output -------------------------------------------
@@ -286,19 +287,25 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
- (master_doc, 'PyJWT', u'PyJWT Documentation',
- author, 'PyJWT', 'One line description of project.',
- 'Miscellaneous'),
+ (
+ master_doc,
+ "PyJWT",
+ u"PyJWT Documentation",
+ author,
+ "PyJWT",
+ "One line description of project.",
+ "Miscellaneous",
+ )
]
# Documents to append as an appendix to all manuals.
-#texinfo_appendices = []
+# texinfo_appendices = []
# If false, no module index is generated.
-#texinfo_domain_indices = True
+# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
-#texinfo_show_urls = 'footnote'
+# texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
-#texinfo_no_detailmenu = False
+# texinfo_no_detailmenu = False
diff --git a/docs/installation.rst b/docs/installation.rst
index 7e9c220..e423cfb 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -53,7 +53,7 @@ for RSA with SHA256 and EC with SHA256 signatures.
jwt.unregister_algorithm('RS256')
jwt.unregister_algorithm('ES256')
-
+
jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
jwt.register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256))
diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt
index fedd390..8213302 100644
--- a/docs/requirements-docs.txt
+++ b/docs/requirements-docs.txt
@@ -1,3 +1,2 @@
sphinx
sphinx_rtd_theme
-
diff --git a/jwt/__init__.py b/jwt/__init__.py
index 077b450..e8e1f47 100644
--- a/jwt/__init__.py
+++ b/jwt/__init__.py
@@ -9,23 +9,35 @@ https://self-issued.info/docs/draft-jones-json-web-token-01.html
"""
-__title__ = 'pyjwt'
-__version__ = '1.7.1'
-__author__ = 'José Padilla'
-__license__ = 'MIT'
-__copyright__ = 'Copyright 2015-2018 José Padilla'
+__title__ = "pyjwt"
+__version__ = "1.7.1"
+__author__ = "José Padilla"
+__license__ = "MIT"
+__copyright__ = "Copyright 2015-2018 José Padilla"
+from .api_jws import PyJWS
from .api_jwt import (
- encode, decode, register_algorithm, unregister_algorithm,
- get_unverified_header, PyJWT
+ PyJWT,
+ decode,
+ encode,
+ get_unverified_header,
+ register_algorithm,
+ unregister_algorithm,
)
-from .api_jws import PyJWS
from .exceptions import (
- InvalidTokenError, DecodeError, InvalidAlgorithmError,
- InvalidAudienceError, ExpiredSignatureError, ImmatureSignatureError,
- InvalidIssuedAtError, InvalidIssuerError, ExpiredSignature,
- InvalidAudience, InvalidIssuer, MissingRequiredClaimError,
+ DecodeError,
+ ExpiredSignature,
+ ExpiredSignatureError,
+ ImmatureSignatureError,
+ InvalidAlgorithmError,
+ InvalidAudience,
+ InvalidAudienceError,
+ InvalidIssuedAtError,
+ InvalidIssuer,
+ InvalidIssuerError,
InvalidSignatureError,
+ InvalidTokenError,
+ MissingRequiredClaimError,
PyJWTError,
)
diff --git a/jwt/__main__.py b/jwt/__main__.py
index bf50aab..c20a41f 100644
--- a/jwt/__main__.py
+++ b/jwt/__main__.py
@@ -13,17 +13,19 @@ from . import DecodeError, __version__, decode, encode
def encode_payload(args):
# Try to encode
if args.key is None:
- raise ValueError('Key is required when encoding. See --help for usage.')
+ raise ValueError(
+ "Key is required when encoding. See --help for usage."
+ )
# Build payload object to encode
payload = {}
for arg in args.payload:
- k, v = arg.split('=', 1)
+ k, v = arg.split("=", 1)
# exp +offset special case?
- if k == 'exp' and v[0] == '+' and len(v) > 1:
- v = str(int(time.time()+int(v[1:])))
+ if k == "exp" and v[0] == "+" and len(v) > 1:
+ v = str(int(time.time() + int(v[1:])))
# Cast to integer?
if v.isdigit():
@@ -36,20 +38,16 @@ def encode_payload(args):
pass
# Cast to true, false, or null?
- constants = {'true': True, 'false': False, 'null': None}
+ constants = {"true": True, "false": False, "null": None}
if v in constants:
v = constants[v]
payload[k] = v
- token = encode(
- payload,
- key=args.key,
- algorithm=args.algorithm
- )
+ token = encode(payload, key=args.key, algorithm=args.algorithm)
- return token.decode('utf-8')
+ return token.decode("utf-8")
def decode_payload(args):
@@ -60,20 +58,20 @@ def decode_payload(args):
if sys.stdin.isatty():
token = sys.stdin.readline().strip()
else:
- raise IOError('Cannot read from stdin: terminal not a TTY')
+ raise IOError("Cannot read from stdin: terminal not a TTY")
- token = token.encode('utf-8')
+ token = token.encode("utf-8")
data = decode(token, key=args.key, verify=args.verify)
return json.dumps(data)
except DecodeError as e:
- raise DecodeError('There was an error decoding the token: %s' % e)
+ raise DecodeError("There was an error decoding the token: %s" % e)
def build_argparser():
- usage = '''
+ usage = """
Encodes or decodes JSON Web Tokens based on input.
%(prog)s [options] <command> [options] input
@@ -90,63 +88,62 @@ def build_argparser():
%(prog)s --key=secret encode foo=bar exp=+10
The exp key is special and can take an offset to current Unix time.
- '''
+ """
- arg_parser = argparse.ArgumentParser(
- prog='pyjwt',
- usage=usage
- )
+ arg_parser = argparse.ArgumentParser(prog="pyjwt", usage=usage)
arg_parser.add_argument(
- '-v', '--version',
- action='version',
- version='%(prog)s ' + __version__
+ "-v", "--version", action="version", version="%(prog)s " + __version__
)
arg_parser.add_argument(
- '--key',
- dest='key',
- metavar='KEY',
+ "--key",
+ dest="key",
+ metavar="KEY",
default=None,
- help='set the secret key to sign with'
+ help="set the secret key to sign with",
)
arg_parser.add_argument(
- '--alg',
- dest='algorithm',
- metavar='ALG',
- default='HS256',
- help='set crypto algorithm to sign with. default=HS256'
+ "--alg",
+ dest="algorithm",
+ metavar="ALG",
+ default="HS256",
+ help="set crypto algorithm to sign with. default=HS256",
)
subparsers = arg_parser.add_subparsers(
- title='PyJWT subcommands',
- description='valid subcommands',
- help='additional help'
+ title="PyJWT subcommands",
+ description="valid subcommands",
+ help="additional help",
)
# Encode subcommand
- encode_parser = subparsers.add_parser('encode', help='use to encode a supplied payload')
+ encode_parser = subparsers.add_parser(
+ "encode", help="use to encode a supplied payload"
+ )
payload_help = """Payload to encode. Must be a space separated list of key/value
pairs separated by equals (=) sign."""
- encode_parser.add_argument('payload', nargs='+', help=payload_help)
+ encode_parser.add_argument("payload", nargs="+", help=payload_help)
encode_parser.set_defaults(func=encode_payload)
# Decode subcommand
- decode_parser = subparsers.add_parser('decode', help='use to decode a supplied JSON web token')
+ decode_parser = subparsers.add_parser(
+ "decode", help="use to decode a supplied JSON web token"
+ )
decode_parser.add_argument(
- 'token',
- help='JSON web token to decode.',
- nargs='?')
+ "token", help="JSON web token to decode.", nargs="?"
+ )
decode_parser.add_argument(
- '-n', '--no-verify',
- action='store_false',
- dest='verify',
+ "-n",
+ "--no-verify",
+ action="store_false",
+ dest="verify",
default=True,
- help='ignore signature and claims verification on decode'
+ help="ignore signature and claims verification on decode",
)
decode_parser.set_defaults(func=decode_payload)
@@ -164,5 +161,5 @@ def main():
print(output)
except Exception as e:
- print('There was an unforseen error: ', e)
+ print("There was an unforseen error: ", e)
arg_parser.print_help()
diff --git a/jwt/algorithms.py b/jwt/algorithms.py
index 1343688..293a470 100644
--- a/jwt/algorithms.py
+++ b/jwt/algorithms.py
@@ -2,26 +2,39 @@ import hashlib
import hmac
import json
-
from .compat import constant_time_compare, string_types
from .exceptions import InvalidKeyError
from .utils import (
- base64url_decode, base64url_encode, der_to_raw_signature,
- force_bytes, force_unicode, from_base64url_uint, raw_to_der_signature,
- to_base64url_uint
+ base64url_decode,
+ base64url_encode,
+ der_to_raw_signature,
+ force_bytes,
+ force_unicode,
+ from_base64url_uint,
+ raw_to_der_signature,
+ to_base64url_uint,
)
try:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import (
- load_pem_private_key, load_pem_public_key, load_ssh_public_key
+ load_pem_private_key,
+ load_pem_public_key,
+ load_ssh_public_key,
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
- RSAPrivateKey, RSAPublicKey, RSAPrivateNumbers, RSAPublicNumbers,
- rsa_recover_prime_factors, rsa_crt_dmp1, rsa_crt_dmq1, rsa_crt_iqmp
+ RSAPrivateKey,
+ RSAPublicKey,
+ RSAPrivateNumbers,
+ RSAPublicNumbers,
+ rsa_recover_prime_factors,
+ rsa_crt_dmp1,
+ rsa_crt_dmq1,
+ rsa_crt_iqmp,
)
from cryptography.hazmat.primitives.asymmetric.ec import (
- EllipticCurvePrivateKey, EllipticCurvePublicKey
+ EllipticCurvePrivateKey,
+ EllipticCurvePublicKey,
)
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.backends import default_backend
@@ -31,8 +44,20 @@ try:
except ImportError:
has_crypto = False
-requires_cryptography = set(['RS256', 'RS384', 'RS512', 'ES256', 'ES384',
- 'ES521', 'ES512', 'PS256', 'PS384', 'PS512'])
+requires_cryptography = set(
+ [
+ "RS256",
+ "RS384",
+ "RS512",
+ "ES256",
+ "ES384",
+ "ES521",
+ "ES512",
+ "PS256",
+ "PS384",
+ "PS512",
+ ]
+)
def get_default_algorithms():
@@ -40,25 +65,29 @@ def get_default_algorithms():
Returns the algorithms that are implemented by the library.
"""
default_algorithms = {
- 'none': NoneAlgorithm(),
- 'HS256': HMACAlgorithm(HMACAlgorithm.SHA256),
- 'HS384': HMACAlgorithm(HMACAlgorithm.SHA384),
- 'HS512': HMACAlgorithm(HMACAlgorithm.SHA512)
+ "none": NoneAlgorithm(),
+ "HS256": HMACAlgorithm(HMACAlgorithm.SHA256),
+ "HS384": HMACAlgorithm(HMACAlgorithm.SHA384),
+ "HS512": HMACAlgorithm(HMACAlgorithm.SHA512),
}
if has_crypto:
- default_algorithms.update({
- 'RS256': RSAAlgorithm(RSAAlgorithm.SHA256),
- 'RS384': RSAAlgorithm(RSAAlgorithm.SHA384),
- 'RS512': RSAAlgorithm(RSAAlgorithm.SHA512),
- 'ES256': ECAlgorithm(ECAlgorithm.SHA256),
- 'ES384': ECAlgorithm(ECAlgorithm.SHA384),
- 'ES521': ECAlgorithm(ECAlgorithm.SHA512),
- 'ES512': ECAlgorithm(ECAlgorithm.SHA512), # Backward compat for #219 fix
- 'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
- 'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
- 'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512)
- })
+ default_algorithms.update(
+ {
+ "RS256": RSAAlgorithm(RSAAlgorithm.SHA256),
+ "RS384": RSAAlgorithm(RSAAlgorithm.SHA384),
+ "RS512": RSAAlgorithm(RSAAlgorithm.SHA512),
+ "ES256": ECAlgorithm(ECAlgorithm.SHA256),
+ "ES384": ECAlgorithm(ECAlgorithm.SHA384),
+ "ES521": ECAlgorithm(ECAlgorithm.SHA512),
+ "ES512": ECAlgorithm(
+ ECAlgorithm.SHA512
+ ), # Backward compat for #219 fix
+ "PS256": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256),
+ "PS384": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384),
+ "PS512": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512),
+ }
+ )
return default_algorithms
@@ -67,6 +96,7 @@ class Algorithm(object):
"""
The interface for an algorithm used to sign and verify tokens.
"""
+
def prepare_key(self, key):
"""
Performs necessary validation and conversions on the key and returns
@@ -108,8 +138,9 @@ class NoneAlgorithm(Algorithm):
Placeholder for use when no signing or verification
operations are required.
"""
+
def prepare_key(self, key):
- if key == '':
+ if key == "":
key = None
if key is not None:
@@ -118,7 +149,7 @@ class NoneAlgorithm(Algorithm):
return key
def sign(self, msg, key):
- return b''
+ return b""
def verify(self, msg, key, sig):
return False
@@ -129,6 +160,7 @@ class HMACAlgorithm(Algorithm):
Performs signing and verification operations using HMAC
and the specified hash function.
"""
+
SHA256 = hashlib.sha256
SHA384 = hashlib.sha384
SHA512 = hashlib.sha512
@@ -140,34 +172,37 @@ class HMACAlgorithm(Algorithm):
key = force_bytes(key)
invalid_strings = [
- b'-----BEGIN PUBLIC KEY-----',
- b'-----BEGIN CERTIFICATE-----',
- b'-----BEGIN RSA PUBLIC KEY-----',
- b'ssh-rsa'
+ b"-----BEGIN PUBLIC KEY-----",
+ b"-----BEGIN CERTIFICATE-----",
+ b"-----BEGIN RSA PUBLIC KEY-----",
+ b"ssh-rsa",
]
if any([string_value in key for string_value in invalid_strings]):
raise InvalidKeyError(
- 'The specified key is an asymmetric key or x509 certificate and'
- ' should not be used as an HMAC secret.')
+ "The specified key is an asymmetric key or x509 certificate and"
+ " should not be used as an HMAC secret."
+ )
return key
@staticmethod
def to_jwk(key_obj):
- return json.dumps({
- 'k': force_unicode(base64url_encode(force_bytes(key_obj))),
- 'kty': 'oct'
- })
+ return json.dumps(
+ {
+ "k": force_unicode(base64url_encode(force_bytes(key_obj))),
+ "kty": "oct",
+ }
+ )
@staticmethod
def from_jwk(jwk):
obj = json.loads(jwk)
- if obj.get('kty') != 'oct':
- raise InvalidKeyError('Not an HMAC key')
+ if obj.get("kty") != "oct":
+ raise InvalidKeyError("Not an HMAC key")
- return base64url_decode(obj['k'])
+ return base64url_decode(obj["k"])
def sign(self, msg, key):
return hmac.new(key, msg, self.hash_alg).digest()
@@ -176,13 +211,14 @@ class HMACAlgorithm(Algorithm):
return constant_time_compare(sig, self.sign(msg, key))
-if has_crypto:
+if has_crypto: # noqa: C901
class RSAAlgorithm(Algorithm):
"""
Performs signing and verification operations using
RSASSA-PKCS-v1_5 and the specified hash function.
"""
+
SHA256 = hashes.SHA256
SHA384 = hashes.SHA384
SHA512 = hashes.SHA512
@@ -191,22 +227,25 @@ if has_crypto:
self.hash_alg = hash_alg
def prepare_key(self, key):
- if isinstance(key, RSAPrivateKey) or \
- isinstance(key, RSAPublicKey):
+ if isinstance(key, RSAPrivateKey) or isinstance(key, RSAPublicKey):
return key
if isinstance(key, string_types):
key = force_bytes(key)
try:
- if key.startswith(b'ssh-rsa'):
- key = load_ssh_public_key(key, backend=default_backend())
+ if key.startswith(b"ssh-rsa"):
+ key = load_ssh_public_key(
+ key, backend=default_backend()
+ )
else:
- key = load_pem_private_key(key, password=None, backend=default_backend())
+ key = load_pem_private_key(
+ key, password=None, backend=default_backend()
+ )
except ValueError:
key = load_pem_public_key(key, backend=default_backend())
else:
- raise TypeError('Expecting a PEM-formatted key.')
+ raise TypeError("Expecting a PEM-formatted key.")
return key
@@ -214,35 +253,39 @@ if has_crypto:
def to_jwk(key_obj):
obj = None
- if getattr(key_obj, 'private_numbers', None):
+ if getattr(key_obj, "private_numbers", None):
# Private key
numbers = key_obj.private_numbers()
obj = {
- 'kty': 'RSA',
- 'key_ops': ['sign'],
- 'n': force_unicode(to_base64url_uint(numbers.public_numbers.n)),
- 'e': force_unicode(to_base64url_uint(numbers.public_numbers.e)),
- 'd': force_unicode(to_base64url_uint(numbers.d)),
- 'p': force_unicode(to_base64url_uint(numbers.p)),
- 'q': force_unicode(to_base64url_uint(numbers.q)),
- 'dp': force_unicode(to_base64url_uint(numbers.dmp1)),
- 'dq': force_unicode(to_base64url_uint(numbers.dmq1)),
- 'qi': force_unicode(to_base64url_uint(numbers.iqmp))
+ "kty": "RSA",
+ "key_ops": ["sign"],
+ "n": force_unicode(
+ to_base64url_uint(numbers.public_numbers.n)
+ ),
+ "e": force_unicode(
+ to_base64url_uint(numbers.public_numbers.e)
+ ),
+ "d": force_unicode(to_base64url_uint(numbers.d)),
+ "p": force_unicode(to_base64url_uint(numbers.p)),
+ "q": force_unicode(to_base64url_uint(numbers.q)),
+ "dp": force_unicode(to_base64url_uint(numbers.dmp1)),
+ "dq": force_unicode(to_base64url_uint(numbers.dmq1)),
+ "qi": force_unicode(to_base64url_uint(numbers.iqmp)),
}
- elif getattr(key_obj, 'verify', None):
+ elif getattr(key_obj, "verify", None):
# Public key
numbers = key_obj.public_numbers()
obj = {
- 'kty': 'RSA',
- 'key_ops': ['verify'],
- 'n': force_unicode(to_base64url_uint(numbers.n)),
- 'e': force_unicode(to_base64url_uint(numbers.e))
+ "kty": "RSA",
+ "key_ops": ["verify"],
+ "n": force_unicode(to_base64url_uint(numbers.n)),
+ "e": force_unicode(to_base64url_uint(numbers.e)),
}
else:
- raise InvalidKeyError('Not a public or private key')
+ raise InvalidKeyError("Not a public or private key")
return json.dumps(obj)
@@ -251,39 +294,44 @@ if has_crypto:
try:
obj = json.loads(jwk)
except ValueError:
- raise InvalidKeyError('Key is not valid JSON')
+ raise InvalidKeyError("Key is not valid JSON")
- if obj.get('kty') != 'RSA':
- raise InvalidKeyError('Not an RSA key')
+ if obj.get("kty") != "RSA":
+ raise InvalidKeyError("Not an RSA key")
- if 'd' in obj and 'e' in obj and 'n' in obj:
+ if "d" in obj and "e" in obj and "n" in obj:
# Private key
- if 'oth' in obj:
- raise InvalidKeyError('Unsupported RSA private key: > 2 primes not supported')
+ if "oth" in obj:
+ raise InvalidKeyError(
+ "Unsupported RSA private key: > 2 primes not supported"
+ )
- other_props = ['p', 'q', 'dp', 'dq', 'qi']
+ other_props = ["p", "q", "dp", "dq", "qi"]
props_found = [prop in obj for prop in other_props]
any_props_found = any(props_found)
if any_props_found and not all(props_found):
- raise InvalidKeyError('RSA key must include all parameters if any are present besides d')
+ raise InvalidKeyError(
+ "RSA key must include all parameters if any are present besides d"
+ )
public_numbers = RSAPublicNumbers(
- from_base64url_uint(obj['e']), from_base64url_uint(obj['n'])
+ from_base64url_uint(obj["e"]),
+ from_base64url_uint(obj["n"]),
)
if any_props_found:
numbers = RSAPrivateNumbers(
- d=from_base64url_uint(obj['d']),
- p=from_base64url_uint(obj['p']),
- q=from_base64url_uint(obj['q']),
- dmp1=from_base64url_uint(obj['dp']),
- dmq1=from_base64url_uint(obj['dq']),
- iqmp=from_base64url_uint(obj['qi']),
- public_numbers=public_numbers
+ d=from_base64url_uint(obj["d"]),
+ p=from_base64url_uint(obj["p"]),
+ q=from_base64url_uint(obj["q"]),
+ dmp1=from_base64url_uint(obj["dp"]),
+ dmq1=from_base64url_uint(obj["dq"]),
+ iqmp=from_base64url_uint(obj["qi"]),
+ public_numbers=public_numbers,
)
else:
- d = from_base64url_uint(obj['d'])
+ d = from_base64url_uint(obj["d"])
p, q = rsa_recover_prime_factors(
public_numbers.n, d, public_numbers.e
)
@@ -295,19 +343,20 @@ if has_crypto:
dmp1=rsa_crt_dmp1(d, p),
dmq1=rsa_crt_dmq1(d, q),
iqmp=rsa_crt_iqmp(p, q),
- public_numbers=public_numbers
+ public_numbers=public_numbers,
)
return numbers.private_key(default_backend())
- elif 'n' in obj and 'e' in obj:
+ elif "n" in obj and "e" in obj:
# Public key
numbers = RSAPublicNumbers(
- from_base64url_uint(obj['e']), from_base64url_uint(obj['n'])
+ from_base64url_uint(obj["e"]),
+ from_base64url_uint(obj["n"]),
)
return numbers.public_key(default_backend())
else:
- raise InvalidKeyError('Not a public or private key')
+ raise InvalidKeyError("Not a public or private key")
def sign(self, msg, key):
return key.sign(msg, padding.PKCS1v15(), self.hash_alg())
@@ -324,6 +373,7 @@ if has_crypto:
Performs signing and verification operations using
ECDSA and the specified hash function
"""
+
SHA256 = hashes.SHA256
SHA384 = hashes.SHA384
SHA512 = hashes.SHA512
@@ -332,8 +382,9 @@ if has_crypto:
self.hash_alg = hash_alg
def prepare_key(self, key):
- if isinstance(key, EllipticCurvePrivateKey) or \
- isinstance(key, EllipticCurvePublicKey):
+ if isinstance(key, EllipticCurvePrivateKey) or isinstance(
+ key, EllipticCurvePublicKey
+ ):
return key
if isinstance(key, string_types):
@@ -343,15 +394,21 @@ if has_crypto:
# a Signing Key or a Verifying Key, so we try
# the Verifying Key first.
try:
- if key.startswith(b'ecdsa-sha2-'):
- key = load_ssh_public_key(key, backend=default_backend())
+ if key.startswith(b"ecdsa-sha2-"):
+ key = load_ssh_public_key(
+ key, backend=default_backend()
+ )
else:
- key = load_pem_public_key(key, backend=default_backend())
+ key = load_pem_public_key(
+ key, backend=default_backend()
+ )
except ValueError:
- key = load_pem_private_key(key, password=None, backend=default_backend())
+ key = load_pem_private_key(
+ key, password=None, backend=default_backend()
+ )
else:
- raise TypeError('Expecting a PEM-formatted key.')
+ raise TypeError("Expecting a PEM-formatted key.")
return key
@@ -382,9 +439,9 @@ if has_crypto:
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
- salt_length=self.hash_alg.digest_size
+ salt_length=self.hash_alg.digest_size,
),
- self.hash_alg()
+ self.hash_alg(),
)
def verify(self, msg, key, sig):
@@ -394,9 +451,9 @@ if has_crypto:
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
- salt_length=self.hash_alg.digest_size
+ salt_length=self.hash_alg.digest_size,
),
- self.hash_alg()
+ self.hash_alg(),
)
return True
except InvalidSignature:
diff --git a/jwt/api_jws.py b/jwt/api_jws.py
index 19f58d0..9504c9f 100644
--- a/jwt/api_jws.py
+++ b/jwt/api_jws.py
@@ -1,30 +1,35 @@
import binascii
import json
import warnings
-try:
- # import required by mypy to perform type checking, not used for normal execution
- from typing import Callable, Dict, List, Optional, Type, Union # NOQA
-except ImportError:
- pass
-from .algorithms import (
- Algorithm, get_default_algorithms, has_crypto, requires_cryptography # NOQA
-)
+from .algorithms import requires_cryptography # NOQA
+from .algorithms import Algorithm, get_default_algorithms, has_crypto
from .compat import Mapping, binary_type, string_types, text_type
from .exceptions import (
- DecodeError, InvalidAlgorithmError, InvalidSignatureError,
- InvalidTokenError
+ DecodeError,
+ InvalidAlgorithmError,
+ InvalidSignatureError,
+ InvalidTokenError,
)
from .utils import base64url_decode, base64url_encode, force_bytes, merge_dict
+try:
+ # import required by mypy to perform type checking, not used for normal execution
+ from typing import Callable, Dict, List, Optional, Type, Union # NOQA
+except ImportError:
+ pass
+
class PyJWS(object):
- header_typ = 'JWT'
+ header_typ = "JWT"
def __init__(self, algorithms=None, options=None):
self._algorithms = get_default_algorithms()
- self._valid_algs = (set(algorithms) if algorithms is not None
- else set(self._algorithms))
+ self._valid_algs = (
+ set(algorithms)
+ if algorithms is not None
+ else set(self._algorithms)
+ )
# Remove algorithms that aren't on the whitelist
for key in list(self._algorithms.keys()):
@@ -38,19 +43,17 @@ class PyJWS(object):
@staticmethod
def _get_default_options():
- return {
- 'verify_signature': True
- }
+ return {"verify_signature": True}
def register_algorithm(self, alg_id, alg_obj):
"""
Registers a new Algorithm for use when creating and verifying tokens.
"""
if alg_id in self._algorithms:
- raise ValueError('Algorithm already has a handler.')
+ raise ValueError("Algorithm already has a handler.")
if not isinstance(alg_obj, Algorithm):
- raise TypeError('Object is not of type `Algorithm`')
+ raise TypeError("Object is not of type `Algorithm`")
self._algorithms[alg_id] = alg_obj
self._valid_algs.add(alg_id)
@@ -61,8 +64,10 @@ class PyJWS(object):
Throws KeyError if algorithm is not registered.
"""
if alg_id not in self._algorithms:
- raise KeyError('The specified algorithm could not be removed'
- ' because it is not registered.')
+ raise KeyError(
+ "The specified algorithm could not be removed"
+ " because it is not registered."
+ )
del self._algorithms[alg_id]
self._valid_algs.remove(alg_id)
@@ -73,41 +78,38 @@ class PyJWS(object):
"""
return list(self._valid_algs)
- def encode(self,
- payload, # type: Union[Dict, bytes]
- key, # type: str
- algorithm='HS256', # type: str
- headers=None, # type: Optional[Dict]
- json_encoder=None # type: Optional[Type[json.JSONEncoder]]
- ):
+ def encode(
+ self,
+ payload, # type: Union[Dict, bytes]
+ key, # type: str
+ algorithm="HS256", # type: str
+ headers=None, # type: Optional[Dict]
+ json_encoder=None, # type: Optional[Type[json.JSONEncoder]]
+ ):
segments = []
if algorithm is None:
- algorithm = 'none'
+ algorithm = "none"
if algorithm not in self._valid_algs:
pass
# Header
- header = {'typ': self.header_typ, 'alg': algorithm}
+ header = {"typ": self.header_typ, "alg": algorithm}
if headers:
self._validate_headers(headers)
header.update(headers)
json_header = force_bytes(
- json.dumps(
- header,
- separators=(',', ':'),
- cls=json_encoder
- )
+ json.dumps(header, separators=(",", ":"), cls=json_encoder)
)
segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))
# Segments
- signing_input = b'.'.join(segments)
+ signing_input = b".".join(segments)
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
@@ -120,40 +122,46 @@ class PyJWS(object):
"installed?" % algorithm
)
else:
- raise NotImplementedError('Algorithm not supported')
+ raise NotImplementedError("Algorithm not supported")
segments.append(base64url_encode(signature))
- return b'.'.join(segments)
+ return b".".join(segments)
- def decode(self,
- jwt, # type: str
- key='', # type: str
- verify=True, # type: bool
- algorithms=None, # type: List[str]
- options=None, # type: Dict
- **kwargs):
+ def decode(
+ self,
+ jwt, # type: str
+ key="", # type: str
+ verify=True, # type: bool
+ algorithms=None, # type: List[str]
+ options=None, # type: Dict
+ **kwargs
+ ):
merged_options = merge_dict(self.options, options)
- verify_signature = merged_options['verify_signature']
+ verify_signature = merged_options["verify_signature"]
if verify_signature and not algorithms:
warnings.warn(
- 'It is strongly recommended that you pass in a ' +
- 'value for the "algorithms" argument when calling decode(). ' +
- 'This argument will be mandatory in a future version.',
- DeprecationWarning
+ "It is strongly recommended that you pass in a "
+ + 'value for the "algorithms" argument when calling decode(). '
+ + "This argument will be mandatory in a future version.",
+ DeprecationWarning,
)
payload, signing_input, header, signature = self._load(jwt)
if not verify:
- warnings.warn('The verify parameter is deprecated. '
- 'Please use verify_signature in options instead.',
- DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ "The verify parameter is deprecated. "
+ "Please use verify_signature in options instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
elif verify_signature:
- self._verify_signature(payload, signing_input, header, signature,
- key, algorithms)
+ self._verify_signature(
+ payload, signing_input, header, signature, key, algorithms
+ )
return payload
@@ -170,68 +178,78 @@ class PyJWS(object):
def _load(self, jwt):
if isinstance(jwt, text_type):
- jwt = jwt.encode('utf-8')
+ jwt = jwt.encode("utf-8")
if not issubclass(type(jwt), binary_type):
- raise DecodeError("Invalid token type. Token must be a {0}".format(
- binary_type))
+ raise DecodeError(
+ "Invalid token type. Token must be a {0}".format(binary_type)
+ )
try:
- signing_input, crypto_segment = jwt.rsplit(b'.', 1)
- header_segment, payload_segment = signing_input.split(b'.', 1)
+ signing_input, crypto_segment = jwt.rsplit(b".", 1)
+ header_segment, payload_segment = signing_input.split(b".", 1)
except ValueError:
- raise DecodeError('Not enough segments')
+ raise DecodeError("Not enough segments")
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
- raise DecodeError('Invalid header padding')
+ raise DecodeError("Invalid header padding")
try:
- header = json.loads(header_data.decode('utf-8'))
+ header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
- raise DecodeError('Invalid header string: %s' % e)
+ raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
- raise DecodeError('Invalid header string: must be a json object')
+ raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
- raise DecodeError('Invalid payload padding')
+ raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
- raise DecodeError('Invalid crypto padding')
+ raise DecodeError("Invalid crypto padding")
return (payload, signing_input, header, signature)
- def _verify_signature(self, payload, signing_input, header, signature,
- key='', algorithms=None):
+ def _verify_signature(
+ self,
+ payload,
+ signing_input,
+ header,
+ signature,
+ key="",
+ algorithms=None,
+ ):
- alg = header.get('alg')
+ alg = header.get("alg")
if algorithms is not None and alg not in algorithms:
- raise InvalidAlgorithmError('The specified alg value is not allowed')
+ raise InvalidAlgorithmError(
+ "The specified alg value is not allowed"
+ )
try:
alg_obj = self._algorithms[alg]
key = alg_obj.prepare_key(key)
if not alg_obj.verify(signing_input, key, signature):
- raise InvalidSignatureError('Signature verification failed')
+ raise InvalidSignatureError("Signature verification failed")
except KeyError:
- raise InvalidAlgorithmError('Algorithm not supported')
+ raise InvalidAlgorithmError("Algorithm not supported")
def _validate_headers(self, headers):
- if 'kid' in headers:
- self._validate_kid(headers['kid'])
+ if "kid" in headers:
+ self._validate_kid(headers["kid"])
def _validate_kid(self, kid):
if not isinstance(kid, string_types):
- raise InvalidTokenError('Key ID header parameter must be a string')
+ raise InvalidTokenError("Key ID header parameter must be a string")
_jws_global_obj = PyJWS()
diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py
index 3e280bc..22bfc6a 100644
--- a/jwt/api_jwt.py
+++ b/jwt/api_jwt.py
@@ -2,103 +2,113 @@ import json
import warnings
from calendar import timegm
from datetime import datetime, timedelta
-try:
- # import required by mypy to perform type checking, not used for normal execution
- from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA
-except ImportError:
- pass
-from .api_jws import PyJWS
from .algorithms import Algorithm, get_default_algorithms # NOQA
+from .api_jws import PyJWS
from .compat import Iterable, Mapping, string_types
from .exceptions import (
- DecodeError, ExpiredSignatureError, ImmatureSignatureError,
- InvalidAudienceError, InvalidIssuedAtError,
- InvalidIssuerError, MissingRequiredClaimError
+ DecodeError,
+ ExpiredSignatureError,
+ ImmatureSignatureError,
+ InvalidAudienceError,
+ InvalidIssuedAtError,
+ InvalidIssuerError,
+ MissingRequiredClaimError,
)
from .utils import merge_dict
+try:
+ # import required by mypy to perform type checking, not used for normal execution
+ from typing import Any, Callable, Dict, List, Optional, Type, Union # NOQA
+except ImportError:
+ pass
+
class PyJWT(PyJWS):
- header_type = 'JWT'
+ header_type = "JWT"
@staticmethod
def _get_default_options():
# type: () -> Dict[str, bool]
return {
- 'verify_signature': True,
- 'verify_exp': True,
- 'verify_nbf': True,
- 'verify_iat': True,
- 'verify_aud': True,
- 'verify_iss': True,
- 'require_exp': False,
- 'require_iat': False,
- 'require_nbf': False
+ "verify_signature": True,
+ "verify_exp": True,
+ "verify_nbf": True,
+ "verify_iat": True,
+ "verify_aud": True,
+ "verify_iss": True,
+ "require_exp": False,
+ "require_iat": False,
+ "require_nbf": False,
}
- def encode(self,
- payload, # type: Union[Dict, bytes]
- key, # type: str
- algorithm='HS256', # type: str
- headers=None, # type: Optional[Dict]
- json_encoder=None # type: Optional[Type[json.JSONEncoder]]
- ):
+ def encode(
+ self,
+ payload, # type: Union[Dict, bytes]
+ key, # type: str
+ algorithm="HS256", # type: str
+ headers=None, # type: Optional[Dict]
+ json_encoder=None, # type: Optional[Type[json.JSONEncoder]]
+ ):
# Check that we get a mapping
if not isinstance(payload, Mapping):
- raise TypeError('Expecting a mapping object, as JWT only supports '
- 'JSON objects as payloads.')
+ raise TypeError(
+ "Expecting a mapping object, as JWT only supports "
+ "JSON objects as payloads."
+ )
# Payload
- for time_claim in ['exp', 'iat', 'nbf']:
+ for time_claim in ["exp", "iat", "nbf"]:
# Convert datetime to a intDate value in known time-format claims
if isinstance(payload.get(time_claim), datetime):
- payload[time_claim] = timegm(payload[time_claim].utctimetuple()) # type: ignore
+ payload[time_claim] = timegm(
+ payload[time_claim].utctimetuple()
+ ) # type: ignore
json_payload = json.dumps(
- payload,
- separators=(',', ':'),
- cls=json_encoder
- ).encode('utf-8')
+ payload, separators=(",", ":"), cls=json_encoder
+ ).encode("utf-8")
return super(PyJWT, self).encode(
json_payload, key, algorithm, headers, json_encoder
)
- def decode(self,
- jwt, # type: str
- key='', # type: str
- verify=True, # type: bool
- algorithms=None, # type: List[str]
- options=None, # type: Dict
- **kwargs):
+ def decode(
+ self,
+ jwt, # type: str
+ key="", # type: str
+ verify=True, # type: bool
+ algorithms=None, # type: List[str]
+ options=None, # type: Dict
+ **kwargs
+ ):
# type: (...) -> Dict[str, Any]
if verify and not algorithms:
warnings.warn(
- 'It is strongly recommended that you pass in a ' +
- 'value for the "algorithms" argument when calling decode(). ' +
- 'This argument will be mandatory in a future version.',
- DeprecationWarning
+ "It is strongly recommended that you pass in a "
+ + 'value for the "algorithms" argument when calling decode(). '
+ + "This argument will be mandatory in a future version.",
+ DeprecationWarning,
)
payload, _, _, _ = self._load(jwt)
if options is None:
- options = {'verify_signature': verify}
+ options = {"verify_signature": verify}
else:
- options.setdefault('verify_signature', verify)
+ options.setdefault("verify_signature", verify)
decoded = super(PyJWT, self).decode(
jwt, key=key, algorithms=algorithms, options=options, **kwargs
)
try:
- payload = json.loads(decoded.decode('utf-8'))
+ payload = json.loads(decoded.decode("utf-8"))
except ValueError as e:
- raise DecodeError('Invalid payload string: %s' % e)
+ raise DecodeError("Invalid payload string: %s" % e)
if not isinstance(payload, dict):
- raise DecodeError('Invalid payload string: must be a json object')
+ raise DecodeError("Invalid payload string: must be a json object")
if verify:
merged_options = merge_dict(self.options, options)
@@ -106,113 +116,119 @@ class PyJWT(PyJWS):
return payload
- def _validate_claims(self, payload, options, audience=None, issuer=None,
- leeway=0, **kwargs):
+ def _validate_claims(
+ self, payload, options, audience=None, issuer=None, leeway=0, **kwargs
+ ):
- if 'verify_expiration' in kwargs:
- options['verify_exp'] = kwargs.get('verify_expiration', True)
- warnings.warn('The verify_expiration parameter is deprecated. '
- 'Please use verify_exp in options instead.',
- DeprecationWarning)
+ if "verify_expiration" in kwargs:
+ options["verify_exp"] = kwargs.get("verify_expiration", True)
+ warnings.warn(
+ "The verify_expiration parameter is deprecated. "
+ "Please use verify_exp in options instead.",
+ DeprecationWarning,
+ )
if isinstance(leeway, timedelta):
leeway = leeway.total_seconds()
if not isinstance(audience, (string_types, type(None), Iterable)):
- raise TypeError('audience must be a string, iterable, or None')
+ raise TypeError("audience must be a string, iterable, or None")
self._validate_required_claims(payload, options)
now = timegm(datetime.utcnow().utctimetuple())
- if 'iat' in payload and options.get('verify_iat'):
+ if "iat" in payload and options.get("verify_iat"):
self._validate_iat(payload, now, leeway)
- if 'nbf' in payload and options.get('verify_nbf'):
+ if "nbf" in payload and options.get("verify_nbf"):
self._validate_nbf(payload, now, leeway)
- if 'exp' in payload and options.get('verify_exp'):
+ if "exp" in payload and options.get("verify_exp"):
self._validate_exp(payload, now, leeway)
- if options.get('verify_iss'):
+ if options.get("verify_iss"):
self._validate_iss(payload, issuer)
- if options.get('verify_aud'):
+ if options.get("verify_aud"):
self._validate_aud(payload, audience)
def _validate_required_claims(self, payload, options):
- if options.get('require_exp') and payload.get('exp') is None:
- raise MissingRequiredClaimError('exp')
+ if options.get("require_exp") and payload.get("exp") is None:
+ raise MissingRequiredClaimError("exp")
- if options.get('require_iat') and payload.get('iat') is None:
- raise MissingRequiredClaimError('iat')
+ if options.get("require_iat") and payload.get("iat") is None:
+ raise MissingRequiredClaimError("iat")
- if options.get('require_nbf') and payload.get('nbf') is None:
- raise MissingRequiredClaimError('nbf')
+ if options.get("require_nbf") and payload.get("nbf") is None:
+ raise MissingRequiredClaimError("nbf")
def _validate_iat(self, payload, now, leeway):
try:
- int(payload['iat'])
+ int(payload["iat"])
except ValueError:
- raise InvalidIssuedAtError('Issued At claim (iat) must be an integer.')
+ raise InvalidIssuedAtError(
+ "Issued At claim (iat) must be an integer."
+ )
def _validate_nbf(self, payload, now, leeway):
try:
- nbf = int(payload['nbf'])
+ nbf = int(payload["nbf"])
except ValueError:
- raise DecodeError('Not Before claim (nbf) must be an integer.')
+ raise DecodeError("Not Before claim (nbf) must be an integer.")
if nbf > (now + leeway):
- raise ImmatureSignatureError('The token is not yet valid (nbf)')
+ raise ImmatureSignatureError("The token is not yet valid (nbf)")
def _validate_exp(self, payload, now, leeway):
try:
- exp = int(payload['exp'])
+ exp = int(payload["exp"])
except ValueError:
- raise DecodeError('Expiration Time claim (exp) must be an'
- ' integer.')
+ raise DecodeError(
+ "Expiration Time claim (exp) must be an" " integer."
+ )
if exp < (now - leeway):
- raise ExpiredSignatureError('Signature has expired')
+ raise ExpiredSignatureError("Signature has expired")
def _validate_aud(self, payload, audience):
- if audience is None and 'aud' not in payload:
+ if audience is None and "aud" not in payload:
return
- if audience is not None and 'aud' not in payload:
+ if audience is not None and "aud" not in payload:
# Application specified an audience, but it could not be
# verified since the token does not contain a claim.
- raise MissingRequiredClaimError('aud')
+ raise MissingRequiredClaimError("aud")
- if audience is None and 'aud' in payload:
+ if audience is None and "aud" in payload:
# Application did not specify an audience, but
# the token has the 'aud' claim
- raise InvalidAudienceError('Invalid audience')
+ raise InvalidAudienceError("Invalid audience")
- audience_claims = payload['aud']
+ audience_claims = payload["aud"]
if isinstance(audience_claims, string_types):
audience_claims = [audience_claims]
if not isinstance(audience_claims, list):
- raise InvalidAudienceError('Invalid claim format in token')
+ raise InvalidAudienceError("Invalid claim format in token")
if any(not isinstance(c, string_types) for c in audience_claims):
- raise InvalidAudienceError('Invalid claim format in token')
+ raise InvalidAudienceError("Invalid claim format in token")
if isinstance(audience, string_types):
audience = [audience]
if not any(aud in audience_claims for aud in audience):
- raise InvalidAudienceError('Invalid audience')
+ raise InvalidAudienceError("Invalid audience")
def _validate_iss(self, payload, issuer):
if issuer is None:
return
- if 'iss' not in payload:
- raise MissingRequiredClaimError('iss')
+ if "iss" not in payload:
+ raise MissingRequiredClaimError("iss")
- if payload['iss'] != issuer:
- raise InvalidIssuerError('Invalid issuer')
+ if payload["iss"] != issuer:
+ raise InvalidIssuerError("Invalid issuer")
_jwt_global_obj = PyJWT()
diff --git a/jwt/compat.py b/jwt/compat.py
index e79e258..918ff4a 100644
--- a/jwt/compat.py
+++ b/jwt/compat.py
@@ -7,7 +7,6 @@ import hmac
import struct
import sys
-
PY3 = sys.version_info[0] == 3
@@ -46,8 +45,10 @@ except AttributeError:
return result == 0
+
# Use int.to_bytes if it exists (Python 3)
-if getattr(int, 'to_bytes', None):
+if getattr(int, "to_bytes", None):
+
def bytes_from_int(val):
remaining = val
byte_length = 0
@@ -56,8 +57,11 @@ if getattr(int, 'to_bytes', None):
remaining = remaining >> 8
byte_length += 1
- return val.to_bytes(byte_length, 'big', signed=False)
+ return val.to_bytes(byte_length, "big", signed=False)
+
+
else:
+
def bytes_from_int(val):
buf = []
while val:
@@ -65,4 +69,4 @@ else:
buf.append(remainder)
buf.reverse()
- return struct.pack('%sB' % len(buf), *buf)
+ return struct.pack("%sB" % len(buf), *buf)
diff --git a/jwt/contrib/algorithms/py_ecdsa.py b/jwt/contrib/algorithms/py_ecdsa.py
index f1170a6..5b878f5 100644
--- a/jwt/contrib/algorithms/py_ecdsa.py
+++ b/jwt/contrib/algorithms/py_ecdsa.py
@@ -18,6 +18,7 @@ class ECAlgorithm(Algorithm):
This is based off of the implementation in PyJWT 0.3.2
"""
+
SHA256 = hashlib.sha256
SHA384 = hashlib.sha384
SHA512 = hashlib.sha512
@@ -27,13 +28,14 @@ class ECAlgorithm(Algorithm):
def prepare_key(self, key):
- if isinstance(key, ecdsa.SigningKey) or \
- isinstance(key, ecdsa.VerifyingKey):
+ if isinstance(key, ecdsa.SigningKey) or isinstance(
+ key, ecdsa.VerifyingKey
+ ):
return key
if isinstance(key, string_types):
if isinstance(key, text_type):
- key = key.encode('utf-8')
+ key = key.encode("utf-8")
# Attempt to load key. We don't know if it's
# a Signing Key or a Verifying Key, so we try
@@ -44,18 +46,23 @@ class ECAlgorithm(Algorithm):
key = ecdsa.SigningKey.from_pem(key)
else:
- raise TypeError('Expecting a PEM-formatted key.')
+ raise TypeError("Expecting a PEM-formatted key.")
return key
def sign(self, msg, key):
- return key.sign(msg, hashfunc=self.hash_alg,
- sigencode=ecdsa.util.sigencode_string)
+ return key.sign(
+ msg, hashfunc=self.hash_alg, sigencode=ecdsa.util.sigencode_string
+ )
def verify(self, msg, key, sig):
try:
- return key.verify(sig, msg, hashfunc=self.hash_alg,
- sigdecode=ecdsa.util.sigdecode_string)
+ return key.verify(
+ sig,
+ msg,
+ hashfunc=self.hash_alg,
+ sigdecode=ecdsa.util.sigdecode_string,
+ )
# ecdsa <= 0.13.2 raises AssertionError on too long signatures,
# ecdsa >= 0.13.3 raises BadSignatureError for verification errors.
except (AssertionError, ecdsa.BadSignatureError):
diff --git a/jwt/contrib/algorithms/pycrypto.py b/jwt/contrib/algorithms/pycrypto.py
index e49cdbf..d58e907 100644
--- a/jwt/contrib/algorithms/pycrypto.py
+++ b/jwt/contrib/algorithms/pycrypto.py
@@ -17,6 +17,7 @@ class RSAAlgorithm(Algorithm):
This is based off of the implementation in PyJWT 0.3.2
"""
+
SHA256 = Crypto.Hash.SHA256
SHA384 = Crypto.Hash.SHA384
SHA512 = Crypto.Hash.SHA512
@@ -31,11 +32,11 @@ class RSAAlgorithm(Algorithm):
if isinstance(key, string_types):
if isinstance(key, text_type):
- key = key.encode('utf-8')
+ key = key.encode("utf-8")
key = RSA.importKey(key)
else:
- raise TypeError('Expecting a PEM- or RSA-formatted key.')
+ raise TypeError("Expecting a PEM- or RSA-formatted key.")
return key
diff --git a/jwt/exceptions.py b/jwt/exceptions.py
index 2a6aa59..cd2ca2a 100644
--- a/jwt/exceptions.py
+++ b/jwt/exceptions.py
@@ -2,6 +2,7 @@ class PyJWTError(Exception):
"""
Base class for all exceptions
"""
+
pass
diff --git a/jwt/help.py b/jwt/help.py
index 573088d..0639cb6 100644
--- a/jwt/help.py
+++ b/jwt/help.py
@@ -23,7 +23,10 @@ def info():
Based on the requests package help utility module.
"""
try:
- platform_info = {"system": platform.system(), "release": platform.release()}
+ platform_info = {
+ "system": platform.system(),
+ "release": platform.release(),
+ }
except IOError:
platform_info = {"system": "Unknown", "release": "Unknown"}
@@ -46,7 +49,10 @@ def info():
return {
"platform": platform_info,
- "implementation": {"name": implementation, "version": implementation_version},
+ "implementation": {
+ "name": implementation,
+ "version": implementation_version,
+ },
"cryptography": {"version": getattr(cryptography, "__version__", "")},
"pyjwt": {"version": pyjwt_version},
}
diff --git a/jwt/utils.py b/jwt/utils.py
index b33c7a2..cc7f56c 100644
--- a/jwt/utils.py
+++ b/jwt/utils.py
@@ -6,7 +6,8 @@ from .compat import binary_type, bytes_from_int, text_type
try:
from cryptography.hazmat.primitives.asymmetric.utils import (
- decode_dss_signature, encode_dss_signature
+ decode_dss_signature,
+ encode_dss_signature,
)
except ImportError:
pass
@@ -14,58 +15,58 @@ except ImportError:
def force_unicode(value):
if isinstance(value, binary_type):
- return value.decode('utf-8')
+ return value.decode("utf-8")
elif isinstance(value, text_type):
return value
else:
- raise TypeError('Expected a string value')
+ raise TypeError("Expected a string value")
def force_bytes(value):
if isinstance(value, text_type):
- return value.encode('utf-8')
+ return value.encode("utf-8")
elif isinstance(value, binary_type):
return value
else:
- raise TypeError('Expected a string value')
+ raise TypeError("Expected a string value")
def base64url_decode(input):
if isinstance(input, text_type):
- input = input.encode('ascii')
+ input = input.encode("ascii")
rem = len(input) % 4
if rem > 0:
- input += b'=' * (4 - rem)
+ input += b"=" * (4 - rem)
return base64.urlsafe_b64decode(input)
def base64url_encode(input):
- return base64.urlsafe_b64encode(input).replace(b'=', b'')
+ return base64.urlsafe_b64encode(input).replace(b"=", b"")
def to_base64url_uint(val):
if val < 0:
- raise ValueError('Must be a positive integer')
+ raise ValueError("Must be a positive integer")
int_bytes = bytes_from_int(val)
if len(int_bytes) == 0:
- int_bytes = b'\x00'
+ int_bytes = b"\x00"
return base64url_encode(int_bytes)
def from_base64url_uint(val):
if isinstance(val, text_type):
- val = val.encode('ascii')
+ val = val.encode("ascii")
data = base64url_decode(val)
- buf = struct.unpack('%sB' % len(data), data)
- return int(''.join(["%02x" % byte for byte in buf]), 16)
+ buf = struct.unpack("%sB" % len(data), data)
+ return int("".join(["%02x" % byte for byte in buf]), 16)
def merge_dict(original, updates):
@@ -76,14 +77,14 @@ def merge_dict(original, updates):
merged_options = original.copy()
merged_options.update(updates)
except (AttributeError, ValueError) as e:
- raise TypeError('original and updates must be a dictionary: %s' % e)
+ raise TypeError("original and updates must be a dictionary: %s" % e)
return merged_options
def number_to_bytes(num, num_bytes):
- padded_hex = '%0*x' % (2 * num_bytes, num)
- big_endian = binascii.a2b_hex(padded_hex.encode('ascii'))
+ padded_hex = "%0*x" % (2 * num_bytes, num)
+ big_endian = binascii.a2b_hex(padded_hex.encode("ascii"))
return big_endian
@@ -105,7 +106,7 @@ def raw_to_der_signature(raw_sig, curve):
num_bytes = (num_bits + 7) // 8
if len(raw_sig) != 2 * num_bytes:
- raise ValueError('Invalid signature')
+ raise ValueError("Invalid signature")
r = bytes_to_number(raw_sig[:num_bytes])
s = bytes_to_number(raw_sig[num_bytes:])
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..82c7969
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[tool.black]
+line-length = 79
+
+
+[tool.isort]
+atomic=true
+force_grid_wrap=0
+include_trailing_comma=true
+multi_line_output=3
+use_parentheses=true
+combine_as_imports=true
+
+known_first_party="jwt"
+known_third_party=["Crypto", "ecdsa", "pytest", "setuptools", "sphinx_rtd_theme"]
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..fb1850e
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[tool:pytest]
+addopts = --cov-report term-missing --cov-config=.coveragerc --cov .
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index 8823146..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,11 +0,0 @@
-[flake8]
-max-line-length = 119
-exclude =
- docs/,
- .tox/
-
-[bdist_wheel]
-universal = 1
-
-[tool:pytest]
-addopts = --cov-report term-missing --cov-config=.coveragerc --cov .
diff --git a/setup.py b/setup.py
index 1231235..a33b1b8 100755
--- a/setup.py
+++ b/setup.py
@@ -11,72 +11,64 @@ def get_version(package):
"""
Return package version as listed in `__version__` in `init.py`.
"""
- with open(os.path.join(package, '__init__.py'), 'rb') as init_py:
- src = init_py.read().decode('utf-8')
+ with open(os.path.join(package, "__init__.py"), "rb") as init_py:
+ src = init_py.read().decode("utf-8")
return re.search("__version__ = ['\"]([^'\"]+)['\"]", src).group(1)
-version = get_version('jwt')
+version = get_version("jwt")
-with open(os.path.join(os.path.dirname(__file__), 'README.rst')) as readme:
+with open(os.path.join(os.path.dirname(__file__), "README.rst")) as readme:
long_description = readme.read()
-if sys.argv[-1] == 'publish':
+if sys.argv[-1] == "publish":
if os.system("pip freeze | grep twine"):
print("twine not installed.\nUse `pip install twine`.\nExiting.")
sys.exit()
os.system("python setup.py sdist bdist_wheel")
os.system("twine upload dist/*")
- print('You probably want to also tag the version now:')
+ print("You probably want to also tag the version now:")
print(" git tag -a {0} -m 'version {0}'".format(version))
- print(' git push --tags')
+ print(" git push --tags")
sys.exit()
+EXTRAS_REQUIRE = {
+ "tests": ["pytest>=4.0.1,<5.0.0", "pytest-cov>=2.6.0,<3.0.0"],
+ "crypto": ["cryptography >= 1.4"],
+}
+
+EXTRAS_REQUIRE["dev"] = (
+ EXTRAS_REQUIRE["tests"] + EXTRAS_REQUIRE["crypto"] + ["mypy", "pre-commit"]
+)
+
setup(
- name='PyJWT',
+ name="PyJWT",
version=version,
- author='Jose Padilla',
- author_email='hello@jpadilla.com',
- description='JSON Web Token implementation in Python',
- license='MIT',
- keywords='jwt json web token security signing',
- url='https://github.com/jpadilla/pyjwt',
+ author="Jose Padilla",
+ author_email="hello@jpadilla.com",
+ description="JSON Web Token implementation in Python",
+ license="MIT",
+ keywords="jwt json web token security signing",
+ url="https://github.com/jpadilla/pyjwt",
packages=find_packages(
exclude=["*.tests", "*.tests.*", "tests.*", "tests"]
),
long_description=long_description,
classifiers=[
- 'Development Status :: 5 - Production/Stable',
- 'Intended Audience :: Developers',
- 'Natural Language :: English',
- 'License :: OSI Approved :: MIT License',
- 'Programming Language :: Python',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 3.4',
- 'Programming Language :: Python :: 3.5',
- 'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7',
- 'Topic :: Utilities',
+ "Development Status :: 5 - Production/Stable",
+ "Intended Audience :: Developers",
+ "Natural Language :: English",
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3.4",
+ "Programming Language :: Python :: 3.5",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
+ "Topic :: Utilities",
],
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*",
- extras_require=dict(
- test=[
- 'pytest>=4.0.1,<5.0.0',
- 'pytest-cov>=2.6.0,<3.0.0',
- ],
- crypto=['cryptography >= 1.4'],
- flake8=[
- 'flake8',
- 'flake8-import-order',
- 'pep8-naming'
- ],
- mypy=[
- 'mypy'
- ]
- ),
- entry_points={
- 'console_scripts': [
- 'pyjwt = jwt.__main__:main'
- ]
- }
+ extras_require=EXTRAS_REQUIRE,
+ entry_points={"console_scripts": ["pyjwt = jwt.__main__:main"]},
+ options={"bdist_wheel": {"universal": "1"}},
)
diff --git a/tests/compat.py b/tests/compat.py
index 12d6ec0..6d9dec9 100644
--- a/tests/compat.py
+++ b/tests/compat.py
@@ -5,8 +5,8 @@ import sys
PY3 = sys.version_info[0] == 3
if PY3:
- string_types = str,
+ string_types = (str,)
text_type = str
else:
- string_types = basestring,
+ string_types = (basestring,)
text_type = unicode
diff --git a/tests/contrib/test_algorithms.py b/tests/contrib/test_algorithms.py
index 6d5ca75..4a1550b 100644
--- a/tests/contrib/test_algorithms.py
+++ b/tests/contrib/test_algorithms.py
@@ -1,36 +1,40 @@
import base64
-from jwt.utils import force_bytes, force_unicode
-
import pytest
+from jwt.utils import force_bytes, force_unicode
+
from ..utils import key_path
try:
from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
+
has_pycrypto = True
except ImportError:
has_pycrypto = False
try:
from jwt.contrib.algorithms.py_ecdsa import ECAlgorithm
+
has_ecdsa = True
except ImportError:
has_ecdsa = False
-@pytest.mark.skipif(not has_pycrypto, reason='Not supported without PyCrypto library')
+@pytest.mark.skipif(
+ not has_pycrypto, reason="Not supported without PyCrypto library"
+)
class TestPycryptoAlgorithms:
def test_rsa_should_parse_pem_public_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey2_rsa.pub.pem'), 'r') as pem_key:
+ with open(key_path("testkey2_rsa.pub.pem"), "r") as pem_key:
algo.prepare_key(pem_key.read())
def test_rsa_should_accept_unicode_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa'), 'r') as rsa_key:
+ with open(key_path("testkey_rsa"), "r") as rsa_key:
algo.prepare_key(force_unicode(rsa_key.read()))
def test_rsa_should_reject_non_string_key(self):
@@ -42,20 +46,23 @@ class TestPycryptoAlgorithms:
def test_rsa_sign_should_generate_correct_signature_value(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- expected_sig = base64.b64decode(force_bytes(
- 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp'
- '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl'
- '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix'
- 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX'
- 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA'
- 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=='))
+ expected_sig = base64.b64decode(
+ force_bytes(
+ "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
+ "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
+ "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
+ "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
+ "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
+ "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
+ )
+ )
- with open(key_path('testkey_rsa'), 'r') as keyfile:
+ with open(key_path("testkey_rsa"), "r") as keyfile:
jwt_key = algo.prepare_key(keyfile.read())
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
algo.sign(jwt_message, jwt_key)
@@ -65,19 +72,22 @@ class TestPycryptoAlgorithms:
def test_rsa_verify_should_return_false_if_signature_invalid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- jwt_sig = base64.b64decode(force_bytes(
- 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp'
- '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl'
- '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix'
- 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX'
- 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA'
- 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=='))
+ jwt_sig = base64.b64decode(
+ force_bytes(
+ "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
+ "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
+ "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
+ "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
+ "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
+ "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
+ )
+ )
- jwt_sig += force_bytes('123') # Signature is now invalid
+ jwt_sig += force_bytes("123") # Signature is now invalid
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
@@ -86,17 +96,20 @@ class TestPycryptoAlgorithms:
def test_rsa_verify_should_return_true_if_signature_valid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- jwt_sig = base64.b64decode(force_bytes(
- 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp'
- '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl'
- '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix'
- 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX'
- 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA'
- 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=='))
+ jwt_sig = base64.b64decode(
+ force_bytes(
+ "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
+ "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
+ "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
+ "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
+ "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
+ "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
+ )
+ )
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
@@ -105,14 +118,16 @@ class TestPycryptoAlgorithms:
def test_rsa_prepare_key_should_be_idempotent(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
jwt_pub_key_first = algo.prepare_key(keyfile.read())
jwt_pub_key_second = algo.prepare_key(jwt_pub_key_first)
assert jwt_pub_key_first == jwt_pub_key_second
-@pytest.mark.skipif(not has_ecdsa, reason='Not supported without ecdsa library')
+@pytest.mark.skipif(
+ not has_ecdsa, reason="Not supported without ecdsa library"
+)
class TestEcdsaAlgorithms:
def test_ec_should_reject_non_string_key(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
@@ -123,23 +138,26 @@ class TestEcdsaAlgorithms:
def test_ec_should_accept_unicode_key(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- with open(key_path('testkey_ec'), 'r') as ec_key:
+ with open(key_path("testkey_ec"), "r") as ec_key:
algo.prepare_key(force_unicode(ec_key.read()))
def test_ec_sign_should_generate_correct_signature_value(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- expected_sig = base64.b64decode(force_bytes(
- 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M'
- 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw'
- 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'))
+ expected_sig = base64.b64decode(
+ force_bytes(
+ "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M"
+ "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw"
+ "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65"
+ )
+ )
- with open(key_path('testkey_ec'), 'r') as keyfile:
+ with open(key_path("testkey_ec"), "r") as keyfile:
jwt_key = algo.prepare_key(keyfile.read())
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
algo.sign(jwt_message, jwt_key)
@@ -149,16 +167,19 @@ class TestEcdsaAlgorithms:
def test_ec_verify_should_return_false_if_signature_invalid(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- jwt_sig = base64.b64decode(force_bytes(
- 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M'
- 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw'
- 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'))
+ jwt_sig = base64.b64decode(
+ force_bytes(
+ "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M"
+ "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw"
+ "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65"
+ )
+ )
- jwt_sig += force_bytes('123') # Signature is now invalid
+ jwt_sig += force_bytes("123") # Signature is now invalid
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
@@ -167,14 +188,17 @@ class TestEcdsaAlgorithms:
def test_ec_verify_should_return_true_if_signature_valid(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
+ jwt_message = force_bytes("Hello World!")
- jwt_sig = base64.b64decode(force_bytes(
- 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M'
- 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw'
- 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'))
+ jwt_sig = base64.b64decode(
+ force_bytes(
+ "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M"
+ "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw"
+ "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65"
+ )
+ )
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
@@ -183,7 +207,7 @@ class TestEcdsaAlgorithms:
def test_ec_prepare_key_should_be_idempotent(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
jwt_pub_key_first = algo.prepare_key(keyfile.read())
jwt_pub_key_second = algo.prepare_key(jwt_pub_key_first)
diff --git a/tests/keys/__init__.py b/tests/keys/__init__.py
index 3727378..6b61caa 100644
--- a/tests/keys/__init__.py
+++ b/tests/keys/__init__.py
@@ -2,7 +2,6 @@ import json
import os
from jwt.utils import base64url_decode, force_bytes
-
from tests.utils import int_from_bytes
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
@@ -10,48 +9,50 @@ BASE_PATH = os.path.dirname(os.path.abspath(__file__))
def decode_value(val):
decoded = base64url_decode(force_bytes(val))
- return int_from_bytes(decoded, 'big')
+ return int_from_bytes(decoded, "big")
def load_hmac_key():
- with open(os.path.join(BASE_PATH, 'jwk_hmac.json'), 'r') as infile:
+ with open(os.path.join(BASE_PATH, "jwk_hmac.json"), "r") as infile:
keyobj = json.load(infile)
- return base64url_decode(force_bytes(keyobj['k']))
+ return base64url_decode(force_bytes(keyobj["k"]))
try:
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from jwt.algorithms import RSAAlgorithm
+
has_crypto = True
except ImportError:
has_crypto = False
if has_crypto:
+
def load_rsa_key():
- with open(os.path.join(BASE_PATH, 'jwk_rsa_key.json'), 'r') as infile:
+ with open(os.path.join(BASE_PATH, "jwk_rsa_key.json"), "r") as infile:
return RSAAlgorithm.from_jwk(infile.read())
def load_rsa_pub_key():
- with open(os.path.join(BASE_PATH, 'jwk_rsa_pub.json'), 'r') as infile:
+ with open(os.path.join(BASE_PATH, "jwk_rsa_pub.json"), "r") as infile:
return RSAAlgorithm.from_jwk(infile.read())
def load_ec_key():
- with open(os.path.join(BASE_PATH, 'jwk_ec_key.json'), 'r') as infile:
+ with open(os.path.join(BASE_PATH, "jwk_ec_key.json"), "r") as infile:
keyobj = json.load(infile)
return ec.EllipticCurvePrivateNumbers(
- private_value=decode_value(keyobj['d']),
- public_numbers=load_ec_pub_key().public_numbers()
+ private_value=decode_value(keyobj["d"]),
+ public_numbers=load_ec_pub_key().public_numbers(),
)
def load_ec_pub_key():
- with open(os.path.join(BASE_PATH, 'jwk_ec_pub.json'), 'r') as infile:
+ with open(os.path.join(BASE_PATH, "jwk_ec_pub.json"), "r") as infile:
keyobj = json.load(infile)
return ec.EllipticCurvePublicNumbers(
- x=decode_value(keyobj['x']),
- y=decode_value(keyobj['y']),
- curve=ec.SECP521R1()
+ x=decode_value(keyobj["x"]),
+ y=decode_value(keyobj["y"]),
+ curve=ec.SECP521R1(),
).public_key(default_backend())
diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py
index 317bf3a..79d1b8e 100644
--- a/tests/test_algorithms.py
+++ b/tests/test_algorithms.py
@@ -1,18 +1,19 @@
import base64
import json
+import pytest
+
from jwt.algorithms import Algorithm, HMACAlgorithm, NoneAlgorithm
from jwt.exceptions import InvalidKeyError
from jwt.utils import base64url_decode, force_bytes, force_unicode
-import pytest
-
from .keys import load_hmac_key
from .utils import key_path
try:
from jwt.algorithms import RSAAlgorithm, ECAlgorithm, RSAPSSAlgorithm
from .keys import load_rsa_pub_key, load_ec_pub_key
+
has_crypto = True
except ImportError:
has_crypto = False
@@ -23,37 +24,37 @@ class TestAlgorithms:
algo = Algorithm()
with pytest.raises(NotImplementedError):
- algo.prepare_key('test')
+ algo.prepare_key("test")
def test_algorithm_should_throw_exception_if_sign_not_impl(self):
algo = Algorithm()
with pytest.raises(NotImplementedError):
- algo.sign('message', 'key')
+ algo.sign("message", "key")
def test_algorithm_should_throw_exception_if_verify_not_impl(self):
algo = Algorithm()
with pytest.raises(NotImplementedError):
- algo.verify('message', 'key', 'signature')
+ algo.verify("message", "key", "signature")
def test_algorithm_should_throw_exception_if_to_jwk_not_impl(self):
algo = Algorithm()
with pytest.raises(NotImplementedError):
- algo.from_jwk('value')
+ algo.from_jwk("value")
def test_algorithm_should_throw_exception_if_from_jwk_not_impl(self):
algo = Algorithm()
with pytest.raises(NotImplementedError):
- algo.to_jwk('value')
+ algo.to_jwk("value")
def test_none_algorithm_should_throw_exception_if_key_is_not_none(self):
algo = NoneAlgorithm()
with pytest.raises(InvalidKeyError):
- algo.prepare_key('123')
+ algo.prepare_key("123")
def test_hmac_should_reject_nonstring_key(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
@@ -62,186 +63,214 @@ class TestAlgorithms:
algo.prepare_key(object())
exception = context.value
- assert str(exception) == 'Expected a string value'
+ assert str(exception) == "Expected a string value"
def test_hmac_should_accept_unicode_key(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
- algo.prepare_key(force_unicode('awesome'))
+ algo.prepare_key(force_unicode("awesome"))
def test_hmac_should_throw_exception_if_key_is_pem_public_key(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- with open(key_path('testkey2_rsa.pub.pem'), 'r') as keyfile:
+ with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile:
algo.prepare_key(keyfile.read())
def test_hmac_should_throw_exception_if_key_is_x509_certificate(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- with open(key_path('testkey_rsa.cer'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.cer"), "r") as keyfile:
algo.prepare_key(keyfile.read())
def test_hmac_should_throw_exception_if_key_is_ssh_public_key(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
algo.prepare_key(keyfile.read())
def test_hmac_should_throw_exception_if_key_is_x509_cert(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- with open(key_path('testkey2_rsa.pub.pem'), 'r') as keyfile:
+ with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile:
algo.prepare_key(keyfile.read())
def test_hmac_should_throw_exception_if_key_is_pkcs1_pem_public(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- with open(key_path('testkey_pkcs1.pub.pem'), 'r') as keyfile:
+ with open(key_path("testkey_pkcs1.pub.pem"), "r") as keyfile:
algo.prepare_key(keyfile.read())
def test_hmac_jwk_should_parse_and_verify(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
- with open(key_path('jwk_hmac.json'), 'r') as keyfile:
+ with open(key_path("jwk_hmac.json"), "r") as keyfile:
key = algo.from_jwk(keyfile.read())
- signature = algo.sign(b'Hello World!', key)
- assert algo.verify(b'Hello World!', key, signature)
+ signature = algo.sign(b"Hello World!", key)
+ assert algo.verify(b"Hello World!", key, signature)
def test_hmac_to_jwk_returns_correct_values(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
- key = algo.to_jwk('secret')
+ key = algo.to_jwk("secret")
- assert json.loads(key) == {'kty': 'oct', 'k': 'c2VjcmV0'}
+ assert json.loads(key) == {"kty": "oct", "k": "c2VjcmV0"}
def test_hmac_from_jwk_should_raise_exception_if_not_hmac_key(self):
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
- with open(key_path('jwk_rsa_pub.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_pub.json"), "r") as keyfile:
with pytest.raises(InvalidKeyError):
algo.from_jwk(keyfile.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_should_parse_pem_public_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey2_rsa.pub.pem'), 'r') as pem_key:
+ with open(key_path("testkey2_rsa.pub.pem"), "r") as pem_key:
algo.prepare_key(pem_key.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_should_accept_pem_private_key_bytes(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa'), 'rb') as pem_key:
+ with open(key_path("testkey_rsa"), "rb") as pem_key:
algo.prepare_key(pem_key.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_should_accept_unicode_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa'), 'r') as rsa_key:
+ with open(key_path("testkey_rsa"), "r") as rsa_key:
algo.prepare_key(force_unicode(rsa_key.read()))
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_should_reject_non_string_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
with pytest.raises(TypeError):
algo.prepare_key(None)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_verify_should_return_false_if_signature_invalid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- message = force_bytes('Hello World!')
-
- sig = base64.b64decode(force_bytes(
- 'yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp'
- '10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl'
- '2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix'
- 'sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX'
- 'fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA'
- 'APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=='))
+ message = force_bytes("Hello World!")
+
+ sig = base64.b64decode(
+ force_bytes(
+ "yS6zk9DBkuGTtcBzLUzSpo9gGJxJFOGvUqN01iLhWHrzBQ9ZEz3+Ae38AXp"
+ "10RWwscp42ySC85Z6zoN67yGkLNWnfmCZSEv+xqELGEvBJvciOKsrhiObUl"
+ "2mveSc1oeO/2ujkGDkkkJ2epn0YliacVjZF5+/uDmImUfAAj8lzjnHlzYix"
+ "sn5jGz1H07jYYbi9diixN8IUhXeTafwFg02IcONhum29V40Wu6O5tAKWlJX"
+ "fHJnNUzAEUOXS0WahHVb57D30pcgIji9z923q90p5c7E2cU8V+E1qe8NdCA"
+ "APCDzZZ9zQ/dgcMVaBrGrgimrcLbPjueOKFgSO+SSjIElKA=="
+ )
+ )
- sig += force_bytes('123') # Signature is now invalid
+ sig += force_bytes("123") # Signature is now invalid
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(message, pub_key, sig)
assert not result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_rsa_pub.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_pub.json"), "r") as keyfile:
pub_key = algo.from_jwk(keyfile.read())
- with open(key_path('jwk_rsa_key.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_key.json"), "r") as keyfile:
priv_key = algo.from_jwk(keyfile.read())
- signature = algo.sign(force_bytes('Hello World!'), priv_key)
- assert algo.verify(force_bytes('Hello World!'), pub_key, signature)
+ signature = algo.sign(force_bytes("Hello World!"), priv_key)
+ assert algo.verify(force_bytes("Hello World!"), pub_key, signature)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_private_key_to_jwk_works_with_from_jwk(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa'), 'r') as rsa_key:
+ with open(key_path("testkey_rsa"), "r") as rsa_key:
orig_key = algo.prepare_key(force_unicode(rsa_key.read()))
parsed_key = algo.from_jwk(algo.to_jwk(orig_key))
assert parsed_key.private_numbers() == orig_key.private_numbers()
- assert parsed_key.private_numbers().public_numbers == orig_key.private_numbers().public_numbers
+ assert (
+ parsed_key.private_numbers().public_numbers
+ == orig_key.private_numbers().public_numbers
+ )
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_public_key_to_jwk_works_with_from_jwk(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa.pub'), 'r') as rsa_key:
+ with open(key_path("testkey_rsa.pub"), "r") as rsa_key:
orig_key = algo.prepare_key(force_unicode(rsa_key.read()))
parsed_key = algo.from_jwk(algo.to_jwk(orig_key))
assert parsed_key.public_numbers() == orig_key.public_numbers()
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_private_key_with_other_primes_is_invalid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_rsa_key.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_key.json"), "r") as keyfile:
with pytest.raises(InvalidKeyError):
keydata = json.loads(keyfile.read())
- keydata['oth'] = []
+ keydata["oth"] = []
algo.from_jwk(json.dumps(keydata))
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_private_key_with_missing_values_is_invalid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_rsa_key.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_key.json"), "r") as keyfile:
with pytest.raises(InvalidKeyError):
keydata = json.loads(keyfile.read())
- del keydata['p']
+ del keydata["p"]
algo.from_jwk(json.dumps(keydata))
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_private_key_can_recover_prime_factors(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_rsa_key.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_key.json"), "r") as keyfile:
keybytes = keyfile.read()
control_key = algo.from_jwk(keybytes).private_numbers()
keydata = json.loads(keybytes)
- delete_these = ['p', 'q', 'dp', 'dq', 'qi']
+ delete_these = ["p", "q", "dp", "dq", "qi"]
for field in delete_these:
del keydata[field]
@@ -254,206 +283,254 @@ class TestAlgorithms:
assert control_key.dmq1 == parsed_key.dmq1
assert control_key.iqmp == parsed_key.iqmp
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_private_key_with_missing_required_values_is_invalid(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_rsa_key.json'), 'r') as keyfile:
+ with open(key_path("jwk_rsa_key.json"), "r") as keyfile:
with pytest.raises(InvalidKeyError):
keydata = json.loads(keyfile.read())
- del keydata['p']
+ del keydata["p"]
algo.from_jwk(json.dumps(keydata))
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_jwk_raises_exception_if_not_a_valid_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
# Invalid JSON
with pytest.raises(InvalidKeyError):
- algo.from_jwk('{not-a-real-key')
+ algo.from_jwk("{not-a-real-key")
# Missing key parts
with pytest.raises(InvalidKeyError):
algo.from_jwk('{"kty": "RSA"}')
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_to_jwk_returns_correct_values_for_public_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
pub_key = algo.prepare_key(keyfile.read())
key = algo.to_jwk(pub_key)
expected = {
- 'e': 'AQAB',
- 'key_ops': ['verify'],
- 'kty': 'RSA',
- 'n': (
- '1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-'
- 'OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms'
- '1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg'
- '82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r'
- 'rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo'
- 'sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ'
+ "e": "AQAB",
+ "key_ops": ["verify"],
+ "kty": "RSA",
+ "n": (
+ "1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-"
+ "OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms"
+ "1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg"
+ "82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r"
+ "rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo"
+ "sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ"
),
}
assert json.loads(key) == expected
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_to_jwk_returns_correct_values_for_private_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('testkey_rsa'), 'r') as keyfile:
+ with open(key_path("testkey_rsa"), "r") as keyfile:
priv_key = algo.prepare_key(keyfile.read())
key = algo.to_jwk(priv_key)
expected = {
- 'key_ops': [u'sign'],
- 'kty': 'RSA',
- 'e': 'AQAB',
- 'n': (
- '1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-'
- 'OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms'
- '1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg'
- '82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r'
- 'rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo'
- 'sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ'
+ "key_ops": [u"sign"],
+ "kty": "RSA",
+ "e": "AQAB",
+ "n": (
+ "1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJdCvuCJRr-xCN-"
+ "OmO8qwgJJR98feNujxVg-J9Ls3_UOA4HcF9nYH6aqVXELAE8Hk_ALvxi96ms"
+ "1DDuAvQGaYZ-lANxlvxeQFOZSbjkz_9mh8aLeGKwqJLp3p-OhUBQpwvAUAPg"
+ "82-OUtgTW3nSljjeFr14B8qAneGSc_wl0ni--1SRZUXFSovzcqQOkla3W27r"
+ "rLfrD6LXgj_TsDs4vD1PnIm1zcVenKT7TfYI17bsG_O_Wecwz2Nl19pL7gDo"
+ "sNruF3ogJWNq1Lyn_ijPQnkPLpZHyhvuiycYcI3DiQ"
+ ),
+ "d": (
+ "rfbs8AWdB1RkLJRlC51LukrAvYl5UfU1TE6XRa4o-DTg2-03OXLNEMyVpMr"
+ "a47weEnu14StypzC8qXL7vxXOyd30SSFTffLfleaTg-qxgMZSDw-Fb_M-pU"
+ "HMPMEDYG-lgGma4l4fd1yTX2ATtoUo9BVOQgWS1LMZqi0ASEOkUfzlBgL04"
+ "UoaLhPSuDdLygdlDzgruVPnec0t1uOEObmrcWIkhwU2CGQzeLtuzX6OVgPh"
+ "k7xcnjbDurTTVpWH0R0gbZ5ukmQ2P-YuCX8T9iWNMGjPNSkb7h02s2Oe9ZR"
+ "zP007xQ0VF-Z7xyLuxk6ASmoX1S39ujSbk2WF0eXNPRgFwQ"
+ ),
+ "q": (
+ "47hlW2f1ARuWYJf9Dl6MieXjdj2dGx9PL2UH0unVzJYInd56nqXNPrQrc5k"
+ "ZU65KApC9n9oKUwIxuqwAAbh8oGNEQDqnuTj-powCkdC6bwA8KH1Y-wotpq"
+ "_GSjxkNzjWRm2GArJSzZc6Fb8EuObOrAavKJ285-zMPCEfus1WZG0"
+ ),
+ "p": (
+ "7tr0z929Lp4OHIRJjIKM_rDrWMPtRgnV-51pgWsN6qdpDzns_PgFwrHcoyY"
+ "sWIO-4yCdVWPxFOgEZ8xXTM_uwOe4VEmdZhw55Tx7axYZtmZYZbO_RIP4CG"
+ "mlJlOFTiYnxpr-2Cx6kIeQmd-hf7fA3tL018aEzwYMbFMcnAGnEg0"
+ ),
+ "qi": (
+ "djo95mB0LVYikNPa-NgyDwLotLqrueb9IviMmn6zKHCwiOXReqXDX9slB8"
+ "RA15uv56bmN04O__NyVFcgJ2ef169GZHiRFIgIy0Pl8LYkMhCYKKhyqM7g"
+ "xN-SqGqDTKDC22j00S7jcvCaa1qadn1qbdfukZ4NXv7E2d_LO0Y2Kkc"
+ ),
+ "dp": (
+ "tgZ2-tJpEdWxu1m1EzeKa644LHVjpTRptk7H0LDc8i6SieADEuWQvkb9df"
+ "fpY6tDFaQNQr3fQ6dtdAztmsP7l1b_ynwvT1nDZUcqZvl4ruBgDWFmKbjI"
+ "lOCt0v9jX6MEPP5xqBx9axdkw18BnGtUuHrbzHSlUX-yh_rumpVH1SE"
+ ),
+ "dq": (
+ "xxCIuhD0YlWFbUcwFgGdBWcLIm_WCMGj7SB6aGu1VDTLr4Wu10TFWM0TNu"
+ "hc9YPker2gpj5qzAmdAzwcfWSSvXpJTYR43jfulBTMoj8-2o3wCM0anclW"
+ "AuKhin-kc4mh9ssDXRQZwlMymZP0QtaxUDw_nlfVrUCZgO7L1_ZsUTk"
),
- 'd': ('rfbs8AWdB1RkLJRlC51LukrAvYl5UfU1TE6XRa4o-DTg2-03OXLNEMyVpMr'
- 'a47weEnu14StypzC8qXL7vxXOyd30SSFTffLfleaTg-qxgMZSDw-Fb_M-pU'
- 'HMPMEDYG-lgGma4l4fd1yTX2ATtoUo9BVOQgWS1LMZqi0ASEOkUfzlBgL04'
- 'UoaLhPSuDdLygdlDzgruVPnec0t1uOEObmrcWIkhwU2CGQzeLtuzX6OVgPh'
- 'k7xcnjbDurTTVpWH0R0gbZ5ukmQ2P-YuCX8T9iWNMGjPNSkb7h02s2Oe9ZR'
- 'zP007xQ0VF-Z7xyLuxk6ASmoX1S39ujSbk2WF0eXNPRgFwQ'),
- 'q': ('47hlW2f1ARuWYJf9Dl6MieXjdj2dGx9PL2UH0unVzJYInd56nqXNPrQrc5k'
- 'ZU65KApC9n9oKUwIxuqwAAbh8oGNEQDqnuTj-powCkdC6bwA8KH1Y-wotpq'
- '_GSjxkNzjWRm2GArJSzZc6Fb8EuObOrAavKJ285-zMPCEfus1WZG0'),
- 'p': ('7tr0z929Lp4OHIRJjIKM_rDrWMPtRgnV-51pgWsN6qdpDzns_PgFwrHcoyY'
- 'sWIO-4yCdVWPxFOgEZ8xXTM_uwOe4VEmdZhw55Tx7axYZtmZYZbO_RIP4CG'
- 'mlJlOFTiYnxpr-2Cx6kIeQmd-hf7fA3tL018aEzwYMbFMcnAGnEg0'),
- 'qi': ('djo95mB0LVYikNPa-NgyDwLotLqrueb9IviMmn6zKHCwiOXReqXDX9slB8'
- 'RA15uv56bmN04O__NyVFcgJ2ef169GZHiRFIgIy0Pl8LYkMhCYKKhyqM7g'
- 'xN-SqGqDTKDC22j00S7jcvCaa1qadn1qbdfukZ4NXv7E2d_LO0Y2Kkc'),
- 'dp': ('tgZ2-tJpEdWxu1m1EzeKa644LHVjpTRptk7H0LDc8i6SieADEuWQvkb9df'
- 'fpY6tDFaQNQr3fQ6dtdAztmsP7l1b_ynwvT1nDZUcqZvl4ruBgDWFmKbjI'
- 'lOCt0v9jX6MEPP5xqBx9axdkw18BnGtUuHrbzHSlUX-yh_rumpVH1SE'),
- 'dq': ('xxCIuhD0YlWFbUcwFgGdBWcLIm_WCMGj7SB6aGu1VDTLr4Wu10TFWM0TNu'
- 'hc9YPker2gpj5qzAmdAzwcfWSSvXpJTYR43jfulBTMoj8-2o3wCM0anclW'
- 'AuKhin-kc4mh9ssDXRQZwlMymZP0QtaxUDw_nlfVrUCZgO7L1_ZsUTk')
}
assert json.loads(key) == expected
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_to_jwk_raises_exception_on_invalid_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
with pytest.raises(InvalidKeyError):
- algo.to_jwk({'not': 'a valid key'})
+ algo.to_jwk({"not": "a valid key"})
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_from_jwk_raises_exception_on_invalid_key(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
- with open(key_path('jwk_hmac.json'), 'r') as keyfile:
+ with open(key_path("jwk_hmac.json"), "r") as keyfile:
with pytest.raises(InvalidKeyError):
algo.from_jwk(keyfile.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_should_reject_non_string_key(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
with pytest.raises(TypeError):
algo.prepare_key(None)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_should_accept_unicode_key(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- with open(key_path('testkey_ec'), 'r') as ec_key:
+ with open(key_path("testkey_ec"), "r") as ec_key:
algo.prepare_key(force_unicode(ec_key.read()))
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_should_accept_pem_private_key_bytes(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- with open(key_path('testkey_ec'), 'rb') as ec_key:
+ with open(key_path("testkey_ec"), "rb") as ec_key:
algo.prepare_key(ec_key.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_should_accept_ssh_public_key_bytes(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- with open(key_path('testkey_ec_ssh.pub'), 'r') as ec_key:
+ with open(key_path("testkey_ec_ssh.pub"), "r") as ec_key:
algo.prepare_key(ec_key.read())
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_verify_should_return_false_if_signature_invalid(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- message = force_bytes('Hello World!')
+ message = force_bytes("Hello World!")
# Mess up the signature by replacing a known byte
- sig = base64.b64decode(force_bytes(
- 'AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M'
- 'mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw'
- 'LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65'.replace('r', 's')))
+ sig = base64.b64decode(
+ force_bytes(
+ "AC+m4Jf/xI3guAC6w0w37t5zRpSCF6F4udEz5LiMiTIjCS4vcVe6dDOxK+M"
+ "mvkF8PxJuvqxP2CO3TR3okDPCl/NjATTO1jE+qBZ966CRQSSzcCM+tzcHzw"
+ "LZS5kbvKu0Acd/K6Ol2/W3B1NeV5F/gjvZn/jOwaLgWEUYsg0o4XVrAg65".replace(
+ "r", "s"
+ )
+ )
+ )
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(message, pub_key, sig)
assert not result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_verify_should_return_false_if_signature_wrong_length(self):
algo = ECAlgorithm(ECAlgorithm.SHA256)
- message = force_bytes('Hello World!')
+ message = force_bytes("Hello World!")
- sig = base64.b64decode(force_bytes('AC+m4Jf/xI3guAC6w0w3'))
+ sig = base64.b64decode(force_bytes("AC+m4Jf/xI3guAC6w0w3"))
- with open(key_path('testkey_ec.pub'), 'r') as keyfile:
+ with open(key_path("testkey_ec.pub"), "r") as keyfile:
pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(message, pub_key, sig)
assert not result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_pss_sign_then_verify_should_return_true(self):
algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256)
- message = force_bytes('Hello World!')
+ message = force_bytes("Hello World!")
- with open(key_path('testkey_rsa'), 'r') as keyfile:
+ with open(key_path("testkey_rsa"), "r") as keyfile:
priv_key = algo.prepare_key(keyfile.read())
sig = algo.sign(message, priv_key)
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(message, pub_key, sig)
assert result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_pss_verify_should_return_false_if_signature_invalid(self):
algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256)
- jwt_message = force_bytes('Hello World!')
-
- jwt_sig = base64.b64decode(force_bytes(
- 'ywKAUGRIDC//6X+tjvZA96yEtMqpOrSppCNfYI7NKyon3P7doud5v65oWNu'
- 'vQsz0fzPGfF7mQFGo9Cm9Vn0nljm4G6PtqZRbz5fXNQBH9k10gq34AtM02c'
- '/cveqACQ8gF3zxWh6qr9jVqIpeMEaEBIkvqG954E0HT9s9ybHShgHX9mlWk'
- '186/LopP4xe5c/hxOQjwhv6yDlTiwJFiqjNCvj0GyBKsc4iECLGIIO+4mC4'
- 'daOCWqbpZDuLb1imKpmm8Nsm56kAxijMLZnpCcnPgyb7CqG+B93W9GHglA5'
- 'drUeR1gRtO7vqbZMsCAQ4bpjXxwbYyjQlEVuMl73UL6sOWg=='))
+ jwt_message = force_bytes("Hello World!")
+
+ jwt_sig = base64.b64decode(
+ force_bytes(
+ "ywKAUGRIDC//6X+tjvZA96yEtMqpOrSppCNfYI7NKyon3P7doud5v65oWNu"
+ "vQsz0fzPGfF7mQFGo9Cm9Vn0nljm4G6PtqZRbz5fXNQBH9k10gq34AtM02c"
+ "/cveqACQ8gF3zxWh6qr9jVqIpeMEaEBIkvqG954E0HT9s9ybHShgHX9mlWk"
+ "186/LopP4xe5c/hxOQjwhv6yDlTiwJFiqjNCvj0GyBKsc4iECLGIIO+4mC4"
+ "daOCWqbpZDuLb1imKpmm8Nsm56kAxijMLZnpCcnPgyb7CqG+B93W9GHglA5"
+ "drUeR1gRtO7vqbZMsCAQ4bpjXxwbYyjQlEVuMl73UL6sOWg=="
+ )
+ )
- jwt_sig += force_bytes('123') # Signature is now invalid
+ jwt_sig += force_bytes("123") # Signature is now invalid
- with open(key_path('testkey_rsa.pub'), 'r') as keyfile:
+ with open(key_path("testkey_rsa.pub"), "r") as keyfile:
jwt_pub_key = algo.prepare_key(keyfile.read())
result = algo.verify(jwt_message, jwt_pub_key, jwt_sig)
@@ -474,16 +551,16 @@ class TestAlgorithmsRFC7520:
Reference: https://tools.ietf.org/html/rfc7520#section-4.4
"""
signing_input = force_bytes(
- 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjAxOGMwYWU1LTRkOWItNDcxYi1iZmQ2LWVlZ'
- 'jMxNGJjNzAzNyJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ'
- '29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIG'
- 'lmIHlvdSBkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmc'
- 'gd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4'
+ "eyJhbGciOiJIUzI1NiIsImtpZCI6IjAxOGMwYWU1LTRkOWItNDcxYi1iZmQ2LWVlZ"
+ "jMxNGJjNzAzNyJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ"
+ "29pbmcgb3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIG"
+ "lmIHlvdSBkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmc"
+ "gd2hlcmUgeW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4"
)
- signature = base64url_decode(force_bytes(
- 's0h6KThzkfBBBkLspW1h84VsJZFTsPPqMDA7g1Md7p0'
- ))
+ signature = base64url_decode(
+ force_bytes("s0h6KThzkfBBBkLspW1h84VsJZFTsPPqMDA7g1Md7p0")
+ )
algo = HMACAlgorithm(HMACAlgorithm.SHA256)
key = algo.prepare_key(load_hmac_key())
@@ -491,7 +568,9 @@ class TestAlgorithmsRFC7520:
result = algo.verify(signing_input, key, signature)
assert result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsa_verify_should_return_true_for_test_vector(self):
"""
This test verifies that RSA PKCS v1.5 verification works with a known
@@ -500,21 +579,23 @@ class TestAlgorithmsRFC7520:
Reference: https://tools.ietf.org/html/rfc7520#section-4.1
"""
signing_input = force_bytes(
- 'eyJhbGciOiJSUzI1NiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb'
- 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb'
- '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS'
- 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU'
- 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4'
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb"
+ "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb"
+ "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS"
+ "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU"
+ "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4"
)
- signature = base64url_decode(force_bytes(
- 'MRjdkly7_-oTPTS3AXP41iQIGKa80A0ZmTuV5MEaHoxnW2e5CZ5NlKtainoFmKZop'
- 'dHM1O2U4mwzJdQx996ivp83xuglII7PNDi84wnB-BDkoBwA78185hX-Es4JIwmDLJ'
- 'K3lfWRa-XtL0RnltuYv746iYTh_qHRD68BNt1uSNCrUCTJDt5aAE6x8wW1Kt9eRo4'
- 'QPocSadnHXFxnt8Is9UzpERV0ePPQdLuW3IS_de3xyIrDaLGdjluPxUAhb6L2aXic'
- '1U12podGU0KLUQSE_oI-ZnmKJ3F4uOZDnd6QZWJushZ41Axf_fcIe8u9ipH84ogor'
- 'ee7vjbU5y18kDquDg'
- ))
+ signature = base64url_decode(
+ force_bytes(
+ "MRjdkly7_-oTPTS3AXP41iQIGKa80A0ZmTuV5MEaHoxnW2e5CZ5NlKtainoFmKZop"
+ "dHM1O2U4mwzJdQx996ivp83xuglII7PNDi84wnB-BDkoBwA78185hX-Es4JIwmDLJ"
+ "K3lfWRa-XtL0RnltuYv746iYTh_qHRD68BNt1uSNCrUCTJDt5aAE6x8wW1Kt9eRo4"
+ "QPocSadnHXFxnt8Is9UzpERV0ePPQdLuW3IS_de3xyIrDaLGdjluPxUAhb6L2aXic"
+ "1U12podGU0KLUQSE_oI-ZnmKJ3F4uOZDnd6QZWJushZ41Axf_fcIe8u9ipH84ogor"
+ "ee7vjbU5y18kDquDg"
+ )
+ )
algo = RSAAlgorithm(RSAAlgorithm.SHA256)
key = algo.prepare_key(load_rsa_pub_key())
@@ -522,7 +603,9 @@ class TestAlgorithmsRFC7520:
result = algo.verify(signing_input, key, signature)
assert result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_rsapss_verify_should_return_true_for_test_vector(self):
"""
This test verifies that RSA-PSS verification works with a known good
@@ -531,21 +614,23 @@ class TestAlgorithmsRFC7520:
Reference: https://tools.ietf.org/html/rfc7520#section-4.2
"""
signing_input = force_bytes(
- 'eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb'
- 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb'
- '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS'
- 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU'
- 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4'
+ "eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb"
+ "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb"
+ "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS"
+ "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU"
+ "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4"
)
- signature = base64url_decode(force_bytes(
- 'cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6'
- '-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz'
- 'g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p'
- '8Vdz0TTrxUeT3lm8d9shnr2lfJT8ImUjvAA2Xez2Mlp8cBE5awDzT0qI0n6uiP1aC'
- 'N_2_jLAeQTlqRHtfa64QQSUmFAAjVKPbByi7xho0uTOcbH510a6GYmJUAfmWjwZ6o'
- 'D4ifKo8DYM-X72Eaw'
- ))
+ signature = base64url_decode(
+ force_bytes(
+ "cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6"
+ "-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz"
+ "g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p"
+ "8Vdz0TTrxUeT3lm8d9shnr2lfJT8ImUjvAA2Xez2Mlp8cBE5awDzT0qI0n6uiP1aC"
+ "N_2_jLAeQTlqRHtfa64QQSUmFAAjVKPbByi7xho0uTOcbH510a6GYmJUAfmWjwZ6o"
+ "D4ifKo8DYM-X72Eaw"
+ )
+ )
algo = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384)
key = algo.prepare_key(load_rsa_pub_key())
@@ -553,7 +638,9 @@ class TestAlgorithmsRFC7520:
result = algo.verify(signing_input, key, signature)
assert result
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_ec_verify_should_return_true_for_test_vector(self):
"""
This test verifies that ECDSA verification works with a known good
@@ -562,18 +649,20 @@ class TestAlgorithmsRFC7520:
Reference: https://tools.ietf.org/html/rfc7520#section-4.3
"""
signing_input = force_bytes(
- 'eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb'
- 'XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb'
- '3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS'
- 'Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU'
- 'geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4'
+ "eyJhbGciOiJFUzUxMiIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb"
+ "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb"
+ "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS"
+ "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU"
+ "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4"
)
- signature = base64url_decode(force_bytes(
- 'AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9P'
- 'lon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890j'
- 'l8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2'
- ))
+ signature = base64url_decode(
+ force_bytes(
+ "AE_R_YZCChjn4791jSQCrdPZCNYqHXCTZH0-JZGYNlaAjP2kqaluUIIUnC9qvbu9P"
+ "lon7KRTzoNEuT4Va2cmL1eJAQy3mtPBu_u_sDDyYjnAMDxXPn7XrT0lw-kvAD890j"
+ "l8e2puQens_IEKBpHABlsbEPX6sFY8OcGDqoRuBomu9xQ2"
+ )
+ )
algo = ECAlgorithm(ECAlgorithm.SHA512)
key = algo.prepare_key(load_ec_pub_key())
diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py
index 4f70b56..b502124 100644
--- a/tests/test_api_jws.py
+++ b/tests/test_api_jws.py
@@ -1,23 +1,26 @@
-
import json
from decimal import Decimal
+import pytest
+
from jwt.algorithms import Algorithm
from jwt.api_jws import PyJWS
from jwt.exceptions import (
- DecodeError, InvalidAlgorithmError, InvalidSignatureError,
- InvalidTokenError
+ DecodeError,
+ InvalidAlgorithmError,
+ InvalidSignatureError,
+ InvalidTokenError,
)
from jwt.utils import base64url_decode, force_bytes, force_unicode
-import pytest
-
from .compat import string_types, text_type
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import (
- load_pem_private_key, load_pem_public_key, load_ssh_public_key
+ load_pem_private_key,
+ load_pem_public_key,
+ load_ssh_public_key,
)
has_crypto = True
@@ -33,167 +36,179 @@ def jws():
@pytest.fixture
def payload():
""" Creates a sample jws claimset for use as a payload during tests """
- return force_bytes('hello world')
+ return force_bytes("hello world")
class TestJWS:
def test_register_algo_does_not_allow_duplicate_registration(self, jws):
- jws.register_algorithm('AAA', Algorithm())
+ jws.register_algorithm("AAA", Algorithm())
with pytest.raises(ValueError):
- jws.register_algorithm('AAA', Algorithm())
+ jws.register_algorithm("AAA", Algorithm())
def test_register_algo_rejects_non_algorithm_obj(self, jws):
with pytest.raises(TypeError):
- jws.register_algorithm('AAA123', {})
+ jws.register_algorithm("AAA123", {})
def test_unregister_algo_removes_algorithm(self, jws):
supported = jws.get_algorithms()
- assert 'none' in supported
- assert 'HS256' in supported
+ assert "none" in supported
+ assert "HS256" in supported
- jws.unregister_algorithm('HS256')
+ jws.unregister_algorithm("HS256")
supported = jws.get_algorithms()
- assert 'HS256' not in supported
+ assert "HS256" not in supported
def test_unregister_algo_throws_error_if_not_registered(self, jws):
with pytest.raises(KeyError):
- jws.unregister_algorithm('AAA')
+ jws.unregister_algorithm("AAA")
def test_algo_parameter_removes_alg_from_algorithms_list(self, jws):
- assert 'none' in jws.get_algorithms()
- assert 'HS256' in jws.get_algorithms()
+ assert "none" in jws.get_algorithms()
+ assert "HS256" in jws.get_algorithms()
- jws = PyJWS(algorithms=['HS256'])
- assert 'none' not in jws.get_algorithms()
- assert 'HS256' in jws.get_algorithms()
+ jws = PyJWS(algorithms=["HS256"])
+ assert "none" not in jws.get_algorithms()
+ assert "HS256" in jws.get_algorithms()
def test_override_options(self):
- jws = PyJWS(options={'verify_signature': False})
+ jws = PyJWS(options={"verify_signature": False})
- assert not jws.options['verify_signature']
+ assert not jws.options["verify_signature"]
def test_non_object_options_dont_persist(self, jws, payload):
- token = jws.encode(payload, 'secret')
+ token = jws.encode(payload, "secret")
- jws.decode(token, 'secret', options={'verify_signature': False})
+ jws.decode(token, "secret", options={"verify_signature": False})
- assert jws.options['verify_signature']
+ assert jws.options["verify_signature"]
def test_options_must_be_dict(self, jws):
pytest.raises(TypeError, PyJWS, options=object())
- pytest.raises(TypeError, PyJWS, options=('something'))
+ pytest.raises(TypeError, PyJWS, options=("something"))
def test_encode_decode(self, jws, payload):
- secret = 'secret'
+ secret = "secret"
jws_message = jws.encode(payload, secret)
decoded_payload = jws.decode(jws_message, secret)
assert decoded_payload == payload
- def test_decode_fails_when_alg_is_not_on_method_algorithms_param(self, jws, payload):
- secret = 'secret'
- jws_token = jws.encode(payload, secret, algorithm='HS256')
+ def test_decode_fails_when_alg_is_not_on_method_algorithms_param(
+ self, jws, payload
+ ):
+ secret = "secret"
+ jws_token = jws.encode(payload, secret, algorithm="HS256")
jws.decode(jws_token, secret)
with pytest.raises(InvalidAlgorithmError):
- jws.decode(jws_token, secret, algorithms=['HS384'])
+ jws.decode(jws_token, secret, algorithms=["HS384"])
def test_decode_works_with_unicode_token(self, jws):
- secret = 'secret'
+ secret = "secret"
unicode_jws = text_type(
- 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
jws.decode(unicode_jws, secret)
def test_decode_missing_segments_throws_exception(self, jws):
- secret = 'secret'
- example_jws = ('eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '') # Missing segment
+ secret = "secret"
+ example_jws = (
+ "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ""
+ ) # Missing segment
with pytest.raises(DecodeError) as context:
jws.decode(example_jws, secret)
exception = context.value
- assert str(exception) == 'Not enough segments'
+ assert str(exception) == "Not enough segments"
def test_decode_invalid_token_type_is_none(self, jws):
example_jws = None
- example_secret = 'secret'
+ example_secret = "secret"
with pytest.raises(DecodeError) as context:
jws.decode(example_jws, example_secret)
exception = context.value
- assert 'Invalid token type' in str(exception)
+ assert "Invalid token type" in str(exception)
def test_decode_invalid_token_type_is_int(self, jws):
example_jws = 123
- example_secret = 'secret'
+ example_secret = "secret"
with pytest.raises(DecodeError) as context:
jws.decode(example_jws, example_secret)
exception = context.value
- assert 'Invalid token type' in str(exception)
+ assert "Invalid token type" in str(exception)
def test_decode_with_non_mapping_header_throws_exception(self, jws):
- secret = 'secret'
- example_jws = ('MQ' # == 1
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ secret = "secret"
+ example_jws = (
+ "MQ" # == 1
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
with pytest.raises(DecodeError) as context:
jws.decode(example_jws, secret)
exception = context.value
- assert str(exception) == 'Invalid header string: must be a json object'
+ assert str(exception) == "Invalid header string: must be a json object"
- def test_encode_algorithm_param_should_be_case_sensitive(self, jws, payload):
+ def test_encode_algorithm_param_should_be_case_sensitive(
+ self, jws, payload
+ ):
- jws.encode(payload, 'secret', algorithm='HS256')
+ jws.encode(payload, "secret", algorithm="HS256")
with pytest.raises(NotImplementedError) as context:
- jws.encode(payload, None, algorithm='hs256')
+ jws.encode(payload, None, algorithm="hs256")
exception = context.value
- assert str(exception) == 'Algorithm not supported'
+ assert str(exception) == "Algorithm not supported"
def test_decode_algorithm_param_should_be_case_sensitive(self, jws):
- example_jws = ('eyJhbGciOiJoczI1NiIsInR5cCI6IkpXVCJ9' # alg = hs256
- '.eyJoZWxsbyI6IndvcmxkIn0'
- '.5R_FEPE7SW2dT9GgIxPgZATjFGXfUDOSwo7TtO_Kd_g')
+ example_jws = (
+ "eyJhbGciOiJoczI1NiIsInR5cCI6IkpXVCJ9" # alg = hs256
+ ".eyJoZWxsbyI6IndvcmxkIn0"
+ ".5R_FEPE7SW2dT9GgIxPgZATjFGXfUDOSwo7TtO_Kd_g"
+ )
with pytest.raises(InvalidAlgorithmError) as context:
- jws.decode(example_jws, 'secret')
+ jws.decode(example_jws, "secret")
exception = context.value
- assert str(exception) == 'Algorithm not supported'
+ assert str(exception) == "Algorithm not supported"
def test_bad_secret(self, jws, payload):
- right_secret = 'foo'
- bad_secret = 'bar'
+ right_secret = "foo"
+ bad_secret = "bar"
jws_message = jws.encode(payload, right_secret)
with pytest.raises(DecodeError) as excinfo:
# Backward compat for ticket #315
jws.decode(jws_message, bad_secret)
- assert 'Signature verification failed' == str(excinfo.value)
+ assert "Signature verification failed" == str(excinfo.value)
with pytest.raises(InvalidSignatureError) as excinfo:
jws.decode(jws_message, bad_secret)
- assert 'Signature verification failed' == str(excinfo.value)
+ assert "Signature verification failed" == str(excinfo.value)
def test_decodes_valid_jws(self, jws, payload):
- example_secret = 'secret'
+ example_secret = "secret"
example_jws = (
- b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.'
- b'aGVsbG8gd29ybGQ.'
- b'gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM')
+ b"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
+ b"aGVsbG8gd29ybGQ."
+ b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM"
+ )
decoded_payload = jws.decode(example_jws, example_secret)
@@ -203,18 +218,21 @@ class TestJWS:
# Used to test for regressions that could affect both
# encoding / decoding operations equally (causing tests
# to still pass).
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_decodes_valid_es384_jws(self, jws):
- example_payload = {'hello': 'world'}
- with open('tests/keys/testkey_ec.pub', 'r') as fp:
+ example_payload = {"hello": "world"}
+ with open("tests/keys/testkey_ec.pub", "r") as fp:
example_pubkey = fp.read()
example_jws = (
- b'eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9'
- b'.eyJoZWxsbyI6IndvcmxkIn0'
- b'.AGtlemKghaIaYh1yeeekFH9fRuNY7hCaw5hUgZ5aG1N'
- b'2F8FIbiKLaZKr8SiFdTimXFVTEmxpBQ9sRmdsDsnrM-1'
- b'HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw'
- b'PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik')
+ b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9"
+ b".eyJoZWxsbyI6IndvcmxkIn0"
+ b".AGtlemKghaIaYh1yeeekFH9fRuNY7hCaw5hUgZ5aG1N"
+ b"2F8FIbiKLaZKr8SiFdTimXFVTEmxpBQ9sRmdsDsnrM-1"
+ b"HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw"
+ b"PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik"
+ )
decoded_payload = jws.decode(example_jws, example_pubkey)
json_payload = json.loads(force_unicode(decoded_payload))
@@ -224,40 +242,43 @@ class TestJWS:
# Used to test for regressions that could affect both
# encoding / decoding operations equally (causing tests
# to still pass).
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_decodes_valid_rs384_jws(self, jws):
- example_payload = {'hello': 'world'}
- with open('tests/keys/testkey_rsa.pub', 'r') as fp:
+ example_payload = {"hello": "world"}
+ with open("tests/keys/testkey_rsa.pub", "r") as fp:
example_pubkey = fp.read()
example_jws = (
- b'eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9'
- b'.eyJoZWxsbyI6IndvcmxkIn0'
- b'.yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X'
- b'lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju'
- b'O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457'
- b'W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx'
- b'mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ'
- b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t'
- b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr'
- b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A')
+ b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9"
+ b".eyJoZWxsbyI6IndvcmxkIn0"
+ b".yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X"
+ b"lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju"
+ b"O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457"
+ b"W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx"
+ b"mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ"
+ b"urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t"
+ b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr"
+ b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A"
+ )
decoded_payload = jws.decode(example_jws, example_pubkey)
json_payload = json.loads(force_unicode(decoded_payload))
assert json_payload == example_payload
def test_load_verify_valid_jws(self, jws, payload):
- example_secret = 'secret'
+ example_secret = "secret"
example_jws = (
- b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- b'aGVsbG8gd29ybGQ.'
- b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI'
+ b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ b"aGVsbG8gd29ybGQ."
+ b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI"
)
decoded_payload = jws.decode(example_jws, key=example_secret)
assert decoded_payload == payload
def test_allow_skip_verification(self, jws, payload):
- right_secret = 'foo'
+ right_secret = "foo"
jws_message = jws.encode(payload, right_secret)
decoded_payload = jws.decode(jws_message, verify=False)
@@ -265,34 +286,37 @@ class TestJWS:
def test_verify_false_deprecated(self, jws, recwarn):
example_jws = (
- b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ b".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
pytest.deprecated_call(jws.decode, example_jws, verify=False)
def test_decode_with_optional_algorithms(self, jws):
- example_secret = 'secret'
+ example_secret = "secret"
example_jws = (
- b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- b'aGVsbG8gd29ybGQ.'
- b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI'
+ b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ b"aGVsbG8gd29ybGQ."
+ b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI"
)
pytest.deprecated_call(jws.decode, example_jws, key=example_secret)
def test_decode_no_algorithms_verify_signature_false(self, jws):
- example_secret = 'secret'
+ example_secret = "secret"
example_jws = (
- b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- b'aGVsbG8gd29ybGQ.'
- b'SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI'
+ b"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ b"aGVsbG8gd29ybGQ."
+ b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI"
)
try:
pytest.deprecated_call(
- jws.decode, example_jws, key=example_secret,
- options={'verify_signature': False},
+ jws.decode,
+ example_jws,
+ key=example_secret,
+ options={"verify_signature": False},
)
except pytest.fail.Exception:
pass
@@ -300,7 +324,7 @@ class TestJWS:
assert False, "Unexpected DeprecationWarning raised."
def test_load_no_verification(self, jws, payload):
- right_secret = 'foo'
+ right_secret = "foo"
jws_message = jws.encode(payload, right_secret)
decoded_payload = jws.decode(jws_message, key=None, verify=False)
@@ -308,50 +332,54 @@ class TestJWS:
assert decoded_payload == payload
def test_no_secret(self, jws, payload):
- right_secret = 'foo'
+ right_secret = "foo"
jws_message = jws.encode(payload, right_secret)
with pytest.raises(DecodeError):
jws.decode(jws_message)
def test_verify_signature_with_no_secret(self, jws, payload):
- right_secret = 'foo'
+ right_secret = "foo"
jws_message = jws.encode(payload, right_secret)
with pytest.raises(DecodeError) as exc:
jws.decode(jws_message)
- assert 'Signature verification' in str(exc.value)
+ assert "Signature verification" in str(exc.value)
- def test_verify_signature_with_no_algo_header_throws_exception(self, jws, payload):
+ def test_verify_signature_with_no_algo_header_throws_exception(
+ self, jws, payload
+ ):
example_jws = (
- b'e30'
- b'.eyJhIjo1fQ'
- b'.KEh186CjVw_Q8FadjJcaVnE7hO5Z9nHBbU8TgbhHcBY'
+ b"e30"
+ b".eyJhIjo1fQ"
+ b".KEh186CjVw_Q8FadjJcaVnE7hO5Z9nHBbU8TgbhHcBY"
)
with pytest.raises(InvalidAlgorithmError):
- jws.decode(example_jws, 'secret')
+ jws.decode(example_jws, "secret")
def test_invalid_crypto_alg(self, jws, payload):
with pytest.raises(NotImplementedError):
- jws.encode(payload, 'secret', algorithm='HS1024')
+ jws.encode(payload, "secret", algorithm="HS1024")
- @pytest.mark.skipif(has_crypto, reason='Scenario requires cryptography to not be installed')
+ @pytest.mark.skipif(
+ has_crypto, reason="Scenario requires cryptography to not be installed"
+ )
def test_missing_crypto_library_better_error_messages(self, jws, payload):
with pytest.raises(NotImplementedError) as excinfo:
- jws.encode(payload, 'secret', algorithm='RS256')
- assert 'cryptography' in str(excinfo.value)
+ jws.encode(payload, "secret", algorithm="RS256")
+ assert "cryptography" in str(excinfo.value)
def test_unicode_secret(self, jws, payload):
- secret = '\xc2'
+ secret = "\xc2"
jws_message = jws.encode(payload, secret)
decoded_payload = jws.decode(jws_message, secret)
assert decoded_payload == payload
def test_nonascii_secret(self, jws, payload):
- secret = '\xc2' # char value that ascii codec cannot decode
+ secret = "\xc2" # char value that ascii codec cannot decode
jws_message = jws.encode(payload, secret)
decoded_payload = jws.decode(jws_message, secret)
@@ -359,7 +387,7 @@ class TestJWS:
assert decoded_payload == payload
def test_bytes_secret(self, jws, payload):
- secret = b'\xc2' # char value that ascii codec cannot decode
+ secret = b"\xc2" # char value that ascii codec cannot decode
jws_message = jws.encode(payload, secret)
decoded_payload = jws.decode(jws_message, secret)
@@ -368,51 +396,55 @@ class TestJWS:
def test_decode_invalid_header_padding(self, jws):
example_jws = (
- 'aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- example_secret = 'secret'
+ "aeyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
+ example_secret = "secret"
with pytest.raises(DecodeError) as exc:
jws.decode(example_jws, example_secret)
- assert 'header padding' in str(exc.value)
+ assert "header padding" in str(exc.value)
def test_decode_invalid_header_string(self, jws):
example_jws = (
- 'eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ=='
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- example_secret = 'secret'
+ "eyJhbGciOiAiSFMyNTbpIiwgInR5cCI6ICJKV1QifQ=="
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
+ example_secret = "secret"
with pytest.raises(DecodeError) as exc:
jws.decode(example_jws, example_secret)
- assert 'Invalid header' in str(exc.value)
+ assert "Invalid header" in str(exc.value)
def test_decode_invalid_payload_padding(self, jws):
example_jws = (
- 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.aeyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- example_secret = 'secret'
+ "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".aeyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
+ example_secret = "secret"
with pytest.raises(DecodeError) as exc:
jws.decode(example_jws, example_secret)
- assert 'Invalid payload padding' in str(exc.value)
+ assert "Invalid payload padding" in str(exc.value)
def test_decode_invalid_crypto_padding(self, jws):
example_jws = (
- 'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
- example_secret = 'secret'
+ "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".aatvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
+ example_secret = "secret"
with pytest.raises(DecodeError) as exc:
jws.decode(example_jws, example_secret)
- assert 'Invalid crypto padding' in str(exc.value)
+ assert "Invalid crypto padding" in str(exc.value)
def test_decode_with_algo_none_should_fail(self, jws, payload):
jws_message = jws.encode(payload, key=None, algorithm=None)
@@ -420,95 +452,122 @@ class TestJWS:
with pytest.raises(DecodeError):
jws.decode(jws_message)
- def test_decode_with_algo_none_and_verify_false_should_pass(self, jws, payload):
+ def test_decode_with_algo_none_and_verify_false_should_pass(
+ self, jws, payload
+ ):
jws_message = jws.encode(payload, key=None, algorithm=None)
jws.decode(jws_message, verify=False)
def test_get_unverified_header_returns_header_values(self, jws, payload):
- jws_message = jws.encode(payload, key='secret', algorithm='HS256',
- headers={'kid': 'toomanysecrets'})
+ jws_message = jws.encode(
+ payload,
+ key="secret",
+ algorithm="HS256",
+ headers={"kid": "toomanysecrets"},
+ )
header = jws.get_unverified_header(jws_message)
- assert 'kid' in header
- assert header['kid'] == 'toomanysecrets'
+ assert "kid" in header
+ assert header["kid"] == "toomanysecrets"
- def test_get_unverified_header_fails_on_bad_header_types(self, jws, payload):
+ def test_get_unverified_header_fails_on_bad_header_types(
+ self, jws, payload
+ ):
# Contains a bad kid value (int 123 instead of string)
example_jws = (
- 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6MTIzfQ'
- '.eyJzdWIiOiIxMjM0NTY3ODkwIn0'
- '.vs2WY54jfpKP3JGC73Vq5YlMsqM5oTZ1ZydT77SiZSk')
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6MTIzfQ"
+ ".eyJzdWIiOiIxMjM0NTY3ODkwIn0"
+ ".vs2WY54jfpKP3JGC73Vq5YlMsqM5oTZ1ZydT77SiZSk"
+ )
with pytest.raises(InvalidTokenError) as exc:
jws.get_unverified_header(example_jws)
- assert 'Key ID header parameter must be a string' == str(exc.value)
+ assert "Key ID header parameter must be a string" == str(exc.value)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_encode_decode_with_rsa_sha256(self, jws, payload):
# PEM-formatted RSA key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS256')
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
+ priv_rsakey = load_pem_private_key(
+ force_bytes(rsa_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256")
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()),
- backend=default_backend())
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
+ pub_rsakey = load_ssh_public_key(
+ force_bytes(rsa_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_rsakey)
# string-formatted key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS256')
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256")
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
jws.decode(jws_message, pub_rsakey)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_encode_decode_with_rsa_sha384(self, jws, payload):
# PEM-formatted RSA key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS384')
-
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()),
- backend=default_backend())
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
+ priv_rsakey = load_pem_private_key(
+ force_bytes(rsa_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384")
+
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
+ pub_rsakey = load_ssh_public_key(
+ force_bytes(rsa_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_rsakey)
# string-formatted key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS384')
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384")
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
jws.decode(jws_message, pub_rsakey)
- @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
+ @pytest.mark.skipif(
+ not has_crypto, reason="Not supported without cryptography library"
+ )
def test_encode_decode_with_rsa_sha512(self, jws, payload):
# PEM-formatted RSA key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
- priv_rsakey = load_pem_private_key(force_bytes(rsa_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS512')
-
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
- pub_rsakey = load_ssh_public_key(force_bytes(rsa_pub_file.read()),
- backend=default_backend())
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
+ priv_rsakey = load_pem_private_key(
+ force_bytes(rsa_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512")
+
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
+ pub_rsakey = load_ssh_public_key(
+ force_bytes(rsa_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_rsakey)
# string-formatted key
- with open('tests/keys/testkey_rsa', 'r') as rsa_priv_file:
+ with open("tests/keys/testkey_rsa", "r") as rsa_priv_file:
priv_rsakey = rsa_priv_file.read()
- jws_message = jws.encode(payload, priv_rsakey, algorithm='RS512')
+ jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512")
- with open('tests/keys/testkey_rsa.pub', 'r') as rsa_pub_file:
+ with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file:
pub_rsakey = rsa_pub_file.read()
jws.decode(jws_message, pub_rsakey)
@@ -517,84 +576,103 @@ class TestJWS:
jws_algorithms = jws.get_algorithms()
if has_crypto:
- assert 'RS256' in jws_algorithms
- assert 'RS384' in jws_algorithms
- assert 'RS512' in jws_algorithms
- assert 'PS256' in jws_algorithms
- assert 'PS384' in jws_algorithms
- assert 'PS512' in jws_algorithms
+ assert "RS256" in jws_algorithms
+ assert "RS384" in jws_algorithms
+ assert "RS512" in jws_algorithms
+ assert "PS256" in jws_algorithms
+ assert "PS384" in jws_algorithms
+ assert "PS512" in jws_algorithms
else:
- assert 'RS256' not in jws_algorithms
- assert 'RS384' not in jws_algorithms
- assert 'RS512' not in jws_algorithms
- assert 'PS256' not in jws_algorithms
- assert 'PS384' not in jws_algorithms
- assert 'PS512' not in jws_algorithms
-
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ assert "RS256" not in jws_algorithms
+ assert "RS384" not in jws_algorithms
+ assert "RS512" not in jws_algorithms
+ assert "PS256" not in jws_algorithms
+ assert "PS384" not in jws_algorithms
+ assert "PS512" not in jws_algorithms
+
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_encode_decode_with_ecdsa_sha256(self, jws, payload):
# PEM-formatted EC key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
- priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES256')
-
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
- pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()),
- backend=default_backend())
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
+ priv_eckey = load_pem_private_key(
+ force_bytes(ec_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES256")
+
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
+ pub_eckey = load_pem_public_key(
+ force_bytes(ec_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_eckey)
# string-formatted key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES256')
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES256")
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
pub_eckey = ec_pub_file.read()
jws.decode(jws_message, pub_eckey)
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_encode_decode_with_ecdsa_sha384(self, jws, payload):
# PEM-formatted EC key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
- priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES384')
-
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
- pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()),
- backend=default_backend())
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
+ priv_eckey = load_pem_private_key(
+ force_bytes(ec_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES384")
+
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
+ pub_eckey = load_pem_public_key(
+ force_bytes(ec_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_eckey)
# string-formatted key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES384')
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES384")
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
pub_eckey = ec_pub_file.read()
jws.decode(jws_message, pub_eckey)
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_encode_decode_with_ecdsa_sha512(self, jws, payload):
# PEM-formatted EC key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
- priv_eckey = load_pem_private_key(force_bytes(ec_priv_file.read()),
- password=None, backend=default_backend())
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES521')
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
+ priv_eckey = load_pem_private_key(
+ force_bytes(ec_priv_file.read()),
+ password=None,
+ backend=default_backend(),
+ )
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES521")
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
- pub_eckey = load_pem_public_key(force_bytes(ec_pub_file.read()), backend=default_backend())
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
+ pub_eckey = load_pem_public_key(
+ force_bytes(ec_pub_file.read()), backend=default_backend()
+ )
jws.decode(jws_message, pub_eckey)
# string-formatted key
- with open('tests/keys/testkey_ec', 'r') as ec_priv_file:
+ with open("tests/keys/testkey_ec", "r") as ec_priv_file:
priv_eckey = ec_priv_file.read()
- jws_message = jws.encode(payload, priv_eckey, algorithm='ES521')
+ jws_message = jws.encode(payload, priv_eckey, algorithm="ES521")
- with open('tests/keys/testkey_ec.pub', 'r') as ec_pub_file:
+ with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file:
pub_eckey = ec_pub_file.read()
jws.decode(jws_message, pub_eckey)
@@ -603,62 +681,61 @@ class TestJWS:
jws_algorithms = jws.get_algorithms()
if has_crypto:
- assert 'ES256' in jws_algorithms
- assert 'ES384' in jws_algorithms
- assert 'ES521' in jws_algorithms
+ assert "ES256" in jws_algorithms
+ assert "ES384" in jws_algorithms
+ assert "ES521" in jws_algorithms
else:
- assert 'ES256' not in jws_algorithms
- assert 'ES384' not in jws_algorithms
- assert 'ES521' not in jws_algorithms
+ assert "ES256" not in jws_algorithms
+ assert "ES384" not in jws_algorithms
+ assert "ES521" not in jws_algorithms
def test_skip_check_signature(self, jws):
- token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
- ".eyJzb21lIjoicGF5bG9hZCJ9"
- ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA")
- jws.decode(token, 'secret', options={'verify_signature': False})
+ token = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+ ".eyJzb21lIjoicGF5bG9hZCJ9"
+ ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA"
+ )
+ jws.decode(token, "secret", options={"verify_signature": False})
def test_decode_options_must_be_dict(self, jws, payload):
- token = jws.encode(payload, 'secret')
+ token = jws.encode(payload, "secret")
with pytest.raises(TypeError):
- jws.decode(token, 'secret', options=object())
+ jws.decode(token, "secret", options=object())
with pytest.raises(TypeError):
- jws.decode(token, 'secret', options='something')
+ jws.decode(token, "secret", options="something")
def test_custom_json_encoder(self, jws, payload):
-
class CustomJSONEncoder(json.JSONEncoder):
-
def default(self, o):
if isinstance(o, Decimal):
- return 'it worked'
+ return "it worked"
return super(CustomJSONEncoder, self).default(o)
- data = {
- 'some_decimal': Decimal('2.2')
- }
+ data = {"some_decimal": Decimal("2.2")}
with pytest.raises(TypeError):
- jws.encode(payload, 'secret', headers=data)
+ jws.encode(payload, "secret", headers=data)
- token = jws.encode(payload, 'secret', headers=data,
- json_encoder=CustomJSONEncoder)
+ token = jws.encode(
+ payload, "secret", headers=data, json_encoder=CustomJSONEncoder
+ )
- header = force_bytes(force_unicode(token).split('.')[0])
+ header = force_bytes(force_unicode(token).split(".")[0])
header = json.loads(force_unicode(base64url_decode(header)))
- assert 'some_decimal' in header
- assert header['some_decimal'] == 'it worked'
+ assert "some_decimal" in header
+ assert header["some_decimal"] == "it worked"
def test_encode_headers_parameter_adds_headers(self, jws, payload):
- headers = {'testheader': True}
- token = jws.encode(payload, 'secret', headers=headers)
+ headers = {"testheader": True}
+ token = jws.encode(payload, "secret", headers=headers)
if not isinstance(token, string_types):
token = token.decode()
- header = token[0:token.index('.')].encode()
+ header = token[0 : token.index(".")].encode()
header = base64url_decode(header)
if not isinstance(header, text_type):
@@ -666,16 +743,16 @@ class TestJWS:
header_obj = json.loads(header)
- assert 'testheader' in header_obj
- assert header_obj['testheader'] == headers['testheader']
+ assert "testheader" in header_obj
+ assert header_obj["testheader"] == headers["testheader"]
def test_encode_fails_on_invalid_kid_types(self, jws, payload):
with pytest.raises(InvalidTokenError) as exc:
- jws.encode(payload, 'secret', headers={'kid': 123})
+ jws.encode(payload, "secret", headers={"kid": 123})
- assert 'Key ID header parameter must be a string' == str(exc.value)
+ assert "Key ID header parameter must be a string" == str(exc.value)
with pytest.raises(InvalidTokenError) as exc:
- jws.encode(payload, 'secret', headers={'kid': None})
+ jws.encode(payload, "secret", headers={"kid": None})
- assert 'Key ID header parameter must be a string' == str(exc.value)
+ assert "Key ID header parameter must be a string" == str(exc.value)
diff --git a/tests/test_api_jwt.py b/tests/test_api_jwt.py
index 8d07f3f..e4065c6 100644
--- a/tests/test_api_jwt.py
+++ b/tests/test_api_jwt.py
@@ -1,19 +1,22 @@
-
import json
import time
from calendar import timegm
from datetime import datetime, timedelta
from decimal import Decimal
+import pytest
+
from jwt.api_jwt import PyJWT
from jwt.exceptions import (
- DecodeError, ExpiredSignatureError, ImmatureSignatureError,
- InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError,
- MissingRequiredClaimError
+ DecodeError,
+ ExpiredSignatureError,
+ ImmatureSignatureError,
+ InvalidAudienceError,
+ InvalidIssuedAtError,
+ InvalidIssuerError,
+ MissingRequiredClaimError,
)
-import pytest
-
from .test_api_jws import has_crypto
from .utils import utc_timestamp
@@ -26,32 +29,30 @@ def jwt():
@pytest.fixture
def payload():
""" Creates a sample JWT claimset for use as a payload during tests """
- return {
- 'iss': 'jeff',
- 'exp': utc_timestamp() + 15,
- 'claim': 'insanity'
- }
+ return {"iss": "jeff", "exp": utc_timestamp() + 15, "claim": "insanity"}
class TestJWT:
def test_decodes_valid_jwt(self, jwt):
- example_payload = {'hello': 'world'}
- example_secret = 'secret'
+ example_payload = {"hello": "world"}
+ example_secret = "secret"
example_jwt = (
- b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ b".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
decoded_payload = jwt.decode(example_jwt, example_secret)
assert decoded_payload == example_payload
def test_load_verify_valid_jwt(self, jwt):
- example_payload = {'hello': 'world'}
- example_secret = 'secret'
+ example_payload = {"hello": "world"}
+ example_secret = "secret"
example_jwt = (
- b'eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- b'.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- b'.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ b".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
decoded_payload = jwt.decode(example_jwt, key=example_secret)
@@ -59,134 +60,157 @@ class TestJWT:
def test_decode_invalid_payload_string(self, jwt):
example_jwt = (
- 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.aGVsb'
- 'G8gd29ybGQ.SIr03zM64awWRdPrAM_61QWsZchAtgDV'
- '3pphfHPPWkI')
- example_secret = 'secret'
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.aGVsb"
+ "G8gd29ybGQ.SIr03zM64awWRdPrAM_61QWsZchAtgDV"
+ "3pphfHPPWkI"
+ )
+ example_secret = "secret"
with pytest.raises(DecodeError) as exc:
jwt.decode(example_jwt, example_secret)
- assert 'Invalid payload string' in str(exc.value)
+ assert "Invalid payload string" in str(exc.value)
def test_decode_with_non_mapping_payload_throws_exception(self, jwt):
- secret = 'secret'
- example_jwt = ('eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.'
- 'MQ.' # == 1
- 'AbcSR3DWum91KOgfKxUHm78rLs_DrrZ1CrDgpUFFzls')
+ secret = "secret"
+ example_jwt = (
+ "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
+ "MQ." # == 1
+ "AbcSR3DWum91KOgfKxUHm78rLs_DrrZ1CrDgpUFFzls"
+ )
with pytest.raises(DecodeError) as context:
jwt.decode(example_jwt, secret)
exception = context.value
- assert str(exception) == 'Invalid payload string: must be a json object'
+ assert (
+ str(exception) == "Invalid payload string: must be a json object"
+ )
def test_decode_with_invalid_audience_param_throws_exception(self, jwt):
- secret = 'secret'
- example_jwt = ('eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9'
- '.eyJoZWxsbyI6ICJ3b3JsZCJ9'
- '.tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8')
+ secret = "secret"
+ example_jwt = (
+ "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
+ ".eyJoZWxsbyI6ICJ3b3JsZCJ9"
+ ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8"
+ )
with pytest.raises(TypeError) as context:
jwt.decode(example_jwt, secret, audience=1)
exception = context.value
- assert str(exception) == 'audience must be a string, iterable, or None'
+ assert str(exception) == "audience must be a string, iterable, or None"
def test_decode_with_nonlist_aud_claim_throws_exception(self, jwt):
- secret = 'secret'
- example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9'
- '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjoxfQ' # aud = 1
- '.Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc')
+ secret = "secret"
+ example_jwt = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+ ".eyJoZWxsbyI6IndvcmxkIiwiYXVkIjoxfQ" # aud = 1
+ ".Rof08LBSwbm8Z_bhA2N3DFY-utZR1Gi9rbIS5Zthnnc"
+ )
with pytest.raises(InvalidAudienceError) as context:
- jwt.decode(example_jwt, secret, audience='my_audience')
+ jwt.decode(example_jwt, secret, audience="my_audience")
exception = context.value
- assert str(exception) == 'Invalid claim format in token'
+ assert str(exception) == "Invalid claim format in token"
def test_decode_with_invalid_aud_list_member_throws_exception(self, jwt):
- secret = 'secret'
- example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9'
- '.eyJoZWxsbyI6IndvcmxkIiwiYXVkIjpbMV19'
- '.iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A')
+ secret = "secret"
+ example_jwt = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+ ".eyJoZWxsbyI6IndvcmxkIiwiYXVkIjpbMV19"
+ ".iQgKpJ8shetwNMIosNXWBPFB057c2BHs-8t1d2CCM2A"
+ )
with pytest.raises(InvalidAudienceError) as context:
- jwt.decode(example_jwt, secret, audience='my_audience')
+ jwt.decode(example_jwt, secret, audience="my_audience")
exception = context.value
- assert str(exception) == 'Invalid claim format in token'
+ assert str(exception) == "Invalid claim format in token"
def test_encode_bad_type(self, jwt):
- types = ['string', tuple(), list(), 42, set()]
+ types = ["string", tuple(), list(), 42, set()]
for t in types:
- pytest.raises(TypeError, lambda: jwt.encode(t, 'secret'))
+ pytest.raises(TypeError, lambda: jwt.encode(t, "secret"))
def test_decode_raises_exception_if_exp_is_not_int(self, jwt):
# >>> jwt.encode({'exp': 'not-an-int'}, 'secret')
- example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- 'eyJleHAiOiJub3QtYW4taW50In0.'
- 'P65iYgoHtBqB07PMtBSuKNUEIPPPfmjfJG217cEE66s')
+ example_jwt = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ "eyJleHAiOiJub3QtYW4taW50In0."
+ "P65iYgoHtBqB07PMtBSuKNUEIPPPfmjfJG217cEE66s"
+ )
with pytest.raises(DecodeError) as exc:
- jwt.decode(example_jwt, 'secret')
+ jwt.decode(example_jwt, "secret")
- assert 'exp' in str(exc.value)
+ assert "exp" in str(exc.value)
def test_decode_raises_exception_if_iat_is_not_int(self, jwt):
# >>> jwt.encode({'iat': 'not-an-int'}, 'secret')
- example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- 'eyJpYXQiOiJub3QtYW4taW50In0.'
- 'H1GmcQgSySa5LOKYbzGm--b1OmRbHFkyk8pq811FzZM')
+ example_jwt = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ "eyJpYXQiOiJub3QtYW4taW50In0."
+ "H1GmcQgSySa5LOKYbzGm--b1OmRbHFkyk8pq811FzZM"
+ )
with pytest.raises(InvalidIssuedAtError):
- jwt.decode(example_jwt, 'secret')
+ jwt.decode(example_jwt, "secret")
def test_decode_raises_exception_if_nbf_is_not_int(self, jwt):
# >>> jwt.encode({'nbf': 'not-an-int'}, 'secret')
- example_jwt = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.'
- 'eyJuYmYiOiJub3QtYW4taW50In0.'
- 'c25hldC8G2ZamC8uKpax9sYMTgdZo3cxrmzFHaAAluw')
+ example_jwt = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
+ "eyJuYmYiOiJub3QtYW4taW50In0."
+ "c25hldC8G2ZamC8uKpax9sYMTgdZo3cxrmzFHaAAluw"
+ )
with pytest.raises(DecodeError):
- jwt.decode(example_jwt, 'secret')
+ jwt.decode(example_jwt, "secret")
def test_encode_datetime(self, jwt):
- secret = 'secret'
+ secret = "secret"
current_datetime = datetime.utcnow()
payload = {
- 'exp': current_datetime,
- 'iat': current_datetime,
- 'nbf': current_datetime
+ "exp": current_datetime,
+ "iat": current_datetime,
+ "nbf": current_datetime,
}
jwt_message = jwt.encode(payload, secret)
decoded_payload = jwt.decode(jwt_message, secret, leeway=1)
- assert (decoded_payload['exp'] ==
- timegm(current_datetime.utctimetuple()))
- assert (decoded_payload['iat'] ==
- timegm(current_datetime.utctimetuple()))
- assert (decoded_payload['nbf'] ==
- timegm(current_datetime.utctimetuple()))
+ assert decoded_payload["exp"] == timegm(
+ current_datetime.utctimetuple()
+ )
+ assert decoded_payload["iat"] == timegm(
+ current_datetime.utctimetuple()
+ )
+ assert decoded_payload["nbf"] == timegm(
+ current_datetime.utctimetuple()
+ )
# 'Control' Elliptic Curve JWT created by another library.
# Used to test for regressions that could affect both
# encoding / decoding operations equally (causing tests
# to still pass).
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_decodes_valid_es384_jwt(self, jwt):
- example_payload = {'hello': 'world'}
- with open('tests/keys/testkey_ec.pub', 'r') as fp:
+ example_payload = {"hello": "world"}
+ with open("tests/keys/testkey_ec.pub", "r") as fp:
example_pubkey = fp.read()
example_jwt = (
- b'eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9'
- b'.eyJoZWxsbyI6IndvcmxkIn0'
- b'.AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD'
- b'NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK'
- b'pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb'
- b'zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj')
+ b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9"
+ b".eyJoZWxsbyI6IndvcmxkIn0"
+ b".AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD"
+ b"NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK"
+ b"pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb"
+ b"zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj"
+ )
decoded_payload = jwt.decode(example_jwt, example_pubkey)
assert decoded_payload == example_payload
@@ -195,59 +219,62 @@ class TestJWT:
# Used to test for regressions that could affect both
# encoding / decoding operations equally (causing tests
# to still pass).
- @pytest.mark.skipif(not has_crypto, reason="Can't run without cryptography library")
+ @pytest.mark.skipif(
+ not has_crypto, reason="Can't run without cryptography library"
+ )
def test_decodes_valid_rs384_jwt(self, jwt):
- example_payload = {'hello': 'world'}
- with open('tests/keys/testkey_rsa.pub', 'r') as fp:
+ example_payload = {"hello": "world"}
+ with open("tests/keys/testkey_rsa.pub", "r") as fp:
example_pubkey = fp.read()
example_jwt = (
- b'eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9'
- b'.eyJoZWxsbyI6IndvcmxkIn0'
- b'.yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X'
- b'lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju'
- b'O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457'
- b'W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx'
- b'mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ'
- b'urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t'
- b'uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr'
- b'qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A')
+ b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9"
+ b".eyJoZWxsbyI6IndvcmxkIn0"
+ b".yNQ3nI9vEDs7lEh-Cp81McPuiQ4ZRv6FL4evTYYAh1X"
+ b"lRTTR3Cz8pPA9Stgso8Ra9xGB4X3rlra1c8Jz10nTUju"
+ b"O06OMm7oXdrnxp1KIiAJDerWHkQ7l3dlizIk1bmMA457"
+ b"W2fNzNfHViuED5ISM081dgf_a71qBwJ_yShMMrSOfxDx"
+ b"mX9c4DjRogRJG8SM5PvpLqI_Cm9iQPGMvmYK7gzcq2cJ"
+ b"urHRJDJHTqIdpLWXkY7zVikeen6FhuGyn060Dz9gYq9t"
+ b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr"
+ b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A"
+ )
decoded_payload = jwt.decode(example_jwt, example_pubkey)
assert decoded_payload == example_payload
def test_decode_with_expiration(self, jwt, payload):
- payload['exp'] = utc_timestamp() - 1
- secret = 'secret'
+ payload["exp"] = utc_timestamp() - 1
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
with pytest.raises(ExpiredSignatureError):
jwt.decode(jwt_message, secret)
def test_decode_with_notbefore(self, jwt, payload):
- payload['nbf'] = utc_timestamp() + 10
- secret = 'secret'
+ payload["nbf"] = utc_timestamp() + 10
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
with pytest.raises(ImmatureSignatureError):
jwt.decode(jwt_message, secret)
def test_decode_skip_expiration_verification(self, jwt, payload):
- payload['exp'] = time.time() - 1
- secret = 'secret'
+ payload["exp"] = time.time() - 1
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
- jwt.decode(jwt_message, secret, options={'verify_exp': False})
+ jwt.decode(jwt_message, secret, options={"verify_exp": False})
def test_decode_skip_notbefore_verification(self, jwt, payload):
- payload['nbf'] = time.time() + 10
- secret = 'secret'
+ payload["nbf"] = time.time() + 10
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
- jwt.decode(jwt_message, secret, options={'verify_nbf': False})
+ jwt.decode(jwt_message, secret, options={"verify_nbf": False})
def test_decode_with_expiration_with_leeway(self, jwt, payload):
- payload['exp'] = utc_timestamp() - 2
- secret = 'secret'
+ payload["exp"] = utc_timestamp() - 2
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
decoded_payload, signing, header, signature = jwt._load(jwt_message)
@@ -262,8 +289,8 @@ class TestJWT:
jwt.decode(jwt_message, secret, leeway=leeway)
def test_decode_with_notbefore_with_leeway(self, jwt, payload):
- payload['nbf'] = utc_timestamp() + 10
- secret = 'secret'
+ payload["nbf"] = utc_timestamp() + 10
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
# With 13 seconds leeway, should be ok
@@ -273,248 +300,211 @@ class TestJWT:
jwt.decode(jwt_message, secret, leeway=1)
def test_check_audience_when_valid(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:me'
- }
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', audience='urn:me')
+ payload = {"some": "payload", "aud": "urn:me"}
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", audience="urn:me")
def test_check_audience_list_when_valid(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:me'
- }
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', audience=['urn:you', 'urn:me'])
+ payload = {"some": "payload", "aud": "urn:me"}
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", audience=["urn:you", "urn:me"])
def test_check_audience_none_specified(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:me'
- }
- token = jwt.encode(payload, 'secret')
+ payload = {"some": "payload", "aud": "urn:me"}
+ token = jwt.encode(payload, "secret")
with pytest.raises(InvalidAudienceError):
- jwt.decode(token, 'secret')
+ jwt.decode(token, "secret")
def test_raise_exception_invalid_audience_list(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:me'
- }
- token = jwt.encode(payload, 'secret')
+ payload = {"some": "payload", "aud": "urn:me"}
+ token = jwt.encode(payload, "secret")
with pytest.raises(InvalidAudienceError):
- jwt.decode(token, 'secret', audience=['urn:you', 'urn:him'])
+ jwt.decode(token, "secret", audience=["urn:you", "urn:him"])
def test_check_audience_in_array_when_valid(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': ['urn:me', 'urn:someone-else']
- }
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', audience='urn:me')
+ payload = {"some": "payload", "aud": ["urn:me", "urn:someone-else"]}
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", audience="urn:me")
def test_raise_exception_invalid_audience(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:someone-else'
- }
+ payload = {"some": "payload", "aud": "urn:someone-else"}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(InvalidAudienceError):
- jwt.decode(token, 'secret', audience='urn-me')
+ jwt.decode(token, "secret", audience="urn-me")
def test_raise_exception_invalid_audience_in_array(self, jwt):
payload = {
- 'some': 'payload',
- 'aud': ['urn:someone', 'urn:someone-else']
+ "some": "payload",
+ "aud": ["urn:someone", "urn:someone-else"],
}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(InvalidAudienceError):
- jwt.decode(token, 'secret', audience='urn:me')
+ jwt.decode(token, "secret", audience="urn:me")
def test_raise_exception_token_without_issuer(self, jwt):
- issuer = 'urn:wrong'
+ issuer = "urn:wrong"
- payload = {
- 'some': 'payload'
- }
+ payload = {"some": "payload"}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(MissingRequiredClaimError) as exc:
- jwt.decode(token, 'secret', issuer=issuer)
+ jwt.decode(token, "secret", issuer=issuer)
- assert exc.value.claim == 'iss'
+ assert exc.value.claim == "iss"
def test_raise_exception_token_without_audience(self, jwt):
- payload = {
- 'some': 'payload',
- }
- token = jwt.encode(payload, 'secret')
+ payload = {"some": "payload"}
+ token = jwt.encode(payload, "secret")
with pytest.raises(MissingRequiredClaimError) as exc:
- jwt.decode(token, 'secret', audience='urn:me')
+ jwt.decode(token, "secret", audience="urn:me")
- assert exc.value.claim == 'aud'
+ assert exc.value.claim == "aud"
def test_check_issuer_when_valid(self, jwt):
- issuer = 'urn:foo'
- payload = {
- 'some': 'payload',
- 'iss': 'urn:foo'
- }
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', issuer=issuer)
+ issuer = "urn:foo"
+ payload = {"some": "payload", "iss": "urn:foo"}
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", issuer=issuer)
def test_raise_exception_invalid_issuer(self, jwt):
- issuer = 'urn:wrong'
+ issuer = "urn:wrong"
- payload = {
- 'some': 'payload',
- 'iss': 'urn:foo'
- }
+ payload = {"some": "payload", "iss": "urn:foo"}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(InvalidIssuerError):
- jwt.decode(token, 'secret', issuer=issuer)
+ jwt.decode(token, "secret", issuer=issuer)
def test_skip_check_audience(self, jwt):
- payload = {
- 'some': 'payload',
- 'aud': 'urn:me',
- }
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', options={'verify_aud': False})
+ payload = {"some": "payload", "aud": "urn:me"}
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", options={"verify_aud": False})
def test_skip_check_exp(self, jwt):
payload = {
- 'some': 'payload',
- 'exp': datetime.utcnow() - timedelta(days=1)
+ "some": "payload",
+ "exp": datetime.utcnow() - timedelta(days=1),
}
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', options={'verify_exp': False})
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", options={"verify_exp": False})
- def test_decode_should_raise_error_if_exp_required_but_not_present(self, jwt):
+ def test_decode_should_raise_error_if_exp_required_but_not_present(
+ self, jwt
+ ):
payload = {
- 'some': 'payload',
+ "some": "payload",
# exp not present
}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(MissingRequiredClaimError) as exc:
- jwt.decode(token, 'secret', options={'require_exp': True})
+ jwt.decode(token, "secret", options={"require_exp": True})
- assert exc.value.claim == 'exp'
+ assert exc.value.claim == "exp"
- def test_decode_should_raise_error_if_iat_required_but_not_present(self, jwt):
+ def test_decode_should_raise_error_if_iat_required_but_not_present(
+ self, jwt
+ ):
payload = {
- 'some': 'payload',
+ "some": "payload",
# iat not present
}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(MissingRequiredClaimError) as exc:
- jwt.decode(token, 'secret', options={'require_iat': True})
+ jwt.decode(token, "secret", options={"require_iat": True})
- assert exc.value.claim == 'iat'
+ assert exc.value.claim == "iat"
- def test_decode_should_raise_error_if_nbf_required_but_not_present(self, jwt):
+ def test_decode_should_raise_error_if_nbf_required_but_not_present(
+ self, jwt
+ ):
payload = {
- 'some': 'payload',
+ "some": "payload",
# nbf not present
}
- token = jwt.encode(payload, 'secret')
+ token = jwt.encode(payload, "secret")
with pytest.raises(MissingRequiredClaimError) as exc:
- jwt.decode(token, 'secret', options={'require_nbf': True})
+ jwt.decode(token, "secret", options={"require_nbf": True})
- assert exc.value.claim == 'nbf'
+ assert exc.value.claim == "nbf"
def test_skip_check_signature(self, jwt):
- token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
- ".eyJzb21lIjoicGF5bG9hZCJ9"
- ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA")
- jwt.decode(token, 'secret', options={'verify_signature': False})
+ token = (
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
+ ".eyJzb21lIjoicGF5bG9hZCJ9"
+ ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA"
+ )
+ jwt.decode(token, "secret", options={"verify_signature": False})
def test_skip_check_iat(self, jwt):
payload = {
- 'some': 'payload',
- 'iat': datetime.utcnow() + timedelta(days=1)
+ "some": "payload",
+ "iat": datetime.utcnow() + timedelta(days=1),
}
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', options={'verify_iat': False})
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", options={"verify_iat": False})
def test_skip_check_nbf(self, jwt):
payload = {
- 'some': 'payload',
- 'nbf': datetime.utcnow() + timedelta(days=1)
+ "some": "payload",
+ "nbf": datetime.utcnow() + timedelta(days=1),
}
- token = jwt.encode(payload, 'secret')
- jwt.decode(token, 'secret', options={'verify_nbf': False})
+ token = jwt.encode(payload, "secret")
+ jwt.decode(token, "secret", options={"verify_nbf": False})
def test_custom_json_encoder(self, jwt):
-
class CustomJSONEncoder(json.JSONEncoder):
-
def default(self, o):
if isinstance(o, Decimal):
- return 'it worked'
+ return "it worked"
return super(CustomJSONEncoder, self).default(o)
- data = {
- 'some_decimal': Decimal('2.2')
- }
+ data = {"some_decimal": Decimal("2.2")}
with pytest.raises(TypeError):
- jwt.encode(data, 'secret')
+ jwt.encode(data, "secret")
- token = jwt.encode(data, 'secret', json_encoder=CustomJSONEncoder)
- payload = jwt.decode(token, 'secret')
+ token = jwt.encode(data, "secret", json_encoder=CustomJSONEncoder)
+ payload = jwt.decode(token, "secret")
- assert payload == {'some_decimal': 'it worked'}
+ assert payload == {"some_decimal": "it worked"}
def test_decode_with_verify_expiration_kwarg(self, jwt, payload):
- payload['exp'] = utc_timestamp() - 1
- secret = 'secret'
+ payload["exp"] = utc_timestamp() - 1
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
pytest.deprecated_call(
- jwt.decode,
- jwt_message,
- secret,
- verify_expiration=False
+ jwt.decode, jwt_message, secret, verify_expiration=False
)
with pytest.raises(ExpiredSignatureError):
pytest.deprecated_call(
- jwt.decode,
- jwt_message,
- secret,
- verify_expiration=True
+ jwt.decode, jwt_message, secret, verify_expiration=True
)
def test_decode_with_optional_algorithms(self, jwt, payload):
- secret = 'secret'
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
- pytest.deprecated_call(
- jwt.decode,
- jwt_message,
- secret
- )
+ pytest.deprecated_call(jwt.decode, jwt_message, secret)
def test_decode_no_algorithms_verify_false(self, jwt, payload):
- secret = 'secret'
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
try:
pytest.deprecated_call(
- jwt.decode, jwt_message, secret, verify=False,
+ jwt.decode, jwt_message, secret, verify=False
)
except pytest.fail.Exception:
pass
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a89597d..6246c9a 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,25 +1,23 @@
-
import argparse
import json
import sys
+import pytest
+
import jwt
from jwt.__main__ import build_argparser, decode_payload, encode_payload, main
-import pytest
-
class TestCli:
-
def test_build_argparse(self):
- args = ['--key', '1234', 'encode', 'name=Vader']
+ args = ["--key", "1234", "encode", "name=Vader"]
parser = build_argparser()
parsed_args = parser.parse_args(args)
- assert parsed_args.key == '1234'
+ assert parsed_args.key == "1234"
def test_encode_payload_raises_value_error_key_is_required(self):
- encode_args = ['encode', 'name=Vader', 'job=Sith']
+ encode_args = ["encode", "name=Vader", "job=Sith"]
parser = build_argparser()
args = parser.parse_args(encode_args)
@@ -27,10 +25,10 @@ class TestCli:
with pytest.raises(ValueError) as excinfo:
encode_payload(args)
- assert 'Key is required when encoding' in str(excinfo.value)
+ assert "Key is required when encoding" in str(excinfo.value)
def test_decode_payload_raises_decoded_error(self):
- decode_args = ['--key', '1234', 'decode', 'wrong-token']
+ decode_args = ["--key", "1234", "decode", "wrong-token"]
parser = build_argparser()
args = parser.parse_args(decode_args)
@@ -38,121 +36,119 @@ class TestCli:
with pytest.raises(jwt.DecodeError) as excinfo:
decode_payload(args)
- assert 'There was an error decoding the token' in str(excinfo.value)
+ assert "There was an error decoding the token" in str(excinfo.value)
def test_decode_payload_raises_decoded_error_isatty(self, monkeypatch):
def patched_sys_stdin_read():
raise jwt.DecodeError()
- decode_args = ['--key', '1234', 'decode', 'wrong-token']
+ decode_args = ["--key", "1234", "decode", "wrong-token"]
parser = build_argparser()
args = parser.parse_args(decode_args)
- monkeypatch.setattr(sys.stdin, 'isatty', lambda: True)
- monkeypatch.setattr(sys.stdin, 'read', patched_sys_stdin_read)
+ monkeypatch.setattr(sys.stdin, "isatty", lambda: True)
+ monkeypatch.setattr(sys.stdin, "read", patched_sys_stdin_read)
with pytest.raises(jwt.DecodeError) as excinfo:
decode_payload(args)
- assert 'There was an error decoding the token' in str(excinfo.value)
+ assert "There was an error decoding the token" in str(excinfo.value)
def test_decode_payload_terminal_tty(self, monkeypatch):
- encode_args = [
- '--key=secret-key',
- 'encode',
- 'name=hello-world',
- ]
+ encode_args = ["--key=secret-key", "encode", "name=hello-world"]
parser = build_argparser()
parsed_encode_args = parser.parse_args(encode_args)
token = encode_payload(parsed_encode_args)
- decode_args = ['--key=secret-key', 'decode']
+ decode_args = ["--key=secret-key", "decode"]
parsed_decode_args = parser.parse_args(decode_args)
- monkeypatch.setattr(sys.stdin, 'isatty', lambda: True)
- monkeypatch.setattr(sys.stdin, 'readline', lambda: token)
+ monkeypatch.setattr(sys.stdin, "isatty", lambda: True)
+ monkeypatch.setattr(sys.stdin, "readline", lambda: token)
actual = json.loads(decode_payload(parsed_decode_args))
- assert actual['name'] == 'hello-world'
+ assert actual["name"] == "hello-world"
def test_decode_payload_raises_terminal_not_a_tty(self, monkeypatch):
- decode_args = ['--key', '1234', 'decode']
+ decode_args = ["--key", "1234", "decode"]
parser = build_argparser()
args = parser.parse_args(decode_args)
- monkeypatch.setattr(sys.stdin, 'isatty', lambda: False)
+ monkeypatch.setattr(sys.stdin, "isatty", lambda: False)
with pytest.raises(IOError) as excinfo:
decode_payload(args)
- assert 'Cannot read from stdin: terminal not a TTY' \
- in str(excinfo.value)
-
- @pytest.mark.parametrize('key,name,job,exp,verify', [
- ('1234', 'Vader', 'Sith', None, None),
- ('4567', 'Anakin', 'Jedi', '+1', None),
- ('4321', 'Padme', 'Queen', '4070926800', 'true'),
- ])
+ assert "Cannot read from stdin: terminal not a TTY" in str(
+ excinfo.value
+ )
+
+ @pytest.mark.parametrize(
+ "key,name,job,exp,verify",
+ [
+ ("1234", "Vader", "Sith", None, None),
+ ("4567", "Anakin", "Jedi", "+1", None),
+ ("4321", "Padme", "Queen", "4070926800", "true"),
+ ],
+ )
def test_encode_decode(self, key, name, job, exp, verify):
encode_args = [
- '--key={0}'.format(key),
- 'encode',
- 'name={0}'.format(name),
- 'job={0}'.format(job),
+ "--key={0}".format(key),
+ "encode",
+ "name={0}".format(name),
+ "job={0}".format(job),
]
if exp:
- encode_args.append('exp={0}'.format(exp))
+ encode_args.append("exp={0}".format(exp))
if verify:
- encode_args.append('verify={0}'.format(verify))
+ encode_args.append("verify={0}".format(verify))
parser = build_argparser()
parsed_encode_args = parser.parse_args(encode_args)
token = encode_payload(parsed_encode_args)
assert token is not None
- assert token != ''
+ assert token != ""
- decode_args = [
- '--key={0}'.format(key),
- 'decode',
- token
- ]
+ decode_args = ["--key={0}".format(key), "decode", token]
parser = build_argparser()
parsed_decode_args = parser.parse_args(decode_args)
actual = json.loads(decode_payload(parsed_decode_args))
- expected = {
- 'job': job,
- 'name': name,
- }
- assert actual['name'] == expected['name']
- assert actual['job'] == expected['job']
-
- @pytest.mark.parametrize('key,name,job,exp,verify', [
- ('1234', 'Vader', 'Sith', None, None),
- ('4567', 'Anakin', 'Jedi', '+1', None),
- ('4321', 'Padme', 'Queen', '4070926800', 'true'),
- ])
+ expected = {"job": job, "name": name}
+ assert actual["name"] == expected["name"]
+ assert actual["job"] == expected["job"]
+
+ @pytest.mark.parametrize(
+ "key,name,job,exp,verify",
+ [
+ ("1234", "Vader", "Sith", None, None),
+ ("4567", "Anakin", "Jedi", "+1", None),
+ ("4321", "Padme", "Queen", "4070926800", "true"),
+ ],
+ )
def test_main(self, monkeypatch, key, name, job, exp, verify):
args = [
- 'test_cli.py',
- '--key={0}'.format(key),
- 'encode',
- 'name={0}'.format(name),
- 'job={0}'.format(job),
+ "test_cli.py",
+ "--key={0}".format(key),
+ "encode",
+ "name={0}".format(name),
+ "job={0}".format(job),
]
if exp:
- args.append('exp={0}'.format(exp))
+ args.append("exp={0}".format(exp))
if verify:
- args.append('verify={0}'.format(verify))
- monkeypatch.setattr(sys, 'argv', args)
+ args.append("verify={0}".format(verify))
+ monkeypatch.setattr(sys, "argv", args)
main()
def test_main_throw_exception(self, monkeypatch, capsys):
def patched_argparser_parse_args(self, args):
- raise Exception('NOOOOOOOOOOO!')
+ raise Exception("NOOOOOOOOOOO!")
- monkeypatch.setattr(argparse.ArgumentParser, 'parse_args', patched_argparser_parse_args)
+ monkeypatch.setattr(
+ argparse.ArgumentParser, "parse_args", patched_argparser_parse_args
+ )
main()
out, _ = capsys.readouterr()
- assert 'NOOOOOOOOOOO!' in out
+ assert "NOOOOOOOOOOO!" in out
diff --git a/tests/test_compat.py b/tests/test_compat.py
index 10beb94..7a7516c 100644
--- a/tests/test_compat.py
+++ b/tests/test_compat.py
@@ -4,16 +4,14 @@ from jwt.utils import force_bytes
class TestCompat:
def test_constant_time_compare_returns_true_if_same(self):
- assert constant_time_compare(
- force_bytes('abc'), force_bytes('abc')
- )
+ assert constant_time_compare(force_bytes("abc"), force_bytes("abc"))
def test_constant_time_compare_returns_false_if_diff_lengths(self):
assert not constant_time_compare(
- force_bytes('abc'), force_bytes('abcd')
+ force_bytes("abc"), force_bytes("abcd")
)
def test_constant_time_compare_returns_false_if_totally_different(self):
assert not constant_time_compare(
- force_bytes('abcd'), force_bytes('efgh')
+ force_bytes("abcd"), force_bytes("efgh")
)
diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py
index 9e7f91e..90030a7 100644
--- a/tests/test_exceptions.py
+++ b/tests/test_exceptions.py
@@ -2,6 +2,6 @@ from jwt.exceptions import MissingRequiredClaimError
def test_missing_required_claim_error_has_proper_str():
- exc = MissingRequiredClaimError('abc')
+ exc = MissingRequiredClaimError("abc")
assert str(exc) == 'Token is missing the "abc" claim'
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 95ea5de..db96f46 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -10,13 +10,9 @@ def test_encode_decode():
tests. This is primarily a sanity check to make sure we don't break the
public global functions.
"""
- payload = {
- 'iss': 'jeff',
- 'exp': utc_timestamp() + 15,
- 'claim': 'insanity'
- }
+ payload = {"iss": "jeff", "exp": utc_timestamp() + 15, "claim": "insanity"}
- secret = 'secret'
+ secret = "secret"
jwt_message = jwt.encode(payload, secret)
decoded_payload = jwt.decode(jwt_message, secret)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 1408af2..cb73c52 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,30 +1,39 @@
+import pytest
+
from jwt.utils import (
- force_bytes, force_unicode, from_base64url_uint, to_base64url_uint
+ force_bytes,
+ force_unicode,
+ from_base64url_uint,
+ to_base64url_uint,
)
-import pytest
-
-@pytest.mark.parametrize("inputval,expected", [
- (0, b'AA'),
- (1, b'AQ'),
- (255, b'_w'),
- (65537, b'AQAB'),
- (123456789, b'B1vNFQ'),
- pytest.param(-1, '', marks=pytest.mark.xfail(raises=ValueError))
-])
+@pytest.mark.parametrize(
+ "inputval,expected",
+ [
+ (0, b"AA"),
+ (1, b"AQ"),
+ (255, b"_w"),
+ (65537, b"AQAB"),
+ (123456789, b"B1vNFQ"),
+ pytest.param(-1, "", marks=pytest.mark.xfail(raises=ValueError)),
+ ],
+)
def test_to_base64url_uint(inputval, expected):
actual = to_base64url_uint(inputval)
assert actual == expected
-@pytest.mark.parametrize("inputval,expected", [
- (b'AA', 0),
- (b'AQ', 1),
- (b'_w', 255),
- (b'AQAB', 65537),
- (b'B1vNFQ', 123456789, ),
-])
+@pytest.mark.parametrize(
+ "inputval,expected",
+ [
+ (b"AA", 0),
+ (b"AQ", 1),
+ (b"_w", 255),
+ (b"AQAB", 65537),
+ (b"B1vNFQ", 123456789),
+ ],
+)
def test_from_base64url_uint(inputval, expected):
actual = from_base64url_uint(inputval)
assert actual == expected
diff --git a/tests/utils.py b/tests/utils.py
index be189f2..ad39f75 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -9,25 +9,27 @@ def utc_timestamp():
def key_path(key_name):
- return os.path.join(os.path.dirname(os.path.realpath(__file__)),
- 'keys', key_name)
+ return os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), "keys", key_name
+ )
# Borrowed from `cryptography`
if hasattr(int, "from_bytes"):
int_from_bytes = int.from_bytes
else:
+
def int_from_bytes(data, byteorder, signed=False):
- assert byteorder == 'big'
+ assert byteorder == "big"
assert not signed
if len(data) % 4 != 0:
- data = (b'\x00' * (4 - (len(data) % 4))) + data
+ data = (b"\x00" * (4 - (len(data) % 4))) + data
result = 0
while len(data) > 0:
- digit, = struct.unpack('>I', data[:4])
+ digit, = struct.unpack(">I", data[:4])
result = (result << 32) + digit
data = data[4:]
diff --git a/tox.ini b/tox.ini
index 230b973..0b62eab 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,21 +1,22 @@
[tox]
-envlist = py{27,34,35,36,37}-crypto, py{27,35,36,37}-contrib_crypto, py{27,35,36,37}-nocrypto, flake8, mypy
+envlist = lint, typing, py{27,34,35,36,37}-crypto, py{27,35,36,37}-contrib_crypto, py{27,35,36,37}-nocrypto
+
[testenv]
-commands =
- pytest
+extras = tests
+commands = pytest
deps =
crypto: cryptography
contrib_crypto: pycrypto
contrib_crypto: ecdsa
-extras = test
-[testenv:flake8]
-commands =
- flake8
-extras = flake8
-[testenv:mypy]
-commands =
- mypy --ignore-missing-imports jwt
-extras = mypy
+[testenv:typing]
+extras = dev
+commands = mypy --ignore-missing-imports jwt
+
+
+[testenv:lint]
+extras = dev
+passenv = HOMEPATH # needed on Windows
+commands = pre-commit run --all-files