From 343a00e828d9d2d33998ccaf96dca0b9417f04af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mois=C3=A9s=20Guimar=C3=A3es=20de=20Medeiros?= Date: Tue, 22 Dec 2020 00:05:09 +0100 Subject: Fix _wrap_socket_sni (#347) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Change the default value of ssl_version to None. When not set, the proper value between ssl.PROTOCOL_TLS_CLIENT and ssl.PROTOCOL_TLS_SERVER will be selected based on the param server_side in order to create a TLS Context object with better defaults that fit the desired connection side. * Change the default value of cert_reqs to None. The default value of ctx.verify_mode is ssl.CERT_NONE, but when ssl.PROTOCOL_TLS_CLIENT is used, ctx.verify_mode defaults to ssl.CERT_REQUIRED. * Fix context.check_hostname logic. Checking the hostname depends on having support of the SNI TLS extension and being provided with a server_hostname value. Another important thing to mention is that enabling hostname checking automatically sets verify_mode from ssl.CERT_NONE to ssl.CERT_REQUIRED in the stdlib ssl and it cannot be set back to ssl.CERT_NONE as long as hostname checking is enabled. * Refactor the SNI tests to test one thing at a time and removing some tests that were being repeated over and over. Signed-off-by: Moisés Guimarães de Medeiros --- amqp/transport.py | 29 +++++++--- t/certs/ca_certificate.pem | 20 +++++++ t/integration/test_rmq.py | 10 +++- t/unit/test_transport.py | 141 +++++++++++++++++++-------------------------- 4 files changed, 105 insertions(+), 95 deletions(-) create mode 100644 t/certs/ca_certificate.pem diff --git a/amqp/transport.py b/amqp/transport.py index 2a7c190..ec100e5 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -436,10 +436,10 @@ class SSLTransport(_AbstractTransport): return ctx.wrap_socket(sock, **sslopts) def _wrap_socket_sni(self, sock, keyfile=None, certfile=None, - server_side=False, cert_reqs=ssl.CERT_NONE, + server_side=False, cert_reqs=None, ca_certs=None, do_handshake_on_connect=False, suppress_ragged_eofs=True, server_hostname=None, - ciphers=None, ssl_version=ssl.PROTOCOL_TLS): + ciphers=None, ssl_version=None): """Socket wrap with SNI headers. stdlib :attr:`ssl.SSLContext.wrap_socket` method augmented with support @@ -510,20 +510,31 @@ class SSLTransport(_AbstractTransport): 'server_hostname': server_hostname, } + if ssl_version is None: + ssl_version = ( + ssl.PROTOCOL_TLS_SERVER + if server_side + else ssl.PROTOCOL_TLS_CLIENT + ) + context = ssl.SSLContext(ssl_version) + if certfile is not None: context.load_cert_chain(certfile, keyfile) if ca_certs is not None: context.load_verify_locations(ca_certs) - if ciphers: + if ciphers is not None: context.set_ciphers(ciphers) - if cert_reqs != ssl.CERT_NONE: - context.check_hostname = True - # Set SNI headers if supported - if (server_hostname is not None) and ( - hasattr(ssl, 'HAS_SNI') and ssl.HAS_SNI) and ( - hasattr(ssl, 'SSLContext')): + if cert_reqs is not None: context.verify_mode = cert_reqs + # Set SNI headers if supported + try: + context.check_hostname = ( + ssl.HAS_SNI and server_hostname is not None + ) + except AttributeError: + pass # ask forgiveness not permission + sock = context.wrap_socket(**opts) return sock diff --git a/t/certs/ca_certificate.pem b/t/certs/ca_certificate.pem new file mode 100644 index 0000000..009936d --- /dev/null +++ b/t/certs/ca_certificate.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDRzCCAi+gAwIBAgIJAMa1mrcNQtapMA0GCSqGSIb3DQEBCwUAMDExIDAeBgNV +BAMMF1RMU0dlblNlbGZTaWduZWR0Um9vdENBMQ0wCwYDVQQHDAQkJCQkMCAXDTIw +MDEwMzEyMDE0MFoYDzIxMTkxMjEwMTIwMTQwWjAxMSAwHgYDVQQDDBdUTFNHZW5T +ZWxmU2lnbmVkdFJvb3RDQTENMAsGA1UEBwwEJCQkJDCCASIwDQYJKoZIhvcNAQEB +BQADggEPADCCAQoCggEBAKdmOg5vtuZ5vNZmceToiVBlcFg9Y/xKNyCPBij6Wm5p +mXbnsjO1PhjGr97r2cMLq5QMvGt+FBEIjeeULtWVCBY7vMc4ATEZ1S2PmmKnOSXJ +MLMDIutznopZkyqt3gqWgXZDxxHIlIzJl0HirQmfeLm6eTOYyFoyFZV3CE2IeW4Y +n1zYhgZgIrU7Yo3I7wY9Js5yLk4p3etByN5tlLL2sdCOjRRXWGbOh/kb8uiyotEd +cxNThk0RQDugoEzaGYBU3bzDhKkm4v/v/xp/JxGLDl/e3heRMUbcw9d/0ujflouy +OQ66SNYGLWFQpmhtyHjalKzL5UbTcof4BQltoo/W7xECAwEAAaNgMF4wCwYDVR0P +BAQDAgEGMB0GA1UdDgQWBBTKOnbaptqaUCAiwtnwLcRTcbuRejAfBgNVHSMEGDAW +gBTKOnbaptqaUCAiwtnwLcRTcbuRejAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQB1tJUR9zoQ98bOz1es91PxgIt8VYR8/r6uIRtYWTBi7fgDRaaR +Glm6ZqOSXNlkacB6kjUzIyKJwGWnD9zU/06CH+ME1U497SVVhvtUEbdJb1COU+/5 +KavEHVINfc3tHD5Z5LJR3okEILAzBYkEcjYUECzBNYVi4l6PBSMSC2+RBKGqHkY7 +ApmD5batRghH5YtadiyF4h6bba/XSUqxzFcLKjKSyyds4ndvA1/yfl/7CrRtiZf0 +jw1pFl33/PTOhgi66MHa4uaKlL/hIjIlh4kJgJajqCN+TVU4Q6JNmSuIsq6rksSw +Rd5baBZrik2NHALr/ZN2Wy0nXiQJ3p+F20+X +-----END CERTIFICATE----- diff --git a/t/integration/test_rmq.py b/t/integration/test_rmq.py index f6b26d1..746aa65 100644 --- a/t/integration/test_rmq.py +++ b/t/integration/test_rmq.py @@ -8,12 +8,15 @@ import amqp def get_connection( - hostname, port, vhost, use_tls=False, keyfile=None, certfile=None): + hostname, port, vhost, use_tls=False, + keyfile=None, certfile=None, ca_certs=None +): host = f'{hostname}:{port}' if use_tls: return amqp.Connection(host=host, vhost=vhost, ssl={ 'keyfile': keyfile, - 'certfile': certfile + 'certfile': certfile, + 'ca_certs': ca_certs, } ) else: @@ -40,7 +43,8 @@ def connection(request): ).get("slaveid", None), use_tls=True, keyfile='t/certs/client_key.pem', - certfile='t/certs/client_certificate.pem' + certfile='t/certs/client_certificate.pem', + ca_certs='t/certs/ca_certificate.pem', ) diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py index ad2750e..7f8d78c 100644 --- a/t/unit/test_transport.py +++ b/t/unit/test_transport.py @@ -640,108 +640,87 @@ class test_SSLTransport: def test_wrap_socket_sni(self): # testing default values of _wrap_socket_sni() sock = Mock() - with patch( - 'ssl.SSLContext.wrap_socket', - return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap: + with patch('ssl.SSLContext') as mock_ssl_context_class: + wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET ret = self.t._wrap_socket_sni(sock) - mock_ssl_wrap.assert_called_with(sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None) - + mock_ssl_context_class.load_cert_chain.assert_not_called() + mock_ssl_context_class.load_verify_locations.assert_not_called() + mock_ssl_context_class.set_ciphers.assert_not_called() + mock_ssl_context_class.verify_mode.assert_not_called() + wrap_socket_method_mock.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None + ) assert ret == sentinel.WRAPPED_SOCKET def test_wrap_socket_sni_certfile(self): # testing _wrap_socket_sni() with parameters certfile and keyfile - sock = Mock() - with patch( - 'ssl.SSLContext.wrap_socket', - return_value=sentinel.WRAPPED_SOCKET - ) as mock_ssl_wrap, patch( - 'ssl.SSLContext.load_cert_chain' - ) as mock_load_cert_chain: - ret = self.t._wrap_socket_sni( - sock, keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE) - - mock_load_cert_chain.assert_called_with( - sentinel.CERTFILE, sentinel.KEYFILE) - mock_ssl_wrap.assert_called_with(sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None) + with patch('ssl.SSLContext') as mock_ssl_context_class: + load_cert_chain_method_mock = \ + mock_ssl_context_class().load_cert_chain + self.t._wrap_socket_sni( + Mock(), keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE + ) - assert ret == sentinel.WRAPPED_SOCKET + load_cert_chain_method_mock.assert_called_with( + sentinel.CERTFILE, sentinel.KEYFILE + ) def test_wrap_socket_ca_certs(self): # testing _wrap_socket_sni() with parameter ca_certs - sock = Mock() - with patch( - 'ssl.SSLContext.wrap_socket', - return_value=sentinel.WRAPPED_SOCKET - ) as mock_ssl_wrap, patch( - 'ssl.SSLContext.load_verify_locations' - ) as mock_load_verify_locations: - ret = self.t._wrap_socket_sni(sock, ca_certs=sentinel.CA_CERTS) - - mock_load_verify_locations.assert_called_with(sentinel.CA_CERTS) - mock_ssl_wrap.assert_called_with(sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None) + with patch('ssl.SSLContext') as mock_ssl_context_class: + load_verify_locations_method_mock = \ + mock_ssl_context_class().load_verify_locations + self.t._wrap_socket_sni(Mock(), ca_certs=sentinel.CA_CERTS) - assert ret == sentinel.WRAPPED_SOCKET + load_verify_locations_method_mock.assert_called_with(sentinel.CA_CERTS) def test_wrap_socket_ciphers(self): # testing _wrap_socket_sni() with parameter ciphers - sock = Mock() - with patch( - 'ssl.SSLContext.wrap_socket', - return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap, \ - patch('ssl.SSLContext.set_ciphers') as mock_set_ciphers: - ret = self.t._wrap_socket_sni(sock, ciphers=sentinel.CIPHERS) - - mock_set_ciphers.assert_called_with(sentinel.CIPHERS) - mock_ssl_wrap.assert_called_with(sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None) - assert ret == sentinel.WRAPPED_SOCKET + with patch('ssl.SSLContext') as mock_ssl_context_class: + set_ciphers_method_mock = mock_ssl_context_class().set_ciphers + self.t._wrap_socket_sni(Mock(), ciphers=sentinel.CIPHERS) + + set_ciphers_method_mock.assert_called_with(sentinel.CIPHERS) def test_wrap_socket_sni_cert_reqs(self): # testing _wrap_socket_sni() with parameter cert_reqs - sock = Mock() with patch('ssl.SSLContext') as mock_ssl_context_class: - wrap_socket_method_mock = mock_ssl_context_class().wrap_socket - wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET - ret = self.t._wrap_socket_sni(sock, cert_reqs=sentinel.CERT_REQS) + self.t._wrap_socket_sni(Mock(), cert_reqs=sentinel.CERT_REQS) - wrap_socket_method_mock.assert_called_with( - sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None - ) - assert mock_ssl_context_class().check_hostname is True - assert ret == sentinel.WRAPPED_SOCKET + assert mock_ssl_context_class().verify_mode == sentinel.CERT_REQS def test_wrap_socket_sni_setting_sni_header(self): - # testing _wrap_socket_sni() with setting SNI header + # testing _wrap_socket_sni() without parameter server_hostname + # SSL module supports SNI + with patch('ssl.SSLContext') as mock_ssl_context_class, \ + patch('ssl.HAS_SNI', new=True): + self.t._wrap_socket_sni(Mock()) + + assert mock_ssl_context_class().check_hostname is False + + # SSL module does not support SNI + with patch('ssl.SSLContext') as mock_ssl_context_class, \ + patch('ssl.HAS_SNI', new=False): + self.t._wrap_socket_sni(Mock()) + + assert mock_ssl_context_class().check_hostname is False + + # testing _wrap_socket_sni() with parameter server_hostname sock = Mock() with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=True): # SSL module supports SNI wrap_socket_method_mock = mock_ssl_context_class().wrap_socket - wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET - ret = self.t._wrap_socket_sni( - sock, cert_reqs=sentinel.CERT_REQS, - server_hostname=sentinel.SERVER_HOSTNAME + self.t._wrap_socket_sni( + sock, server_hostname=sentinel.SERVER_HOSTNAME ) + wrap_socket_method_mock.assert_called_with( sock=sock, server_side=False, @@ -749,17 +728,14 @@ class test_SSLTransport: suppress_ragged_eofs=True, server_hostname=sentinel.SERVER_HOSTNAME ) - assert mock_ssl_context_class().verify_mode == sentinel.CERT_REQS - assert ret == sentinel.WRAPPED_SOCKET + assert mock_ssl_context_class().check_hostname is True with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=False): # SSL module does not support SNI wrap_socket_method_mock = mock_ssl_context_class().wrap_socket - wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET - ret = self.t._wrap_socket_sni( - sock, cert_reqs=sentinel.CERT_REQS, - server_hostname=sentinel.SERVER_HOSTNAME + self.t._wrap_socket_sni( + sock, server_hostname=sentinel.SERVER_HOSTNAME ) wrap_socket_method_mock.assert_called_with( sock=sock, @@ -768,8 +744,7 @@ class test_SSLTransport: suppress_ragged_eofs=True, server_hostname=sentinel.SERVER_HOSTNAME ) - assert mock_ssl_context_class().verify_mode != sentinel.CERT_REQS - assert ret == sentinel.WRAPPED_SOCKET + assert mock_ssl_context_class().check_hostname is False def test_shutdown_transport(self): self.t.sock = None -- cgit v1.2.1 From ac3065ed54db922905251980ff3d78df8b8e8254 Mon Sep 17 00:00:00 2001 From: "Asif Saif Uddin (Auvi)" Date: Tue, 19 Jan 2021 20:45:39 +0600 Subject: changlog entry for 5.0.3 --- Changelog | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/Changelog b/Changelog index 71932c6..587c94e 100644 --- a/Changelog +++ b/Changelog @@ -5,6 +5,35 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr The previous amqplib changelog is here: http://code.google.com/p/py-amqplib/source/browse/CHANGES +.. _version-5.0.3: + +5.0.3 +===== +:release-date: 2021-01-19 9:00 P.M UTC+6:00 +:release-by: Asif Saif Uddin + +- Change the default value of ssl_version to None. When not set, the + proper value between ssl.PROTOCOL_TLS_CLIENT and ssl.PROTOCOL_TLS_SERVER + will be selected based on the param server_side in order to create + a TLS Context object with better defaults that fit the desired + connection side. + +- Change the default value of cert_reqs to None. The default value + of ctx.verify_mode is ssl.CERT_NONE, but when ssl.PROTOCOL_TLS_CLIENT + is used, ctx.verify_mode defaults to ssl.CERT_REQUIRED. + +- Fix context.check_hostname logic. Checking the hostname depends on + having support of the SNI TLS extension and being provided with a + server_hostname value. Another important thing to mention is that + enabling hostname checking automatically sets verify_mode from + ssl.CERT_NONE to ssl.CERT_REQUIRED in the stdlib ssl and it cannot + be set back to ssl.CERT_NONE as long as hostname checking is enabled. + +- Refactor the SNI tests to test one thing at a time and removing some + tests that were being repeated over and over. + + + .. _version-5.0.2: 5.0.2 -- cgit v1.2.1 From 32d2b7eefab2f03cce2037374c7be9fb183547ca Mon Sep 17 00:00:00 2001 From: "Asif Saif Uddin (Auvi)" Date: Wed, 20 Jan 2021 02:18:55 +0600 Subject: =?UTF-8?q?Bump=20version:=205.0.2=20=E2=86=92=205.0.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- README.rst | 2 +- amqp/__init__.py | 2 +- docs/includes/introduction.txt | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b8b8b54..e04a9cc 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 5.0.2 +current_version = 5.0.3 commit = True tag = True parse = (?P\d+)\.(?P\d+)\.(?P\d+)(?P[a-z\d]+)? diff --git a/README.rst b/README.rst index 2a5a8aa..a2272e3 100644 --- a/README.rst +++ b/README.rst @@ -4,7 +4,7 @@ |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| -:Version: 5.0.2 +:Version: 5.0.3 :Web: https://amqp.readthedocs.io/ :Download: https://pypi.org/project/amqp/ :Source: http://github.com/celery/py-amqp/ diff --git a/amqp/__init__.py b/amqp/__init__.py index eb59152..556a5aa 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -4,7 +4,7 @@ import re from collections import namedtuple -__version__ = '5.0.2' +__version__ = '5.0.3' __author__ = 'Barry Pederson' __maintainer__ = 'Asif Saif Uddin, Matus Valo' __contact__ = 'pyamqp@celeryproject.org' diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 7248046..ca1b7de 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -1,4 +1,4 @@ -:Version: 5.0.2 +:Version: 5.0.3 :Web: https://amqp.readthedocs.io/ :Download: https://pypi.org/project/amqp/ :Source: http://github.com/celery/py-amqp/ -- cgit v1.2.1 From 0b8a832d32179d33152d886acd6f081f25ea4bf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mois=C3=A9s=20Guimar=C3=A3es=20de=20Medeiros?= Date: Wed, 27 Jan 2021 09:55:20 +0100 Subject: Add missing load_default_certs() call. (#350) Fixes: #349 --- amqp/transport.py | 8 +++ t/integration/test_rmq.py | 27 +++++++++ t/unit/test_transport.py | 145 ++++++++++++++++++++++++++++------------------ 3 files changed, 124 insertions(+), 56 deletions(-) diff --git a/amqp/transport.py b/amqp/transport.py index ec100e5..4130681 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -535,6 +535,14 @@ class SSLTransport(_AbstractTransport): except AttributeError: pass # ask forgiveness not permission + if ca_certs is None and context.verify_mode != ssl.CERT_NONE: + purpose = ( + ssl.Purpose.CLIENT_AUTH + if server_side + else ssl.Purpose.SERVER_AUTH + ) + context.load_default_certs(purpose) + sock = context.wrap_socket(**opts) return sock diff --git a/t/integration/test_rmq.py b/t/integration/test_rmq.py index 746aa65..d89a233 100644 --- a/t/integration/test_rmq.py +++ b/t/integration/test_rmq.py @@ -5,6 +5,7 @@ from unittest.mock import ANY, Mock import pytest import amqp +from amqp import transport def get_connection( @@ -73,6 +74,32 @@ def test_tls_connect_fails(): connection.connect() +@pytest.mark.env('rabbitmq') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_tls_default_certs(): + # testing TLS connection against badssl.com with default certs + connection = transport.Transport( + host="tls-v1-2.badssl.com:1012", + ssl=True, + ) + assert type(connection) == transport.SSLTransport + connection.connect() + + +@pytest.mark.env('rabbitmq') +@pytest.mark.flaky(reruns=5, reruns_delay=2) +def test_tls_no_default_certs_fails(): + # testing TLS connection fails against badssl.com without default certs + connection = transport.Transport( + host="tls-v1-2.badssl.com:1012", + ssl={ + "ca_certs": 't/certs/ca_certificate.pem', + }, + ) + with pytest.raises(ssl.SSLError): + connection.connect() + + @pytest.mark.env('rabbitmq') class test_rabbitmq_operations(): diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py index 7f8d78c..f217fb6 100644 --- a/t/unit/test_transport.py +++ b/t/unit/test_transport.py @@ -1,6 +1,7 @@ import errno import os import re +import ssl import socket import struct from struct import pack @@ -639,112 +640,144 @@ class test_SSLTransport: def test_wrap_socket_sni(self): # testing default values of _wrap_socket_sni() - sock = Mock() with patch('ssl.SSLContext') as mock_ssl_context_class: - wrap_socket_method_mock = mock_ssl_context_class().wrap_socket - wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET + sock = Mock() + context = mock_ssl_context_class() + context.wrap_socket.return_value = sentinel.WRAPPED_SOCKET ret = self.t._wrap_socket_sni(sock) - mock_ssl_context_class.load_cert_chain.assert_not_called() - mock_ssl_context_class.load_verify_locations.assert_not_called() - mock_ssl_context_class.set_ciphers.assert_not_called() - mock_ssl_context_class.verify_mode.assert_not_called() - wrap_socket_method_mock.assert_called_with( - sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None - ) - assert ret == sentinel.WRAPPED_SOCKET + context.load_cert_chain.assert_not_called() + context.load_verify_locations.assert_not_called() + context.set_ciphers.assert_not_called() + context.verify_mode.assert_not_called() + + context.load_default_certs.assert_called_with( + ssl.Purpose.SERVER_AUTH + ) + context.wrap_socket.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None + ) + assert ret == sentinel.WRAPPED_SOCKET def test_wrap_socket_sni_certfile(self): # testing _wrap_socket_sni() with parameters certfile and keyfile with patch('ssl.SSLContext') as mock_ssl_context_class: - load_cert_chain_method_mock = \ - mock_ssl_context_class().load_cert_chain + sock = Mock() + context = mock_ssl_context_class() self.t._wrap_socket_sni( - Mock(), keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE + sock, keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE ) - load_cert_chain_method_mock.assert_called_with( - sentinel.CERTFILE, sentinel.KEYFILE - ) + context.load_default_certs.assert_called_with( + ssl.Purpose.SERVER_AUTH + ) + context.load_cert_chain.assert_called_with( + sentinel.CERTFILE, sentinel.KEYFILE + ) def test_wrap_socket_ca_certs(self): # testing _wrap_socket_sni() with parameter ca_certs with patch('ssl.SSLContext') as mock_ssl_context_class: - load_verify_locations_method_mock = \ - mock_ssl_context_class().load_verify_locations - self.t._wrap_socket_sni(Mock(), ca_certs=sentinel.CA_CERTS) + sock = Mock() + context = mock_ssl_context_class() + self.t._wrap_socket_sni(sock, ca_certs=sentinel.CA_CERTS) - load_verify_locations_method_mock.assert_called_with(sentinel.CA_CERTS) + context.load_default_certs.assert_not_called() + context.load_verify_locations.assert_called_with(sentinel.CA_CERTS) def test_wrap_socket_ciphers(self): # testing _wrap_socket_sni() with parameter ciphers with patch('ssl.SSLContext') as mock_ssl_context_class: - set_ciphers_method_mock = mock_ssl_context_class().set_ciphers - self.t._wrap_socket_sni(Mock(), ciphers=sentinel.CIPHERS) + sock = Mock() + context = mock_ssl_context_class() + set_ciphers_method_mock = context.set_ciphers + self.t._wrap_socket_sni(sock, ciphers=sentinel.CIPHERS) - set_ciphers_method_mock.assert_called_with(sentinel.CIPHERS) + set_ciphers_method_mock.assert_called_with(sentinel.CIPHERS) def test_wrap_socket_sni_cert_reqs(self): - # testing _wrap_socket_sni() with parameter cert_reqs + # testing _wrap_socket_sni() with parameter cert_reqs == ssl.CERT_NONE + with patch('ssl.SSLContext') as mock_ssl_context_class: + sock = Mock() + context = mock_ssl_context_class() + self.t._wrap_socket_sni(sock, cert_reqs=ssl.CERT_NONE) + + context.load_default_certs.assert_not_called() + assert context.verify_mode == ssl.CERT_NONE + + # testing _wrap_socket_sni() with parameter cert_reqs != ssl.CERT_NONE with patch('ssl.SSLContext') as mock_ssl_context_class: - self.t._wrap_socket_sni(Mock(), cert_reqs=sentinel.CERT_REQS) + sock = Mock() + context = mock_ssl_context_class() + self.t._wrap_socket_sni(sock, cert_reqs=sentinel.CERT_REQS) - assert mock_ssl_context_class().verify_mode == sentinel.CERT_REQS + context.load_default_certs.assert_called_with( + ssl.Purpose.SERVER_AUTH + ) + assert context.verify_mode == sentinel.CERT_REQS def test_wrap_socket_sni_setting_sni_header(self): # testing _wrap_socket_sni() without parameter server_hostname + # SSL module supports SNI with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=True): - self.t._wrap_socket_sni(Mock()) + sock = Mock() + context = mock_ssl_context_class() + self.t._wrap_socket_sni(sock) - assert mock_ssl_context_class().check_hostname is False + assert context.check_hostname is False # SSL module does not support SNI with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=False): - self.t._wrap_socket_sni(Mock()) + sock = Mock() + context = mock_ssl_context_class() + self.t._wrap_socket_sni(sock) - assert mock_ssl_context_class().check_hostname is False + assert context.check_hostname is False # testing _wrap_socket_sni() with parameter server_hostname - sock = Mock() + + # SSL module supports SNI with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=True): - # SSL module supports SNI - wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + sock = Mock() + context = mock_ssl_context_class() self.t._wrap_socket_sni( sock, server_hostname=sentinel.SERVER_HOSTNAME ) - wrap_socket_method_mock.assert_called_with( - sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=sentinel.SERVER_HOSTNAME - ) - assert mock_ssl_context_class().check_hostname is True + context.wrap_socket.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=sentinel.SERVER_HOSTNAME + ) + assert context.check_hostname is True + # SSL module does not support SNI with patch('ssl.SSLContext') as mock_ssl_context_class, \ patch('ssl.HAS_SNI', new=False): - # SSL module does not support SNI - wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + sock = Mock() + context = mock_ssl_context_class() self.t._wrap_socket_sni( sock, server_hostname=sentinel.SERVER_HOSTNAME ) - wrap_socket_method_mock.assert_called_with( - sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=sentinel.SERVER_HOSTNAME - ) - assert mock_ssl_context_class().check_hostname is False + + context.wrap_socket.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=sentinel.SERVER_HOSTNAME + ) + assert context.check_hostname is False def test_shutdown_transport(self): self.t.sock = None -- cgit v1.2.1