summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@twistedmatrix.com>2014-02-09 09:25:44 -0500
committerJean-Paul Calderone <exarkun@twistedmatrix.com>2014-02-09 09:25:44 -0500
commitab984a4c3b64d74ff7056321fdfd4b436cd81d72 (patch)
treef4a54e7f77b216542146542657662e7613665158
parent2a78d98ce71b46c0e0013b745d4e937aa7fafa41 (diff)
parentf73a3cb6eea1cb122a1621926994cc8124f2c872 (diff)
downloadpyopenssl-ab984a4c3b64d74ff7056321fdfd4b436cd81d72.tar.gz
Merge pull request #28 from pyca/other-int-fixes
Add some missing test coverage for handling values of type `long` and make them pass by loosening the explicit type checks in various APIs.
-rw-r--r--OpenSSL/SSL.py26
-rw-r--r--OpenSSL/test/test_ssl.py237
2 files changed, 215 insertions, 48 deletions
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index f21ad9d..a257f16 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -252,7 +252,7 @@ class Context(object):
:param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or
TLSv1_METHOD.
"""
- if not isinstance(method, int):
+ if not isinstance(method, integer_types):
raise TypeError("method must be an integer")
try:
@@ -383,7 +383,7 @@ class Context(object):
certfile = certfile.encode("utf-8")
if not isinstance(certfile, bytes):
raise TypeError("certfile must be bytes or unicode")
- if not isinstance(filetype, int):
+ if not isinstance(filetype, integer_types):
raise TypeError("filetype must be an integer")
use_result = _lib.SSL_CTX_use_certificate_file(self._context, certfile, filetype)
@@ -449,7 +449,7 @@ class Context(object):
if filetype is _unspecified:
filetype = FILETYPE_PEM
- elif not isinstance(filetype, int):
+ elif not isinstance(filetype, integer_types):
raise TypeError("filetype must be an integer")
use_result = _lib.SSL_CTX_use_PrivateKey_file(
@@ -508,7 +508,7 @@ class Context(object):
bitwise or)
:returns: The previously set caching mode.
"""
- if not isinstance(mode, int):
+ if not isinstance(mode, integer_types):
raise TypeError("mode must be an integer")
return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)
@@ -532,7 +532,7 @@ class Context(object):
See SSL_CTX_set_verify(3SSL) for further details.
"""
- if not isinstance(mode, int):
+ if not isinstance(mode, integer_types):
raise TypeError("mode must be an integer")
if not callable(callback):
@@ -550,7 +550,7 @@ class Context(object):
:param depth: An integer specifying the verify depth
:return: None
"""
- if not isinstance(depth, int):
+ if not isinstance(depth, integer_types):
raise TypeError("depth must be an integer")
_lib.SSL_CTX_set_verify_depth(self._context, depth)
@@ -675,7 +675,7 @@ class Context(object):
:param timeout: The timeout in seconds
:return: The previous session timeout
"""
- if not isinstance(timeout, int):
+ if not isinstance(timeout, integer_types):
raise TypeError("timeout must be an integer")
return _lib.SSL_CTX_set_timeout(self._context, timeout)
@@ -747,7 +747,7 @@ class Context(object):
:param options: The options to add.
:return: The new option bitmask.
"""
- if not isinstance(options, int):
+ if not isinstance(options, integer_types):
raise TypeError("options must be an integer")
return _lib.SSL_CTX_set_options(self._context, options)
@@ -760,7 +760,7 @@ class Context(object):
:param mode: The mode to add.
:return: The new mode bitmask.
"""
- if not isinstance(mode, int):
+ if not isinstance(mode, integer_types):
raise TypeError("mode must be an integer")
return _lib.SSL_CTX_set_mode(self._context, mode)
@@ -945,8 +945,6 @@ class Connection(object):
buf = buf.tobytes()
if not isinstance(buf, bytes):
raise TypeError("data must be a byte string")
- if not isinstance(flags, int):
- raise TypeError("flags must be an integer")
result = _lib.SSL_write(self._ssl, buf, len(buf))
self._raise_ssl_error(self._ssl, result)
@@ -969,8 +967,6 @@ class Connection(object):
buf = buf.tobytes()
if not isinstance(buf, bytes):
raise TypeError("buf must be a byte string")
- if not isinstance(flags, int):
- raise TypeError("flags must be an integer")
left_to_send = len(buf)
total_sent = 0
@@ -1031,7 +1027,7 @@ class Connection(object):
if self._from_ssl is None:
raise TypeError("Connection sock was not None")
- if not isinstance(bufsiz, int):
+ if not isinstance(bufsiz, integer_types):
raise TypeError("bufsiz must be an integer")
buf = _ffi.new("char[]", bufsiz)
@@ -1254,7 +1250,7 @@ class Connection(object):
:param state - bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
:return: None
"""
- if not isinstance(state, int):
+ if not isinstance(state, integer_types):
raise TypeError("state must be an integer")
_lib.SSL_set_shutdown(self._ssl, state)
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index a9b9890..ecee95f 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -14,7 +14,7 @@ from os.path import join
from unittest import main
from weakref import ref
-from six import u
+from six import PY3, u
from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
from OpenSSL.crypto import PKey, X509, X509Extension, X509Store
@@ -337,6 +337,16 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(ValueError, Context, 10)
+ if not PY3:
+ def test_method_long(self):
+ """
+ On Python 2 :py:class:`Context` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ Context(long(TLSv1_METHOD))
+
+
+
def test_type(self):
"""
:py:obj:`Context` and :py:obj:`ContextType` refer to the same type object and can be
@@ -366,6 +376,25 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(Error, ctx.use_privatekey_file, self.mktemp())
+ if not PY3:
+ def test_use_privatekey_file_long(self):
+ """
+ On Python 2 :py:obj:`Context.use_privatekey_file` accepts a
+ filetype of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ pemfile = self.mktemp()
+
+ key = PKey()
+ key.generate_key(TYPE_RSA, 128)
+
+ with open(pemfile, "wt") as pem:
+ pem.write(
+ dump_privatekey(FILETYPE_PEM, key).decode("ascii"))
+
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_privatekey_file(pemfile, long(FILETYPE_PEM))
+
+
def test_use_certificate_wrong_args(self):
"""
:py:obj:`Context.use_certificate_wrong_args` raises :py:obj:`TypeError`
@@ -445,6 +474,20 @@ class ContextTests(TestCase, _LoopbackMixin):
ctx.use_certificate_file(pem_filename)
+ if not PY3:
+ def test_use_certificate_file_long(self):
+ """
+ On Python 2 :py:obj:`Context.use_certificate_file` accepts a
+ filetype of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ pem_filename = self.mktemp()
+ with open(pem_filename, "wb") as pem_file:
+ pem_file.write(cleartextCertificatePEM)
+
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_certificate_file(pem_filename, long(FILETYPE_PEM))
+
+
def test_set_app_data_wrong_args(self):
"""
:py:obj:`Context.set_app_data` raises :py:obj:`TypeError` if called with other than
@@ -486,6 +529,26 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(TypeError, context.set_options, 1, None)
+ def test_set_options(self):
+ """
+ :py:obj:`Context.set_options` returns the new options value.
+ """
+ context = Context(TLSv1_METHOD)
+ options = context.set_options(OP_NO_SSLv2)
+ self.assertTrue(OP_NO_SSLv2 & options)
+
+
+ if not PY3:
+ def test_set_options_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_options` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ options = context.set_options(long(OP_NO_SSLv2))
+ self.assertTrue(OP_NO_SSLv2 & options)
+
+
def test_set_mode_wrong_args(self):
"""
:py:obj:`Context.set`mode} raises :py:obj:`TypeError` if called with the wrong
@@ -506,6 +569,16 @@ class ContextTests(TestCase, _LoopbackMixin):
context = Context(TLSv1_METHOD)
self.assertTrue(
MODE_RELEASE_BUFFERS & context.set_mode(MODE_RELEASE_BUFFERS))
+
+ if not PY3:
+ def test_set_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_mode` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ mode = context.set_mode(long(MODE_RELEASE_BUFFERS))
+ self.assertTrue(MODE_RELEASE_BUFFERS & mode)
else:
"MODE_RELEASE_BUFFERS unavailable - OpenSSL version may be too old"
@@ -540,6 +613,17 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertEquals(context.get_timeout(), 1234)
+ if not PY3:
+ def test_timeout_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_timeout` accepts values of type
+ `long` as well as int.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_timeout(long(1234))
+ self.assertEquals(context.get_timeout(), 1234)
+
+
def test_set_verify_depth_wrong_args(self):
"""
:py:obj:`Context.set_verify_depth` raises :py:obj:`TypeError` if called with the wrong
@@ -570,6 +654,17 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertEquals(context.get_verify_depth(), 11)
+ if not PY3:
+ def test_verify_depth_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_verify_depth` accepts values of
+ type `long` as well as int.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_verify_depth(long(11))
+ self.assertEquals(context.get_verify_depth(), 11)
+
+
def _write_encrypted_pem(self, passphrase):
"""
Write a new private key out to a new file, encrypted using the given
@@ -1016,7 +1111,7 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(TypeError, context.get_verify_mode, None)
- def test_get_verify_mode(self):
+ def test_set_verify_mode(self):
"""
:py:obj:`Context.get_verify_mode` returns the verify mode flags previously
passed to :py:obj:`Context.set_verify`.
@@ -1029,6 +1124,20 @@ class ContextTests(TestCase, _LoopbackMixin):
context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
+ if not PY3:
+ def test_set_verify_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_verify_mode` accepts values of
+ type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ self.assertEquals(context.get_verify_mode(), 0)
+ context.set_verify(
+ long(VERIFY_PEER | VERIFY_CLIENT_ONCE), lambda *args: None)
+ self.assertEquals(
+ context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
+
+
def test_load_tmp_dh_wrong_args(self):
"""
:py:obj:`Context.load_tmp_dh` raises :py:obj:`TypeError` if called with the wrong
@@ -1104,8 +1213,8 @@ class ContextTests(TestCase, _LoopbackMixin):
def test_set_session_cache_mode_wrong_args(self):
"""
- L{Context.set_session_cache_mode} raises L{TypeError} if called with
- other than one integer argument.
+ :py:obj:`Context.set_session_cache_mode` raises :py:obj:`TypeError` if
+ called with other than one integer argument.
"""
context = Context(TLSv1_METHOD)
self.assertRaises(TypeError, context.set_session_cache_mode)
@@ -1114,8 +1223,8 @@ class ContextTests(TestCase, _LoopbackMixin):
def test_get_session_cache_mode_wrong_args(self):
"""
- L{Context.get_session_cache_mode} raises L{TypeError} if called with any
- arguments.
+ :py:obj:`Context.get_session_cache_mode` raises :py:obj:`TypeError` if
+ called with any arguments.
"""
context = Context(TLSv1_METHOD)
self.assertRaises(TypeError, context.get_session_cache_mode, 1)
@@ -1123,15 +1232,27 @@ class ContextTests(TestCase, _LoopbackMixin):
def test_session_cache_mode(self):
"""
- L{Context.set_session_cache_mode} specifies how sessions are cached.
- The setting can be retrieved via L{Context.get_session_cache_mode}.
+ :py:obj:`Context.set_session_cache_mode` specifies how sessions are
+ cached. The setting can be retrieved via
+ :py:obj:`Context.get_session_cache_mode`.
"""
context = Context(TLSv1_METHOD)
- old = context.set_session_cache_mode(SESS_CACHE_OFF)
+ context.set_session_cache_mode(SESS_CACHE_OFF)
off = context.set_session_cache_mode(SESS_CACHE_BOTH)
self.assertEqual(SESS_CACHE_OFF, off)
self.assertEqual(SESS_CACHE_BOTH, context.get_session_cache_mode())
+ if not PY3:
+ def test_session_cache_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_session_cache_mode` accepts values
+ of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_session_cache_mode(long(SESS_CACHE_BOTH))
+ self.assertEqual(
+ SESS_CACHE_BOTH, context.get_session_cache_mode())
+
def test_get_cert_store(self):
"""
@@ -1539,6 +1660,17 @@ class ConnectionTests(TestCase, _LoopbackMixin):
self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
+ if not PY3:
+ def test_set_shutdown_long(self):
+ """
+ On Python 2 :py:obj:`Connection.set_shutdown` accepts an argument
+ of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ connection = Connection(Context(TLSv1_METHOD), socket())
+ connection.set_shutdown(long(RECEIVED_SHUTDOWN))
+ self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
+
+
def test_app_data_wrong_args(self):
"""
:py:obj:`Connection.set_app_data` raises :py:obj:`TypeError` if called with other than
@@ -1834,13 +1966,14 @@ class ConnectionSendTests(TestCase, _LoopbackMixin):
"""
def test_wrong_args(self):
"""
- When called with arguments other than a single string,
- :py:obj:`Connection.send` raises :py:obj:`TypeError`.
+ When called with arguments other than string argument for its first
+ parameter or more than two arguments, :py:obj:`Connection.send` raises
+ :py:obj:`TypeError`.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.send)
self.assertRaises(TypeError, connection.send, object())
- self.assertRaises(TypeError, connection.send, "foo", "bar")
+ self.assertRaises(TypeError, connection.send, "foo", object(), "bar")
def test_short_bytes(self):
@@ -1877,13 +2010,15 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin):
"""
def test_wrong_args(self):
"""
- When called with arguments other than a single string,
- :py:obj:`Connection.sendall` raises :py:obj:`TypeError`.
+ When called with arguments other than a string argument for its first
+ parameter or with more than two arguments, :py:obj:`Connection.sendall`
+ raises :py:obj:`TypeError`.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.sendall)
self.assertRaises(TypeError, connection.sendall, object())
- self.assertRaises(TypeError, connection.sendall, "foo", "bar")
+ self.assertRaises(
+ TypeError, connection.sendall, "foo", object(), "bar")
def test_short(self):
@@ -2067,66 +2202,68 @@ class ConstantsTests(TestCase):
def test_sess_cache_off(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_OFF} 0x0, the value of
- L{SSL_SESS_CACHE_OFF} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of
+ :py:obj:`SSL_SESS_CACHE_OFF` defined by ``openssl/ssl.h``.
"""
self.assertEqual(0x0, SESS_CACHE_OFF)
def test_sess_cache_client(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_CLIENT} 0x1, the value of
- L{SSL_SESS_CACHE_CLIENT} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of
+ :py:obj:`SSL_SESS_CACHE_CLIENT` defined by ``openssl/ssl.h``.
"""
self.assertEqual(0x1, SESS_CACHE_CLIENT)
def test_sess_cache_server(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_SERVER} 0x2, the value of
- L{SSL_SESS_CACHE_SERVER} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of
+ :py:obj:`SSL_SESS_CACHE_SERVER` defined by ``openssl/ssl.h``.
"""
self.assertEqual(0x2, SESS_CACHE_SERVER)
def test_sess_cache_both(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_BOTH} 0x3, the value of
- L{SSL_SESS_CACHE_BOTH} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of
+ :py:obj:`SSL_SESS_CACHE_BOTH` defined by ``openssl/ssl.h``.
"""
self.assertEqual(0x3, SESS_CACHE_BOTH)
def test_sess_cache_no_auto_clear(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR} 0x80, the value of
- L{SSL_SESS_CACHE_NO_AUTO_CLEAR} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the
+ value of :py:obj:`SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by
+ ``openssl/ssl.h``.
"""
self.assertEqual(0x80, SESS_CACHE_NO_AUTO_CLEAR)
def test_sess_cache_no_internal_lookup(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP} 0x100, the
- value of L{SSL_SESS_CACHE_NO_INTERNAL_LOOKUP} defined by
- I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100,
+ the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by
+ ``openssl/ssl.h``.
"""
self.assertEqual(0x100, SESS_CACHE_NO_INTERNAL_LOOKUP)
def test_sess_cache_no_internal_store(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE} 0x200, the
- value of L{SSL_SESS_CACHE_NO_INTERNAL_STORE} defined by
- I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200,
+ the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by
+ ``openssl/ssl.h``.
"""
self.assertEqual(0x200, SESS_CACHE_NO_INTERNAL_STORE)
def test_sess_cache_no_internal(self):
"""
- The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL} 0x300, the value of
- L{SSL_SESS_CACHE_NO_INTERNAL} defined by I{openssl/ssl.h}.
+ The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the
+ value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL` defined by
+ ``openssl/ssl.h``.
"""
self.assertEqual(0x300, SESS_CACHE_NO_INTERNAL)
@@ -2537,6 +2674,40 @@ class ConnectionBIOTests(TestCase):
self.assertRaises(WantReadError, conn.bio_read, 1024)
+ def test_buffer_size(self):
+ """
+ :py:obj:`Connection.bio_read` accepts an integer giving the maximum
+ number of bytes to read and return.
+ """
+ ctx = Context(TLSv1_METHOD)
+ conn = Connection(ctx, None)
+ conn.set_connect_state()
+ try:
+ conn.do_handshake()
+ except WantReadError:
+ pass
+ data = conn.bio_read(2)
+ self.assertEqual(2, len(data))
+
+
+ if not PY3:
+ def test_buffer_size_long(self):
+ """
+ On Python 2 :py:obj:`Connection.bio_read` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ ctx = Context(TLSv1_METHOD)
+ conn = Connection(ctx, None)
+ conn.set_connect_state()
+ try:
+ conn.do_handshake()
+ except WantReadError:
+ pass
+ data = conn.bio_read(long(2))
+ self.assertEqual(2, len(data))
+
+
+
class InfoConstantTests(TestCase):
"""