diff options
Diffstat (limited to 'src/M2Crypto/X509.py')
-rw-r--r-- | src/M2Crypto/X509.py | 1443 |
1 files changed, 1443 insertions, 0 deletions
diff --git a/src/M2Crypto/X509.py b/src/M2Crypto/X509.py new file mode 100644 index 0000000..b27af13 --- /dev/null +++ b/src/M2Crypto/X509.py @@ -0,0 +1,1443 @@ +from __future__ import absolute_import + +"""M2Crypto wrapper for OpenSSL X509 API. + +Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved. + +Portions created by Open Source Applications Foundation (OSAF) are +Copyright (C) 2004-2007 OSAF. All Rights Reserved. +Author: Heikki Toivonen +""" + +import binascii +import logging + +from M2Crypto import ASN1, BIO, EVP, m2, six # noqa +from typing import AnyStr, List, Optional # noqa + +FORMAT_DER = 0 +FORMAT_PEM = 1 + +if hasattr(m2, "VERIFY_ALLOW_PROXY_CERTS"): + verify_allow_proxy_certs = m2.VERIFY_ALLOW_PROXY_CERTS +if hasattr(m2, "VERIFY_CB_ISSUER_CHECK"): + verify_cb_issuer_check = m2.VERIFY_CB_ISSUER_CHECK +if hasattr(m2, "VERIFY_CHECK_SS_SIGNATURE"): + verify_check_ss_signature = m2.VERIFY_CHECK_SS_SIGNATURE +if hasattr(m2, "VERIFY_CRL_CHECK"): + verify_crl_check = m2.VERIFY_CRL_CHECK +if hasattr(m2, "VERIFY_CRL_CHECK_ALL"): + verify_crl_check_all = m2.VERIFY_CRL_CHECK_ALL +if hasattr(m2, "VERIFY_EXPLICIT_POLICY"): + verify_explicit_policy = m2.VERIFY_EXPLICIT_POLICY +if hasattr(m2, "VERIFY_EXTENDED_CRL_SUPPORT"): + verify_extended_crl_support = m2.VERIFY_EXTENDED_CRL_SUPPORT +if hasattr(m2, "VERIFY_IGNORE_CRITICAL"): + verify_ignore_critical = m2.VERIFY_IGNORE_CRITICAL +if hasattr(m2, "VERIFY_INHIBIT_ANY"): + verify_inhibit_any = m2.VERIFY_INHIBIT_ANY +if hasattr(m2, "VERIFY_INHIBIT_MAP"): + verify_inhibit_map = m2.VERIFY_INHIBIT_MAP +if hasattr(m2, "VERIFY_NO_ALT_CHAINS"): + verify_no_alt_chains = m2.VERIFY_NO_ALT_CHAINS +if hasattr(m2, "VERIFY_NO_CHECK_TIME"): + verify_no_check_time = m2.VERIFY_NO_CHECK_TIME +if hasattr(m2, "VERIFY_NOTIFY_POLICY"): + verify_notify_policy = m2.VERIFY_NOTIFY_POLICY +if hasattr(m2, "VERIFY_PARTIAL_CHAIN"): + verify_partial_chain = m2.VERIFY_PARTIAL_CHAIN +if hasattr(m2, "VERIFY_POLICY_CHECK"): + verify_policy_check = m2.VERIFY_POLICY_CHECK +if hasattr(m2, "VERIFY_TRUSTED_FIRST"): + verify_trusted_first = m2.VERIFY_TRUSTED_FIRST +if hasattr(m2, "VERIFY_USE_DELTAS"): + verify_use_deltas = m2.VERIFY_USE_DELTAS +if hasattr(m2, "VERIFY_X509_STRICT"): + verify_x509_strict = m2.VERIFY_X509_STRICT + +log = logging.getLogger(__name__) + + +class X509Error(ValueError): + pass + + +m2.x509_init(X509Error) + +V_OK = m2.X509_V_OK # type: int + + +def x509_store_default_cb(ok, ctx): + # type: (int, X509_Store_Context) -> int + return ok + + +def new_extension(name, value, critical=0, _pyfree=1): + # type: (str, bytes, int, int) -> X509_Extension + """ + Create new X509_Extension instance. + """ + if name == 'subjectKeyIdentifier' and \ + value.strip('0123456789abcdefABCDEF:') != '': + raise ValueError('value must be precomputed hash') + ctx = m2.x509v3_set_nconf() + x509_ext_ptr = m2.x509v3_ext_conf(None, ctx, name, value) + if x509_ext_ptr is None: + raise X509Error( + "Cannot create X509_Extension with name '%s' and value '%s'" % + (name, value)) + x509_ext = X509_Extension(x509_ext_ptr, _pyfree) + x509_ext.set_critical(critical) + return x509_ext + + +class X509_Extension(object): + """ + X509 Extension + """ + + m2_x509_extension_free = m2.x509_extension_free + + def __init__(self, x509_ext_ptr=None, _pyfree=1): + # type: (Optional[bytes], int) -> None + self.x509_ext = x509_ext_ptr + self._pyfree = _pyfree + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0) and self.x509_ext: + self.m2_x509_extension_free(self.x509_ext) + + def _ptr(self): + # type: () -> bytes + return self.x509_ext + + def set_critical(self, critical=1): + # type: (int) -> int + """ + Mark this extension critical or noncritical. By default an + extension is not critical. + + :param critical: Nonzero sets this extension as critical. + Calling this method without arguments will + set this extension to critical. + :return: 1 for success, 0 for failure + """ + return m2.x509_extension_set_critical(self.x509_ext, critical) + + def get_critical(self): + # type: () -> int + """ + Return whether or not this is a critical extension. + + :return: Nonzero if this is a critical extension. + """ + return m2.x509_extension_get_critical(self.x509_ext) + + def get_name(self): + # type: () -> str + """ + Get the extension name, for example 'subjectAltName'. + """ + return six.ensure_text(m2.x509_extension_get_name(self.x509_ext)) + + def get_value(self, flag=0, indent=0): + # type: (int, int) -> str + """ + Get the extension value, for example 'DNS:www.example.com'. + + :param flag: Flag to control what and how to print. + :param indent: How many spaces to print before actual value. + """ + buf = BIO.MemoryBuffer() + m2.x509_ext_print(buf.bio_ptr(), self.x509_ext, flag, indent) + return six.ensure_text(buf.read_all()) + + +class X509_Extension_Stack(object): + """ + X509 Extension Stack + + :warning: Do not modify the underlying OpenSSL stack + except through this interface, or use any OpenSSL + functions that do so indirectly. Doing so will get the + OpenSSL stack and the internal pystack of this class out + of sync, leading to python memory leaks, exceptions or + even python crashes! + """ + + m2_sk_x509_extension_free = m2.sk_x509_extension_free + + def __init__(self, stack=None, _pyfree=0): + # type: (Optional[bytes], int) -> None + if stack is not None: + self.stack = stack + self._pyfree = _pyfree + num = m2.sk_x509_extension_num(self.stack) + for i in range(num): + self.pystack.append(X509_Extension( + m2.sk_x509_extension_value(self.stack, i), + _pyfree=_pyfree)) + else: + self.stack = m2.sk_x509_extension_new_null() + self._pyfree = 1 + self.pystack = [] # This must be kept in sync with self.stack + + def __del__(self): + # type: () -> None + # see BIO.py - unbalanced __init__ / __del__ + if getattr(self, '_pyfree', 0): + self.m2_sk_x509_extension_free(self.stack) + + def __len__(self): + # type: () -> int + assert m2.sk_x509_extension_num(self.stack) == len(self.pystack) + return len(self.pystack) + + def __getitem__(self, idx): + # type: (int) -> X509_Extension + return self.pystack[idx] + + def __iter__(self): + return iter(self.pystack) + + def _ptr(self): + # type: () -> bytes + return self.stack + + def push(self, x509_ext): + # type: (X509_Extension) -> int + """ + Push X509_Extension object onto the stack. + + :param x509_ext: X509_Extension object to be pushed onto the stack. + :return: The number of extensions on the stack. + """ + self.pystack.append(x509_ext) + ret = m2.sk_x509_extension_push(self.stack, x509_ext._ptr()) + assert ret == len(self.pystack) + return ret + + def pop(self): + # type: () -> X509_Extension + """ + Pop X509_Extension object from the stack. + + :return: X509_Extension popped + """ + x509_ext_ptr = m2.sk_x509_extension_pop(self.stack) + if x509_ext_ptr is None: + assert len(self.pystack) == 0 + return None + return self.pystack.pop() + + +class X509_Name_Entry(object): + """ + X509 Name Entry + """ + + m2_x509_name_entry_free = m2.x509_name_entry_free + + def __init__(self, x509_name_entry, _pyfree=0): + # type: (bytes, int) -> None + """ + :param x509_name_entry: this should be OpenSSL X509_NAME_ENTRY binary + :param _pyfree: + """ + self.x509_name_entry = x509_name_entry + self._pyfree = _pyfree + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_name_entry_free(self.x509_name_entry) + + def _ptr(self): + # type: () -> bytes + return self.x509_name_entry + + def set_object(self, asn1obj): + # type: (ASN1.ASN1_Object) -> int + """ + Sets the field name to asn1obj + + :param asn1obj: + :return: 0 on failure, 1 on success + """ + return m2.x509_name_entry_set_object(self.x509_name_entry, + asn1obj._ptr()) + + def set_data(self, data, type=ASN1.MBSTRING_ASC): + # type: (bytes, int) -> int + """ + Sets the field name to asn1obj + + :param data: data in a binary form to be set + :return: 0 on failure, 1 on success + """ + return m2.x509_name_entry_set_data(self.x509_name_entry, type, data) + + def get_object(self): + # type: () -> ASN1.ASN1_Object + return ASN1.ASN1_Object( + m2.x509_name_entry_get_object(self.x509_name_entry)) + + def get_data(self): + # type: () -> ASN1.ASN1_String + return ASN1.ASN1_String( + m2.x509_name_entry_get_data(self.x509_name_entry)) + + def create_by_txt(self, field, type, entry, len): + return m2.x509_name_entry_create_by_txt(self.x509_name_entry._ptr(), + field, type, entry, len) + + +class X509_Name(object): + """ + X509 Name + """ + + nid = {'C': m2.NID_countryName, + 'SP': m2.NID_stateOrProvinceName, + 'ST': m2.NID_stateOrProvinceName, + 'stateOrProvinceName': m2.NID_stateOrProvinceName, + 'L': m2.NID_localityName, + 'localityName': m2.NID_localityName, + 'O': m2.NID_organizationName, + 'organizationName': m2.NID_organizationName, + 'OU': m2.NID_organizationalUnitName, + 'organizationUnitName': m2.NID_organizationalUnitName, + 'CN': m2.NID_commonName, + 'commonName': m2.NID_commonName, + 'Email': m2.NID_pkcs9_emailAddress, + 'emailAddress': m2.NID_pkcs9_emailAddress, + 'serialNumber': m2.NID_serialNumber, + 'SN': m2.NID_surname, + 'surname': m2.NID_surname, + 'GN': m2.NID_givenName, + 'givenName': m2.NID_givenName + } + + m2_x509_name_free = m2.x509_name_free + + def __init__(self, x509_name=None, _pyfree=0): + # type: (bytes, int) -> None + """ + :param x509_name: this should be OpenSSL X509_NAME binary + :param _pyfree: + """ + if x509_name is not None: + assert m2.x509_name_type_check(x509_name), "'x509_name' type error" + self.x509_name = x509_name + self._pyfree = _pyfree + else: + self.x509_name = m2.x509_name_new() + self._pyfree = 1 + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_name_free(self.x509_name) + + def __str__(self): + # type: () -> bytes + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return m2.x509_name_oneline(self.x509_name) + + def __getattr__(self, attr): + # type: (str) -> str + if attr in self.nid: + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return six.ensure_text(m2.x509_name_by_nid(self.x509_name, self.nid[attr])) + + if attr in self.__dict__: + return self.__dict__[attr] + + raise AttributeError(self, attr) + + def __setattr__(self, attr, value): + # type: (str, AnyStr) -> int + """ + :return: 1 for success of 0 if an error occurred. + """ + if attr in self.nid: + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return m2.x509_name_set_by_nid(self.x509_name, self.nid[attr], + six.ensure_binary(value)) + + self.__dict__[attr] = value + + def __len__(self): + # type: () -> int + return m2.x509_name_entry_count(self.x509_name) + + def __getitem__(self, idx): + # type: (int) -> X509_Name_Entry + if not 0 <= idx < self.entry_count(): + raise IndexError("index out of range") + return X509_Name_Entry(m2.x509_name_get_entry(self.x509_name, idx)) + + def __iter__(self): + for i in range(self.entry_count()): + yield self[i] + + def _ptr(self): + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return self.x509_name + + def add_entry_by_txt(self, field, type, entry, len, loc, set): + # entry_type: (str, int, bytes, int, int, int) -> int + """ + Add X509_Name field whose name is identified by its name. + + :param field: name of the entry + :param type: use MBSTRING_ASC or MBSTRING_UTF8 + (or standard ASN1 type like V_ASN1_IA5STRING) + :param entry: value + :param len: buf_len of the entry + (-1 and the length is computed automagically) + + The ``loc`` and ``set`` parameters determine where a new entry + should be added. + For almost all applications loc can be set to -1 and set to 0. + This adds a new entry to the end of name as a single valued + RelativeDistinguishedName (RDN). + + :param loc: determines the index where the new entry is + inserted: if it is -1 it is appended. + :param set: determines how the new type is added. If it is zero + a new RDN is created. + If set is -1 or 1 it is added to the previous or next RDN + structure respectively. This will then be a multivalued + RDN: since multivalues RDNs are very seldom used set is + almost always set to zero. + + :return: 1 for success of 0 if an error occurred. + """ + return m2.x509_name_add_entry_by_txt(self.x509_name, + six.ensure_str(field), type, + six.ensure_str(entry), len, loc, set) + + def entry_count(self): + # type: () -> int + return m2.x509_name_entry_count(self.x509_name) + + def get_entries_by_nid(self, nid): + # type: (int) -> List[X509_Name_Entry] + """ + Retrieve the next index matching nid. + + :param nid: name of the entry (as m2.NID* constants) + + :return: list of X509_Name_Entry items + """ + ret = [] + lastpos = -1 + + while True: + lastpos = m2.x509_name_get_index_by_nid(self.x509_name, nid, + lastpos) + if lastpos == -1: + break + + ret.append(self[lastpos]) + + return ret + + def as_text(self, indent=0, flags=m2.XN_FLAG_COMPAT): + # type: (int, int) -> str + """ + as_text returns the name as a string. + + :param indent: Each line in multiline format is indented + by this many spaces. + :param flags: Flags that control how the output should be formatted. + """ + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + buf = BIO.MemoryBuffer() + m2.x509_name_print_ex(buf.bio_ptr(), self.x509_name, indent, flags) + return six.ensure_text(buf.read_all()) + + def as_der(self): + # type: () -> bytes + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return m2.x509_name_get_der(self.x509_name) + + def as_hash(self): + # type: () -> int + assert m2.x509_name_type_check(self.x509_name), \ + "'x509_name' type error" + return m2.x509_name_hash(self.x509_name) + + +class X509(object): + """ + X.509 Certificate + """ + + m2_x509_free = m2.x509_free + + def __init__(self, x509=None, _pyfree=0): + # type: (Optional[bytes], int) -> None + """ + :param x509: binary representation of + the underlying OpenSSL X509 object. + :param _pyfree: + """ + if x509 is not None: + assert m2.x509_type_check(x509), "'x509' type error" + self.x509 = x509 + self._pyfree = _pyfree + else: + self.x509 = m2.x509_new() + self._pyfree = 1 + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_free(self.x509) + + def _ptr(self): + # type: () -> bytes + assert m2.x509_type_check(self.x509), "'x509' type error" + return self.x509 + + def as_text(self): + # type: () -> str + assert m2.x509_type_check(self.x509), "'x509' type error" + buf = BIO.MemoryBuffer() + m2.x509_print(buf.bio_ptr(), self.x509) + return six.ensure_text(buf.read_all()) + + def as_der(self): + # type: () -> bytes + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.i2d_x509(self.x509) + + def as_pem(self): + # type: () -> bytes + buf = BIO.MemoryBuffer() + m2.x509_write_pem(buf.bio_ptr(), self.x509) + return buf.read_all() + + def save_pem(self, filename): + # type: (AnyStr) -> int + """ + :param filename: name of the file to be loaded + :return: 1 for success or 0 for failure + """ + with BIO.openfile(filename, 'wb') as bio: + return m2.x509_write_pem(bio.bio_ptr(), self.x509) + + def save(self, filename, format=FORMAT_PEM): + # type: (AnyStr, int) -> int + """ + Saves X.509 certificate to a file. Default output + format is PEM. + + :param filename: Name of the file the cert will be saved to. + + :param format: Controls what output format is used to save the cert. + Either FORMAT_PEM or FORMAT_DER to save in PEM or + DER format. Raises a ValueError if an unknow + format is used. + + :return: 1 for success or 0 for failure + """ + with BIO.openfile(filename, 'wb') as bio: + if format == FORMAT_PEM: + return m2.x509_write_pem(bio.bio_ptr(), self.x509) + elif format == FORMAT_DER: + return m2.i2d_x509_bio(bio.bio_ptr(), self.x509) + else: + raise ValueError( + "Unknown filetype. Must be either FORMAT_PEM or FORMAT_DER") + + def set_version(self, version): + # type: (int) -> int + """ + Set version of the certificate. + + :param version: Version number. + :return: Returns 0 on failure. + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_version(self.x509, version) + + def set_not_before(self, asn1_time): + # type: (ASN1.ASN1_TIME) -> int + """ + :return: 1 on success, 0 on failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_not_before(self.x509, asn1_time._ptr()) + + def set_not_after(self, asn1_time): + # type: (ASN1.ASN1_TIME) -> int + """ + :return: 1 on success, 0 on failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_not_after(self.x509, asn1_time._ptr()) + + def set_subject_name(self, name): + # type: (X509_Name) -> int + """ + :return: 1 on success, 0 on failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_subject_name(self.x509, name.x509_name) + + def set_issuer_name(self, name): + # type: (X509_Name) -> int + """ + :return: 1 on success, 0 on failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_issuer_name(self.x509, name.x509_name) + + def get_version(self): + # type: () -> int + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_get_version(self.x509) + + def get_serial_number(self): + # type: () -> ASN1.ASN1_Integer + assert m2.x509_type_check(self.x509), "'x509' type error" + asn1_integer = m2.x509_get_serial_number(self.x509) + return m2.asn1_integer_get(asn1_integer) + + def set_serial_number(self, serial): + # type: (ASN1.ASN1_Integer) -> int + """ + Set serial number. + + :param serial: Serial number. + + :return 1 for success and 0 for failure. + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + # This "magically" changes serial since asn1_integer + # is C pointer to x509's internal serial number. + asn1_integer = m2.x509_get_serial_number(self.x509) + return m2.asn1_integer_set(asn1_integer, serial) + # XXX Or should I do this? + # asn1_integer = m2.asn1_integer_new() + # m2.asn1_integer_set(asn1_integer, serial) + # return m2.x509_set_serial_number(self.x509, asn1_integer) + + def get_not_before(self): + # type: () -> ASN1.ASN1_TIME + assert m2.x509_type_check(self.x509), "'x509' type error" + return ASN1.ASN1_TIME(m2.x509_get_not_before(self.x509)) + + def get_not_after(self): + # type: () -> ASN1.ASN1_TIME + assert m2.x509_type_check(self.x509), "'x509' type error" + out = ASN1.ASN1_TIME(m2.x509_get_not_after(self.x509)) + if 'Bad time value' in str(out): + raise X509Error( + '''M2Crypto cannot handle dates after year 2050. + See RFC 5280 4.1.2.5 for more information. + ''') + return out + + def get_pubkey(self): + # type: () -> EVP.PKey + assert m2.x509_type_check(self.x509), "'x509' type error" + return EVP.PKey(m2.x509_get_pubkey(self.x509), _pyfree=1) + + def set_pubkey(self, pkey): + # type: (EVP.PKey) -> int + """ + Set the public key for the certificate + + :param pkey: Public key + + :return 1 for success and 0 for failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_pubkey(self.x509, pkey.pkey) + + def get_issuer(self): + # type: () -> X509_Name + assert m2.x509_type_check(self.x509), "'x509' type error" + return X509_Name(m2.x509_get_issuer_name(self.x509)) + + def set_issuer(self, name): + # type: (X509_Name) -> int + """ + Set issuer name. + + :param name: subjectName field. + + :return 1 for success and 0 for failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_issuer_name(self.x509, name.x509_name) + + def get_subject(self): + # type: () -> X509_Name + assert m2.x509_type_check(self.x509), "'x509' type error" + return X509_Name(m2.x509_get_subject_name(self.x509)) + + def set_subject(self, name): + # type: (X509_Name) -> int + """ + Set subject name. + + :param name: subjectName field. + + :return 1 for success and 0 for failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_set_subject_name(self.x509, name.x509_name) + + def add_ext(self, ext): + # type: (X509_Extension) -> int + """ + Add X509 extension to this certificate. + + :param ext: Extension + + :return 1 for success and 0 for failure + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + return m2.x509_add_ext(self.x509, ext.x509_ext, -1) + + def get_ext(self, name): + # type: (str) -> X509_Extension + """ + Get X509 extension by name. + + :param name: Name of the extension + + :return: X509_Extension + """ + # Optimizations to reduce attribute accesses + m2x509_get_ext = m2.x509_get_ext + m2x509_extension_get_name = m2.x509_extension_get_name + x509 = self.x509 + + name = six.ensure_binary(name) + for i in range(m2.x509_get_ext_count(x509)): + ext_ptr = m2x509_get_ext(x509, i) + if m2x509_extension_get_name(ext_ptr) == name: + return X509_Extension(ext_ptr, _pyfree=0) + + raise LookupError + + def get_ext_at(self, index): + # type: (int) -> X509_Extension + """ + Get X509 extension by index. + + :param index: Name of the extension + + :return: X509_Extension + """ + if index < 0 or index >= self.get_ext_count(): + raise IndexError + + return X509_Extension(m2.x509_get_ext(self.x509, index), + _pyfree=0) + + def get_ext_count(self): + # type: () -> int + """ + Get X509 extension count. + """ + return m2.x509_get_ext_count(self.x509) + + def sign(self, pkey, md): + # type: (EVP.PKey, str) -> int + """ + Sign the certificate. + + :param pkey: Public key + + :param md: Message digest algorithm to use for signing, + for example 'sha1'. + + :return int + """ + assert m2.x509_type_check(self.x509), "'x509' type error" + mda = getattr(m2, md, None) + if mda is None: + raise ValueError('unknown message digest', md) + return m2.x509_sign(self.x509, pkey.pkey, mda()) + + def verify(self, pkey=None): + # type: (Optional[EVP.PKey]) -> int + assert m2.x509_type_check(self.x509), "'x509' type error" + if pkey: + return m2.x509_verify(self.x509, pkey.pkey) + else: + return m2.x509_verify(self.x509, self.get_pubkey().pkey) + + def check_ca(self): + # type: () -> int + """ + Check if the certificate is a Certificate Authority (CA) certificate. + + :return: 0 if the certificate is not CA, nonzero otherwise. + + :requires: OpenSSL 0.9.8 or newer + """ + return m2.x509_check_ca(self.x509) + + def check_purpose(self, id, ca): + # type: (int, int) -> int + """ + Check if the certificate's purpose matches the asked purpose. + + :param id: Purpose id. See X509_PURPOSE_* constants. + + :param ca: 1 if the certificate should be CA, 0 otherwise. + + :return: 0 if the certificate purpose does not match, nonzero + otherwise. + """ + return m2.x509_check_purpose(self.x509, id, ca) + + def get_fingerprint(self, md='md5'): + # type: (str) -> str + """ + Get the fingerprint of the certificate. + + :param md: Message digest algorithm to use. + + :return: String containing the fingerprint in hex format. + """ + der = self.as_der() + md = EVP.MessageDigest(md) + md.update(der) + digest = md.final() + return six.ensure_text(binascii.hexlify(digest).upper()) + + +def load_cert(file, format=FORMAT_PEM): + # type: (AnyStr, int) -> X509 + """ + Load certificate from file. + + :param file: Name of file containing certificate in either DER or + PEM format. + + :param format: Describes the format of the file to be loaded, + either PEM or DER. + + :return: M2Crypto.X509.X509 object. + """ + with BIO.openfile(file) as bio: + if format == FORMAT_PEM: + return load_cert_bio(bio) + elif format == FORMAT_DER: + cptr = m2.d2i_x509(bio._ptr()) + return X509(cptr, _pyfree=1) + else: + raise ValueError( + "Unknown format. Must be either FORMAT_DER or FORMAT_PEM") + + +def load_cert_bio(bio, format=FORMAT_PEM): + # type: (BIO.BIO, int) -> X509 + """ + Load certificate from a bio. + + :param bio: BIO pointing at a certificate in either DER or PEM format. + + :param format: Describes the format of the cert to be loaded, + either PEM or DER (via constants FORMAT_PEM + and FORMAT_FORMAT_DER) + + :return: M2Crypto.X509.X509 object. + """ + if format == FORMAT_PEM: + cptr = m2.x509_read_pem(bio._ptr()) + elif format == FORMAT_DER: + cptr = m2.d2i_x509(bio._ptr()) + else: + raise ValueError( + "Unknown format. Must be either FORMAT_DER or FORMAT_PEM") + return X509(cptr, _pyfree=1) + + +def load_cert_string(string, format=FORMAT_PEM): + # type: (AnyStr, int) -> X509 + """ + Load certificate from a string. + + :param string: String containing a certificate in either DER or PEM format. + + :param format: Describes the format of the cert to be loaded, + either PEM or DER (via constants FORMAT_PEM + and FORMAT_FORMAT_DER) + + :return: M2Crypto.X509.X509 object. + """ + string = six.ensure_binary(string) + bio = BIO.MemoryBuffer(string) + return load_cert_bio(bio, format) + + +def load_cert_der_string(string): + # type: (AnyStr) -> X509 + """ + Load certificate from a string. + + :param string: String containing a certificate in DER format. + + :return: M2Crypto.X509.X509 object. + """ + string = six.ensure_binary(string) + bio = BIO.MemoryBuffer(string) + cptr = m2.d2i_x509(bio._ptr()) + return X509(cptr, _pyfree=1) + + +class X509_Store_Context(object): + """ + X509 Store Context + """ + + m2_x509_store_ctx_free = m2.x509_store_ctx_free + + def __init__(self, x509_store_ctx, _pyfree=0): + # type: (bytes, int) -> None + """ + + :param x509_store_ctx: binary data for + OpenSSL X509_STORE_CTX type + """ + self.ctx = x509_store_ctx + self._pyfree = _pyfree + + def __del__(self): + # type: () -> None + # see BIO.py - unbalanced __init__ / __del__ + if not hasattr(self, '_pyfree'): + pass # print("OOPS") + elif self._pyfree: + self.m2_x509_store_ctx_free(self.ctx) + + def _ptr(self): + return self.ctx + + def get_current_cert(self): + # type: () -> X509 + """ + Get current X.509 certificate. + + :warning: The returned certificate is NOT refcounted, so you can not + rely on it being valid once the store context goes + away or is modified. + """ + return X509(m2.x509_store_ctx_get_current_cert(self.ctx), _pyfree=0) + + def get_error(self): + # type: () -> int + """ + Get error code. + """ + return m2.x509_store_ctx_get_error(self.ctx) + + def get_error_depth(self): + # type: () -> int + """ + Get error depth. + """ + return m2.x509_store_ctx_get_error_depth(self.ctx) + + def get1_chain(self): + # type: () -> X509_Stack + """ + Get certificate chain. + + :return: Reference counted (i.e. safe to use even after the store + context goes away) stack of certificates in the chain. + """ + return X509_Stack(m2.x509_store_ctx_get1_chain(self.ctx), 1, 1) + + +class X509_Store(object): + """ + X509 Store + """ + + m2_x509_store_free = m2.x509_store_free + + def __init__(self, store=None, _pyfree=0): + # type: (Optional[bytes], int) -> None + """ + :param store: binary data for OpenSSL X509_STORE_CTX type. + """ + if store is not None: + self.store = store + self._pyfree = _pyfree + else: + self.store = m2.x509_store_new() + self._pyfree = 1 + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_store_free(self.store) + + def _ptr(self): + return self.store + + def load_info(self, file): + # type: (AnyStr) -> int + """ + :param file: filename + + :return: 1 on success, 0 on failure + """ + ret = m2.x509_store_load_locations(self.store, file) + return ret + + load_locations = load_info + + def add_x509(self, x509): + # type: (X509) -> int + assert isinstance(x509, X509) + return m2.x509_store_add_cert(self.store, x509._ptr()) + + def set_verify_cb(self, callback=None): + # type: (Optional[callable]) -> None + """ + Set callback which will be called when the store is verified. + Wrapper over OpenSSL X509_STORE_set_verify_cb(). + + :param callback: Callable to specify verification options. + Type of the callable must be: + (int, X509_Store_Context) -> int. + If None: set the standard options. + + :note: compile-time or run-time errors in the callback would result + in mysterious errors during verification, which could be hard + to trace. + + :note: Python exceptions raised in callbacks do not propagate to + verify() call. + + :return: None + """ + if callback is None: + return self.set_verify_cb(x509_store_default_cb) + + if not callable(callback): + raise X509Error("set_verify(): callback is not callable") + return m2.x509_store_set_verify_cb(self.store, callback) + + add_cert = add_x509 + + def set_flags(self, flags): + # type: (int) -> int + """ + Set the verification flags for the X509Store + Wrapper over OpenSSL X509_STORE_set_flags() + + :param flags: `VERIFICATION FLAGS` section of the + X509_VERIFY_PARAM_set_flags man page has + a complete description of values the flags + parameter can take. + Their M2Crypto equivalent is transformed following + the pattern: "X509_V_FLAG_XYZ" -> lowercase("VERIFY_XYZ") + """ + return m2.x509_store_set_flags(self.store, flags) + + +class X509_Stack(object): + """ + X509 Stack + + :warning: Do not modify the underlying OpenSSL stack + except through this interface, or use any OpenSSL + functions that do so indirectly. Doing so will get the + OpenSSL stack and the internal pystack of this class out + of sync, leading to python memory leaks, exceptions or + even python crashes! + """ + + m2_sk_x509_free = m2.sk_x509_free + + def __init__(self, stack=None, _pyfree=0, _pyfree_x509=0): + # type: (bytes, int, int) -> None + if stack is not None: + self.stack = stack + self._pyfree = _pyfree + self.pystack = [] # This must be kept in sync with self.stack + num = m2.sk_x509_num(self.stack) + for i in range(num): + self.pystack.append(X509(m2.sk_x509_value(self.stack, i), + _pyfree=_pyfree_x509)) + else: + self.stack = m2.sk_x509_new_null() + self._pyfree = 1 + self.pystack = [] # This must be kept in sync with self.stack + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_sk_x509_free(self.stack) + + def __len__(self): + # type: () -> int + assert m2.sk_x509_num(self.stack) == len(self.pystack) + return len(self.pystack) + + def __getitem__(self, idx): + # type: (int) -> X509 + return self.pystack[idx] + + def __iter__(self): + return iter(self.pystack) + + def _ptr(self): + return self.stack + + def push(self, x509): + # type: (X509) -> int + """ + push an X509 certificate onto the stack. + + :param x509: X509 object. + + :return: The number of X509 objects currently on the stack. + """ + assert isinstance(x509, X509) + self.pystack.append(x509) + ret = m2.sk_x509_push(self.stack, x509._ptr()) + assert ret == len(self.pystack) + return ret + + def pop(self): + # type: () -> X509 + """ + pop a certificate from the stack. + + :return: X509 object that was popped, or None if there is + nothing to pop. + """ + x509_ptr = m2.sk_x509_pop(self.stack) + if x509_ptr is None: + assert len(self.pystack) == 0 + return None + return self.pystack.pop() + + def as_der(self): + # type: () -> bytes + """ + Return the stack as a DER encoded string + """ + return m2.get_der_encoding_stack(self.stack) + + +def new_stack_from_der(der_string): + # type: (bytes) -> X509_Stack + """ + Create a new X509_Stack from DER string. + + :return: X509_Stack + """ + der_string = six.ensure_binary(der_string) + stack_ptr = m2.make_stack_from_der_sequence(der_string) + return X509_Stack(stack_ptr, 1, 1) + + +class Request(object): + """ + X509 Certificate Request. + """ + + m2_x509_req_free = m2.x509_req_free + + def __init__(self, req=None, _pyfree=0): + # type: (Optional[int], int) -> None + if req is not None: + self.req = req + self._pyfree = _pyfree + else: + self.req = m2.x509_req_new() + m2.x509_req_set_version(self.req, 0) + self._pyfree = 1 + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_req_free(self.req) + + def as_text(self): + # type: () -> str + buf = BIO.MemoryBuffer() + m2.x509_req_print(buf.bio_ptr(), self.req) + return six.ensure_text(buf.read_all()) + + def as_pem(self): + # type: () -> bytes + buf = BIO.MemoryBuffer() + m2.x509_req_write_pem(buf.bio_ptr(), self.req) + return buf.read_all() + + def as_der(self): + # type: () -> bytes + buf = BIO.MemoryBuffer() + m2.i2d_x509_req_bio(buf.bio_ptr(), self.req) + return buf.read_all() + + def save_pem(self, filename): + # type: (AnyStr) -> int + with BIO.openfile(filename, 'wb') as bio: + return m2.x509_req_write_pem(bio.bio_ptr(), self.req) + + def save(self, filename, format=FORMAT_PEM): + # type: (AnyStr, int) -> int + """ + Saves X.509 certificate request to a file. Default output + format is PEM. + + :param filename: Name of the file the request will be saved to. + + :param format: Controls what output format is used to save the + request. Either FORMAT_PEM or FORMAT_DER to save + in PEM or DER format. Raises ValueError if an + unknown format is used. + + :return: 1 for success, 0 for failure. + The error code can be obtained by ERR_get_error. + """ + with BIO.openfile(filename, 'wb') as bio: + if format == FORMAT_PEM: + return m2.x509_req_write_pem(bio.bio_ptr(), self.req) + elif format == FORMAT_DER: + return m2.i2d_x509_req_bio(bio.bio_ptr(), self.req) + else: + raise ValueError( + "Unknown filetype. Must be either FORMAT_DER or FORMAT_PEM") + + def get_pubkey(self): + # type: () -> EVP.PKey + """ + Get the public key for the request. + + :return: Public key from the request. + """ + return EVP.PKey(m2.x509_req_get_pubkey(self.req), _pyfree=1) + + def set_pubkey(self, pkey): + # type: (EVP.PKey) -> int + """ + Set the public key for the request. + + :param pkey: Public key + + :return: Return 1 for success and 0 for failure. + """ + return m2.x509_req_set_pubkey(self.req, pkey.pkey) + + def get_version(self): + # type: () -> int + """ + Get version. + + :return: Returns version. + """ + return m2.x509_req_get_version(self.req) + + def set_version(self, version): + # type: (int) -> int + """ + Set version. + + :param version: Version number. + :return: Returns 0 on failure. + """ + return m2.x509_req_set_version(self.req, version) + + def get_subject(self): + # type: () -> X509_Name + return X509_Name(m2.x509_req_get_subject_name(self.req)) + + def set_subject_name(self, name): + # type: (X509_Name) -> int + """ + Set subject name. + + :param name: subjectName field. + :return: 1 for success and 0 for failure + """ + return m2.x509_req_set_subject_name(self.req, name.x509_name) + + set_subject = set_subject_name + + def add_extensions(self, ext_stack): + # type: (X509_Extension_Stack) -> int + """ + Add X509 extensions to this request. + + :param ext_stack: Stack of extensions to add. + :return: 1 for success and 0 for failure + """ + return m2.x509_req_add_extensions(self.req, ext_stack._ptr()) + + def verify(self, pkey): + # type: (EVP.PKey) -> int + """ + + :param pkey: PKey to be verified + :return: 1 for success and 0 for failure + """ + return m2.x509_req_verify(self.req, pkey.pkey) + + def sign(self, pkey, md): + # type: (EVP.PKey, str) -> int + """ + + :param pkey: PKey to be signed + :param md: used algorigthm + :return: 1 for success and 0 for failure + """ + mda = getattr(m2, md, None) + if mda is None: + raise ValueError('unknown message digest', md) + return m2.x509_req_sign(self.req, pkey.pkey, mda()) + + +def load_request(file, format=FORMAT_PEM): + # type: (AnyStr, int) -> Request + """ + Load certificate request from file. + + :param file: Name of file containing certificate request in + either PEM or DER format. + :param format: Describes the format of the file to be loaded, + either PEM or DER. (using constants FORMAT_PEM + and FORMAT_DER) + :return: Request object. + """ + with BIO.openfile(file) as f: + if format == FORMAT_PEM: + cptr = m2.x509_req_read_pem(f.bio_ptr()) + elif format == FORMAT_DER: + cptr = m2.d2i_x509_req(f.bio_ptr()) + else: + raise ValueError( + "Unknown filetype. Must be either FORMAT_PEM or FORMAT_DER") + + return Request(cptr, 1) + + +def load_request_bio(bio, format=FORMAT_PEM): + # type: (BIO.BIO, int) -> Request + """ + Load certificate request from a bio. + + :param bio: BIO pointing at a certificate request in + either DER or PEM format. + :param format: Describes the format of the request to be loaded, + either PEM or DER. (using constants FORMAT_PEM + and FORMAT_DER) + :return: M2Crypto.X509.Request object. + """ + if format == FORMAT_PEM: + cptr = m2.x509_req_read_pem(bio._ptr()) + elif format == FORMAT_DER: + cptr = m2.d2i_x509_req(bio._ptr()) + else: + raise ValueError( + "Unknown format. Must be either FORMAT_DER or FORMAT_PEM") + + return Request(cptr, _pyfree=1) + + +def load_request_string(string, format=FORMAT_PEM): + # type: (AnyStr, int) -> Request + """ + Load certificate request from a string. + + :param string: String containing a certificate request in + either DER or PEM format. + :param format: Describes the format of the request to be loaded, + either PEM or DER. (using constants FORMAT_PEM + and FORMAT_DER) + + :return: M2Crypto.X509.Request object. + """ + string = six.ensure_binary(string) + bio = BIO.MemoryBuffer(string) + return load_request_bio(bio, format) + + +def load_request_der_string(string): + # type: (AnyStr) -> Request + """ + Load certificate request from a string. + + :param string: String containing a certificate request in DER format. + :return: M2Crypto.X509.Request object. + """ + string = six.ensure_binary(string) + bio = BIO.MemoryBuffer(string) + return load_request_bio(bio, FORMAT_DER) + + +class CRL(object): + """ + X509 Certificate Revocation List + """ + + m2_x509_crl_free = m2.x509_crl_free + + def __init__(self, crl=None, _pyfree=0): + # type: (Optional[bytes], int) -> None + """ + + :param crl: binary representation of + the underlying OpenSSL X509_CRL object. + """ + if crl is not None: + self.crl = crl + self._pyfree = _pyfree + else: + self.crl = m2.x509_crl_new() + self._pyfree = 1 + + def __del__(self): + # type: () -> None + if getattr(self, '_pyfree', 0): + self.m2_x509_crl_free(self.crl) + + def as_text(self): + # type: () -> str + """ + Return CRL in PEM format in a string. + + :return: String containing the CRL in PEM format. + """ + buf = BIO.MemoryBuffer() + m2.x509_crl_print(buf.bio_ptr(), self.crl) + return six.ensure_text(buf.read_all()) + + +def load_crl(file): + # type: (AnyStr) -> CRL + """ + Load CRL from file. + + :param file: Name of file containing CRL in PEM format. + + :return: M2Crypto.X509.CRL object. + """ + with BIO.openfile(file) as f: + cptr = m2.x509_crl_read_pem(f.bio_ptr()) + + return CRL(cptr, 1) |