summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorent Viard <fviard@gmail.com>2022-03-26 06:15:16 +0100
committerGitHub <noreply@github.com>2022-03-26 11:15:16 +0600
commit1e7915613f4962d793b97a0db7bbe1661a59fa5a (patch)
treee6329bedf66adfc40b34a929b5eb9fbb7fa48fc9
parent6a624f42112661d1e4ea43ce7a78a9d6c693644b (diff)
downloadpyjwt-1e7915613f4962d793b97a0db7bbe1661a59fa5a.tar.gz
Add detached payload support for JWS encoding and decoding (#723)
Specifications allow to have JWS with unencoded detached payloads. This changeset adds detached payload support for encoding and decoding functions. For encoding, detached payload can be enabled by setting the "is_payload_detached" arg or having the "b64=False" inside the headers. For decoding, the detached payload content (bytes) has to be provided with the "detached_payload" arg and "b64=False" has to be found inside the decoded headers. Functionnally, when this feature is used, the signature will be computed over the raw data bytes of the payload, without being base64 encoded and obviously, the payload will not be provided inside the generated JWS. So, the generated JWS will look like: base64url(header)..base64url(signature) Relevant specifications: RFC 7515: "JSON Web Signature (JWS)". (Annexe F) RFC 7797: "JSON Web Signature (JWS) Unencoded Payload Option".
-rw-r--r--jwt/api_jws.py49
-rw-r--r--tests/test_api_jws.py51
2 files changed, 91 insertions, 9 deletions
diff --git a/jwt/api_jws.py b/jwt/api_jws.py
index 9e8e178..f32de8f 100644
--- a/jwt/api_jws.py
+++ b/jwt/api_jws.py
@@ -80,34 +80,54 @@ class PyJWS:
algorithm: Optional[str] = "HS256",
headers: Optional[Dict] = None,
json_encoder: Optional[Type[json.JSONEncoder]] = None,
+ is_payload_detached: bool = False,
) -> str:
segments = []
if algorithm is None:
algorithm = "none"
- # Prefer headers["alg"] if present to algorithm parameter.
- if headers and "alg" in headers and headers["alg"]:
- algorithm = headers["alg"]
+ # Prefer headers values if present to function parameters.
+ if headers:
+ headers_alg = headers.get("alg")
+ if headers_alg:
+ algorithm = headers["alg"]
+
+ headers_b64 = headers.get("b64")
+ if headers_b64 is False:
+ is_payload_detached = True
# Header
- header = {"typ": self.header_typ, "alg": algorithm}
+ header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any]
if headers:
self._validate_headers(headers)
header.update(headers)
- if not header["typ"]:
- del header["typ"]
+
+ if not header["typ"]:
+ del header["typ"]
+
+ if is_payload_detached:
+ header["b64"] = False
+ elif "b64" in header:
+ # True is the standard value for b64, so no need for it
+ del header["b64"]
json_header = json.dumps(
header, separators=(",", ":"), cls=json_encoder
).encode()
segments.append(base64url_encode(json_header))
- segments.append(base64url_encode(payload))
+
+ if is_payload_detached:
+ msg_payload = payload
+ else:
+ msg_payload = base64url_encode(payload)
+ segments.append(msg_payload)
# Segments
signing_input = b".".join(segments)
+
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
@@ -119,11 +139,13 @@ class PyJWS:
"Algorithm '%s' could not be found. Do you have cryptography "
"installed?" % algorithm
) from e
- else:
- raise NotImplementedError("Algorithm not supported") from e
+ raise NotImplementedError("Algorithm not supported") from e
segments.append(base64url_encode(signature))
+ # Don't put the payload content inside the encoded token when detached
+ if is_payload_detached:
+ segments[1] = b""
encoded_string = b".".join(segments)
return encoded_string.decode("utf-8")
@@ -134,6 +156,7 @@ class PyJWS:
key: str = "",
algorithms: Optional[List[str]] = None,
options: Optional[Dict] = None,
+ detached_payload: Optional[bytes] = None,
**kwargs,
) -> Dict[str, Any]:
if options is None:
@@ -148,6 +171,14 @@ class PyJWS:
payload, signing_input, header, signature = self._load(jwt)
+ if header.get("b64", True) is False:
+ if detached_payload is None:
+ raise DecodeError(
+ 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
+ )
+ payload = detached_payload
+ signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
+
if verify_signature:
self._verify_signature(signing_input, header, signature, key, algorithms)
diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py
index 07d1511..0a0e295 100644
--- a/tests/test_api_jws.py
+++ b/tests/test_api_jws.py
@@ -719,3 +719,54 @@ class TestJWS:
jws.encode(payload, "secret", headers={"kid": None})
assert "Key ID header parameter must be a string" == str(exc.value)
+
+ def test_encode_decode_with_detached_content(self, jws, payload):
+ secret = "secret"
+ jws_message = jws.encode(
+ payload, secret, algorithm="HS256", is_payload_detached=True
+ )
+
+ jws.decode(jws_message, secret, algorithms=["HS256"], detached_payload=payload)
+
+ def test_encode_detached_content_with_b64_header(self, jws, payload):
+ secret = "secret"
+
+ # Check that detached content is automatically detected when b64 is false
+ headers = {"b64": False}
+ token = jws.encode(payload, secret, "HS256", headers)
+
+ msg_header, msg_payload, _ = token.split(".")
+ msg_header = base64url_decode(msg_header.encode())
+ msg_header_obj = json.loads(msg_header)
+
+ assert "b64" in msg_header_obj
+ assert msg_header_obj["b64"] is False
+ # Check that the payload is not inside the token
+ assert not msg_payload
+
+ # Check that content is not detached and b64 header removed when b64 is true
+ headers = {"b64": True}
+ token = jws.encode(payload, secret, "HS256", headers)
+
+ msg_header, msg_payload, _ = token.split(".")
+ msg_header = base64url_decode(msg_header.encode())
+ msg_header_obj = json.loads(msg_header)
+
+ assert "b64" not in msg_header_obj
+ assert msg_payload
+
+ def test_decode_detached_content_without_proper_argument(self, jws):
+ example_jws = (
+ "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2V9"
+ "."
+ ".65yNkX_ZH4A_6pHaTL_eI84OXOHtfl4K0k5UnlXZ8f4"
+ )
+ example_secret = "secret"
+
+ with pytest.raises(DecodeError) as exc:
+ jws.decode(example_jws, example_secret, algorithms=["HS256"])
+
+ assert (
+ 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
+ in str(exc.value)
+ )