summaryrefslogtreecommitdiff
path: root/jwt
diff options
context:
space:
mode:
authorFlorent Viard <fviard@gmail.com>2022-03-26 06:15:16 +0100
committerGitHub <noreply@github.com>2022-03-26 11:15:16 +0600
commit1e7915613f4962d793b97a0db7bbe1661a59fa5a (patch)
treee6329bedf66adfc40b34a929b5eb9fbb7fa48fc9 /jwt
parent6a624f42112661d1e4ea43ce7a78a9d6c693644b (diff)
downloadpyjwt-1e7915613f4962d793b97a0db7bbe1661a59fa5a.tar.gz
Add detached payload support for JWS encoding and decoding (#723)
Specifications allow to have JWS with unencoded detached payloads. This changeset adds detached payload support for encoding and decoding functions. For encoding, detached payload can be enabled by setting the "is_payload_detached" arg or having the "b64=False" inside the headers. For decoding, the detached payload content (bytes) has to be provided with the "detached_payload" arg and "b64=False" has to be found inside the decoded headers. Functionnally, when this feature is used, the signature will be computed over the raw data bytes of the payload, without being base64 encoded and obviously, the payload will not be provided inside the generated JWS. So, the generated JWS will look like: base64url(header)..base64url(signature) Relevant specifications: RFC 7515: "JSON Web Signature (JWS)". (Annexe F) RFC 7797: "JSON Web Signature (JWS) Unencoded Payload Option".
Diffstat (limited to 'jwt')
-rw-r--r--jwt/api_jws.py49
1 files changed, 40 insertions, 9 deletions
diff --git a/jwt/api_jws.py b/jwt/api_jws.py
index 9e8e178..f32de8f 100644
--- a/jwt/api_jws.py
+++ b/jwt/api_jws.py
@@ -80,34 +80,54 @@ class PyJWS:
algorithm: Optional[str] = "HS256",
headers: Optional[Dict] = None,
json_encoder: Optional[Type[json.JSONEncoder]] = None,
+ is_payload_detached: bool = False,
) -> str:
segments = []
if algorithm is None:
algorithm = "none"
- # Prefer headers["alg"] if present to algorithm parameter.
- if headers and "alg" in headers and headers["alg"]:
- algorithm = headers["alg"]
+ # Prefer headers values if present to function parameters.
+ if headers:
+ headers_alg = headers.get("alg")
+ if headers_alg:
+ algorithm = headers["alg"]
+
+ headers_b64 = headers.get("b64")
+ if headers_b64 is False:
+ is_payload_detached = True
# Header
- header = {"typ": self.header_typ, "alg": algorithm}
+ header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any]
if headers:
self._validate_headers(headers)
header.update(headers)
- if not header["typ"]:
- del header["typ"]
+
+ if not header["typ"]:
+ del header["typ"]
+
+ if is_payload_detached:
+ header["b64"] = False
+ elif "b64" in header:
+ # True is the standard value for b64, so no need for it
+ del header["b64"]
json_header = json.dumps(
header, separators=(",", ":"), cls=json_encoder
).encode()
segments.append(base64url_encode(json_header))
- segments.append(base64url_encode(payload))
+
+ if is_payload_detached:
+ msg_payload = payload
+ else:
+ msg_payload = base64url_encode(payload)
+ segments.append(msg_payload)
# Segments
signing_input = b".".join(segments)
+
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
@@ -119,11 +139,13 @@ class PyJWS:
"Algorithm '%s' could not be found. Do you have cryptography "
"installed?" % algorithm
) from e
- else:
- raise NotImplementedError("Algorithm not supported") from e
+ raise NotImplementedError("Algorithm not supported") from e
segments.append(base64url_encode(signature))
+ # Don't put the payload content inside the encoded token when detached
+ if is_payload_detached:
+ segments[1] = b""
encoded_string = b".".join(segments)
return encoded_string.decode("utf-8")
@@ -134,6 +156,7 @@ class PyJWS:
key: str = "",
algorithms: Optional[List[str]] = None,
options: Optional[Dict] = None,
+ detached_payload: Optional[bytes] = None,
**kwargs,
) -> Dict[str, Any]:
if options is None:
@@ -148,6 +171,14 @@ class PyJWS:
payload, signing_input, header, signature = self._load(jwt)
+ if header.get("b64", True) is False:
+ if detached_payload is None:
+ raise DecodeError(
+ 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.'
+ )
+ payload = detached_payload
+ signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])
+
if verify_signature:
self._verify_signature(signing_input, header, signature, key, algorithms)