summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@twistedmatrix.com>2013-12-29 17:00:04 -0500
committerJean-Paul Calderone <exarkun@twistedmatrix.com>2013-12-29 17:00:04 -0500
commitdba578b61dd8a014f5b7ae28df49d905310c9171 (patch)
treedb33e3acc3043fccfdf5eb94694d14f82d5d7d97
parente1316079db9c43c6db73012f905642a72ec5a1a8 (diff)
downloadpyopenssl-dba578b61dd8a014f5b7ae28df49d905310c9171.tar.gz
Only crypto uses new_mem_buf so move it back there for ease of exception handling.
Also replace all of the ZeroDivisionErrors in crypto with proper (yet untested) exception-raising behavior.
-rw-r--r--OpenSSL/_util.py19
-rw-r--r--OpenSSL/crypto.py172
2 files changed, 130 insertions, 61 deletions
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
index 4062fe5..ba00f56 100644
--- a/OpenSSL/_util.py
+++ b/OpenSSL/_util.py
@@ -14,22 +14,3 @@ def exception_from_error_queue(exceptionType):
ffi.string(lib.ERR_reason_error_string(error))))
raise exceptionType(errors)
-
-
-
-def new_mem_buf(buffer=None):
- if buffer is None:
- bio = lib.BIO_new(lib.BIO_s_mem())
- free = lib.BIO_free
- else:
- data = ffi.new("char[]", buffer)
- bio = lib.BIO_new_mem_buf(data, len(buffer))
- # Keep the memory alive as long as the bio is alive!
- def free(bio, ref=data):
- return lib.BIO_free(bio)
-
- if bio == ffi.NULL:
- 1/0
-
- bio = ffi.gc(bio, free)
- return bio
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 437d111..3178b5a 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -4,7 +4,6 @@ from functools import partial
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
- new_mem_buf as _new_mem_buf,
exception_from_error_queue as _exception_from_error_queue)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
@@ -25,6 +24,43 @@ class Error(Exception):
_raise_current_error = partial(_exception_from_error_queue, Error)
+def _untested_error(where):
+ """
+ An OpenSSL API failed somehow. Additionally, the failure which was
+ encountered isn't one that's exercised by the test suite so future behavior
+ of pyOpenSSL is now somewhat less predictable.
+ """
+ raise RuntimeError("Unknown %s failure" % (where,))
+
+
+
+def _new_mem_buf(buffer=None):
+ """
+ Allocate a new OpenSSL memory BIO.
+
+ Arrange for the garbage collector to clean it up automatically.
+
+ :param buffer: None or some bytes to use to put into the BIO so that they
+ can be read out.
+ """
+ if buffer is None:
+ bio = _lib.BIO_new(_lib.BIO_s_mem())
+ free = _lib.BIO_free
+ else:
+ data = _ffi.new("char[]", buffer)
+ bio = _lib.BIO_new_mem_buf(data, len(buffer))
+ # Keep the memory alive as long as the bio is alive!
+ def free(bio, ref=data):
+ return _lib.BIO_free(bio)
+
+ if bio == _ffi.NULL:
+ # TODO: This is untested.
+ _raise_current_error()
+
+ bio = _ffi.gc(bio, free)
+ return bio
+
+
def _bio_to_string(bio):
"""
@@ -63,8 +99,7 @@ def _set_asn1_time(boundary, when):
if not check_result:
raise ValueError("Invalid string")
else:
- # TODO No tests for this case
- raise RuntimeError("Unknown ASN1_GENERALIZEDTIME_set_string failure")
+ _untested_error()
@@ -87,7 +122,16 @@ def _get_asn1_time(timestamp):
generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
_lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
if generalized_timestamp[0] == _ffi.NULL:
- 1/0
+ # This may happen:
+ # - if timestamp was not an ASN1_TIME
+ # - if allocating memory for the ASN1_GENERALIZEDTIME failed
+ # - if a copy of the time data from timestamp cannot be made for
+ # the newly allocated ASN1_GENERALIZEDTIME
+ #
+ # These are difficult to test. cffi enforces the ASN1_TIME type.
+ # Memory allocation failures are a pain to trigger
+ # deterministically.
+ _untested_error("ASN1_TIME_to_generalizedtime")
else:
string_timestamp = _ffi.cast(
"ASN1_STRING*", generalized_timestamp[0])
@@ -135,22 +179,33 @@ class PKey(object):
rsa = _lib.RSA_new()
result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL)
- if result == -1:
- 1/0
+ if result == 0:
+ # TODO: The test for this case is commented out. Different
+ # builds of OpenSSL appear to have different failure modes that
+ # make it hard to test. Visual inspection of the OpenSSL
+ # source reveals that a return value of 0 signals an error.
+ # Manual testing on a particular build of OpenSSL suggests that
+ # this is probably the appropriate way to handle those errors.
+ _raise_current_error()
result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa)
if not result:
- 1/0
+ # TODO: It appears as though this can fail if an engine is in
+ # use which does not support RSA.
+ _raise_current_error()
elif type == TYPE_DSA:
dsa = _lib.DSA_generate_parameters(
bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL)
if dsa == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
if not _lib.DSA_generate_key(dsa):
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
if not _lib.EVP_PKEY_assign_DSA(self._pkey, dsa):
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
else:
raise Error("No such key type")
@@ -278,7 +333,8 @@ class X509Name(object):
result_buffer = _ffi.new("unsigned char**")
data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
if data_length < 0:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
try:
result = _ffi.buffer(result_buffer[0], data_length)[:].decode('utf-8')
@@ -306,7 +362,8 @@ class X509Name(object):
self._name, result_buffer, len(result_buffer))
if format_result == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return "<X509Name object '%s'>" % (_ffi.string(result_buffer),)
@@ -330,7 +387,8 @@ class X509Name(object):
result_buffer = _ffi.new('unsigned char**')
encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
if encode_result < 0:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
_lib.OPENSSL_free(result_buffer[0])
@@ -434,7 +492,8 @@ class X509Extension(object):
def _subjectAltNameString(self):
method = _lib.X509V3_EXT_get(self._extension)
if method == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
payload = self._extension.value.data
length = self._extension.value.length
@@ -475,7 +534,8 @@ class X509Extension(object):
bio = _new_mem_buf()
print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
if not print_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _bio_to_string(bio)
@@ -530,7 +590,8 @@ class X509Req(object):
"""
set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
if not set_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def get_pubkey(self):
@@ -542,7 +603,8 @@ class X509Req(object):
pkey = PKey.__new__(PKey)
pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
if pkey._pkey == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
pkey._only_public = True
return pkey
@@ -580,7 +642,8 @@ class X509Req(object):
name = X509Name.__new__(X509Name)
name._name = _lib.X509_REQ_get_subject_name(self._req)
if name._name == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
# The name is owned by the X509Req structure. As long as the X509Name
# Python object is alive, keep the X509Req Python object alive.
@@ -598,7 +661,8 @@ class X509Req(object):
"""
stack = _lib.sk_X509_EXTENSION_new_null()
if stack == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)
@@ -611,7 +675,8 @@ class X509Req(object):
add_result = _lib.X509_REQ_add_extensions(self._req, stack)
if not add_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def sign(self, pkey, digest):
@@ -634,7 +699,8 @@ class X509Req(object):
sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
if not sign_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def verify(self, pkey):
@@ -785,7 +851,8 @@ class X509(object):
self._x509, digest, result_buffer, result_length)
if not digest_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return ':'.join([
ch.encode('hex').upper() for ch
@@ -982,7 +1049,8 @@ class X509(object):
name = X509Name.__new__(X509Name)
name._name = which(self._x509)
if name._name == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
# The name is owned by the X509 structure. As long as the X509Name
# Python object is alive, keep the X509 Python object alive.
@@ -996,7 +1064,8 @@ class X509(object):
raise TypeError("name must be an X509Name")
set_result = which(self._x509, name._name)
if not set_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def get_issuer(self):
@@ -1210,7 +1279,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
def _X509_REVOKED_dup(original):
copy = _lib.X509_REVOKED_new()
if copy == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
if original.serialNumber != _ffi.NULL:
copy.serialNumber = _lib.ASN1_INTEGER_dup(original.serialNumber)
@@ -1283,7 +1353,8 @@ class Revoked(object):
result = _lib.i2a_ASN1_INTEGER(bio, self._revoked.serialNumber)
if result < 0:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _bio_to_string(bio)
@@ -1318,19 +1389,22 @@ class Revoked(object):
new_reason_ext = _lib.ASN1_ENUMERATED_new()
if new_reason_ext == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free)
set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
if set_result == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
self._delete_reason()
add_result = _lib.X509_REVOKED_add1_ext_i2d(
self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0)
if not add_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def get_reason(self):
@@ -1349,7 +1423,8 @@ class Revoked(object):
if not print_result:
print_result = _lib.M_ASN1_OCTET_STRING_print(bio, ext.value)
if print_result == 0:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _bio_to_string(bio)
@@ -1430,10 +1505,13 @@ class CRL(object):
"""
copy = _X509_REVOKED_dup(revoked._revoked)
if copy == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
add_result = _lib.X509_CRL_add0_revoked(self._crl, copy)
- # TODO what check on add_result?
+ if add_result == 0:
+ # TODO: This is untested.
+ _raise_current_error()
def export(self, cert, key, type=FILETYPE_PEM, days=100):
@@ -1462,12 +1540,14 @@ class CRL(object):
bio = _lib.BIO_new(_lib.BIO_s_mem())
if bio == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
# A scratch time object to give different values to different CRL fields
sometime = _lib.ASN1_TIME_new()
if sometime == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
_lib.X509_gmtime_adj(sometime, 0)
_lib.X509_CRL_set_lastUpdate(self._crl, sometime)
@@ -1492,7 +1572,8 @@ class CRL(object):
"type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
if not ret:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _bio_to_string(bio)
CRLType = CRL
@@ -1746,7 +1827,8 @@ class NetscapeSPKI(object):
sign_result = _lib.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, digest_obj)
if not sign_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
def verify(self, key):
@@ -1785,7 +1867,8 @@ class NetscapeSPKI(object):
pkey = PKey.__new__(PKey)
pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki)
if pkey._pkey == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
pkey._only_public = True
return pkey
@@ -1800,7 +1883,8 @@ class NetscapeSPKI(object):
"""
set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
if not set_result:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
NetscapeSPKIType = NetscapeSPKI
@@ -1923,7 +2007,8 @@ def dump_certificate_request(type, req):
raise ValueError("type argument must be FILETYPE_PEM, FILETYPE_ASN1, or FILETYPE_TEXT")
if result_code == 0:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _bio_to_string(bio)
@@ -1982,7 +2067,8 @@ def sign(pkey, data, digest):
md_ctx, signature_buffer, signature_length, pkey._pkey)
if final_result != 1:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
return _ffi.buffer(signature_buffer, signature_length[0])[:]
@@ -2004,7 +2090,8 @@ def verify(cert, signature, data, digest):
pkey = _lib.X509_get_pubkey(cert._x509)
if pkey == _ffi.NULL:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
md_ctx = _ffi.new("EVP_MD_CTX*")
@@ -2061,7 +2148,8 @@ def load_pkcs7_data(type, buffer):
elif type == FILETYPE_ASN1:
pass
else:
- 1/0
+ # TODO: This is untested.
+ _raise_current_error()
raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")
if pkcs7 == _ffi.NULL: