summaryrefslogtreecommitdiff
path: root/src/third_party
diff options
context:
space:
mode:
authorShreyas Kalyan <shreyas.kalyan@mongodb.com>2019-11-15 00:18:36 +0000
committerevergreen <evergreen@mongodb.com>2019-11-15 00:18:36 +0000
commitcc3f2c8ba06e9e8c248a0d91a9efd5351311ca37 (patch)
treecce857d427f02007a4cd10de17439522e1f2e104 /src/third_party
parente6deb004dfb74bbf0c381597185ea7cf88a9a812 (diff)
downloadmongo-cc3f2c8ba06e9e8c248a0d91a9efd5351311ca37.tar.gz
SERVER-44048 Fix OCSP Mock Responder
Diffstat (limited to 'src/third_party')
-rw-r--r--src/third_party/mock_ocsp_responder/mock_ocsp_responder.py611
1 files changed, 611 insertions, 0 deletions
diff --git a/src/third_party/mock_ocsp_responder/mock_ocsp_responder.py b/src/third_party/mock_ocsp_responder/mock_ocsp_responder.py
new file mode 100644
index 00000000000..64925b41e7b
--- /dev/null
+++ b/src/third_party/mock_ocsp_responder/mock_ocsp_responder.py
@@ -0,0 +1,611 @@
+#
+# This file has been modified in 2019 by MongoDB Inc.
+#
+
+# OCSPBuilder is derived from https://github.com/wbond/ocspbuilder
+# OCSPResponder is derived from https://github.com/threema-ch/ocspresponder
+
+# Copyright (c) 2015-2018 Will Bond <will@wbond.net>
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+# of the Software, and to permit persons to whom the Software is furnished to do
+# so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+# Copyright 2016 Threema GmbH
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import unicode_literals, division, absolute_import, print_function
+
+import logging
+import base64
+import inspect
+import re
+import enum
+import sys
+import textwrap
+from datetime import datetime, timezone, timedelta
+from typing import Callable, Tuple, Optional
+
+from asn1crypto import x509, keys, core, ocsp
+from asn1crypto.ocsp import OCSPRequest, OCSPResponse
+from oscrypto import asymmetric
+from bottle import Bottle, HTTPResponse, request
+
+__version__ = '0.10.2'
+__version_info__ = (0, 10, 2)
+
+logger = logging.getLogger(__name__)
+
+if sys.version_info < (3,):
+ byte_cls = str
+else:
+ byte_cls = bytes
+
+def _pretty_message(string, *params):
+ """
+ Takes a multi-line string and does the following:
+ - dedents
+ - converts newlines with text before and after into a single line
+ - strips leading and trailing whitespace
+ :param string:
+ The string to format
+ :param *params:
+ Params to interpolate into the string
+ :return:
+ The formatted string
+ """
+
+ output = textwrap.dedent(string)
+
+ # Unwrap lines, taking into account bulleted lists, ordered lists and
+ # underlines consisting of = signs
+ if output.find('\n') != -1:
+ output = re.sub('(?<=\\S)\n(?=[^ \n\t\\d\\*\\-=])', ' ', output)
+
+ if params:
+ output = output % params
+
+ output = output.strip()
+
+ return output
+
+
+def _type_name(value):
+ """
+ :param value:
+ A value to get the object name of
+ :return:
+ A unicode string of the object name
+ """
+
+ if inspect.isclass(value):
+ cls = value
+ else:
+ cls = value.__class__
+ if cls.__module__ in set(['builtins', '__builtin__']):
+ return cls.__name__
+ return '%s.%s' % (cls.__module__, cls.__name__)
+
+def _writer(func):
+ """
+ Decorator for a custom writer, but a default reader
+ """
+
+ name = func.__name__
+ return property(fget=lambda self: getattr(self, '_%s' % name), fset=func)
+
+
+class OCSPResponseBuilder(object):
+
+ _response_status = None
+ _certificate = None
+ _certificate_status = None
+ _revocation_date = None
+ _certificate_issuer = None
+ _hash_algo = None
+ _key_hash_algo = None
+ _nonce = None
+ _this_update = None
+ _next_update = None
+ _response_data_extensions = None
+ _single_response_extensions = None
+
+ def __init__(self, response_status, certificate_status_list=[], revocation_date=None):
+ """
+ Unless changed, responses will use SHA-256 for the signature,
+ and will be valid from the moment created for one week.
+ :param response_status:
+ A unicode string of OCSP response type:
+ - "successful" - when the response includes information about the certificate
+ - "malformed_request" - when the request could not be understood
+ - "internal_error" - when an internal error occured with the OCSP responder
+ - "try_later" - when the OCSP responder is temporarily unavailable
+ - "sign_required" - when the OCSP request must be signed
+ - "unauthorized" - when the responder is not the correct responder for the certificate
+ :param certificate_list:
+ A list of tuples with certificate serial number and certificate status objects.
+ certificate_status:
+ A unicode string of the status of the certificate. Only required if
+ the response_status is "successful".
+ - "good" - when the certificate is in good standing
+ - "revoked" - when the certificate is revoked without a reason code
+ - "key_compromise" - when a private key is compromised
+ - "ca_compromise" - when the CA issuing the certificate is compromised
+ - "affiliation_changed" - when the certificate subject name changed
+ - "superseded" - when the certificate was replaced with a new one
+ - "cessation_of_operation" - when the certificate is no longer needed
+ - "certificate_hold" - when the certificate is temporarily invalid
+ - "remove_from_crl" - only delta CRLs - when temporary hold is removed
+ - "privilege_withdrawn" - one of the usages for a certificate was removed
+ - "unknown" - the responder doesn't know about the certificate being requested
+ :param revocation_date:
+ A datetime.datetime object of when the certificate was revoked, if
+ the response_status is "successful" and the certificate status is
+ not "good" or "unknown".
+ """
+ self._response_status = response_status
+ self._certificate_status_list = certificate_status_list
+ self._revocation_date = revocation_date
+
+ self._key_hash_algo = 'sha1'
+ self._hash_algo = 'sha256'
+ self._response_data_extensions = {}
+ self._single_response_extensions = {}
+
+ @_writer
+ def nonce(self, value):
+ """
+ The nonce that was provided during the request.
+ """
+
+ if not isinstance(value, byte_cls):
+ raise TypeError(_pretty_message(
+ '''
+ nonce must be a byte string, not %s
+ ''',
+ _type_name(value)
+ ))
+
+ self._nonce = value
+
+ @_writer
+ def certificate_issuer(self, value):
+ """
+ An asn1crypto.x509.Certificate object of the issuer of the certificate.
+ This should only be set if the OCSP responder is not the issuer of
+ the certificate, but instead a special certificate only for OCSP
+ responses.
+ """
+
+ if value is not None:
+ is_oscrypto = isinstance(value, asymmetric.Certificate)
+ if not is_oscrypto and not isinstance(value, x509.Certificate):
+ raise TypeError(_pretty_message(
+ '''
+ certificate_issuer must be an instance of
+ asn1crypto.x509.Certificate or
+ oscrypto.asymmetric.Certificate, not %s
+ ''',
+ _type_name(value)
+ ))
+
+ if is_oscrypto:
+ value = value.asn1
+
+ self._certificate_issuer = value
+
+ @_writer
+ def next_update(self, value):
+ """
+ A datetime.datetime object of when the response may next change. This
+ should only be set if responses are cached. If responses are generated
+ fresh on every request, this should not be set.
+ """
+
+ if not isinstance(value, datetime):
+ raise TypeError(_pretty_message(
+ '''
+ next_update must be an instance of datetime.datetime, not %s
+ ''',
+ _type_name(value)
+ ))
+
+ self._next_update = value
+
+ def build(self, responder_private_key=None, responder_certificate=None):
+ """
+ Validates the request information, constructs the ASN.1 structure and
+ signs it.
+ The responder_private_key and responder_certificate parameters are onlystr
+ required if the response_status is "successful".
+ :param responder_private_key:
+ An asn1crypto.keys.PrivateKeyInfo or oscrypto.asymmetric.PrivateKey
+ object for the private key to sign the response with
+ :param responder_certificate:
+ An asn1crypto.x509.Certificate or oscrypto.asymmetric.Certificate
+ object of the certificate associated with the private key
+ :return:
+ An asn1crypto.ocsp.OCSPResponse object of the response
+ """
+ if self._response_status != 'successful':
+ return ocsp.OCSPResponse({
+ 'response_status': self._response_status
+ })
+
+ is_oscrypto = isinstance(responder_private_key, asymmetric.PrivateKey)
+ if not isinstance(responder_private_key, keys.PrivateKeyInfo) and not is_oscrypto:
+ raise TypeError(_pretty_message(
+ '''
+ responder_private_key must be an instance ofthe c
+ asn1crypto.keys.PrivateKeyInfo or
+ oscrypto.asymmetric.PrivateKey, not %s
+ ''',
+ _type_name(responder_private_key)
+ ))
+
+ cert_is_oscrypto = isinstance(responder_certificate, asymmetric.Certificate)
+ if not isinstance(responder_certificate, x509.Certificate) and not cert_is_oscrypto:
+ raise TypeError(_pretty_message(
+ '''
+ responder_certificate must be an instance of
+ asn1crypto.x509.Certificate or
+ oscrypto.asymmetric.Certificate, not %s
+ ''',
+ _type_name(responder_certificate)
+ ))
+
+ if cert_is_oscrypto:
+ responder_certificate = responder_certificate.asn1
+
+ if self._certificate_status_list is None:
+ raise ValueError(_pretty_message(
+ '''
+ certificate_status_list must be set if the response_status is
+ "successful"
+ '''
+ ))
+
+ def _make_extension(name, value):
+ return {
+ 'extn_id': name,
+ 'critical': False,
+ 'extn_value': value
+ }
+
+ responses = []
+ for serial, status in self._certificate_status_list:
+ response_data_extensions = []
+ single_response_extensions = []
+ for name, value in self._response_data_extensions.items():
+ response_data_extensions.append(_make_extension(name, value))
+ if self._nonce:
+ response_data_extensions.append(
+ _make_extension('nonce', self._nonce)
+ )
+
+ if not response_data_extensions:
+ response_data_extensions = None
+
+ for name, value in self._single_response_extensions.items():
+ single_response_extensions.append(_make_extension(name, value))
+
+ if self._certificate_issuer:
+ single_response_extensions.append(
+ _make_extension(
+ 'certificate_issuer',
+ [
+ x509.GeneralName(
+ name='directory_name',
+ value=self._certificate_issuer.subject
+ )
+ ]
+ )
+ )
+
+ if not single_response_extensions:
+ single_response_extensions = None
+
+ responder_key_hash = getattr(responder_certificate.public_key, self._key_hash_algo)
+
+ if status == 'good':
+ cert_status = ocsp.CertStatus(
+ name='good',
+ value=core.Null()
+ )
+ elif status == 'unknown':
+ cert_status = ocsp.CertStatus(
+ name='unknown',
+ value=core.Null()
+ )
+ else:
+ reason = status if status != 'revoked' else 'unspecified'
+ cert_status = ocsp.CertStatus(
+ name='revoked',
+ value={
+ 'revocation_time': self._revocation_date,
+ 'revocation_reason': reason,
+ }
+ )
+
+ issuer = self._certificate_issuer if self._certificate_issuer else responder_certificate
+
+ produced_at = datetime.now(timezone.utc)
+
+ if self._this_update is None:
+ self._this_update = produced_at
+
+ if self._next_update is None:
+ self._next_update = self._this_update + timedelta(days=7)
+
+ response = {
+ 'cert_id': {
+ 'hash_algorithm': {
+ 'algorithm': self._key_hash_algo
+ },
+ 'issuer_name_hash': getattr(issuer.subject, self._key_hash_algo),
+ 'issuer_key_hash': getattr(issuer.public_key, self._key_hash_algo),
+ 'serial_number': serial,
+ },
+ 'cert_status': cert_status,
+ 'this_update': self._this_update,
+ 'next_update': self._next_update,
+ 'single_extensions': single_response_extensions
+ }
+ responses.append(response)
+
+ response_data = ocsp.ResponseData({
+ 'responder_id': ocsp.ResponderId(name='by_key', value=responder_key_hash),
+ 'produced_at': produced_at,
+ 'responses': responses,
+ 'response_extensions': response_data_extensions
+ })
+
+ signature_algo = responder_private_key.algorithm
+ if signature_algo == 'ec':
+ signature_algo = 'ecdsa'
+
+ signature_algorithm_id = '%s_%s' % (self._hash_algo, signature_algo)
+
+ if responder_private_key.algorithm == 'rsa':
+ sign_func = asymmetric.rsa_pkcs1v15_sign
+ elif responder_private_key.algorithm == 'dsa':
+ sign_func = asymmetric.dsa_sign
+ elif responder_private_key.algorithm == 'ec':
+ sign_func = asymmetric.ecdsa_sign
+
+ if not is_oscrypto:
+ responder_private_key = asymmetric.load_private_key(responder_private_key)
+ signature_bytes = sign_func(responder_private_key, response_data.dump(), self._hash_algo)
+
+ certs = None
+ if self._certificate_issuer:
+ certs = [responder_certificate]
+
+ return ocsp.OCSPResponse({
+ 'response_status': self._response_status,
+ 'response_bytes': {
+ 'response_type': 'basic_ocsp_response',
+ 'response': {
+ 'tbs_response_data': response_data,
+ 'signature_algorithm': {'algorithm': signature_algorithm_id},
+ 'signature': signature_bytes,
+ }
+ }
+ })
+
+# Enums
+
+class ResponseStatus(enum.Enum):
+ successful = 'successful'
+ malformed_request = 'malformed_request'
+ internal_error = 'internal_error'
+ try_later = 'try_later'
+ sign_required = 'sign_required'
+ unauthorized = 'unauthorized'
+
+
+class CertificateStatus(enum.Enum):
+ good = 'good'
+ revoked = 'revoked'
+ key_compromise = 'key_compromise'
+ ca_compromise = 'ca_compromise'
+ affiliation_changed = 'affiliation_changed'
+ superseded = 'superseded'
+ cessation_of_operation = 'cessation_of_operation'
+ certificate_hold = 'certificate_hold'
+ remove_from_crl = 'remove_from_crl'
+ privilege_withdrawn = 'privilege_withdrawn'
+ unknown = 'unknown'
+
+
+# API endpoints
+FAULT_REVOKED = "revoked"
+FAULT_UNKNOWN = "unknown"
+
+class OCSPResponder:
+
+ def __init__(self, issuer_cert: str, responder_cert: str, responder_key: str,
+ fault: str = None, next_update_days: int = 7):
+ """
+ Create a new OCSPResponder instance.
+
+ :param issuer_cert: Path to the issuer certificate.
+ :param responder_cert: Path to the certificate of the OCSP responder
+ with the `OCSP Signing` extension.
+ :param responder_key: Path to the private key belonging to the
+ responder cert.
+ :param validate_func: A function that - given a certificate serial -
+ will return the appropriate :class:`CertificateStatus` and -
+ depending on the status - a revocation datetime.
+ :param cert_retrieve_func: A function that - given a certificate serial -
+ will return the corresponding certificate as a string.
+ :param next_update_days: The ``nextUpdate`` value that will be written
+ into the response. Default: 7 days.
+
+ """
+ # Certs and keys
+ self._issuer_cert = asymmetric.load_certificate(issuer_cert)
+ self._responder_cert = asymmetric.load_certificate(responder_cert)
+ self._responder_key = asymmetric.load_private_key(responder_key)
+
+ # Next update
+ self._next_update_days = next_update_days
+
+ self._fault = fault
+
+ # Bottle
+ self._app = Bottle()
+
+ # Initialize routing
+ self._route()
+
+ def _route(self):
+ self._app.get('/', callback=self._handle_root)
+ self._app.get('/status/<request_data>', callback=self._handle_get)
+ self._app.post('/status/', callback=self._handle_post)
+
+ def _handle_root(self):
+ return 'ocsp-responder'
+
+ def _handle_get(self, request_data):
+ """
+ An OCSP GET request contains the DER-in-base64 encoded OCSP request in the
+ HTTP request URL.
+ """
+ der = base64.b64decode(request_data)
+ ocsp_request = self._parse_ocsp_request(der)
+ return self._build_http_response(ocsp_request)
+
+ def _handle_post(self):
+ """
+ An OCSP POST request contains the DER encoded OCSP request in the HTTP
+ request body.
+ """
+ der = request.body.read()
+ ocsp_request = self._parse_ocsp_request(der)
+ return self._build_http_response(ocsp_request)
+
+ def _fail(self, status: ResponseStatus) -> OCSPResponse:
+ builder = OCSPResponseBuilder(response_status=status.value)
+ return builder.build()
+
+ def _parse_ocsp_request(self, request_der: bytes) -> OCSPRequest:
+ """
+ Parse the request bytes, return an ``OCSPRequest`` instance.
+ """
+ return OCSPRequest.load(request_der)
+
+ def validate(self):
+ time = datetime(2018, 1, 1, 1, 00, 00, 00, timezone.utc)
+ if self._fault == FAULT_REVOKED:
+ return (CertificateStatus.revoked, time)
+ elif self._fault == FAULT_UNKNOWN:
+ return (CertificateStatus.unknown, None)
+ elif self._fault != None:
+ raise NotImplemented('Fault type could not be found')
+ return (CertificateStatus.good, time)
+
+ def _build_ocsp_response(self, ocsp_request: OCSPRequest) -> OCSPResponse:
+ """
+ Create and return an OCSP response from an OCSP request.
+ """
+ # Get the certificate serial
+ tbs_request = ocsp_request['tbs_request']
+ request_list = tbs_request['request_list']
+ if len(request_list) < 1:
+ logger.warning('Received OCSP request with no requests')
+ raise NotImplemented('Empty requests not supported')
+
+ single_request = request_list[0] # TODO: Support more than one request
+ req_cert = single_request['req_cert']
+ serial = req_cert['serial_number'].native
+
+ # Check certificate status
+ try:
+ certificate_status, revocation_date = self.validate()
+ except Exception as e:
+ logger.exception('Could not determine certificate status: %s', e)
+ return self._fail(ResponseStatus.internal_error)
+
+ certificate_status_list = [(serial, certificate_status.value)]
+
+ # Build the response
+ builder = OCSPResponseBuilder(**{
+ 'response_status': ResponseStatus.successful.value,
+ 'certificate_status_list': certificate_status_list,
+ 'revocation_date': revocation_date,
+ })
+
+ # Parse extensions
+ for extension in tbs_request['request_extensions']:
+ extn_id = extension['extn_id'].native
+ critical = extension['critical'].native
+ value = extension['extn_value'].parsed
+
+ # This variable tracks whether any unknown extensions were encountered
+ unknown = False
+
+ # Handle nonce extension
+ if extn_id == 'nonce':
+ builder.nonce = value.native
+
+ # That's all we know
+ else:
+ unknown = True
+
+ # If an unknown critical extension is encountered (which should not
+ # usually happen, according to RFC 6960 4.1.2), we should throw our
+ # hands up in despair and run.
+ if unknown is True and critical is True:
+ logger.warning('Could not parse unknown critical extension: %r',
+ dict(extension.native))
+ return self._fail(ResponseStatus.internal_error)
+
+ # If it's an unknown non-critical extension, we can safely ignore it.
+ elif unknown is True:
+ logger.info('Ignored unknown non-critical extension: %r', dict(extension.native))
+
+ # Set certificate issuer
+ builder.certificate_issuer = self._issuer_cert
+
+ # Set next update date
+ builder.next_update = datetime.now(timezone.utc) + timedelta(days=self._next_update_days)
+
+ return builder.build(self._responder_key, self._responder_cert)
+
+ def _build_http_response(self, request_der: bytes) -> HTTPResponse:
+ response_der = self._build_ocsp_response(request_der).dump()
+ return HTTPResponse(
+ status=200,
+ body=response_der,
+ content_type='application/ocsp-response',
+ )
+
+ def serve(self, port=8080, debug=False):
+ logger.info('Launching %sserver on port %d', 'debug' if debug else '', port)
+ self._app.run(port=port, debug=debug)