summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/__init__.py0
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py109
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list.py180
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source.py516
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py271
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py192
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py81
-rw-r--r--tests/unittests/test_handler/test_handler_disk_setup.py30
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py220
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py67
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py134
-rw-r--r--tests/unittests/test_handler/test_handler_mcollective.py148
-rw-r--r--tests/unittests/test_handler/test_handler_mounts.py133
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py127
-rw-r--r--tests/unittests/test_handler/test_handler_rsyslog.py174
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py227
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py72
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py306
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py76
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py112
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py68
21 files changed, 0 insertions, 3243 deletions
diff --git a/tests/unittests/test_handler/__init__.py b/tests/unittests/test_handler/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/tests/unittests/test_handler/__init__.py
+++ /dev/null
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
deleted file mode 100644
index d1dca2c4..00000000
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ /dev/null
@@ -1,109 +0,0 @@
-from cloudinit.config import cc_apt_configure
-from cloudinit import util
-
-from ..helpers import TestCase
-
-import os
-import re
-import shutil
-import tempfile
-
-
-def load_tfile_or_url(*args, **kwargs):
- return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents))
-
-
-class TestAptProxyConfig(TestCase):
- def setUp(self):
- super(TestAptProxyConfig, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.pfile = os.path.join(self.tmp, "proxy.cfg")
- self.cfile = os.path.join(self.tmp, "config.cfg")
-
- def _search_apt_config(self, contents, ptype, value):
- return re.search(
- r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, flags=re.IGNORECASE)
-
- def test_apt_proxy_written(self):
- cfg = {'apt_proxy': 'myproxy'}
- cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
-
- self.assertTrue(os.path.isfile(self.pfile))
- self.assertFalse(os.path.isfile(self.cfile))
-
- contents = load_tfile_or_url(self.pfile)
- self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
-
- def test_apt_http_proxy_written(self):
- cfg = {'apt_http_proxy': 'myproxy'}
- cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
-
- self.assertTrue(os.path.isfile(self.pfile))
- self.assertFalse(os.path.isfile(self.cfile))
-
- contents = load_tfile_or_url(self.pfile)
- self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
-
- def test_apt_all_proxy_written(self):
- cfg = {'apt_http_proxy': 'myproxy_http_proxy',
- 'apt_https_proxy': 'myproxy_https_proxy',
- 'apt_ftp_proxy': 'myproxy_ftp_proxy'}
-
- values = {'http': cfg['apt_http_proxy'],
- 'https': cfg['apt_https_proxy'],
- 'ftp': cfg['apt_ftp_proxy'],
- }
-
- cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
-
- self.assertTrue(os.path.isfile(self.pfile))
- self.assertFalse(os.path.isfile(self.cfile))
-
- contents = load_tfile_or_url(self.pfile)
-
- for ptype, pval in values.items():
- self.assertTrue(self._search_apt_config(contents, ptype, pval))
-
- def test_proxy_deleted(self):
- util.write_file(self.cfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile)
- self.assertFalse(os.path.isfile(self.pfile))
- self.assertFalse(os.path.isfile(self.cfile))
-
- def test_proxy_replaced(self):
- util.write_file(self.cfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
- self.pfile, self.cfile)
- self.assertTrue(os.path.isfile(self.pfile))
- contents = load_tfile_or_url(self.pfile)
- self.assertTrue(self._search_apt_config(contents, "http", "foo"))
-
- def test_config_written(self):
- payload = 'this is my apt config'
- cfg = {'apt_config': payload}
-
- cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
-
- self.assertTrue(os.path.isfile(self.cfile))
- self.assertFalse(os.path.isfile(self.pfile))
-
- self.assertEqual(load_tfile_or_url(self.cfile), payload)
-
- def test_config_replaced(self):
- util.write_file(self.pfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({'apt_config': "foo"},
- self.pfile, self.cfile)
- self.assertTrue(os.path.isfile(self.cfile))
- self.assertEqual(load_tfile_or_url(self.cfile), "foo")
-
- def test_config_deleted(self):
- # if no 'apt_config' is provided, delete any previously written file
- util.write_file(self.pfile, "content doesnt matter")
- cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile)
- self.assertFalse(os.path.isfile(self.pfile))
- self.assertFalse(os.path.isfile(self.cfile))
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
deleted file mode 100644
index acde0863..00000000
--- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py
+++ /dev/null
@@ -1,180 +0,0 @@
-""" test_handler_apt_configure_sources_list
-Test templating of sources list
-"""
-import logging
-import os
-import shutil
-import tempfile
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import templater
-from cloudinit import util
-
-from cloudinit.config import cc_apt_configure
-from cloudinit.sources import DataSourceNone
-
-from cloudinit.distros.debian import Distro
-
-from .. import helpers as t_help
-
-LOG = logging.getLogger(__name__)
-
-YAML_TEXT_CUSTOM_SL = """
-apt_mirror: http://archive.ubuntu.com/ubuntu/
-apt_custom_sources_list: |
- ## template:jinja
- ## Note, this file is written by cloud-init on first boot of an instance
- ## modifications made here will not survive a re-bundle.
- ## if you wish to make changes you can:
- ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
- ## or do the same in user-data
- ## b.) add sources in /etc/apt/sources.list.d
- ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
-
- # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
- # newer versions of the distribution.
- deb {{mirror}} {{codename}} main restricted
- deb-src {{mirror}} {{codename}} main restricted
- # FIND_SOMETHING_SPECIAL
-"""
-
-EXPECTED_CONVERTED_CONTENT = (
- """## Note, this file is written by cloud-init on first boot of an instance
-## modifications made here will not survive a re-bundle.
-## if you wish to make changes you can:
-## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg
-## or do the same in user-data
-## b.) add sources in /etc/apt/sources.list.d
-## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl
-
-# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to
-# newer versions of the distribution.
-deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
-deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted
-# FIND_SOMETHING_SPECIAL
-""")
-
-
-def load_tfile_or_url(*args, **kwargs):
- """load_tfile_or_url
- load file and return content after decoding
- """
- return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
-
-
-class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase):
- """TestAptSourceConfigSourceList
- Main Class to test sources list rendering
- """
- def setUp(self):
- super(TestAptSourceConfigSourceList, self).setUp()
- self.subp = util.subp
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
-
- def _get_cloud(self, distro, metadata=None):
- self.patchUtils(self.new_root)
- paths = helpers.Paths({})
- cls = distros.fetch(distro)
- mydist = cls(distro, {}, paths)
- myds = DataSourceNone.DataSourceNone({}, mydist, paths)
- if metadata:
- myds.metadata.update(metadata)
- return cloud.Cloud(myds, paths, {}, mydist, None)
-
- def apt_source_list(self, distro, mirror, mirrorcheck=None):
- """apt_source_list
- Test rendering of a source.list from template for a given distro
- """
- if mirrorcheck is None:
- mirrorcheck = mirror
-
- if isinstance(mirror, list):
- cfg = {'apt_mirror_search': mirror}
- else:
- cfg = {'apt_mirror': mirror}
- mycloud = self._get_cloud(distro)
-
- with mock.patch.object(templater, 'render_to_file') as mocktmpl:
- with mock.patch.object(os.path, 'isfile',
- return_value=True) as mockisfile:
- with mock.patch.object(util, 'rename'):
- cc_apt_configure.handle("notimportant", cfg, mycloud,
- LOG, None)
-
- mockisfile.assert_any_call(
- ('/etc/cloud/templates/sources.list.%s.tmpl' % distro))
- mocktmpl.assert_called_once_with(
- ('/etc/cloud/templates/sources.list.%s.tmpl' % distro),
- '/etc/apt/sources.list',
- {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck})
-
- def test_apt_source_list_debian(self):
- """Test rendering of a source.list from template for debian"""
- self.apt_source_list('debian', 'http://httpredir.debian.org/debian')
-
- def test_apt_source_list_ubuntu(self):
- """Test rendering of a source.list from template for ubuntu"""
- self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/')
-
- @staticmethod
- def myresolve(name):
- """Fake util.is_resolvable for mirrorfail tests"""
- if name == "does.not.exist":
- print("Faking FAIL for '%s'" % name)
- return False
- else:
- print("Faking SUCCESS for '%s'" % name)
- return True
-
- def test_apt_srcl_debian_mirrorfail(self):
- """Test rendering of a source.list from template for debian"""
- with mock.patch.object(util, 'is_resolvable',
- side_effect=self.myresolve) as mockresolve:
- self.apt_source_list('debian',
- ['http://does.not.exist',
- 'http://httpredir.debian.org/debian'],
- 'http://httpredir.debian.org/debian')
- mockresolve.assert_any_call("does.not.exist")
- mockresolve.assert_any_call("httpredir.debian.org")
-
- def test_apt_srcl_ubuntu_mirrorfail(self):
- """Test rendering of a source.list from template for ubuntu"""
- with mock.patch.object(util, 'is_resolvable',
- side_effect=self.myresolve) as mockresolve:
- self.apt_source_list('ubuntu',
- ['http://does.not.exist',
- 'http://archive.ubuntu.com/ubuntu/'],
- 'http://archive.ubuntu.com/ubuntu/')
- mockresolve.assert_any_call("does.not.exist")
- mockresolve.assert_any_call("archive.ubuntu.com")
-
- def test_apt_srcl_custom(self):
- """Test rendering from a custom source.list template"""
- cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL)
- mycloud = self._get_cloud('ubuntu')
-
- # the second mock restores the original subp
- with mock.patch.object(util, 'write_file') as mockwrite:
- with mock.patch.object(util, 'subp', self.subp):
- with mock.patch.object(cc_apt_configure, 'get_release',
- return_value='fakerelease'):
- with mock.patch.object(Distro, 'get_primary_arch',
- return_value='amd64'):
- cc_apt_configure.handle("notimportant", cfg, mycloud,
- LOG, None)
-
- mockwrite.assert_called_once_with(
- '/etc/apt/sources.list',
- EXPECTED_CONVERTED_CONTENT,
- mode=420)
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py
deleted file mode 100644
index 99a4d860..00000000
--- a/tests/unittests/test_handler/test_handler_apt_source.py
+++ /dev/null
@@ -1,516 +0,0 @@
-""" test_handler_apt_source
-Testing various config variations of the apt_source config
-"""
-import os
-import re
-import shutil
-import tempfile
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-from mock import call
-
-from cloudinit.config import cc_apt_configure
-from cloudinit import gpg
-from cloudinit import util
-
-from ..helpers import TestCase
-
-EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK-----
-Version: GnuPG v1
-
-mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ
-NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy
-8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0
-HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG
-CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29
-OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ
-FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm
-S0ORP6HXET3+jC8BMG4tBWCTK/XEZw==
-=ACB2
------END PGP PUBLIC KEY BLOCK-----"""
-
-
-def load_tfile_or_url(*args, **kwargs):
- """load_tfile_or_url
- load file and return content after decoding
- """
- return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)
-
-
-class TestAptSourceConfig(TestCase):
- """TestAptSourceConfig
- Main Class to test apt_source configs
- """
- release = "fantastic"
-
- def setUp(self):
- super(TestAptSourceConfig, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.aptlistfile = os.path.join(self.tmp, "single-deb.list")
- self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list")
- self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
- self.join = os.path.join
- # mock fallback filename into writable tmp dir
- self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/",
- "cloud_config_sources.list")
-
- patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release")
- get_rel = patcher.start()
- get_rel.return_value = self.release
- self.addCleanup(patcher.stop)
-
- @staticmethod
- def _get_default_params():
- """get_default_params
- Get the most basic default mrror and release info to be used in tests
- """
- params = {}
- params['RELEASE'] = cc_apt_configure.get_release()
- params['MIRROR'] = "http://archive.ubuntu.com/ubuntu"
- return params
-
- def myjoin(self, *args, **kwargs):
- """myjoin - redir into writable tmpdir"""
- if (args[0] == "/etc/apt/sources.list.d/" and
- args[1] == "cloud_config_sources.list" and
- len(args) == 2):
- return self.join(self.tmp, args[0].lstrip("/"), args[1])
- else:
- return self.join(*args, **kwargs)
-
- def apt_src_basic(self, filename, cfg):
- """apt_src_basic
- Test Fix deb source string, has to overwrite mirror conf in params
- """
- params = self._get_default_params()
-
- cc_apt_configure.add_apt_sources(cfg, params)
-
- self.assertTrue(os.path.isfile(filename))
-
- contents = load_tfile_or_url(filename)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", "http://archive.ubuntu.com/ubuntu",
- "karmic-backports",
- "main universe multiverse restricted"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_basic(self):
- """Test deb source string, overwrite mirror and filename"""
- cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
- ' karmic-backports'
- ' main universe multiverse restricted'),
- 'filename': self.aptlistfile}
- self.apt_src_basic(self.aptlistfile, [cfg])
-
- def test_apt_src_basic_dict(self):
- """Test deb source string, overwrite mirror and filename (dict)"""
- cfg = {self.aptlistfile: {'source':
- ('deb http://archive.ubuntu.com/ubuntu'
- ' karmic-backports'
- ' main universe multiverse restricted')}}
- self.apt_src_basic(self.aptlistfile, cfg)
-
- def apt_src_basic_tri(self, cfg):
- """apt_src_basic_tri
- Test Fix three deb source string, has to overwrite mirror conf in
- params. Test with filenames provided in config.
- generic part to check three files with different content
- """
- self.apt_src_basic(self.aptlistfile, cfg)
-
- # extra verify on two extra files of this test
- contents = load_tfile_or_url(self.aptlistfile2)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", "http://archive.ubuntu.com/ubuntu",
- "precise-backports",
- "main universe multiverse restricted"),
- contents, flags=re.IGNORECASE))
- contents = load_tfile_or_url(self.aptlistfile3)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", "http://archive.ubuntu.com/ubuntu",
- "lucid-backports",
- "main universe multiverse restricted"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_basic_tri(self):
- """Test Fix three deb source string with filenames"""
- cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
- ' karmic-backports'
- ' main universe multiverse restricted'),
- 'filename': self.aptlistfile}
- cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
- ' precise-backports'
- ' main universe multiverse restricted'),
- 'filename': self.aptlistfile2}
- cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu'
- ' lucid-backports'
- ' main universe multiverse restricted'),
- 'filename': self.aptlistfile3}
- self.apt_src_basic_tri([cfg1, cfg2, cfg3])
-
- def test_apt_src_basic_dict_tri(self):
- """Test Fix three deb source string with filenames (dict)"""
- cfg = {self.aptlistfile: {'source':
- ('deb http://archive.ubuntu.com/ubuntu'
- ' karmic-backports'
- ' main universe multiverse restricted')},
- self.aptlistfile2: {'source':
- ('deb http://archive.ubuntu.com/ubuntu'
- ' precise-backports'
- ' main universe multiverse restricted')},
- self.aptlistfile3: {'source':
- ('deb http://archive.ubuntu.com/ubuntu'
- ' lucid-backports'
- ' main universe multiverse restricted')}}
- self.apt_src_basic_tri(cfg)
-
- def test_apt_src_basic_nofn(self):
- """Test Fix three deb source string without filenames (dict)"""
- cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu'
- ' karmic-backports'
- ' main universe multiverse restricted')}
- with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
- self.apt_src_basic(self.fallbackfn, [cfg])
-
- def apt_src_replacement(self, filename, cfg):
- """apt_src_replace
- Test Autoreplacement of MIRROR and RELEASE in source specs
- """
- params = self._get_default_params()
- cc_apt_configure.add_apt_sources(cfg, params)
-
- self.assertTrue(os.path.isfile(filename))
-
- contents = load_tfile_or_url(filename)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", params['MIRROR'], params['RELEASE'],
- "multiverse"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_replace(self):
- """Test Autoreplacement of MIRROR and RELEASE in source specs"""
- cfg = {'source': 'deb $MIRROR $RELEASE multiverse',
- 'filename': self.aptlistfile}
- self.apt_src_replacement(self.aptlistfile, [cfg])
-
- def apt_src_replace_tri(self, cfg):
- """apt_src_replace_tri
- Test three autoreplacements of MIRROR and RELEASE in source specs with
- generic part
- """
- self.apt_src_replacement(self.aptlistfile, cfg)
-
- # extra verify on two extra files of this test
- params = self._get_default_params()
- contents = load_tfile_or_url(self.aptlistfile2)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", params['MIRROR'], params['RELEASE'],
- "main"),
- contents, flags=re.IGNORECASE))
- contents = load_tfile_or_url(self.aptlistfile3)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb", params['MIRROR'], params['RELEASE'],
- "universe"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_replace_tri(self):
- """Test triple Autoreplacement of MIRROR and RELEASE in source specs"""
- cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
- 'filename': self.aptlistfile}
- cfg2 = {'source': 'deb $MIRROR $RELEASE main',
- 'filename': self.aptlistfile2}
- cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
- 'filename': self.aptlistfile3}
- self.apt_src_replace_tri([cfg1, cfg2, cfg3])
-
- def test_apt_src_replace_dict_tri(self):
- """Test triple Autoreplacement in source specs (dict)"""
- cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'},
- 'notused': {'source': 'deb $MIRROR $RELEASE main',
- 'filename': self.aptlistfile2},
- self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}}
- self.apt_src_replace_tri(cfg)
-
- def test_apt_src_replace_nofn(self):
- """Test Autoreplacement of MIRROR and RELEASE in source specs nofile"""
- cfg = {'source': 'deb $MIRROR $RELEASE multiverse'}
- with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
- self.apt_src_replacement(self.fallbackfn, [cfg])
-
- def apt_src_keyid(self, filename, cfg, keynum):
- """apt_src_keyid
- Test specification of a source + keyid
- """
- params = self._get_default_params()
-
- with mock.patch.object(util, 'subp',
- return_value=('fakekey 1234', '')) as mockobj:
- cc_apt_configure.add_apt_sources(cfg, params)
-
- # check if it added the right ammount of keys
- calls = []
- for _ in range(keynum):
- calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234'))
- mockobj.assert_has_calls(calls, any_order=True)
-
- self.assertTrue(os.path.isfile(filename))
-
- contents = load_tfile_or_url(filename)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb",
- ('http://ppa.launchpad.net/smoser/'
- 'cloud-init-test/ubuntu'),
- "xenial", "main"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_keyid(self):
- """Test specification of a source + keyid with filename being set"""
- cfg = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial main'),
- 'keyid': "03683F77",
- 'filename': self.aptlistfile}
- self.apt_src_keyid(self.aptlistfile, [cfg], 1)
-
- def test_apt_src_keyid_tri(self):
- """Test 3x specification of a source + keyid with filename being set"""
- cfg1 = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial main'),
- 'keyid': "03683F77",
- 'filename': self.aptlistfile}
- cfg2 = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial universe'),
- 'keyid': "03683F77",
- 'filename': self.aptlistfile2}
- cfg3 = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial multiverse'),
- 'keyid': "03683F77",
- 'filename': self.aptlistfile3}
-
- self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3)
- contents = load_tfile_or_url(self.aptlistfile2)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb",
- ('http://ppa.launchpad.net/smoser/'
- 'cloud-init-test/ubuntu'),
- "xenial", "universe"),
- contents, flags=re.IGNORECASE))
- contents = load_tfile_or_url(self.aptlistfile3)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb",
- ('http://ppa.launchpad.net/smoser/'
- 'cloud-init-test/ubuntu'),
- "xenial", "multiverse"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_keyid_nofn(self):
- """Test specification of a source + keyid without filename being set"""
- cfg = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial main'),
- 'keyid': "03683F77"}
- with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
- self.apt_src_keyid(self.fallbackfn, [cfg], 1)
-
- def apt_src_key(self, filename, cfg):
- """apt_src_key
- Test specification of a source + key
- """
- params = self._get_default_params()
-
- with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
-
- mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321')
-
- self.assertTrue(os.path.isfile(filename))
-
- contents = load_tfile_or_url(filename)
- self.assertTrue(re.search(r"%s %s %s %s\n" %
- ("deb",
- ('http://ppa.launchpad.net/smoser/'
- 'cloud-init-test/ubuntu'),
- "xenial", "main"),
- contents, flags=re.IGNORECASE))
-
- def test_apt_src_key(self):
- """Test specification of a source + key with filename being set"""
- cfg = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial main'),
- 'key': "fakekey 4321",
- 'filename': self.aptlistfile}
- self.apt_src_key(self.aptlistfile, cfg)
-
- def test_apt_src_key_nofn(self):
- """Test specification of a source + key without filename being set"""
- cfg = {'source': ('deb '
- 'http://ppa.launchpad.net/'
- 'smoser/cloud-init-test/ubuntu'
- ' xenial main'),
- 'key': "fakekey 4321"}
- with mock.patch.object(os.path, 'join', side_effect=self.myjoin):
- self.apt_src_key(self.fallbackfn, cfg)
-
- def test_apt_src_keyonly(self):
- """Test specifying key without source"""
- params = self._get_default_params()
- cfg = {'key': "fakekey 4242",
- 'filename': self.aptlistfile}
-
- with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
-
- mockobj.assert_called_once_with(('apt-key', 'add', '-'),
- 'fakekey 4242')
-
- # filename should be ignored on key only
- self.assertFalse(os.path.isfile(self.aptlistfile))
-
- def test_apt_src_keyidonly(self):
- """Test specification of a keyid without source"""
- params = self._get_default_params()
- cfg = {'keyid': "03683F77",
- 'filename': self.aptlistfile}
-
- with mock.patch.object(util, 'subp',
- return_value=('fakekey 1212', '')) as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params)
-
- mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212')
-
- # filename should be ignored on key only
- self.assertFalse(os.path.isfile(self.aptlistfile))
-
- def apt_src_keyid_real(self, cfg, expectedkey):
- """apt_src_keyid_real
- Test specification of a keyid without source including
- up to addition of the key (add_apt_key_raw mocked to keep the
- environment as is)
- """
- params = self._get_default_params()
-
- with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey:
- with mock.patch.object(gpg, 'get_key_by_id',
- return_value=expectedkey) as mockgetkey:
- cc_apt_configure.add_apt_sources([cfg], params)
-
- mockgetkey.assert_called_with(cfg['keyid'],
- cfg.get('keyserver',
- 'keyserver.ubuntu.com'))
- mockkey.assert_called_with(expectedkey)
-
- # filename should be ignored on key only
- self.assertFalse(os.path.isfile(self.aptlistfile))
-
- def test_apt_src_keyid_real(self):
- """test_apt_src_keyid_real - Test keyid including key add"""
- keyid = "03683F77"
- cfg = {'keyid': keyid,
- 'filename': self.aptlistfile}
-
- self.apt_src_keyid_real(cfg, EXPECTEDKEY)
-
- def test_apt_src_longkeyid_real(self):
- """test_apt_src_longkeyid_real - Test long keyid including key add"""
- keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
- cfg = {'keyid': keyid,
- 'filename': self.aptlistfile}
-
- self.apt_src_keyid_real(cfg, EXPECTEDKEY)
-
- def test_apt_src_longkeyid_ks_real(self):
- """test_apt_src_longkeyid_ks_real - Test long keyid from other ks"""
- keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77"
- cfg = {'keyid': keyid,
- 'keyserver': 'keys.gnupg.net',
- 'filename': self.aptlistfile}
-
- self.apt_src_keyid_real(cfg, EXPECTEDKEY)
-
- def test_apt_src_ppa(self):
- """Test adding a ppa"""
- params = self._get_default_params()
- cfg = {'source': 'ppa:smoser/cloud-init-test',
- 'filename': self.aptlistfile}
-
- # default matcher needed for ppa
- matcher = re.compile(r'^[\w-]+:\w').search
-
- with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg], params,
- aa_repo_match=matcher)
- mockobj.assert_called_once_with(['add-apt-repository',
- 'ppa:smoser/cloud-init-test'])
-
- # adding ppa should ignore filename (uses add-apt-repository)
- self.assertFalse(os.path.isfile(self.aptlistfile))
-
- def test_apt_src_ppa_tri(self):
- """Test adding three ppa's"""
- params = self._get_default_params()
- cfg1 = {'source': 'ppa:smoser/cloud-init-test',
- 'filename': self.aptlistfile}
- cfg2 = {'source': 'ppa:smoser/cloud-init-test2',
- 'filename': self.aptlistfile2}
- cfg3 = {'source': 'ppa:smoser/cloud-init-test3',
- 'filename': self.aptlistfile3}
-
- # default matcher needed for ppa
- matcher = re.compile(r'^[\w-]+:\w').search
-
- with mock.patch.object(util, 'subp') as mockobj:
- cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params,
- aa_repo_match=matcher)
- calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']),
- call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']),
- call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])]
- mockobj.assert_has_calls(calls, any_order=True)
-
- # adding ppa should ignore all filenames (uses add-apt-repository)
- self.assertFalse(os.path.isfile(self.aptlistfile))
- self.assertFalse(os.path.isfile(self.aptlistfile2))
- self.assertFalse(os.path.isfile(self.aptlistfile3))
-
- def test_convert_to_new_format(self):
- """Test the conversion of old to new format"""
- cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse',
- 'filename': self.aptlistfile}
- cfg2 = {'source': 'deb $MIRROR $RELEASE main',
- 'filename': self.aptlistfile2}
- cfg3 = {'source': 'deb $MIRROR $RELEASE universe',
- 'filename': self.aptlistfile3}
- checkcfg = {self.aptlistfile: {'filename': self.aptlistfile,
- 'source': 'deb $MIRROR $RELEASE '
- 'multiverse'},
- self.aptlistfile2: {'filename': self.aptlistfile2,
- 'source': 'deb $MIRROR $RELEASE main'},
- self.aptlistfile3: {'filename': self.aptlistfile3,
- 'source': 'deb $MIRROR $RELEASE '
- 'universe'}}
-
- newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3])
- self.assertEqual(newcfg, checkcfg)
-
- newcfg2 = cc_apt_configure.convert_to_new_format(newcfg)
- self.assertEqual(newcfg2, checkcfg)
-
- with self.assertRaises(ValueError):
- cc_apt_configure.convert_to_new_format(5)
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
deleted file mode 100644
index 5e771731..00000000
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ /dev/null
@@ -1,271 +0,0 @@
-from cloudinit import cloud
-from cloudinit.config import cc_ca_certs
-from cloudinit import helpers
-from cloudinit import util
-
-from ..helpers import TestCase
-
-import logging
-import shutil
-import tempfile
-import unittest
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
-
-
-class TestNoConfig(unittest.TestCase):
- def setUp(self):
- super(TestNoConfig, self).setUp()
- self.name = "ca-certs"
- self.cloud_init = None
- self.log = logging.getLogger("TestNoConfig")
- self.args = []
-
- def test_no_config(self):
- """
- Test that nothing is done if no ca-certs configuration is provided.
- """
- config = util.get_builtin_cfg()
- with ExitStack() as mocks:
- util_mock = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- certs_mock = mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'update_ca_certs'))
-
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
- self.args)
-
- self.assertEqual(util_mock.call_count, 0)
- self.assertEqual(certs_mock.call_count, 0)
-
-
-class TestConfig(TestCase):
- def setUp(self):
- super(TestConfig, self).setUp()
- self.name = "ca-certs"
- self.paths = None
- self.cloud = cloud.Cloud(None, self.paths, None, None, None)
- self.log = logging.getLogger("TestNoConfig")
- self.args = []
-
- self.mocks = ExitStack()
- self.addCleanup(self.mocks.close)
-
- # Mock out the functions that actually modify the system
- self.mock_add = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'add_ca_certs'))
- self.mock_update = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'update_ca_certs'))
- self.mock_remove = self.mocks.enter_context(
- mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
-
- def test_no_trusted_list(self):
- """
- Test that no certificates are written if the 'trusted' key is not
- present.
- """
- config = {"ca-certs": {}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
-
- def test_empty_trusted_list(self):
- """Test that no certificate are written if 'trusted' list is empty."""
- config = {"ca-certs": {"trusted": []}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
-
- def test_single_trusted(self):
- """Test that a single cert gets passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1"]}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
-
- def test_multiple_trusted(self):
- """Test that multiple certs get passed to add_ca_certs."""
- config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
-
- def test_remove_default_ca_certs(self):
- """Test remove_defaults works as expected."""
- config = {"ca-certs": {"remove-defaults": True}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
-
- def test_no_remove_defaults_if_false(self):
- """Test remove_defaults is not called when config value is False."""
- config = {"ca-certs": {"remove-defaults": False}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
-
- def test_correct_order_for_remove_then_add(self):
- """Test remove_defaults is not called when config value is False."""
- config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
-
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
-
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
-
-
-class TestAddCaCerts(TestCase):
-
- def setUp(self):
- super(TestAddCaCerts, self).setUp()
- tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, tmpdir)
- self.paths = helpers.Paths({
- 'cloud_dir': tmpdir,
- })
-
- def test_no_certs_in_list(self):
- """Test that no certificate are written if not provided."""
- with mock.patch.object(util, 'write_file') as mockobj:
- cc_ca_certs.add_ca_certs([])
- self.assertEqual(mockobj.call_count, 0)
-
- def test_single_cert_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has trailing newline"""
- cert = "CERT1\nLINE2\nLINE3"
-
- ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
- expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
-
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
-
- cc_ca_certs.add_ca_certs([cert])
-
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf", expected, omode="wb")])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-
- def test_single_cert_no_trailing_cr(self):
- """Test adding a single certificate to the trusted CAs
- when existing ca-certificates has no trailing newline"""
- cert = "CERT1\nLINE2\nLINE3"
-
- ca_certs_content = "line1\nline2\nline3"
-
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
-
- cc_ca_certs.add_ca_certs([cert])
-
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode="wb")])
-
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-
- def test_multiple_certs(self):
- """Test adding multiple certificates to the trusted CAs."""
- certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
- expected_cert_file = "\n".join(certs)
- ca_certs_content = "line1\nline2\nline3"
-
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
-
- cc_ca_certs.add_ca_certs(certs)
-
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode='wb')])
-
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-
-
-class TestUpdateCaCerts(unittest.TestCase):
- def test_commands(self):
- with mock.patch.object(util, 'subp') as mockobj:
- cc_ca_certs.update_ca_certs()
- mockobj.assert_called_once_with(
- ["update-ca-certificates"], capture=False)
-
-
-class TestRemoveDefaultCaCerts(TestCase):
-
- def setUp(self):
- super(TestRemoveDefaultCaCerts, self).setUp()
- tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, tmpdir)
- self.paths = helpers.Paths({
- 'cloud_dir': tmpdir,
- })
-
- def test_commands(self):
- with ExitStack() as mocks:
- mock_delete = mocks.enter_context(
- mock.patch.object(util, 'delete_dir_contents'))
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_subp = mocks.enter_context(mock.patch.object(util, 'subp'))
-
- cc_ca_certs.remove_default_ca_certs()
-
- mock_delete.assert_has_calls([
- mock.call("/usr/share/ca-certificates/"),
- mock.call("/etc/ssl/certs/")])
-
- mock_write.assert_called_once_with(
- "/etc/ca-certificates.conf", "", mode=0o644)
-
- mock_subp.assert_called_once_with(
- ('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
deleted file mode 100644
index 7a1bc317..00000000
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ /dev/null
@@ -1,192 +0,0 @@
-import json
-import logging
-import os
-import shutil
-import six
-import tempfile
-
-from cloudinit import cloud
-from cloudinit.config import cc_chef
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit.sources import DataSourceNone
-from cloudinit import util
-
-from .. import helpers as t_help
-
-LOG = logging.getLogger(__name__)
-
-CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"])
-
-
-class TestChef(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestChef, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def fetch_cloud(self, distro_kind):
- cls = distros.fetch(distro_kind)
- paths = helpers.Paths({})
- distro = cls(distro_kind, {}, paths)
- ds = DataSourceNone.DataSourceNone({}, distro, paths, None)
- return cloud.Cloud(ds, paths, {}, distro, None)
-
- def test_no_config(self):
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- cfg = {}
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- for d in cc_chef.CHEF_DIRS:
- self.assertFalse(os.path.isdir(d))
-
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
- def test_basic_config(self):
- """
- test basic config looks sane
-
- # This should create a file of the format...
- # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000
- log_level :info
- ssl_verify_mode :verify_none
- log_location "/var/log/chef/client.log"
- validation_client_name "bob"
- validation_key "/etc/chef/validation.pem"
- client_key "/etc/chef/client.pem"
- chef_server_url "localhost"
- environment "_default"
- node_name "iid-datasource-none"
- json_attribs "/etc/chef/firstboot.json"
- file_cache_path "/var/cache/chef"
- file_backup_path "/var/backups/chef"
- pid_file "/var/run/chef/client.pid"
- Chef::Log::Formatter.show_time = true
- """
- tpl_file = util.load_file('templates/chef_client.rb.tmpl')
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
- cfg = {
- 'chef': {
- 'server_url': 'localhost',
- 'validation_name': 'bob',
- 'validation_key': "/etc/chef/vkey.pem",
- 'validation_cert': "this is my cert",
- },
- }
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- for d in cc_chef.CHEF_DIRS:
- self.assertTrue(os.path.isdir(d))
- c = util.load_file(cc_chef.CHEF_RB_PATH)
-
- # the content of these keys is not expected to be rendered to tmpl
- unrendered_keys = ('validation_cert',)
- for k, v in cfg['chef'].items():
- if k in unrendered_keys:
- continue
- self.assertIn(v, c)
- for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
- if k in unrendered_keys:
- continue
- # the value from the cfg overrides that in the default
- val = cfg['chef'].get(k, v)
- if isinstance(val, six.string_types):
- self.assertIn(val, c)
- c = util.load_file(cc_chef.CHEF_FB_PATH)
- self.assertEqual({}, json.loads(c))
-
- def test_firstboot_json(self):
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- cfg = {
- 'chef': {
- 'server_url': 'localhost',
- 'validation_name': 'bob',
- 'run_list': ['a', 'b', 'c'],
- 'initial_attributes': {
- 'c': 'd',
- }
- },
- }
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- c = util.load_file(cc_chef.CHEF_FB_PATH)
- self.assertEqual(
- {
- 'run_list': ['a', 'b', 'c'],
- 'c': 'd',
- }, json.loads(c))
-
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
- def test_template_deletes(self):
- tpl_file = util.load_file('templates/chef_client.rb.tmpl')
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
- cfg = {
- 'chef': {
- 'server_url': 'localhost',
- 'validation_name': 'bob',
- 'json_attribs': None,
- 'show_time': None,
- },
- }
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- c = util.load_file(cc_chef.CHEF_RB_PATH)
- self.assertNotIn('json_attribs', c)
- self.assertNotIn('Formatter.show_time', c)
-
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
- def test_validation_cert_and_validation_key(self):
- # test validation_cert content is written to validation_key path
- tpl_file = util.load_file('templates/chef_client.rb.tmpl')
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
- v_path = '/etc/chef/vkey.pem'
- v_cert = 'this is my cert'
- cfg = {
- 'chef': {
- 'server_url': 'localhost',
- 'validation_name': 'bob',
- 'validation_key': v_path,
- 'validation_cert': v_cert
- },
- }
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- content = util.load_file(cc_chef.CHEF_RB_PATH)
- self.assertIn(v_path, content)
- util.load_file(v_path)
- self.assertEqual(v_cert, util.load_file(v_path))
-
- def test_validation_cert_with_system(self):
- # test validation_cert content is not written over system file
- tpl_file = util.load_file('templates/chef_client.rb.tmpl')
- self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
-
- v_path = '/etc/chef/vkey.pem'
- v_cert = "system"
- expected_cert = "this is the system file certificate"
- cfg = {
- 'chef': {
- 'server_url': 'localhost',
- 'validation_name': 'bob',
- 'validation_key': v_path,
- 'validation_cert': v_cert
- },
- }
- util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
- util.write_file(v_path, expected_cert)
- cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
- content = util.load_file(cc_chef.CHEF_RB_PATH)
- self.assertIn(v_path, content)
- util.load_file(v_path)
- self.assertEqual(expected_cert, util.load_file(v_path))
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
deleted file mode 100644
index 80708d7b..00000000
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# vi: ts=4 expandtab
-#
-# Copyright (C) 2014 Yahoo! Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from cloudinit.config import cc_debug
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import util
-
-from cloudinit.sources import DataSourceNone
-
-from .. import helpers as t_help
-
-import logging
-import shutil
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-class TestDebug(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestDebug, self).setUp()
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
-
- def _get_cloud(self, distro, metadata=None):
- self.patchUtils(self.new_root)
- paths = helpers.Paths({})
- cls = distros.fetch(distro)
- d = cls(distro, {}, paths)
- ds = DataSourceNone.DataSourceNone({}, d, paths)
- if metadata:
- ds.metadata.update(metadata)
- return cloud.Cloud(ds, paths, {}, d, None)
-
- def test_debug_write(self):
- cfg = {
- 'abc': '123',
- 'c': u'\u20a0',
- 'debug': {
- 'verbose': True,
- # Does not actually write here due to mocking...
- 'output': '/var/log/cloud-init-debug.log',
- },
- }
- cc = self._get_cloud('ubuntu')
- cc_debug.handle('cc_debug', cfg, cc, LOG, [])
- contents = util.load_file('/var/log/cloud-init-debug.log')
- # Some basic sanity tests...
- self.assertNotEqual(0, len(contents))
- for k in cfg.keys():
- self.assertIn(k, contents)
-
- def test_debug_no_write(self):
- cfg = {
- 'abc': '123',
- 'debug': {
- 'verbose': False,
- # Does not actually write here due to mocking...
- 'output': '/var/log/cloud-init-debug.log',
- },
- }
- cc = self._get_cloud('ubuntu')
- cc_debug.handle('cc_debug', cfg, cc, LOG, [])
- self.assertRaises(IOError,
- util.load_file, '/var/log/cloud-init-debug.log')
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
deleted file mode 100644
index ddef8d48..00000000
--- a/tests/unittests/test_handler/test_handler_disk_setup.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from cloudinit.config import cc_disk_setup
-from ..helpers import ExitStack, mock, TestCase
-
-
-class TestIsDiskUsed(TestCase):
-
- def setUp(self):
- super(TestIsDiskUsed, self).setUp()
- self.patches = ExitStack()
- mod_name = 'cloudinit.config.cc_disk_setup'
- self.enumerate_disk = self.patches.enter_context(
- mock.patch('{0}.enumerate_disk'.format(mod_name)))
- self.check_fs = self.patches.enter_context(
- mock.patch('{0}.check_fs'.format(mod_name)))
-
- def test_multiple_child_nodes_returns_true(self):
- self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
- self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
- self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
-
- def test_valid_filesystem_returns_true(self):
- self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
- self.check_fs.return_value = (
- mock.MagicMock(), 'ext4', mock.MagicMock())
- self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
-
- def test_one_child_nodes_and_no_fs_returns_false(self):
- self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
- self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
- self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock()))
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
deleted file mode 100644
index e653488a..00000000
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ /dev/null
@@ -1,220 +0,0 @@
-from cloudinit import cloud
-from cloudinit.config import cc_growpart
-from cloudinit import util
-
-from ..helpers import TestCase
-
-import errno
-import logging
-import os
-import re
-import unittest
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
-
-# growpart:
-# mode: auto # off, on, auto, 'growpart'
-# devices: ['root']
-
-HELP_GROWPART_RESIZE = """
-growpart disk partition
- rewrite partition table so that partition takes up all the space it can
- options:
- -h | --help print Usage and exit
-<SNIP>
- -u | --update R update the the kernel partition table info after growing
- this requires kernel support and 'partx --update'
- R is one of:
- - 'auto' : [default] update partition if possible
-<SNIP>
- Example:
- - growpart /dev/sda 1
- Resize partition 1 on /dev/sda
-"""
-
-HELP_GROWPART_NO_RESIZE = """
-growpart disk partition
- rewrite partition table so that partition takes up all the space it can
- options:
- -h | --help print Usage and exit
-<SNIP>
- Example:
- - growpart /dev/sda 1
- Resize partition 1 on /dev/sda
-"""
-
-
-class TestDisabled(unittest.TestCase):
- def setUp(self):
- super(TestDisabled, self).setUp()
- self.name = "growpart"
- self.cloud_init = None
- self.log = logging.getLogger("TestDisabled")
- self.args = []
-
- self.handle = cc_growpart.handle
-
- def test_mode_off(self):
- # Test that nothing is done if mode is off.
-
- # this really only verifies that resizer_factory isn't called
- config = {'growpart': {'mode': 'off'}}
-
- with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj:
- self.handle(self.name, config, self.cloud_init, self.log,
- self.args)
- self.assertEqual(mockobj.call_count, 0)
-
-
-class TestConfig(TestCase):
- def setUp(self):
- super(TestConfig, self).setUp()
- self.name = "growpart"
- self.paths = None
- self.cloud = cloud.Cloud(None, self.paths, None, None, None)
- self.log = logging.getLogger("TestConfig")
- self.args = []
- os.environ = {}
-
- self.cloud_init = None
- self.handle = cc_growpart.handle
-
- def test_no_resizers_auto_is_fine(self):
- with mock.patch.object(
- util, 'subp',
- return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
-
- config = {'growpart': {'mode': 'auto'}}
- self.handle(self.name, config, self.cloud_init, self.log,
- self.args)
-
- mockobj.assert_called_once_with(
- ['growpart', '--help'], env={'LANG': 'C'})
-
- def test_no_resizers_mode_growpart_is_exception(self):
- with mock.patch.object(
- util, 'subp',
- return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
- config = {'growpart': {'mode': "growpart"}}
- self.assertRaises(
- ValueError, self.handle, self.name, config,
- self.cloud_init, self.log, self.args)
-
- mockobj.assert_called_once_with(
- ['growpart', '--help'], env={'LANG': 'C'})
-
- def test_mode_auto_prefers_growpart(self):
- with mock.patch.object(
- util, 'subp',
- return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:
- ret = cc_growpart.resizer_factory(mode="auto")
- self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
-
- mockobj.assert_called_once_with(
- ['growpart', '--help'], env={'LANG': 'C'})
-
- def test_handle_with_no_growpart_entry(self):
- # if no 'growpart' entry in config, then mode=auto should be used
-
- myresizer = object()
- retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),)
-
- with ExitStack() as mocks:
- factory = mocks.enter_context(
- mock.patch.object(cc_growpart, 'resizer_factory',
- return_value=myresizer))
- rsdevs = mocks.enter_context(
- mock.patch.object(cc_growpart, 'resize_devices',
- return_value=retval))
- mocks.enter_context(
- mock.patch.object(cc_growpart, 'RESIZERS',
- (('mysizer', object),)
- ))
-
- self.handle(self.name, {}, self.cloud_init, self.log, self.args)
-
- factory.assert_called_once_with('auto')
- rsdevs.assert_called_once_with(myresizer, ['/'])
-
-
-class TestResize(unittest.TestCase):
- def setUp(self):
- super(TestResize, self).setUp()
- self.name = "growpart"
- self.log = logging.getLogger("TestResize")
-
- def test_simple_devices(self):
- # test simple device list
- # this patches out devent2dev, os.stat, and device_part_info
- # so in the end, doesn't test a lot
- devs = ["/dev/XXda1", "/dev/YYda2"]
- devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,
- st_nlink=1, st_uid=0, st_gid=6, st_size=0,
- st_atime=0, st_mtime=0, st_ctime=0)
- enoent = ["/dev/NOENT"]
- real_stat = os.stat
- resize_calls = []
-
- class myresizer(object):
- def resize(self, diskdev, partnum, partdev):
- resize_calls.append((diskdev, partnum, partdev))
- if partdev == "/dev/YYda2":
- return (1024, 2048)
- return (1024, 1024) # old size, new size
-
- def mystat(path):
- if path in devs:
- return devstat_ret
- if path in enoent:
- e = OSError("%s: does not exist" % path)
- e.errno = errno.ENOENT
- raise e
- return real_stat(path)
-
- try:
- opinfo = cc_growpart.device_part_info
- cc_growpart.device_part_info = simple_device_part_info
- os.stat = mystat
-
- resized = cc_growpart.resize_devices(myresizer(), devs + enoent)
-
- def find(name, res):
- for f in res:
- if f[0] == name:
- return f
- return None
-
- self.assertEqual(cc_growpart.RESIZE.NOCHANGE,
- find("/dev/XXda1", resized)[1])
- self.assertEqual(cc_growpart.RESIZE.CHANGED,
- find("/dev/YYda2", resized)[1])
- self.assertEqual(cc_growpart.RESIZE.SKIPPED,
- find(enoent[0], resized)[1])
- # self.assertEqual(resize_calls,
- # [("/dev/XXda", "1", "/dev/XXda1"),
- # ("/dev/YYda", "2", "/dev/YYda2")])
- finally:
- cc_growpart.device_part_info = opinfo
- os.stat = real_stat
-
-
-def simple_device_part_info(devpath):
- # simple stupid return (/dev/vda, 1) for /dev/vda
- ret = re.search("([^0-9]*)([0-9]*)$", devpath)
- x = (ret.group(1), ret.group(2))
- return x
-
-
-class Bunch(object):
- def __init__(self, **kwds):
- self.__dict__.update(kwds)
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
deleted file mode 100644
index c91908f4..00000000
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
-#
-# Author: Juerg Haefliger <juerg.haefliger@hp.com>
-#
-# Based on test_handler_set_hostname.py
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from cloudinit.config import cc_locale
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import util
-
-from cloudinit.sources import DataSourceNoCloud
-
-from .. import helpers as t_help
-
-from configobj import ConfigObj
-
-from six import BytesIO
-
-import logging
-import shutil
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-class TestLocale(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestLocale, self).setUp()
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
-
- def _get_cloud(self, distro):
- self.patchUtils(self.new_root)
- paths = helpers.Paths({})
-
- cls = distros.fetch(distro)
- d = cls(distro, {}, paths)
- ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
- cc = cloud.Cloud(ds, paths, {}, d, None)
- return cc
-
- def test_set_locale_sles(self):
-
- cfg = {
- 'locale': 'My.Locale',
- }
- cc = self._get_cloud('sles')
- cc_locale.handle('cc_locale', cfg, cc, LOG, [])
-
- contents = util.load_file('/etc/sysconfig/language', decode=False)
- n_cfg = ConfigObj(BytesIO(contents))
- self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
deleted file mode 100644
index 6f90defb..00000000
--- a/tests/unittests/test_handler/test_handler_lxd.py
+++ /dev/null
@@ -1,134 +0,0 @@
-from cloudinit.config import cc_lxd
-from cloudinit.sources import DataSourceNoCloud
-from cloudinit import (distros, helpers, cloud)
-from .. import helpers as t_help
-
-import logging
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-LOG = logging.getLogger(__name__)
-
-
-class TestLxd(t_help.TestCase):
- lxd_cfg = {
- 'lxd': {
- 'init': {
- 'network_address': '0.0.0.0',
- 'storage_backend': 'zfs',
- 'storage_pool': 'poolname',
- }
- }
- }
-
- def setUp(self):
- super(TestLxd, self).setUp()
-
- def _get_cloud(self, distro):
- cls = distros.fetch(distro)
- paths = helpers.Paths({})
- d = cls(distro, {}, paths)
- ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
- cc = cloud.Cloud(ds, paths, {}, d, None)
- return cc
-
- @mock.patch("cloudinit.config.cc_lxd.util")
- def test_lxd_init(self, mock_util):
- cc = self._get_cloud('ubuntu')
- mock_util.which.return_value = True
- cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
- self.assertTrue(mock_util.which.called)
- init_call = mock_util.subp.call_args_list[0][0][0]
- self.assertEqual(init_call,
- ['lxd', 'init', '--auto',
- '--network-address=0.0.0.0',
- '--storage-backend=zfs',
- '--storage-pool=poolname'])
-
- @mock.patch("cloudinit.config.cc_lxd.util")
- def test_lxd_install(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- mock_util.which.return_value = None
- cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
- self.assertTrue(cc.distro.install_packages.called)
- install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
- self.assertEqual(sorted(install_pkg), ['lxd', 'zfs'])
-
- @mock.patch("cloudinit.config.cc_lxd.util")
- def test_no_init_does_nothing(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, [])
- self.assertFalse(cc.distro.install_packages.called)
- self.assertFalse(mock_util.subp.called)
-
- @mock.patch("cloudinit.config.cc_lxd.util")
- def test_no_lxd_does_nothing(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = mock.MagicMock()
- cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, [])
- self.assertFalse(cc.distro.install_packages.called)
- self.assertFalse(mock_util.subp.called)
-
- def test_lxd_debconf_new_full(self):
- data = {"mode": "new",
- "name": "testbr0",
- "ipv4_address": "10.0.8.1",
- "ipv4_netmask": "24",
- "ipv4_dhcp_first": "10.0.8.2",
- "ipv4_dhcp_last": "10.0.8.254",
- "ipv4_dhcp_leases": "250",
- "ipv4_nat": "true",
- "ipv6_address": "fd98:9e0:3744::1",
- "ipv6_netmask": "64",
- "ipv6_nat": "true",
- "domain": "lxd"}
- self.assertEqual(
- cc_lxd.bridge_to_debconf(data),
- {"lxd/setup-bridge": "true",
- "lxd/bridge-name": "testbr0",
- "lxd/bridge-ipv4": "true",
- "lxd/bridge-ipv4-address": "10.0.8.1",
- "lxd/bridge-ipv4-netmask": "24",
- "lxd/bridge-ipv4-dhcp-first": "10.0.8.2",
- "lxd/bridge-ipv4-dhcp-last": "10.0.8.254",
- "lxd/bridge-ipv4-dhcp-leases": "250",
- "lxd/bridge-ipv4-nat": "true",
- "lxd/bridge-ipv6": "true",
- "lxd/bridge-ipv6-address": "fd98:9e0:3744::1",
- "lxd/bridge-ipv6-netmask": "64",
- "lxd/bridge-ipv6-nat": "true",
- "lxd/bridge-domain": "lxd"})
-
- def test_lxd_debconf_new_partial(self):
- data = {"mode": "new",
- "ipv6_address": "fd98:9e0:3744::1",
- "ipv6_netmask": "64",
- "ipv6_nat": "true"}
- self.assertEqual(
- cc_lxd.bridge_to_debconf(data),
- {"lxd/setup-bridge": "true",
- "lxd/bridge-ipv6": "true",
- "lxd/bridge-ipv6-address": "fd98:9e0:3744::1",
- "lxd/bridge-ipv6-netmask": "64",
- "lxd/bridge-ipv6-nat": "true"})
-
- def test_lxd_debconf_existing(self):
- data = {"mode": "existing",
- "name": "testbr0"}
- self.assertEqual(
- cc_lxd.bridge_to_debconf(data),
- {"lxd/setup-bridge": "false",
- "lxd/use-existing-bridge": "true",
- "lxd/bridge-name": "testbr0"})
-
- def test_lxd_debconf_none(self):
- data = {"mode": "none"}
- self.assertEqual(
- cc_lxd.bridge_to_debconf(data),
- {"lxd/setup-bridge": "false",
- "lxd/bridge-name": ""})
diff --git a/tests/unittests/test_handler/test_handler_mcollective.py b/tests/unittests/test_handler/test_handler_mcollective.py
deleted file mode 100644
index 6aefb93d..00000000
--- a/tests/unittests/test_handler/test_handler_mcollective.py
+++ /dev/null
@@ -1,148 +0,0 @@
-from cloudinit import (cloud, distros, helpers, util)
-from cloudinit.config import cc_mcollective
-from cloudinit.sources import DataSourceNoCloud
-
-from .. import helpers as t_help
-
-import configobj
-import logging
-import os
-import shutil
-from six import BytesIO
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-STOCK_CONFIG = """\
-main_collective = mcollective
-collectives = mcollective
-libdir = /usr/share/mcollective/plugins
-logfile = /var/log/mcollective.log
-loglevel = info
-daemonize = 1
-
-# Plugins
-securityprovider = psk
-plugin.psk = unset
-
-connector = activemq
-plugin.activemq.pool.size = 1
-plugin.activemq.pool.1.host = stomp1
-plugin.activemq.pool.1.port = 61613
-plugin.activemq.pool.1.user = mcollective
-plugin.activemq.pool.1.password = marionette
-
-# Facts
-factsource = yaml
-plugin.yaml = /etc/mcollective/facts.yaml
-"""
-
-
-class TestConfig(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestConfig, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- # "./": make os.path.join behave correctly with abs path as second arg
- self.server_cfg = os.path.join(
- self.tmp, "./" + cc_mcollective.SERVER_CFG)
- self.pubcert_file = os.path.join(
- self.tmp, "./" + cc_mcollective.PUBCERT_FILE)
- self.pricert_file= os.path.join(
- self.tmp, self.tmp, "./" + cc_mcollective.PRICERT_FILE)
-
- def test_basic_config(self):
- cfg = {
- 'mcollective': {
- 'conf': {
- 'loglevel': 'debug',
- 'connector': 'rabbitmq',
- 'logfile': '/var/log/mcollective.log',
- 'ttl': '4294957',
- 'collectives': 'mcollective',
- 'main_collective': 'mcollective',
- 'securityprovider': 'psk',
- 'daemonize': '1',
- 'factsource': 'yaml',
- 'direct_addressing': '1',
- 'plugin.psk': 'unset',
- 'libdir': '/usr/share/mcollective/plugins',
- 'identity': '1',
- },
- },
- }
- expected = cfg['mcollective']['conf']
-
- self.patchUtils(self.tmp)
- cc_mcollective.configure(cfg['mcollective']['conf'])
- contents = util.load_file(cc_mcollective.SERVER_CFG, decode=False)
- contents = configobj.ConfigObj(BytesIO(contents))
- self.assertEqual(expected, dict(contents))
-
- def test_existing_config_is_saved(self):
- cfg = {'loglevel': 'warn'}
- util.write_file(self.server_cfg, STOCK_CONFIG)
- cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg)
- self.assertTrue(os.path.exists(self.server_cfg))
- self.assertTrue(os.path.exists(self.server_cfg + ".old"))
- self.assertEqual(util.load_file(self.server_cfg + ".old"), STOCK_CONFIG)
-
- def test_existing_updated(self):
- cfg = {'loglevel': 'warn'}
- util.write_file(self.server_cfg, STOCK_CONFIG)
- cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg)
- cfgobj = configobj.ConfigObj(self.server_cfg)
- self.assertEqual(cfg['loglevel'], cfgobj['loglevel'])
-
- def test_certificats_written(self):
- # check public-cert and private-cert keys in config get written
- cfg = {'loglevel': 'debug',
- 'public-cert': "this is my public-certificate",
- 'private-cert': "secret private certificate"}
-
- cc_mcollective.configure(config=cfg,
- server_cfg=self.server_cfg, pricert_file=self.pricert_file,
- pubcert_file=self.pubcert_file)
-
- found = configobj.ConfigObj(self.server_cfg)
-
- # make sure these didnt get written in
- self.assertFalse('public-cert' in found)
- self.assertFalse('private-cert' in found)
-
- # these need updating to the specified paths
- self.assertEqual(found['plugin.ssl_server_public'], self.pubcert_file)
- self.assertEqual(found['plugin.ssl_server_private'], self.pricert_file)
-
- # and the security provider should be ssl
- self.assertEqual(found['securityprovider'], 'ssl')
-
- self.assertEqual(
- util.load_file(self.pricert_file), cfg['private-cert'])
- self.assertEqual(
- util.load_file(self.pubcert_file), cfg['public-cert'])
-
-
-class TestHandler(t_help.TestCase):
- def _get_cloud(self, distro):
- cls = distros.fetch(distro)
- paths = helpers.Paths({})
- d = cls(distro, {}, paths)
- ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
- cc = cloud.Cloud(ds, paths, {}, d, None)
- return cc
-
- @t_help.mock.patch("cloudinit.config.cc_mcollective.util")
- def test_mcollective_install(self, mock_util):
- cc = self._get_cloud('ubuntu')
- cc.distro = t_help.mock.MagicMock()
- mycfg = {'mcollective': {'conf': {'loglevel': 'debug'}}}
- cc_mcollective.handle('cc_mcollective', mycfg, cc, LOG, [])
- self.assertTrue(cc.distro.install_packages.called)
- install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
- self.assertEqual(install_pkg, ('mcollective',))
-
- self.assertTrue(mock_util.subp.called)
- self.assertEqual(mock_util.subp.call_args_list[0][0][0],
- ['service', 'mcollective', 'restart'])
diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py
deleted file mode 100644
index 355674b2..00000000
--- a/tests/unittests/test_handler/test_handler_mounts.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import os.path
-import shutil
-import tempfile
-
-from cloudinit.config import cc_mounts
-
-from .. import helpers as test_helpers
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-
-class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
-
- def setUp(self):
- super(TestSanitizeDevname, self).setUp()
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
- self.patchOS(self.new_root)
-
- def _touch(self, path):
- path = os.path.join(self.new_root, path.lstrip('/'))
- basedir = os.path.dirname(path)
- if not os.path.exists(basedir):
- os.makedirs(basedir)
- open(path, 'a').close()
-
- def _makedirs(self, directory):
- directory = os.path.join(self.new_root, directory.lstrip('/'))
- if not os.path.exists(directory):
- os.makedirs(directory)
-
- def mock_existence_of_disk(self, disk_path):
- self._touch(disk_path)
- self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1]))
-
- def mock_existence_of_partition(self, disk_path, partition_number):
- self.mock_existence_of_disk(disk_path)
- self._touch(disk_path + str(partition_number))
- disk_name = disk_path.split('/')[-1]
- self._makedirs(os.path.join('/sys/block',
- disk_name,
- disk_name + str(partition_number)))
-
- def test_existent_full_disk_path_is_returned(self):
- disk_path = '/dev/sda'
- self.mock_existence_of_disk(disk_path)
- self.assertEqual(disk_path,
- cc_mounts.sanitize_devname(disk_path,
- lambda x: None,
- mock.Mock()))
-
- def test_existent_disk_name_returns_full_path(self):
- disk_name = 'sda'
- disk_path = '/dev/' + disk_name
- self.mock_existence_of_disk(disk_path)
- self.assertEqual(disk_path,
- cc_mounts.sanitize_devname(disk_name,
- lambda x: None,
- mock.Mock()))
-
- def test_existent_meta_disk_is_returned(self):
- actual_disk_path = '/dev/sda'
- self.mock_existence_of_disk(actual_disk_path)
- self.assertEqual(
- actual_disk_path,
- cc_mounts.sanitize_devname('ephemeral0',
- lambda x: actual_disk_path,
- mock.Mock()))
-
- def test_existent_meta_partition_is_returned(self):
- disk_name, partition_part = '/dev/sda', '1'
- actual_partition_path = disk_name + partition_part
- self.mock_existence_of_partition(disk_name, partition_part)
- self.assertEqual(
- actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.1',
- lambda x: disk_name,
- mock.Mock()))
-
- def test_existent_meta_partition_with_p_is_returned(self):
- disk_name, partition_part = '/dev/sda', 'p1'
- actual_partition_path = disk_name + partition_part
- self.mock_existence_of_partition(disk_name, partition_part)
- self.assertEqual(
- actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.1',
- lambda x: disk_name,
- mock.Mock()))
-
- def test_first_partition_returned_if_existent_disk_is_partitioned(self):
- disk_name, partition_part = '/dev/sda', '1'
- actual_partition_path = disk_name + partition_part
- self.mock_existence_of_partition(disk_name, partition_part)
- self.assertEqual(
- actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0',
- lambda x: disk_name,
- mock.Mock()))
-
- def test_nth_partition_returned_if_requested(self):
- disk_name, partition_part = '/dev/sda', '3'
- actual_partition_path = disk_name + partition_part
- self.mock_existence_of_partition(disk_name, partition_part)
- self.assertEqual(
- actual_partition_path,
- cc_mounts.sanitize_devname('ephemeral0.3',
- lambda x: disk_name,
- mock.Mock()))
-
- def test_transformer_returning_none_returns_none(self):
- self.assertIsNone(
- cc_mounts.sanitize_devname(
- 'ephemeral0', lambda x: None, mock.Mock()))
-
- def test_missing_device_returns_none(self):
- self.assertIsNone(
- cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock()))
-
- def test_missing_sys_returns_none(self):
- disk_path = '/dev/sda'
- self._makedirs(disk_path)
- self.assertIsNone(
- cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
-
- def test_existent_disk_but_missing_partition_returns_none(self):
- disk_path = '/dev/sda'
- self.mock_existence_of_disk(disk_path)
- self.assertIsNone(
- cc_mounts.sanitize_devname(
- 'ephemeral0.1', lambda x: disk_path, mock.Mock()))
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
deleted file mode 100644
index feff319d..00000000
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ /dev/null
@@ -1,127 +0,0 @@
-import sys
-
-from cloudinit.config import cc_power_state_change as psc
-
-from .. import helpers as t_help
-from ..helpers import mock
-
-
-class TestLoadPowerState(t_help.TestCase):
- def setUp(self):
- super(self.__class__, self).setUp()
-
- def test_no_config(self):
- # completely empty config should mean do nothing
- (cmd, _timeout, _condition) = psc.load_power_state({})
- self.assertEqual(cmd, None)
-
- def test_irrelevant_config(self):
- # no power_state field in config should return None for cmd
- (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'})
- self.assertEqual(cmd, None)
-
- def test_invalid_mode(self):
- cfg = {'power_state': {'mode': 'gibberish'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg)
-
- cfg = {'power_state': {'mode': ''}}
- self.assertRaises(TypeError, psc.load_power_state, cfg)
-
- def test_empty_mode(self):
- cfg = {'power_state': {'message': 'goodbye'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg)
-
- def test_valid_modes(self):
- cfg = {'power_state': {}}
- for mode in ('halt', 'poweroff', 'reboot'):
- cfg['power_state']['mode'] = mode
- check_lps_ret(psc.load_power_state(cfg), mode=mode)
-
- def test_invalid_delay(self):
- cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg)
-
- def test_valid_delay(self):
- cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
- for delay in ("now", "+1", "+30"):
- cfg['power_state']['delay'] = delay
- check_lps_ret(psc.load_power_state(cfg))
-
- def test_message_present(self):
- cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}}
- ret = psc.load_power_state(cfg)
- check_lps_ret(psc.load_power_state(cfg))
- self.assertIn(cfg['power_state']['message'], ret[0])
-
- def test_no_message(self):
- # if message is not present, then no argument should be passed for it
- cfg = {'power_state': {'mode': 'poweroff'}}
- (cmd, _timeout, _condition) = psc.load_power_state(cfg)
- self.assertNotIn("", cmd)
- check_lps_ret(psc.load_power_state(cfg))
- self.assertTrue(len(cmd) == 3)
-
- def test_condition_null_raises(self):
- cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
- self.assertRaises(TypeError, psc.load_power_state, cfg)
-
- def test_condition_default_is_true(self):
- cfg = {'power_state': {'mode': 'poweroff'}}
- _cmd, _timeout, cond = psc.load_power_state(cfg)
- self.assertEqual(cond, True)
-
-
-class TestCheckCondition(t_help.TestCase):
- def cmd_with_exit(self, rc):
- return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc])
-
- def test_true_is_true(self):
- self.assertEqual(psc.check_condition(True), True)
-
- def test_false_is_false(self):
- self.assertEqual(psc.check_condition(False), False)
-
- def test_cmd_exit_zero_true(self):
- self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True)
-
- def test_cmd_exit_one_false(self):
- self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False)
-
- def test_cmd_exit_nonzero_warns(self):
- mocklog = mock.Mock()
- self.assertEqual(
- psc.check_condition(self.cmd_with_exit(2), mocklog), False)
- self.assertEqual(mocklog.warn.call_count, 1)
-
-
-def check_lps_ret(psc_return, mode=None):
- if len(psc_return) != 3:
- raise TypeError("length returned = %d" % len(psc_return))
-
- errs = []
- cmd = psc_return[0]
- timeout = psc_return[1]
- condition = psc_return[2]
-
- if 'shutdown' not in psc_return[0][0]:
- errs.append("string 'shutdown' not in cmd")
-
- if condition is None:
- errs.append("condition was not returned")
-
- if mode is not None:
- opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
- if opt not in psc_return[0]:
- errs.append("opt '%s' not in cmd: %s" % (opt, cmd))
-
- if len(cmd) != 3 and len(cmd) != 4:
- errs.append("Invalid command length: %s" % len(cmd))
-
- try:
- float(timeout)
- except Exception:
- errs.append("timeout failed convert to float")
-
- if len(errs):
- lines = ["Errors in result: %s" % str(psc_return)] + errs
- raise Exception('\n'.join(lines))
diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py
deleted file mode 100644
index 38636063..00000000
--- a/tests/unittests/test_handler/test_handler_rsyslog.py
+++ /dev/null
@@ -1,174 +0,0 @@
-import os
-import shutil
-import tempfile
-
-from cloudinit.config.cc_rsyslog import (
- apply_rsyslog_changes, DEF_DIR, DEF_FILENAME, DEF_RELOAD, load_config,
- parse_remotes_line, remotes_to_rsyslog_cfg)
-from cloudinit import util
-
-from .. import helpers as t_help
-
-
-class TestLoadConfig(t_help.TestCase):
- def setUp(self):
- super(TestLoadConfig, self).setUp()
- self.basecfg = {
- 'config_filename': DEF_FILENAME,
- 'config_dir': DEF_DIR,
- 'service_reload_command': DEF_RELOAD,
- 'configs': [],
- 'remotes': {},
- }
-
- def test_legacy_full(self):
- found = load_config({
- 'rsyslog': ['*.* @192.168.1.1'],
- 'rsyslog_dir': "mydir",
- 'rsyslog_filename': "myfilename"})
- self.basecfg.update({
- 'configs': ['*.* @192.168.1.1'],
- 'config_dir': "mydir",
- 'config_filename': 'myfilename',
- 'service_reload_command': 'auto'}
- )
-
- self.assertEqual(found, self.basecfg)
-
- def test_legacy_defaults(self):
- found = load_config({
- 'rsyslog': ['*.* @192.168.1.1']})
- self.basecfg.update({
- 'configs': ['*.* @192.168.1.1']})
- self.assertEqual(found, self.basecfg)
-
- def test_new_defaults(self):
- self.assertEqual(load_config({}), self.basecfg)
-
- def test_new_configs(self):
- cfgs = ['*.* myhost', '*.* my2host']
- self.basecfg.update({'configs': cfgs})
- self.assertEqual(
- load_config({'rsyslog': {'configs': cfgs}}),
- self.basecfg)
-
-
-class TestApplyChanges(t_help.TestCase):
- def setUp(self):
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def test_simple(self):
- cfgline = "*.* foohost"
- changed = apply_rsyslog_changes(
- configs=[cfgline], def_fname="foo.cfg", cfg_dir=self.tmp)
-
- fname = os.path.join(self.tmp, "foo.cfg")
- self.assertEqual([fname], changed)
- self.assertEqual(
- util.load_file(fname), cfgline + "\n")
-
- def test_multiple_files(self):
- configs = [
- '*.* foohost',
- {'content': 'abc', 'filename': 'my.cfg'},
- {'content': 'filefoo-content',
- 'filename': os.path.join(self.tmp, 'mydir/mycfg')},
- ]
-
- changed = apply_rsyslog_changes(
- configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
-
- expected = [
- (os.path.join(self.tmp, "default.cfg"),
- "*.* foohost\n"),
- (os.path.join(self.tmp, "my.cfg"), "abc\n"),
- (os.path.join(self.tmp, "mydir/mycfg"), "filefoo-content\n"),
- ]
- self.assertEqual([f[0] for f in expected], changed)
- actual = []
- for fname, _content in expected:
- util.load_file(fname)
- actual.append((fname, util.load_file(fname),))
- self.assertEqual(expected, actual)
-
- def test_repeat_def(self):
- configs = ['*.* foohost', "*.warn otherhost"]
-
- changed = apply_rsyslog_changes(
- configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
-
- fname = os.path.join(self.tmp, "default.cfg")
- self.assertEqual([fname], changed)
-
- expected_content = '\n'.join([c for c in configs]) + '\n'
- found_content = util.load_file(fname)
- self.assertEqual(expected_content, found_content)
-
- def test_multiline_content(self):
- configs = ['line1', 'line2\nline3\n']
-
- apply_rsyslog_changes(
- configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
-
- fname = os.path.join(self.tmp, "default.cfg")
- expected_content = '\n'.join([c for c in configs])
- found_content = util.load_file(fname)
- self.assertEqual(expected_content, found_content)
-
-
-class TestParseRemotesLine(t_help.TestCase):
- def test_valid_port(self):
- r = parse_remotes_line("foo:9")
- self.assertEqual(9, r.port)
-
- def test_invalid_port(self):
- with self.assertRaises(ValueError):
- parse_remotes_line("*.* foo:abc")
-
- def test_valid_ipv6(self):
- r = parse_remotes_line("*.* [::1]")
- self.assertEqual("*.* @[::1]", str(r))
-
- def test_valid_ipv6_with_port(self):
- r = parse_remotes_line("*.* [::1]:100")
- self.assertEqual(r.port, 100)
- self.assertEqual(r.addr, "::1")
- self.assertEqual("*.* @[::1]:100", str(r))
-
- def test_invalid_multiple_colon(self):
- with self.assertRaises(ValueError):
- parse_remotes_line("*.* ::1:100")
-
- def test_name_in_string(self):
- r = parse_remotes_line("syslog.host", name="foobar")
- self.assertEqual("*.* @syslog.host # foobar", str(r))
-
-
-class TestRemotesToSyslog(t_help.TestCase):
- def test_simple(self):
- # str rendered line must appear in remotes_to_ryslog_cfg return
- mycfg = "*.* myhost"
- myline = str(parse_remotes_line(mycfg, name="myname"))
- r = remotes_to_rsyslog_cfg({'myname': mycfg})
- lines = r.splitlines()
- self.assertEqual(1, len(lines))
- self.assertTrue(myline in r.splitlines())
-
- def test_header_footer(self):
- header = "#foo head"
- footer = "#foo foot"
- r = remotes_to_rsyslog_cfg(
- {'myname': "*.* myhost"}, header=header, footer=footer)
- lines = r.splitlines()
- self.assertTrue(header, lines[0])
- self.assertTrue(footer, lines[-1])
-
- def test_with_empty_or_null(self):
- mycfg = "*.* myhost"
- myline = str(parse_remotes_line(mycfg, name="myname"))
- r = remotes_to_rsyslog_cfg(
- {'myname': mycfg, 'removed': None, 'removed2': ""})
- lines = r.splitlines()
- self.assertEqual(1, len(lines))
- self.assertTrue(myline in r.splitlines())
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
deleted file mode 100644
index a0390da9..00000000
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
-#
-# Author: Juerg Haefliger <juerg.haefliger@hp.com>
-#
-# Based on test_handler_set_hostname.py
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from cloudinit.config import cc_seed_random
-
-import gzip
-import tempfile
-
-from six import BytesIO
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import util
-
-from cloudinit.sources import DataSourceNone
-
-from .. import helpers as t_help
-
-import logging
-
-LOG = logging.getLogger(__name__)
-
-
-class TestRandomSeed(t_help.TestCase):
- def setUp(self):
- super(TestRandomSeed, self).setUp()
- self._seed_file = tempfile.mktemp()
- self.unapply = []
-
- # by default 'which' has nothing in its path
- self.apply_patches([(util, 'which', self._which)])
- self.apply_patches([(util, 'subp', self._subp)])
- self.subp_called = []
- self.whichdata = {}
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- util.del_file(self._seed_file)
-
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _which(self, program):
- return self.whichdata.get(program)
-
- def _subp(self, *args, **kwargs):
- # supports subp calling with cmd as args or kwargs
- if 'args' not in kwargs:
- kwargs['args'] = args[0]
- self.subp_called.append(kwargs)
- return
-
- def _compress(self, text):
- contents = BytesIO()
- gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
- gz_fh.write(text)
- gz_fh.close()
- return contents.getvalue()
-
- def _get_cloud(self, distro, metadata=None):
- paths = helpers.Paths({})
- cls = distros.fetch(distro)
- ubuntu_distro = cls(distro, {}, paths)
- ds = DataSourceNone.DataSourceNone({}, ubuntu_distro, paths)
- if metadata:
- ds.metadata = metadata
- return cloud.Cloud(ds, paths, {}, ubuntu_distro, None)
-
- def test_append_random(self):
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': 'tiny-tim-was-here',
- }
- }
- cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual("tiny-tim-was-here", contents)
-
- def test_append_random_unknown_encoding(self):
- data = self._compress(b"tiny-toe")
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': data,
- 'encoding': 'special_encoding',
- }
- }
- self.assertRaises(IOError, cc_seed_random.handle, 'test', cfg,
- self._get_cloud('ubuntu'), LOG, [])
-
- def test_append_random_gzip(self):
- data = self._compress(b"tiny-toe")
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': data,
- 'encoding': 'gzip',
- }
- }
- cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual("tiny-toe", contents)
-
- def test_append_random_gz(self):
- data = self._compress(b"big-toe")
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': data,
- 'encoding': 'gz',
- }
- }
- cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual("big-toe", contents)
-
- def test_append_random_base64(self):
- data = util.b64e('bubbles')
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': data,
- 'encoding': 'base64',
- }
- }
- cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual("bubbles", contents)
-
- def test_append_random_b64(self):
- data = util.b64e('kit-kat')
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': data,
- 'encoding': 'b64',
- }
- }
- cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual("kit-kat", contents)
-
- def test_append_random_metadata(self):
- cfg = {
- 'random_seed': {
- 'file': self._seed_file,
- 'data': 'tiny-tim-was-here',
- }
- }
- c = self._get_cloud('ubuntu', {'random_seed': '-so-was-josh'})
- cc_seed_random.handle('test', cfg, c, LOG, [])
- contents = util.load_file(self._seed_file)
- self.assertEqual('tiny-tim-was-here-so-was-josh', contents)
-
- def test_seed_command_provided_and_available(self):
- c = self._get_cloud('ubuntu', {})
- self.whichdata = {'pollinate': '/usr/bin/pollinate'}
- cfg = {'random_seed': {'command': ['pollinate', '-q']}}
- cc_seed_random.handle('test', cfg, c, LOG, [])
-
- subp_args = [f['args'] for f in self.subp_called]
- self.assertIn(['pollinate', '-q'], subp_args)
-
- def test_seed_command_not_provided(self):
- c = self._get_cloud('ubuntu', {})
- self.whichdata = {}
- cc_seed_random.handle('test', {}, c, LOG, [])
-
- # subp should not have been called as which would say not available
- self.assertFalse(self.subp_called)
-
- def test_unavailable_seed_command_and_required_raises_error(self):
- c = self._get_cloud('ubuntu', {})
- self.whichdata = {}
- cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'],
- 'command_required': True}}
- self.assertRaises(ValueError, cc_seed_random.handle,
- 'test', cfg, c, LOG, [])
-
- def test_seed_command_and_required(self):
- c = self._get_cloud('ubuntu', {})
- self.whichdata = {'foo': 'foo'}
- cfg = {'random_seed': {'command_required': True, 'command': ['foo']}}
- cc_seed_random.handle('test', cfg, c, LOG, [])
-
- self.assertIn(['foo'], [f['args'] for f in self.subp_called])
-
- def test_file_in_environment_for_command(self):
- c = self._get_cloud('ubuntu', {})
- self.whichdata = {'foo': 'foo'}
- cfg = {'random_seed': {'command_required': True, 'command': ['foo'],
- 'file': self._seed_file}}
- cc_seed_random.handle('test', cfg, c, LOG, [])
-
- # this just instists that the first time subp was called,
- # RANDOM_SEED_FILE was in the environment set up correctly
- subp_env = [f['env'] for f in self.subp_called]
- self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file)
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
deleted file mode 100644
index 7effa124..00000000
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from cloudinit.config import cc_set_hostname
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import util
-
-from .. import helpers as t_help
-
-from configobj import ConfigObj
-import logging
-import shutil
-from six import BytesIO
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-class TestHostname(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestHostname, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def _fetch_distro(self, kind):
- cls = distros.fetch(kind)
- paths = helpers.Paths({})
- return cls(kind, {}, paths)
-
- def test_write_hostname_rhel(self):
- cfg = {
- 'hostname': 'blah.blah.blah.yahoo.com',
- }
- distro = self._fetch_distro('rhel')
- paths = helpers.Paths({})
- ds = None
- cc = cloud.Cloud(ds, paths, {}, distro, None)
- self.patchUtils(self.tmp)
- cc_set_hostname.handle('cc_set_hostname',
- cfg, cc, LOG, [])
- if not distro.uses_systemd():
- contents = util.load_file("/etc/sysconfig/network", decode=False)
- n_cfg = ConfigObj(BytesIO(contents))
- self.assertEqual({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
- dict(n_cfg))
-
- def test_write_hostname_debian(self):
- cfg = {
- 'hostname': 'blah.blah.blah.yahoo.com',
- }
- distro = self._fetch_distro('debian')
- paths = helpers.Paths({})
- ds = None
- cc = cloud.Cloud(ds, paths, {}, distro, None)
- self.patchUtils(self.tmp)
- cc_set_hostname.handle('cc_set_hostname',
- cfg, cc, LOG, [])
- contents = util.load_file("/etc/hostname")
- self.assertEqual('blah', contents.strip())
-
- def test_write_hostname_sles(self):
- cfg = {
- 'hostname': 'blah.blah.blah.suse.com',
- }
- distro = self._fetch_distro('sles')
- paths = helpers.Paths({})
- ds = None
- cc = cloud.Cloud(ds, paths, {}, distro, None)
- self.patchUtils(self.tmp)
- cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, [])
- contents = util.load_file("/etc/HOSTNAME")
- self.assertEqual('blah', contents.strip())
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
deleted file mode 100644
index 57dce1bc..00000000
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ /dev/null
@@ -1,306 +0,0 @@
-from cloudinit.config.cc_snappy import (
- makeop, get_package_ops, render_snap_op)
-from cloudinit import util
-
-from .. import helpers as t_help
-
-import os
-import shutil
-import tempfile
-import yaml
-
-ALLOWED = (dict, list, int, str)
-
-
-class TestInstallPackages(t_help.TestCase):
- def setUp(self):
- super(TestInstallPackages, self).setUp()
- self.unapply = []
-
- # by default 'which' has nothing in its path
- self.apply_patches([(util, 'subp', self._subp)])
- self.subp_called = []
- self.snapcmds = []
- self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- shutil.rmtree(self.tmp)
-
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def populate_tmp(self, files):
- return t_help.populate_dir(self.tmp, files)
-
- def _subp(self, *args, **kwargs):
- # supports subp calling with cmd as args or kwargs
- if 'args' not in kwargs:
- kwargs['args'] = args[0]
- self.subp_called.append(kwargs)
- args = kwargs['args']
- # here we basically parse the snappy command invoked
- # and append to snapcmds a list of (mode, pkg, config)
- if args[0:2] == ['snappy', 'config']:
- if args[3] == "-":
- config = kwargs.get('data', '')
- else:
- with open(args[3], "rb") as fp:
- config = yaml.safe_load(fp.read())
- self.snapcmds.append(['config', args[2], config])
- elif args[0:2] == ['snappy', 'install']:
- config = None
- pkg = None
- for arg in args[2:]:
- if arg.startswith("-"):
- continue
- if not pkg:
- pkg = arg
- elif not config:
- cfgfile = arg
- if cfgfile == "-":
- config = kwargs.get('data', '')
- elif cfgfile:
- with open(cfgfile, "rb") as fp:
- config = yaml.safe_load(fp.read())
- self.snapcmds.append(['install', pkg, config])
-
- def test_package_ops_1(self):
- ret = get_package_ops(
- packages=['pkg1', 'pkg2', 'pkg3'],
- configs={'pkg2': b'mycfg2'}, installed=[])
- self.assertEqual(
- ret, [makeop('install', 'pkg1', None, None),
- makeop('install', 'pkg2', b'mycfg2', None),
- makeop('install', 'pkg3', None, None)])
-
- def test_package_ops_config_only(self):
- ret = get_package_ops(
- packages=None,
- configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
- self.assertEqual(
- ret, [makeop('config', 'pkg2', b'mycfg2')])
-
- def test_package_ops_install_and_config(self):
- ret = get_package_ops(
- packages=['pkg3', 'pkg2'],
- configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
- installed=['xinstalled'])
- self.assertEqual(
- ret, [makeop('install', 'pkg3'),
- makeop('install', 'pkg2', b'mycfg2'),
- makeop('config', 'xinstalled', b'xcfg')])
-
- def test_package_ops_install_long_config_short(self):
- # a package can be installed by full name, but have config by short
- cfg = {'k1': 'k2'}
- ret = get_package_ops(
- packages=['config-example.canonical'],
- configs={'config-example': cfg}, installed=[])
- self.assertEqual(
- ret, [makeop('install', 'config-example.canonical', cfg)])
-
- def test_package_ops_with_file(self):
- self.populate_tmp(
- {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
- "snapf2.snap": b"foo2", "foo.bar": "ignored"})
- ret = get_package_ops(
- packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
- self.assertEqual(
- ret,
- [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
- cfgfile="snapf1.config"),
- makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
- makeop('install', 'pkg1')])
-
- def test_package_ops_common_filename(self):
- # fish package name from filename
- # package names likely look like: pkgname.namespace_version_arch.snap
-
- # find filenames
- self.populate_tmp(
- {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
- "pkg-ws.config": "pkg-ws-config",
- "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
- "pkg1.smoser.config": "pkg1.smoser.config-data",
- "pkg1.config": "pkg1.config-data",
- "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
- "pkg2.smoser_0.0_amd64.config": "pkg2.config"})
-
- ret = get_package_ops(
- packages=[], configs={}, installed=[], fspath=self.tmp)
- self.assertEqual(
- ret,
- [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
- path="pkg-ws.smoser_0.3.4_all.snap",
- cfgfile="pkg-ws.config"),
- makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
- path="pkg1.smoser_1.2.3_all.snap",
- cfgfile="pkg1.smoser.config"),
- makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
- path="pkg2.smoser_0.0_amd64.snap",
- cfgfile="pkg2.smoser_0.0_amd64.config"),
- ])
-
- def test_package_ops_config_overrides_file(self):
- # config data overrides local file .config
- self.populate_tmp(
- {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
- ret = get_package_ops(
- packages=[], configs={'snapf1': 'snapf1cfg-config'},
- installed=[], fspath=self.tmp)
- self.assertEqual(
- ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
- path="snapf1.snap", config="snapf1cfg-config")])
-
- def test_package_ops_namespacing(self):
- cfgs = {
- 'config-example': {'k1': 'v1'},
- 'pkg1': {'p1': 'p2'},
- 'ubuntu-core': {'c1': 'c2'},
- 'notinstalled.smoser': {'s1': 's2'},
- }
- ret = get_package_ops(
- packages=['config-example.canonical'], configs=cfgs,
- installed=['config-example.smoser', 'pkg1.canonical',
- 'ubuntu-core'])
-
- expected_configs = [
- makeop('config', 'pkg1', config=cfgs['pkg1']),
- makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
- expected_installs = [
- makeop('install', 'config-example.canonical',
- config=cfgs['config-example'])]
-
- installs = [i for i in ret if i['op'] == 'install']
- configs = [c for c in ret if c['op'] == 'config']
-
- self.assertEqual(installs, expected_installs)
- # configs are not ordered
- self.assertEqual(len(configs), len(expected_configs))
- self.assertTrue(all(found in expected_configs for found in configs))
-
- def test_render_op_localsnap(self):
- self.populate_tmp({"snapf1.snap": b"foo1"})
- op = makeop_tmpd(self.tmp, 'install', 'snapf1',
- path='snapf1.snap')
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['install', op['path'], None]])
-
- def test_render_op_localsnap_localconfig(self):
- self.populate_tmp(
- {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
- op = makeop_tmpd(self.tmp, 'install', 'snapf1',
- path='snapf1.snap', cfgfile='snapf1.config')
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['install', op['path'], 'snapf1cfg']])
-
- def test_render_op_snap(self):
- op = makeop('install', 'snapf1')
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['install', 'snapf1', None]])
-
- def test_render_op_snap_config(self):
- mycfg = {'key1': 'value1'}
- name = "snapf1"
- op = makeop('install', name, config=mycfg)
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['install', name, {'config': {name: mycfg}}]])
-
- def test_render_op_config_bytes(self):
- name = "snapf1"
- mycfg = b'myconfig'
- op = makeop('config', name, config=mycfg)
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
-
- def test_render_op_config_string(self):
- name = 'snapf1'
- mycfg = 'myconfig: foo\nhisconfig: bar\n'
- op = makeop('config', name, config=mycfg)
- render_snap_op(**op)
- self.assertEqual(
- self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
-
- def test_render_op_config_dict(self):
- # config entry for package can be a dict, not a string blob
- mycfg = {'foo': 'bar'}
- name = 'snapf1'
- op = makeop('config', name, config=mycfg)
- render_snap_op(**op)
- # snapcmds is a list of 3-entry lists. data_found will be the
- # blob of data in the file in 'snappy install --config=<file>'
- data_found = self.snapcmds[0][2]
- self.assertEqual(mycfg, data_found['config'][name])
-
- def test_render_op_config_list(self):
- # config entry for package can be a list, not a string blob
- mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
- name = "snapf1"
- op = makeop('config', name, config=mycfg)
- render_snap_op(**op)
- data_found = self.snapcmds[0][2]
- self.assertEqual(mycfg, data_found['config'][name])
-
- def test_render_op_config_int(self):
- # config entry for package can be a list, not a string blob
- mycfg = 1
- name = 'snapf1'
- op = makeop('config', name, config=mycfg)
- render_snap_op(**op)
- data_found = self.snapcmds[0][2]
- self.assertEqual(mycfg, data_found['config'][name])
-
- def test_render_long_configs_short(self):
- # install a namespaced package should have un-namespaced config
- mycfg = {'k1': 'k2'}
- name = 'snapf1'
- op = makeop('install', name + ".smoser", config=mycfg)
- render_snap_op(**op)
- data_found = self.snapcmds[0][2]
- self.assertEqual(mycfg, data_found['config'][name])
-
- def test_render_does_not_pad_cfgfile(self):
- # package_ops with cfgfile should not modify --file= content.
- mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
- self.populate_tmp(
- {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
- ret = get_package_ops(
- packages=[], configs={}, installed=[], fspath=self.tmp)
- self.assertEqual(
- ret,
- [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
- cfgfile="snapf1.config")])
-
- # now the op was ok, but test that render didn't mess it up.
- render_snap_op(**ret[0])
- data_found = self.snapcmds[0][2]
- # the data found gets loaded in the snapcmd interpretation
- # so this comparison is a bit lossy, but input to snappy config
- # is expected to be yaml loadable, so it should be OK.
- self.assertEqual(yaml.safe_load(mydata), data_found)
-
-
-def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
- if cfgfile:
- cfgfile = os.path.sep.join([tmpd, cfgfile])
- if path:
- path = os.path.sep.join([tmpd, path])
- return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile))
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
deleted file mode 100644
index b7e6b03d..00000000
--- a/tests/unittests/test_handler/test_handler_timezone.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
-#
-# Author: Juerg Haefliger <juerg.haefliger@hp.com>
-#
-# Based on test_handler_set_hostname.py
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from cloudinit.config import cc_timezone
-
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import util
-
-from cloudinit.sources import DataSourceNoCloud
-
-from .. import helpers as t_help
-
-from configobj import ConfigObj
-import logging
-import shutil
-from six import BytesIO
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-class TestTimezone(t_help.FilesystemMockingTestCase):
- def setUp(self):
- super(TestTimezone, self).setUp()
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
-
- def _get_cloud(self, distro):
- self.patchUtils(self.new_root)
- self.patchOS(self.new_root)
-
- paths = helpers.Paths({})
-
- cls = distros.fetch(distro)
- d = cls(distro, {}, paths)
- ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
- cc = cloud.Cloud(ds, paths, {}, d, None)
- return cc
-
- def test_set_timezone_sles(self):
-
- cfg = {
- 'timezone': 'Tatooine/Bestine',
- }
- cc = self._get_cloud('sles')
-
- # Create a dummy timezone file
- dummy_contents = '0123456789abcdefgh'
- util.write_file('/usr/share/zoneinfo/%s' % cfg['timezone'],
- dummy_contents)
-
- cc_timezone.handle('cc_timezone', cfg, cc, LOG, [])
-
- contents = util.load_file('/etc/sysconfig/clock', decode=False)
- n_cfg = ConfigObj(BytesIO(contents))
- self.assertEqual({'TIMEZONE': cfg['timezone']}, dict(n_cfg))
-
- contents = util.load_file('/etc/localtime')
- self.assertEqual(dummy_contents, contents.strip())
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
deleted file mode 100644
index 466e45f8..00000000
--- a/tests/unittests/test_handler/test_handler_write_files.py
+++ /dev/null
@@ -1,112 +0,0 @@
-from cloudinit.config.cc_write_files import write_files
-from cloudinit import log as logging
-from cloudinit import util
-
-from ..helpers import FilesystemMockingTestCase
-
-import base64
-import gzip
-import shutil
-import six
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-YAML_TEXT = """
-write_files:
- - encoding: gzip
- content: !!binary |
- H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
- path: /usr/bin/hello
- permissions: '0755'
- - content: !!binary |
- Zm9vYmFyCg==
- path: /wark
- permissions: '0755'
- - content: |
- hi mom line 1
- hi mom line 2
- path: /tmp/message
-"""
-
-YAML_CONTENT_EXPECTED = {
- '/usr/bin/hello': "#!/bin/sh\necho hello world\n",
- '/wark': "foobar\n",
- '/tmp/message': "hi mom line 1\nhi mom line 2\n",
-}
-
-
-class TestWriteFiles(FilesystemMockingTestCase):
- def setUp(self):
- super(TestWriteFiles, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def test_simple(self):
- self.patchUtils(self.tmp)
- expected = "hello world\n"
- filename = "/tmp/my.file"
- write_files(
- "test_simple", [{"content": expected, "path": filename}], LOG)
- self.assertEqual(util.load_file(filename), expected)
-
- def test_yaml_binary(self):
- self.patchUtils(self.tmp)
- data = util.load_yaml(YAML_TEXT)
- write_files("testname", data['write_files'], LOG)
- for path, content in YAML_CONTENT_EXPECTED.items():
- self.assertEqual(util.load_file(path), content)
-
- def test_all_decodings(self):
- self.patchUtils(self.tmp)
-
- # build a 'files' array that has a dictionary of encodings
- # for 'gz', 'gzip', 'gz+base64' ...
- data = b"foobzr"
- utf8_valid = b"foobzr"
- utf8_invalid = b'ab\xaadef'
- files = []
- expected = []
-
- gz_aliases = ('gz', 'gzip')
- gz_b64_aliases = ('gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64')
- b64_aliases = ('base64', 'b64')
-
- datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid))
- for name, data in datum:
- gz = (_gzip_bytes(data), gz_aliases)
- gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases)
- b64 = (base64.b64encode(data), b64_aliases)
- for content, aliases in (gz, gz_b64, b64):
- for enc in aliases:
- cur = {'content': content,
- 'path': '/tmp/file-%s-%s' % (name, enc),
- 'encoding': enc}
- files.append(cur)
- expected.append((cur['path'], data))
-
- write_files("test_decoding", files, LOG)
-
- for path, content in expected:
- self.assertEqual(util.load_file(path, decode=False), content)
-
- # make sure we actually wrote *some* files.
- flen_expected = (
- len(gz_aliases + gz_b64_aliases + b64_aliases) * len(datum))
- self.assertEqual(len(expected), flen_expected)
-
-
-def _gzip_bytes(data):
- buf = six.BytesIO()
- fp = None
- try:
- fp = gzip.GzipFile(fileobj=buf, mode="wb")
- fp.write(data)
- fp.close()
- return buf.getvalue()
- finally:
- if fp:
- fp.close()
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
deleted file mode 100644
index 28b060f8..00000000
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ /dev/null
@@ -1,68 +0,0 @@
-from cloudinit.config import cc_yum_add_repo
-from cloudinit import util
-
-from .. import helpers
-
-import configobj
-import logging
-import shutil
-from six import BytesIO
-import tempfile
-
-LOG = logging.getLogger(__name__)
-
-
-class TestConfig(helpers.FilesystemMockingTestCase):
- def setUp(self):
- super(TestConfig, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def test_bad_config(self):
- cfg = {
- 'yum_repos': {
- 'epel-testing': {
- 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
- # Missing this should cause the repo not to be written
- # 'baseurl': 'http://blah.org/pub/epel/testing/5/$barch',
- 'enabled': False,
- 'gpgcheck': True,
- 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
- 'failovermethod': 'priority',
- },
- },
- }
- self.patchUtils(self.tmp)
- cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- self.assertRaises(IOError, util.load_file,
- "/etc/yum.repos.d/epel_testing.repo")
-
- def test_write_config(self):
- cfg = {
- 'yum_repos': {
- 'epel-testing': {
- 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
- 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
- 'enabled': False,
- 'gpgcheck': True,
- 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
- 'failovermethod': 'priority',
- },
- },
- }
- self.patchUtils(self.tmp)
- cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- contents = util.load_file("/etc/yum.repos.d/epel_testing.repo",
- decode=False)
- contents = configobj.ConfigObj(BytesIO(contents))
- expected = {
- 'epel_testing': {
- 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
- 'failovermethod': 'priority',
- 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
- 'enabled': '0',
- 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
- 'gpgcheck': '1',
- }
- }
- self.assertEqual(expected, dict(contents))