summaryrefslogtreecommitdiff
path: root/redis/ocsp.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/ocsp.py')
-rw-r--r--redis/ocsp.py174
1 files changed, 159 insertions, 15 deletions
diff --git a/redis/ocsp.py b/redis/ocsp.py
index 49aaddf..666c7dc 100644
--- a/redis/ocsp.py
+++ b/redis/ocsp.py
@@ -1,18 +1,170 @@
import base64
+import datetime
import ssl
from urllib.parse import urljoin, urlparse
import cryptography.hazmat.primitives.hashes
import requests
from cryptography import hazmat, x509
+from cryptography.exceptions import InvalidSignature
from cryptography.hazmat import backends
+from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
+from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey
+from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
+from cryptography.hazmat.primitives.hashes import SHA1, Hash
+from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import ocsp
from redis.exceptions import AuthorizationError, ConnectionError
+def _verify_response(issuer_cert, ocsp_response):
+ pubkey = issuer_cert.public_key()
+ try:
+ if isinstance(pubkey, RSAPublicKey):
+ pubkey.verify(
+ ocsp_response.signature,
+ ocsp_response.tbs_response_bytes,
+ PKCS1v15(),
+ ocsp_response.signature_hash_algorithm,
+ )
+ elif isinstance(pubkey, DSAPublicKey):
+ pubkey.verify(
+ ocsp_response.signature,
+ ocsp_response.tbs_response_bytes,
+ ocsp_response.signature_hash_algorithm,
+ )
+ elif isinstance(pubkey, EllipticCurvePublicKey):
+ pubkey.verify(
+ ocsp_response.signature,
+ ocsp_response.tbs_response_bytes,
+ ECDSA(ocsp_response.signature_hash_algorithm),
+ )
+ else:
+ pubkey.verify(ocsp_response.signature, ocsp_response.tbs_response_bytes)
+ except InvalidSignature:
+ raise ConnectionError("failed to valid ocsp response")
+
+
+def _check_certificate(issuer_cert, ocsp_bytes, validate=True):
+ """A wrapper the return the validity of a known ocsp certificate"""
+
+ ocsp_response = ocsp.load_der_ocsp_response(ocsp_bytes)
+
+ if ocsp_response.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED:
+ raise AuthorizationError("you are not authorized to view this ocsp certificate")
+ if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
+ if ocsp_response.certificate_status != ocsp.OCSPCertStatus.GOOD:
+ return False
+ else:
+ return False
+
+ if ocsp_response.this_update >= datetime.datetime.now():
+ raise ConnectionError("ocsp certificate was issued in the future")
+
+ if (
+ ocsp_response.next_update
+ and ocsp_response.next_update < datetime.datetime.now()
+ ):
+ raise ConnectionError("ocsp certificate has invalid update - in the past")
+
+ responder_name = ocsp_response.responder_name
+ issuer_hash = ocsp_response.issuer_key_hash
+ responder_hash = ocsp_response.responder_key_hash
+
+ cert_to_validate = issuer_cert
+ if (
+ responder_name is not None
+ and responder_name == issuer_cert.subject
+ or responder_hash == issuer_hash
+ ):
+ cert_to_validate = issuer_cert
+ else:
+ certs = ocsp_response.certificates
+ responder_certs = _get_certificates(
+ certs, issuer_cert, responder_name, responder_hash
+ )
+
+ try:
+ responder_cert = responder_certs[0]
+ except IndexError:
+ raise ConnectionError("no certificates found for the responder")
+
+ ext = responder_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
+ if ext is None or x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING not in ext.value:
+ raise ConnectionError("delegate not autorized for ocsp signing")
+ cert_to_validate = responder_cert
+
+ if validate:
+ _verify_response(cert_to_validate, ocsp_response)
+ return True
+
+
+def _get_certificates(certs, issuer_cert, responder_name, responder_hash):
+ if responder_name is None:
+ certificates = [
+ c
+ for c in certs
+ if _get_pubkey_hash(c) == responder_hash and c.issuer == issuer_cert.subject
+ ]
+ else:
+ certificates = [
+ c
+ for c in certs
+ if c.subject == responder_name and c.issuer == issuer_cert.subject
+ ]
+
+ return certificates
+
+
+def _get_pubkey_hash(certificate):
+ pubkey = certificate.public_key()
+
+ # https://stackoverflow.com/a/46309453/600498
+ if isinstance(pubkey, RSAPublicKey):
+ h = pubkey.public_bytes(Encoding.DER, PublicFormat.PKCS1)
+ elif isinstance(pubkey, EllipticCurvePublicKey):
+ h = pubkey.public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
+ else:
+ h = pubkey.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
+
+ sha1 = Hash(SHA1(), backend=backends.default_backend())
+ sha1.update(h)
+ return sha1.finalize()
+
+
+def ocsp_staple_verifier(con, ocsp_bytes, expected=None):
+ """An implemention of a function for set_ocsp_client_callback in PyOpenSSL.
+
+ This function validates that the provide ocsp_bytes response is valid,
+ and matches the expected, stapled responses.
+ """
+ if ocsp_bytes in [b"", None]:
+ raise ConnectionError("no ocsp response present")
+
+ issuer_cert = None
+ peer_cert = con.get_peer_certificate().to_cryptography()
+ for c in con.get_peer_cert_chain():
+ cert = c.to_cryptography()
+ if cert.subject == peer_cert.issuer:
+ issuer_cert = cert
+ break
+
+ if issuer_cert is None:
+ raise ConnectionError("no matching issuer cert found in certificate chain")
+
+ if expected is not None:
+ e = x509.load_pem_x509_certificate(expected)
+ if peer_cert != e:
+ raise ConnectionError("received and expected certificates do not match")
+
+ return _check_certificate(issuer_cert, ocsp_bytes)
+
+
class OCSPVerifier:
- """A class to verify ssl sockets for RFC6960/RFC6961.
+ """A class to verify ssl sockets for RFC6960/RFC6961. This can be used
+ when using direct validation of OCSP responses and certificate revocations.
@see https://datatracker.ietf.org/doc/html/rfc6960
@see https://datatracker.ietf.org/doc/html/rfc6961
@@ -67,7 +219,7 @@ class OCSPVerifier:
try:
issuer = issuers[0].access_location.value
except IndexError:
- raise ConnectionError("no issuers in certificate")
+ issuer = None
# now, the series of ocsp server entries
ocsps = [
@@ -128,19 +280,7 @@ class OCSPVerifier:
r = requests.get(ocsp_url, headers=header)
if not r.ok:
raise ConnectionError("failed to fetch ocsp certificate")
-
- ocsp_response = ocsp.load_der_ocsp_response(r.content)
- if ocsp_response.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED:
- raise AuthorizationError(
- "you are not authorized to view this ocsp certificate"
- )
- if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
- if ocsp_response.certificate_status == ocsp.OCSPCertStatus.REVOKED:
- return False
- else:
- return True
- else:
- return False
+ return _check_certificate(issuer_cert, r.content, True)
def is_valid(self):
"""Returns the validity of the certificate wrapping our socket.
@@ -153,7 +293,11 @@ class OCSPVerifier:
# validate the certificate
try:
cert, issuer_url, ocsp_server = self.components_from_socket()
+ if issuer_url is None:
+ raise ConnectionError("no issuers found in certificate chain")
return self.check_certificate(ocsp_server, cert, issuer_url)
except AuthorizationError:
cert, issuer_url, ocsp_server = self.components_from_direct_connection()
+ if issuer_url is None:
+ raise ConnectionError("no issuers found in certificate chain")
return self.check_certificate(ocsp_server, cert, issuer_url)