summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@twistedmatrix.com>2014-01-11 11:46:06 -0800
committerJean-Paul Calderone <exarkun@twistedmatrix.com>2014-01-11 11:46:06 -0800
commitadd057026d4e6fb3e3fbabc1fdde7b656e850925 (patch)
tree168f81f76f2f4fa38244e99c7438cdeff3a7eecc
parent9efd9c6d5819203fbae65e7d5cd2824f5337b865 (diff)
parentaca50f455fb57e74bb69e270358ca90406d19729 (diff)
downloadpyopenssl-add057026d4e6fb3e3fbabc1fdde7b656e850925.tar.gz
Merge pull request #6 from pyca/python3-fixes
Python 3 fixes.
-rw-r--r--OpenSSL/SSL.py10
-rw-r--r--OpenSSL/__init__.py3
-rw-r--r--OpenSSL/_util.py33
-rw-r--r--OpenSSL/crypto.py102
-rw-r--r--OpenSSL/rand.py12
-rw-r--r--OpenSSL/test/test_crypto.py135
-rw-r--r--OpenSSL/test/test_rand.py4
-rw-r--r--OpenSSL/test/test_ssl.py28
-rw-r--r--OpenSSL/test/util.py22
-rwxr-xr-xsetup.py4
10 files changed, 204 insertions, 149 deletions
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index 805693b..8da25e2 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -7,7 +7,8 @@ from errno import errorcode
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
- exception_from_error_queue as _exception_from_error_queue)
+ exception_from_error_queue as _exception_from_error_queue,
+ native as _native)
from OpenSSL.crypto import (
FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)
@@ -20,7 +21,6 @@ except NameError:
class _memoryview(object):
pass
-
OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
SSLEAY_VERSION = _lib.SSLEAY_VERSION
SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS
@@ -571,7 +571,7 @@ class Context(object):
if not isinstance(dhfile, bytes):
raise TypeError("dhfile must be a byte string")
- bio = _lib.BIO_new_file(dhfile, "r")
+ bio = _lib.BIO_new_file(dhfile, b"r")
if bio == _ffi.NULL:
_raise_current_error()
bio = _ffi.gc(bio, _lib.BIO_free)
@@ -895,7 +895,7 @@ class Connection(object):
"""
if not isinstance(name, bytes):
raise TypeError("name must be a byte string")
- elif "\0" in name:
+ elif b"\0" in name:
raise TypeError("name must not contain NUL byte")
# XXX I guess this can fail sometimes?
@@ -1157,7 +1157,7 @@ class Connection(object):
result = _lib.SSL_get_cipher_list(self._ssl, i)
if result == _ffi.NULL:
break
- ciphers.append(_ffi.string(result))
+ ciphers.append(_native(_ffi.string(result)))
return ciphers
diff --git a/OpenSSL/__init__.py b/OpenSSL/__init__.py
index db96e1f..396a97f 100644
--- a/OpenSSL/__init__.py
+++ b/OpenSSL/__init__.py
@@ -10,3 +10,6 @@ from OpenSSL.version import __version__
__all__ = [
'rand', 'crypto', 'SSL', 'tsafe', '__version__']
+
+
+# ERR_load_crypto_strings, SSL_library_init, ERR_load_SSL_strings
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
index 001a873..7c606b9 100644
--- a/OpenSSL/_util.py
+++ b/OpenSSL/_util.py
@@ -1,3 +1,5 @@
+from six import PY3, binary_type, text_type
+
from cryptography.hazmat.bindings.openssl.binding import Binding
binding = Binding()
ffi = binding.ffi
@@ -15,3 +17,34 @@ def exception_from_error_queue(exceptionType):
ffi.string(lib.ERR_reason_error_string(error))))
raise exceptionType(errors)
+
+
+
+def native(s):
+ """
+ Convert :py:class:`bytes` or :py:class:`unicode` to the native
+ :py:class:`str` type, using UTF-8 encoding if conversion is necessary.
+
+ :raise UnicodeError: The input string is not UTF-8 decodeable.
+
+ :raise TypeError: The input is neither :py:class:`bytes` nor
+ :py:class:`unicode`.
+ """
+ if not isinstance(s, (binary_type, text_type)):
+ raise TypeError("%r is neither bytes nor unicode" % s)
+ if PY3:
+ if isinstance(s, binary_type):
+ return s.decode("utf-8")
+ else:
+ if isinstance(s, text_type):
+ return s.encode("utf-8")
+ return s
+
+
+
+if PY3:
+ def byte_string(s):
+ return s.encode("charmap")
+else:
+ def byte_string(s):
+ return s
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 103deab..0a9e8a2 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -1,10 +1,18 @@
from time import time
+from base64 import b16encode
from functools import partial
+from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
+
+from six import (
+ integer_types as _integer_types,
+ text_type as _text_type)
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
- exception_from_error_queue as _exception_from_error_queue)
+ exception_from_error_queue as _exception_from_error_queue,
+ byte_string as _byte_string,
+ native as _native)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
@@ -276,7 +284,7 @@ class X509Name(object):
raise TypeError("attribute name must be string, not '%.200s'" % (
type(value).__name__,))
- nid = _lib.OBJ_txt2nid(name)
+ nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
try:
_raise_current_error()
@@ -294,7 +302,7 @@ class X509Name(object):
_lib.X509_NAME_ENTRY_free(ent)
break
- if isinstance(value, unicode):
+ if isinstance(value, _text_type):
value = value.encode('utf-8')
add_result = _lib.X509_NAME_add_entry_by_NID(
@@ -310,7 +318,7 @@ class X509Name(object):
organization (alias O), organizationalUnit (alias OU), commonName (alias
CN) and more...
"""
- nid = _lib.OBJ_txt2nid(name)
+ nid = _lib.OBJ_txt2nid(_byte_string(name))
if nid == _lib.NID_undef:
# This is a bit weird. OBJ_txt2nid indicated failure, but it seems
# a lower level function, a2d_ASN1_OBJECT, also feels the need to
@@ -344,14 +352,22 @@ class X509Name(object):
return result
- def __cmp__(self, other):
- if not isinstance(other, X509Name):
- return NotImplemented
+ def _cmp(op):
+ def f(self, other):
+ if not isinstance(other, X509Name):
+ return NotImplemented
+ result = _lib.X509_NAME_cmp(self._name, other._name)
+ return op(result, 0)
+ return f
- result = _lib.X509_NAME_cmp(self._name, other._name)
- # TODO result == -2 is an error case that maybe should be checked for
- return result
+ __eq__ = _cmp(__eq__)
+ __ne__ = _cmp(__ne__)
+
+ __lt__ = _cmp(__lt__)
+ __le__ = _cmp(__le__)
+ __gt__ = _cmp(__gt__)
+ __ge__ = _cmp(__ge__)
def __repr__(self):
"""
@@ -365,7 +381,8 @@ class X509Name(object):
# TODO: This is untested.
_raise_current_error()
- return "<X509Name object '%s'>" % (_ffi.string(result_buffer),)
+ return "<X509Name object '%s'>" % (
+ _native(_ffi.string(result_buffer)),)
def hash(self):
@@ -471,7 +488,7 @@ class X509Extension(object):
# with strings? (However, X509V3_EXT_i2d in particular seems like it
# would be a better API to invoke. I do not know where to get the
# ext_struc it desires for its last parameter, though.)
- value = "critical," + value
+ value = b"critical," + value
extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value)
if extension == _ffi.NULL:
@@ -484,9 +501,9 @@ class X509Extension(object):
return _lib.OBJ_obj2nid(self._extension.object)
_prefixes = {
- _lib.GEN_EMAIL: b"email",
- _lib.GEN_DNS: b"DNS",
- _lib.GEN_URI: b"URI",
+ _lib.GEN_EMAIL: "email",
+ _lib.GEN_DNS: "DNS",
+ _lib.GEN_URI: "URI",
}
def _subjectAltNameString(self):
@@ -517,11 +534,12 @@ class X509Extension(object):
except KeyError:
bio = _new_mem_buf()
_lib.GENERAL_NAME_print(bio, name)
- parts.append(_bio_to_string(bio))
+ parts.append(_native(_bio_to_string(bio)))
else:
- value = _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]
- parts.append(label + b":" + value)
- return b", ".join(parts)
+ value = _native(
+ _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:])
+ parts.append(label + ":" + value)
+ return ", ".join(parts)
def __str__(self):
@@ -537,7 +555,7 @@ class X509Extension(object):
# TODO: This is untested.
_raise_current_error()
- return _bio_to_string(bio)
+ return _native(_bio_to_string(bio))
def get_critical(self):
@@ -693,7 +711,7 @@ class X509Req(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -806,7 +824,7 @@ class X509(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- evp_md = _lib.EVP_get_digestbyname(digest)
+ evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
if evp_md == _ffi.NULL:
raise ValueError("No such digest method")
@@ -839,7 +857,7 @@ class X509(object):
:return: The digest of the object
"""
- digest = _lib.EVP_get_digestbyname(digest_name)
+ digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
if digest == _ffi.NULL:
raise ValueError("No such digest method")
@@ -854,8 +872,8 @@ class X509(object):
# TODO: This is untested.
_raise_current_error()
- return ':'.join([
- ch.encode('hex').upper() for ch
+ return b":".join([
+ b16encode(ch).upper() for ch
in _ffi.buffer(result_buffer, result_length[0])])
@@ -877,7 +895,7 @@ class X509(object):
:return: None
"""
- if not isinstance(serial, (int, long)):
+ if not isinstance(serial, _integer_types):
raise TypeError("serial must be an integer")
hex_serial = hex(serial)[2:]
@@ -1246,7 +1264,11 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None):
bio = _new_mem_buf()
if cipher is not None:
- cipher_obj = _lib.EVP_get_cipherbyname(cipher)
+ if passphrase is None:
+ raise TypeError(
+ "if a value is given for cipher "
+ "one must also be given for passphrase")
+ cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
if cipher_obj == _ffi.NULL:
raise ValueError("Invalid cipher name")
else:
@@ -1307,14 +1329,14 @@ class Revoked(object):
# OCSP_crl_reason_str. We use the latter, just like the command line
# program.
_crl_reasons = [
- "unspecified",
- "keyCompromise",
- "CACompromise",
- "affiliationChanged",
- "superseded",
- "cessationOfOperation",
- "certificateHold",
- # "removeFromCRL",
+ b"unspecified",
+ b"keyCompromise",
+ b"CACompromise",
+ b"affiliationChanged",
+ b"superseded",
+ b"cessationOfOperation",
+ b"certificateHold",
+ # b"removeFromCRL",
]
def __init__(self):
@@ -1384,7 +1406,7 @@ class Revoked(object):
elif not isinstance(reason, bytes):
raise TypeError("reason must be None or a byte string")
else:
- reason = reason.lower().replace(' ', '')
+ reason = reason.lower().replace(b' ', b'')
reason_code = [r.lower() for r in self._crl_reasons].index(reason)
new_reason_ext = _lib.ASN1_ENUMERATED_new()
@@ -1821,7 +1843,7 @@ class NetscapeSPKI(object):
if not pkey._initialized:
raise ValueError("Key is uninitialized")
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -1946,7 +1968,7 @@ class _PassphraseHelper(object):
else:
raise ValueError("passphrase returned by callback is too long")
for i in range(len(result)):
- buf[i] = result[i]
+ buf[i] = result[i:i + 1]
return len(result)
except Exception as e:
self._problems.append(e)
@@ -2050,7 +2072,7 @@ def sign(pkey, data, digest):
:param digest: message digest to use
:return: signature
"""
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
@@ -2084,7 +2106,7 @@ def verify(cert, signature, data, digest):
:param digest: message digest to use
:return: None if the signature is correct, raise exception otherwise
"""
- digest_obj = _lib.EVP_get_digestbyname(digest)
+ digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
raise ValueError("No such digest method")
diff --git a/OpenSSL/rand.py b/OpenSSL/rand.py
index cddfd2d..e754378 100644
--- a/OpenSSL/rand.py
+++ b/OpenSSL/rand.py
@@ -6,6 +6,8 @@ See the file RATIONALE for a short explanation of why this module was written.
from functools import partial
+from six import integer_types as _integer_types
+
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
@@ -30,7 +32,7 @@ def bytes(num_bytes):
:param num_bytes: The number of bytes to fetch
:return: A string of random bytes
"""
- if not isinstance(num_bytes, (int, long)):
+ if not isinstance(num_bytes, _integer_types):
raise TypeError("num_bytes must be an integer")
if num_bytes < 0:
@@ -102,8 +104,8 @@ def egd(path, bytes=_unspecified):
:returns: The number of bytes read (NB: a value of 0 isn't necessarily an
error, check rand.status())
"""
- if not isinstance(path, str):
- raise TypeError("path must be a string")
+ if not isinstance(path, _builtin_bytes):
+ raise TypeError("path must be a byte string")
if bytes is _unspecified:
bytes = 255
@@ -134,7 +136,7 @@ def load_file(filename, maxbytes=_unspecified):
to read the entire file
:return: The number of bytes read
"""
- if not isinstance(filename, str):
+ if not isinstance(filename, _builtin_bytes):
raise TypeError("filename must be a string")
if maxbytes is _unspecified:
@@ -153,7 +155,7 @@ def write_file(filename):
:param filename: The file to write data to
:return: The number of bytes written
"""
- if not isinstance(filename, str):
+ if not isinstance(filename, _builtin_bytes):
raise TypeError("filename must be a string")
return _lib.RAND_write_file(filename)
diff --git a/OpenSSL/test/test_crypto.py b/OpenSSL/test/test_crypto.py
index a87a5e8..51da99b 100644
--- a/OpenSSL/test/test_crypto.py
+++ b/OpenSSL/test/test_crypto.py
@@ -11,6 +11,8 @@ import os, re
from subprocess import PIPE, Popen
from datetime import datetime, timedelta
+from six import binary_type
+
from OpenSSL.crypto import TYPE_RSA, TYPE_DSA, Error, PKey, PKeyType
from OpenSSL.crypto import X509, X509Type, X509Name, X509NameType
from OpenSSL.crypto import X509Store, X509StoreType, X509Req, X509ReqType
@@ -24,7 +26,8 @@ from OpenSSL.crypto import PKCS12, PKCS12Type, load_pkcs12
from OpenSSL.crypto import CRL, Revoked, load_crl
from OpenSSL.crypto import NetscapeSPKI, NetscapeSPKIType
from OpenSSL.crypto import sign, verify
-from OpenSSL.test.util import TestCase, bytes, b
+from OpenSSL.test.util import TestCase, b
+from OpenSSL._util import native
def normalize_certificate_pem(pem):
return dump_certificate(FILETYPE_PEM, load_certificate(FILETYPE_PEM, pem))
@@ -34,6 +37,12 @@ def normalize_privatekey_pem(pem):
return dump_privatekey(FILETYPE_PEM, load_privatekey(FILETYPE_PEM, pem))
+GOOD_CIPHER = "blowfish"
+BAD_CIPHER = "zippers"
+
+GOOD_DIGEST = "MD5"
+BAD_DIGEST = "monkeys"
+
root_cert_pem = b("""-----BEGIN CERTIFICATE-----
MIIC7TCCAlagAwIBAgIIPQzE4MbeufQwDQYJKoZIhvcNAQEFBQAwWDELMAkGA1UE
BhMCVVMxCzAJBgNVBAgTAklMMRAwDgYDVQQHEwdDaGljYWdvMRAwDgYDVQQKEwdU
@@ -957,7 +966,7 @@ class _PKeyInteractionTestsMixin:
"""
request = self.signable()
key = PKey()
- self.assertRaises(ValueError, request.sign, key, 'MD5')
+ self.assertRaises(ValueError, request.sign, key, GOOD_DIGEST)
def test_signWithPublicKey(self):
@@ -970,7 +979,7 @@ class _PKeyInteractionTestsMixin:
key.generate_key(TYPE_RSA, 512)
request.set_pubkey(key)
pub = request.get_pubkey()
- self.assertRaises(ValueError, request.sign, pub, 'MD5')
+ self.assertRaises(ValueError, request.sign, pub, GOOD_DIGEST)
def test_signWithUnknownDigest(self):
@@ -981,7 +990,7 @@ class _PKeyInteractionTestsMixin:
request = self.signable()
key = PKey()
key.generate_key(TYPE_RSA, 512)
- self.assertRaises(ValueError, request.sign, key, "monkeys")
+ self.assertRaises(ValueError, request.sign, key, BAD_DIGEST)
def test_sign(self):
@@ -993,7 +1002,7 @@ class _PKeyInteractionTestsMixin:
key = PKey()
key.generate_key(TYPE_RSA, 512)
request.set_pubkey(key)
- request.sign(key, 'MD5')
+ request.sign(key, GOOD_DIGEST)
# If the type has a verify method, cover that too.
if getattr(request, 'verify', None) is not None:
pub = request.get_pubkey()
@@ -1146,7 +1155,7 @@ class X509ReqTests(TestCase, _PKeyInteractionTestsMixin):
"""
request = X509Req()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
- request.sign(pkey, b"SHA1")
+ request.sign(pkey, GOOD_DIGEST)
another_pkey = load_privatekey(FILETYPE_PEM, client_key_pem)
self.assertRaises(Error, request.verify, another_pkey)
@@ -1159,7 +1168,7 @@ class X509ReqTests(TestCase, _PKeyInteractionTestsMixin):
"""
request = X509Req()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
- request.sign(pkey, b"SHA1")
+ request.sign(pkey, GOOD_DIGEST)
self.assertEqual(True, request.verify(pkey))
@@ -1438,7 +1447,10 @@ WpOdIpB8KksUTCzV591Nr1wd
"""
cert = X509()
self.assertEqual(
- cert.digest("md5"),
+ # This is MD5 instead of GOOD_DIGEST because the digest algorithm
+ # actually matters to the assertion (ie, another arbitrary, good
+ # digest will not product the same digest).
+ cert.digest("MD5"),
b("A8:EB:07:F8:53:25:0A:F2:56:05:C5:A5:C4:C4:C7:15"))
@@ -1537,7 +1549,7 @@ WpOdIpB8KksUTCzV591Nr1wd
algorithm.
"""
cert = X509()
- self.assertRaises(ValueError, cert.digest, "monkeys")
+ self.assertRaises(ValueError, cert.digest, BAD_DIGEST)
def test_get_subject_wrong_args(self):
@@ -1683,7 +1695,7 @@ WpOdIpB8KksUTCzV591Nr1wd
"""
# This certificate has been modified to indicate a bogus OID in the
# signature algorithm field so that OpenSSL does not recognize it.
- certPEM = """\
+ certPEM = b("""\
-----BEGIN CERTIFICATE-----
MIIC/zCCAmigAwIBAgIBATAGBgJ8BQUAMHsxCzAJBgNVBAYTAlNHMREwDwYDVQQK
EwhNMkNyeXB0bzEUMBIGA1UECxMLTTJDcnlwdG8gQ0ExJDAiBgNVBAMTG00yQ3J5
@@ -1703,7 +1715,7 @@ jEY7xKfpQngV599k1xhl11IMqizDwu0855agrckg2MCTmOI9DZzDD77tAYb+Dk0O
PEVk0Mk/V0aIsDE9bolfCi/i/QWZ3N8s5nTWMNyBBBmoSliWCm4jkkRZRD0ejgTN
tgI5
-----END CERTIFICATE-----
-"""
+""")
cert = load_certificate(FILETYPE_PEM, certPEM)
self.assertRaises(ValueError, cert.get_signature_algorithm)
@@ -1802,7 +1814,7 @@ class PKCS12Tests(TestCase):
A :py:obj:`PKCS12` with only a private key can be exported using
:py:obj:`PKCS12.export` and loaded again using :py:obj:`load_pkcs12`.
"""
- passwd = 'blah'
+ passwd = b"blah"
p12 = PKCS12()
pkey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
p12.set_privatekey(pkey)
@@ -1829,7 +1841,7 @@ class PKCS12Tests(TestCase):
A :py:obj:`PKCS12` with only a certificate can be exported using
:py:obj:`PKCS12.export` and loaded again using :py:obj:`load_pkcs12`.
"""
- passwd = 'blah'
+ passwd = b"blah"
p12 = PKCS12()
cert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
p12.set_certificate(cert)
@@ -1880,7 +1892,7 @@ class PKCS12Tests(TestCase):
return p12
- def check_recovery(self, p12_str, key=None, cert=None, ca=None, passwd='',
+ def check_recovery(self, p12_str, key=None, cert=None, ca=None, passwd=b"",
extra=()):
"""
Use openssl program to confirm three components are recoverable from a
@@ -1888,18 +1900,18 @@ class PKCS12Tests(TestCase):
"""
if key:
recovered_key = _runopenssl(
- p12_str, "pkcs12", '-nocerts', '-nodes', '-passin',
- 'pass:' + passwd, *extra)
+ p12_str, b"pkcs12", b"-nocerts", b"-nodes", b"-passin",
+ b"pass:" + passwd, *extra)
self.assertEqual(recovered_key[-len(key):], key)
if cert:
recovered_cert = _runopenssl(
- p12_str, "pkcs12", '-clcerts', '-nodes', '-passin',
- 'pass:' + passwd, '-nokeys', *extra)
+ p12_str, b"pkcs12", b"-clcerts", b"-nodes", b"-passin",
+ b"pass:" + passwd, b"-nokeys", *extra)
self.assertEqual(recovered_cert[-len(cert):], cert)
if ca:
recovered_cert = _runopenssl(
- p12_str, "pkcs12", '-cacerts', '-nodes', '-passin',
- 'pass:' + passwd, '-nokeys', *extra)
+ p12_str, b"pkcs12", b"-cacerts", b"-nodes", b"-passin",
+ b"pass:" + passwd, b"-nokeys", *extra)
self.assertEqual(recovered_cert[-len(ca):], ca)
@@ -1908,10 +1920,10 @@ class PKCS12Tests(TestCase):
A PKCS12 string generated using the openssl command line can be loaded
with :py:obj:`load_pkcs12` and its components extracted and examined.
"""
- passwd = 'whatever'
+ passwd = b"whatever"
pem = client_key_pem + client_cert_pem
p12_str = _runopenssl(
- pem, "pkcs12", '-export', '-clcerts', '-passout', 'pass:' + passwd)
+ pem, b"pkcs12", b"-export", b"-clcerts", b"-passout", b"pass:" + passwd)
p12 = load_pkcs12(p12_str, passwd)
# verify
self.assertTrue(isinstance(p12, PKCS12))
@@ -1928,8 +1940,8 @@ class PKCS12Tests(TestCase):
which is not a PKCS12 dump.
"""
passwd = 'whatever'
- e = self.assertRaises(Error, load_pkcs12, 'fruit loops', passwd)
- self.assertEqual( e.args[0][0][0], 'asn1 encoding routines')
+ e = self.assertRaises(Error, load_pkcs12, b'fruit loops', passwd)
+ self.assertEqual( e.args[0][0][0], b'asn1 encoding routines')
self.assertEqual( len(e.args[0][0]), 3)
@@ -1959,7 +1971,7 @@ class PKCS12Tests(TestCase):
:py:obj:`PKCS12.get_friendlyname` and :py:obj:`PKCS12_set_friendlyname`, and a
:py:obj:`PKCS12` with a friendly name set can be dumped with :py:obj:`PKCS12.export`.
"""
- passwd = 'Dogmeat[]{}!@#$%^&*()~`?/.,<>-_+=";:'
+ passwd = b'Dogmeat[]{}!@#$%^&*()~`?/.,<>-_+=";:'
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
for friendly_name in [b('Serverlicious'), None, b('###')]:
p12.set_friendlyname(friendly_name)
@@ -1983,7 +1995,7 @@ class PKCS12Tests(TestCase):
export.
"""
p12 = self.gen_pkcs12(client_cert_pem, client_key_pem, root_cert_pem)
- passwd = ''
+ passwd = b""
dumped_p12_empty = p12.export(iter=2, maciter=0, passphrase=passwd)
dumped_p12_none = p12.export(iter=3, maciter=2, passphrase=None)
dumped_p12_nopw = p12.export(iter=9, maciter=4)
@@ -2008,19 +2020,19 @@ class PKCS12Tests(TestCase):
Exporting a PKCS12 with a :py:obj:`maciter` of ``-1`` excludes the MAC
entirely.
"""
- passwd = 'Lake Michigan'
+ passwd = b"Lake Michigan"
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
self.check_recovery(
dumped_p12, key=server_key_pem, cert=server_cert_pem,
- passwd=passwd, extra=('-nomacver',))
+ passwd=passwd, extra=(b"-nomacver",))
def test_load_without_mac(self):
"""
Loading a PKCS12 without a MAC does something other than crash.
"""
- passwd = 'Lake Michigan'
+ passwd = b"Lake Michigan"
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export(maciter=-1, passphrase=passwd, iter=2)
try:
@@ -2058,7 +2070,7 @@ class PKCS12Tests(TestCase):
p12 = self.gen_pkcs12(server_cert_pem, server_key_pem, root_cert_pem)
dumped_p12 = p12.export() # no args
self.check_recovery(
- dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd='')
+ dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"")
def test_key_cert_mismatch(self):
@@ -2072,8 +2084,8 @@ class PKCS12Tests(TestCase):
# These quoting functions taken directly from Twisted's twisted.python.win32.
-_cmdLineQuoteRe = re.compile(r'(\\*)"')
-_cmdLineQuoteRe2 = re.compile(r'(\\+)\Z')
+_cmdLineQuoteRe = re.compile(br'(\\*)"')
+_cmdLineQuoteRe2 = re.compile(br'(\\+)\Z')
def cmdLineQuote(s):
"""
Internal method for quoting a single command-line argument.
@@ -2087,8 +2099,8 @@ def cmdLineQuote(s):
:rtype: :py:obj:`str`
:return: A cmd.exe-style quoted string
"""
- s = _cmdLineQuoteRe2.sub(r"\1\1", _cmdLineQuoteRe.sub(r'\1\1\\"', s))
- return '"%s"' % s
+ s = _cmdLineQuoteRe2.sub(br"\1\1", _cmdLineQuoteRe.sub(br'\1\1\\"', s))
+ return b'"' + s + b'"'
@@ -2104,7 +2116,7 @@ def quoteArguments(arguments):
:rtype: :py:obj:`str`
:return: A space-delimited string containing quoted versions of :py:obj:`arguments`
"""
- return ' '.join(map(cmdLineQuote, arguments))
+ return b' '.join(map(cmdLineQuote, arguments))
@@ -2114,11 +2126,12 @@ def _runopenssl(pem, *args):
the given PEM to its stdin. Not safe for quotes.
"""
if os.name == 'posix':
- command = "openssl " + " ".join([
- "'%s'" % (arg.replace("'", "'\\''"),) for arg in args])
+ command = b"openssl " + b" ".join([
+ (b"'" + arg.replace(b"'", b"'\\''") + b"'")
+ for arg in args])
else:
- command = "openssl " + quoteArguments(args)
- proc = Popen(command, shell=True, stdin=PIPE, stdout=PIPE)
+ command = b"openssl " + quoteArguments(args)
+ proc = Popen(native(command), shell=True, stdin=PIPE, stdout=PIPE)
proc.stdin.write(pem)
proc.stdin.close()
output = proc.stdout.read()
@@ -2254,7 +2267,7 @@ class FunctionTests(TestCase):
self.assertRaises(TypeError, dump_privatekey)
# If cipher name is given, password is required.
self.assertRaises(
- ValueError, dump_privatekey, FILETYPE_PEM, PKey(), "foo")
+ TypeError, dump_privatekey, FILETYPE_PEM, PKey(), GOOD_CIPHER)
def test_dump_privatekey_unknown_cipher(self):
@@ -2266,7 +2279,7 @@ class FunctionTests(TestCase):
key.generate_key(TYPE_RSA, 512)
self.assertRaises(
ValueError, dump_privatekey,
- FILETYPE_PEM, key, "zippers", "passphrase")
+ FILETYPE_PEM, key, BAD_CIPHER, "passphrase")
def test_dump_privatekey_invalid_passphrase_type(self):
@@ -2278,7 +2291,7 @@ class FunctionTests(TestCase):
key.generate_key(TYPE_RSA, 512)
self.assertRaises(
TypeError,
- dump_privatekey, FILETYPE_PEM, key, "blowfish", object())
+ dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, object())
def test_dump_privatekey_invalid_filetype(self):
@@ -2309,8 +2322,8 @@ class FunctionTests(TestCase):
"""
passphrase = b("foo")
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
- pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase)
- self.assertTrue(isinstance(pem, bytes))
+ pem = dump_privatekey(FILETYPE_PEM, key, GOOD_CIPHER, passphrase)
+ self.assertTrue(isinstance(pem, binary_type))
loadedKey = load_privatekey(FILETYPE_PEM, pem, passphrase)
self.assertTrue(isinstance(loadedKey, PKeyType))
self.assertEqual(loadedKey.type(), key.type())
@@ -2325,7 +2338,7 @@ class FunctionTests(TestCase):
"""
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
self.assertRaises(ValueError,
- dump_privatekey, FILETYPE_ASN1, key, "blowfish", "secret")
+ dump_privatekey, FILETYPE_ASN1, key, GOOD_CIPHER, "secret")
def test_dump_certificate(self):
@@ -2337,13 +2350,13 @@ class FunctionTests(TestCase):
dumped_pem = dump_certificate(FILETYPE_PEM, cert)
self.assertEqual(dumped_pem, cleartextCertificatePEM)
dumped_der = dump_certificate(FILETYPE_ASN1, cert)
- good_der = _runopenssl(dumped_pem, "x509", "-outform", "DER")
+ good_der = _runopenssl(dumped_pem, b"x509", b"-outform", b"DER")
self.assertEqual(dumped_der, good_der)
cert2 = load_certificate(FILETYPE_ASN1, dumped_der)
dumped_pem2 = dump_certificate(FILETYPE_PEM, cert2)
self.assertEqual(dumped_pem2, cleartextCertificatePEM)
dumped_text = dump_certificate(FILETYPE_TEXT, cert)
- good_text = _runopenssl(dumped_pem, "x509", "-noout", "-text")
+ good_text = _runopenssl(dumped_pem, b"x509", b"-noout", b"-text")
self.assertEqual(dumped_text, good_text)
@@ -2366,7 +2379,7 @@ class FunctionTests(TestCase):
dumped_der = dump_privatekey(FILETYPE_ASN1, key)
# XXX This OpenSSL call writes "writing RSA key" to standard out. Sad.
- good_der = _runopenssl(dumped_pem, "rsa", "-outform", "DER")
+ good_der = _runopenssl(dumped_pem, b"rsa", b"-outform", b"DER")
self.assertEqual(dumped_der, good_der)
key2 = load_privatekey(FILETYPE_ASN1, dumped_der)
dumped_pem2 = dump_privatekey(FILETYPE_PEM, key2)
@@ -2381,7 +2394,7 @@ class FunctionTests(TestCase):
dumped_pem = dump_privatekey(FILETYPE_PEM, key)
dumped_text = dump_privatekey(FILETYPE_TEXT, key)
- good_text = _runopenssl(dumped_pem, "rsa", "-noout", "-text")
+ good_text = _runopenssl(dumped_pem, b"rsa", b"-noout", b"-text")
self.assertEqual(dumped_text, good_text)
@@ -2393,13 +2406,13 @@ class FunctionTests(TestCase):
dumped_pem = dump_certificate_request(FILETYPE_PEM, req)
self.assertEqual(dumped_pem, cleartextCertificateRequestPEM)
dumped_der = dump_certificate_request(FILETYPE_ASN1, req)
- good_der = _runopenssl(dumped_pem, "req", "-outform", "DER")
+ good_der = _runopenssl(dumped_pem, b"req", b"-outform", b"DER")
self.assertEqual(dumped_der, good_der)
req2 = load_certificate_request(FILETYPE_ASN1, dumped_der)
dumped_pem2 = dump_certificate_request(FILETYPE_PEM, req2)
self.assertEqual(dumped_pem2, cleartextCertificateRequestPEM)
dumped_text = dump_certificate_request(FILETYPE_TEXT, req)
- good_text = _runopenssl(dumped_pem, "req", "-noout", "-text")
+ good_text = _runopenssl(dumped_pem, b"req", b"-noout", b"-text")
self.assertEqual(dumped_text, good_text)
self.assertRaises(ValueError, dump_certificate_request, 100, req)
@@ -2415,8 +2428,8 @@ class FunctionTests(TestCase):
called.append(writing)
return passphrase
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
- pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", cb)
- self.assertTrue(isinstance(pem, bytes))
+ pem = dump_privatekey(FILETYPE_PEM, key, GOOD_CIPHER, cb)
+ self.assertTrue(isinstance(pem, binary_type))
self.assertEqual(called, [True])
loadedKey = load_privatekey(FILETYPE_PEM, pem, passphrase)
self.assertTrue(isinstance(loadedKey, PKeyType))
@@ -2434,7 +2447,7 @@ class FunctionTests(TestCase):
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
self.assertRaises(ArithmeticError,
- dump_privatekey, FILETYPE_PEM, key, "blowfish", cb)
+ dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, cb)
def test_dump_privatekey_passphraseCallbackLength(self):
@@ -2447,7 +2460,7 @@ class FunctionTests(TestCase):
key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
self.assertRaises(ValueError,
- dump_privatekey, FILETYPE_PEM, key, "blowfish", cb)
+ dump_privatekey, FILETYPE_PEM, key, GOOD_CIPHER, cb)
def test_load_pkcs7_data(self):
@@ -2464,7 +2477,7 @@ class FunctionTests(TestCase):
If the data passed to :py:obj:`load_pkcs7_data` is invalid,
:py:obj:`Error` is raised.
"""
- self.assertRaises(Error, load_pkcs7_data, FILETYPE_PEM, "foo")
+ self.assertRaises(Error, load_pkcs7_data, FILETYPE_PEM, b"foo")
@@ -2641,7 +2654,7 @@ class NetscapeSPKITests(TestCase, _PKeyInteractionTestsMixin):
"""
nspki = NetscapeSPKI()
blob = nspki.b64_encode()
- self.assertTrue(isinstance(blob, bytes))
+ self.assertTrue(isinstance(blob, binary_type))
@@ -2799,14 +2812,14 @@ class CRLTests(TestCase):
# PEM format
dumped_crl = crl.export(self.cert, self.pkey, days=20)
- text = _runopenssl(dumped_crl, "crl", "-noout", "-text")
+ text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text")
text.index(b('Serial Number: 03AB'))
text.index(b('Superseded'))
text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA'))
# DER format
dumped_crl = crl.export(self.cert, self.pkey, FILETYPE_ASN1)
- text = _runopenssl(dumped_crl, "crl", "-noout", "-text", "-inform", "DER")
+ text = _runopenssl(dumped_crl, b"crl", b"-noout", b"-text", b"-inform", b"DER")
text.index(b('Serial Number: 03AB'))
text.index(b('Superseded'))
text.index(b('Issuer: /C=US/ST=IL/L=Chicago/O=Testing/CN=Testing Root CA'))
@@ -2927,7 +2940,7 @@ class CRLTests(TestCase):
self.assertEqual(revs[1].get_serial(), b('0100'))
self.assertEqual(revs[1].get_reason(), b('Superseded'))
- der = _runopenssl(crlData, "crl", "-outform", "DER")
+ der = _runopenssl(crlData, b"crl", b"-outform", b"DER")
crl = load_crl(FILETYPE_ASN1, der)
revs = crl.get_revoked()
self.assertEqual(len(revs), 2)
@@ -2960,7 +2973,7 @@ class CRLTests(TestCase):
Calling :py:obj:`OpenSSL.crypto.load_crl` with file data which can't
be loaded raises a :py:obj:`OpenSSL.crypto.Error`.
"""
- self.assertRaises(Error, load_crl, FILETYPE_PEM, "hello, world")
+ self.assertRaises(Error, load_crl, FILETYPE_PEM, b"hello, world")
diff --git a/OpenSSL/test/test_rand.py b/OpenSSL/test/test_rand.py
index 7d2559e..c52cb6b 100644
--- a/OpenSSL/test/test_rand.py
+++ b/OpenSSL/test/test_rand.py
@@ -30,7 +30,7 @@ class RandTests(TestCase):
:py:obj:`OpenSSL.rand.bytes` raises :py:obj:`MemoryError` if more bytes
are requested than will fit in memory.
"""
- self.assertRaises(MemoryError, rand.bytes, sys.maxint)
+ self.assertRaises(MemoryError, rand.bytes, sys.maxsize)
def test_bytes(self):
@@ -190,7 +190,7 @@ class RandTests(TestCase):
rand.write_file(tmpfile)
# Verify length of written file
size = os.stat(tmpfile)[stat.ST_SIZE]
- self.assertEquals(size, 1024)
+ self.assertEqual(1024, size)
# Read random bytes from file
rand.load_file(tmpfile)
rand.load_file(tmpfile, 4) # specify a length
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index ea4fceb..95cb538 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -39,7 +39,7 @@ from OpenSSL.SSL import (
from OpenSSL.SSL import (
Context, ContextType, Session, Connection, ConnectionType, SSLeay_version)
-from OpenSSL.test.util import TestCase, bytes, b
+from OpenSSL.test.util import TestCase, b
from OpenSSL.test.test_crypto import (
cleartextCertificatePEM, cleartextPrivateKeyPEM)
from OpenSSL.test.test_crypto import (
@@ -436,7 +436,7 @@ class ContextTests(TestCase, _LoopbackMixin):
# OpenSSL if the cert and key agree using check_privatekey. Then as
# long as check_privatekey works right we're good...
pem_filename = self.mktemp()
- with open(pem_filename, "w") as pem_file:
+ with open(pem_filename, "wb") as pem_file:
pem_file.write(cleartextCertificatePEM)
ctx = Context(TLSv1_METHOD)
@@ -635,7 +635,7 @@ class ContextTests(TestCase, _LoopbackMixin):
"""
pemFile = self._write_encrypted_pem(b("monkeys are nice"))
def passphraseCallback(maxlen, verify, extra):
- return ""
+ return b""
context = Context(TLSv1_METHOD)
context.set_passwd_cb(passphraseCallback)
@@ -780,7 +780,7 @@ class ContextTests(TestCase, _LoopbackMixin):
# Hash values computed manually with c_rehash to avoid depending on
# c_rehash in the test suite. One is from OpenSSL 0.9.8, the other
# from OpenSSL 1.0.0.
- for name in ['c7adac82.0', 'c3705638.0']:
+ for name in [b'c7adac82.0', b'c3705638.0']:
cafile = join(capath, name)
fObj = open(cafile, 'w')
fObj.write(cleartextCertificatePEM.decode('ascii'))
@@ -830,7 +830,7 @@ class ContextTests(TestCase, _LoopbackMixin):
clientSSL = Connection(context, client)
clientSSL.set_connect_state()
clientSSL.do_handshake()
- clientSSL.send('GET / HTTP/1.0\r\n\r\n')
+ clientSSL.send(b"GET / HTTP/1.0\r\n\r\n")
self.assertTrue(clientSSL.recv(1024))
@@ -940,7 +940,7 @@ class ContextTests(TestCase, _LoopbackMixin):
clientContext = Context(TLSv1_METHOD)
clientContext.set_verify(
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
- clientContext.load_verify_locations('ca.pem')
+ clientContext.load_verify_locations(b"ca.pem")
# Try it out.
self._handshake_test(serverContext, clientContext)
@@ -978,7 +978,7 @@ class ContextTests(TestCase, _LoopbackMixin):
clientContext = Context(TLSv1_METHOD)
clientContext.set_verify(
VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
- clientContext.load_verify_locations('ca.pem')
+ clientContext.load_verify_locations(b"ca.pem")
self._handshake_test(serverContext, clientContext)
@@ -1040,7 +1040,7 @@ class ContextTests(TestCase, _LoopbackMixin):
does not exist.
"""
context = Context(TLSv1_METHOD)
- self.assertRaises(Error, context.load_tmp_dh, "hello")
+ self.assertRaises(Error, context.load_tmp_dh, b"hello")
def test_load_tmp_dh(self):
@@ -1063,7 +1063,7 @@ class ContextTests(TestCase, _LoopbackMixin):
connections created with the context object will be able to choose from.
"""
context = Context(TLSv1_METHOD)
- context.set_cipher_list("hello world:EXP-RC4-MD5")
+ context.set_cipher_list(b"hello world:EXP-RC4-MD5")
conn = Connection(context, None)
self.assertEquals(conn.get_cipher_list(), ["EXP-RC4-MD5"])
@@ -1761,7 +1761,7 @@ class ConnectionTests(TestCase, _LoopbackMixin):
client_socket, server_socket = socket_pair()
# Fill up the client's send buffer so Connection won't be able to write
# anything.
- msg = 'x' * 1024
+ msg = b"x" * 1024
for i in range(1024):
try:
client_socket.send(msg)
@@ -1798,8 +1798,8 @@ class ConnectionGetCipherListTests(TestCase):
def test_result(self):
"""
- :py:obj:`Connection.get_cipher_list` returns a :py:obj:`list` of :py:obj:`str` giving the
- names of the ciphers which might be used.
+ :py:obj:`Connection.get_cipher_list` returns a :py:obj:`list` of
+ :py:obj:`bytes` giving the names of the ciphers which might be used.
"""
connection = Connection(Context(TLSv1_METHOD), None)
ciphers = connection.get_cipher_list()
@@ -1919,7 +1919,7 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin):
"""
server, client = self._loopback()
server.sock_shutdown(2)
- exc = self.assertRaises(SysCallError, server.sendall, "hello, world")
+ exc = self.assertRaises(SysCallError, server.sendall, b"hello, world")
self.assertEqual(exc.args[0], EPIPE)
@@ -2250,7 +2250,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
self._interactInMemory(client, server)
size = 2 ** 15
- sent = client.send("x" * size)
+ sent = client.send(b"x" * size)
# Sanity check. We're trying to test what happens when the entire
# input can't be sent. If the entire input was sent, this test is
# meaningless.
diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py
index bc398a1..011e7da 100644
--- a/OpenSSL/test/util.py
+++ b/OpenSSL/test/util.py
@@ -19,16 +19,7 @@ from OpenSSL.crypto import Error
import memdbg
-if sys.version_info < (3, 0):
- def b(s):
- return s
- bytes = str
-else:
- def b(s):
- return s.encode("charmap")
- bytes = bytes
-
-from OpenSSL._util import ffi, lib
+from OpenSSL._util import ffi, lib, byte_string as b
class TestCase(TestCase):
"""
@@ -283,20 +274,11 @@ class TestCase(TestCase):
"""
if self._temporaryFiles is None:
self._temporaryFiles = []
- temp = mktemp(dir=".")
+ temp = b(mktemp(dir="."))
self._temporaryFiles.append(temp)
return temp
- # Python 2.3 compatibility.
- def assertTrue(self, *a, **kw):
- return self.failUnless(*a, **kw)
-
-
- def assertFalse(self, *a, **kw):
- return self.failIf(*a, **kw)
-
-
# Other stuff
def assertConsistentType(self, theType, name, *constructionArgs):
"""
diff --git a/setup.py b/setup.py
index ac53487..7d5dce5 100755
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@ setup(name='pyOpenSSL', version=__version__,
maintainer_email = 'exarkun@twistedmatrix.com',
url = 'https://github.com/pyca/pyopenssl',
license = 'APL2',
- install_requires=["cryptography>=0.1"],
+ install_requires=["cryptography>=0.1", "six>=1.5.2"],
long_description = """\
High-level wrapper around a subset of the OpenSSL library, includes
* SSL.Connection objects, wrapping the methods of Python's portable
@@ -60,4 +60,4 @@ High-level wrapper around a subset of the OpenSSL library, includes
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Networking',
],
- test_suite="OpenSSL.test")
+ test_suite="OpenSSL")