diff options
Diffstat (limited to 'tests/unittests/config/test_cc_ca_certs.py')
-rw-r--r-- | tests/unittests/config/test_cc_ca_certs.py | 326 |
1 files changed, 236 insertions, 90 deletions
diff --git a/tests/unittests/config/test_cc_ca_certs.py b/tests/unittests/config/test_cc_ca_certs.py index 91b005d0..39614635 100644 --- a/tests/unittests/config/test_cc_ca_certs.py +++ b/tests/unittests/config/test_cc_ca_certs.py @@ -1,18 +1,22 @@ # This file is part of cloud-init. See LICENSE file for license information. import logging +import re import shutil import tempfile import unittest from contextlib import ExitStack from unittest import mock -from cloudinit import distros -from cloudinit.config import cc_ca_certs -from cloudinit import helpers -from cloudinit import subp -from cloudinit import util -from tests.unittests.helpers import TestCase +import pytest +from cloudinit import distros, helpers, subp, util +from cloudinit.config import cc_ca_certs +from cloudinit.config.schema import ( + SchemaValidationError, + get_schema, + validate_cloudconfig_schema, +) +from tests.unittests.helpers import TestCase, skipUnlessJsonSchema from tests.unittests.util import get_cloud @@ -31,12 +35,15 @@ class TestNoConfig(unittest.TestCase): config = util.get_builtin_cfg() with ExitStack() as mocks: util_mock = mocks.enter_context( - mock.patch.object(util, 'write_file')) + mock.patch.object(util, "write_file") + ) certs_mock = mocks.enter_context( - mock.patch.object(cc_ca_certs, 'update_ca_certs')) + mock.patch.object(cc_ca_certs, "update_ca_certs") + ) - cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, - self.args) + cc_ca_certs.handle( + self.name, config, self.cloud_init, self.log, self.args + ) self.assertEqual(util_mock.call_count, 0) self.assertEqual(certs_mock.call_count, 0) @@ -61,11 +68,14 @@ class TestConfig(TestCase): # Mock out the functions that actually modify the system self.mock_add = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'add_ca_certs')) + mock.patch.object(cc_ca_certs, "add_ca_certs") + ) self.mock_update = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'update_ca_certs')) + mock.patch.object(cc_ca_certs, "update_ca_certs") + ) self.mock_remove = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) + mock.patch.object(cc_ca_certs, "remove_default_ca_certs") + ) def test_no_trusted_list(self): """ @@ -106,7 +116,7 @@ class TestConfig(TestCase): conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(conf, ['CERT1']) + self.mock_add.assert_called_once_with(conf, ["CERT1"]) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) @@ -120,13 +130,13 @@ class TestConfig(TestCase): conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2']) + self.mock_add.assert_called_once_with(conf, ["CERT1", "CERT2"]) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_remove_default_ca_certs(self): """Test remove_defaults works as expected.""" - config = {"ca-certs": {"remove-defaults": True}} + config = {"ca_certs": {"remove_defaults": True}} for distro_name in cc_ca_certs.distros: self._mock_init() @@ -139,7 +149,7 @@ class TestConfig(TestCase): def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False.""" - config = {"ca-certs": {"remove-defaults": False}} + config = {"ca_certs": {"remove_defaults": False}} for distro_name in cc_ca_certs.distros: self._mock_init() @@ -152,7 +162,7 @@ class TestConfig(TestCase): def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False.""" - config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} + config = {"ca_certs": {"remove_defaults": True, "trusted": ["CERT1"]}} for distro_name in cc_ca_certs.distros: self._mock_init() @@ -160,20 +170,21 @@ class TestConfig(TestCase): conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(conf, ['CERT1']) + self.mock_add.assert_called_once_with(conf, ["CERT1"]) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 1) class TestAddCaCerts(TestCase): - def setUp(self): super(TestAddCaCerts, self).setUp() tmpdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) + self.paths = helpers.Paths( + { + "cloud_dir": tmpdir, + } + ) self.add_patch("cloudinit.config.cc_ca_certs.os.stat", "m_stat") def _fetch_distro(self, kind): @@ -185,7 +196,7 @@ class TestAddCaCerts(TestCase): """Test that no certificate are written if not provided.""" for distro_name in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro_name) - with mock.patch.object(util, 'write_file') as mockobj: + with mock.patch.object(util, "write_file") as mockobj: cc_ca_certs.add_ca_certs(conf, []) self.assertEqual(mockobj.call_count, 0) @@ -204,21 +215,28 @@ class TestAddCaCerts(TestCase): with ExitStack() as mocks: mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) + mock.patch.object(util, "write_file") + ) mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + mock.patch.object( + util, "load_file", return_value=ca_certs_content + ) + ) cc_ca_certs.add_ca_certs(conf, [cert]) - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_full_path'], - cert, mode=0o644)]) - if conf['ca_cert_config'] is not None: - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_config'], - expected, omode="wb")]) - mock_load.assert_called_once_with(conf['ca_cert_config']) + mock_write.assert_has_calls( + [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)] + ) + if conf["ca_cert_config"] is not None: + mock_write.assert_has_calls( + [ + mock.call( + conf["ca_cert_config"], expected, omode="wb" + ) + ] + ) + mock_load.assert_called_once_with(conf["ca_cert_config"]) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -234,24 +252,32 @@ class TestAddCaCerts(TestCase): with ExitStack() as mocks: mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) + mock.patch.object(util, "write_file") + ) mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + mock.patch.object( + util, "load_file", return_value=ca_certs_content + ) + ) cc_ca_certs.add_ca_certs(conf, [cert]) - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_full_path'], - cert, mode=0o644)]) - if conf['ca_cert_config'] is not None: - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - conf['ca_cert_filename']), - omode="wb")]) - - mock_load.assert_called_once_with(conf['ca_cert_config']) + mock_write.assert_has_calls( + [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)] + ) + if conf["ca_cert_config"] is not None: + mock_write.assert_has_calls( + [ + mock.call( + conf["ca_cert_config"], + "%s\n%s\n" + % (ca_certs_content, conf["ca_cert_filename"]), + omode="wb", + ) + ] + ) + + mock_load.assert_called_once_with(conf["ca_cert_config"]) def test_single_cert_to_empty_existing_ca_file(self): """Test adding a single certificate to the trusted CAs @@ -264,18 +290,23 @@ class TestAddCaCerts(TestCase): for distro_name in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro_name) - with mock.patch.object(util, 'write_file', - autospec=True) as m_write: + with mock.patch.object( + util, "write_file", autospec=True + ) as m_write: cc_ca_certs.add_ca_certs(conf, [cert]) - m_write.assert_has_calls([ - mock.call(conf['ca_cert_full_path'], - cert, mode=0o644)]) - if conf['ca_cert_config'] is not None: - m_write.assert_has_calls([ - mock.call(conf['ca_cert_config'], - expected, omode="wb")]) + m_write.assert_has_calls( + [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)] + ) + if conf["ca_cert_config"] is not None: + m_write.assert_has_calls( + [ + mock.call( + conf["ca_cert_config"], expected, omode="wb" + ) + ] + ) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" @@ -290,45 +321,61 @@ class TestAddCaCerts(TestCase): with ExitStack() as mocks: mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) + mock.patch.object(util, "write_file") + ) mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + mock.patch.object( + util, "load_file", return_value=ca_certs_content + ) + ) cc_ca_certs.add_ca_certs(conf, certs) - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_full_path'], - expected_cert_file, mode=0o644)]) - if conf['ca_cert_config'] is not None: - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - conf['ca_cert_filename']), - omode='wb')]) - - mock_load.assert_called_once_with(conf['ca_cert_config']) + mock_write.assert_has_calls( + [ + mock.call( + conf["ca_cert_full_path"], + expected_cert_file, + mode=0o644, + ) + ] + ) + if conf["ca_cert_config"] is not None: + mock_write.assert_has_calls( + [ + mock.call( + conf["ca_cert_config"], + "%s\n%s\n" + % (ca_certs_content, conf["ca_cert_filename"]), + omode="wb", + ) + ] + ) + + mock_load.assert_called_once_with(conf["ca_cert_config"]) class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): for distro_name in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro_name) - with mock.patch.object(subp, 'subp') as mockobj: + with mock.patch.object(subp, "subp") as mockobj: cc_ca_certs.update_ca_certs(conf) mockobj.assert_called_once_with( - conf['ca_cert_update_cmd'], capture=False) + conf["ca_cert_update_cmd"], capture=False + ) class TestRemoveDefaultCaCerts(TestCase): - def setUp(self): super(TestRemoveDefaultCaCerts, self).setUp() tmpdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) + self.paths = helpers.Paths( + { + "cloud_dir": tmpdir, + } + ) def test_commands(self): for distro_name in cc_ca_certs.distros: @@ -336,26 +383,125 @@ class TestRemoveDefaultCaCerts(TestCase): with ExitStack() as mocks: mock_delete = mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) + mock.patch.object(util, "delete_dir_contents") + ) mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) + mock.patch.object(util, "write_file") + ) mock_subp = mocks.enter_context( - mock.patch.object(subp, 'subp')) + mock.patch.object(subp, "subp") + ) cc_ca_certs.remove_default_ca_certs(distro_name, conf) - mock_delete.assert_has_calls([ - mock.call(conf['ca_cert_path']), - mock.call(conf['ca_cert_system_path'])]) + mock_delete.assert_has_calls( + [ + mock.call(conf["ca_cert_path"]), + mock.call(conf["ca_cert_system_path"]), + ] + ) - if conf['ca_cert_config'] is not None: + if conf["ca_cert_config"] is not None: mock_write.assert_called_once_with( - conf['ca_cert_config'], "", mode=0o644) + conf["ca_cert_config"], "", mode=0o644 + ) - if distro_name in ['debian', 'ubuntu']: + if distro_name in ["debian", "ubuntu"]: mock_subp.assert_called_once_with( - ('debconf-set-selections', '-'), - "ca-certificates \ -ca-certificates/trust_new_crts select no") + ("debconf-set-selections", "-"), + "ca-certificates ca-certificates/trust_new_crts" + " select no", + ) + + +class TestCACertsSchema: + """Directly test schema rather than through handle.""" + + @pytest.mark.parametrize( + "config, error_msg", + ( + # Valid, yet deprecated schemas + ({"ca-certs": {"remove-defaults": True}}, None), + # Invalid schemas + ( + {"ca_certs": 1}, + "ca_certs: 1 is not of type 'object'", + ), + ( + {"ca_certs": {}}, + re.escape("ca_certs: {} does not have enough properties"), + ), + ( + {"ca_certs": {"boguskey": 1}}, + re.escape( + "ca_certs: Additional properties are not allowed" + " ('boguskey' was unexpected)" + ), + ), + ( + {"ca_certs": {"remove_defaults": 1}}, + "ca_certs.remove_defaults: 1 is not of type 'boolean'", + ), + ( + {"ca_certs": {"trusted": [1]}}, + "ca_certs.trusted.0: 1 is not of type 'string'", + ), + ( + {"ca_certs": {"trusted": []}}, + re.escape("ca_certs.trusted: [] is too short"), + ), + ), + ) + @skipUnlessJsonSchema() + def test_schema_validation(self, config, error_msg): + """Assert expected schema validation and error messages.""" + # New-style schema $defs exist in config/cloud-init-schema*.json + schema = get_schema() + if error_msg is None: + validate_cloudconfig_schema(config, schema, strict=True) + else: + with pytest.raises(SchemaValidationError, match=error_msg): + validate_cloudconfig_schema(config, schema, strict=True) + + @mock.patch.object(cc_ca_certs, "update_ca_certs") + def test_deprecate_key_warnings(self, update_ca_certs, caplog): + """Assert warnings are logged for deprecated keys.""" + log = logging.getLogger("CALogTest") + cloud = get_cloud("ubuntu") + cc_ca_certs.handle( + "IGNORE", {"ca-certs": {"remove-defaults": False}}, cloud, log, [] + ) + expected_warnings = [ + "DEPRECATION: key 'ca-certs' is now deprecated. Use 'ca_certs'" + " instead.", + "DEPRECATION: key 'ca-certs.remove-defaults' is now deprecated." + " Use 'ca_certs.remove_defaults' instead.", + ] + for warning in expected_warnings: + assert warning in caplog.text + assert 1 == update_ca_certs.call_count + + @mock.patch.object(cc_ca_certs, "update_ca_certs") + def test_duplicate_keys(self, update_ca_certs, caplog): + """Assert warnings are logged for deprecated keys.""" + log = logging.getLogger("CALogTest") + cloud = get_cloud("ubuntu") + cc_ca_certs.handle( + "IGNORE", + { + "ca-certs": {"remove-defaults": True}, + "ca_certs": {"remove_defaults": False}, + }, + cloud, + log, + [], + ) + expected_warning = ( + "Found both ca-certs (deprecated) and ca_certs config keys." + " Ignoring ca-certs." + ) + assert expected_warning in caplog.text + assert 1 == update_ca_certs.call_count + # vi: ts=4 expandtab |