# summary | refs | log | tree | commit | diff
# path: root/tests/unittests/config/test_cc_runcmd.py
# blob: 9fdc575b3ba24d2d099c2e22d7d3f5a2ef6873d7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import stat
from unittest.mock import patch

import pytest

from cloudinit import helpers, subp, util
from cloudinit.config.cc_runcmd import handle
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import (
    FilesystemMockingTestCase,
    skipUnlessJsonSchema,
)
from tests.unittests.util import get_cloud

# Module-level logger named after this test module.
LOG = logging.getLogger(__name__)


class TestRuncmd(FilesystemMockingTestCase):
    """Exercise cc_runcmd's ``handle`` against a temporary root directory."""

    # Presumably tells the base class to capture logging into ``self.logs``,
    # which the skip test below reads -- confirm in FilesystemMockingTestCase.
    with_logs = True

    def setUp(self):
        """Create a temp root, patch utils onto it and build the Paths."""
        super().setUp()
        self.subp = subp.subp
        tmp_root = self.tmp_dir()
        self.new_root = tmp_root
        self.patchUtils(tmp_root)
        self.paths = helpers.Paths({"scripts": tmp_root})

    def test_handler_skip_if_no_runcmd(self):
        """A config lacking a 'runcmd' key makes the handler log a skip."""
        cloud = get_cloud(paths=self.paths)
        handle("notimportant", {}, cloud, None)
        expected = "Skipping module named notimportant, no 'runcmd' key"
        self.assertIn(expected, self.logs.getvalue())

    @patch("cloudinit.util.shellify")
    def test_runcmd_shellify_fails(self, m_shellify):
        """A failing shellify makes handle raise a TypeError."""
        m_shellify.side_effect = TypeError("patched shellify")
        runcmd_cfg = {"runcmd": ["echo 42"]}
        cloud = get_cloud(paths=self.paths)
        with self.assertRaises(TypeError) as ctx, self.allow_subp(["/bin/sh"]):
            handle("cc_runcmd", runcmd_cfg, cloud, None)
        self.assertIn("Failed to shellify", str(ctx.exception))

    def test_handler_invalid_command_set(self):
        """A runcmd value that cannot be shellified raises a TypeError."""
        cloud = get_cloud(paths=self.paths)
        with self.assertRaises(TypeError) as ctx:
            handle("cc_runcmd", {"runcmd": 1}, cloud, [])
        expected = (
            "Failed to shellify 1 into file"
            " /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd"
        )
        self.assertIn(expected, str(ctx.exception))

    def test_handler_write_valid_runcmd_schema_to_file(self):
        """A valid runcmd list is written out as a mode-0700 shell script."""
        cloud = get_cloud(paths=self.paths)
        handle("cc_runcmd", {"runcmd": [["ls", "/"]]}, cloud, [])
        script_path = os.path.join(
            self.new_root,
            "var/lib/cloud/instances/iid-datasource-none/scripts/runcmd",
        )
        content = util.load_file(script_path)
        self.assertEqual("#!/bin/sh\n'ls' '/'\n", content)
        # Only the permission bits matter here, not the file-type bits.
        mode = stat.S_IMODE(os.stat(script_path).st_mode)
        self.assertEqual(0o700, mode)


@skipUnlessJsonSchema()
class TestRunCmdSchema:
    """Schema validation cases for the ``runcmd`` cloud-config key."""

    @pytest.mark.parametrize(
        "config, error_msg",
        [
            # Repeating the same command is allowed, in list or string form.
            ({"runcmd": [["echo", "bye"], ["echo", "bye"]]}, None),
            ({"runcmd": ["echo bye", "echo bye"]}, None),
            # A scalar or an empty list must be rejected.
            ({"runcmd": 1}, "1 is not of type 'array'"),
            ({"runcmd": []}, r"runcmd: \[\] is too short"),
            # Mixed item types; the empty pattern accepts any error message.
            (
                {
                    "runcmd": [
                        "ls /",
                        20,
                        ["wget", "http://stuff/blah"],
                        {"a": "n"},
                    ]
                },
                "",
            ),
        ],
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, error_msg):
        """Strictly validate *config* against the cloud-config schema.

        An ``error_msg`` of None means validation must succeed; any other
        value is the pattern the raised SchemaValidationError must match.
        """
        if error_msg is not None:
            with pytest.raises(SchemaValidationError, match=error_msg):
                validate_cloudconfig_schema(config, get_schema(), strict=True)
            return
        validate_cloudconfig_schema(config, get_schema(), strict=True)


# vi: ts=4 expandtab