summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules/test_combined.py
blob: 8481b454a57bef50e32d581e555a7a16bc43fa66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
# This file is part of cloud-init. See LICENSE file for license information.
"""A set of somewhat unrelated tests that can be combined into a single
instance launch. Generally tests should only be added here if a failure
of the test would be unlikely to affect the running of another test using
the same instance launch. Most independent module coherence tests can go
here.
"""
import glob
import importlib
import json
import re
import uuid
from pathlib import Path

import pytest

import cloudinit.config
from cloudinit.util import is_true
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.decorators import retry
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import (
    get_feature_flag_value,
    get_inactive_modules,
    lxd_has_nocloud,
    verify_clean_log,
    verify_ordered_items_in_text,
)

# Cloud-config handed to the test instance through
# ``@pytest.mark.user_data(USER_DATA)`` on the test classes in this module.
# Several of the literal values (final message text, locale, seed data and
# seed file path, rsyslog spool path, runcmd output path, snap name,
# ssh_import_id entry, timezone) are asserted on verbatim by the tests, so
# keep this document and those assertions in sync when changing either.
USER_DATA = """\
#cloud-config
users:
- default
- name: craig
  sudo: false  # make sure craig doesn't get elevated perms
apt:
  primary:
    - arches: [default]
      uri: http://us.archive.ubuntu.com/ubuntu/
byobu_by_default: enable
final_message: |
  This is my final message!
  $version
  $timestamp
  $datasource
  $uptime
locale: en_GB.UTF-8
locale_configfile: /etc/default/locale
ntp:
  servers: ['ntp.ubuntu.com']
package_update: true
random_seed:
  data: 'MYUb34023nD:LFDK10913jk;dfnk:Df'
  encoding: raw
  file: /root/seed
rsyslog:
  configs:
    - "*.* @@127.0.0.1"
    - filename: 0-basic-config.conf
      content: |
        module(load="imtcp")
        input(type="imtcp" port="514")
        $template RemoteLogs,"/var/spool/rsyslog/cloudinit.log"
        *.* ?RemoteLogs
        & ~
  remotes:
    me: "127.0.0.1"
runcmd:
  - echo 'hello world' > /var/tmp/runcmd_output

  - #
  - logger "My test log"
snap:
  commands:
    - snap install hello-world
ssh_import_id:
  - lp:smoser

timezone: US/Aleutian
"""


