summaryrefslogtreecommitdiff
path: root/tests/unittests/test_data.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_data.py')
-rw-r--r--tests/unittests/test_data.py576
1 files changed, 0 insertions, 576 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
deleted file mode 100644
index 13db8a4c..00000000
--- a/tests/unittests/test_data.py
+++ /dev/null
@@ -1,576 +0,0 @@
-"""Tests for handling of userdata within cloud init."""
-
-import gzip
-import logging
-import os
-import shutil
-import tempfile
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-from six import BytesIO, StringIO
-
-from email import encoders
-from email.mime.application import MIMEApplication
-from email.mime.base import MIMEBase
-from email.mime.multipart import MIMEMultipart
-
-from cloudinit import handlers
-from cloudinit import helpers as c_helpers
-from cloudinit import log
-from cloudinit.settings import (PER_INSTANCE)
-from cloudinit import sources
-from cloudinit import stages
-from cloudinit import user_data as ud
-from cloudinit import util
-
-from . import helpers
-
-
-INSTANCE_ID = "i-testing"
-
-
-class FakeDataSource(sources.DataSource):
-
- def __init__(self, userdata=None, vendordata=None):
- sources.DataSource.__init__(self, {}, None, None)
- self.metadata = {'instance-id': INSTANCE_ID}
- self.userdata_raw = userdata
- self.vendordata_raw = vendordata
-
-
-def count_messages(root):
- am = 0
- for m in root.walk():
- if ud.is_skippable(m):
- continue
- am += 1
- return am
-
-
-def gzip_text(text):
- contents = BytesIO()
- f = gzip.GzipFile(fileobj=contents, mode='wb')
- f.write(util.encode_text(text))
- f.flush()
- f.close()
- return contents.getvalue()
-
-
-# FIXME: these tests shouldn't be checking log output??
-# Weirddddd...
-class TestConsumeUserData(helpers.FilesystemMockingTestCase):
-
- def setUp(self):
- super(TestConsumeUserData, self).setUp()
- self._log = None
- self._log_file = None
- self._log_handler = None
-
- def tearDown(self):
- if self._log_handler and self._log:
- self._log.removeHandler(self._log_handler)
- helpers.FilesystemMockingTestCase.tearDown(self)
-
- def _patchIn(self, root):
- self.patchOS(root)
- self.patchUtils(root)
-
- def capture_log(self, lvl=logging.DEBUG):
- log_file = StringIO()
- self._log_handler = logging.StreamHandler(log_file)
- self._log_handler.setLevel(lvl)
- self._log = log.getLogger()
- self._log.addHandler(self._log_handler)
- return log_file
-
- def test_simple_jsonp(self):
- blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "qux" },
- { "op": "add", "path": "/bar", "value": "qux2" }
-]
-'''
-
- ci = stages.Init()
- ci.datasource = FakeDataSource(blob)
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_data()
- cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- cc = util.load_yaml(cc_contents)
- self.assertEqual(2, len(cc))
- self.assertEqual('qux', cc['baz'])
- self.assertEqual('qux2', cc['bar'])
-
- def test_simple_jsonp_vendor_and_user(self):
- # test that user-data wins over vendor
- user_blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "qux" },
- { "op": "add", "path": "/bar", "value": "qux2" }
-]
-'''
- vendor_blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "quxA" },
- { "op": "add", "path": "/bar", "value": "quxB" },
- { "op": "add", "path": "/foo", "value": "quxC" }
-]
-'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
- initer = stages.Init()
- initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
- initer.read_cfg()
- initer.initialize()
- initer.fetch()
- initer.instancify()
- initer.update()
- initer.cloudify().run('consume_data',
- initer.consume_data,
- args=[PER_INSTANCE],
- freq=PER_INSTANCE)
- mods = stages.Modules(initer)
- (_which_ran, _failures) = mods.run_section('cloud_init_modules')
- cfg = mods.cfg
- self.assertIn('vendor_data', cfg)
- self.assertEqual('qux', cfg['baz'])
- self.assertEqual('qux2', cfg['bar'])
- self.assertEqual('quxC', cfg['foo'])
-
- def test_simple_jsonp_no_vendor_consumed(self):
- # make sure that vendor data is not consumed
- user_blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "qux" },
- { "op": "add", "path": "/bar", "value": "qux2" },
- { "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}}
-]
-'''
- vendor_blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "quxA" },
- { "op": "add", "path": "/bar", "value": "quxB" },
- { "op": "add", "path": "/foo", "value": "quxC" }
-]
-'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
- initer = stages.Init()
- initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
- initer.read_cfg()
- initer.initialize()
- initer.fetch()
- initer.instancify()
- initer.update()
- initer.cloudify().run('consume_data',
- initer.consume_data,
- args=[PER_INSTANCE],
- freq=PER_INSTANCE)
- mods = stages.Modules(initer)
- (_which_ran, _failures) = mods.run_section('cloud_init_modules')
- cfg = mods.cfg
- self.assertEqual('qux', cfg['baz'])
- self.assertEqual('qux2', cfg['bar'])
- self.assertNotIn('foo', cfg)
-
- def test_mixed_cloud_config(self):
- blob_cc = '''
-#cloud-config
-a: b
-c: d
-'''
- message_cc = MIMEBase("text", "cloud-config")
- message_cc.set_payload(blob_cc)
-
- blob_jp = '''
-#cloud-config-jsonp
-[
- { "op": "replace", "path": "/a", "value": "c" },
- { "op": "remove", "path": "/c" }
-]
-'''
-
- message_jp = MIMEBase('text', "cloud-config-jsonp")
- message_jp.set_payload(blob_jp)
-
- message = MIMEMultipart()
- message.attach(message_cc)
- message.attach(message_jp)
-
- ci = stages.Init()
- ci.datasource = FakeDataSource(str(message))
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_data()
- cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- cc = util.load_yaml(cc_contents)
- self.assertEqual(1, len(cc))
- self.assertEqual('c', cc['a'])
-
- def test_vendor_user_yaml_cloud_config(self):
- vendor_blob = '''
-#cloud-config
-a: b
-name: vendor
-run:
- - x
- - y
-'''
-
- user_blob = '''
-#cloud-config
-a: c
-vendor_data:
- enabled: True
- prefix: /bin/true
-name: user
-run:
- - z
-'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
- initer = stages.Init()
- initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
- initer.read_cfg()
- initer.initialize()
- initer.fetch()
- initer.instancify()
- initer.update()
- initer.cloudify().run('consume_data',
- initer.consume_data,
- args=[PER_INSTANCE],
- freq=PER_INSTANCE)
- mods = stages.Modules(initer)
- (_which_ran, _failures) = mods.run_section('cloud_init_modules')
- cfg = mods.cfg
- self.assertIn('vendor_data', cfg)
- self.assertEqual('c', cfg['a'])
- self.assertEqual('user', cfg['name'])
- self.assertNotIn('x', cfg['run'])
- self.assertNotIn('y', cfg['run'])
- self.assertIn('z', cfg['run'])
-
- def test_vendordata_script(self):
- vendor_blob = '''
-#!/bin/bash
-echo "test"
-'''
-
- user_blob = '''
-#cloud-config
-vendor_data:
- enabled: True
- prefix: /bin/true
-'''
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self._patchIn(new_root)
- initer = stages.Init()
- initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
- initer.read_cfg()
- initer.initialize()
- initer.fetch()
- initer.instancify()
- initer.update()
- initer.cloudify().run('consume_data',
- initer.consume_data,
- args=[PER_INSTANCE],
- freq=PER_INSTANCE)
- mods = stages.Modules(initer)
- (_which_ran, _failures) = mods.run_section('cloud_init_modules')
- vendor_script = initer.paths.get_ipath_cur('vendor_scripts')
- vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)
- self.assertTrue(os.path.exists(vendor_script_fns))
-
- def test_merging_cloud_config(self):
- blob = '''
-#cloud-config
-a: b
-e: f
-run:
- - b
- - c
-'''
- message1 = MIMEBase("text", "cloud-config")
- message1.set_payload(blob)
-
- blob2 = '''
-#cloud-config
-a: e
-e: g
-run:
- - stuff
- - morestuff
-'''
- message2 = MIMEBase("text", "cloud-config")
- message2['X-Merge-Type'] = ('dict(recurse_array,'
- 'recurse_str)+list(append)+str(append)')
- message2.set_payload(blob2)
-
- blob3 = '''
-#cloud-config
-e:
- - 1
- - 2
- - 3
-p: 1
-'''
- message3 = MIMEBase("text", "cloud-config")
- message3.set_payload(blob3)
-
- messages = [message1, message2, message3]
-
- paths = c_helpers.Paths({}, ds=FakeDataSource(''))
- cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
-
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
- cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
- None)
- for i, m in enumerate(messages):
- headers = dict(m)
- fn = "part-%s" % (i + 1)
- payload = m.get_payload(decode=True)
- cloud_cfg.handle_part(None, headers['Content-Type'],
- fn, payload, None, headers)
- cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
- None)
- contents = util.load_file(paths.get_ipath('cloud_config'))
- contents = util.load_yaml(contents)
- self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
- self.assertEqual(contents['a'], 'be')
- self.assertEqual(contents['e'], [1, 2, 3])
- self.assertEqual(contents['p'], 1)
-
- def test_unhandled_type_warning(self):
- """Raw text without magic is ignored but shows warning."""
- ci = stages.Init()
- data = "arbitrary text\n"
- ci.datasource = FakeDataSource(data)
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled non-multipart (text/x-not-multipart) userdata:",
- log_file.getvalue())
-
- mockobj.assert_called_once_with(
- ci.paths.get_ipath("cloud_config"), "", 0o600)
-
- def test_mime_gzip_compressed(self):
- """Tests that individual message gzip encoding works."""
-
- def gzip_part(text):
- return MIMEApplication(gzip_text(text), 'gzip')
-
- base_content1 = '''
-#cloud-config
-a: 2
-'''
-
- base_content2 = '''
-#cloud-config
-b: 3
-c: 4
-'''
-
- message = MIMEMultipart('test')
- message.attach(gzip_part(base_content1))
- message.attach(gzip_part(base_content2))
- ci = stages.Init()
- ci.datasource = FakeDataSource(str(message))
- new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, new_root)
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_data()
- contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- contents = util.load_yaml(contents)
- self.assertTrue(isinstance(contents, dict))
- self.assertEqual(3, len(contents))
- self.assertEqual(2, contents['a'])
- self.assertEqual(3, contents['b'])
- self.assertEqual(4, contents['c'])
-
- def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored but shows warning."""
- ci = stages.Init()
- message = MIMEBase("text", "plain")
- message.set_payload("Just text")
- ci.datasource = FakeDataSource(message.as_string().encode())
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
- mockobj.assert_called_once_with(
- ci.paths.get_ipath("cloud_config"), "", 0o600)
-
- def test_shellscript(self):
- """Raw text starting #!/bin/sh is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- ci.datasource = FakeDataSource(script)
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
-
- mockobj.assert_has_calls([
- mock.call(outpath, script, 0o700),
- mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
-
- def test_mime_text_x_shellscript(self):
- """Mime message of type text/x-shellscript is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- message = MIMEBase("text", "x-shellscript")
- message.set_payload(script)
- ci.datasource = FakeDataSource(message.as_string())
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
-
- mockobj.assert_has_calls([
- mock.call(outpath, script, 0o700),
- mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
-
- def test_mime_text_plain_shell(self):
- """Mime type text/plain starting #!/bin/sh is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- message = MIMEBase("text", "plain")
- message.set_payload(script)
- ci.datasource = FakeDataSource(message.as_string())
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
-
- mockobj.assert_has_calls([
- mock.call(outpath, script, 0o700),
- mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
-
- def test_mime_application_octet_stream(self):
- """Mime type application/octet-stream is ignored but shows warning."""
- ci = stages.Init()
- message = MIMEBase("application", "octet-stream")
- message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
- encoders.encode_base64(message)
- ci.datasource = FakeDataSource(message.as_string().encode())
-
- with mock.patch('cloudinit.util.write_file') as mockobj:
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (application/octet-stream)",
- log_file.getvalue())
- mockobj.assert_called_once_with(
- ci.paths.get_ipath("cloud_config"), "", 0o600)
-
- def test_cloud_config_archive(self):
- non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
- data = [{'content': '#cloud-config\npassword: gocubs\n'},
- {'content': '#cloud-config\nlocale: chicago\n'},
- {'content': non_decodable}]
- message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
-
- ci = stages.Init()
- ci.datasource = FakeDataSource(message)
-
- fs = {}
-
- def fsstore(filename, content, mode=0o0644, omode="wb"):
- fs[filename] = content
-
- # consuming the user-data provided should write 'cloud_config' file
- # which will have our yaml in it.
- with mock.patch('cloudinit.util.write_file') as mockobj:
- mockobj.side_effect = fsstore
- ci.fetch()
- ci.consume_data()
-
- cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
- self.assertEqual(cfg.get('password'), 'gocubs')
- self.assertEqual(cfg.get('locale'), 'chicago')
-
-
-class TestUDProcess(helpers.ResourceUsingTestCase):
-
- def test_bytes_in_userdata(self):
- msg = b'#cloud-config\napt_update: True\n'
- ud_proc = ud.UserDataProcessor(self.getCloudPaths())
- message = ud_proc.process(msg)
- self.assertTrue(count_messages(message) == 1)
-
- def test_string_in_userdata(self):
- msg = '#cloud-config\napt_update: True\n'
-
- ud_proc = ud.UserDataProcessor(self.getCloudPaths())
- message = ud_proc.process(msg)
- self.assertTrue(count_messages(message) == 1)
-
- def test_compressed_in_userdata(self):
- msg = gzip_text('#cloud-config\napt_update: True\n')
-
- ud_proc = ud.UserDataProcessor(self.getCloudPaths())
- message = ud_proc.process(msg)
- self.assertTrue(count_messages(message) == 1)
-
-
-class TestConvertString(helpers.TestCase):
- def test_handles_binary_non_utf8_decodable(self):
- blob = b'\x32\x99'
- msg = ud.convert_string(blob)
- self.assertEqual(blob, msg.get_payload(decode=True))
-
- def test_handles_binary_utf8_decodable(self):
- blob = b'\x32\x32'
- msg = ud.convert_string(blob)
- self.assertEqual(blob, msg.get_payload(decode=True))
-
- def test_handle_headers(self):
- text = "hi mom"
- msg = ud.convert_string(text)
- self.assertEqual(text, msg.get_payload(decode=False))