summaryrefslogtreecommitdiff
path: root/OpenSSL/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'OpenSSL/crypto.py')
-rw-r--r--OpenSSL/crypto.py117
1 files changed, 75 insertions, 42 deletions
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 0c78b31..0a9e8a2 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -1,10 +1,18 @@
from time import time
+from base64 import b16encode
from functools import partial
+from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
+
+from six import (
+ integer_types as _integer_types,
+ text_type as _text_type)
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
- exception_from_error_queue as _exception_from_error_queue)
+ exception_from_error_queue as _exception_from_error_queue,
+ byte_string as _byte_string,
+ native as _native)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
@@ -276,7 +284,7 @@ class X509Name(object):
raise TypeError("attribute name must be string, not '%.200s'" % (
type(value).__name__,))
- nid = _lib.OBJ_txt2nid(name)
+ nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
try:
_raise_current_error()
@@ -294,7 +302,7 @@ class X509Name(object):
_lib.X509_NAME_ENTRY_free(ent)
break
- if isinstance(value, unicode):
+ if isinstance(value, _text_type):
value = value.encode('utf-8')
add_result = _lib.X509_NAME_add_entry_by_NID(
@@ -310,7 +318,7 @@ class X509Name(object):
organization (alias O), organizationalUnit (alias OU), commonName (alias
CN) and more...
"""
- nid = _lib.OBJ_txt2nid(name)
+ nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
# This is a bit weird. OBJ_txt2nid indicated failure, but it seems
# a lower level function, a2d_ASN1_OBJECT, also feels the need to
@@ -344,14 +352,22 @@ class X509Name(object):
return result
- def __cmp__(self, other):
- if not isinstance(other, X509Name):
- return NotImplemented
+ def _cmp(op):
+ def f(self, other):
+ if not isinstance(other, X509Name):
+ return NotImplemented
+ result = _lib.X509_NAME_cmp(self._name, other._name)
+ return op(result, 0)
+ return f
- result = _lib.X509_NAME_cmp(self._name, other._name)
- # TODO result == -2 is an error case that maybe should be checked for
- return result
+ __eq__ = _cmp(__eq__)
+ __ne__ = _cmp(__ne__)
+
+ __lt__ = _cmp(__lt__)
+ __le__ = _cmp(__le__)
+ __gt__ = _cmp(__gt__)
+ __ge__ = _cmp(__ge__)
def __repr__(self):
"""
@@ -365,7 +381,8 @@ class X509Name(object):
# TODO: This is untested.
_raise_current_error()
- return "<X509Name object '%s'>" % (_ffi.string(result_buffer),)
+ return "<X509Name object '%s'>" % (
+ _native(_ffi.string(result_buffer)),)
def hash(self):
@@ -471,7 +488,7 @@ class X509Extension(object):
# with strings? (However, X509V3_EXT_i2d in particular seems like it
# would be a better API to invoke. I do not know where to get the
# ext_struc it desires for its last parameter, though.)
- value = "critical," + value
+ value = b"critical," + value
extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value)
if extension == _ffi.NULL:
@@ -484,9 +501,9 @@ class X509Extension(object):
return _lib.OBJ_obj2nid(self._extension.object)
_prefixes = {
- _lib.GEN_EMAIL: b"email",
- _lib.GEN_DNS: b"DNS",
- _lib.GEN_URI: b"URI",
+ _lib.GEN_EMAIL: "email",
+ _lib.GEN_DNS: "DNS",
+ _lib.GEN_URI: "URI",
}
def _subjectAltNameString(self):
@@ -517,11 +534,12 @@ class X509Extension(object):
except KeyError:
bio = _new_mem_buf()
_lib.GENERAL_NAME_print(bio, name)
- parts.append(_bio_to_string(bio))
+ parts.append(_native(_bio_to_string(bio)))
else:
- value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
- parts.append(label + b":" + value)
- return b", ".join(parts)
+ value = _native(
+ _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:])
+ parts.append(label + ":" + value)
+ return ", ".join(parts)
def __str__(self):
@@ -537,7 +555,7 @@ class X509Extension(object):
# TODO: This is untested.
_raise_current_error()
- return _bio_to_string(bio)
+ return _native(_bio_to_string(bio))
def get_critical(self):
@@ -693,7 +711,7 @@ class X509Req(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -806,7 +824,7 @@ class X509(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- evp_md = _lib.EVP_get_digestbyname(digest)
+ evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
if evp_md == _ffi.NULL:
raise ValueError("No such digest method")
@@ -839,7 +857,7 @@ class X509(object):
:return: The digest of the object
"""
- digest = _lib.EVP_get_digestbyname(digest_name)
+ digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
if digest == _ffi.NULL:
raise ValueError("No such digest method")
@@ -854,8 +872,8 @@ class X509(object):
# TODO: This is untested.
_raise_current_error()
- return ':'.join([
- ch.encode('hex').upper() for ch
+ return b":".join([
+ b16encode(ch).upper() for ch
in _ffi.buffer(result_buffer, result_length[0])])
@@ -877,7 +895,7 @@ class X509(object):
:return: None
"""
- if not isinstance(serial, (int, long)):
+ if not isinstance(serial, _integer_types):
raise TypeError("serial must be an integer")
hex_serial = hex(serial)[2:]
@@ -1246,7 +1264,11 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
bio = _new_mem_buf()
if cipher is not None:
- cipher_obj = _lib.EVP_get_cipherbyname(cipher)
+ if passphrase is None:
+ raise TypeError(
+ "if a value is given for cipher "
+ "one must also be given for passphrase")
+ cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
if cipher_obj == _ffi.NULL:
raise ValueError("Invalid cipher name")
else:
@@ -1307,14 +1329,14 @@ class Revoked(object):
# OCSP_crl_reason_str. We use the latter, just like the command line
# program.
_crl_reasons = [
- "unspecified",
- "keyCompromise",
- "CACompromise",
- "affiliationChanged",
- "superseded",
- "cessationOfOperation",
- "certificateHold",
- # "removeFromCRL",
+ b"unspecified",
+ b"keyCompromise",
+ b"CACompromise",
+ b"affiliationChanged",
+ b"superseded",
+ b"cessationOfOperation",
+ b"certificateHold",
+ # b"removeFromCRL",
]
def __init__(self):
@@ -1384,7 +1406,7 @@ class Revoked(object):
elif not isinstance(reason, bytes):
raise TypeError("reason must be None or a byte string")
else:
- reason = reason.lower().replace(' ', '')
+ reason = reason.lower().replace(b' ', b'')
reason_code = [r.lower() for r in self._crl_reasons].index(reason)
new_reason_ext = _lib.ASN1_ENUMERATED_new()
@@ -1821,7 +1843,7 @@ class NetscapeSPKI(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -1926,10 +1948,10 @@ class _PassphraseHelper(object):
try:
_exception_from_error_queue(exceptionType)
except exceptionType as e:
- pass
+ from_queue = e
if self._problems:
raise self._problems[0]
- return e
+ return from_queue
def _read_passphrase(self, buf, size, rwflag, userdata):
@@ -1946,7 +1968,7 @@ class _PassphraseHelper(object):
else:
raise ValueError("passphrase returned by callback is too long")
for i in range(len(result)):
- buf[i] = result[i]
+ buf[i] = result[i:i + 1]
return len(result)
except Exception as e:
self._problems.append(e)
@@ -2050,7 +2072,7 @@ def sign(pkey, data, digest):
:param digest: message digest to use
:return: signature
"""
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -2084,7 +2106,7 @@ def verify(cert, signature, data, digest):
:param digest: message digest to use
:return: None if the signature is correct, raise exception otherwise
"""
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -2257,3 +2279,14 @@ except ImportError:
else:
_initialize_openssl_threads(get_ident, Lock)
del get_ident, Lock
+
+# There are no direct unit tests for this initialization. It is tested
+# indirectly since it is necessary for functions like dump_privatekey when
+# using encryption.
+#
+# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase
+# and some other similar tests may fail without this (though they may not if
+# the Python runtime has already done some initialization of the underlying
+# OpenSSL library (and is linked against the same one that cryptography is
+# using)).
+_lib.OpenSSL_add_all_algorithms()