summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Chan <alex@alexwlchan.net>2017-01-30 14:04:47 +0000
committerHynek Schlawack <hs@ox.cx>2017-01-30 15:04:47 +0100
commitb748099426db3963c7b06d2e0aec29e12e016c07 (patch)
tree566b566cb2029e76d6ddc6234adb83952aff324a
parent1089ed2ea06b8fde66d03d1cababefe9617d6585 (diff)
downloadpyopenssl-b748099426db3963c7b06d2e0aec29e12e016c07.tar.gz
Rip out the last vestages of unittest from the test suite (#599)
-rw-r--r--tests/test_ssl.py635
-rw-r--r--tests/util.py295
2 files changed, 282 insertions, 648 deletions
diff --git a/tests/test_ssl.py b/tests/test_ssl.py
index 14b2310..d2a56b7 100644
--- a/tests/test_ssl.py
+++ b/tests/test_ssl.py
@@ -15,7 +15,7 @@ from socket import MSG_PEEK, SHUT_RDWR, error, socket
from os import makedirs
from os.path import join
from weakref import ref
-from warnings import catch_warnings, simplefilter
+from warnings import simplefilter
import pytest
@@ -81,8 +81,7 @@ try:
except ImportError:
SSL_ST_INIT = SSL_ST_BEFORE = SSL_ST_OK = SSL_ST_RENEGOTIATE = None
-from .util import (
- WARNING_TYPE_EXPECTED, NON_ASCII, TestCase, is_consistent_type)
+from .util import WARNING_TYPE_EXPECTED, NON_ASCII, is_consistent_type
from .test_crypto import (
cleartextCertificatePEM, cleartextPrivateKeyPEM,
client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
@@ -222,28 +221,6 @@ def _create_certificate_chain():
return [(cakey, cacert), (ikey, icert), (skey, scert)]
-class _LoopbackMixin(object):
- """
- Helper mixin which defines methods for creating a connected socket pair and
- for forcing two connected SSL sockets to talk to each other via memory
- BIOs.
- """
- def _loopbackClientFactory(self, socket):
- return loopback_client_factory(socket)
-
- def _loopbackServerFactory(self, socket):
- return loopback_server_factory(socket)
-
- def _loopback(self, serverFactory=None, clientFactory=None):
- return loopback(serverFactory, clientFactory)
-
- def _interactInMemory(self, client_conn, server_conn):
- return interact_in_memory(client_conn, server_conn)
-
- def _handshakeInMemory(self, client_conn, server_conn):
- return handshake_in_memory(client_conn, server_conn)
-
-
def loopback_client_factory(socket):
client = Connection(Context(TLSv1_METHOD), socket)
client.set_connect_state()
@@ -342,32 +319,30 @@ def handshake_in_memory(client_conn, server_conn):
interact_in_memory(client_conn, server_conn)
-class VersionTests(TestCase):
+class TestVersion(object):
"""
- Tests for version information exposed by
- :py:obj:`OpenSSL.SSL.SSLeay_version` and
- :py:obj:`OpenSSL.SSL.OPENSSL_VERSION_NUMBER`.
+ Tests for version information exposed by `OpenSSL.SSL.SSLeay_version` and
+ `OpenSSL.SSL.OPENSSL_VERSION_NUMBER`.
"""
def test_OPENSSL_VERSION_NUMBER(self):
"""
- :py:obj:`OPENSSL_VERSION_NUMBER` is an integer with status in the low
- byte and the patch, fix, minor, and major versions in the
- nibbles above that.
+ `OPENSSL_VERSION_NUMBER` is an integer with status in the low byte and
+ the patch, fix, minor, and major versions in the nibbles above that.
"""
- self.assertTrue(isinstance(OPENSSL_VERSION_NUMBER, int))
+ assert isinstance(OPENSSL_VERSION_NUMBER, int)
def test_SSLeay_version(self):
"""
- :py:obj:`SSLeay_version` takes a version type indicator and returns
- one of a number of version strings based on that indicator.
+ `SSLeay_version` takes a version type indicator and returns one of a
+ number of version strings based on that indicator.
"""
versions = {}
for t in [SSLEAY_VERSION, SSLEAY_CFLAGS, SSLEAY_BUILT_ON,
SSLEAY_PLATFORM, SSLEAY_DIR]:
version = SSLeay_version(t)
versions[version] = t
- self.assertTrue(isinstance(version, bytes))
- self.assertEqual(len(versions), 5)
+ assert isinstance(version, bytes)
+ assert len(versions) == 5
@pytest.fixture
@@ -2671,97 +2646,84 @@ class TestConnection(object):
assert 2 == len(data)
-class ConnectionGetCipherListTests(TestCase):
+class TestConnectionGetCipherList(object):
"""
- Tests for :py:obj:`Connection.get_cipher_list`.
+ Tests for `Connection.get_cipher_list`.
"""
- def test_wrong_args(self):
- """
- :py:obj:`Connection.get_cipher_list` raises :py:obj:`TypeError` if
- called with any arguments.
- """
- connection = Connection(Context(TLSv1_METHOD), None)
- self.assertRaises(TypeError, connection.get_cipher_list, None)
-
def test_result(self):
"""
- :py:obj:`Connection.get_cipher_list` returns a :py:obj:`list` of
- :py:obj:`bytes` giving the names of the ciphers which might be used.
+ `Connection.get_cipher_list` returns a list of `bytes` giving the
+ names of the ciphers which might be used.
"""
connection = Connection(Context(TLSv1_METHOD), None)
ciphers = connection.get_cipher_list()
- self.assertTrue(isinstance(ciphers, list))
+ assert isinstance(ciphers, list)
for cipher in ciphers:
- self.assertTrue(isinstance(cipher, str))
+ assert isinstance(cipher, str)
-class ConnectionSendTests(TestCase, _LoopbackMixin):
+class TestConnectionSend(object):
"""
- Tests for :py:obj:`Connection.send`
+ Tests for `Connection.send`.
"""
def test_wrong_args(self):
"""
When called with arguments other than string argument for its first
- parameter or more than two arguments, :py:obj:`Connection.send` raises
- :py:obj:`TypeError`.
+ parameter, `Connection.send` raises `TypeError`.
"""
connection = Connection(Context(TLSv1_METHOD), None)
- self.assertRaises(TypeError, connection.send)
- self.assertRaises(TypeError, connection.send, object())
- self.assertRaises(TypeError, connection.send, "foo", object(), "bar")
+ with pytest.raises(TypeError):
+ connection.send(object())
def test_short_bytes(self):
"""
- When passed a short byte string, :py:obj:`Connection.send` transmits
- all of it and returns the number of bytes sent.
+ When passed a short byte string, `Connection.send` transmits all of it
+ and returns the number of bytes sent.
"""
- server, client = self._loopback()
+ server, client = loopback()
count = server.send(b'xy')
- self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b'xy')
+ assert count == 2
+ assert client.recv(2) == b'xy'
def test_text(self):
"""
- When passed a text, :py:obj:`Connection.send` transmits all of it and
+ When passed a text, `Connection.send` transmits all of it and
returns the number of bytes sent. It also raises a DeprecationWarning.
"""
- server, client = self._loopback()
- with catch_warnings(record=True) as w:
+ server, client = loopback()
+ with pytest.warns(DeprecationWarning) as w:
simplefilter("always")
count = server.send(b"xy".decode("ascii"))
- self.assertEqual(
+ assert (
"{0} for buf is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
- ),
- str(w[-1].message)
- )
- self.assertIs(w[-1].category, DeprecationWarning)
- self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b"xy")
+ ) == str(w[-1].message))
+ assert count == 2
+ assert client.recv(2) == b'xy'
@skip_if_py26
def test_short_memoryview(self):
"""
When passed a memoryview onto a small number of bytes,
- :py:obj:`Connection.send` transmits all of them and returns the number
+ `Connection.send` transmits all of them and returns the number
of bytes sent.
"""
- server, client = self._loopback()
+ server, client = loopback()
count = server.send(memoryview(b'xy'))
- self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b'xy')
+ assert count == 2
+ assert client.recv(2) == b'xy'
@skip_if_py3
def test_short_buffer(self):
"""
When passed a buffer containing a small number of bytes,
- :py:obj:`Connection.send` transmits all of them and returns the number
+ `Connection.send` transmits all of them and returns the number
of bytes sent.
"""
- server, client = self._loopback()
+ server, client = loopback()
count = server.send(buffer(b'xy'))
- self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b'xy')
+ assert count == 2
+ assert client.recv(2) == b'xy'
def _make_memoryview(size):
@@ -2772,52 +2734,50 @@ def _make_memoryview(size):
return memoryview(bytearray(size))
-class ConnectionRecvIntoTests(TestCase, _LoopbackMixin):
+class TestConnectionRecvInto(object):
"""
- Tests for :py:obj:`Connection.recv_into`
+ Tests for `Connection.recv_into`.
"""
def _no_length_test(self, factory):
"""
- Assert that when the given buffer is passed to
- ``Connection.recv_into``, whatever bytes are available to be received
- that fit into that buffer are written into that buffer.
+ Assert that when the given buffer is passed to `Connection.recv_into`,
+ whatever bytes are available to be received that fit into that buffer
+ are written into that buffer.
"""
output_buffer = factory(5)
- server, client = self._loopback()
+ server, client = loopback()
server.send(b'xy')
- self.assertEqual(client.recv_into(output_buffer), 2)
- self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00'))
+ assert client.recv_into(output_buffer) == 2
+ assert output_buffer == bytearray(b'xy\x00\x00\x00')
def test_bytearray_no_length(self):
"""
- :py:obj:`Connection.recv_into` can be passed a ``bytearray`` instance
- and data in the receive buffer is written to it.
+ `Connection.recv_into` can be passed a `bytearray` instance and data
+ in the receive buffer is written to it.
"""
self._no_length_test(bytearray)
def _respects_length_test(self, factory):
"""
- Assert that when the given buffer is passed to ``Connection.recv_into``
- along with a value for ``nbytes`` that is less than the size of that
- buffer, only ``nbytes`` bytes are written into the buffer.
+ Assert that when the given buffer is passed to `Connection.recv_into`
+ along with a value for `nbytes` that is less than the size of that
+ buffer, only `nbytes` bytes are written into the buffer.
"""
output_buffer = factory(10)
- server, client = self._loopback()
+ server, client = loopback()
server.send(b'abcdefghij')
- self.assertEqual(client.recv_into(output_buffer, 5), 5)
- self.assertEqual(
- output_buffer, bytearray(b'abcde\x00\x00\x00\x00\x00')
- )
+ assert client.recv_into(output_buffer, 5) == 5
+ assert output_buffer == bytearray(b'abcde\x00\x00\x00\x00\x00')
def test_bytearray_respects_length(self):
"""
- When called with a ``bytearray`` instance,
- :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter and
- doesn't copy in more than that number of bytes.
+ When called with a `bytearray` instance, `Connection.recv_into`
+ respects the `nbytes` parameter and doesn't copy in more than that
+ number of bytes.
"""
self._respects_length_test(bytearray)
@@ -2825,153 +2785,144 @@ class ConnectionRecvIntoTests(TestCase, _LoopbackMixin):
"""
Assert that if there are more bytes available to be read from the
receive buffer than would fit into the buffer passed to
- :py:obj:`Connection.recv_into`, only as many as fit are written into
- it.
+ `Connection.recv_into`, only as many as fit are written into it.
"""
output_buffer = factory(5)
- server, client = self._loopback()
+ server, client = loopback()
server.send(b'abcdefghij')
- self.assertEqual(client.recv_into(output_buffer), 5)
- self.assertEqual(output_buffer, bytearray(b'abcde'))
+ assert client.recv_into(output_buffer) == 5
+ assert output_buffer == bytearray(b'abcde')
rest = client.recv(5)
- self.assertEqual(b'fghij', rest)
+ assert b'fghij' == rest
def test_bytearray_doesnt_overfill(self):
"""
- When called with a ``bytearray`` instance,
- :py:obj:`Connection.recv_into` respects the size of the array and
- doesn't write more bytes into it than will fit.
+ When called with a `bytearray` instance, `Connection.recv_into`
+ respects the size of the array and doesn't write more bytes into it
+ than will fit.
"""
self._doesnt_overfill_test(bytearray)
def test_bytearray_really_doesnt_overfill(self):
"""
- When called with a ``bytearray`` instance and an ``nbytes`` value that
- is too large, :py:obj:`Connection.recv_into` respects the size of the
- array and not the ``nbytes`` value and doesn't write more bytes into
- the buffer than will fit.
+ When called with a `bytearray` instance and an `nbytes` value that is
+ too large, `Connection.recv_into` respects the size of the array and
+ not the `nbytes` value and doesn't write more bytes into the buffer
+ than will fit.
"""
self._doesnt_overfill_test(bytearray)
def test_peek(self):
- server, client = self._loopback()
+ server, client = loopback()
server.send(b'xy')
for _ in range(2):
output_buffer = bytearray(5)
- self.assertEqual(
- client.recv_into(output_buffer, flags=MSG_PEEK), 2)
- self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00'))
+ assert client.recv_into(output_buffer, flags=MSG_PEEK) == 2
+ assert output_buffer == bytearray(b'xy\x00\x00\x00')
@skip_if_py26
def test_memoryview_no_length(self):
"""
- :py:obj:`Connection.recv_into` can be passed a ``memoryview``
- instance and data in the receive buffer is written to it.
+ `Connection.recv_into` can be passed a `memoryview` instance and data
+ in the receive buffer is written to it.
"""
self._no_length_test(_make_memoryview)
@skip_if_py26
def test_memoryview_respects_length(self):
"""
- When called with a ``memoryview`` instance,
- :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter
- and doesn't copy more than that number of bytes in.
+ When called with a `memoryview` instance, `Connection.recv_into`
+ respects the ``nbytes`` parameter and doesn't copy more than that
+ number of bytes in.
"""
self._respects_length_test(_make_memoryview)
@skip_if_py26
def test_memoryview_doesnt_overfill(self):
"""
- When called with a ``memoryview`` instance,
- :py:obj:`Connection.recv_into` respects the size of the array and
- doesn't write more bytes into it than will fit.
+ When called with a `memoryview` instance, `Connection.recv_into`
+ respects the size of the array and doesn't write more bytes into it
+ than will fit.
"""
self._doesnt_overfill_test(_make_memoryview)
@skip_if_py26
def test_memoryview_really_doesnt_overfill(self):
"""
- When called with a ``memoryview`` instance and an ``nbytes`` value
- that is too large, :py:obj:`Connection.recv_into` respects the size
- of the array and not the ``nbytes`` value and doesn't write more
- bytes into the buffer than will fit.
+ When called with a `memoryview` instance and an `nbytes` value that is
+ too large, `Connection.recv_into` respects the size of the array and
+ not the `nbytes` value and doesn't write more bytes into the buffer
+ than will fit.
"""
self._doesnt_overfill_test(_make_memoryview)
-class ConnectionSendallTests(TestCase, _LoopbackMixin):
+class TestConnectionSendall(object):
"""
- Tests for :py:obj:`Connection.sendall`.
+ Tests for `Connection.sendall`.
"""
def test_wrong_args(self):
"""
When called with arguments other than a string argument for its first
- parameter or with more than two arguments, :py:obj:`Connection.sendall`
- raises :py:obj:`TypeError`.
+ parameter, `Connection.sendall` raises `TypeError`.
"""
connection = Connection(Context(TLSv1_METHOD), None)
- self.assertRaises(TypeError, connection.sendall)
- self.assertRaises(TypeError, connection.sendall, object())
- self.assertRaises(
- TypeError, connection.sendall, "foo", object(), "bar")
+ with pytest.raises(TypeError):
+ connection.sendall(object())
def test_short(self):
"""
- :py:obj:`Connection.sendall` transmits all of the bytes in the string
+ `Connection.sendall` transmits all of the bytes in the string
passed to it.
"""
- server, client = self._loopback()
+ server, client = loopback()
server.sendall(b'x')
- self.assertEquals(client.recv(1), b'x')
+ assert client.recv(1) == b'x'
def test_text(self):
"""
- :py:obj:`Connection.sendall` transmits all the content in the string
- passed to it raising a DeprecationWarning in case of this being a text.
+ `Connection.sendall` transmits all the content in the string passed
+ to it, raising a DeprecationWarning in case of this being a text.
"""
- server, client = self._loopback()
- with catch_warnings(record=True) as w:
+ server, client = loopback()
+ with pytest.warns(DeprecationWarning) as w:
simplefilter("always")
server.sendall(b"x".decode("ascii"))
- self.assertEqual(
+ assert (
"{0} for buf is no longer accepted, use bytes".format(
WARNING_TYPE_EXPECTED
- ),
- str(w[-1].message)
- )
- self.assertIs(w[-1].category, DeprecationWarning)
- self.assertEquals(client.recv(1), b"x")
+ ) == str(w[-1].message))
+ assert client.recv(1) == b"x"
@skip_if_py26
def test_short_memoryview(self):
"""
When passed a memoryview onto a small number of bytes,
- :py:obj:`Connection.sendall` transmits all of them.
+ `Connection.sendall` transmits all of them.
"""
- server, client = self._loopback()
+ server, client = loopback()
server.sendall(memoryview(b'x'))
- self.assertEquals(client.recv(1), b'x')
+ assert client.recv(1) == b'x'
@skip_if_py3
def test_short_buffers(self):
"""
When passed a buffer containing a small number of bytes,
- :py:obj:`Connection.sendall` transmits all of them.
+ `Connection.sendall` transmits all of them.
"""
- server, client = self._loopback()
+ server, client = loopback()
server.sendall(buffer(b'x'))
- self.assertEquals(client.recv(1), b'x')
+ assert client.recv(1) == b'x'
def test_long(self):
"""
- :py:obj:`Connection.sendall` transmits all of the bytes in the string
- passed to it even if this requires multiple calls of an underlying
- write function.
+ `Connection.sendall` transmits all the bytes in the string passed to it
+ even if this requires multiple calls of an underlying write function.
"""
- server, client = self._loopback()
+ server, client = loopback()
# Should be enough, underlying SSL_write should only do 16k at a time.
# On Windows, after 32k of bytes the write will block (forever
# - because no one is yet reading).
@@ -2983,55 +2934,40 @@ class ConnectionSendallTests(TestCase, _LoopbackMixin):
data = client.recv(1024)
accum.append(data)
received += len(data)
- self.assertEquals(message, b''.join(accum))
+ assert message == b''.join(accum)
def test_closed(self):
"""
- If the underlying socket is closed, :py:obj:`Connection.sendall`
- propagates the write error from the low level write call.
+ If the underlying socket is closed, `Connection.sendall` propagates the
+ write error from the low level write call.
"""
- server, client = self._loopback()
+ server, client = loopback()
server.sock_shutdown(2)
- exc = self.assertRaises(SysCallError, server.sendall, b"hello, world")
+ with pytest.raises(SysCallError) as err:
+ server.sendall(b"hello, world")
if platform == "win32":
- self.assertEqual(exc.args[0], ESHUTDOWN)
+ assert err.value.args[0] == ESHUTDOWN
else:
- self.assertEqual(exc.args[0], EPIPE)
+ assert err.value.args[0] == EPIPE
-class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
+class TestConnectionRenegotiate(object):
"""
Tests for SSL renegotiation APIs.
"""
- def test_renegotiate_wrong_args(self):
- """
- :py:obj:`Connection.renegotiate` raises :py:obj:`TypeError` if called
- with any arguments.
- """
- connection = Connection(Context(TLSv1_METHOD), None)
- self.assertRaises(TypeError, connection.renegotiate, None)
-
- def test_total_renegotiations_wrong_args(self):
- """
- :py:obj:`Connection.total_renegotiations` raises :py:obj:`TypeError` if
- called with any arguments.
- """
- connection = Connection(Context(TLSv1_METHOD), None)
- self.assertRaises(TypeError, connection.total_renegotiations, None)
-
def test_total_renegotiations(self):
"""
- :py:obj:`Connection.total_renegotiations` returns :py:obj:`0` before
- any renegotiations have happened.
+ `Connection.total_renegotiations` returns `0` before any renegotiations
+ have happened.
"""
connection = Connection(Context(TLSv1_METHOD), None)
- self.assertEquals(connection.total_renegotiations(), 0)
+ assert connection.total_renegotiations() == 0
def test_renegotiate(self):
"""
Go through a complete renegotiation cycle.
"""
- server, client = self._loopback()
+ server, client = loopback()
server.send(b"hello world")
@@ -3055,21 +2991,21 @@ class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
pass
-class ErrorTests(TestCase):
+class TestError(object):
"""
- Unit tests for :py:obj:`OpenSSL.SSL.Error`.
+ Unit tests for `OpenSSL.SSL.Error`.
"""
def test_type(self):
"""
- :py:obj:`Error` is an exception type.
+ `Error` is an exception type.
"""
- self.assertTrue(issubclass(Error, Exception))
- self.assertEqual(Error.__name__, 'Error')
+ assert issubclass(Error, Exception)
+ assert Error.__name__ == 'Error'
-class ConstantsTests(TestCase):
+class TestConstants(object):
"""
- Tests for the values of constants exposed in :py:obj:`OpenSSL.SSL`.
+ Tests for the values of constants exposed in `OpenSSL.SSL`.
These are values defined by OpenSSL intended only to be used as flags to
OpenSSL APIs. The only assertions it seems can be made about them is
@@ -3081,10 +3017,10 @@ class ConstantsTests(TestCase):
)
def test_op_no_query_mtu(self):
"""
- The value of :py:obj:`OpenSSL.SSL.OP_NO_QUERY_MTU` is 0x1000, the value
- of :py:const:`SSL_OP_NO_QUERY_MTU` defined by :file:`openssl/ssl.h`.
+ The value of `OpenSSL.SSL.OP_NO_QUERY_MTU` is 0x1000, the value
+ of `SSL_OP_NO_QUERY_MTU` defined by `openssl/ssl.h`.
"""
- self.assertEqual(OP_NO_QUERY_MTU, 0x1000)
+ assert OP_NO_QUERY_MTU == 0x1000
@pytest.mark.skipif(
OP_COOKIE_EXCHANGE is None,
@@ -3093,11 +3029,10 @@ class ConstantsTests(TestCase):
)
def test_op_cookie_exchange(self):
"""
- The value of :py:obj:`OpenSSL.SSL.OP_COOKIE_EXCHANGE` is 0x2000, the
- value of :py:const:`SSL_OP_COOKIE_EXCHANGE` defined by
- :file:`openssl/ssl.h`.
+ The value of `OpenSSL.SSL.OP_COOKIE_EXCHANGE` is 0x2000, the
+ value of `SSL_OP_COOKIE_EXCHANGE` defined by `openssl/ssl.h`.
"""
- self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)
+ assert OP_COOKIE_EXCHANGE == 0x2000
@pytest.mark.skipif(
OP_NO_TICKET is None,
@@ -3105,10 +3040,10 @@ class ConstantsTests(TestCase):
)
def test_op_no_ticket(self):
"""
- The value of :py:obj:`OpenSSL.SSL.OP_NO_TICKET` is 0x4000, the value of
- :py:const:`SSL_OP_NO_TICKET` defined by :file:`openssl/ssl.h`.
+ The value of `OpenSSL.SSL.OP_NO_TICKET` is 0x4000, the value of
+ `SSL_OP_NO_TICKET` defined by `openssl/ssl.h`.
"""
- self.assertEqual(OP_NO_TICKET, 0x4000)
+ assert OP_NO_TICKET == 0x4000
@pytest.mark.skipif(
OP_NO_COMPRESSION is None,
@@ -3116,81 +3051,79 @@ class ConstantsTests(TestCase):
)
def test_op_no_compression(self):
"""
- The value of :py:obj:`OpenSSL.SSL.OP_NO_COMPRESSION` is 0x20000, the
- value of :py:const:`SSL_OP_NO_COMPRESSION` defined by
- :file:`openssl/ssl.h`.
+ The value of `OpenSSL.SSL.OP_NO_COMPRESSION` is 0x20000, the
+ value of `SSL_OP_NO_COMPRESSION` defined by `openssl/ssl.h`.
"""
- self.assertEqual(OP_NO_COMPRESSION, 0x20000)
+ assert OP_NO_COMPRESSION == 0x20000
def test_sess_cache_off(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of
- :py:obj:`SSL_SESS_CACHE_OFF` defined by ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_OFF` 0x0, the value of
+ `SSL_SESS_CACHE_OFF` defined by `openssl/ssl.h`.
"""
- self.assertEqual(0x0, SESS_CACHE_OFF)
+ assert 0x0 == SESS_CACHE_OFF
def test_sess_cache_client(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of
- :py:obj:`SSL_SESS_CACHE_CLIENT` defined by ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_CLIENT` 0x1, the value of
+ `SSL_SESS_CACHE_CLIENT` defined by `openssl/ssl.h`.
"""
- self.assertEqual(0x1, SESS_CACHE_CLIENT)
+ assert 0x1 == SESS_CACHE_CLIENT
def test_sess_cache_server(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of
- :py:obj:`SSL_SESS_CACHE_SERVER` defined by ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_SERVER` 0x2, the value of
+ `SSL_SESS_CACHE_SERVER` defined by `openssl/ssl.h`.
"""
- self.assertEqual(0x2, SESS_CACHE_SERVER)
+ assert 0x2 == SESS_CACHE_SERVER
def test_sess_cache_both(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of
- :py:obj:`SSL_SESS_CACHE_BOTH` defined by ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_BOTH` 0x3, the value of
+ `SSL_SESS_CACHE_BOTH` defined by `openssl/ssl.h`.
"""
- self.assertEqual(0x3, SESS_CACHE_BOTH)
+ assert 0x3 == SESS_CACHE_BOTH
def test_sess_cache_no_auto_clear(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the
- value of :py:obj:`SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by
- ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` 0x80, the
+ value of `SSL_SESS_CACHE_NO_AUTO_CLEAR` defined by
+ `openssl/ssl.h`.
"""
- self.assertEqual(0x80, SESS_CACHE_NO_AUTO_CLEAR)
+ assert 0x80 == SESS_CACHE_NO_AUTO_CLEAR
def test_sess_cache_no_internal_lookup(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100,
- the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by
- ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` 0x100,
+ the value of `SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` defined by
+ `openssl/ssl.h`.
"""
- self.assertEqual(0x100, SESS_CACHE_NO_INTERNAL_LOOKUP)
+ assert 0x100 == SESS_CACHE_NO_INTERNAL_LOOKUP
def test_sess_cache_no_internal_store(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200,
- the value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by
- ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` 0x200,
+ the value of `SSL_SESS_CACHE_NO_INTERNAL_STORE` defined by
+ `openssl/ssl.h`.
"""
- self.assertEqual(0x200, SESS_CACHE_NO_INTERNAL_STORE)
+ assert 0x200 == SESS_CACHE_NO_INTERNAL_STORE
def test_sess_cache_no_internal(self):
"""
- The value of :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the
- value of :py:obj:`SSL_SESS_CACHE_NO_INTERNAL` defined by
- ``openssl/ssl.h``.
+ The value of `OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` 0x300, the
+ value of `SSL_SESS_CACHE_NO_INTERNAL` defined by
+ `openssl/ssl.h`.
"""
- self.assertEqual(0x300, SESS_CACHE_NO_INTERNAL)
+ assert 0x300 == SESS_CACHE_NO_INTERNAL
-class MemoryBIOTests(TestCase, _LoopbackMixin):
+class TestMemoryBIO(object):
"""
- Tests for :py:obj:`OpenSSL.SSL.Connection` using a memory BIO.
+ Tests for `OpenSSL.SSL.Connection` using a memory BIO.
"""
def _server(self, sock):
"""
- Create a new server-side SSL :py:obj:`Connection` object wrapped around
- :py:obj:`sock`.
+ Create a new server-side SSL `Connection` object wrapped around `sock`.
"""
# Create the server side Connection. This is mostly setup boilerplate
# - use TLSv1, use a particular certificate, etc.
@@ -3215,8 +3148,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def _client(self, sock):
"""
- Create a new client-side SSL :py:obj:`Connection` object wrapped around
- :py:obj:`sock`.
+ Create a new client-side SSL `Connection` object wrapped around `sock`.
"""
# Now create the client side Connection. Similar boilerplate to the
# above.
@@ -3237,179 +3169,178 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
client_conn.set_connect_state()
return client_conn
- def test_memoryConnect(self):
+ def test_memory_connect(self):
"""
- Two :py:obj:`Connection`s which use memory BIOs can be manually
- connected by reading from the output of each and writing those bytes to
- the input of the other and in this way establish a connection and
- exchange application-level bytes with each other.
+ Two `Connection`s which use memory BIOs can be manually connected by
+ reading from the output of each and writing those bytes to the input of
+ the other and in this way establish a connection and exchange
+ application-level bytes with each other.
"""
server_conn = self._server(None)
client_conn = self._client(None)
# There should be no key or nonces yet.
- self.assertIdentical(server_conn.master_key(), None)
- self.assertIdentical(server_conn.client_random(), None)
- self.assertIdentical(server_conn.server_random(), None)
+ assert server_conn.master_key() is None
+ assert server_conn.client_random() is None
+ assert server_conn.server_random() is None
# First, the handshake needs to happen. We'll deliver bytes back and
# forth between the client and server until neither of them feels like
# speaking any more.
- self.assertIdentical(
- self._interactInMemory(client_conn, server_conn), None)
+ assert interact_in_memory(client_conn, server_conn) is None
# Now that the handshake is done, there should be a key and nonces.
- self.assertNotIdentical(server_conn.master_key(), None)
- self.assertNotIdentical(server_conn.client_random(), None)
- self.assertNotIdentical(server_conn.server_random(), None)
- self.assertEquals(
- server_conn.client_random(), client_conn.client_random())
- self.assertEquals(
- server_conn.server_random(), client_conn.server_random())
- self.assertNotEquals(
- server_conn.client_random(), server_conn.server_random())
- self.assertNotEquals(
- client_conn.client_random(), client_conn.server_random())
+ assert server_conn.master_key() is not None
+ assert server_conn.client_random() is not None
+ assert server_conn.server_random() is not None
+ assert server_conn.client_random() == client_conn.client_random()
+ assert server_conn.server_random() == client_conn.server_random()
+ assert server_conn.client_random() != server_conn.server_random()
+ assert client_conn.client_random() != client_conn.server_random()
# Here are the bytes we'll try to send.
important_message = b'One if by land, two if by sea.'
server_conn.write(important_message)
- self.assertEquals(
- self._interactInMemory(client_conn, server_conn),
+ assert (
+ interact_in_memory(client_conn, server_conn) ==
(client_conn, important_message))
client_conn.write(important_message[::-1])
- self.assertEquals(
- self._interactInMemory(client_conn, server_conn),
+ assert (
+ interact_in_memory(client_conn, server_conn) ==
(server_conn, important_message[::-1]))
- def test_socketConnect(self):
+ def test_socket_connect(self):
"""
- Just like :py:obj:`test_memoryConnect` but with an actual socket.
+ Just like `test_memory_connect` but with an actual socket.
This is primarily to rule out the memory BIO code as the source of any
- problems encountered while passing data over a :py:obj:`Connection` (if
+ problems encountered while passing data over a `Connection` (if
this test fails, there must be a problem outside the memory BIO code,
as no memory BIO is involved here). Even though this isn't a memory
BIO test, it's convenient to have it here.
"""
- server_conn, client_conn = self._loopback()
+ server_conn, client_conn = loopback()
important_message = b"Help me Obi Wan Kenobi, you're my only hope."
client_conn.send(important_message)
msg = server_conn.recv(1024)
- self.assertEqual(msg, important_message)
+ assert msg == important_message
# Again in the other direction, just for fun.
important_message = important_message[::-1]
server_conn.send(important_message)
msg = client_conn.recv(1024)
- self.assertEqual(msg, important_message)
+ assert msg == important_message
- def test_socketOverridesMemory(self):
+ def test_socket_overrides_memory(self):
"""
- Test that :py:obj:`OpenSSL.SSL.bio_read` and
- :py:obj:`OpenSSL.SSL.bio_write` don't work on
- :py:obj:`OpenSSL.SSL.Connection`() that use sockets.
+ Test that `OpenSSL.SSL.bio_read` and `OpenSSL.SSL.bio_write` don't
+ work on `OpenSSL.SSL.Connection`() that use sockets.
"""
context = Context(TLSv1_METHOD)
client = socket()
clientSSL = Connection(context, client)
- self.assertRaises(TypeError, clientSSL.bio_read, 100)
- self.assertRaises(TypeError, clientSSL.bio_write, "foo")
- self.assertRaises(TypeError, clientSSL.bio_shutdown)
+ with pytest.raises(TypeError):
+ clientSSL.bio_read(100)
+ with pytest.raises(TypeError):
+ clientSSL.bio_write("foo")
+ with pytest.raises(TypeError):
+ clientSSL.bio_shutdown()
- def test_outgoingOverflow(self):
+ def test_outgoing_overflow(self):
"""
If more bytes than can be written to the memory BIO are passed to
- :py:obj:`Connection.send` at once, the number of bytes which were
- written is returned and that many bytes from the beginning of the input
- can be read from the other end of the connection.
+ `Connection.send` at once, the number of bytes which were written is
+ returned and that many bytes from the beginning of the input can be
+ read from the other end of the connection.
"""
server = self._server(None)
client = self._client(None)
- self._interactInMemory(client, server)
+ interact_in_memory(client, server)
size = 2 ** 15
sent = client.send(b"x" * size)
# Sanity check. We're trying to test what happens when the entire
# input can't be sent. If the entire input was sent, this test is
# meaningless.
- self.assertTrue(sent < size)
+ assert sent < size
- receiver, received = self._interactInMemory(client, server)
- self.assertIdentical(receiver, server)
+ receiver, received = interact_in_memory(client, server)
+ assert receiver is server
# We can rely on all of these bytes being received at once because
- # _loopback passes 2 ** 16 to recv - more than 2 ** 15.
- self.assertEquals(len(received), sent)
+ # loopback passes 2 ** 16 to recv - more than 2 ** 15.
+ assert len(received) == sent
def test_shutdown(self):
"""
- :py:obj:`Connection.bio_shutdown` signals the end of the data stream
- from which the :py:obj:`Connection` reads.
+ `Connection.bio_shutdown` signals the end of the data stream
+ from which the `Connection` reads.
"""
server = self._server(None)
server.bio_shutdown()
- e = self.assertRaises(Error, server.recv, 1024)
+ with pytest.raises(Error) as err:
+ server.recv(1024)
# We don't want WantReadError or ZeroReturnError or anything - it's a
# handshake failure.
- assert type(e) in [Error, SysCallError]
+ assert type(err.value) in [Error, SysCallError]
- def test_unexpectedEndOfFile(self):
+ def test_unexpected_EOF(self):
"""
If the connection is lost before an orderly SSL shutdown occurs,
- :py:obj:`OpenSSL.SSL.SysCallError` is raised with a message of
+ `OpenSSL.SSL.SysCallError` is raised with a message of
"Unexpected EOF".
"""
- server_conn, client_conn = self._loopback()
+ server_conn, client_conn = loopback()
client_conn.sock_shutdown(SHUT_RDWR)
- exc = self.assertRaises(SysCallError, server_conn.recv, 1024)
- self.assertEqual(exc.args, (-1, "Unexpected EOF"))
+ with pytest.raises(SysCallError) as err:
+ server_conn.recv(1024)
+ assert err.value.args == (-1, "Unexpected EOF")
def _check_client_ca_list(self, func):
"""
- Verify the return value of the :py:obj:`get_client_ca_list` method for
+ Verify the return value of the `get_client_ca_list` method for
server and client connections.
:param func: A function which will be called with the server context
before the client and server are connected to each other. This
function should specify a list of CAs for the server to send to the
client and return that same list. The list will be used to verify
- that :py:obj:`get_client_ca_list` returns the proper value at
+ that `get_client_ca_list` returns the proper value at
various times.
"""
server = self._server(None)
client = self._client(None)
- self.assertEqual(client.get_client_ca_list(), [])
- self.assertEqual(server.get_client_ca_list(), [])
+ assert client.get_client_ca_list() == []
+ assert server.get_client_ca_list() == []
ctx = server.get_context()
expected = func(ctx)
- self.assertEqual(client.get_client_ca_list(), [])
- self.assertEqual(server.get_client_ca_list(), expected)
- self._interactInMemory(client, server)
- self.assertEqual(client.get_client_ca_list(), expected)
- self.assertEqual(server.get_client_ca_list(), expected)
+ assert client.get_client_ca_list() == []
+ assert server.get_client_ca_list() == expected
+ interact_in_memory(client, server)
+ assert client.get_client_ca_list() == expected
+ assert server.get_client_ca_list() == expected
def test_set_client_ca_list_errors(self):
"""
- :py:obj:`Context.set_client_ca_list` raises a :py:obj:`TypeError` if
- called with a non-list or a list that contains objects other than
- X509Names.
+ `Context.set_client_ca_list` raises a `TypeError` if called with a
+ non-list or a list that contains objects other than X509Names.
"""
ctx = Context(TLSv1_METHOD)
- self.assertRaises(TypeError, ctx.set_client_ca_list, "spam")
- self.assertRaises(TypeError, ctx.set_client_ca_list, ["spam"])
- self.assertIdentical(ctx.set_client_ca_list([]), None)
+ with pytest.raises(TypeError):
+ ctx.set_client_ca_list("spam")
+ with pytest.raises(TypeError):
+ ctx.set_client_ca_list(["spam"])
def test_set_empty_ca_list(self):
"""
- If passed an empty list, :py:obj:`Context.set_client_ca_list`
- configures the context to send no CA names to the client and, on both
- the server and client sides, :py:obj:`Connection.get_client_ca_list`
- returns an empty list after the connection is set up.
+ If passed an empty list, `Context.set_client_ca_list` configures the
+ context to send no CA names to the client and, on both the server and
+ client sides, `Connection.get_client_ca_list` returns an empty list
+ after the connection is set up.
"""
def no_ca(ctx):
ctx.set_client_ca_list([])
@@ -3419,9 +3350,9 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_set_one_ca_list(self):
"""
If passed a list containing a single X509Name,
- :py:obj:`Context.set_client_ca_list` configures the context to send
+ `Context.set_client_ca_list` configures the context to send
that CA name to the client and, on both the server and client sides,
- :py:obj:`Connection.get_client_ca_list` returns a list containing that
+ `Connection.get_client_ca_list` returns a list containing that
X509Name after the connection is set up.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
@@ -3435,9 +3366,9 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_set_multiple_ca_list(self):
"""
If passed a list containing multiple X509Name objects,
- :py:obj:`Context.set_client_ca_list` configures the context to send
+ `Context.set_client_ca_list` configures the context to send
those CA names to the client and, on both the server and client sides,
- :py:obj:`Connection.get_client_ca_list` returns a list containing those
+ `Connection.get_client_ca_list` returns a list containing those
X509Names after the connection is set up.
"""
secert = load_certificate(FILETYPE_PEM, server_cert_pem)
@@ -3455,7 +3386,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_reset_ca_list(self):
"""
If called multiple times, only the X509Names passed to the final call
- of :py:obj:`Context.set_client_ca_list` are used to configure the CA
+ of `Context.set_client_ca_list` are used to configure the CA
names sent to the client.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
@@ -3474,7 +3405,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_mutated_ca_list(self):
"""
- If the list passed to :py:obj:`Context.set_client_ca_list` is mutated
+ If the list passed to `Context.set_client_ca_list` is mutated
afterwards, this does not affect the list of CA names sent to the
client.
"""
@@ -3491,21 +3422,19 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
return [cadesc]
self._check_client_ca_list(mutated_ca)
- def test_add_client_ca_errors(self):
+ def test_add_client_ca_wrong_args(self):
"""
- :py:obj:`Context.add_client_ca` raises :py:obj:`TypeError` if called
- with a non-X509 object or with a number of arguments other than one.
+ `Context.add_client_ca` raises `TypeError` if called with
+ a non-X509 object.
"""
ctx = Context(TLSv1_METHOD)
- cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
- self.assertRaises(TypeError, ctx.add_client_ca)
- self.assertRaises(TypeError, ctx.add_client_ca, "spam")
- self.assertRaises(TypeError, ctx.add_client_ca, cacert, cacert)
+ with pytest.raises(TypeError):
+ ctx.add_client_ca("spam")
def test_one_add_client_ca(self):
"""
A certificate's subject can be added as a CA to be sent to the client
- with :py:obj:`Context.add_client_ca`.
+ with `Context.add_client_ca`.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
cadesc = cacert.get_subject()
@@ -3518,7 +3447,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_multiple_add_client_ca(self):
"""
Multiple CA names can be sent to the client by calling
- :py:obj:`Context.add_client_ca` with multiple X509 objects.
+ `Context.add_client_ca` with multiple X509 objects.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
secert = load_certificate(FILETYPE_PEM, server_cert_pem)
@@ -3534,8 +3463,8 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_set_and_add_client_ca(self):
"""
- A call to :py:obj:`Context.set_client_ca_list` followed by a call to
- :py:obj:`Context.add_client_ca` results in using the CA names from the
+ A call to `Context.set_client_ca_list` followed by a call to
+ `Context.add_client_ca` results in using the CA names from the
first call and the CA name from the second call.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
@@ -3554,8 +3483,8 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
def test_set_after_add_client_ca(self):
"""
- A call to :py:obj:`Context.set_client_ca_list` after a call to
- :py:obj:`Context.add_client_ca` replaces the CA name specified by the
+ A call to `Context.set_client_ca_list` after a call to
+ `Context.add_client_ca` replaces the CA name specified by the
former call with the names specified by the latter call.
"""
cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
@@ -3573,7 +3502,7 @@ class MemoryBIOTests(TestCase, _LoopbackMixin):
self._check_client_ca_list(set_replaces_add_ca)
-class InfoConstantTests(TestCase):
+class TestInfoConstants(object):
"""
Tests for assorted constants exposed for use in info callbacks.
"""
@@ -3642,7 +3571,7 @@ class TestRequires(object):
assert results == []
-class TestOCSP(_LoopbackMixin):
+class TestOCSP(object):
"""
Tests for PyOpenSSL's OCSP stapling support.
"""
@@ -3699,7 +3628,7 @@ class TestOCSP(_LoopbackMixin):
callback=ocsp_callback, data=None, request_ocsp=False
)
server = self._server_connection(callback=ocsp_callback, data=None)
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
assert not called
@@ -3715,8 +3644,8 @@ class TestOCSP(_LoopbackMixin):
return True
client = self._client_connection(callback=ocsp_callback, data=None)
- server = self._loopbackServerFactory(socket=None)
- self._handshakeInMemory(client, server)
+ server = loopback_server_factory(socket=None)
+ handshake_in_memory(client, server)
assert len(called) == 1
assert called[0] == b''
@@ -3736,7 +3665,7 @@ class TestOCSP(_LoopbackMixin):
client = self._client_connection(callback=client_callback, data=None)
server = self._server_connection(callback=server_callback, data=None)
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
assert len(calls) == 1
assert calls[0] == self.sample_ocsp_data
@@ -3758,7 +3687,7 @@ class TestOCSP(_LoopbackMixin):
client = self._client_connection(callback=client_callback, data=None)
server = self._server_connection(callback=server_callback, data=None)
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
assert len(client_calls) == 1
assert len(server_calls) == 1
@@ -3788,7 +3717,7 @@ class TestOCSP(_LoopbackMixin):
server = self._server_connection(
callback=server_callback, data=sentinel
)
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
assert len(calls) == 2
assert calls[0][-1] is sentinel
@@ -3810,7 +3739,7 @@ class TestOCSP(_LoopbackMixin):
client = self._client_connection(callback=client_callback, data=None)
server = self._server_connection(callback=server_callback, data=None)
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
assert len(client_calls) == 1
assert client_calls[0] == b''
@@ -3829,7 +3758,7 @@ class TestOCSP(_LoopbackMixin):
server = self._server_connection(callback=server_callback, data=None)
with pytest.raises(Error):
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
def test_exceptions_in_client_bubble_up(self):
"""
@@ -3848,7 +3777,7 @@ class TestOCSP(_LoopbackMixin):
server = self._server_connection(callback=server_callback, data=None)
with pytest.raises(SentinelException):
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
def test_exceptions_in_server_bubble_up(self):
"""
@@ -3867,7 +3796,7 @@ class TestOCSP(_LoopbackMixin):
server = self._server_connection(callback=server_callback, data=None)
with pytest.raises(SentinelException):
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
def test_server_must_return_bytes(self):
"""
@@ -3883,4 +3812,4 @@ class TestOCSP(_LoopbackMixin):
server = self._server_connection(callback=server_callback, data=None)
with pytest.raises(TypeError):
- self._handshakeInMemory(client, server)
+ handshake_in_memory(client, server)
diff --git a/tests/util.py b/tests/util.py
index e1b8656..4464379 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,313 +1,18 @@
# Copyright (C) Jean-Paul Calderone
# Copyright (C) Twisted Matrix Laboratories.
# See LICENSE for details.
-
"""
Helpers for the OpenSSL test suite, largely copied from
U{Twisted<http://twistedmatrix.com/>}.
"""
-import shutil
-import sys
-import traceback
-
-from tempfile import mktemp, mkdtemp
-from unittest import TestCase
-
-import pytest
-
from six import PY3
-from OpenSSL._util import exception_from_error_queue
-from OpenSSL.crypto import Error
-
-
-from . import memdbg
-
-from OpenSSL._util import ffi, lib
-
# This is the UTF-8 encoding of the SNOWMAN unicode code point.
NON_ASCII = b"\xe2\x98\x83".decode("utf-8")
-class TestCase(TestCase):
- """
- :py:class:`TestCase` adds useful testing functionality beyond what is
- available from the standard library :py:class:`unittest.TestCase`.
- """
-
- def run(self, result):
- run = super(TestCase, self).run
- if memdbg.heap is None:
- return run(result)
-
- # Run the test as usual
- before = set(memdbg.heap)
- run(result)
-
- # Clean up some long-lived allocations so they won't be reported as
- # memory leaks.
- lib.CRYPTO_cleanup_all_ex_data()
- lib.ERR_clear_error()
- after = set(memdbg.heap)
-
- if not after - before:
- # No leaks, fast succeed
- return
-
- if result.wasSuccessful():
- # If it passed, run it again with memory debugging
- before = set(memdbg.heap)
- run(result)
-
- # Clean up some long-lived allocations so they won't be reported as
- # memory leaks.
- lib.CRYPTO_cleanup_all_ex_data()
- lib.ERR_clear_error()
-
- after = set(memdbg.heap)
-
- self._reportLeaks(after - before, result)
-
- def _reportLeaks(self, leaks, result):
- def format_leak(p):
- """
- c_stack looks something like this (interesting parts indicated
- with inserted arrows not part of the data):
-
- cpython/2.7/python(PyCFunction_Call+0x8b) [0x56265a]
- cpython/2.7/python() [0x4d5f52]
- cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e]
- cpython/2.7/python() [0x4d6419]
- cpython/2.7/python() [0x4d6129]
- cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e]
- cpython/2.7/python(PyEval_EvalCodeEx+0x1043) [0x4d3726]
- cpython/2.7/python() [0x55fd51]
- cpython/2.7/python(PyObject_Call+0x7e) [0x420ee6]
- cpython/2.7/python(PyEval_CallObjectWithKeywords+0x158) [0x4d56ec]
- _cffi_backend.so(+0xe96e) [0x7fe2e38be96e]
- libffi.so.6(ffi_closure_unix64_inner+0x1b9) [0x7fe2e36ad819]
- libffi.so.6(ffi_closure_unix64+0x46) [0x7fe2e36adb7c]
-
- |----- end interesting
- v
- libcrypto.so.1.0.0(CRYPTO_malloc+0x64) [0x7fe2e1cef784]
- libcrypto.so.1.0.0(lh_insert+0x16b) [0x7fe2e1d6a24b]
- libcrypto.so.1.0.0(+0x61c18) [0x7fe2e1cf0c18]
- libcrypto.so.1.0.0(+0x625ec) [0x7fe2e1cf15ec]
- libcrypto.so.1.0.0(DSA_new_method+0xe6) [0x7fe2e1d524d6]
- libcrypto.so.1.0.0(DSA_generate_parameters+0x3a) [0x7fe2e1d5364a]
- ^
- |----- begin interesting
-
- _cffi__x305d4698xb539baaa.so(+0x1f397) [0x7fe2df84d397]
- cpython/2.7/python(PyCFunction_Call+0x8b) [0x56265a]
- cpython/2.7/python() [0x4d5f52]
- cpython/2.7/python(PyEval_EvalFrameEx+0x753b) [0x4d0e1e]
- cpython/2.7/python() [0x4d6419]
- ...
-
- Notice the stack is upside down compared to a Python traceback.
- Identify the start and end of interesting bits and stuff it into
- the stack we report.
- """
- stacks = memdbg.heap[p]
- # Eventually look at multiple stacks for the realloc() case. For
- # now just look at the original allocation location.
- (size, python_stack, c_stack) = stacks[0]
-
- stack = traceback.format_list(python_stack)[:-1]
- saved = list(c_stack)
-
- # Figure the first interesting frame will be after a the
- # cffi-compiled module
- while c_stack and '/__pycache__/_cffi__' not in c_stack[-1]:
- c_stack.pop()
-
- # Figure the last interesting frame will always be CRYPTO_malloc,
- # since that's where we hooked in to things.
- while (
- c_stack and 'CRYPTO_malloc' not in c_stack[0] and
- 'CRYPTO_realloc' not in c_stack[0]
- ):
- c_stack.pop(0)
-
- if c_stack:
- c_stack.reverse()
- else:
- c_stack = saved[::-1]
- stack.extend([frame + "\n" for frame in c_stack])
-
- stack.insert(0, "Leaked (%s) at:\n")
- return "".join(stack)
-
- if leaks:
- unique_leaks = {}
- for p in leaks:
- size = memdbg.heap[p][-1][0]
- new_leak = format_leak(p)
- if new_leak not in unique_leaks:
- unique_leaks[new_leak] = [(size, p)]
- else:
- unique_leaks[new_leak].append((size, p))
- memdbg.free(p)
-
- for (stack, allocs) in unique_leaks.iteritems():
- allocs_accum = []
- for (size, pointer) in allocs:
-
- addr = int(ffi.cast('uintptr_t', pointer))
- allocs_accum.append("%d@0x%x" % (size, addr))
- allocs_report = ", ".join(sorted(allocs_accum))
-
- result.addError(
- self,
- (None, Exception(stack % (allocs_report,)), None))
-
- _tmpdir = None
-
- @property
- def tmpdir(self):
- """
- On demand create a temporary directory.
- """
- if self._tmpdir is not None:
- return self._tmpdir
-
- self._tmpdir = mkdtemp(dir=".")
- return self._tmpdir
-
- def tearDown(self):
- """
- Clean up any files or directories created using
- :py:meth:`TestCase.mktemp`. Subclasses must invoke this method if they
- override it or the cleanup will not occur.
- """
- if self._tmpdir is not None:
- shutil.rmtree(self._tmpdir)
-
- try:
- exception_from_error_queue(Error)
- except Error:
- e = sys.exc_info()[1]
- if e.args != ([],):
- self.fail(
- "Left over errors in OpenSSL error queue: " + repr(e)
- )
-
- def assertIsInstance(self, instance, classOrTuple, message=None):
- """
- Fail if C{instance} is not an instance of the given class or of
- one of the given classes.
-
- @param instance: the object to test the type (first argument of the
- C{isinstance} call).
- @type instance: any.
- @param classOrTuple: the class or classes to test against (second
- argument of the C{isinstance} call).
- @type classOrTuple: class, type, or tuple.
-
- @param message: Custom text to include in the exception text if the
- assertion fails.
- """
- assert isinstance(instance, classOrTuple)
-
- def failUnlessIn(self, containee, container, msg=None):
- """
- Fail the test if :py:data:`containee` is not found in
- :py:data:`container`.
-
- :param containee: the value that should be in :py:class:`container`
- :param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- :param msg: if msg is None, then the failure message will be
- '%r not in %r' % (first, second)
- """
- assert containee in container
- assertIn = failUnlessIn
-
- def assertNotIn(self, containee, container, msg=None):
- """
- Fail the test if C{containee} is found in C{container}.
-
- @param containee: the value that should not be in C{container}
- @param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- @param msg: if msg is None, then the failure message will be
- '%r in %r' % (first, second)
- """
- assert containee not in container
- failIfIn = assertNotIn
-
- def assertIs(self, first, second, msg=None):
- """
- Fail the test if :py:data:`first` is not :py:data:`second`. This is an
- obect-identity-equality test, not an object equality
- (i.e. :py:func:`__eq__`) test.
-
- :param msg: if msg is None, then the failure message will be
- '%r is not %r' % (first, second)
- """
- assert first is second
- assertIdentical = failUnlessIdentical = assertIs
-
- def assertIsNot(self, first, second, msg=None):
- """
- Fail the test if :py:data:`first` is :py:data:`second`. This is an
- obect-identity-equality test, not an object equality
- (i.e. :py:func:`__eq__`) test.
-
- :param msg: if msg is None, then the failure message will be
- '%r is %r' % (first, second)
- """
- assert first is not second
- assertNotIdentical = failIfIdentical = assertIsNot
-
- def failUnlessRaises(self, exception, f, *args, **kwargs):
- """
- Fail the test unless calling the function :py:data:`f` with the given
- :py:data:`args` and :py:data:`kwargs` raises :py:data:`exception`. The
- failure will report the traceback and call stack of the unexpected
- exception.
-
- :param exception: exception type that is to be expected
- :param f: the function to call
-
- :return: The raised exception instance, if it is of the given type.
- :raise self.failureException: Raised if the function call does
- not raise an exception or if it raises an exception of a
- different type.
- """
- with pytest.raises(exception) as cm:
- f(*args, **kwargs)
- return cm.value
- assertRaises = failUnlessRaises
-
- def mktemp(self):
- """
- Return UTF-8-encoded bytes of a path to a tmp file.
-
- The file will be cleaned up after the test run.
- """
- return mktemp(dir=self.tmpdir).encode("utf-8")
-
- # Other stuff
- def assertConsistentType(self, theType, name, *constructionArgs):
- """
- Perform various assertions about :py:data:`theType` to ensure that it
- is a well-defined type. This is useful for extension types, where it's
- pretty easy to do something wacky. If something about the type is
- unusual, an exception will be raised.
-
- :param theType: The type object about which to make assertions.
- :param name: A string giving the name of the type.
- :param constructionArgs: Positional arguments to use with
- :py:data:`theType` to create an instance of it.
- """
- assert is_consistent_type(theType, name, *constructionArgs)
-
-
def is_consistent_type(theType, name, *constructionArgs):
"""
Perform various assertions about *theType* to ensure that it is a