diff options
author | Jean-Paul Calderone <exarkun@twistedmatrix.com> | 2013-12-29 17:00:04 -0500 |
---|---|---|
committer | Jean-Paul Calderone <exarkun@twistedmatrix.com> | 2013-12-29 17:00:04 -0500 |
commit | dba578b61dd8a014f5b7ae28df49d905310c9171 (patch) | |
tree | db33e3acc3043fccfdf5eb94694d14f82d5d7d97 | |
parent | e1316079db9c43c6db73012f905642a72ec5a1a8 (diff) | |
download | pyopenssl-dba578b61dd8a014f5b7ae28df49d905310c9171.tar.gz |
Only crypto uses new_mem_buf so move it back there for ease of exception handling.
Also replace all of the ZeroDivisionErrors in crypto with proper (yet untested) exception-raising behavior.
-rw-r--r-- | OpenSSL/_util.py | 19 | ||||
-rw-r--r-- | OpenSSL/crypto.py | 172 |
2 files changed, 130 insertions, 61 deletions
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py index 4062fe5..ba00f56 100644 --- a/OpenSSL/_util.py +++ b/OpenSSL/_util.py @@ -14,22 +14,3 @@ def exception_from_error_queue(exceptionType): ffi.string(lib.ERR_reason_error_string(error)))) raise exceptionType(errors) - - - -def new_mem_buf(buffer=None): - if buffer is None: - bio = lib.BIO_new(lib.BIO_s_mem()) - free = lib.BIO_free - else: - data = ffi.new("char[]", buffer) - bio = lib.BIO_new_mem_buf(data, len(buffer)) - # Keep the memory alive as long as the bio is alive! - def free(bio, ref=data): - return lib.BIO_free(bio) - - if bio == ffi.NULL: - 1/0 - - bio = ffi.gc(bio, free) - return bio diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py index 437d111..3178b5a 100644 --- a/OpenSSL/crypto.py +++ b/OpenSSL/crypto.py @@ -4,7 +4,6 @@ from functools import partial from OpenSSL._util import ( ffi as _ffi, lib as _lib, - new_mem_buf as _new_mem_buf, exception_from_error_queue as _exception_from_error_queue) FILETYPE_PEM = _lib.SSL_FILETYPE_PEM @@ -25,6 +24,43 @@ class Error(Exception): _raise_current_error = partial(_exception_from_error_queue, Error) +def _untested_error(where): + """ + An OpenSSL API failed somehow. Additionally, the failure which was + encountered isn't one that's exercised by the test suite so future behavior + of pyOpenSSL is now somewhat less predictable. + """ + raise RuntimeError("Unknown %s failure" % (where,)) + + + +def _new_mem_buf(buffer=None): + """ + Allocate a new OpenSSL memory BIO. + + Arrange for the garbage collector to clean it up automatically. + + :param buffer: None or some bytes to use to put into the BIO so that they + can be read out. + """ + if buffer is None: + bio = _lib.BIO_new(_lib.BIO_s_mem()) + free = _lib.BIO_free + else: + data = _ffi.new("char[]", buffer) + bio = _lib.BIO_new_mem_buf(data, len(buffer)) + # Keep the memory alive as long as the bio is alive! + def free(bio, ref=data): + return _lib.BIO_free(bio) + + if bio == _ffi.NULL: + # TODO: This is untested. + _raise_current_error() + + bio = _ffi.gc(bio, free) + return bio + + def _bio_to_string(bio): """ @@ -63,8 +99,7 @@ def _set_asn1_time(boundary, when): if not check_result: raise ValueError("Invalid string") else: - # TODO No tests for this case - raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure") + _untested_error() @@ -87,7 +122,16 @@ def _get_asn1_time(timestamp): generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) if generalized_timestamp[0] == _ffi.NULL: - 1/0 + # This may happen: + # - if timestamp was not an ASN1_TIME + # - if allocating memory for the ASN1_GENERALIZEDTIME failed + # - if a copy of the time data from timestamp cannot be made for + # the newly allocated ASN1_GENERALIZEDTIME + # + # These are difficult to test. cffi enforces the ASN1_TIME type. + # Memory allocation failures are a pain to trigger + # deterministically. + _untested_error("ASN1_TIME_to_generalizedtime") else: string_timestamp = _ffi.cast( "ASN1_STRING*", generalized_timestamp[0]) @@ -135,22 +179,33 @@ class PKey(object): rsa = _lib.RSA_new() result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL) - if result == -1: - 1/0 + if result == 0: + # TODO: The test for this case is commented out. Different + # builds of OpenSSL appear to have different failure modes that + # make it hard to test. Visual inspection of the OpenSSL + # source reveals that a return value of 0 signals an error. + # Manual testing on a particular build of OpenSSL suggests that + # this is probably the appropriate way to handle those errors. + _raise_current_error() result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa) if not result: - 1/0 + # TODO: It appears as though this can fail if an engine is in + # use which does not support RSA. + _raise_current_error() elif type == TYPE_DSA: dsa = _lib.DSA_generate_parameters( bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL) if dsa == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() if not _lib.DSA_generate_key(dsa): - 1/0 + # TODO: This is untested. + _raise_current_error() if not _lib.EVP_PKEY_assign_DSA(self._pkey, dsa): - 1/0 + # TODO: This is untested. + _raise_current_error() else: raise Error("No such key type") @@ -278,7 +333,8 @@ class X509Name(object): result_buffer = _ffi.new("unsigned char**") data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data) if data_length < 0: - 1/0 + # TODO: This is untested. + _raise_current_error() try: result = _ffi.buffer(result_buffer[0], data_length)[:].decode('utf-8') @@ -306,7 +362,8 @@ class X509Name(object): self._name, result_buffer, len(result_buffer)) if format_result == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() return "<X509Name object '%s'>" % (_ffi.string(result_buffer),) @@ -330,7 +387,8 @@ class X509Name(object): result_buffer = _ffi.new('unsigned char**') encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) if encode_result < 0: - 1/0 + # TODO: This is untested. + _raise_current_error() string_result = _ffi.buffer(result_buffer[0], encode_result)[:] _lib.OPENSSL_free(result_buffer[0]) @@ -434,7 +492,8 @@ class X509Extension(object): def _subjectAltNameString(self): method = _lib.X509V3_EXT_get(self._extension) if method == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() payload = self._extension.value.data length = self._extension.value.length @@ -475,7 +534,8 @@ class X509Extension(object): bio = _new_mem_buf() print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) if not print_result: - 1/0 + # TODO: This is untested. + _raise_current_error() return _bio_to_string(bio) @@ -530,7 +590,8 @@ class X509Req(object): """ set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) if not set_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def get_pubkey(self): @@ -542,7 +603,8 @@ class X509Req(object): pkey = PKey.__new__(PKey) pkey._pkey = _lib.X509_REQ_get_pubkey(self._req) if pkey._pkey == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) pkey._only_public = True return pkey @@ -580,7 +642,8 @@ class X509Req(object): name = X509Name.__new__(X509Name) name._name = _lib.X509_REQ_get_subject_name(self._req) if name._name == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() # The name is owned by the X509Req structure. As long as the X509Name # Python object is alive, keep the X509Req Python object alive. @@ -598,7 +661,8 @@ class X509Req(object): """ stack = _lib.sk_X509_EXTENSION_new_null() if stack == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free) @@ -611,7 +675,8 @@ class X509Req(object): add_result = _lib.X509_REQ_add_extensions(self._req, stack) if not add_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def sign(self, pkey, digest): @@ -634,7 +699,8 @@ class X509Req(object): sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) if not sign_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def verify(self, pkey): @@ -785,7 +851,8 @@ class X509(object): self._x509, digest, result_buffer, result_length) if not digest_result: - 1/0 + # TODO: This is untested. + _raise_current_error() return ':'.join([ ch.encode('hex').upper() for ch @@ -982,7 +1049,8 @@ class X509(object): name = X509Name.__new__(X509Name) name._name = which(self._x509) if name._name == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() # The name is owned by the X509 structure. As long as the X509Name # Python object is alive, keep the X509 Python object alive. @@ -996,7 +1064,8 @@ class X509(object): raise TypeError("name must be an X509Name") set_result = which(self._x509, name._name) if not set_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def get_issuer(self): @@ -1210,7 +1279,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): def _X509_REVOKED_dup(original): copy = _lib.X509_REVOKED_new() if copy == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() if original.serialNumber != _ffi.NULL: copy.serialNumber = _lib.ASN1_INTEGER_dup(original.serialNumber) @@ -1283,7 +1353,8 @@ class Revoked(object): result = _lib.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber) if result < 0: - 1/0 + # TODO: This is untested. + _raise_current_error() return _bio_to_string(bio) @@ -1318,19 +1389,22 @@ class Revoked(object): new_reason_ext = _lib.ASN1_ENUMERATED_new() if new_reason_ext == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free) set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code) if set_result == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() self._delete_reason() add_result = _lib.X509_REVOKED_add1_ext_i2d( self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) if not add_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def get_reason(self): @@ -1349,7 +1423,8 @@ class Revoked(object): if not print_result: print_result = _lib.M_ASN1_OCTET_STRING_print(bio, ext.value) if print_result == 0: - 1/0 + # TODO: This is untested. + _raise_current_error() return _bio_to_string(bio) @@ -1430,10 +1505,13 @@ class CRL(object): """ copy = _X509_REVOKED_dup(revoked._revoked) if copy == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) - # TODO what check on add_result? + if add_result == 0: + # TODO: This is untested. + _raise_current_error() def export(self, cert, key, type=FILETYPE_PEM, days=100): @@ -1462,12 +1540,14 @@ class CRL(object): bio = _lib.BIO_new(_lib.BIO_s_mem()) if bio == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() # A scratch time object to give different values to different CRL fields sometime = _lib.ASN1_TIME_new() if sometime == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() _lib.X509_gmtime_adj(sometime, 0) _lib.X509_CRL_set_lastUpdate(self._crl, sometime) @@ -1492,7 +1572,8 @@ class CRL(object): "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT") if not ret: - 1/0 + # TODO: This is untested. + _raise_current_error() return _bio_to_string(bio) CRLType = CRL @@ -1746,7 +1827,8 @@ class NetscapeSPKI(object): sign_result = _lib.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, digest_obj) if not sign_result: - 1/0 + # TODO: This is untested. + _raise_current_error() def verify(self, key): @@ -1785,7 +1867,8 @@ class NetscapeSPKI(object): pkey = PKey.__new__(PKey) pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki) if pkey._pkey == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) pkey._only_public = True return pkey @@ -1800,7 +1883,8 @@ class NetscapeSPKI(object): """ set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey) if not set_result: - 1/0 + # TODO: This is untested. + _raise_current_error() NetscapeSPKIType = NetscapeSPKI @@ -1923,7 +2007,8 @@ def dump_certificate_request(type, req): raise ValueError("type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT") if result_code == 0: - 1/0 + # TODO: This is untested. + _raise_current_error() return _bio_to_string(bio) @@ -1982,7 +2067,8 @@ def sign(pkey, data, digest): md_ctx, signature_buffer, signature_length, pkey._pkey) if final_result != 1: - 1/0 + # TODO: This is untested. + _raise_current_error() return _ffi.buffer(signature_buffer, signature_length[0])[:] @@ -2004,7 +2090,8 @@ def verify(cert, signature, data, digest): pkey = _lib.X509_get_pubkey(cert._x509) if pkey == _ffi.NULL: - 1/0 + # TODO: This is untested. + _raise_current_error() pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) md_ctx = _ffi.new("EVP_MD_CTX*") @@ -2061,7 +2148,8 @@ def load_pkcs7_data(type, buffer): elif type == FILETYPE_ASN1: pass else: - 1/0 + # TODO: This is untested. + _raise_current_error() raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") if pkcs7 == _ffi.NULL: |