diff options
Diffstat (limited to 'redis/ocsp.py')
-rw-r--r-- | redis/ocsp.py | 174 |
1 files changed, 159 insertions, 15 deletions
diff --git a/redis/ocsp.py b/redis/ocsp.py index 49aaddf..666c7dc 100644 --- a/redis/ocsp.py +++ b/redis/ocsp.py @@ -1,18 +1,170 @@ import base64 +import datetime import ssl from urllib.parse import urljoin, urlparse import cryptography.hazmat.primitives.hashes import requests from cryptography import hazmat, x509 +from cryptography.exceptions import InvalidSignature from cryptography.hazmat import backends +from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey +from cryptography.hazmat.primitives.hashes import SHA1, Hash +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import ocsp from redis.exceptions import AuthorizationError, ConnectionError +def _verify_response(issuer_cert, ocsp_response): + pubkey = issuer_cert.public_key() + try: + if isinstance(pubkey, RSAPublicKey): + pubkey.verify( + ocsp_response.signature, + ocsp_response.tbs_response_bytes, + PKCS1v15(), + ocsp_response.signature_hash_algorithm, + ) + elif isinstance(pubkey, DSAPublicKey): + pubkey.verify( + ocsp_response.signature, + ocsp_response.tbs_response_bytes, + ocsp_response.signature_hash_algorithm, + ) + elif isinstance(pubkey, EllipticCurvePublicKey): + pubkey.verify( + ocsp_response.signature, + ocsp_response.tbs_response_bytes, + ECDSA(ocsp_response.signature_hash_algorithm), + ) + else: + pubkey.verify(ocsp_response.signature, ocsp_response.tbs_response_bytes) + except InvalidSignature: + raise ConnectionError("failed to valid ocsp response") + + +def _check_certificate(issuer_cert, ocsp_bytes, validate=True): + """A wrapper the return the validity of a known ocsp certificate""" + + ocsp_response = ocsp.load_der_ocsp_response(ocsp_bytes) + + if ocsp_response.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED: + raise AuthorizationError("you are not authorized to view this ocsp certificate") + if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL: + if ocsp_response.certificate_status != ocsp.OCSPCertStatus.GOOD: + return False + else: + return False + + if ocsp_response.this_update >= datetime.datetime.now(): + raise ConnectionError("ocsp certificate was issued in the future") + + if ( + ocsp_response.next_update + and ocsp_response.next_update < datetime.datetime.now() + ): + raise ConnectionError("ocsp certificate has invalid update - in the past") + + responder_name = ocsp_response.responder_name + issuer_hash = ocsp_response.issuer_key_hash + responder_hash = ocsp_response.responder_key_hash + + cert_to_validate = issuer_cert + if ( + responder_name is not None + and responder_name == issuer_cert.subject + or responder_hash == issuer_hash + ): + cert_to_validate = issuer_cert + else: + certs = ocsp_response.certificates + responder_certs = _get_certificates( + certs, issuer_cert, responder_name, responder_hash + ) + + try: + responder_cert = responder_certs[0] + except IndexError: + raise ConnectionError("no certificates found for the responder") + + ext = responder_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) + if ext is None or x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING not in ext.value: + raise ConnectionError("delegate not autorized for ocsp signing") + cert_to_validate = responder_cert + + if validate: + _verify_response(cert_to_validate, ocsp_response) + return True + + +def _get_certificates(certs, issuer_cert, responder_name, responder_hash): + if responder_name is None: + certificates = [ + c + for c in certs + if _get_pubkey_hash(c) == responder_hash and c.issuer == issuer_cert.subject + ] + else: + certificates = [ + c + for c in certs + if c.subject == responder_name and c.issuer == issuer_cert.subject + ] + + return certificates + + +def _get_pubkey_hash(certificate): + pubkey = certificate.public_key() + + # https://stackoverflow.com/a/46309453/600498 + if isinstance(pubkey, RSAPublicKey): + h = pubkey.public_bytes(Encoding.DER, PublicFormat.PKCS1) + elif isinstance(pubkey, EllipticCurvePublicKey): + h = pubkey.public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) + else: + h = pubkey.public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) + + sha1 = Hash(SHA1(), backend=backends.default_backend()) + sha1.update(h) + return sha1.finalize() + + +def ocsp_staple_verifier(con, ocsp_bytes, expected=None): + """An implemention of a function for set_ocsp_client_callback in PyOpenSSL. + + This function validates that the provide ocsp_bytes response is valid, + and matches the expected, stapled responses. + """ + if ocsp_bytes in [b"", None]: + raise ConnectionError("no ocsp response present") + + issuer_cert = None + peer_cert = con.get_peer_certificate().to_cryptography() + for c in con.get_peer_cert_chain(): + cert = c.to_cryptography() + if cert.subject == peer_cert.issuer: + issuer_cert = cert + break + + if issuer_cert is None: + raise ConnectionError("no matching issuer cert found in certificate chain") + + if expected is not None: + e = x509.load_pem_x509_certificate(expected) + if peer_cert != e: + raise ConnectionError("received and expected certificates do not match") + + return _check_certificate(issuer_cert, ocsp_bytes) + + class OCSPVerifier: - """A class to verify ssl sockets for RFC6960/RFC6961. + """A class to verify ssl sockets for RFC6960/RFC6961. This can be used + when using direct validation of OCSP responses and certificate revocations. @see https://datatracker.ietf.org/doc/html/rfc6960 @see https://datatracker.ietf.org/doc/html/rfc6961 @@ -67,7 +219,7 @@ class OCSPVerifier: try: issuer = issuers[0].access_location.value except IndexError: - raise ConnectionError("no issuers in certificate") + issuer = None # now, the series of ocsp server entries ocsps = [ @@ -128,19 +280,7 @@ class OCSPVerifier: r = requests.get(ocsp_url, headers=header) if not r.ok: raise ConnectionError("failed to fetch ocsp certificate") - - ocsp_response = ocsp.load_der_ocsp_response(r.content) - if ocsp_response.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED: - raise AuthorizationError( - "you are not authorized to view this ocsp certificate" - ) - if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL: - if ocsp_response.certificate_status == ocsp.OCSPCertStatus.REVOKED: - return False - else: - return True - else: - return False + return _check_certificate(issuer_cert, r.content, True) def is_valid(self): """Returns the validity of the certificate wrapping our socket. @@ -153,7 +293,11 @@ class OCSPVerifier: # validate the certificate try: cert, issuer_url, ocsp_server = self.components_from_socket() + if issuer_url is None: + raise ConnectionError("no issuers found in certificate chain") return self.check_certificate(ocsp_server, cert, issuer_url) except AuthorizationError: cert, issuer_url, ocsp_server = self.components_from_direct_connection() + if issuer_url is None: + raise ConnectionError("no issuers found in certificate chain") return self.check_certificate(ocsp_server, cert, issuer_url) |