From b748099426db3963c7b06d2e0aec29e12e016c07 Mon Sep 17 00:00:00 2001 From: Alex Chan Date: Mon, 30 Jan 2017 14:04:47 +0000 Subject: Rip out the last vestages of unittest from the test suite (#599) --- tests/test_ssl.py | 635 ++++++++++++++++++++++++------------------------------ tests/util.py | 295 ------------------------- 2 files changed, 282 insertions(+), 648 deletions(-) diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 14b2310..d2a56b7 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -15,7 +15,7 @@ from socket import MSG_PEEK, SHUT_RDWR, error, socket from os import makedirs from os.path import join from weakref import ref -from warnings import catch_warnings, simplefilter +from warnings import simplefilter import pytest @@ -81,8 +81,7 @@ try: except ImportError: SSL_ST_INIT = SSL_ST_BEFORE = SSL_ST_OK = SSL_ST_RENEGOTIATE = None -from .util import ( - WARNING_TYPE_EXPECTED, NON_ASCII, TestCase, is_consistent_type) +from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type from .test_crypto import ( cleartextCertificatePEM, cleartextPrivateKeyPEM, client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, @@ -222,28 +221,6 @@ def _create_certificate_chain(): return [(cakey, cacert), (ikey, icert), (skey, scert)] -class _LoopbackMixin(object): - """ - Helper mixin which defines methods for creating a connected socket pair and - for forcing two connected SSL sockets to talk to each other via memory - BIOs. - """ - def _loopbackClientFactory(self, socket): - return loopback_client_factory(socket) - - def _loopbackServerFactory(self, socket): - return loopback_server_factory(socket) - - def _loopback(self, serverFactory=None, clientFactory=None): - return loopback(serverFactory, clientFactory) - - def _interactInMemory(self, client_conn, server_conn): - return interact_in_memory(client_conn, server_conn) - - def _handshakeInMemory(self, client_conn, server_conn): - return handshake_in_memory(client_conn, server_conn) - - def loopback_client_factory(socket): client = Connection(Context(TLSv1_METHOD), socket) client.set_connect_state() @@ -342,32 +319,30 @@ def handshake_in_memory(client_conn, server_conn): interact_in_memory(client_conn, server_conn) -class VersionTests(TestCase): +class TestVersion(object): """ - Tests for version information exposed by - :py:obj:`OpenSSL.SSL.SSLeay_version` and - :py:obj:`OpenSSL.SSL.OPENSSL_VERSION_NUMBER`. + Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and + `OpenSSL.SSL.OPENSSL_VERSION_NUMBER`. """ def test_OPENSSL_VERSION_NUMBER(self): """ - :py:obj:`OPENSSL_VERSION_NUMBER` is an integer with status in the low - byte and the patch, fix, minor, and major versions in the - nibbles above that. + `OPENSSL_VERSION_NUMBER` is an integer with status in the low byte and + the patch, fix, minor, and major versions in the nibbles above that. """ - self.assertTrue(isinstance(OPENSSL_VERSION_NUMBER, int)) + assert isinstance(OPENSSL_VERSION_NUMBER, int) def test_SSLeay_version(self): """ - :py:obj:`SSLeay_version` takes a version type indicator and returns - one of a number of version strings based on that indicator. + `SSLeay_version` takes a version type indicator and returns one of a + number of version strings based on that indicator. """ versions = {} for t in [SSLEAY_VERSION, SSLEAY_CFLAGS, SSLEAY_BUILT_ON, SSLEAY_PLATFORM, SSLEAY_DIR]: version = SSLeay_version(t) versions[version] = t - self.assertTrue(isinstance(version, bytes)) - self.assertEqual(len(versions), 5) + assert isinstance(version, bytes) + assert len(versions) == 5 @pytest.fixture @@ -2671,97 +2646,84 @@ class TestConnection(object): assert 2 == len(data) -class ConnectionGetCipherListTests(TestCase): +class TestConnectionGetCipherList(object): """ - Tests for :py:obj:`Connection.get_cipher_list`. + Tests for `Connection.get_cipher_list`. """ - def test_wrong_args(self): - """ - :py:obj:`Connection.get_cipher_list` raises :py:obj:`TypeError` if - called with any arguments. - """ - connection = Connection(Context(TLSv1_METHOD), None) - self.assertRaises(TypeError, connection.get_cipher_list, None) - def test_result(self): """ - :py:obj:`Connection.get_cipher_list` returns a :py:obj:`list` of - :py:obj:`bytes` giving the names of the ciphers which might be used. + `Connection.get_cipher_list` returns a list of `bytes` giving the + names of the ciphers which might be used. """ connection = Connection(Context(TLSv1_METHOD), None) ciphers = connection.get_cipher_list() - self.assertTrue(isinstance(ciphers, list)) + assert isinstance(ciphers, list) for cipher in ciphers: - self.assertTrue(isinstance(cipher, str)) + assert isinstance(cipher, str) -class ConnectionSendTests(TestCase, _LoopbackMixin): +class TestConnectionSend(object): """ - Tests for :py:obj:`Connection.send` + Tests for `Connection.send`. """ def test_wrong_args(self): """ When called with arguments other than string argument for its first - parameter or more than two arguments, :py:obj:`Connection.send` raises - :py:obj:`TypeError`. + parameter, `Connection.send` raises `TypeError`. """ connection = Connection(Context(TLSv1_METHOD), None) - self.assertRaises(TypeError, connection.send) - self.assertRaises(TypeError, connection.send, object()) - self.assertRaises(TypeError, connection.send, "foo", object(), "bar") + with pytest.raises(TypeError): + connection.send(object()) def test_short_bytes(self): """ - When passed a short byte string, :py:obj:`Connection.send` transmits - all of it and returns the number of bytes sent. + When passed a short byte string, `Connection.send` transmits all of it + and returns the number of bytes sent. """ - server, client = self._loopback() + server, client = loopback() count = server.send(b'xy') - self.assertEquals(count, 2) - self.assertEquals(client.recv(2), b'xy') + assert count == 2 + assert client.recv(2) == b'xy' def test_text(self): """ - When passed a text, :py:obj:`Connection.send` transmits all of it and + When passed a text, `Connection.send` transmits all of it and returns the number of bytes sent. It also raises a DeprecationWarning. """ - server, client = self._loopback() - with catch_warnings(record=True) as w: + server, client = loopback() + with pytest.warns(DeprecationWarning) as w: simplefilter("always") count = server.send(b"xy".decode("ascii")) - self.assertEqual( + assert ( "{0} for buf is no longer accepted, use bytes".format( WARNING_TYPE_EXPECTED - ), - str(w[-1].message) - ) - self.assertIs(w[-1].category, DeprecationWarning) - self.assertEquals(count, 2) - self.assertEquals(client.recv(2), b"xy") + ) == str(w[-1].message)) + assert count == 2 + assert client.recv(2) == b'xy' @skip_if_py26 def test_short_memoryview(self): """ When passed a memoryview onto a small number of bytes, - :py:obj:`Connection.send` transmits all of them and returns the number + `Connection.send` transmits all of them and returns the number of bytes sent. """ - server, client = self._loopback() + server, client = loopback() count = server.send(memoryview(b'xy')) - self.assertEquals(count, 2) - self.assertEquals(client.recv(2), b'xy') + assert count == 2 + assert client.recv(2) == b'xy' @skip_if_py3 def test_short_buffer(self): """ When passed a buffer containing a small number of bytes, - :py:obj:`Connection.send` transmits all of them and returns the number + `Connection.send` transmits all of them and returns the number of bytes sent. """ - server, client = self._loopback() + server, client = loopback() count = server.send(buffer(b'xy')) - self.assertEquals(count, 2) - self.assertEquals(client.recv(2), b'xy') + assert count == 2 + assert client.recv(2) == b'xy' def _make_memoryview(size): @@ -2772,52 +2734,50 @@ def _make_memoryview(size): return memoryview(bytearray(size)) -class ConnectionRecvIntoTests(TestCase, _LoopbackMixin): +class TestConnectionRecvInto(object): """ - Tests for :py:obj:`Connection.recv_into` + Tests for `Connection.recv_into`. """ def _no_length_test(self, factory): """ - Assert that when the given buffer is passed to - ``Connection.recv_into``, whatever bytes are available to be received - that fit into that buffer are written into that buffer. + Assert that when the given buffer is passed to `Connection.recv_into`, + whatever bytes are available to be received that fit into that buffer + are written into that buffer. """ output_buffer = factory(5) - server, client = self._loopback() + server, client = loopback() server.send(b'xy') - self.assertEqual(client.recv_into(output_buffer), 2) - self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00')) + assert client.recv_into(output_buffer) == 2 + assert output_buffer == bytearray(b'xy\x00\x00\x00') def test_bytearray_no_length(self): """ - :py:obj:`Connection.recv_into` can be passed a ``bytearray`` instance - and data in the receive buffer is written to it. + `Connection.recv_into` can be passed a `bytearray` instance and data + in the receive buffer is written to it. """ self._no_length_test(bytearray) def _respects_length_test(self, factory): """ - Assert that when the given buffer is passed to ``Connection.recv_into`` - along with a value for ``nbytes`` that is less than the size of that - buffer, only ``nbytes`` bytes are written into the buffer. + Assert that when the given buffer is passed to `Connection.recv_into` + along with a value for `nbytes` that is less than the size of that + buffer, only `nbytes` bytes are written into the buffer. """ output_buffer = factory(10) - server, client = self._loopback() + server, client = loopback() server.send(b'abcdefghij') - self.assertEqual(client.recv_into(output_buffer, 5), 5) - self.assertEqual( - output_buffer, bytearray(b'abcde\x00\x00\x00\x00\x00') - ) + assert client.recv_into(output_buffer, 5) == 5 + assert output_buffer == bytearray(b'abcde\x00\x00\x00\x00\x00') def test_bytearray_respects_length(self): """ - When called with a ``bytearray`` instance, - :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter and - doesn't copy in more than that number of bytes. + When called with a `bytearray` instance, `Connection.recv_into` + respects the `nbytes` parameter and doesn't copy in more than that + number of bytes. """ self._respects_length_test(bytearray) @@ -2825,153 +2785,144 @@ class ConnectionRecvIntoTests(TestCase, _LoopbackMixin): """ Assert that if there are more bytes available to be read from the receive buffer than would fit into the buffer passed to - :py:obj:`Connection.recv_into`, only as many as fit are written into - it. + `Connection.recv_into`, only as many as fit are written into it. """ output_buffer = factory(5) - server, client = self._loopback() + server, client = loopback() server.send(b'abcdefghij') - self.assertEqual(client.recv_into(output_buffer), 5) - self.assertEqual(output_buffer, bytearray(b'abcde')) + assert client.recv_into(output_buffer) == 5 + assert output_buffer == bytearray(b'abcde') rest = client.recv(5) - self.assertEqual(b'fghij', rest) + assert b'fghij' == rest def test_bytearray_doesnt_overfill(self): """ - When called with a ``bytearray`` instance, - :py:obj:`Connection.recv_into` respects the size of the array and - doesn't write more bytes into it than will fit. + When called with a `bytearray` instance, `Connection.recv_into` + respects the size of the array and doesn't write more bytes into it + than will fit. """ self._doesnt_overfill_test(bytearray) def test_bytearray_really_doesnt_overfill(self): """ - When called with a ``bytearray`` instance and an ``nbytes`` value that - is too large, :py:obj:`Connection.recv_into` respects the size of the - array and not the ``nbytes`` value and doesn't write more bytes into - the buffer than will fit. + When called with a `bytearray` instance and an `nbytes` value that is + too large, `Connection.recv_into` respects the size of the array and + not the `nbytes` value and doesn't write more bytes into the buffer + than will fit. """ self._doesnt_overfill_test(bytearray) def test_peek(self): - server, client = self._loopback() + server, client = loopback() server.send(b'xy') for _ in range(2): output_buffer = bytearray(5) - self.assertEqual( - client.recv_into(output_buffer, flags=MSG_PEEK), 2) - self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00')) + assert client.recv_into(output_buffer, flags=MSG_PEEK) == 2 + assert output_buffer == bytearray(b'xy\x00\x00\x00') @skip_if_py26 def test_memoryview_no_length(self): """ - :py:obj:`Connection.recv_into` can be passed a ``memoryview`` - instance and data in the receive buffer is written to it. + `Connection.recv_into` can be passed a `memoryview` instance and data + in the receive buffer is written to it. """ self._no_length_test(_make_memoryview) @skip_if_py26 def test_memoryview_respects_length(self): """ - When called with a ``memoryview`` instance, - :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter - and doesn't copy more than that number of bytes in. + When called with a `memoryview` instance, `Connection.recv_into` + respects the ``nbytes`` parameter and doesn't copy more than that + number of bytes in. """ self._respects_length_test(_make_memoryview) @skip_if_py26 def test_memoryview_doesnt_overfill(self): """ - When called with a ``memoryview`` instance, - :py:obj:`Connection.recv_into` respects the size of the array and - doesn't write more bytes into it than will fit. + When called with a `memoryview` instance, `Connection.recv_into` + respects the size of the array and doesn't write more bytes into it + than will fit. """ self._doesnt_overfill_test(_make_memoryview) @skip_if_py26 def test_memoryview_really_doesnt_overfill(self): """ - When called with a ``memoryview`` instance and an ``nbytes`` value - that is too large, :py:obj:`Connection.recv_into` respects the size - of the array and not the ``nbytes`` value and doesn't write more - bytes into the buffer than will fit. + When called with a `memoryview` instance and an `nbytes` value that is + too large, `Connection.recv_into` respects the size of the array and + not the `nbytes` value and doesn't write more bytes into the buffer + than will fit. """ self._doesnt_overfill_test(_make_memoryview) -class ConnectionSendallTests(TestCase, _LoopbackMixin): +class TestConnectionSendall(object): """ - Tests for :py:obj:`Connection.sendall`. + Tests for `Connection.sendall`. """ def test_wrong_args(self): """ When called with arguments other than a string argument for its first - parameter or with more than two arguments, :py:obj:`Connection.sendall` - raises :py:obj:`TypeError`. + parameter, `Connection.sendall` raises `TypeError`. """ connection = Connection(Context(TLSv1_METHOD), None) - self.assertRaises(TypeError, connection.sendall) - self.assertRaises(TypeError, connection.sendall, object()) - self.assertRaises( - TypeError, connection.sendall, "foo", object(), "bar") + with pytest.raises(TypeError): + connection.sendall(object()) def test_short(self): """ - :py:obj:`Connection.sendall` transmits all of the bytes in the string + `Connection.sendall` transmits all of the bytes in the string passed to it. """ - server, client = self._loopback() + server, client = loopback() server.sendall(b'x') - self.assertEquals(client.recv(1), b'x') + assert client.recv(1) == b'x' def test_text(self): """ - :py:obj:`Connection.sendall` transmits all the content in the string - passed to it raising a DeprecationWarning in case of this being a text. + `Connection.sendall` transmits all the content in the string passed + to it, raising a DeprecationWarning in case of this being a text. """ - server, client = self._loopback() - with catch_warnings(record=True) as w: + server, client = loopback() + with pytest.warns(DeprecationWarning) as w: simplefilter("always") server.sendall(b"x".decode("ascii")) - self.assertEqual( + assert ( "{0} for buf is no longer accepted, use bytes".format( WARNING_TYPE_EXPECTED - ), - str(w[-1].message) - ) - self.assertIs(w[-1].category, DeprecationWarning) - self.assertEquals(client.recv(1), b"x") + ) == str(w[-1].message)) + assert client.recv(1) == b"x" @skip_if_py26 def test_short_memoryview(self): """ When passed a memoryview onto a small number of bytes, - :py:obj:`Connection.sendall` transmits all of them. + `Connection.sendall` transmits all of them. """ - server, client = self._loopback() + server, client = loopback() server.sendall(memoryview(b'x')) - self.assertEquals(client.recv(1), b'x') + assert client.recv(1) == b'x' @skip_if_py3 def test_short_buffers(self): """ When passed a buffer containing a small number of bytes, - :py:obj:`Connection.sendall` transmits all of them. + `Connection.sendall` transmits all of them. """ - server, client = self._loopback() + server, client = loopback() server.sendall(buffer(b'x')) - self.assertEquals(client.recv(1), b'x') + assert client.recv(1) == b'x' def test_long(self): """ - :py:obj:`Connection.sendall` transmits all of the bytes in the string - passed to it even if this requires multiple calls of an underlying - write function. + `Connection.sendall` transmits all the bytes in the string passed to it + even if this requires multiple calls of an underlying write function. """ - server, client = self._loopback() + server, client = loopback() # Should be enough, underlying SSL_write should only do 16k at a time. # On Windows, after 32k of bytes the write will block (forever # - because no one is yet reading). @@ -2983,55 +2934,40 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin): data = client.recv(1024) accum.append(data) received += len(data) - self.assertEquals(message, b''.join(accum)) + assert message == b''.join(accum) def test_closed(self): """ - If the underlying socket is closed, :py:obj:`Connection.sendall` - propagates the write error from the low level write call. + If the underlying socket is closed, `Connection.sendall` propagates the + write error from the low level write call. """ - server, client = self._loopback() + server, client = loopback() server.sock_shutdown(2) - exc = self.assertRaises(SysCallError, server.sendall, b"hello, world") + with pytest.raises(SysCallError) as err: + server.sendall(b"hello, world") if platform == "win32": - self.assertEqual(exc.args[0], ESHUTDOWN) + assert err.value.args[0] == ESHUTDOWN else: - self.assertEqual(exc.args[0], EPIPE) + assert err.value.args[0] == EPIPE -class ConnectionRenegotiateTests(TestCase, _LoopbackMixin): +class TestConnectionRenegotiate(object): """ Tests for SSL renegotiation APIs. """ - def test_renegotiate_wrong_args(self): - """ - :py:obj:`Connection.renegotiate` raises :py:obj:`TypeError` if called - with any arguments. - """ - connection = Connection(Context(TLSv1_METHOD), None) - self.assertRaises(TypeError, connection.renegotiate, None) - - def test_total_renegotiations_wrong_args(self): - """ - :py:obj:`Connection.total_renegotiations` raises :py:obj:`TypeError` if - called with any arguments. - """ - connection = Connection(Context(TLSv1_METHOD), None) - self.assertRaises(TypeError, connection.total_renegotiations, None) - def test_total_renegotiations(self): """ - :py:obj:`Connection.total_renegotiations` returns :py:obj:`0` before - any renegotiations have happened. + `Connection.total_renegotiations` returns `0` before any renegotiations + have happened. """ connection = Connection(Context(TLSv1_METHOD), None) - self.assertEquals(connection.total_renegotiations(), 0) + assert connection.total_renegotiations() == 0 def test_renegotiate(self): """ Go through a complete renegotiation cycle. """ - server, client = self._loopback() + server, client = loopback() server.send(b"hello world") @@ -3055,21 +2991,21 @@ class ConnectionRenegotiateTests(TestCase, _LoopbackMixin): pass -class ErrorTests(TestCase): +class TestError(object): """ - Unit tests for :py:obj:`OpenSSL.SSL.Error`. + Unit tests for `OpenSSL.SSL.Error`. """ def test_type(self): """ - :py:obj:`Error` is an exception type. + `Error` is an exception type. """ - self.assertTrue(issubclass(Error, Exception)) - self.assertEqual(Error.__name__, 'Error') + assert issubclass(Error, Exception) + assert Error.__name__ == 'Error' -class ConstantsTests(TestCase): +class TestConstants(object): """ - Tests for the values of constants exposed in :py:obj:`OpenSSL.SSL`. + Tests for the values of constants exposed in `OpenSSL.SSL`. These are values defined by OpenSSL intended only to be used as flags to OpenSSL APIs. The only assertions it seems can be made about them is @@ -3081,10 +3017,10 @@ class ConstantsTests(TestCase): ) def test_op_no_query_mtu(self): """ - The value of :py:obj:`OpenSSL.SSL.OP_NO_QUERY_MTU` is 0x1000, the value - of :py:const:`SSL_OP_NO_QUERY_MTU` defined by :file:`openssl/ssl.h`. + The value of `OpenSSL.SSL.OP_NO_QUERY_MTU` is 0x1000, the value + of `SSL_OP_NO_QUERY_MTU` defined by `openssl/ssl.h`. """ - self.assertEqual(OP_NO_QUERY_MTU, 0x1000) + assert OP_NO_QUERY_MTU == 0x1000 @pytest.mark.skipif( OP_COOKIE_EXCHANGE is None, @@ -3093,11 +3029,10 @@ class ConstantsTests(TestCase): ) def test_op_cookie_exchange(self): """ - The value of :py:obj:`OpenSSL.SSL.OP_COOKIE_EXCHANGE` is 0x2000, the - value of :py:const:`SSL_OP_COOKIE_EXCHANGE` defined by - :file:`openssl/ssl.h`. + The value of `OpenSSL.SSL.OP_COOKIE_EXCHANGE` is 0x2000, the + value of `SSL_OP_COOKIE_EXCHANGE` defined by `openssl/ssl.h`. """ - self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000) + assert OP_COOKIE_EXCHANGE == 0x2000 @pytest.mark.skipif( OP_NO_TICKET is None, @@ -3105,10 +3040,10 @@ class ConstantsTests(TestCase): ) def test_op_no_ticket(self): """ - The value of :py:obj:`OpenSSL.SSL.OP_NO_TICKET` is 0x4000, the value of - :py:const:`SSL_OP_NO_TICKET` defined by :file:`openssl/ssl.h`. + The value of `OpenSSL.SSL.OP_NO_TICKET` is 0x4000, the value of + `SSL_OP_NO_TICKET` defined by `openssl/ssl.h`. """ - self.assertEqual(OP_NO_TICKET, 0x4000) + assert OP_NO_TICKET == 0x4000 @pytest.mark.skipif( OP_NO_COMPRESSION is None, @@ -3116,81 +3051,79 @@ class ConstantsTests(TestCase): ) def test_op_no_compression(self): """ - The value of :py:obj:`OpenSSL.SSL.OP_NO_COMPRESSION` is 0x20000, the - value of :py:const:`SSL_OP_NO_COMPRESSION` defined by - :file:`openssl/ssl.h`. + The value of `OpenSSL.SSL.OP_NO_COMPRESSION` is 0x20000, the + value of `SSL_OP_NO_COMPRESSION` defined by `openssl/ssl.h`. """ - self.assertEqual(OP_NO_COMPRESSION, 0x20000) + assert OP_NO_COMPRESSION == 0x20000 def test_sess_cache_off(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of - :py:obj:`SSL_SESS_CACHE_OFF` defined by ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of + `SSL_SESS_CACHE_OFF` defined by `openssl/ssl.h`. """ - self.assertEqual(0x0, SESS_CACHE_OFF) + assert 0x0 == SESS_CACHE_OFF def test_sess_cache_client(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of - :py:obj:`SSL_SESS_CACHE_CLIENT` defined by ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of + `SSL_SESS_CACHE_CLIENT` defined by `openssl/ssl.h`. """ - self.assertEqual(0x1, SESS_CACHE_CLIENT) + assert 0x1 == SESS_CACHE_CLIENT def test_sess_cache_server(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of - :py:obj:`SSL_SESS_CACHE_SERVER` defined by ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of + `SSL_SESS_CACHE_SERVER` defined by `openssl/ssl.h`. """ - self.assertEqual(0x2, SESS_CACHE_SERVER) + assert 0x2 == SESS_CACHE_SERVER def test_sess_cache_both(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of - :py:obj:`SSL_SESS_CACHE_BOTH` defined by ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of + `SSL_SESS_CACHE_BOTH` defined by `openssl/ssl.h`. """ - self.assertEqual(0x3, SESS_CACHE_BOTH) + assert 0x3 == SESS_CACHE_BOTH def test_sess_cache_no_auto_clear(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the - value of :py:obj:`SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by - ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the + value of `SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by + `openssl/ssl.h`. """ - self.assertEqual(0x80, SESS_CACHE_NO_AUTO_CLEAR) + assert 0x80 == SESS_CACHE_NO_AUTO_CLEAR def test_sess_cache_no_internal_lookup(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100, - the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by - ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100, + the value of `SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by + `openssl/ssl.h`. """ - self.assertEqual(0x100, SESS_CACHE_NO_INTERNAL_LOOKUP) + assert 0x100 == SESS_CACHE_NO_INTERNAL_LOOKUP def test_sess_cache_no_internal_store(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200, - the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by - ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200, + the value of `SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by + `openssl/ssl.h`. """ - self.assertEqual(0x200, SESS_CACHE_NO_INTERNAL_STORE) + assert 0x200 == SESS_CACHE_NO_INTERNAL_STORE def test_sess_cache_no_internal(self): """ - The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the - value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL` defined by - ``openssl/ssl.h``. + The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the + value of `SSL_SESS_CACHE_NO_INTERNAL` defined by + `openssl/ssl.h`. """ - self.assertEqual(0x300, SESS_CACHE_NO_INTERNAL) + assert 0x300 == SESS_CACHE_NO_INTERNAL -class MemoryBIOTests(TestCase, _LoopbackMixin): +class TestMemoryBIO(object): """ - Tests for :py:obj:`OpenSSL.SSL.Connection` using a memory BIO. + Tests for `OpenSSL.SSL.Connection` using a memory BIO. """ def _server(self, sock): """ - Create a new server-side SSL :py:obj:`Connection` object wrapped around - :py:obj:`sock`. + Create a new server-side SSL `Connection` object wrapped around `sock`. """ # Create the server side Connection. This is mostly setup boilerplate # - use TLSv1, use a particular certificate, etc. @@ -3215,8 +3148,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def _client(self, sock): """ - Create a new client-side SSL :py:obj:`Connection` object wrapped around - :py:obj:`sock`. + Create a new client-side SSL `Connection` object wrapped around `sock`. """ # Now create the client side Connection. Similar boilerplate to the # above. @@ -3237,179 +3169,178 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): client_conn.set_connect_state() return client_conn - def test_memoryConnect(self): + def test_memory_connect(self): """ - Two :py:obj:`Connection`s which use memory BIOs can be manually - connected by reading from the output of each and writing those bytes to - the input of the other and in this way establish a connection and - exchange application-level bytes with each other. + Two `Connection`s which use memory BIOs can be manually connected by + reading from the output of each and writing those bytes to the input of + the other and in this way establish a connection and exchange + application-level bytes with each other. """ server_conn = self._server(None) client_conn = self._client(None) # There should be no key or nonces yet. - self.assertIdentical(server_conn.master_key(), None) - self.assertIdentical(server_conn.client_random(), None) - self.assertIdentical(server_conn.server_random(), None) + assert server_conn.master_key() is None + assert server_conn.client_random() is None + assert server_conn.server_random() is None # First, the handshake needs to happen. We'll deliver bytes back and # forth between the client and server until neither of them feels like # speaking any more. - self.assertIdentical( - self._interactInMemory(client_conn, server_conn), None) + assert interact_in_memory(client_conn, server_conn) is None # Now that the handshake is done, there should be a key and nonces. - self.assertNotIdentical(server_conn.master_key(), None) - self.assertNotIdentical(server_conn.client_random(), None) - self.assertNotIdentical(server_conn.server_random(), None) - self.assertEquals( - server_conn.client_random(), client_conn.client_random()) - self.assertEquals( - server_conn.server_random(), client_conn.server_random()) - self.assertNotEquals( - server_conn.client_random(), server_conn.server_random()) - self.assertNotEquals( - client_conn.client_random(), client_conn.server_random()) + assert server_conn.master_key() is not None + assert server_conn.client_random() is not None + assert server_conn.server_random() is not None + assert server_conn.client_random() == client_conn.client_random() + assert server_conn.server_random() == client_conn.server_random() + assert server_conn.client_random() != server_conn.server_random() + assert client_conn.client_random() != client_conn.server_random() # Here are the bytes we'll try to send. important_message = b'One if by land, two if by sea.' server_conn.write(important_message) - self.assertEquals( - self._interactInMemory(client_conn, server_conn), + assert ( + interact_in_memory(client_conn, server_conn) == (client_conn, important_message)) client_conn.write(important_message[::-1]) - self.assertEquals( - self._interactInMemory(client_conn, server_conn), + assert ( + interact_in_memory(client_conn, server_conn) == (server_conn, important_message[::-1])) - def test_socketConnect(self): + def test_socket_connect(self): """ - Just like :py:obj:`test_memoryConnect` but with an actual socket. + Just like `test_memory_connect` but with an actual socket. This is primarily to rule out the memory BIO code as the source of any - problems encountered while passing data over a :py:obj:`Connection` (if + problems encountered while passing data over a `Connection` (if this test fails, there must be a problem outside the memory BIO code, as no memory BIO is involved here). Even though this isn't a memory BIO test, it's convenient to have it here. """ - server_conn, client_conn = self._loopback() + server_conn, client_conn = loopback() important_message = b"Help me Obi Wan Kenobi, you're my only hope." client_conn.send(important_message) msg = server_conn.recv(1024) - self.assertEqual(msg, important_message) + assert msg == important_message # Again in the other direction, just for fun. important_message = important_message[::-1] server_conn.send(important_message) msg = client_conn.recv(1024) - self.assertEqual(msg, important_message) + assert msg == important_message - def test_socketOverridesMemory(self): + def test_socket_overrides_memory(self): """ - Test that :py:obj:`OpenSSL.SSL.bio_read` and - :py:obj:`OpenSSL.SSL.bio_write` don't work on - :py:obj:`OpenSSL.SSL.Connection`() that use sockets. + Test that `OpenSSL.SSL.bio_read` and `OpenSSL.SSL.bio_write` don't + work on `OpenSSL.SSL.Connection`() that use sockets. """ context = Context(TLSv1_METHOD) client = socket() clientSSL = Connection(context, client) - self.assertRaises(TypeError, clientSSL.bio_read, 100) - self.assertRaises(TypeError, clientSSL.bio_write, "foo") - self.assertRaises(TypeError, clientSSL.bio_shutdown) + with pytest.raises(TypeError): + clientSSL.bio_read(100) + with pytest.raises(TypeError): + clientSSL.bio_write("foo") + with pytest.raises(TypeError): + clientSSL.bio_shutdown() - def test_outgoingOverflow(self): + def test_outgoing_overflow(self): """ If more bytes than can be written to the memory BIO are passed to - :py:obj:`Connection.send` at once, the number of bytes which were - written is returned and that many bytes from the beginning of the input - can be read from the other end of the connection. + `Connection.send` at once, the number of bytes which were written is + returned and that many bytes from the beginning of the input can be + read from the other end of the connection. """ server = self._server(None) client = self._client(None) - self._interactInMemory(client, server) + interact_in_memory(client, server) size = 2 ** 15 sent = client.send(b"x" * size) # Sanity check. We're trying to test what happens when the entire # input can't be sent. If the entire input was sent, this test is # meaningless. - self.assertTrue(sent < size) + assert sent < size - receiver, received = self._interactInMemory(client, server) - self.assertIdentical(receiver, server) + receiver, received = interact_in_memory(client, server) + assert receiver is server # We can rely on all of these bytes being received at once because - # _loopback passes 2 ** 16 to recv - more than 2 ** 15. - self.assertEquals(len(received), sent) + # loopback passes 2 ** 16 to recv - more than 2 ** 15. + assert len(received) == sent def test_shutdown(self): """ - :py:obj:`Connection.bio_shutdown` signals the end of the data stream - from which the :py:obj:`Connection` reads. + `Connection.bio_shutdown` signals the end of the data stream + from which the `Connection` reads. """ server = self._server(None) server.bio_shutdown() - e = self.assertRaises(Error, server.recv, 1024) + with pytest.raises(Error) as err: + server.recv(1024) # We don't want WantReadError or ZeroReturnError or anything - it's a # handshake failure. - assert type(e) in [Error, SysCallError] + assert type(err.value) in [Error, SysCallError] - def test_unexpectedEndOfFile(self): + def test_unexpected_EOF(self): """ If the connection is lost before an orderly SSL shutdown occurs, - :py:obj:`OpenSSL.SSL.SysCallError` is raised with a message of + `OpenSSL.SSL.SysCallError` is raised with a message of "Unexpected EOF". """ - server_conn, client_conn = self._loopback() + server_conn, client_conn = loopback() client_conn.sock_shutdown(SHUT_RDWR) - exc = self.assertRaises(SysCallError, server_conn.recv, 1024) - self.assertEqual(exc.args, (-1, "Unexpected EOF")) + with pytest.raises(SysCallError) as err: + server_conn.recv(1024) + assert err.value.args == (-1, "Unexpected EOF") def _check_client_ca_list(self, func): """ - Verify the return value of the :py:obj:`get_client_ca_list` method for + Verify the return value of the `get_client_ca_list` method for server and client connections. :param func: A function which will be called with the server context before the client and server are connected to each other. This function should specify a list of CAs for the server to send to the client and return that same list. The list will be used to verify - that :py:obj:`get_client_ca_list` returns the proper value at + that `get_client_ca_list` returns the proper value at various times. """ server = self._server(None) client = self._client(None) - self.assertEqual(client.get_client_ca_list(), []) - self.assertEqual(server.get_client_ca_list(), []) + assert client.get_client_ca_list() == [] + assert server.get_client_ca_list() == [] ctx = server.get_context() expected = func(ctx) - self.assertEqual(client.get_client_ca_list(), []) - self.assertEqual(server.get_client_ca_list(), expected) - self._interactInMemory(client, server) - self.assertEqual(client.get_client_ca_list(), expected) - self.assertEqual(server.get_client_ca_list(), expected) + assert client.get_client_ca_list() == [] + assert server.get_client_ca_list() == expected + interact_in_memory(client, server) + assert client.get_client_ca_list() == expected + assert server.get_client_ca_list() == expected def test_set_client_ca_list_errors(self): """ - :py:obj:`Context.set_client_ca_list` raises a :py:obj:`TypeError` if - called with a non-list or a list that contains objects other than - X509Names. + `Context.set_client_ca_list` raises a `TypeError` if called with a + non-list or a list that contains objects other than X509Names. """ ctx = Context(TLSv1_METHOD) - self.assertRaises(TypeError, ctx.set_client_ca_list, "spam") - self.assertRaises(TypeError, ctx.set_client_ca_list, ["spam"]) - self.assertIdentical(ctx.set_client_ca_list([]), None) + with pytest.raises(TypeError): + ctx.set_client_ca_list("spam") + with pytest.raises(TypeError): + ctx.set_client_ca_list(["spam"]) def test_set_empty_ca_list(self): """ - If passed an empty list, :py:obj:`Context.set_client_ca_list` - configures the context to send no CA names to the client and, on both - the server and client sides, :py:obj:`Connection.get_client_ca_list` - returns an empty list after the connection is set up. + If passed an empty list, `Context.set_client_ca_list` configures the + context to send no CA names to the client and, on both the server and + client sides, `Connection.get_client_ca_list` returns an empty list + after the connection is set up. """ def no_ca(ctx): ctx.set_client_ca_list([]) @@ -3419,9 +3350,9 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_set_one_ca_list(self): """ If passed a list containing a single X509Name, - :py:obj:`Context.set_client_ca_list` configures the context to send + `Context.set_client_ca_list` configures the context to send that CA name to the client and, on both the server and client sides, - :py:obj:`Connection.get_client_ca_list` returns a list containing that + `Connection.get_client_ca_list` returns a list containing that X509Name after the connection is set up. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) @@ -3435,9 +3366,9 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_set_multiple_ca_list(self): """ If passed a list containing multiple X509Name objects, - :py:obj:`Context.set_client_ca_list` configures the context to send + `Context.set_client_ca_list` configures the context to send those CA names to the client and, on both the server and client sides, - :py:obj:`Connection.get_client_ca_list` returns a list containing those + `Connection.get_client_ca_list` returns a list containing those X509Names after the connection is set up. """ secert = load_certificate(FILETYPE_PEM, server_cert_pem) @@ -3455,7 +3386,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_reset_ca_list(self): """ If called multiple times, only the X509Names passed to the final call - of :py:obj:`Context.set_client_ca_list` are used to configure the CA + of `Context.set_client_ca_list` are used to configure the CA names sent to the client. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) @@ -3474,7 +3405,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_mutated_ca_list(self): """ - If the list passed to :py:obj:`Context.set_client_ca_list` is mutated + If the list passed to `Context.set_client_ca_list` is mutated afterwards, this does not affect the list of CA names sent to the client. """ @@ -3491,21 +3422,19 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): return [cadesc] self._check_client_ca_list(mutated_ca) - def test_add_client_ca_errors(self): + def test_add_client_ca_wrong_args(self): """ - :py:obj:`Context.add_client_ca` raises :py:obj:`TypeError` if called - with a non-X509 object or with a number of arguments other than one. + `Context.add_client_ca` raises `TypeError` if called with + a non-X509 object. """ ctx = Context(TLSv1_METHOD) - cacert = load_certificate(FILETYPE_PEM, root_cert_pem) - self.assertRaises(TypeError, ctx.add_client_ca) - self.assertRaises(TypeError, ctx.add_client_ca, "spam") - self.assertRaises(TypeError, ctx.add_client_ca, cacert, cacert) + with pytest.raises(TypeError): + ctx.add_client_ca("spam") def test_one_add_client_ca(self): """ A certificate's subject can be added as a CA to be sent to the client - with :py:obj:`Context.add_client_ca`. + with `Context.add_client_ca`. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) cadesc = cacert.get_subject() @@ -3518,7 +3447,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_multiple_add_client_ca(self): """ Multiple CA names can be sent to the client by calling - :py:obj:`Context.add_client_ca` with multiple X509 objects. + `Context.add_client_ca` with multiple X509 objects. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) secert = load_certificate(FILETYPE_PEM, server_cert_pem) @@ -3534,8 +3463,8 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_set_and_add_client_ca(self): """ - A call to :py:obj:`Context.set_client_ca_list` followed by a call to - :py:obj:`Context.add_client_ca` results in using the CA names from the + A call to `Context.set_client_ca_list` followed by a call to + `Context.add_client_ca` results in using the CA names from the first call and the CA name from the second call. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) @@ -3554,8 +3483,8 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): def test_set_after_add_client_ca(self): """ - A call to :py:obj:`Context.set_client_ca_list` after a call to - :py:obj:`Context.add_client_ca` replaces the CA name specified by the + A call to `Context.set_client_ca_list` after a call to + `Context.add_client_ca` replaces the CA name specified by the former call with the names specified by the latter call. """ cacert = load_certificate(FILETYPE_PEM, root_cert_pem) @@ -3573,7 +3502,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin): self._check_client_ca_list(set_replaces_add_ca) -class InfoConstantTests(TestCase): +class TestInfoConstants(object): """ Tests for assorted constants exposed for use in info callbacks. """ @@ -3642,7 +3571,7 @@ class TestRequires(object): assert results == [] -class TestOCSP(_LoopbackMixin): +class TestOCSP(object): """ Tests for PyOpenSSL's OCSP stapling support. """ @@ -3699,7 +3628,7 @@ class TestOCSP(_LoopbackMixin): callback=ocsp_callback, data=None, request_ocsp=False ) server = self._server_connection(callback=ocsp_callback, data=None) - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) assert not called @@ -3715,8 +3644,8 @@ class TestOCSP(_LoopbackMixin): return True client = self._client_connection(callback=ocsp_callback, data=None) - server = self._loopbackServerFactory(socket=None) - self._handshakeInMemory(client, server) + server = loopback_server_factory(socket=None) + handshake_in_memory(client, server) assert len(called) == 1 assert called[0] == b'' @@ -3736,7 +3665,7 @@ class TestOCSP(_LoopbackMixin): client = self._client_connection(callback=client_callback, data=None) server = self._server_connection(callback=server_callback, data=None) - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) assert len(calls) == 1 assert calls[0] == self.sample_ocsp_data @@ -3758,7 +3687,7 @@ class TestOCSP(_LoopbackMixin): client = self._client_connection(callback=client_callback, data=None) server = self._server_connection(callback=server_callback, data=None) - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) assert len(client_calls) == 1 assert len(server_calls) == 1 @@ -3788,7 +3717,7 @@ class TestOCSP(_LoopbackMixin): server = self._server_connection( callback=server_callback, data=sentinel ) - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) assert len(calls) == 2 assert calls[0][-1] is sentinel @@ -3810,7 +3739,7 @@ class TestOCSP(_LoopbackMixin): client = self._client_connection(callback=client_callback, data=None) server = self._server_connection(callback=server_callback, data=None) - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) assert len(client_calls) == 1 assert client_calls[0] == b'' @@ -3829,7 +3758,7 @@ class TestOCSP(_LoopbackMixin): server = self._server_connection(callback=server_callback, data=None) with pytest.raises(Error): - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) def test_exceptions_in_client_bubble_up(self): """ @@ -3848,7 +3777,7 @@ class TestOCSP(_LoopbackMixin): server = self._server_connection(callback=server_callback, data=None) with pytest.raises(SentinelException): - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) def test_exceptions_in_server_bubble_up(self): """ @@ -3867,7 +3796,7 @@ class TestOCSP(_LoopbackMixin): server = self._server_connection(callback=server_callback, data=None) with pytest.raises(SentinelException): - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) def test_server_must_return_bytes(self): """ @@ -3883,4 +3812,4 @@ class TestOCSP(_LoopbackMixin): server = self._server_connection(callback=server_callback, data=None) with pytest.raises(TypeError): - self._handshakeInMemory(client, server) + handshake_in_memory(client, server) diff --git a/tests/util.py b/tests/util.py index e1b8656..4464379 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,313 +1,18 @@ # Copyright (C) Jean-Paul Calderone # Copyright (C) Twisted Matrix Laboratories. # See LICENSE for details. - """ Helpers for the OpenSSL test suite, largely copied from U{Twisted}. """ -import shutil -import sys -import traceback - -from tempfile import mktemp, mkdtemp -from unittest import TestCase - -import pytest - from six import PY3 -from OpenSSL._util import exception_from_error_queue -from OpenSSL.crypto import Error - - -from . import memdbg - -from OpenSSL._util import ffi, lib - # This is the UTF-8 encoding of the SNOWMAN unicode code point. NON_ASCII = b"\xe2\x98\x83".decode("utf-8") -class TestCase(TestCase): - """ - :py:class:`TestCase` adds useful testing functionality beyond what is - available from the standard library :py:class:`unittest.TestCase`. - """ - - def run(self, result): - run = super(TestCase, self).run - if memdbg.heap is None: - return run(result) - - # Run the test as usual - before = set(memdbg.heap) - run(result) - - # Clean up some long-lived allocations so they won't be reported as - # memory leaks. - lib.CRYPTO_cleanup_all_ex_data() - lib.ERR_clear_error() - after = set(memdbg.heap) - - if not after - before: - # No leaks, fast succeed - return - - if result.wasSuccessful(): - # If it passed, run it again with memory debugging - before = set(memdbg.heap) - run(result) - - # Clean up some long-lived allocations so they won't be reported as - # memory leaks. - lib.CRYPTO_cleanup_all_ex_data() - lib.ERR_clear_error() - - after = set(memdbg.heap) - - self._reportLeaks(after - before, result) - - def _reportLeaks(self, leaks, result): - def format_leak(p): - """ - c_stack looks something like this (interesting parts indicated - with inserted arrows not part of the data): - - cpython/2.7/python(PyCFunction_Call+0x8b) [0x56265a] - cpython/2.7/python() [0x4d5f52] - cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e] - cpython/2.7/python() [0x4d6419] - cpython/2.7/python() [0x4d6129] - cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e] - cpython/2.7/python(PyEval_EvalCodeEx+0x1043) [0x4d3726] - cpython/2.7/python() [0x55fd51] - cpython/2.7/python(PyObject_Call+0x7e) [0x420ee6] - cpython/2.7/python(PyEval_CallObjectWithKeywords+0x158) [0x4d56ec] - _cffi_backend.so(+0xe96e) [0x7fe2e38be96e] - libffi.so.6(ffi_closure_unix64_inner+0x1b9) [0x7fe2e36ad819] - libffi.so.6(ffi_closure_unix64+0x46) [0x7fe2e36adb7c] - - |----- end interesting - v - libcrypto.so.1.0.0(CRYPTO_malloc+0x64) [0x7fe2e1cef784] - libcrypto.so.1.0.0(lh_insert+0x16b) [0x7fe2e1d6a24b] - libcrypto.so.1.0.0(+0x61c18) [0x7fe2e1cf0c18] - libcrypto.so.1.0.0(+0x625ec) [0x7fe2e1cf15ec] - libcrypto.so.1.0.0(DSA_new_method+0xe6) [0x7fe2e1d524d6] - libcrypto.so.1.0.0(DSA_generate_parameters+0x3a) [0x7fe2e1d5364a] - ^ - |----- begin interesting - - _cffi__x305d4698xb539baaa.so(+0x1f397) [0x7fe2df84d397] - cpython/2.7/python(PyCFunction_Call+0x8b) [0x56265a] - cpython/2.7/python() [0x4d5f52] - cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e] - cpython/2.7/python() [0x4d6419] - ... - - Notice the stack is upside down compared to a Python traceback. - Identify the start and end of interesting bits and stuff it into - the stack we report. - """ - stacks = memdbg.heap[p] - # Eventually look at multiple stacks for the realloc() case. For - # now just look at the original allocation location. - (size, python_stack, c_stack) = stacks[0] - - stack = traceback.format_list(python_stack)[:-1] - saved = list(c_stack) - - # Figure the first interesting frame will be after a the - # cffi-compiled module - while c_stack and '/__pycache__/_cffi__' not in c_stack[-1]: - c_stack.pop() - - # Figure the last interesting frame will always be CRYPTO_malloc, - # since that's where we hooked in to things. - while ( - c_stack and 'CRYPTO_malloc' not in c_stack[0] and - 'CRYPTO_realloc' not in c_stack[0] - ): - c_stack.pop(0) - - if c_stack: - c_stack.reverse() - else: - c_stack = saved[::-1] - stack.extend([frame + "\n" for frame in c_stack]) - - stack.insert(0, "Leaked (%s) at:\n") - return "".join(stack) - - if leaks: - unique_leaks = {} - for p in leaks: - size = memdbg.heap[p][-1][0] - new_leak = format_leak(p) - if new_leak not in unique_leaks: - unique_leaks[new_leak] = [(size, p)] - else: - unique_leaks[new_leak].append((size, p)) - memdbg.free(p) - - for (stack, allocs) in unique_leaks.iteritems(): - allocs_accum = [] - for (size, pointer) in allocs: - - addr = int(ffi.cast('uintptr_t', pointer)) - allocs_accum.append("%d@0x%x" % (size, addr)) - allocs_report = ", ".join(sorted(allocs_accum)) - - result.addError( - self, - (None, Exception(stack % (allocs_report,)), None)) - - _tmpdir = None - - @property - def tmpdir(self): - """ - On demand create a temporary directory. - """ - if self._tmpdir is not None: - return self._tmpdir - - self._tmpdir = mkdtemp(dir=".") - return self._tmpdir - - def tearDown(self): - """ - Clean up any files or directories created using - :py:meth:`TestCase.mktemp`. Subclasses must invoke this method if they - override it or the cleanup will not occur. - """ - if self._tmpdir is not None: - shutil.rmtree(self._tmpdir) - - try: - exception_from_error_queue(Error) - except Error: - e = sys.exc_info()[1] - if e.args != ([],): - self.fail( - "Left over errors in OpenSSL error queue: " + repr(e) - ) - - def assertIsInstance(self, instance, classOrTuple, message=None): - """ - Fail if C{instance} is not an instance of the given class or of - one of the given classes. - - @param instance: the object to test the type (first argument of the - C{isinstance} call). - @type instance: any. - @param classOrTuple: the class or classes to test against (second - argument of the C{isinstance} call). - @type classOrTuple: class, type, or tuple. - - @param message: Custom text to include in the exception text if the - assertion fails. - """ - assert isinstance(instance, classOrTuple) - - def failUnlessIn(self, containee, container, msg=None): - """ - Fail the test if :py:data:`containee` is not found in - :py:data:`container`. - - :param containee: the value that should be in :py:class:`container` - :param container: a sequence type, or in the case of a mapping type, - will follow semantics of 'if key in dict.keys()' - :param msg: if msg is None, then the failure message will be - '%r not in %r' % (first, second) - """ - assert containee in container - assertIn = failUnlessIn - - def assertNotIn(self, containee, container, msg=None): - """ - Fail the test if C{containee} is found in C{container}. - - @param containee: the value that should not be in C{container} - @param container: a sequence type, or in the case of a mapping type, - will follow semantics of 'if key in dict.keys()' - @param msg: if msg is None, then the failure message will be - '%r in %r' % (first, second) - """ - assert containee not in container - failIfIn = assertNotIn - - def assertIs(self, first, second, msg=None): - """ - Fail the test if :py:data:`first` is not :py:data:`second`. This is an - obect-identity-equality test, not an object equality - (i.e. :py:func:`__eq__`) test. - - :param msg: if msg is None, then the failure message will be - '%r is not %r' % (first, second) - """ - assert first is second - assertIdentical = failUnlessIdentical = assertIs - - def assertIsNot(self, first, second, msg=None): - """ - Fail the test if :py:data:`first` is :py:data:`second`. This is an - obect-identity-equality test, not an object equality - (i.e. :py:func:`__eq__`) test. - - :param msg: if msg is None, then the failure message will be - '%r is %r' % (first, second) - """ - assert first is not second - assertNotIdentical = failIfIdentical = assertIsNot - - def failUnlessRaises(self, exception, f, *args, **kwargs): - """ - Fail the test unless calling the function :py:data:`f` with the given - :py:data:`args` and :py:data:`kwargs` raises :py:data:`exception`. The - failure will report the traceback and call stack of the unexpected - exception. - - :param exception: exception type that is to be expected - :param f: the function to call - - :return: The raised exception instance, if it is of the given type. - :raise self.failureException: Raised if the function call does - not raise an exception or if it raises an exception of a - different type. - """ - with pytest.raises(exception) as cm: - f(*args, **kwargs) - return cm.value - assertRaises = failUnlessRaises - - def mktemp(self): - """ - Return UTF-8-encoded bytes of a path to a tmp file. - - The file will be cleaned up after the test run. - """ - return mktemp(dir=self.tmpdir).encode("utf-8") - - # Other stuff - def assertConsistentType(self, theType, name, *constructionArgs): - """ - Perform various assertions about :py:data:`theType` to ensure that it - is a well-defined type. This is useful for extension types, where it's - pretty easy to do something wacky. If something about the type is - unusual, an exception will be raised. - - :param theType: The type object about which to make assertions. - :param name: A string giving the name of the type. - :param constructionArgs: Positional arguments to use with - :py:data:`theType` to create an instance of it. - """ - assert is_consistent_type(theType, name, *constructionArgs) - - def is_consistent_type(theType, name, *constructionArgs): """ Perform various assertions about *theType* to ensure that it is a -- cgit v1.2.1