summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_snap.py
blob: 432f72cee5f9c88bef652c7512dbbd100858e39c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# This file is part of cloud-init. See LICENSE file for license information.

import logging
import os
import re
from io import StringIO

import pytest

from cloudinit import helpers, util
from cloudinit.config.cc_snap import add_assertions, handle, run_commands
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
from tests.unittests.util import get_cloud

M_PATH = "cloudinit.config.cc_snap."
ASSERTIONS_FILE = "/var/lib/cloud/instance/snapd.assertions"

SYSTEM_USER_ASSERTION = """\
type: system-user
authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
email: foo@bar.com
password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
series:
- 16
since: 2016-09-10T16:34:00+03:00
until: 2017-11-10T16:34:00+03:00
username: baz
sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj

AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
+to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q=="""

ACCOUNT_ASSERTION = """\
type: account-key
authority-id: canonical
revision: 2
public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
account-id: canonical
name: store
since: 2016-04-01T00:00:00.0Z
body-length: 717
sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH

AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
vUvV7RjVzv17ut0AEQEAAQ==

AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k"""


@pytest.fixture()
def fake_cloud(tmpdir):
    paths = helpers.Paths(
        {
            "cloud_dir": tmpdir.join("cloud"),
            "run_dir": tmpdir.join("cloud-init"),
            "templates_dir": tmpdir.join("templates"),
        }
    )
    cloud = get_cloud(paths=paths)
    yield cloud


class TestAddAssertions:
    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_on_empty_list(self, m_subp, caplog, tmpdir):
        """When provided with an empty list, add_assertions does nothing."""
        assert_file = tmpdir.join("snapd.assertions")
        add_assertions([], assert_file)
        assert not caplog.text
        assert 0 == m_subp.call_count

    def test_add_assertions_on_non_list_or_dict(self, tmpdir):
        """When provided an invalid type, add_assertions raises an error."""
        assert_file = tmpdir.join("snapd.assertions")
        with pytest.raises(
            TypeError,
            match="assertion parameter was not a list or dict: I'm Not Valid",
        ):
            add_assertions("I'm Not Valid", assert_file)

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_adds_assertions_as_list(
        self, m_subp, caplog, tmpdir
    ):
        """When provided with a list, add_assertions adds all assertions."""
        assert_file = tmpdir.join("snapd.assertions")
        assertions = [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]
        add_assertions(assertions, assert_file)
        assert "Importing user-provided snap assertions" in caplog.text
        assert "sertions" in caplog.text
        assert [
            mock.call(["snap", "ack", assert_file], capture=True)
        ] == m_subp.call_args_list
        compare_file = tmpdir.join("comparison")
        util.write_file(compare_file, "\n".join(assertions).encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_add_assertions_adds_assertions_as_dict(
        self, m_subp, caplog, tmpdir
    ):
        """When provided with a dict, add_assertions adds all assertions."""
        assert_file = tmpdir.join("snapd.assertions")
        assertions = {"00": SYSTEM_USER_ASSERTION, "01": ACCOUNT_ASSERTION}
        add_assertions(assertions, assert_file)
        assert "Importing user-provided snap assertions" in caplog.text
        assert (
            M_PATH[:-1],
            logging.DEBUG,
            "Snap acking: ['type: system-user', 'authority-id: "
            "LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp']",
        ) in caplog.record_tuples
        assert (
            M_PATH[:-1],
            logging.DEBUG,
            "Snap acking: ['type: account-key', 'authority-id: canonical']",
        ) in caplog.record_tuples
        assert [
            mock.call(["snap", "ack", assert_file], capture=True)
        ] == m_subp.call_args_list
        compare_file = tmpdir.join("comparison")
        combined = "\n".join(assertions.values())
        util.write_file(compare_file, combined.encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)


class TestRunCommands(CiTestCase):

    with_logs = True
    allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]

    def setUp(self):
        super(TestRunCommands, self).setUp()
        self.tmp = self.tmp_dir()

    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_run_commands_on_empty_list(self, m_subp):
        """When provided with an empty list, run_commands does nothing."""
        run_commands([])
        self.assertEqual("", self.logs.getvalue())
        m_subp.assert_not_called()

    def test_run_commands_on_non_list_or_dict(self):
        """When provided an invalid type, run_commands raises an error."""
        with self.assertRaises(TypeError) as context_manager:
            run_commands(commands="I'm Not Valid")
        self.assertEqual(
            "commands parameter was not a list or dict: I'm Not Valid",
            str(context_manager.exception),
        )

    def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
        """All exit codes are logged to stderr."""
        outfile = self.tmp_path("output.log", dir=self.tmp)

        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = "bogus command"
        cmd3 = 'echo "MOM" >> %s' % outfile
        commands = [cmd1, cmd2, cmd3]

        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
            with self.assertRaises(RuntimeError) as context_manager:
                run_commands(commands=commands)

        self.assertIsNotNone(
            re.search(
                r"bogus: (command )?not found", str(context_manager.exception)
            ),
            msg="Expected bogus command not found",
        )
        expected_stderr_log = "\n".join(
            [
                "Begin run command: {cmd}".format(cmd=cmd1),
                "End run command: exit(0)",
                "Begin run command: {cmd}".format(cmd=cmd2),
                "ERROR: End run command: exit(127)",
                "Begin run command: {cmd}".format(cmd=cmd3),
                "End run command: exit(0)\n",
            ]
        )
        self.assertEqual(expected_stderr_log, m_stderr.getvalue())

    def test_run_command_as_lists(self):
        """When commands are specified as a list, run them in order."""
        outfile = self.tmp_path("output.log", dir=self.tmp)

        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = 'echo "MOM" >> %s' % outfile
        commands = [cmd1, cmd2]
        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO):
            run_commands(commands=commands)

        self.assertIn(
            "DEBUG: Running user-provided snap commands", self.logs.getvalue()
        )
        self.assertEqual("HI\nMOM\n", util.load_file(outfile))
        self.assertIn(
            "WARNING: Non-snap commands in snap config:", self.logs.getvalue()
        )

    def test_run_command_dict_sorted_as_command_script(self):
        """When commands are a dict, sort them and run."""
        outfile = self.tmp_path("output.log", dir=self.tmp)
        cmd1 = 'echo "HI" >> %s' % outfile
        cmd2 = 'echo "MOM" >> %s' % outfile
        commands = {"02": cmd1, "01": cmd2}
        mock_path = "cloudinit.config.cc_snap.sys.stderr"
        with mock.patch(mock_path, new_callable=StringIO):
            run_commands(commands=commands)

        expected_messages = ["DEBUG: Running user-provided snap commands"]
        for message in expected_messages:
            self.assertIn(message, self.logs.getvalue())
        self.assertEqual("MOM\nHI\n", util.load_file(outfile))


