summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules/test_combined.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration_tests/modules/test_combined.py')
-rw-r--r--tests/integration_tests/modules/test_combined.py213
1 files changed, 121 insertions, 92 deletions
diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py
index 26a8397d..7a9a6e27 100644
--- a/tests/integration_tests/modules/test_combined.py
+++ b/tests/integration_tests/modules/test_combined.py
@@ -6,9 +6,10 @@ the same instance launch. Most independent module coherence tests can go
here.
"""
import json
-import pytest
import re
+import pytest
+
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import (
@@ -76,7 +77,7 @@ class TestCombined:
Also tests LP 1511485: final_message is silent.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
expected = (
"This is my final message!\n"
r"\d+\.\d+.*\n"
@@ -94,10 +95,10 @@ class TestCombined:
configuring the archives.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'W: Failed to fetch' not in log
- assert 'W: Some index files failed to download' not in log
- assert 'E: Unable to locate package ntp' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "W: Failed to fetch" not in log
+ assert "W: Some index files failed to download" not in log
+ assert "E: Unable to locate package ntp" not in log
def test_byobu(self, class_client: IntegrationInstance):
"""Test byobu configured as enabled by default."""
@@ -107,22 +108,18 @@ class TestCombined:
def test_configured_locale(self, class_client: IntegrationInstance):
"""Test locale can be configured correctly."""
client = class_client
- default_locale = client.read_from_file('/etc/default/locale')
- assert 'LANG=en_GB.UTF-8' in default_locale
+ default_locale = client.read_from_file("/etc/default/locale")
+ assert "LANG=en_GB.UTF-8" in default_locale
- locale_a = client.execute('locale -a')
- verify_ordered_items_in_text([
- 'en_GB.utf8',
- 'en_US.utf8'
- ], locale_a)
+ locale_a = client.execute("locale -a")
+ verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a)
locale_gen = client.execute(
"cat /etc/locale.gen | grep -v '^#' | uniq"
)
- verify_ordered_items_in_text([
- 'en_GB.UTF-8',
- 'en_US.UTF-8'
- ], locale_gen)
+ verify_ordered_items_in_text(
+ ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen
+ )
def test_random_seed_data(self, class_client: IntegrationInstance):
"""Integration test for the random seed module.
@@ -141,12 +138,12 @@ class TestCombined:
def test_rsyslog(self, class_client: IntegrationInstance):
"""Test rsyslog is configured correctly."""
client = class_client
- assert 'My test log' in client.read_from_file('/var/tmp/rsyslog.log')
+ assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log")
def test_runcmd(self, class_client: IntegrationInstance):
"""Test runcmd works as expected"""
client = class_client
- assert 'hello world' == client.read_from_file('/var/tmp/runcmd_output')
+ assert "hello world" == client.read_from_file("/var/tmp/runcmd_output")
@retry(tries=30, delay=1)
def test_ssh_import_id(self, class_client: IntegrationInstance):
@@ -160,11 +157,10 @@ class TestCombined:
/home/ubuntu; this will need modification to run on other OSes.
"""
client = class_client
- ssh_output = client.read_from_file(
- "/home/ubuntu/.ssh/authorized_keys")
+ ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys")
- assert '# ssh-import-id gh:powersj' in ssh_output
- assert '# ssh-import-id lp:smoser' in ssh_output
+ assert "# ssh-import-id gh:powersj" in ssh_output
+ assert "# ssh-import-id lp:smoser" in ssh_output
def test_snap(self, class_client: IntegrationInstance):
"""Integration test for the snap module.
@@ -185,21 +181,22 @@ class TestCombined:
"""
client = class_client
timezone_output = client.execute(
- 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"')
+ 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"'
+ )
assert timezone_output.strip() == "HDT"
def test_no_problems(self, class_client: IntegrationInstance):
"""Test no errors, warnings, or tracebacks"""
client = class_client
- status_file = client.read_from_file('/run/cloud-init/status.json')
- status_json = json.loads(status_file)['v1']
- for stage in ('init', 'init-local', 'modules-config', 'modules-final'):
- assert status_json[stage]['errors'] == []
- result_file = client.read_from_file('/run/cloud-init/result.json')
- result_json = json.loads(result_file)['v1']
- assert result_json['errors'] == []
-
- log = client.read_from_file('/var/log/cloud-init.log')
+ status_file = client.read_from_file("/run/cloud-init/status.json")
+ status_json = json.loads(status_file)["v1"]
+ for stage in ("init", "init-local", "modules-config", "modules-final"):
+ assert status_json[stage]["errors"] == []
+ result_file = client.read_from_file("/run/cloud-init/result.json")
+ result_json = json.loads(result_file)["v1"]
+ assert result_json["errors"] == []
+
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
def test_correct_datasource_detected(
@@ -208,93 +205,122 @@ class TestCombined:
"""Test datasource is detected at the proper boot stage."""
client = class_client
status_file = client.read_from_file("/run/cloud-init/status.json")
-
- platform_datasources = {
- "azure": "DataSourceAzure [seed=/dev/sr0]",
- "ec2": "DataSourceEc2Local",
- "gce": "DataSourceGCELocal",
- "oci": "DataSourceOracle",
- "openstack": "DataSourceOpenStackLocal [net,ver=2]",
- "lxd_container": (
- "DataSourceNoCloud "
- "[seed=/var/lib/cloud/seed/nocloud-net][dsmode=net]"
- ),
- "lxd_vm": "DataSourceNoCloud [seed=/dev/sr0][dsmode=net]",
- }
-
- assert (
- platform_datasources[client.settings.PLATFORM]
- == json.loads(status_file)["v1"]["datasource"]
+ parsed_datasource = json.loads(status_file)["v1"]["datasource"]
+
+ if client.settings.PLATFORM in ["lxd_container", "lxd_vm"]:
+ assert parsed_datasource.startswith("DataSourceNoCloud")
+ else:
+ platform_datasources = {
+ "azure": "DataSourceAzure [seed=/dev/sr0]",
+ "ec2": "DataSourceEc2Local",
+ "gce": "DataSourceGCELocal",
+ "oci": "DataSourceOracle",
+ "openstack": "DataSourceOpenStackLocal [net,ver=2]",
+ }
+ assert (
+ platform_datasources[client.settings.PLATFORM]
+ == parsed_datasource
+ )
+
+ def test_cloud_id_file_symlink(self, class_client: IntegrationInstance):
+ cloud_id = class_client.execute("cloud-id").stdout
+ expected_link_output = (
+ "'/run/cloud-init/cloud-id' -> "
+ f"'/run/cloud-init/cloud-id-{cloud_id}'"
+ )
+ assert expected_link_output == str(
+ class_client.execute("stat -c %N /run/cloud-init/cloud-id")
)
def _check_common_metadata(self, data):
- assert data['base64_encoded_keys'] == []
- assert data['merged_cfg'] == 'redacted for non-root user'
+ assert data["base64_encoded_keys"] == []
+ assert data["merged_cfg"] == "redacted for non-root user"
image_spec = ImageSpecification.from_os_image()
- assert data['sys_info']['dist'][0] == image_spec.os
+ assert data["sys_info"]["dist"][0] == image_spec.os
- v1_data = data['v1']
- assert re.match(r'\d\.\d+\.\d+-\d+', v1_data['kernel_release'])
- assert v1_data['variant'] == image_spec.os
- assert v1_data['distro'] == image_spec.os
- assert v1_data['distro_release'] == image_spec.release
- assert v1_data['machine'] == 'x86_64'
- assert re.match(r'3.\d\.\d', v1_data['python_version'])
+ v1_data = data["v1"]
+ assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"])
+ assert v1_data["variant"] == image_spec.os
+ assert v1_data["distro"] == image_spec.os
+ assert v1_data["distro_release"] == image_spec.release
+ assert v1_data["machine"] == "x86_64"
+ assert re.match(r"3.\d\.\d", v1_data["python_version"])
@pytest.mark.lxd_container
def test_instance_json_lxd(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert v1_data['subplatform'] == (
- 'seed-dir (/var/lib/cloud/seed/nocloud-net)')
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert v1_data["cloud_id"] == "lxd"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-lxd"
+ )
+ assert (
+ v1_data["subplatform"]
+ == "seed-dir (/var/lib/cloud/seed/nocloud-net)"
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.lxd_vm
def test_instance_json_lxd_vm(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert any([
- '/var/lib/cloud/seed/nocloud-net' in v1_data['subplatform'],
- '/dev/sr0' in v1_data['subplatform']
- ])
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert v1_data["cloud_id"] == "lxd"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-lxd"
+ )
+ assert any(
+ [
+ "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"],
+ "/dev/sr0" in v1_data["subplatform"],
+ ]
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.ec2
def test_instance_json_ec2(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'aws'
- assert v1_data['platform'] == 'ec2'
- assert v1_data['subplatform'].startswith('metadata')
- assert v1_data[
- 'availability_zone'] == client.instance.availability_zone
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'].startswith('ip-')
- assert v1_data['region'] == client.cloud.cloud_instance.region
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "aws"
+ assert v1_data["platform"] == "ec2"
+ # Different regions will show up as ec2-(gov|china)
+ assert v1_data["cloud_id"].startswith("ec2")
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-ec2"
+ )
+ assert v1_data["subplatform"].startswith("metadata")
+ assert (
+ v1_data["availability_zone"] == client.instance.availability_zone
+ )
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"].startswith("ip-")
+ assert v1_data["region"] == client.cloud.cloud_instance.region
@pytest.mark.gce
def test_instance_json_gce(self, class_client: IntegrationInstance):
@@ -307,6 +333,9 @@ class TestCombined:
v1_data = data["v1"]
assert v1_data["cloud_name"] == "gce"
assert v1_data["platform"] == "gce"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-gce"
+ )
assert v1_data["subplatform"].startswith("metadata")
assert v1_data["availability_zone"] == client.instance.zone
assert v1_data["instance_id"] == client.instance.instance_id