summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler/test_handler_write_files.py
blob: 466e45f8d78cd9ab09389ab68d5030ea27259d7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from cloudinit.config.cc_write_files import write_files
from cloudinit import log as logging
from cloudinit import util

from ..helpers import FilesystemMockingTestCase

import base64
import gzip
import shutil
import six
import tempfile

LOG = logging.getLogger(__name__)

YAML_TEXT = """
write_files:
 - encoding: gzip
   content: !!binary |
     H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
   path: /usr/bin/hello
   permissions: '0755'
 - content: !!binary |
     Zm9vYmFyCg==
   path: /wark
   permissions: '0755'
 - content: |
    hi mom line 1
    hi mom line 2
   path: /tmp/message
"""

YAML_CONTENT_EXPECTED = {
    '/usr/bin/hello': "#!/bin/sh\necho hello world\n",
    '/wark': "foobar\n",
    '/tmp/message': "hi mom line 1\nhi mom line 2\n",
}


class TestWriteFiles(FilesystemMockingTestCase):
    def setUp(self):
        super(TestWriteFiles, self).setUp()
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)

    def test_simple(self):
        self.patchUtils(self.tmp)
        expected = "hello world\n"
        filename = "/tmp/my.file"
        write_files(
            "test_simple", [{"content": expected, "path": filename}], LOG)
        self.assertEqual(util.load_file(filename), expected)

    def test_yaml_binary(self):
        self.patchUtils(self.tmp)
        data = util.load_yaml(YAML_TEXT)
        write_files("testname", data['write_files'], LOG)
        for path, content in YAML_CONTENT_EXPECTED.items():
            self.assertEqual(util.load_file(path), content)

    def test_all_decodings(self):
        self.patchUtils(self.tmp)

        # build a 'files' array that has a dictionary of encodings
        # for 'gz', 'gzip', 'gz+base64' ...
        data = b"foobzr"
        utf8_valid = b"foobzr"
        utf8_invalid = b'ab\xaadef'
        files = []
        expected = []

        gz_aliases = ('gz', 'gzip')
        gz_b64_aliases = ('gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64')
        b64_aliases = ('base64', 'b64')

        datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid))
        for name, data in datum:
            gz = (_gzip_bytes(data), gz_aliases)
            gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases)
            b64 = (base64.b64encode(data), b64_aliases)
            for content, aliases in (gz, gz_b64, b64):
                for enc in aliases:
                    cur = {'content': content,
                           'path': '/tmp/file-%s-%s' % (name, enc),
                           'encoding': enc}
                    files.append(cur)
                    expected.append((cur['path'], data))

        write_files("test_decoding", files, LOG)

        for path, content in expected:
            self.assertEqual(util.load_file(path, decode=False), content)

        # make sure we actually wrote *some* files.
        flen_expected = (
            len(gz_aliases + gz_b64_aliases + b64_aliases) * len(datum))
        self.assertEqual(len(expected), flen_expected)


def _gzip_bytes(data):
    buf = six.BytesIO()
    fp = None
    try:
        fp = gzip.GzipFile(fileobj=buf, mode="wb")
        fp.write(data)
        fp.close()
        return buf.getvalue()
    finally:
        if fp:
            fp.close()


# vi: ts=4 expandtab