@skipUnlessJsonSchema()
class TestSnapSchema:
    @pytest.mark.parametrize(
        "config, error_msg",
        [
            # Valid
            ({"snap": {"commands": ["valid"]}}, None),
            ({"snap": {"commands": {"01": "also valid"}}}, None),
            ({"snap": {"assertions": ["valid"]}}, None),
            ({"snap": {"assertions": {"01": "also valid"}}}, None),
            ({"commands": [["echo", "bye"], ["echo", "bye"]]}, None),
            ({"commands": ["echo bye", "echo bye"]}, None),
            (
                {"commands": {"00": ["echo", "bye"], "01": ["echo", "bye"]}},
                None,
            ),
            ({"commands": {"00": "echo bye", "01": "echo bye"}}, None),
            # Invalid
            ({"snap": "wrong type"}, "'wrong type' is not of type 'object'"),
            (
                {"snap": {"commands": ["ls"], "invalid-key": ""}},
                "Additional properties are not allowed",
            ),
            ({"snap": {}}, "{} does not have enough properties"),
            (
                {"snap": {"commands": "broken"}},
                "'broken' is not of type 'object', 'array'",
            ),
            ({"snap": {"commands": []}}, r"snap.commands: \[\] is too short"),
            (
                {"snap": {"commands": {}}},
                r"snap.commands: {} does not have enough properties",
            ),
            ({"snap": {"commands": [123]}}, ""),
            ({"snap": {"commands": {"01": 123}}}, ""),
            ({"snap": {"commands": [["snap", "install", 123]]}}, ""),
            ({"snap": {"commands": {"01": ["snap", "install", 123]}}}, ""),
            ({"snap": {"assertions": [123]}}, "123 is not of type 'string'"),
            (
                {"snap": {"assertions": {"01": 123}}},
                "123 is not of type 'string'",
            ),
            (
                {"snap": {"assertions": "broken"}},
                "'broken' is not of type 'object', 'array'",
            ),
            ({"snap": {"assertions": []}}, r"\[\] is too short"),
            (
                {"snap": {"assertions": {}}},
                r"\{} does not have enough properties",
            ),
        ],
    )
    @skipUnlessJsonSchema()
    def test_schema_validation(self, config, error_msg):
        if error_msg is None:
            validate_cloudconfig_schema(config, get_schema(), strict=True)
        else:
            with pytest.raises(SchemaValidationError, match=error_msg):
                validate_cloudconfig_schema(config, get_schema(), strict=True)


class TestHandle:
    @mock.patch("cloudinit.config.cc_snap.subp.subp")
    def test_handle_adds_assertions(self, m_subp, fake_cloud, tmpdir):
        """Any configured snap assertions are provided to add_assertions."""
        assert_file = os.path.join(
            fake_cloud.paths.get_ipath_cur(), "snapd.assertions"
        )
        compare_file = tmpdir.join("comparison")
        cfg = {
            "snap": {"assertions": [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]}
        }
        handle("snap", cfg=cfg, cloud=fake_cloud, log=mock.Mock(), args=None)
        content = "\n".join(cfg["snap"]["assertions"])
        util.write_file(compare_file, content.encode("utf-8"))
        assert util.load_file(compare_file) == util.load_file(assert_file)


# vi: ts=4 expandtab