@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestCombined:
    """Module checks that all run against one instance booted with USER_DATA.

    Every test receives the same ``class_client`` instance, so tests added
    here should not change state that another test in the class relies on.
    """

    @pytest.mark.ubuntu  # Because netplan
    def test_netplan_permissions(self, class_client: IntegrationInstance):
        """
        Test that netplan config file is generated with proper permissions
        """
        file_perms = class_client.execute(
            "stat -c %a /etc/netplan/50-cloud-init.yaml"
        )
        assert file_perms.ok, "Unable to check perms on 50-cloud-init.yaml"
        # The expected mode depends on the NETPLAN_CONFIG_ROOT_READ_ONLY
        # feature flag reported for the instance under test.
        feature_netplan_root_only = is_true(
            get_feature_flag_value(
                class_client, "NETPLAN_CONFIG_ROOT_READ_ONLY"
            )
        )
        config_perms = "600" if feature_netplan_root_only else "644"
        assert config_perms == file_perms.stdout.strip()

    def test_final_message(self, class_client: IntegrationInstance):
        """Test that final_message module works as expected.

        Also tests LP 1511485: final_message is silent.
        """
        client = class_client
        log = client.read_from_file("/var/log/cloud-init.log")
        # One regex line per line of the ``final_message`` template in
        # USER_DATA: literal text, $version, $timestamp, $datasource, $uptime.
        expected = (
            "This is my final message!\n"
            r"\d+\.\d+.*\n"
            r"\w{3}, \d{2} \w{3} \d{4} \d{2}:\d{2}:\d{2} \+\d{4}\n"  # Datetime
            "DataSource.*\n"
            r"\d+\.\d+"
        )

        assert re.search(expected, log)

    def test_deprecated_message(self, class_client: IntegrationInstance):
        """Check that deprecated key produces a log warning"""
        client = class_client
        log = client.read_from_file("/var/log/cloud-init.log")
        assert "Deprecated cloud-config provided" in log
        assert "The value of 'false' in user craig's 'sudo' config is " in log
        assert 2 == log.count("DEPRECATE")

    def test_ntp_with_apt(self, class_client: IntegrationInstance):
        """LP #1628337.

        cloud-init tries to install NTP before even
        configuring the archives.
        """
        client = class_client
        log = client.read_from_file("/var/log/cloud-init.log")
        assert "W: Failed to fetch" not in log
        assert "W: Some index files failed to download" not in log
        assert "E: Unable to locate package ntp" not in log

    def test_byobu(self, class_client: IntegrationInstance):
        """Test byobu configured as enabled by default."""
        client = class_client
        assert client.execute('test -e "/etc/byobu/autolaunch"').ok

    def test_configured_locale(self, class_client: IntegrationInstance):
        """Test locale can be configured correctly."""
        client = class_client
        default_locale = client.read_from_file("/etc/default/locale")
        assert "LANG=en_GB.UTF-8" in default_locale

        locale_a = client.execute("locale -a")
        verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a)

        locale_gen = client.execute(
            "cat /etc/locale.gen | grep -v '^#' | uniq"
        )
        verify_ordered_items_in_text(
            ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen
        )

    def test_random_seed_data(self, class_client: IntegrationInstance):
        """Integration test for the random seed module.

        This test specifies a command to be executed by the ``seed_random``
        module, by providing a different data to be used as seed data. We will
        then check if that seed data was actually used.
        """
        client = class_client

        # Only read the first 31 characters, because the rest could be
        # binary data
        result = client.execute("head -c 31 < /root/seed")
        assert result.startswith("MYUb34023nD:LFDK10913jk;dfnk:Df")

    def test_rsyslog(self, class_client: IntegrationInstance):
        """Test rsyslog is configured correctly."""
        client = class_client
        assert "My test log" in client.read_from_file(
            "/var/spool/rsyslog/cloudinit.log"
        )

    def test_runcmd(self, class_client: IntegrationInstance):
        """Test runcmd works as expected"""
        client = class_client
        assert "hello world" == client.read_from_file("/var/tmp/runcmd_output")

    def test_snap(self, class_client: IntegrationInstance):
        """Integration test for the snap module.

        This test verify that the snap packages specified in the user-data
        were installed by the ``snap`` module during boot.
        """
        client = class_client
        snap_output = client.execute("snap list")
        assert "core " in snap_output
        assert "hello-world " in snap_output

    def test_timezone(self, class_client: IntegrationInstance):
        """Integration test for the timezone module.

        This test specifies a timezone to be used by the ``timezone`` module
        and then checks that if that timezone was respected during boot.
        """
        client = class_client
        timezone_output = client.execute(
            'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"'
        )
        assert timezone_output.strip() == "HDT"

    def test_no_problems(self, class_client: IntegrationInstance):
        """Test no errors, warnings, deprecations, tracebacks or
        inactive modules.
        """
        client = class_client
        status_file = client.read_from_file("/run/cloud-init/status.json")
        status_json = json.loads(status_file)["v1"]
        for stage in ("init", "init-local", "modules-config", "modules-final"):
            assert status_json[stage]["errors"] == []
        result_file = client.read_from_file("/run/cloud-init/result.json")
        result_json = json.loads(result_file)["v1"]
        assert result_json["errors"] == []

        log = client.read_from_file("/var/log/cloud-init.log")
        verify_clean_log(log, ignore_deprecations=False)
        requested_modules = {
            "apt_configure",
            "apt_pipelining",
            "byobu",
            "final_message",
            "locale",
            "ntp",
            "seed_random",
            "rsyslog",
            "runcmd",
            "snap",
            "ssh_import_id",
            "timezone",
        }
        inactive_modules = get_inactive_modules(log)
        # Compute the overlap once; it is both the condition and the message.
        unexpectedly_inactive = requested_modules.intersection(
            inactive_modules
        )
        assert (
            not unexpectedly_inactive
        ), f"Expected active modules: {unexpectedly_inactive}"

    def test_correct_datasource_detected(
        self, class_client: IntegrationInstance
    ):
        """Test datasource is detected at the proper boot stage."""
        client = class_client
        status_file = client.read_from_file("/run/cloud-init/status.json")
        parsed_datasource = json.loads(status_file)["v1"]["datasource"]

        if client.settings.PLATFORM in ["lxd_container", "lxd_vm"]:
            # On LXD only the datasource name prefix is compared.
            if lxd_has_nocloud(client):
                datasource = "DataSourceNoCloud"
            else:
                datasource = "DataSourceLXD"
            assert parsed_datasource.startswith(datasource)
        else:
            # Other platforms must match the full datasource string.
            platform_datasources = {
                "azure": "DataSourceAzure [seed=/dev/sr0]",
                "ec2": "DataSourceEc2Local",
                "gce": "DataSourceGCELocal",
                "oci": "DataSourceOracle",
                "openstack": "DataSourceOpenStackLocal [net,ver=2]",
            }
            assert (
                platform_datasources[client.settings.PLATFORM]
                == parsed_datasource
            )

    def test_cloud_id_file_symlink(self, class_client: IntegrationInstance):
        """Test /run/cloud-init/cloud-id links to the cloud-id-<id> file.

        ``<id>`` is whatever the ``cloud-id`` command prints on the instance.
        """
        cloud_id = class_client.execute("cloud-id").stdout
        expected_link_output = (
            "'/run/cloud-init/cloud-id' -> "
            f"'/run/cloud-init/cloud-id-{cloud_id}'"
        )
        assert expected_link_output == str(
            class_client.execute("stat -c %N /run/cloud-init/cloud-id")
        )

    def test_run_frequency(self, class_client: IntegrationInstance):
        """Test modules log the frequency declared in their ``meta``.

        Each ``cc*.py`` module found next to ``cloudinit.config`` is imported
        and, when the log shows it running, the logged frequency must equal
        ``meta["frequency"]``.
        """
        log = class_client.read_from_file("/var/log/cloud-init.log")
        config_dir = Path(cloudinit.config.__file__).parent
        module_paths = glob.glob(str(config_dir / "cc*.py"))
        module_names = [Path(x).stem for x in module_paths]
        found_count = 0
        for name in module_names:
            mod = importlib.import_module(f"cloudinit.config.{name}")
            frequency = mod.meta["frequency"]
            # cc_ gets replaced with config- in logs
            log_name = name.replace("cc_", "config-")
            # Some modules have been filtered out in /etc/cloud/cloud.cfg,
            # so only check the ones the log shows as actually running.
            if f"running {log_name}" in log:
                found_count += 1  # Ensure we're matching on the right text
                assert f"running {log_name} with frequency {frequency}" in log
        assert (
            found_count > 10
        ), "Not enough modules found in log. Did the log message change?"
        assert "with frequency None" not in log

    def _check_common_metadata(self, data):
        """Assert the instance-data fields shared by several platform tests.

        :param data: Parsed instance-data JSON, as passed by the
            ``test_instance_json_*`` tests that call this helper.
        """
        assert data["base64_encoded_keys"] == []
        assert data["merged_cfg"] == "redacted for non-root user"

        image_spec = ImageSpecification.from_os_image()
        assert data["sys_info"]["dist"][0] == image_spec.os

        v1_data = data["v1"]
        assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"])
        assert v1_data["variant"] == image_spec.os
        assert v1_data["distro"] == image_spec.os
        assert v1_data["distro_release"] == image_spec.release
        # NOTE: the architecture is hard-coded to x86_64 here.
        assert v1_data["machine"] == "x86_64"
        # Escape the first dot too, so only a literal "3." prefix matches.
        assert re.match(r"3\.\d+\.\d+", v1_data["python_version"])

    @pytest.mark.lxd_container
    def test_instance_json_lxd(self, class_client: IntegrationInstance):
        """Test instance-data.json contents on an LXD container."""
        client = class_client
        instance_json_file = client.read_from_file(
            "/run/cloud-init/instance-data.json"
        )

        data = json.loads(instance_json_file)
        self._check_common_metadata(data)
        v1_data = data["v1"]
        if not lxd_has_nocloud(client):
            cloud_name = "lxd"
            subplatform = "LXD socket API v. 1.0 (/dev/lxd/sock)"
            # instance-id should be a UUID
            try:
                uuid.UUID(v1_data["instance_id"])
            except ValueError as e:
                # Chain the cause so the parsing error stays in the report.
                raise AssertionError(
                    f"LXD instance-id is not a UUID: {v1_data['instance_id']}"
                ) from e
        else:
            cloud_name = "unknown"
            subplatform = "seed-dir (/var/lib/cloud/seed/nocloud-net)"
            # Pre-Jammy instance-id and instance.name are synonymous
            assert v1_data["instance_id"] == client.instance.name
        assert v1_data["cloud_name"] == cloud_name
        assert v1_data["subplatform"] == subplatform
        assert v1_data["platform"] == "lxd"
        assert v1_data["cloud_id"] == "lxd"
        assert f"{v1_data['cloud_id']}" == client.read_from_file(
            "/run/cloud-init/cloud-id-lxd"
        )
        assert v1_data["availability_zone"] is None
        assert v1_data["local_hostname"] == client.instance.name
        assert v1_data["region"] is None

    @pytest.mark.lxd_vm
    def test_instance_json_lxd_vm(self, class_client: IntegrationInstance):
        """Test instance-data.json contents on an LXD virtual machine."""
        client = class_client
        instance_json_file = client.read_from_file(
            "/run/cloud-init/instance-data.json"
        )

        data = json.loads(instance_json_file)
        self._check_common_metadata(data)
        v1_data = data["v1"]
        if not lxd_has_nocloud(client):
            cloud_name = "lxd"
            subplatform = "LXD socket API v. 1.0 (/dev/lxd/sock)"
            # instance-id should be a UUID
            try:
                uuid.UUID(v1_data["instance_id"])
            except ValueError as e:
                raise AssertionError(
                    f"LXD instance-id is not a UUID: {v1_data['instance_id']}"
                ) from e
            assert v1_data["subplatform"] == subplatform
            assert v1_data["platform"] == "lxd"
            assert v1_data["cloud_id"] == "lxd"
        else:
            cloud_name = "unknown"
            # Pre-Jammy instance-id and instance.name are synonymous
            assert v1_data["instance_id"] == client.instance.name
            # Either the seed directory or /dev/sr0 is accepted as the
            # reported subplatform.
            reported_subplatform = v1_data["subplatform"]
            assert (
                "/var/lib/cloud/seed/nocloud-net" in reported_subplatform
                or "/dev/sr0" in reported_subplatform
            )
            assert v1_data["platform"] in ["lxd", "nocloud"]
            assert v1_data["cloud_id"] in ["lxd", "nocloud"]
        assert v1_data["cloud_name"] == cloud_name
        assert f"{v1_data['cloud_id']}" == client.read_from_file(
            "/run/cloud-init/cloud-id"
        )

        assert v1_data["availability_zone"] is None
        assert v1_data["local_hostname"] == client.instance.name
        assert v1_data["region"] is None

    @pytest.mark.ec2
    def test_instance_json_ec2(self, class_client: IntegrationInstance):
        """Test instance-data.json contents on EC2."""
        client = class_client
        instance_json_file = client.read_from_file(
            "/run/cloud-init/instance-data.json"
        )
        data = json.loads(instance_json_file)
        v1_data = data["v1"]
        assert v1_data["cloud_name"] == "aws"
        assert v1_data["platform"] == "ec2"
        # Different regions will show up as aws-(gov|china)
        assert v1_data["cloud_id"].startswith("aws")
        assert f"{v1_data['cloud_id']}" == client.read_from_file(
            "/run/cloud-init/cloud-id-aws"
        )
        assert v1_data["subplatform"].startswith("metadata")
        assert (
            v1_data["availability_zone"] == client.instance.availability_zone
        )
        assert v1_data["instance_id"] == client.instance.name
        assert v1_data["local_hostname"].startswith("ip-")
        assert v1_data["region"] == client.cloud.cloud_instance.region

    @pytest.mark.gce
    def test_instance_json_gce(self, class_client: IntegrationInstance):
        """Test instance-data.json contents on GCE."""
        client = class_client
        instance_json_file = client.read_from_file(
            "/run/cloud-init/instance-data.json"
        )
        data = json.loads(instance_json_file)
        self._check_common_metadata(data)
        v1_data = data["v1"]
        assert v1_data["cloud_name"] == "gce"
        assert v1_data["platform"] == "gce"
        assert f"{v1_data['cloud_id']}" == client.read_from_file(
            "/run/cloud-init/cloud-id-gce"
        )
        assert v1_data["subplatform"].startswith("metadata")
        assert v1_data["availability_zone"] == client.instance.zone
        assert v1_data["instance_id"] == client.instance.instance_id
        assert v1_data["local_hostname"] == client.instance.name

    @pytest.mark.lxd_container
    @pytest.mark.azure
    @pytest.mark.gce
    @pytest.mark.ec2
    def test_instance_cloud_id_across_reboot(
        self, class_client: IntegrationInstance
    ):
        """Test the cloud-id files exist both before and after a restart."""
        client = class_client
        platform = client.settings.PLATFORM
        # Platforms whose cloud-id file suffix differs from the platform name.
        cloud_id_alias = {"ec2": "aws", "lxd_container": "lxd"}
        cloud_file = f"cloud-id-{cloud_id_alias.get(platform, platform)}"
        assert client.execute(f"test -f /run/cloud-init/{cloud_file}").ok
        assert client.execute("test -f /run/cloud-init/cloud-id").ok
        client.restart()
        assert client.execute(f"test -f /run/cloud-init/{cloud_file}").ok
        assert client.execute("test -f /run/cloud-init/cloud-id").ok


@pytest.mark.user_data(USER_DATA)
class TestCombinedNoCI:
    @retry(tries=30, delay=1)
    def test_ssh_import_id(self, class_client: IntegrationInstance):
        """Check that the ``ssh_import_id`` module imported the requested key.

        The instance's authorized_keys file is read and must contain the
        marker for the ``lp:smoser`` import requested in the user data.

        TODO:
        * The path is hard-coded to /home/ubuntu, so this test needs
        adjusting before it can run on other OSes.
        """
        authorized_keys = class_client.read_from_file(
            "/home/ubuntu/.ssh/authorized_keys"
        )
        assert "# ssh-import-id lp:smoser" in authorized_keys