diff options
author | Florent Viard <fviard@gmail.com> | 2022-03-26 06:15:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-26 11:15:16 +0600 |
commit | 1e7915613f4962d793b97a0db7bbe1661a59fa5a (patch) | |
tree | e6329bedf66adfc40b34a929b5eb9fbb7fa48fc9 | |
parent | 6a624f42112661d1e4ea43ce7a78a9d6c693644b (diff) | |
download | pyjwt-1e7915613f4962d793b97a0db7bbe1661a59fa5a.tar.gz |
Add detached payload support for JWS encoding and decoding (#723)
Specifications allow to have JWS with unencoded detached payloads.
This changeset adds detached payload support for encoding and decoding
functions.
For encoding, detached payload can be enabled by setting the
"is_payload_detached" arg or having the "b64=False" inside the headers.
For decoding, the detached payload content (bytes) has to be provided
with the "detached_payload" arg and "b64=False" has to be found inside
the decoded headers.
Functionnally, when this feature is used, the signature will be computed
over the raw data bytes of the payload, without being base64 encoded and
obviously, the payload will not be provided inside the generated JWS.
So, the generated JWS will look like:
base64url(header)..base64url(signature)
Relevant specifications:
RFC 7515: "JSON Web Signature (JWS)". (Annexe F)
RFC 7797: "JSON Web Signature (JWS) Unencoded Payload Option".
-rw-r--r-- | jwt/api_jws.py | 49 | ||||
-rw-r--r-- | tests/test_api_jws.py | 51 |
2 files changed, 91 insertions, 9 deletions
diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 9e8e178..f32de8f 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -80,34 +80,54 @@ class PyJWS: algorithm: Optional[str] = "HS256", headers: Optional[Dict] = None, json_encoder: Optional[Type[json.JSONEncoder]] = None, + is_payload_detached: bool = False, ) -> str: segments = [] if algorithm is None: algorithm = "none" - # Prefer headers["alg"] if present to algorithm parameter. - if headers and "alg" in headers and headers["alg"]: - algorithm = headers["alg"] + # Prefer headers values if present to function parameters. + if headers: + headers_alg = headers.get("alg") + if headers_alg: + algorithm = headers["alg"] + + headers_b64 = headers.get("b64") + if headers_b64 is False: + is_payload_detached = True # Header - header = {"typ": self.header_typ, "alg": algorithm} + header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any] if headers: self._validate_headers(headers) header.update(headers) - if not header["typ"]: - del header["typ"] + + if not header["typ"]: + del header["typ"] + + if is_payload_detached: + header["b64"] = False + elif "b64" in header: + # True is the standard value for b64, so no need for it + del header["b64"] json_header = json.dumps( header, separators=(",", ":"), cls=json_encoder ).encode() segments.append(base64url_encode(json_header)) - segments.append(base64url_encode(payload)) + + if is_payload_detached: + msg_payload = payload + else: + msg_payload = base64url_encode(payload) + segments.append(msg_payload) # Segments signing_input = b".".join(segments) + try: alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) @@ -119,11 +139,13 @@ class PyJWS: "Algorithm '%s' could not be found. Do you have cryptography " "installed?" % algorithm ) from e - else: - raise NotImplementedError("Algorithm not supported") from e + raise NotImplementedError("Algorithm not supported") from e segments.append(base64url_encode(signature)) + # Don't put the payload content inside the encoded token when detached + if is_payload_detached: + segments[1] = b"" encoded_string = b".".join(segments) return encoded_string.decode("utf-8") @@ -134,6 +156,7 @@ class PyJWS: key: str = "", algorithms: Optional[List[str]] = None, options: Optional[Dict] = None, + detached_payload: Optional[bytes] = None, **kwargs, ) -> Dict[str, Any]: if options is None: @@ -148,6 +171,14 @@ class PyJWS: payload, signing_input, header, signature = self._load(jwt) + if header.get("b64", True) is False: + if detached_payload is None: + raise DecodeError( + 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.' + ) + payload = detached_payload + signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload]) + if verify_signature: self._verify_signature(signing_input, header, signature, key, algorithms) diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py index 07d1511..0a0e295 100644 --- a/tests/test_api_jws.py +++ b/tests/test_api_jws.py @@ -719,3 +719,54 @@ class TestJWS: jws.encode(payload, "secret", headers={"kid": None}) assert "Key ID header parameter must be a string" == str(exc.value) + + def test_encode_decode_with_detached_content(self, jws, payload): + secret = "secret" + jws_message = jws.encode( + payload, secret, algorithm="HS256", is_payload_detached=True + ) + + jws.decode(jws_message, secret, algorithms=["HS256"], detached_payload=payload) + + def test_encode_detached_content_with_b64_header(self, jws, payload): + secret = "secret" + + # Check that detached content is automatically detected when b64 is false + headers = {"b64": False} + token = jws.encode(payload, secret, "HS256", headers) + + msg_header, msg_payload, _ = token.split(".") + msg_header = base64url_decode(msg_header.encode()) + msg_header_obj = json.loads(msg_header) + + assert "b64" in msg_header_obj + assert msg_header_obj["b64"] is False + # Check that the payload is not inside the token + assert not msg_payload + + # Check that content is not detached and b64 header removed when b64 is true + headers = {"b64": True} + token = jws.encode(payload, secret, "HS256", headers) + + msg_header, msg_payload, _ = token.split(".") + msg_header = base64url_decode(msg_header.encode()) + msg_header_obj = json.loads(msg_header) + + assert "b64" not in msg_header_obj + assert msg_payload + + def test_decode_detached_content_without_proper_argument(self, jws): + example_jws = ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2V9" + "." + ".65yNkX_ZH4A_6pHaTL_eI84OXOHtfl4K0k5UnlXZ8f4" + ) + example_secret = "secret" + + with pytest.raises(DecodeError) as exc: + jws.decode(example_jws, example_secret, algorithms=["HS256"]) + + assert ( + 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.' + in str(exc.value) + ) |