diff options
Diffstat (limited to 'OpenSSL/crypto.py')
-rw-r--r-- | OpenSSL/crypto.py | 117 |
1 files changed, 75 insertions, 42 deletions
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py index 0c78b31..0a9e8a2 100644 --- a/OpenSSL/crypto.py +++ b/OpenSSL/crypto.py @@ -1,10 +1,18 @@ from time import time +from base64 import b16encode from functools import partial +from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ + +from six import ( + integer_types as _integer_types, + text_type as _text_type) from OpenSSL._util import ( ffi as _ffi, lib as _lib, - exception_from_error_queue as _exception_from_error_queue) + exception_from_error_queue as _exception_from_error_queue, + byte_string as _byte_string, + native as _native) FILETYPE_PEM = _lib.SSL_FILETYPE_PEM FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 @@ -276,7 +284,7 @@ class X509Name(object): raise TypeError("attribute name must be string, not '%.200s'" % ( type(value).__name__,)) - nid = _lib.OBJ_txt2nid(name) + nid = _lib.OBJ_txt2nid(_byte_string(name)) if nid == _lib.NID_undef: try: _raise_current_error() @@ -294,7 +302,7 @@ class X509Name(object): _lib.X509_NAME_ENTRY_free(ent) break - if isinstance(value, unicode): + if isinstance(value, _text_type): value = value.encode('utf-8') add_result = _lib.X509_NAME_add_entry_by_NID( @@ -310,7 +318,7 @@ class X509Name(object): organization (alias O), organizationalUnit (alias OU), commonName (alias CN) and more... """ - nid = _lib.OBJ_txt2nid(name) + nid = _lib.OBJ_txt2nid(_byte_string(name)) if nid == _lib.NID_undef: # This is a bit weird. OBJ_txt2nid indicated failure, but it seems # a lower level function, a2d_ASN1_OBJECT, also feels the need to @@ -344,14 +352,22 @@ class X509Name(object): return result - def __cmp__(self, other): - if not isinstance(other, X509Name): - return NotImplemented + def _cmp(op): + def f(self, other): + if not isinstance(other, X509Name): + return NotImplemented + result = _lib.X509_NAME_cmp(self._name, other._name) + return op(result, 0) + return f - result = _lib.X509_NAME_cmp(self._name, other._name) - # TODO result == -2 is an error case that maybe should be checked for - return result + __eq__ = _cmp(__eq__) + __ne__ = _cmp(__ne__) + + __lt__ = _cmp(__lt__) + __le__ = _cmp(__le__) + __gt__ = _cmp(__gt__) + __ge__ = _cmp(__ge__) def __repr__(self): """ @@ -365,7 +381,8 @@ class X509Name(object): # TODO: This is untested. _raise_current_error() - return "<X509Name object '%s'>" % (_ffi.string(result_buffer),) + return "<X509Name object '%s'>" % ( + _native(_ffi.string(result_buffer)),) def hash(self): @@ -471,7 +488,7 @@ class X509Extension(object): # with strings? (However, X509V3_EXT_i2d in particular seems like it # would be a better API to invoke. I do not know where to get the # ext_struc it desires for its last parameter, though.) - value = "critical," + value + value = b"critical," + value extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value) if extension == _ffi.NULL: @@ -484,9 +501,9 @@ class X509Extension(object): return _lib.OBJ_obj2nid(self._extension.object) _prefixes = { - _lib.GEN_EMAIL: b"email", - _lib.GEN_DNS: b"DNS", - _lib.GEN_URI: b"URI", + _lib.GEN_EMAIL: "email", + _lib.GEN_DNS: "DNS", + _lib.GEN_URI: "URI", } def _subjectAltNameString(self): @@ -517,11 +534,12 @@ class X509Extension(object): except KeyError: bio = _new_mem_buf() _lib.GENERAL_NAME_print(bio, name) - parts.append(_bio_to_string(bio)) + parts.append(_native(_bio_to_string(bio))) else: - value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] - parts.append(label + b":" + value) - return b", ".join(parts) + value = _native( + _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) + parts.append(label + ":" + value) + return ", ".join(parts) def __str__(self): @@ -537,7 +555,7 @@ class X509Extension(object): # TODO: This is untested. _raise_current_error() - return _bio_to_string(bio) + return _native(_bio_to_string(bio)) def get_critical(self): @@ -693,7 +711,7 @@ class X509Req(object): if not pkey._initialized: raise ValueError("Key is uninitialized") - digest_obj = _lib.EVP_get_digestbyname(digest) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -806,7 +824,7 @@ class X509(object): if not pkey._initialized: raise ValueError("Key is uninitialized") - evp_md = _lib.EVP_get_digestbyname(digest) + evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) if evp_md == _ffi.NULL: raise ValueError("No such digest method") @@ -839,7 +857,7 @@ class X509(object): :return: The digest of the object """ - digest = _lib.EVP_get_digestbyname(digest_name) + digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) if digest == _ffi.NULL: raise ValueError("No such digest method") @@ -854,8 +872,8 @@ class X509(object): # TODO: This is untested. _raise_current_error() - return ':'.join([ - ch.encode('hex').upper() for ch + return b":".join([ + b16encode(ch).upper() for ch in _ffi.buffer(result_buffer, result_length[0])]) @@ -877,7 +895,7 @@ class X509(object): :return: None """ - if not isinstance(serial, (int, long)): + if not isinstance(serial, _integer_types): raise TypeError("serial must be an integer") hex_serial = hex(serial)[2:] @@ -1246,7 +1264,11 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): bio = _new_mem_buf() if cipher is not None: - cipher_obj = _lib.EVP_get_cipherbyname(cipher) + if passphrase is None: + raise TypeError( + "if a value is given for cipher " + "one must also be given for passphrase") + cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) if cipher_obj == _ffi.NULL: raise ValueError("Invalid cipher name") else: @@ -1307,14 +1329,14 @@ class Revoked(object): # OCSP_crl_reason_str. We use the latter, just like the command line # program. _crl_reasons = [ - "unspecified", - "keyCompromise", - "CACompromise", - "affiliationChanged", - "superseded", - "cessationOfOperation", - "certificateHold", - # "removeFromCRL", + b"unspecified", + b"keyCompromise", + b"CACompromise", + b"affiliationChanged", + b"superseded", + b"cessationOfOperation", + b"certificateHold", + # b"removeFromCRL", ] def __init__(self): @@ -1384,7 +1406,7 @@ class Revoked(object): elif not isinstance(reason, bytes): raise TypeError("reason must be None or a byte string") else: - reason = reason.lower().replace(' ', '') + reason = reason.lower().replace(b' ', b'') reason_code = [r.lower() for r in self._crl_reasons].index(reason) new_reason_ext = _lib.ASN1_ENUMERATED_new() @@ -1821,7 +1843,7 @@ class NetscapeSPKI(object): if not pkey._initialized: raise ValueError("Key is uninitialized") - digest_obj = _lib.EVP_get_digestbyname(digest) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -1926,10 +1948,10 @@ class _PassphraseHelper(object): try: _exception_from_error_queue(exceptionType) except exceptionType as e: - pass + from_queue = e if self._problems: raise self._problems[0] - return e + return from_queue def _read_passphrase(self, buf, size, rwflag, userdata): @@ -1946,7 +1968,7 @@ class _PassphraseHelper(object): else: raise ValueError("passphrase returned by callback is too long") for i in range(len(result)): - buf[i] = result[i] + buf[i] = result[i:i + 1] return len(result) except Exception as e: self._problems.append(e) @@ -2050,7 +2072,7 @@ def sign(pkey, data, digest): :param digest: message digest to use :return: signature """ - digest_obj = _lib.EVP_get_digestbyname(digest) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -2084,7 +2106,7 @@ def verify(cert, signature, data, digest): :param digest: message digest to use :return: None if the signature is correct, raise exception otherwise """ - digest_obj = _lib.EVP_get_digestbyname(digest) + digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) if digest_obj == _ffi.NULL: raise ValueError("No such digest method") @@ -2257,3 +2279,14 @@ except ImportError: else: _initialize_openssl_threads(get_ident, Lock) del get_ident, Lock + +# There are no direct unit tests for this initialization. It is tested +# indirectly since it is necessary for functions like dump_privatekey when +# using encryption. +# +# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase +# and some other similar tests may fail without this (though they may not if +# the Python runtime has already done some initialization of the underlying +# OpenSSL library (and is linked against the same one that cryptography is +# using)). +_lib.OpenSSL_add_all_algorithms() |