diff options
author | Jean-Paul Calderone <exarkun@twistedmatrix.com> | 2014-02-02 18:13:31 -0500 |
---|---|---|
committer | Jean-Paul Calderone <exarkun@twistedmatrix.com> | 2014-02-02 18:13:31 -0500 |
commit | bef4f4c324fdb1b4040901993ffd590603a59690 (patch) | |
tree | 9097f5f4fdbc84c3211b959ab9829c662f304aa5 | |
parent | 2a78d98ce71b46c0e0013b745d4e937aa7fafa41 (diff) | |
download | pyopenssl-bef4f4c324fdb1b4040901993ffd590603a59690.tar.gz |
Add some missing test coverage for handling values of type `long` and fix the implementation to accept either `int` or `long` on Python 2 (more closely matching the API implemented by the old C code).
-rw-r--r-- | OpenSSL/SSL.py | 18 | ||||
-rw-r--r-- | OpenSSL/test/test_ssl.py | 183 |
2 files changed, 157 insertions, 44 deletions
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py index f21ad9d..9c375b0 100644 --- a/OpenSSL/SSL.py +++ b/OpenSSL/SSL.py @@ -508,7 +508,7 @@ class Context(object): bitwise or) :returns: The previously set caching mode. """ - if not isinstance(mode, int): + if not isinstance(mode, integer_types): raise TypeError("mode must be an integer") return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) @@ -532,7 +532,7 @@ class Context(object): See SSL_CTX_set_verify(3SSL) for further details. """ - if not isinstance(mode, int): + if not isinstance(mode, integer_types): raise TypeError("mode must be an integer") if not callable(callback): @@ -550,7 +550,7 @@ class Context(object): :param depth: An integer specifying the verify depth :return: None """ - if not isinstance(depth, int): + if not isinstance(depth, integer_types): raise TypeError("depth must be an integer") _lib.SSL_CTX_set_verify_depth(self._context, depth) @@ -675,7 +675,7 @@ class Context(object): :param timeout: The timeout in seconds :return: The previous session timeout """ - if not isinstance(timeout, int): + if not isinstance(timeout, integer_types): raise TypeError("timeout must be an integer") return _lib.SSL_CTX_set_timeout(self._context, timeout) @@ -747,7 +747,7 @@ class Context(object): :param options: The options to add. :return: The new option bitmask. """ - if not isinstance(options, int): + if not isinstance(options, integer_types): raise TypeError("options must be an integer") return _lib.SSL_CTX_set_options(self._context, options) @@ -760,7 +760,7 @@ class Context(object): :param mode: The mode to add. :return: The new mode bitmask. """ - if not isinstance(mode, int): + if not isinstance(mode, integer_types): raise TypeError("mode must be an integer") return _lib.SSL_CTX_set_mode(self._context, mode) @@ -945,8 +945,6 @@ class Connection(object): buf = buf.tobytes() if not isinstance(buf, bytes): raise TypeError("data must be a byte string") - if not isinstance(flags, int): - raise TypeError("flags must be an integer") result = _lib.SSL_write(self._ssl, buf, len(buf)) self._raise_ssl_error(self._ssl, result) @@ -969,8 +967,6 @@ class Connection(object): buf = buf.tobytes() if not isinstance(buf, bytes): raise TypeError("buf must be a byte string") - if not isinstance(flags, int): - raise TypeError("flags must be an integer") left_to_send = len(buf) total_sent = 0 @@ -1031,7 +1027,7 @@ class Connection(object): if self._from_ssl is None: raise TypeError("Connection sock was not None") - if not isinstance(bufsiz, int): + if not isinstance(bufsiz, integer_types): raise TypeError("bufsiz must be an integer") buf = _ffi.new("char[]", bufsiz) diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py index a9b9890..4ec057f 100644 --- a/OpenSSL/test/test_ssl.py +++ b/OpenSSL/test/test_ssl.py @@ -14,7 +14,7 @@ from os.path import join from unittest import main from weakref import ref -from six import u +from six import PY3, u from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM from OpenSSL.crypto import PKey, X509, X509Extension, X509Store @@ -486,6 +486,26 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(TypeError, context.set_options, 1, None) + def test_set_options(self): + """ + :py:obj:`Context.set_options` returns the new options value. + """ + context = Context(TLSv1_METHOD) + options = context.set_options(OP_NO_SSLv2) + self.assertTrue(OP_NO_SSLv2 & options) + + + if not PY3: + def test_set_options_long(self): + """ + On Python 2 :py:obj:`Context.set_options` accepts values of type + :py:obj:`long` as well as :py:obj:`int`. + """ + context = Context(TLSv1_METHOD) + options = context.set_options(long(OP_NO_SSLv2)) + self.assertTrue(OP_NO_SSLv2 & options) + + def test_set_mode_wrong_args(self): """ :py:obj:`Context.set`mode} raises :py:obj:`TypeError` if called with the wrong @@ -506,6 +526,16 @@ class ContextTests(TestCase, _LoopbackMixin): context = Context(TLSv1_METHOD) self.assertTrue( MODE_RELEASE_BUFFERS & context.set_mode(MODE_RELEASE_BUFFERS)) + + if not PY3: + def test_set_mode_long(self): + """ + On Python 2 :py:obj:`Context.set_mode` accepts values of type + :py:obj:`long` as well as :py:obj:`int`. + """ + context = Context(TLSv1_METHOD) + mode = context.set_mode(long(MODE_RELEASE_BUFFERS)) + self.assertTrue(MODE_RELEASE_BUFFERS & mode) else: "MODE_RELEASE_BUFFERS unavailable - OpenSSL version may be too old" @@ -540,6 +570,17 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertEquals(context.get_timeout(), 1234) + if not PY3: + def test_timeout_long(self): + """ + On Python 2 :py:obj:`Context.set_timeout` accepts values of type + `long` as well as int. + """ + context = Context(TLSv1_METHOD) + context.set_timeout(long(1234)) + self.assertEquals(context.get_timeout(), 1234) + + def test_set_verify_depth_wrong_args(self): """ :py:obj:`Context.set_verify_depth` raises :py:obj:`TypeError` if called with the wrong @@ -570,6 +611,17 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertEquals(context.get_verify_depth(), 11) + if not PY3: + def test_verify_depth_long(self): + """ + On Python 2 :py:obj:`Context.set_verify_depth` accepts values of + type `long` as well as int. + """ + context = Context(TLSv1_METHOD) + context.set_verify_depth(long(11)) + self.assertEquals(context.get_verify_depth(), 11) + + def _write_encrypted_pem(self, passphrase): """ Write a new private key out to a new file, encrypted using the given @@ -1016,7 +1068,7 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(TypeError, context.get_verify_mode, None) - def test_get_verify_mode(self): + def test_set_verify_mode(self): """ :py:obj:`Context.get_verify_mode` returns the verify mode flags previously passed to :py:obj:`Context.set_verify`. @@ -1029,6 +1081,20 @@ class ContextTests(TestCase, _LoopbackMixin): context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE) + if not PY3: + def test_set_verify_mode_long(self): + """ + On Python 2 :py:obj:`Context.set_verify_mode` accepts values of + type :py:obj:`long` as well as :py:obj:`int`. + """ + context = Context(TLSv1_METHOD) + self.assertEquals(context.get_verify_mode(), 0) + context.set_verify( + long(VERIFY_PEER | VERIFY_CLIENT_ONCE), lambda *args: None) + self.assertEquals( + context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE) + + def test_load_tmp_dh_wrong_args(self): """ :py:obj:`Context.load_tmp_dh` raises :py:obj:`TypeError` if called with the wrong @@ -1104,8 +1170,8 @@ class ContextTests(TestCase, _LoopbackMixin): def test_set_session_cache_mode_wrong_args(self): """ - L{Context.set_session_cache_mode} raises L{TypeError} if called with - other than one integer argument. + :py:obj:`Context.set_session_cache_mode` raises :py:obj:`TypeError` if + called with other than one integer argument. """ context = Context(TLSv1_METHOD) self.assertRaises(TypeError, context.set_session_cache_mode) @@ -1114,8 +1180,8 @@ class ContextTests(TestCase, _LoopbackMixin): def test_get_session_cache_mode_wrong_args(self): """ - L{Context.get_session_cache_mode} raises L{TypeError} if called with any - arguments. + :py:obj:`Context.get_session_cache_mode` raises :py:obj:`TypeError` if + called with any arguments. """ context = Context(TLSv1_METHOD) self.assertRaises(TypeError, context.get_session_cache_mode, 1) @@ -1123,15 +1189,27 @@ class ContextTests(TestCase, _LoopbackMixin): def test_session_cache_mode(self): """ - L{Context.set_session_cache_mode} specifies how sessions are cached. - The setting can be retrieved via L{Context.get_session_cache_mode}. + :py:obj:`Context.set_session_cache_mode` specifies how sessions are + cached. The setting can be retrieved via + :py:obj:`Context.get_session_cache_mode`. """ context = Context(TLSv1_METHOD) - old = context.set_session_cache_mode(SESS_CACHE_OFF) + context.set_session_cache_mode(SESS_CACHE_OFF) off = context.set_session_cache_mode(SESS_CACHE_BOTH) self.assertEqual(SESS_CACHE_OFF, off) self.assertEqual(SESS_CACHE_BOTH, context.get_session_cache_mode()) + if not PY3: + def test_session_cache_mode_long(self): + """ + On Python 2 :py:obj:`Context.set_session_cache_mode` accepts values + of type :py:obj:`long` as well as :py:obj:`int`. + """ + context = Context(TLSv1_METHOD) + context.set_session_cache_mode(long(SESS_CACHE_BOTH)) + self.assertEqual( + SESS_CACHE_BOTH, context.get_session_cache_mode()) + def test_get_cert_store(self): """ @@ -1834,13 +1912,14 @@ class ConnectionSendTests(TestCase, _LoopbackMixin): """ def test_wrong_args(self): """ - When called with arguments other than a single string, - :py:obj:`Connection.send` raises :py:obj:`TypeError`. + When called with arguments other than string argument for its first + parameter or more than two arguments, :py:obj:`Connection.send` raises + :py:obj:`TypeError`. """ connection = Connection(Context(TLSv1_METHOD), None) self.assertRaises(TypeError, connection.send) self.assertRaises(TypeError, connection.send, object()) - self.assertRaises(TypeError, connection.send, "foo", "bar") + self.assertRaises(TypeError, connection.send, "foo", object(), "bar") def test_short_bytes(self): @@ -1877,13 +1956,15 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin): """ def test_wrong_args(self): """ - When called with arguments other than a single string, - :py:obj:`Connection.sendall` raises :py:obj:`TypeError`. + When called with arguments other than a string argument for its first + parameter or with more than two arguments, :py:obj:`Connection.sendall` + raises :py:obj:`TypeError`. """ connection = Connection(Context(TLSv1_METHOD), None) self.assertRaises(TypeError, connection.sendall) self.assertRaises(TypeError, connection.sendall, object()) - self.assertRaises(TypeError, connection.sendall, "foo", "bar") + self.assertRaises( + TypeError, connection.sendall, "foo", object(), "bar") def test_short(self): @@ -2067,66 +2148,68 @@ class ConstantsTests(TestCase): def test_sess_cache_off(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_OFF} 0x0, the value of - L{SSL_SESS_CACHE_OFF} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of + :py:obj:`SSL_SESS_CACHE_OFF` defined by ``openssl/ssl.h``. """ self.assertEqual(0x0, SESS_CACHE_OFF) def test_sess_cache_client(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_CLIENT} 0x1, the value of - L{SSL_SESS_CACHE_CLIENT} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of + :py:obj:`SSL_SESS_CACHE_CLIENT` defined by ``openssl/ssl.h``. """ self.assertEqual(0x1, SESS_CACHE_CLIENT) def test_sess_cache_server(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_SERVER} 0x2, the value of - L{SSL_SESS_CACHE_SERVER} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of + :py:obj:`SSL_SESS_CACHE_SERVER` defined by ``openssl/ssl.h``. """ self.assertEqual(0x2, SESS_CACHE_SERVER) def test_sess_cache_both(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_BOTH} 0x3, the value of - L{SSL_SESS_CACHE_BOTH} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of + :py:obj:`SSL_SESS_CACHE_BOTH` defined by ``openssl/ssl.h``. """ self.assertEqual(0x3, SESS_CACHE_BOTH) def test_sess_cache_no_auto_clear(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR} 0x80, the value of - L{SSL_SESS_CACHE_NO_AUTO_CLEAR} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the + value of :py:obj:`SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by + ``openssl/ssl.h``. """ self.assertEqual(0x80, SESS_CACHE_NO_AUTO_CLEAR) def test_sess_cache_no_internal_lookup(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP} 0x100, the - value of L{SSL_SESS_CACHE_NO_INTERNAL_LOOKUP} defined by - I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100, + the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by + ``openssl/ssl.h``. """ self.assertEqual(0x100, SESS_CACHE_NO_INTERNAL_LOOKUP) def test_sess_cache_no_internal_store(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE} 0x200, the - value of L{SSL_SESS_CACHE_NO_INTERNAL_STORE} defined by - I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200, + the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by + ``openssl/ssl.h``. """ self.assertEqual(0x200, SESS_CACHE_NO_INTERNAL_STORE) def test_sess_cache_no_internal(self): """ - The value of L{OpenSSL.SSL.SESS_CACHE_NO_INTERNAL} 0x300, the value of - L{SSL_SESS_CACHE_NO_INTERNAL} defined by I{openssl/ssl.h}. + The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the + value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL` defined by + ``openssl/ssl.h``. """ self.assertEqual(0x300, SESS_CACHE_NO_INTERNAL) @@ -2537,6 +2620,40 @@ class ConnectionBIOTests(TestCase): self.assertRaises(WantReadError, conn.bio_read, 1024) + def test_buffer_size(self): + """ + :py:obj:`Connection.bio_read` accepts an integer giving the maximum + number of bytes to read and return. + """ + ctx = Context(TLSv1_METHOD) + conn = Connection(ctx, None) + conn.set_connect_state() + try: + conn.do_handshake() + except WantReadError: + pass + data = conn.bio_read(2) + self.assertEqual(2, len(data)) + + + if not PY3: + def test_buffer_size_long(self): + """ + On Python 2 :py:obj:`Connection.bio_read` accepts values of type + :py:obj:`long` as well as :py:obj:`int`. + """ + ctx = Context(TLSv1_METHOD) + conn = Connection(ctx, None) + conn.set_connect_state() + try: + conn.do_handshake() + except WantReadError: + pass + data = conn.bio_read(long(2)) + self.assertEqual(2, len(data)) + + + class InfoConstantTests(TestCase): """ |