summaryrefslogtreecommitdiff
path: root/cloudinit/tests/test_subp.py
blob: 911c1f3da4007acd40d15e2f71547430dbcc27ce (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# This file is part of cloud-init. See LICENSE file for license information.

"""Tests for cloudinit.subp utility functions"""

import json
import os
import sys
import stat

from unittest import mock

from cloudinit import subp, util
from cloudinit.tests.helpers import CiTestCase


# Location of bash as reported by subp.which, looked up once at import time.
# NOTE(review): presumably None when bash is not installed -- confirm against
# subp.which before relying on it.
BASH = subp.which('bash')
# A command name chosen so that no such program is expected to exist; the
# tests below use it to provoke ProcessExecutionError.
BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'


class TestPrependBaseCommands(CiTestCase):
    """Tests for subp.prepend_base_command."""

    # The tests read captured log output from self.logs.
    # NOTE(review): presumably this flag is what makes CiTestCase populate
    # self.logs -- confirm against cloudinit.tests.helpers.
    with_logs = True

    def test_prepend_base_command_errors_on_neither_string_nor_list(self):
        """TypeError lists every command that is not a string or a list."""
        commands = [
            'ls',
            1,
            {'not': 'gonna work'},
            ['basecmd', 'list'],
        ]
        expected_message = (
            "Invalid basecmd config. These commands are not a string or"
            " list:\n1\n{'not': 'gonna work'}")
        with self.assertRaises(TypeError) as raised:
            subp.prepend_base_command(
                base_command='basecmd', commands=commands)
        self.assertEqual(expected_message, str(raised.exception))

    def test_prepend_base_command_warns_on_non_base_string_commands(self):
        """String commands that do not start with basecmd are logged."""
        commands = [
            'ls',
            'basecmd list',
            'touch /blah',
            'basecmd install x',
        ]
        expected_log = (
            'WARNING: Non-basecmd commands in basecmd config:\n'
            'ls\ntouch /blah\n')
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        self.assertEqual(expected_log, self.logs.getvalue())
        # String commands are returned as they were given.
        self.assertEqual(commands, result)

    def test_prepend_base_command_prepends_on_non_base_list_commands(self):
        """List commands not led by 'basecmd' get 'basecmd' prepended."""
        commands = [
            ['ls'],
            ['basecmd', 'list'],
            ['basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        expected = [
            ['basecmd', 'ls'],
            ['basecmd', 'list'],
            ['basecmd', 'basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        # Nothing is expected to be logged for list commands.
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(expected, result)

    def test_prepend_base_command_removes_first_item_when_none(self):
        """A leading None in a list command is dropped, nothing prepended."""
        commands = [
            [None, 'ls'],
            ['basecmd', 'list'],
            [None, 'touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        expected = [
            ['ls'],
            ['basecmd', 'list'],
            ['touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(expected, result)


class TestSubp(CiTestCase):
    """Tests for subp.subp and subp.target_path."""

    # Commands the tests in this class actually run.
    # NOTE(review): presumably consumed by CiTestCase to permit real subp
    # calls for these programs -- confirm against cloudinit.tests.helpers.
    allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
                    BOGUS_COMMAND, sys.executable]

    # Copies stdin to stderr.
    stdin2err = [BASH, '-c', 'cat >&2']
    # Copies stdin to stdout.
    stdin2out = ['cat']
    utf8_invalid = b'ab\xaadef'
    utf8_valid = b'start \xc3\xa9 end'
    utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
    # Prints one "NAME=value" line per variable name passed as an argument.
    printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']

    def printf_cmd(self, *args):
        """Return a bash command list that calls printf with the given args."""
        # bash's printf supports \xaa.  So does /usr/bin/printf
        # but by using bash, we remove dependency on another program.
        return [BASH, '-c', 'printf "$@"', 'printf'] + list(args)

    def test_subp_handles_bytestrings(self):
        """subp can run a bytestring command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True)
        self.assertEqual('', out)
        self.assertEqual('', _err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_strings(self):
        """subp can run a string command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, _err) = subp.subp(cmd, shell=True)
        self.assertEqual('', out)
        self.assertEqual('', _err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_utf8(self):
        """Valid utf-8 bytes written to stdout come back decoded."""
        # The given bytes contain utf-8 accented characters as seen in e.g.
        # the "deja dup" package in Ubuntu.
        cmd = self.printf_cmd(self.utf8_valid_2)
        (out, _err) = subp.subp(cmd, capture=True)
        self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))

    def test_subp_respects_decode_false(self):
        """With decode=False both captured streams are returned as bytes."""
        (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
                               data=self.utf8_valid)
        self.assertIsInstance(out, bytes)
        self.assertIsInstance(err, bytes)
        self.assertEqual(out, self.utf8_valid)

    def test_subp_decode_ignore(self):
        """decode='ignore' drops bytes that are not valid utf-8."""
        # this executes a string that writes invalid utf-8 to stdout
        (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
                                capture=True, decode='ignore')
        self.assertEqual(out, 'abcdef')

    def test_subp_decode_strict_valid_utf8(self):
        """decode='strict' returns decoded text when output is valid utf-8."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                decode='strict', data=self.utf8_valid)
        self.assertEqual(out, self.utf8_valid.decode('utf-8'))

    def test_subp_decode_invalid_utf8_replaces(self):
        """Without a decode argument, invalid utf-8 bytes are replaced."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                data=self.utf8_invalid)
        expected = self.utf8_invalid.decode('utf-8', 'replace')
        self.assertEqual(out, expected)

    def test_subp_decode_strict_raises(self):
        """decode='strict' raises UnicodeDecodeError on invalid utf-8."""
        # The command is deliberately passed by keyword, as this test has
        # always done, so the 'args=' calling form stays covered.
        with self.assertRaises(UnicodeDecodeError):
            subp.subp(args=self.stdin2out, capture=True,
                      decode='strict', data=self.utf8_invalid)

    def test_subp_capture_stderr(self):
        """Data written to stderr is captured separately from stdout."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               decode=False, data=data,
                               update_env={'LC_ALL': 'C'})
        self.assertEqual(err, data)
        self.assertEqual(out, b'')

    def test_subp_reads_env(self):
        """The child process sees values present in os.environ."""
        with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
            out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
        self.assertEqual('FOO=BAR', out.splitlines()[0])

    def test_subp_env_and_update_env(self):
        """update_env entries are applied on top of an explicit env."""
        out, _err = subp.subp(
            self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
            env={'FOO': 'BAR'},
            update_env={'HOME': '/myhome', 'K2': 'V2'})
        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())

    def test_subp_update_env(self):
        """update_env entries are applied on top of os.environ."""
        extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
        with mock.patch.dict("os.environ", values=extra):
            out, _err = subp.subp(
                self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
                update_env={'HOME': '/myhome', 'K2': 'V2'})

        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())

    def test_subp_warn_missing_shebang(self):
        """Warn on no #! in script"""
        noshebang = self.tmp_path('noshebang')
        util.write_file(noshebang, 'true\n')

        # Make the script executable so running it fails on the missing
        # interpreter line rather than on permissions.
        os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
        with self.allow_subp([noshebang]):
            self.assertRaisesRegex(subp.ProcessExecutionError,
                                   r'Missing #! in script\?',
                                   subp.subp, (noshebang,))

    def test_subp_combined_stderr_stdout(self):
        """Providing combine_capture as True redirects stderr to stdout."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               combine_capture=True, decode=False, data=data)
        self.assertEqual(b'', err)
        self.assertEqual(data, out)

    def test_returns_none_if_no_capture(self):
        """With capture=False both returned streams are None."""
        (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
        self.assertIsNone(err)
        self.assertIsNone(out)

    def test_exception_has_out_err_are_bytes_if_decode_false(self):
        """Raised exc should have stderr, stdout as bytes if no decode."""
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=False)
        self.assertIsInstance(cm.exception.stdout, bytes)
        self.assertIsInstance(cm.exception.stderr, bytes)

    def test_exception_has_out_err_are_bytes_if_decode_true(self):
        """Raised exc should have stderr, stdout as str if decode is True."""
        # The method name says "bytes" but the check is for str; the name is
        # kept so existing test selections keep working.
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=True)
        self.assertIsInstance(cm.exception.stdout, str)
        self.assertIsInstance(cm.exception.stderr, str)

    def test_bunch_of_slashes_in_path(self):
        """target_path collapses repeated leading slashes in the path."""
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "//my/path/"))
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "///my/path/"))

    def test_c_lang_can_take_utf8_args(self):
        """Independent of system LC_CTYPE, args can contain utf-8 strings.

        When python starts up, its default encoding gets set based on
        the value of LC_CTYPE.  If no system locale is set, the default
        encoding for both python2 and python3 in some paths will end up
        being ascii.

        Attempts to use setlocale or patching (or changing) os.environ
        in the current environment seem to not be effective.

        This test starts up a python with LC_CTYPE set to C so that
        the default encoding will be set to ascii.  In such an environment
        Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
        """
        # Program run by the child python: read a JSON command list from
        # stdin and run it through subp without capturing, so the command's
        # output flows to the child's own stdout.
        python_prog = '\n'.join([
            'import json, sys',
            'from cloudinit.subp import subp',
            'data = sys.stdin.read()',
            'cmd = json.loads(data)',
            'subp(cmd, capture=False)',
            ''])
        cmd = [BASH, '-c', 'echo -n "$@"', '--',
               self.utf8_valid.decode("utf-8")]
        python_subp = [sys.executable, '-c', python_prog]

        out, _err = subp.subp(
            python_subp, update_env={'LC_CTYPE': 'C'},
            data=json.dumps(cmd).encode("utf-8"),
            decode=False)
        self.assertEqual(self.utf8_valid, out)

    def test_bogus_command_logs_status_messages(self):
        """status_cb gets status messages logs on bogus commands provided."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BOGUS_COMMAND], status_cb=status_cb)

        expected = [
            'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
            'ERROR: End run command: invalid command provided\n']
        self.assertEqual(expected, logs)

    def test_command_logs_exit_codes_to_status_cb(self):
        """status_cb gets status messages containing command exit code."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
        subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)

        expected = [
            'Begin run command: %s -c exit 2\n' % BASH,
            'ERROR: End run command: exit(2)\n',
            'Begin run command: %s -c exit 0\n' % BASH,
            'End run command: exit(0)\n']
        self.assertEqual(expected, logs)


# vi: ts=4 expandtab