summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_ca_certs.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_ca_certs.py')
-rw-r--r--tests/unittests/config/test_cc_ca_certs.py326
1 files changed, 236 insertions, 90 deletions
diff --git a/tests/unittests/config/test_cc_ca_certs.py b/tests/unittests/config/test_cc_ca_certs.py
index 91b005d0..39614635 100644
--- a/tests/unittests/config/test_cc_ca_certs.py
+++ b/tests/unittests/config/test_cc_ca_certs.py
@@ -1,18 +1,22 @@
# This file is part of cloud-init. See LICENSE file for license information.
import logging
+import re
import shutil
import tempfile
import unittest
from contextlib import ExitStack
from unittest import mock
-from cloudinit import distros
-from cloudinit.config import cc_ca_certs
-from cloudinit import helpers
-from cloudinit import subp
-from cloudinit import util
-from tests.unittests.helpers import TestCase
+import pytest
+from cloudinit import distros, helpers, subp, util
+from cloudinit.config import cc_ca_certs
+from cloudinit.config.schema import (
+ SchemaValidationError,
+ get_schema,
+ validate_cloudconfig_schema,
+)
+from tests.unittests.helpers import TestCase, skipUnlessJsonSchema
from tests.unittests.util import get_cloud
@@ -31,12 +35,15 @@ class TestNoConfig(unittest.TestCase):
config = util.get_builtin_cfg()
with ExitStack() as mocks:
util_mock = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
+ mock.patch.object(util, "write_file")
+ )
certs_mock = mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'update_ca_certs'))
+ mock.patch.object(cc_ca_certs, "update_ca_certs")
+ )
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
- self.args)
+ cc_ca_certs.handle(
+ self.name, config, self.cloud_init, self.log, self.args
+ )
self.assertEqual(util_mock.call_count, 0)
self.assertEqual(certs_mock.call_count, 0)
@@ -61,11 +68,14 @@ class TestConfig(TestCase):
# Mock out the functions that actually modify the system
self.mock_add = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'add_ca_certs'))
+ mock.patch.object(cc_ca_certs, "add_ca_certs")
+ )
self.mock_update = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'update_ca_certs'))
+ mock.patch.object(cc_ca_certs, "update_ca_certs")
+ )
self.mock_remove = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
+ mock.patch.object(cc_ca_certs, "remove_default_ca_certs")
+ )
def test_no_trusted_list(self):
"""
@@ -106,7 +116,7 @@ class TestConfig(TestCase):
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.mock_add.assert_called_once_with(conf, ["CERT1"])
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
@@ -120,13 +130,13 @@ class TestConfig(TestCase):
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2'])
+ self.mock_add.assert_called_once_with(conf, ["CERT1", "CERT2"])
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 0)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
- config = {"ca-certs": {"remove-defaults": True}}
+ config = {"ca_certs": {"remove_defaults": True}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -139,7 +149,7 @@ class TestConfig(TestCase):
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
- config = {"ca-certs": {"remove-defaults": False}}
+ config = {"ca_certs": {"remove_defaults": False}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -152,7 +162,7 @@ class TestConfig(TestCase):
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
- config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
+ config = {"ca_certs": {"remove_defaults": True, "trusted": ["CERT1"]}}
for distro_name in cc_ca_certs.distros:
self._mock_init()
@@ -160,20 +170,21 @@ class TestConfig(TestCase):
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.mock_add.assert_called_once_with(conf, ["CERT1"])
self.assertEqual(self.mock_update.call_count, 1)
self.assertEqual(self.mock_remove.call_count, 1)
class TestAddCaCerts(TestCase):
-
def setUp(self):
super(TestAddCaCerts, self).setUp()
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
- self.paths = helpers.Paths({
- 'cloud_dir': tmpdir,
- })
+ self.paths = helpers.Paths(
+ {
+ "cloud_dir": tmpdir,
+ }
+ )
self.add_patch("cloudinit.config.cc_ca_certs.os.stat", "m_stat")
def _fetch_distro(self, kind):
@@ -185,7 +196,7 @@ class TestAddCaCerts(TestCase):
"""Test that no certificate are written if not provided."""
for distro_name in cc_ca_certs.distros:
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- with mock.patch.object(util, 'write_file') as mockobj:
+ with mock.patch.object(util, "write_file") as mockobj:
cc_ca_certs.add_ca_certs(conf, [])
self.assertEqual(mockobj.call_count, 0)
@@ -204,21 +215,28 @@ class TestAddCaCerts(TestCase):
with ExitStack() as mocks:
mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
+ mock.patch.object(util, "write_file")
+ )
mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ mock.patch.object(
+ util, "load_file", return_value=ca_certs_content
+ )
+ )
cc_ca_certs.add_ca_certs(conf, [cert])
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_full_path'],
- cert, mode=0o644)])
- if conf['ca_cert_config'] is not None:
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_config'],
- expected, omode="wb")])
- mock_load.assert_called_once_with(conf['ca_cert_config'])
+ mock_write.assert_has_calls(
+ [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
+ )
+ if conf["ca_cert_config"] is not None:
+ mock_write.assert_has_calls(
+ [
+ mock.call(
+ conf["ca_cert_config"], expected, omode="wb"
+ )
+ ]
+ )
+ mock_load.assert_called_once_with(conf["ca_cert_config"])
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -234,24 +252,32 @@ class TestAddCaCerts(TestCase):
with ExitStack() as mocks:
mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
+ mock.patch.object(util, "write_file")
+ )
mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ mock.patch.object(
+ util, "load_file", return_value=ca_certs_content
+ )
+ )
cc_ca_certs.add_ca_certs(conf, [cert])
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_full_path'],
- cert, mode=0o644)])
- if conf['ca_cert_config'] is not None:
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_config'],
- "%s\n%s\n" % (ca_certs_content,
- conf['ca_cert_filename']),
- omode="wb")])
-
- mock_load.assert_called_once_with(conf['ca_cert_config'])
+ mock_write.assert_has_calls(
+ [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
+ )
+ if conf["ca_cert_config"] is not None:
+ mock_write.assert_has_calls(
+ [
+ mock.call(
+ conf["ca_cert_config"],
+ "%s\n%s\n"
+ % (ca_certs_content, conf["ca_cert_filename"]),
+ omode="wb",
+ )
+ ]
+ )
+
+ mock_load.assert_called_once_with(conf["ca_cert_config"])
def test_single_cert_to_empty_existing_ca_file(self):
"""Test adding a single certificate to the trusted CAs
@@ -264,18 +290,23 @@ class TestAddCaCerts(TestCase):
for distro_name in cc_ca_certs.distros:
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- with mock.patch.object(util, 'write_file',
- autospec=True) as m_write:
+ with mock.patch.object(
+ util, "write_file", autospec=True
+ ) as m_write:
cc_ca_certs.add_ca_certs(conf, [cert])
- m_write.assert_has_calls([
- mock.call(conf['ca_cert_full_path'],
- cert, mode=0o644)])
- if conf['ca_cert_config'] is not None:
- m_write.assert_has_calls([
- mock.call(conf['ca_cert_config'],
- expected, omode="wb")])
+ m_write.assert_has_calls(
+ [mock.call(conf["ca_cert_full_path"], cert, mode=0o644)]
+ )
+ if conf["ca_cert_config"] is not None:
+ m_write.assert_has_calls(
+ [
+ mock.call(
+ conf["ca_cert_config"], expected, omode="wb"
+ )
+ ]
+ )
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
@@ -290,45 +321,61 @@ class TestAddCaCerts(TestCase):
with ExitStack() as mocks:
mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
+ mock.patch.object(util, "write_file")
+ )
mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ mock.patch.object(
+ util, "load_file", return_value=ca_certs_content
+ )
+ )
cc_ca_certs.add_ca_certs(conf, certs)
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_full_path'],
- expected_cert_file, mode=0o644)])
- if conf['ca_cert_config'] is not None:
- mock_write.assert_has_calls([
- mock.call(conf['ca_cert_config'],
- "%s\n%s\n" % (ca_certs_content,
- conf['ca_cert_filename']),
- omode='wb')])
-
- mock_load.assert_called_once_with(conf['ca_cert_config'])
+ mock_write.assert_has_calls(
+ [
+ mock.call(
+ conf["ca_cert_full_path"],
+ expected_cert_file,
+ mode=0o644,
+ )
+ ]
+ )
+ if conf["ca_cert_config"] is not None:
+ mock_write.assert_has_calls(
+ [
+ mock.call(
+ conf["ca_cert_config"],
+ "%s\n%s\n"
+ % (ca_certs_content, conf["ca_cert_filename"]),
+ omode="wb",
+ )
+ ]
+ )
+
+ mock_load.assert_called_once_with(conf["ca_cert_config"])
class TestUpdateCaCerts(unittest.TestCase):
def test_commands(self):
for distro_name in cc_ca_certs.distros:
conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- with mock.patch.object(subp, 'subp') as mockobj:
+ with mock.patch.object(subp, "subp") as mockobj:
cc_ca_certs.update_ca_certs(conf)
mockobj.assert_called_once_with(
- conf['ca_cert_update_cmd'], capture=False)
+ conf["ca_cert_update_cmd"], capture=False
+ )
class TestRemoveDefaultCaCerts(TestCase):
-
def setUp(self):
super(TestRemoveDefaultCaCerts, self).setUp()
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
- self.paths = helpers.Paths({
- 'cloud_dir': tmpdir,
- })
+ self.paths = helpers.Paths(
+ {
+ "cloud_dir": tmpdir,
+ }
+ )
def test_commands(self):
for distro_name in cc_ca_certs.distros:
@@ -336,26 +383,125 @@ class TestRemoveDefaultCaCerts(TestCase):
with ExitStack() as mocks:
mock_delete = mocks.enter_context(
- mock.patch.object(util, 'delete_dir_contents'))
+ mock.patch.object(util, "delete_dir_contents")
+ )
mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
+ mock.patch.object(util, "write_file")
+ )
mock_subp = mocks.enter_context(
- mock.patch.object(subp, 'subp'))
+ mock.patch.object(subp, "subp")
+ )
cc_ca_certs.remove_default_ca_certs(distro_name, conf)
- mock_delete.assert_has_calls([
- mock.call(conf['ca_cert_path']),
- mock.call(conf['ca_cert_system_path'])])
+ mock_delete.assert_has_calls(
+ [
+ mock.call(conf["ca_cert_path"]),
+ mock.call(conf["ca_cert_system_path"]),
+ ]
+ )
- if conf['ca_cert_config'] is not None:
+ if conf["ca_cert_config"] is not None:
mock_write.assert_called_once_with(
- conf['ca_cert_config'], "", mode=0o644)
+ conf["ca_cert_config"], "", mode=0o644
+ )
- if distro_name in ['debian', 'ubuntu']:
+ if distro_name in ["debian", "ubuntu"]:
mock_subp.assert_called_once_with(
- ('debconf-set-selections', '-'),
- "ca-certificates \
-ca-certificates/trust_new_crts select no")
+ ("debconf-set-selections", "-"),
+ "ca-certificates ca-certificates/trust_new_crts"
+ " select no",
+ )
+
+
+class TestCACertsSchema:
+ """Directly test schema rather than through handle."""
+
+ @pytest.mark.parametrize(
+ "config, error_msg",
+ (
+ # Valid, yet deprecated schemas
+ ({"ca-certs": {"remove-defaults": True}}, None),
+ # Invalid schemas
+ (
+ {"ca_certs": 1},
+ "ca_certs: 1 is not of type 'object'",
+ ),
+ (
+ {"ca_certs": {}},
+ re.escape("ca_certs: {} does not have enough properties"),
+ ),
+ (
+ {"ca_certs": {"boguskey": 1}},
+ re.escape(
+ "ca_certs: Additional properties are not allowed"
+ " ('boguskey' was unexpected)"
+ ),
+ ),
+ (
+ {"ca_certs": {"remove_defaults": 1}},
+ "ca_certs.remove_defaults: 1 is not of type 'boolean'",
+ ),
+ (
+ {"ca_certs": {"trusted": [1]}},
+ "ca_certs.trusted.0: 1 is not of type 'string'",
+ ),
+ (
+ {"ca_certs": {"trusted": []}},
+ re.escape("ca_certs.trusted: [] is too short"),
+ ),
+ ),
+ )
+ @skipUnlessJsonSchema()
+ def test_schema_validation(self, config, error_msg):
+ """Assert expected schema validation and error messages."""
+ # New-style schema $defs exist in config/cloud-init-schema*.json
+ schema = get_schema()
+ if error_msg is None:
+ validate_cloudconfig_schema(config, schema, strict=True)
+ else:
+ with pytest.raises(SchemaValidationError, match=error_msg):
+ validate_cloudconfig_schema(config, schema, strict=True)
+
+ @mock.patch.object(cc_ca_certs, "update_ca_certs")
+ def test_deprecate_key_warnings(self, update_ca_certs, caplog):
+ """Assert warnings are logged for deprecated keys."""
+ log = logging.getLogger("CALogTest")
+ cloud = get_cloud("ubuntu")
+ cc_ca_certs.handle(
+ "IGNORE", {"ca-certs": {"remove-defaults": False}}, cloud, log, []
+ )
+ expected_warnings = [
+ "DEPRECATION: key 'ca-certs' is now deprecated. Use 'ca_certs'"
+ " instead.",
+ "DEPRECATION: key 'ca-certs.remove-defaults' is now deprecated."
+ " Use 'ca_certs.remove_defaults' instead.",
+ ]
+ for warning in expected_warnings:
+ assert warning in caplog.text
+ assert 1 == update_ca_certs.call_count
+
+ @mock.patch.object(cc_ca_certs, "update_ca_certs")
+ def test_duplicate_keys(self, update_ca_certs, caplog):
+ """Assert warnings are logged for deprecated keys."""
+ log = logging.getLogger("CALogTest")
+ cloud = get_cloud("ubuntu")
+ cc_ca_certs.handle(
+ "IGNORE",
+ {
+ "ca-certs": {"remove-defaults": True},
+ "ca_certs": {"remove_defaults": False},
+ },
+ cloud,
+ log,
+ [],
+ )
+ expected_warning = (
+ "Found both ca-certs (deprecated) and ca_certs config keys."
+ " Ignoring ca-certs."
+ )
+ assert expected_warning in caplog.text
+ assert 1 == update_ca_certs.call_count
+
# vi: ts=4 expandtab