summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_ca_certs.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_ca_certs.py')
-rw-r--r--tests/unittests/config/test_cc_ca_certs.py251
1 files changed, 100 insertions, 151 deletions
diff --git a/tests/unittests/config/test_cc_ca_certs.py b/tests/unittests/config/test_cc_ca_certs.py
index a0b402ac..19e5d422 100644
--- a/tests/unittests/config/test_cc_ca_certs.py
+++ b/tests/unittests/config/test_cc_ca_certs.py
@@ -74,15 +74,19 @@ class TestConfig(TestCase):
mock.patch.object(cc_ca_certs, "update_ca_certs")
)
self.mock_remove = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, "remove_default_ca_certs")
+ mock.patch.object(cc_ca_certs, "disable_default_ca_certs")
)
- def test_no_trusted_list(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_no_trusted_list(self, _):
"""
Test that no certificates are written if the 'trusted' key is not
present.
"""
- config = {"ca-certs": {}}
+ config = {"ca_certs": {}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -93,9 +97,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_empty_trusted_list(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_empty_trusted_list(self, _):
"""Test that no certificate are written if 'trusted' list is empty."""
- config = {"ca-certs": {"trusted": []}}
+ config = {"ca_certs": {"trusted": []}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -106,9 +114,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_single_trusted(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_single_trusted(self, _):
"""Test that a single cert gets passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1"]}}
+ config = {"ca_certs": {"trusted": ["CERT1"]}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -120,9 +132,13 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_multiple_trusted(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_multiple_trusted(self, _):
"""Test that multiple certs get passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
+ config = {"ca_certs": {"trusted": ["CERT1", "CERT2"]}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -134,7 +150,11 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_remove_default_ca_certs(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_remove_default_ca_certs(self, _):
"""Test remove_defaults works as expected."""
config = {"ca_certs": {"remove_defaults": True}}
@@ -147,7 +167,11 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 1)
- def test_no_remove_defaults_if_false(self):
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_no_remove_defaults_if_false(self, _):
"""Test remove_defaults is not called when config value is False."""
config = {"ca_certs": {"remove_defaults": False}}
@@ -160,8 +184,14 @@ class TestConfig(TestCase):
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
- def test_correct_order_for_remove_then_add(self):
- """Test remove_defaults is not called when config value is False."""
+ @mock.patch(
+ "cloudinit.distros.networking.subp.subp",
+ return_value=("", None),
+ )
+ def test_correct_order_for_remove_then_add(self, _):
+ """
+ Test remove_defaults is called before add.
+ """
config = {"ca_certs": {"remove_defaults": True, "trusted": ["CERT1"]}}
for distro_name in cc_ca_certs.distros:
@@ -170,9 +200,9 @@ class TestConfig(TestCase):
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
+ self.assertEqual(self.mock_remove.call_count, 1)
self.mock_add.assert_called_once_with(conf, ["CERT1"])
self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
class TestAddCaCerts(TestCase):
@@ -200,51 +230,10 @@ class TestAddCaCerts(TestCase):
cc_ca_certs.add_ca_certs(conf, [])
self.assertEqual(mockobj.call_count, 0)
- def test_single_cert_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has trailing newline"""
- cert = "CERT1\nLINE2\nLINE3"
-
- ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
- expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
-
- self.m_stat.return_value.st_size = 1
-
- for distro_name in cc_ca_certs.distros:
- conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
-
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, "write_file")
- )
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
-
- cc_ca_certs.add_ca_certs(conf, [cert])
-
- mock_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
- )
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"], expected, omode="wb"
- )
- ]
- )
- mock_load.assert_called_once_with(conf["ca_cert_config"])
-
- def test_single_cert_no_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has no trailing newline"""
+ def test_single_cert(self):
+ """Test adding a single certificate to the trusted CAs."""
cert = "CERT1\nLINE2\nLINE3"
- ca_certs_content = "line1\nline2\nline3"
-
self.m_stat.return_value.st_size = 1
for distro_name in cc_ca_certs.distros:
@@ -254,65 +243,24 @@ class TestAddCaCerts(TestCase):
mock_write = mocks.enter_context(
mock.patch.object(util, "write_file")
)
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
cc_ca_certs.add_ca_certs(conf, [cert])
mock_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
- )
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"],
- "%s\n%s\n"
- % (ca_certs_content, conf["ca_cert_filename"]),
- omode="wb",
- )
- ]
- )
-
- mock_load.assert_called_once_with(conf["ca_cert_config"])
-
- def test_single_cert_to_empty_existing_ca_file(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates.conf is empty"""
- cert = "CERT1\nLINE2\nLINE3"
-
- expected = "cloud-init-ca-certs.crt\n"
-
- self.m_stat.return_value.st_size = 0
-
- for distro_name in cc_ca_certs.distros:
- conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- with mock.patch.object(
- util, "write_file", autospec=True
- ) as m_write:
-
- cc_ca_certs.add_ca_certs(conf, [cert])
-
- m_write.assert_has_calls(
- [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
+ [
+ mock.call(
+ conf["ca_cert_full_path"].format(cert_index=1),
+ cert,
+ mode=0o644,
+ )
+ ]
)
- if conf["ca_cert_config"] is not None:
- m_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"], expected, omode="wb"
- )
- ]
- )
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
- expected_cert_file = "\n".join(certs)
- ca_certs_content = "line1\nline2\nline3"
+ expected_cert_1_file = certs[0]
+ expected_cert_2_file = certs[1]
self.m_stat.return_value.st_size = 1
@@ -323,36 +271,23 @@ class TestAddCaCerts(TestCase):
mock_write = mocks.enter_context(
mock.patch.object(util, "write_file")
)
- mock_load = mocks.enter_context(
- mock.patch.object(
- util, "load_file", return_value=ca_certs_content
- )
- )
cc_ca_certs.add_ca_certs(conf, certs)
mock_write.assert_has_calls(
[
mock.call(
- conf["ca_cert_full_path"],
- expected_cert_file,
+ conf["ca_cert_full_path"].format(cert_index=1),
+ expected_cert_1_file,
mode=0o644,
- )
+ ),
+ mock.call(
+ conf["ca_cert_full_path"].format(cert_index=2),
+ expected_cert_2_file,
+ mode=0o644,
+ ),
]
)
- if conf["ca_cert_config"] is not None:
- mock_write.assert_has_calls(
- [
- mock.call(
- conf["ca_cert_config"],
- "%s\n%s\n"
- % (ca_certs_content, conf["ca_cert_filename"]),
- omode="wb",
- )
- ]
- )
-
- mock_load.assert_called_once_with(conf["ca_cert_config"])
class TestUpdateCaCerts(unittest.TestCase):
@@ -378,6 +313,12 @@ class TestRemoveDefaultCaCerts(TestCase):
)
def test_commands(self):
+ ca_certs_content = "# line1\nline2\nline3\n"
+ expected = (
+ "# line1\n# Modified by cloud-init to deselect certs due to"
+ " user-data\n!line2\n!line3\n"
+ )
+
for distro_name in cc_ca_certs.distros:
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
@@ -385,33 +326,42 @@ class TestRemoveDefaultCaCerts(TestCase):
mock_delete = mocks.enter_context(
mock.patch.object(util, "delete_dir_contents")
)
- mock_write = mocks.enter_context(
- mock.patch.object(util, "write_file")
+ mock_load = mocks.enter_context(
+ mock.patch.object(
+ util, "load_file", return_value=ca_certs_content
+ )
)
mock_subp = mocks.enter_context(
mock.patch.object(subp, "subp")
)
-
- cc_ca_certs.remove_default_ca_certs(distro_name, conf)
-
- mock_delete.assert_has_calls(
- [
- mock.call(conf["ca_cert_path"]),
- mock.call(conf["ca_cert_system_path"]),
- ]
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, "write_file")
)
- if conf["ca_cert_config"] is not None:
+ cc_ca_certs.disable_default_ca_certs(distro_name, conf)
+
+ if distro_name == "rhel":
+ mock_delete.assert_has_calls(
+ [
+ mock.call(conf["ca_cert_path"]),
+ mock.call(conf["ca_cert_local_path"]),
+ ]
+ )
+ self.assertEqual([], mock_subp.call_args_list)
+ elif distro_name in ["alpine", "debian", "ubuntu"]:
+ mock_load.assert_called_once_with(conf["ca_cert_config"])
mock_write.assert_called_once_with(
- conf["ca_cert_config"], "", mode=0o644
+ conf["ca_cert_config"], expected, omode="wb"
)
- if distro_name in ["debian", "ubuntu"]:
- mock_subp.assert_called_once_with(
- ("debconf-set-selections", "-"),
- "ca-certificates ca-certificates/trust_new_crts"
- " select no",
- )
+ if distro_name in ["debian", "ubuntu"]:
+ mock_subp.assert_called_once_with(
+ ("debconf-set-selections", "-"),
+ "ca-certificates ca-certificates/trust_new_crts"
+ " select no",
+ )
+ else:
+ assert mock_subp.call_count == 0
class TestCACertsSchema:
@@ -423,11 +373,10 @@ class TestCACertsSchema:
# Valid, yet deprecated schemas
(
{"ca-certs": {"remove-defaults": True}},
- "Cloud config schema deprecations: "
- "ca-certs: DEPRECATED. Dropped after April 2027. "
- "Use ``ca_certs``., "
- "ca-certs.remove-defaults: DEPRECATED. "
- "Dropped after April 2027. Use ``remove_defaults``.",
+ "Cloud config schema deprecations: ca-certs: "
+ "Deprecated in version 22.3. Use ``ca_certs`` instead.,"
+ " ca-certs.remove-defaults: Deprecated in version 22.3"
+ ". Use ``remove_defaults`` instead.",
),
# Invalid schemas
(