diff options
Diffstat (limited to 'tests/unittests/test_handler')
21 files changed, 0 insertions, 3243 deletions
diff --git a/tests/unittests/test_handler/__init__.py b/tests/unittests/test_handler/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/tests/unittests/test_handler/__init__.py +++ /dev/null diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py deleted file mode 100644 index d1dca2c4..00000000 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ /dev/null @@ -1,109 +0,0 @@ -from cloudinit.config import cc_apt_configure -from cloudinit import util - -from ..helpers import TestCase - -import os -import re -import shutil -import tempfile - - -def load_tfile_or_url(*args, **kwargs): - return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)) - - -class TestAptProxyConfig(TestCase): - def setUp(self): - super(TestAptProxyConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.pfile = os.path.join(self.tmp, "proxy.cfg") - self.cfile = os.path.join(self.tmp, "config.cfg") - - def _search_apt_config(self, contents, ptype, value): - return re.search( - r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), - contents, flags=re.IGNORECASE) - - def test_apt_proxy_written(self): - cfg = {'apt_proxy': 'myproxy'} - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) - - def test_apt_http_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy'} - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) - - def test_apt_all_proxy_written(self): - cfg = {'apt_http_proxy': 'myproxy_http_proxy', - 'apt_https_proxy': 'myproxy_https_proxy', - 'apt_ftp_proxy': 'myproxy_ftp_proxy'} - - values = {'http': cfg['apt_http_proxy'], - 'https': cfg['apt_https_proxy'], - 'ftp': cfg['apt_ftp_proxy'], - } - - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - contents = load_tfile_or_url(self.pfile) - - for ptype, pval in values.items(): - self.assertTrue(self._search_apt_config(contents, ptype, pval)) - - def test_proxy_deleted(self): - util.write_file(self.cfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) - self.assertFalse(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - def test_proxy_replaced(self): - util.write_file(self.cfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_proxy': "foo"}, - self.pfile, self.cfile) - self.assertTrue(os.path.isfile(self.pfile)) - contents = load_tfile_or_url(self.pfile) - self.assertTrue(self._search_apt_config(contents, "http", "foo")) - - def test_config_written(self): - payload = 'this is my apt config' - cfg = {'apt_config': payload} - - cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) - - self.assertTrue(os.path.isfile(self.cfile)) - self.assertFalse(os.path.isfile(self.pfile)) - - self.assertEqual(load_tfile_or_url(self.cfile), payload) - - def test_config_replaced(self): - util.write_file(self.pfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({'apt_config': "foo"}, - self.pfile, self.cfile) - self.assertTrue(os.path.isfile(self.cfile)) - self.assertEqual(load_tfile_or_url(self.cfile), "foo") - - def test_config_deleted(self): - # if no 'apt_config' is provided, delete any previously written file - util.write_file(self.pfile, "content doesnt matter") - cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) - self.assertFalse(os.path.isfile(self.pfile)) - self.assertFalse(os.path.isfile(self.cfile)) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py deleted file mode 100644 index acde0863..00000000 --- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py +++ /dev/null @@ -1,180 +0,0 @@ -""" test_handler_apt_configure_sources_list -Test templating of sources list -""" -import logging -import os -import shutil -import tempfile - -try: - from unittest import mock -except ImportError: - import mock - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import templater -from cloudinit import util - -from cloudinit.config import cc_apt_configure -from cloudinit.sources import DataSourceNone - -from cloudinit.distros.debian import Distro - -from .. import helpers as t_help - -LOG = logging.getLogger(__name__) - -YAML_TEXT_CUSTOM_SL = """ -apt_mirror: http://archive.ubuntu.com/ubuntu/ -apt_custom_sources_list: | - ## template:jinja - ## Note, this file is written by cloud-init on first boot of an instance - ## modifications made here will not survive a re-bundle. - ## if you wish to make changes you can: - ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg - ## or do the same in user-data - ## b.) add sources in /etc/apt/sources.list.d - ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl - - # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to - # newer versions of the distribution. - deb {{mirror}} {{codename}} main restricted - deb-src {{mirror}} {{codename}} main restricted - # FIND_SOMETHING_SPECIAL -""" - -EXPECTED_CONVERTED_CONTENT = ( - """## Note, this file is written by cloud-init on first boot of an instance -## modifications made here will not survive a re-bundle. -## if you wish to make changes you can: -## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg -## or do the same in user-data -## b.) add sources in /etc/apt/sources.list.d -## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl - -# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to -# newer versions of the distribution. -deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted -deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted -# FIND_SOMETHING_SPECIAL -""") - - -def load_tfile_or_url(*args, **kwargs): - """load_tfile_or_url - load file and return content after decoding - """ - return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) - - -class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): - """TestAptSourceConfigSourceList - Main Class to test sources list rendering - """ - def setUp(self): - super(TestAptSourceConfigSourceList, self).setUp() - self.subp = util.subp - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - - def _get_cloud(self, distro, metadata=None): - self.patchUtils(self.new_root) - paths = helpers.Paths({}) - cls = distros.fetch(distro) - mydist = cls(distro, {}, paths) - myds = DataSourceNone.DataSourceNone({}, mydist, paths) - if metadata: - myds.metadata.update(metadata) - return cloud.Cloud(myds, paths, {}, mydist, None) - - def apt_source_list(self, distro, mirror, mirrorcheck=None): - """apt_source_list - Test rendering of a source.list from template for a given distro - """ - if mirrorcheck is None: - mirrorcheck = mirror - - if isinstance(mirror, list): - cfg = {'apt_mirror_search': mirror} - else: - cfg = {'apt_mirror': mirror} - mycloud = self._get_cloud(distro) - - with mock.patch.object(templater, 'render_to_file') as mocktmpl: - with mock.patch.object(os.path, 'isfile', - return_value=True) as mockisfile: - with mock.patch.object(util, 'rename'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) - - mockisfile.assert_any_call( - ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) - mocktmpl.assert_called_once_with( - ('/etc/cloud/templates/sources.list.%s.tmpl' % distro), - '/etc/apt/sources.list', - {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck}) - - def test_apt_source_list_debian(self): - """Test rendering of a source.list from template for debian""" - self.apt_source_list('debian', 'http://httpredir.debian.org/debian') - - def test_apt_source_list_ubuntu(self): - """Test rendering of a source.list from template for ubuntu""" - self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/') - - @staticmethod - def myresolve(name): - """Fake util.is_resolvable for mirrorfail tests""" - if name == "does.not.exist": - print("Faking FAIL for '%s'" % name) - return False - else: - print("Faking SUCCESS for '%s'" % name) - return True - - def test_apt_srcl_debian_mirrorfail(self): - """Test rendering of a source.list from template for debian""" - with mock.patch.object(util, 'is_resolvable', - side_effect=self.myresolve) as mockresolve: - self.apt_source_list('debian', - ['http://does.not.exist', - 'http://httpredir.debian.org/debian'], - 'http://httpredir.debian.org/debian') - mockresolve.assert_any_call("does.not.exist") - mockresolve.assert_any_call("httpredir.debian.org") - - def test_apt_srcl_ubuntu_mirrorfail(self): - """Test rendering of a source.list from template for ubuntu""" - with mock.patch.object(util, 'is_resolvable', - side_effect=self.myresolve) as mockresolve: - self.apt_source_list('ubuntu', - ['http://does.not.exist', - 'http://archive.ubuntu.com/ubuntu/'], - 'http://archive.ubuntu.com/ubuntu/') - mockresolve.assert_any_call("does.not.exist") - mockresolve.assert_any_call("archive.ubuntu.com") - - def test_apt_srcl_custom(self): - """Test rendering from a custom source.list template""" - cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) - mycloud = self._get_cloud('ubuntu') - - # the second mock restores the original subp - with mock.patch.object(util, 'write_file') as mockwrite: - with mock.patch.object(util, 'subp', self.subp): - with mock.patch.object(cc_apt_configure, 'get_release', - return_value='fakerelease'): - with mock.patch.object(Distro, 'get_primary_arch', - return_value='amd64'): - cc_apt_configure.handle("notimportant", cfg, mycloud, - LOG, None) - - mockwrite.assert_called_once_with( - '/etc/apt/sources.list', - EXPECTED_CONVERTED_CONTENT, - mode=420) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py deleted file mode 100644 index 99a4d860..00000000 --- a/tests/unittests/test_handler/test_handler_apt_source.py +++ /dev/null @@ -1,516 +0,0 @@ -""" test_handler_apt_source -Testing various config variations of the apt_source config -""" -import os -import re -import shutil -import tempfile - -try: - from unittest import mock -except ImportError: - import mock -from mock import call - -from cloudinit.config import cc_apt_configure -from cloudinit import gpg -from cloudinit import util - -from ..helpers import TestCase - -EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- -Version: GnuPG v1 - -mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ -NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy -8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 -HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG -CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 -OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ -FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm -S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== -=ACB2 ------END PGP PUBLIC KEY BLOCK-----""" - - -def load_tfile_or_url(*args, **kwargs): - """load_tfile_or_url - load file and return content after decoding - """ - return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) - - -class TestAptSourceConfig(TestCase): - """TestAptSourceConfig - Main Class to test apt_source configs - """ - release = "fantastic" - - def setUp(self): - super(TestAptSourceConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.aptlistfile = os.path.join(self.tmp, "single-deb.list") - self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") - self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") - self.join = os.path.join - # mock fallback filename into writable tmp dir - self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/", - "cloud_config_sources.list") - - patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release") - get_rel = patcher.start() - get_rel.return_value = self.release - self.addCleanup(patcher.stop) - - @staticmethod - def _get_default_params(): - """get_default_params - Get the most basic default mrror and release info to be used in tests - """ - params = {} - params['RELEASE'] = cc_apt_configure.get_release() - params['MIRROR'] = "http://archive.ubuntu.com/ubuntu" - return params - - def myjoin(self, *args, **kwargs): - """myjoin - redir into writable tmpdir""" - if (args[0] == "/etc/apt/sources.list.d/" and - args[1] == "cloud_config_sources.list" and - len(args) == 2): - return self.join(self.tmp, args[0].lstrip("/"), args[1]) - else: - return self.join(*args, **kwargs) - - def apt_src_basic(self, filename, cfg): - """apt_src_basic - Test Fix deb source string, has to overwrite mirror conf in params - """ - params = self._get_default_params() - - cc_apt_configure.add_apt_sources(cfg, params) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "karmic-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_basic(self): - """Test deb source string, overwrite mirror and filename""" - cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile} - self.apt_src_basic(self.aptlistfile, [cfg]) - - def test_apt_src_basic_dict(self): - """Test deb source string, overwrite mirror and filename (dict)""" - cfg = {self.aptlistfile: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')}} - self.apt_src_basic(self.aptlistfile, cfg) - - def apt_src_basic_tri(self, cfg): - """apt_src_basic_tri - Test Fix three deb source string, has to overwrite mirror conf in - params. Test with filenames provided in config. - generic part to check three files with different content - """ - self.apt_src_basic(self.aptlistfile, cfg) - - # extra verify on two extra files of this test - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "precise-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", "http://archive.ubuntu.com/ubuntu", - "lucid-backports", - "main universe multiverse restricted"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_basic_tri(self): - """Test Fix three deb source string with filenames""" - cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile} - cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' precise-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile2} - cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' lucid-backports' - ' main universe multiverse restricted'), - 'filename': self.aptlistfile3} - self.apt_src_basic_tri([cfg1, cfg2, cfg3]) - - def test_apt_src_basic_dict_tri(self): - """Test Fix three deb source string with filenames (dict)""" - cfg = {self.aptlistfile: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')}, - self.aptlistfile2: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' precise-backports' - ' main universe multiverse restricted')}, - self.aptlistfile3: {'source': - ('deb http://archive.ubuntu.com/ubuntu' - ' lucid-backports' - ' main universe multiverse restricted')}} - self.apt_src_basic_tri(cfg) - - def test_apt_src_basic_nofn(self): - """Test Fix three deb source string without filenames (dict)""" - cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' - ' karmic-backports' - ' main universe multiverse restricted')} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_basic(self.fallbackfn, [cfg]) - - def apt_src_replacement(self, filename, cfg): - """apt_src_replace - Test Autoreplacement of MIRROR and RELEASE in source specs - """ - params = self._get_default_params() - cc_apt_configure.add_apt_sources(cfg, params) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "multiverse"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_replace(self): - """Test Autoreplacement of MIRROR and RELEASE in source specs""" - cfg = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - self.apt_src_replacement(self.aptlistfile, [cfg]) - - def apt_src_replace_tri(self, cfg): - """apt_src_replace_tri - Test three autoreplacements of MIRROR and RELEASE in source specs with - generic part - """ - self.apt_src_replacement(self.aptlistfile, cfg) - - # extra verify on two extra files of this test - params = self._get_default_params() - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "main"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", params['MIRROR'], params['RELEASE'], - "universe"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_replace_tri(self): - """Test triple Autoreplacement of MIRROR and RELEASE in source specs""" - cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - cfg2 = {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'deb $MIRROR $RELEASE universe', - 'filename': self.aptlistfile3} - self.apt_src_replace_tri([cfg1, cfg2, cfg3]) - - def test_apt_src_replace_dict_tri(self): - """Test triple Autoreplacement in source specs (dict)""" - cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, - 'notused': {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2}, - self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} - self.apt_src_replace_tri(cfg) - - def test_apt_src_replace_nofn(self): - """Test Autoreplacement of MIRROR and RELEASE in source specs nofile""" - cfg = {'source': 'deb $MIRROR $RELEASE multiverse'} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_replacement(self.fallbackfn, [cfg]) - - def apt_src_keyid(self, filename, cfg, keynum): - """apt_src_keyid - Test specification of a source + keyid - """ - params = self._get_default_params() - - with mock.patch.object(util, 'subp', - return_value=('fakekey 1234', '')) as mockobj: - cc_apt_configure.add_apt_sources(cfg, params) - - # check if it added the right ammount of keys - calls = [] - for _ in range(keynum): - calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234')) - mockobj.assert_has_calls(calls, any_order=True) - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "main"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_keyid(self): - """Test specification of a source + keyid with filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77", - 'filename': self.aptlistfile} - self.apt_src_keyid(self.aptlistfile, [cfg], 1) - - def test_apt_src_keyid_tri(self): - """Test 3x specification of a source + keyid with filename being set""" - cfg1 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77", - 'filename': self.aptlistfile} - cfg2 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial universe'), - 'keyid': "03683F77", - 'filename': self.aptlistfile2} - cfg3 = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial multiverse'), - 'keyid': "03683F77", - 'filename': self.aptlistfile3} - - self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3) - contents = load_tfile_or_url(self.aptlistfile2) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "universe"), - contents, flags=re.IGNORECASE)) - contents = load_tfile_or_url(self.aptlistfile3) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "multiverse"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_keyid_nofn(self): - """Test specification of a source + keyid without filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'keyid': "03683F77"} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_keyid(self.fallbackfn, [cfg], 1) - - def apt_src_key(self, filename, cfg): - """apt_src_key - Test specification of a source + key - """ - params = self._get_default_params() - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321') - - self.assertTrue(os.path.isfile(filename)) - - contents = load_tfile_or_url(filename) - self.assertTrue(re.search(r"%s %s %s %s\n" % - ("deb", - ('http://ppa.launchpad.net/smoser/' - 'cloud-init-test/ubuntu'), - "xenial", "main"), - contents, flags=re.IGNORECASE)) - - def test_apt_src_key(self): - """Test specification of a source + key with filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'key': "fakekey 4321", - 'filename': self.aptlistfile} - self.apt_src_key(self.aptlistfile, cfg) - - def test_apt_src_key_nofn(self): - """Test specification of a source + key without filename being set""" - cfg = {'source': ('deb ' - 'http://ppa.launchpad.net/' - 'smoser/cloud-init-test/ubuntu' - ' xenial main'), - 'key': "fakekey 4321"} - with mock.patch.object(os.path, 'join', side_effect=self.myjoin): - self.apt_src_key(self.fallbackfn, cfg) - - def test_apt_src_keyonly(self): - """Test specifying key without source""" - params = self._get_default_params() - cfg = {'key': "fakekey 4242", - 'filename': self.aptlistfile} - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_once_with(('apt-key', 'add', '-'), - 'fakekey 4242') - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_keyidonly(self): - """Test specification of a keyid without source""" - params = self._get_default_params() - cfg = {'keyid': "03683F77", - 'filename': self.aptlistfile} - - with mock.patch.object(util, 'subp', - return_value=('fakekey 1212', '')) as mockobj: - cc_apt_configure.add_apt_sources([cfg], params) - - mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212') - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def apt_src_keyid_real(self, cfg, expectedkey): - """apt_src_keyid_real - Test specification of a keyid without source including - up to addition of the key (add_apt_key_raw mocked to keep the - environment as is) - """ - params = self._get_default_params() - - with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: - with mock.patch.object(gpg, 'get_key_by_id', - return_value=expectedkey) as mockgetkey: - cc_apt_configure.add_apt_sources([cfg], params) - - mockgetkey.assert_called_with(cfg['keyid'], - cfg.get('keyserver', - 'keyserver.ubuntu.com')) - mockkey.assert_called_with(expectedkey) - - # filename should be ignored on key only - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_keyid_real(self): - """test_apt_src_keyid_real - Test keyid including key add""" - keyid = "03683F77" - cfg = {'keyid': keyid, - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_longkeyid_real(self): - """test_apt_src_longkeyid_real - Test long keyid including key add""" - keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" - cfg = {'keyid': keyid, - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_longkeyid_ks_real(self): - """test_apt_src_longkeyid_ks_real - Test long keyid from other ks""" - keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" - cfg = {'keyid': keyid, - 'keyserver': 'keys.gnupg.net', - 'filename': self.aptlistfile} - - self.apt_src_keyid_real(cfg, EXPECTEDKEY) - - def test_apt_src_ppa(self): - """Test adding a ppa""" - params = self._get_default_params() - cfg = {'source': 'ppa:smoser/cloud-init-test', - 'filename': self.aptlistfile} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg], params, - aa_repo_match=matcher) - mockobj.assert_called_once_with(['add-apt-repository', - 'ppa:smoser/cloud-init-test']) - - # adding ppa should ignore filename (uses add-apt-repository) - self.assertFalse(os.path.isfile(self.aptlistfile)) - - def test_apt_src_ppa_tri(self): - """Test adding three ppa's""" - params = self._get_default_params() - cfg1 = {'source': 'ppa:smoser/cloud-init-test', - 'filename': self.aptlistfile} - cfg2 = {'source': 'ppa:smoser/cloud-init-test2', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'ppa:smoser/cloud-init-test3', - 'filename': self.aptlistfile3} - - # default matcher needed for ppa - matcher = re.compile(r'^[\w-]+:\w').search - - with mock.patch.object(util, 'subp') as mockobj: - cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params, - aa_repo_match=matcher) - calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']), - call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])] - mockobj.assert_has_calls(calls, any_order=True) - - # adding ppa should ignore all filenames (uses add-apt-repository) - self.assertFalse(os.path.isfile(self.aptlistfile)) - self.assertFalse(os.path.isfile(self.aptlistfile2)) - self.assertFalse(os.path.isfile(self.aptlistfile3)) - - def test_convert_to_new_format(self): - """Test the conversion of old to new format""" - cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', - 'filename': self.aptlistfile} - cfg2 = {'source': 'deb $MIRROR $RELEASE main', - 'filename': self.aptlistfile2} - cfg3 = {'source': 'deb $MIRROR $RELEASE universe', - 'filename': self.aptlistfile3} - checkcfg = {self.aptlistfile: {'filename': self.aptlistfile, - 'source': 'deb $MIRROR $RELEASE ' - 'multiverse'}, - self.aptlistfile2: {'filename': self.aptlistfile2, - 'source': 'deb $MIRROR $RELEASE main'}, - self.aptlistfile3: {'filename': self.aptlistfile3, - 'source': 'deb $MIRROR $RELEASE ' - 'universe'}} - - newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3]) - self.assertEqual(newcfg, checkcfg) - - newcfg2 = cc_apt_configure.convert_to_new_format(newcfg) - self.assertEqual(newcfg2, checkcfg) - - with self.assertRaises(ValueError): - cc_apt_configure.convert_to_new_format(5) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py deleted file mode 100644 index 5e771731..00000000 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ /dev/null @@ -1,271 +0,0 @@ -from cloudinit import cloud -from cloudinit.config import cc_ca_certs -from cloudinit import helpers -from cloudinit import util - -from ..helpers import TestCase - -import logging -import shutil -import tempfile -import unittest - -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack - - -class TestNoConfig(unittest.TestCase): - def setUp(self): - super(TestNoConfig, self).setUp() - self.name = "ca-certs" - self.cloud_init = None - self.log = logging.getLogger("TestNoConfig") - self.args = [] - - def test_no_config(self): - """ - Test that nothing is done if no ca-certs configuration is provided. - """ - config = util.get_builtin_cfg() - with ExitStack() as mocks: - util_mock = mocks.enter_context( - mock.patch.object(util, 'write_file')) - certs_mock = mocks.enter_context( - mock.patch.object(cc_ca_certs, 'update_ca_certs')) - - cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, - self.args) - - self.assertEqual(util_mock.call_count, 0) - self.assertEqual(certs_mock.call_count, 0) - - -class TestConfig(TestCase): - def setUp(self): - super(TestConfig, self).setUp() - self.name = "ca-certs" - self.paths = None - self.cloud = cloud.Cloud(None, self.paths, None, None, None) - self.log = logging.getLogger("TestNoConfig") - self.args = [] - - self.mocks = ExitStack() - self.addCleanup(self.mocks.close) - - # Mock out the functions that actually modify the system - self.mock_add = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'add_ca_certs')) - self.mock_update = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'update_ca_certs')) - self.mock_remove = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) - - def test_no_trusted_list(self): - """ - Test that no certificates are written if the 'trusted' key is not - present. - """ - config = {"ca-certs": {}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) - - def test_empty_trusted_list(self): - """Test that no certificate are written if 'trusted' list is empty.""" - config = {"ca-certs": {"trusted": []}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) - - def test_single_trusted(self): - """Test that a single cert gets passed to add_ca_certs.""" - config = {"ca-certs": {"trusted": ["CERT1"]}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.mock_add.assert_called_once_with(['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) - - def test_multiple_trusted(self): - """Test that multiple certs get passed to add_ca_certs.""" - config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.mock_add.assert_called_once_with(['CERT1', 'CERT2']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) - - def test_remove_default_ca_certs(self): - """Test remove_defaults works as expected.""" - config = {"ca-certs": {"remove-defaults": True}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) - - def test_no_remove_defaults_if_false(self): - """Test remove_defaults is not called when config value is False.""" - config = {"ca-certs": {"remove-defaults": False}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) - - def test_correct_order_for_remove_then_add(self): - """Test remove_defaults is not called when config value is False.""" - config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.mock_add.assert_called_once_with(['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) - - -class TestAddCaCerts(TestCase): - - def setUp(self): - super(TestAddCaCerts, self).setUp() - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) - - def test_no_certs_in_list(self): - """Test that no certificate are written if not provided.""" - with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs([]) - self.assertEqual(mockobj.call_count, 0) - - def test_single_cert_trailing_cr(self): - """Test adding a single certificate to the trusted CAs - when existing ca-certificates has trailing newline""" - cert = "CERT1\nLINE2\nLINE3" - - ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" - expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" - - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) - - cc_ca_certs.add_ca_certs([cert]) - - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") - - def test_single_cert_no_trailing_cr(self): - """Test adding a single certificate to the trusted CAs - when existing ca-certificates has no trailing newline""" - cert = "CERT1\nLINE2\nLINE3" - - ca_certs_content = "line1\nline2\nline3" - - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) - - cc_ca_certs.add_ca_certs([cert]) - - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode="wb")]) - - mock_load.assert_called_once_with("/etc/ca-certificates.conf") - - def test_multiple_certs(self): - """Test adding multiple certificates to the trusted CAs.""" - certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"] - expected_cert_file = "\n".join(certs) - ca_certs_content = "line1\nline2\nline3" - - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) - - cc_ca_certs.add_ca_certs(certs) - - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - expected_cert_file, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode='wb')]) - - mock_load.assert_called_once_with("/etc/ca-certificates.conf") - - -class TestUpdateCaCerts(unittest.TestCase): - def test_commands(self): - with mock.patch.object(util, 'subp') as mockobj: - cc_ca_certs.update_ca_certs() - mockobj.assert_called_once_with( - ["update-ca-certificates"], capture=False) - - -class TestRemoveDefaultCaCerts(TestCase): - - def setUp(self): - super(TestRemoveDefaultCaCerts, self).setUp() - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) - - def test_commands(self): - with ExitStack() as mocks: - mock_delete = mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_subp = mocks.enter_context(mock.patch.object(util, 'subp')) - - cc_ca_certs.remove_default_ca_certs() - - mock_delete.assert_has_calls([ - mock.call("/usr/share/ca-certificates/"), - mock.call("/etc/ssl/certs/")]) - - mock_write.assert_called_once_with( - "/etc/ca-certificates.conf", "", mode=0o644) - - mock_subp.assert_called_once_with( - ('debconf-set-selections', '-'), - "ca-certificates ca-certificates/trust_new_crts select no") diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py deleted file mode 100644 index 7a1bc317..00000000 --- a/tests/unittests/test_handler/test_handler_chef.py +++ /dev/null @@ -1,192 +0,0 @@ -import json -import logging -import os -import shutil -import six -import tempfile - -from cloudinit import cloud -from cloudinit.config import cc_chef -from cloudinit import distros -from cloudinit import helpers -from cloudinit.sources import DataSourceNone -from cloudinit import util - -from .. import helpers as t_help - -LOG = logging.getLogger(__name__) - -CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"]) - - -class TestChef(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestChef, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def fetch_cloud(self, distro_kind): - cls = distros.fetch(distro_kind) - paths = helpers.Paths({}) - distro = cls(distro_kind, {}, paths) - ds = DataSourceNone.DataSourceNone({}, distro, paths, None) - return cloud.Cloud(ds, paths, {}, distro, None) - - def test_no_config(self): - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - cfg = {} - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - for d in cc_chef.CHEF_DIRS: - self.assertFalse(os.path.isdir(d)) - - @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), - CLIENT_TEMPL + " is not available") - def test_basic_config(self): - """ - test basic config looks sane - - # This should create a file of the format... - # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000 - log_level :info - ssl_verify_mode :verify_none - log_location "/var/log/chef/client.log" - validation_client_name "bob" - validation_key "/etc/chef/validation.pem" - client_key "/etc/chef/client.pem" - chef_server_url "localhost" - environment "_default" - node_name "iid-datasource-none" - json_attribs "/etc/chef/firstboot.json" - file_cache_path "/var/cache/chef" - file_backup_path "/var/backups/chef" - pid_file "/var/run/chef/client.pid" - Chef::Log::Formatter.show_time = true - """ - tpl_file = util.load_file('templates/chef_client.rb.tmpl') - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) - cfg = { - 'chef': { - 'server_url': 'localhost', - 'validation_name': 'bob', - 'validation_key': "/etc/chef/vkey.pem", - 'validation_cert': "this is my cert", - }, - } - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - for d in cc_chef.CHEF_DIRS: - self.assertTrue(os.path.isdir(d)) - c = util.load_file(cc_chef.CHEF_RB_PATH) - - # the content of these keys is not expected to be rendered to tmpl - unrendered_keys = ('validation_cert',) - for k, v in cfg['chef'].items(): - if k in unrendered_keys: - continue - self.assertIn(v, c) - for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items(): - if k in unrendered_keys: - continue - # the value from the cfg overrides that in the default - val = cfg['chef'].get(k, v) - if isinstance(val, six.string_types): - self.assertIn(val, c) - c = util.load_file(cc_chef.CHEF_FB_PATH) - self.assertEqual({}, json.loads(c)) - - def test_firstboot_json(self): - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - cfg = { - 'chef': { - 'server_url': 'localhost', - 'validation_name': 'bob', - 'run_list': ['a', 'b', 'c'], - 'initial_attributes': { - 'c': 'd', - } - }, - } - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - c = util.load_file(cc_chef.CHEF_FB_PATH) - self.assertEqual( - { - 'run_list': ['a', 'b', 'c'], - 'c': 'd', - }, json.loads(c)) - - @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), - CLIENT_TEMPL + " is not available") - def test_template_deletes(self): - tpl_file = util.load_file('templates/chef_client.rb.tmpl') - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) - cfg = { - 'chef': { - 'server_url': 'localhost', - 'validation_name': 'bob', - 'json_attribs': None, - 'show_time': None, - }, - } - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - c = util.load_file(cc_chef.CHEF_RB_PATH) - self.assertNotIn('json_attribs', c) - self.assertNotIn('Formatter.show_time', c) - - @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), - CLIENT_TEMPL + " is not available") - def test_validation_cert_and_validation_key(self): - # test validation_cert content is written to validation_key path - tpl_file = util.load_file('templates/chef_client.rb.tmpl') - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) - v_path = '/etc/chef/vkey.pem' - v_cert = 'this is my cert' - cfg = { - 'chef': { - 'server_url': 'localhost', - 'validation_name': 'bob', - 'validation_key': v_path, - 'validation_cert': v_cert - }, - } - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - content = util.load_file(cc_chef.CHEF_RB_PATH) - self.assertIn(v_path, content) - util.load_file(v_path) - self.assertEqual(v_cert, util.load_file(v_path)) - - def test_validation_cert_with_system(self): - # test validation_cert content is not written over system file - tpl_file = util.load_file('templates/chef_client.rb.tmpl') - self.patchUtils(self.tmp) - self.patchOS(self.tmp) - - v_path = '/etc/chef/vkey.pem' - v_cert = "system" - expected_cert = "this is the system file certificate" - cfg = { - 'chef': { - 'server_url': 'localhost', - 'validation_name': 'bob', - 'validation_key': v_path, - 'validation_cert': v_cert - }, - } - util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) - util.write_file(v_path, expected_cert) - cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) - content = util.load_file(cc_chef.CHEF_RB_PATH) - self.assertIn(v_path, content) - util.load_file(v_path) - self.assertEqual(expected_cert, util.load_file(v_path)) diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py deleted file mode 100644 index 80708d7b..00000000 --- a/tests/unittests/test_handler/test_handler_debug.py +++ /dev/null @@ -1,81 +0,0 @@ -# vi: ts=4 expandtab -# -# Copyright (C) 2014 Yahoo! Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from cloudinit.config import cc_debug - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import util - -from cloudinit.sources import DataSourceNone - -from .. import helpers as t_help - -import logging -import shutil -import tempfile - -LOG = logging.getLogger(__name__) - - -class TestDebug(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestDebug, self).setUp() - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - - def _get_cloud(self, distro, metadata=None): - self.patchUtils(self.new_root) - paths = helpers.Paths({}) - cls = distros.fetch(distro) - d = cls(distro, {}, paths) - ds = DataSourceNone.DataSourceNone({}, d, paths) - if metadata: - ds.metadata.update(metadata) - return cloud.Cloud(ds, paths, {}, d, None) - - def test_debug_write(self): - cfg = { - 'abc': '123', - 'c': u'\u20a0', - 'debug': { - 'verbose': True, - # Does not actually write here due to mocking... - 'output': '/var/log/cloud-init-debug.log', - }, - } - cc = self._get_cloud('ubuntu') - cc_debug.handle('cc_debug', cfg, cc, LOG, []) - contents = util.load_file('/var/log/cloud-init-debug.log') - # Some basic sanity tests... - self.assertNotEqual(0, len(contents)) - for k in cfg.keys(): - self.assertIn(k, contents) - - def test_debug_no_write(self): - cfg = { - 'abc': '123', - 'debug': { - 'verbose': False, - # Does not actually write here due to mocking... - 'output': '/var/log/cloud-init-debug.log', - }, - } - cc = self._get_cloud('ubuntu') - cc_debug.handle('cc_debug', cfg, cc, LOG, []) - self.assertRaises(IOError, - util.load_file, '/var/log/cloud-init-debug.log') diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py deleted file mode 100644 index ddef8d48..00000000 --- a/tests/unittests/test_handler/test_handler_disk_setup.py +++ /dev/null @@ -1,30 +0,0 @@ -from cloudinit.config import cc_disk_setup -from ..helpers import ExitStack, mock, TestCase - - -class TestIsDiskUsed(TestCase): - - def setUp(self): - super(TestIsDiskUsed, self).setUp() - self.patches = ExitStack() - mod_name = 'cloudinit.config.cc_disk_setup' - self.enumerate_disk = self.patches.enter_context( - mock.patch('{0}.enumerate_disk'.format(mod_name))) - self.check_fs = self.patches.enter_context( - mock.patch('{0}.check_fs'.format(mod_name))) - - def test_multiple_child_nodes_returns_true(self): - self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2)) - self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) - self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) - - def test_valid_filesystem_returns_true(self): - self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) - self.check_fs.return_value = ( - mock.MagicMock(), 'ext4', mock.MagicMock()) - self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) - - def test_one_child_nodes_and_no_fs_returns_false(self): - self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) - self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) - self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock())) diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py deleted file mode 100644 index e653488a..00000000 --- a/tests/unittests/test_handler/test_handler_growpart.py +++ /dev/null @@ -1,220 +0,0 @@ -from cloudinit import cloud -from cloudinit.config import cc_growpart -from cloudinit import util - -from ..helpers import TestCase - -import errno -import logging -import os -import re -import unittest - -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack - -# growpart: -# mode: auto # off, on, auto, 'growpart' -# devices: ['root'] - -HELP_GROWPART_RESIZE = """ -growpart disk partition - rewrite partition table so that partition takes up all the space it can - options: - -h | --help print Usage and exit -<SNIP> - -u | --update R update the the kernel partition table info after growing - this requires kernel support and 'partx --update' - R is one of: - - 'auto' : [default] update partition if possible -<SNIP> - Example: - - growpart /dev/sda 1 - Resize partition 1 on /dev/sda -""" - -HELP_GROWPART_NO_RESIZE = """ -growpart disk partition - rewrite partition table so that partition takes up all the space it can - options: - -h | --help print Usage and exit -<SNIP> - Example: - - growpart /dev/sda 1 - Resize partition 1 on /dev/sda -""" - - -class TestDisabled(unittest.TestCase): - def setUp(self): - super(TestDisabled, self).setUp() - self.name = "growpart" - self.cloud_init = None - self.log = logging.getLogger("TestDisabled") - self.args = [] - - self.handle = cc_growpart.handle - - def test_mode_off(self): - # Test that nothing is done if mode is off. - - # this really only verifies that resizer_factory isn't called - config = {'growpart': {'mode': 'off'}} - - with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj: - self.handle(self.name, config, self.cloud_init, self.log, - self.args) - self.assertEqual(mockobj.call_count, 0) - - -class TestConfig(TestCase): - def setUp(self): - super(TestConfig, self).setUp() - self.name = "growpart" - self.paths = None - self.cloud = cloud.Cloud(None, self.paths, None, None, None) - self.log = logging.getLogger("TestConfig") - self.args = [] - os.environ = {} - - self.cloud_init = None - self.handle = cc_growpart.handle - - def test_no_resizers_auto_is_fine(self): - with mock.patch.object( - util, 'subp', - return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: - - config = {'growpart': {'mode': 'auto'}} - self.handle(self.name, config, self.cloud_init, self.log, - self.args) - - mockobj.assert_called_once_with( - ['growpart', '--help'], env={'LANG': 'C'}) - - def test_no_resizers_mode_growpart_is_exception(self): - with mock.patch.object( - util, 'subp', - return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: - config = {'growpart': {'mode': "growpart"}} - self.assertRaises( - ValueError, self.handle, self.name, config, - self.cloud_init, self.log, self.args) - - mockobj.assert_called_once_with( - ['growpart', '--help'], env={'LANG': 'C'}) - - def test_mode_auto_prefers_growpart(self): - with mock.patch.object( - util, 'subp', - return_value=(HELP_GROWPART_RESIZE, "")) as mockobj: - ret = cc_growpart.resizer_factory(mode="auto") - self.assertIsInstance(ret, cc_growpart.ResizeGrowPart) - - mockobj.assert_called_once_with( - ['growpart', '--help'], env={'LANG': 'C'}) - - def test_handle_with_no_growpart_entry(self): - # if no 'growpart' entry in config, then mode=auto should be used - - myresizer = object() - retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),) - - with ExitStack() as mocks: - factory = mocks.enter_context( - mock.patch.object(cc_growpart, 'resizer_factory', - return_value=myresizer)) - rsdevs = mocks.enter_context( - mock.patch.object(cc_growpart, 'resize_devices', - return_value=retval)) - mocks.enter_context( - mock.patch.object(cc_growpart, 'RESIZERS', - (('mysizer', object),) - )) - - self.handle(self.name, {}, self.cloud_init, self.log, self.args) - - factory.assert_called_once_with('auto') - rsdevs.assert_called_once_with(myresizer, ['/']) - - -class TestResize(unittest.TestCase): - def setUp(self): - super(TestResize, self).setUp() - self.name = "growpart" - self.log = logging.getLogger("TestResize") - - def test_simple_devices(self): - # test simple device list - # this patches out devent2dev, os.stat, and device_part_info - # so in the end, doesn't test a lot - devs = ["/dev/XXda1", "/dev/YYda2"] - devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5, - st_nlink=1, st_uid=0, st_gid=6, st_size=0, - st_atime=0, st_mtime=0, st_ctime=0) - enoent = ["/dev/NOENT"] - real_stat = os.stat - resize_calls = [] - - class myresizer(object): - def resize(self, diskdev, partnum, partdev): - resize_calls.append((diskdev, partnum, partdev)) - if partdev == "/dev/YYda2": - return (1024, 2048) - return (1024, 1024) # old size, new size - - def mystat(path): - if path in devs: - return devstat_ret - if path in enoent: - e = OSError("%s: does not exist" % path) - e.errno = errno.ENOENT - raise e - return real_stat(path) - - try: - opinfo = cc_growpart.device_part_info - cc_growpart.device_part_info = simple_device_part_info - os.stat = mystat - - resized = cc_growpart.resize_devices(myresizer(), devs + enoent) - - def find(name, res): - for f in res: - if f[0] == name: - return f - return None - - self.assertEqual(cc_growpart.RESIZE.NOCHANGE, - find("/dev/XXda1", resized)[1]) - self.assertEqual(cc_growpart.RESIZE.CHANGED, - find("/dev/YYda2", resized)[1]) - self.assertEqual(cc_growpart.RESIZE.SKIPPED, - find(enoent[0], resized)[1]) - # self.assertEqual(resize_calls, - # [("/dev/XXda", "1", "/dev/XXda1"), - # ("/dev/YYda", "2", "/dev/YYda2")]) - finally: - cc_growpart.device_part_info = opinfo - os.stat = real_stat - - -def simple_device_part_info(devpath): - # simple stupid return (/dev/vda, 1) for /dev/vda - ret = re.search("([^0-9]*)([0-9]*)$", devpath) - x = (ret.group(1), ret.group(2)) - return x - - -class Bunch(object): - def __init__(self, **kwds): - self.__dict__.update(kwds) - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py deleted file mode 100644 index c91908f4..00000000 --- a/tests/unittests/test_handler/test_handler_locale.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (C) 2013 Hewlett-Packard Development Company, L.P. -# -# Author: Juerg Haefliger <juerg.haefliger@hp.com> -# -# Based on test_handler_set_hostname.py -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from cloudinit.config import cc_locale - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import util - -from cloudinit.sources import DataSourceNoCloud - -from .. import helpers as t_help - -from configobj import ConfigObj - -from six import BytesIO - -import logging -import shutil -import tempfile - -LOG = logging.getLogger(__name__) - - -class TestLocale(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestLocale, self).setUp() - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - - def _get_cloud(self, distro): - self.patchUtils(self.new_root) - paths = helpers.Paths({}) - - cls = distros.fetch(distro) - d = cls(distro, {}, paths) - ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) - cc = cloud.Cloud(ds, paths, {}, d, None) - return cc - - def test_set_locale_sles(self): - - cfg = { - 'locale': 'My.Locale', - } - cc = self._get_cloud('sles') - cc_locale.handle('cc_locale', cfg, cc, LOG, []) - - contents = util.load_file('/etc/sysconfig/language', decode=False) - n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py deleted file mode 100644 index 6f90defb..00000000 --- a/tests/unittests/test_handler/test_handler_lxd.py +++ /dev/null @@ -1,134 +0,0 @@ -from cloudinit.config import cc_lxd -from cloudinit.sources import DataSourceNoCloud -from cloudinit import (distros, helpers, cloud) -from .. import helpers as t_help - -import logging - -try: - from unittest import mock -except ImportError: - import mock - -LOG = logging.getLogger(__name__) - - -class TestLxd(t_help.TestCase): - lxd_cfg = { - 'lxd': { - 'init': { - 'network_address': '0.0.0.0', - 'storage_backend': 'zfs', - 'storage_pool': 'poolname', - } - } - } - - def setUp(self): - super(TestLxd, self).setUp() - - def _get_cloud(self, distro): - cls = distros.fetch(distro) - paths = helpers.Paths({}) - d = cls(distro, {}, paths) - ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) - cc = cloud.Cloud(ds, paths, {}, d, None) - return cc - - @mock.patch("cloudinit.config.cc_lxd.util") - def test_lxd_init(self, mock_util): - cc = self._get_cloud('ubuntu') - mock_util.which.return_value = True - cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) - self.assertTrue(mock_util.which.called) - init_call = mock_util.subp.call_args_list[0][0][0] - self.assertEqual(init_call, - ['lxd', 'init', '--auto', - '--network-address=0.0.0.0', - '--storage-backend=zfs', - '--storage-pool=poolname']) - - @mock.patch("cloudinit.config.cc_lxd.util") - def test_lxd_install(self, mock_util): - cc = self._get_cloud('ubuntu') - cc.distro = mock.MagicMock() - mock_util.which.return_value = None - cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) - self.assertTrue(cc.distro.install_packages.called) - install_pkg = cc.distro.install_packages.call_args_list[0][0][0] - self.assertEqual(sorted(install_pkg), ['lxd', 'zfs']) - - @mock.patch("cloudinit.config.cc_lxd.util") - def test_no_init_does_nothing(self, mock_util): - cc = self._get_cloud('ubuntu') - cc.distro = mock.MagicMock() - cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, []) - self.assertFalse(cc.distro.install_packages.called) - self.assertFalse(mock_util.subp.called) - - @mock.patch("cloudinit.config.cc_lxd.util") - def test_no_lxd_does_nothing(self, mock_util): - cc = self._get_cloud('ubuntu') - cc.distro = mock.MagicMock() - cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, []) - self.assertFalse(cc.distro.install_packages.called) - self.assertFalse(mock_util.subp.called) - - def test_lxd_debconf_new_full(self): - data = {"mode": "new", - "name": "testbr0", - "ipv4_address": "10.0.8.1", - "ipv4_netmask": "24", - "ipv4_dhcp_first": "10.0.8.2", - "ipv4_dhcp_last": "10.0.8.254", - "ipv4_dhcp_leases": "250", - "ipv4_nat": "true", - "ipv6_address": "fd98:9e0:3744::1", - "ipv6_netmask": "64", - "ipv6_nat": "true", - "domain": "lxd"} - self.assertEqual( - cc_lxd.bridge_to_debconf(data), - {"lxd/setup-bridge": "true", - "lxd/bridge-name": "testbr0", - "lxd/bridge-ipv4": "true", - "lxd/bridge-ipv4-address": "10.0.8.1", - "lxd/bridge-ipv4-netmask": "24", - "lxd/bridge-ipv4-dhcp-first": "10.0.8.2", - "lxd/bridge-ipv4-dhcp-last": "10.0.8.254", - "lxd/bridge-ipv4-dhcp-leases": "250", - "lxd/bridge-ipv4-nat": "true", - "lxd/bridge-ipv6": "true", - "lxd/bridge-ipv6-address": "fd98:9e0:3744::1", - "lxd/bridge-ipv6-netmask": "64", - "lxd/bridge-ipv6-nat": "true", - "lxd/bridge-domain": "lxd"}) - - def test_lxd_debconf_new_partial(self): - data = {"mode": "new", - "ipv6_address": "fd98:9e0:3744::1", - "ipv6_netmask": "64", - "ipv6_nat": "true"} - self.assertEqual( - cc_lxd.bridge_to_debconf(data), - {"lxd/setup-bridge": "true", - "lxd/bridge-ipv6": "true", - "lxd/bridge-ipv6-address": "fd98:9e0:3744::1", - "lxd/bridge-ipv6-netmask": "64", - "lxd/bridge-ipv6-nat": "true"}) - - def test_lxd_debconf_existing(self): - data = {"mode": "existing", - "name": "testbr0"} - self.assertEqual( - cc_lxd.bridge_to_debconf(data), - {"lxd/setup-bridge": "false", - "lxd/use-existing-bridge": "true", - "lxd/bridge-name": "testbr0"}) - - def test_lxd_debconf_none(self): - data = {"mode": "none"} - self.assertEqual( - cc_lxd.bridge_to_debconf(data), - {"lxd/setup-bridge": "false", - "lxd/bridge-name": ""}) diff --git a/tests/unittests/test_handler/test_handler_mcollective.py b/tests/unittests/test_handler/test_handler_mcollective.py deleted file mode 100644 index 6aefb93d..00000000 --- a/tests/unittests/test_handler/test_handler_mcollective.py +++ /dev/null @@ -1,148 +0,0 @@ -from cloudinit import (cloud, distros, helpers, util) -from cloudinit.config import cc_mcollective -from cloudinit.sources import DataSourceNoCloud - -from .. import helpers as t_help - -import configobj -import logging -import os -import shutil -from six import BytesIO -import tempfile - -LOG = logging.getLogger(__name__) - - -STOCK_CONFIG = """\ -main_collective = mcollective -collectives = mcollective -libdir = /usr/share/mcollective/plugins -logfile = /var/log/mcollective.log -loglevel = info -daemonize = 1 - -# Plugins -securityprovider = psk -plugin.psk = unset - -connector = activemq -plugin.activemq.pool.size = 1 -plugin.activemq.pool.1.host = stomp1 -plugin.activemq.pool.1.port = 61613 -plugin.activemq.pool.1.user = mcollective -plugin.activemq.pool.1.password = marionette - -# Facts -factsource = yaml -plugin.yaml = /etc/mcollective/facts.yaml -""" - - -class TestConfig(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - # "./": make os.path.join behave correctly with abs path as second arg - self.server_cfg = os.path.join( - self.tmp, "./" + cc_mcollective.SERVER_CFG) - self.pubcert_file = os.path.join( - self.tmp, "./" + cc_mcollective.PUBCERT_FILE) - self.pricert_file= os.path.join( - self.tmp, self.tmp, "./" + cc_mcollective.PRICERT_FILE) - - def test_basic_config(self): - cfg = { - 'mcollective': { - 'conf': { - 'loglevel': 'debug', - 'connector': 'rabbitmq', - 'logfile': '/var/log/mcollective.log', - 'ttl': '4294957', - 'collectives': 'mcollective', - 'main_collective': 'mcollective', - 'securityprovider': 'psk', - 'daemonize': '1', - 'factsource': 'yaml', - 'direct_addressing': '1', - 'plugin.psk': 'unset', - 'libdir': '/usr/share/mcollective/plugins', - 'identity': '1', - }, - }, - } - expected = cfg['mcollective']['conf'] - - self.patchUtils(self.tmp) - cc_mcollective.configure(cfg['mcollective']['conf']) - contents = util.load_file(cc_mcollective.SERVER_CFG, decode=False) - contents = configobj.ConfigObj(BytesIO(contents)) - self.assertEqual(expected, dict(contents)) - - def test_existing_config_is_saved(self): - cfg = {'loglevel': 'warn'} - util.write_file(self.server_cfg, STOCK_CONFIG) - cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg) - self.assertTrue(os.path.exists(self.server_cfg)) - self.assertTrue(os.path.exists(self.server_cfg + ".old")) - self.assertEqual(util.load_file(self.server_cfg + ".old"), STOCK_CONFIG) - - def test_existing_updated(self): - cfg = {'loglevel': 'warn'} - util.write_file(self.server_cfg, STOCK_CONFIG) - cc_mcollective.configure(config=cfg, server_cfg=self.server_cfg) - cfgobj = configobj.ConfigObj(self.server_cfg) - self.assertEqual(cfg['loglevel'], cfgobj['loglevel']) - - def test_certificats_written(self): - # check public-cert and private-cert keys in config get written - cfg = {'loglevel': 'debug', - 'public-cert': "this is my public-certificate", - 'private-cert': "secret private certificate"} - - cc_mcollective.configure(config=cfg, - server_cfg=self.server_cfg, pricert_file=self.pricert_file, - pubcert_file=self.pubcert_file) - - found = configobj.ConfigObj(self.server_cfg) - - # make sure these didnt get written in - self.assertFalse('public-cert' in found) - self.assertFalse('private-cert' in found) - - # these need updating to the specified paths - self.assertEqual(found['plugin.ssl_server_public'], self.pubcert_file) - self.assertEqual(found['plugin.ssl_server_private'], self.pricert_file) - - # and the security provider should be ssl - self.assertEqual(found['securityprovider'], 'ssl') - - self.assertEqual( - util.load_file(self.pricert_file), cfg['private-cert']) - self.assertEqual( - util.load_file(self.pubcert_file), cfg['public-cert']) - - -class TestHandler(t_help.TestCase): - def _get_cloud(self, distro): - cls = distros.fetch(distro) - paths = helpers.Paths({}) - d = cls(distro, {}, paths) - ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) - cc = cloud.Cloud(ds, paths, {}, d, None) - return cc - - @t_help.mock.patch("cloudinit.config.cc_mcollective.util") - def test_mcollective_install(self, mock_util): - cc = self._get_cloud('ubuntu') - cc.distro = t_help.mock.MagicMock() - mycfg = {'mcollective': {'conf': {'loglevel': 'debug'}}} - cc_mcollective.handle('cc_mcollective', mycfg, cc, LOG, []) - self.assertTrue(cc.distro.install_packages.called) - install_pkg = cc.distro.install_packages.call_args_list[0][0][0] - self.assertEqual(install_pkg, ('mcollective',)) - - self.assertTrue(mock_util.subp.called) - self.assertEqual(mock_util.subp.call_args_list[0][0][0], - ['service', 'mcollective', 'restart']) diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py deleted file mode 100644 index 355674b2..00000000 --- a/tests/unittests/test_handler/test_handler_mounts.py +++ /dev/null @@ -1,133 +0,0 @@ -import os.path -import shutil -import tempfile - -from cloudinit.config import cc_mounts - -from .. import helpers as test_helpers - -try: - from unittest import mock -except ImportError: - import mock - - -class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): - - def setUp(self): - super(TestSanitizeDevname, self).setUp() - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - self.patchOS(self.new_root) - - def _touch(self, path): - path = os.path.join(self.new_root, path.lstrip('/')) - basedir = os.path.dirname(path) - if not os.path.exists(basedir): - os.makedirs(basedir) - open(path, 'a').close() - - def _makedirs(self, directory): - directory = os.path.join(self.new_root, directory.lstrip('/')) - if not os.path.exists(directory): - os.makedirs(directory) - - def mock_existence_of_disk(self, disk_path): - self._touch(disk_path) - self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1])) - - def mock_existence_of_partition(self, disk_path, partition_number): - self.mock_existence_of_disk(disk_path) - self._touch(disk_path + str(partition_number)) - disk_name = disk_path.split('/')[-1] - self._makedirs(os.path.join('/sys/block', - disk_name, - disk_name + str(partition_number))) - - def test_existent_full_disk_path_is_returned(self): - disk_path = '/dev/sda' - self.mock_existence_of_disk(disk_path) - self.assertEqual(disk_path, - cc_mounts.sanitize_devname(disk_path, - lambda x: None, - mock.Mock())) - - def test_existent_disk_name_returns_full_path(self): - disk_name = 'sda' - disk_path = '/dev/' + disk_name - self.mock_existence_of_disk(disk_path) - self.assertEqual(disk_path, - cc_mounts.sanitize_devname(disk_name, - lambda x: None, - mock.Mock())) - - def test_existent_meta_disk_is_returned(self): - actual_disk_path = '/dev/sda' - self.mock_existence_of_disk(actual_disk_path) - self.assertEqual( - actual_disk_path, - cc_mounts.sanitize_devname('ephemeral0', - lambda x: actual_disk_path, - mock.Mock())) - - def test_existent_meta_partition_is_returned(self): - disk_name, partition_part = '/dev/sda', '1' - actual_partition_path = disk_name + partition_part - self.mock_existence_of_partition(disk_name, partition_part) - self.assertEqual( - actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.1', - lambda x: disk_name, - mock.Mock())) - - def test_existent_meta_partition_with_p_is_returned(self): - disk_name, partition_part = '/dev/sda', 'p1' - actual_partition_path = disk_name + partition_part - self.mock_existence_of_partition(disk_name, partition_part) - self.assertEqual( - actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.1', - lambda x: disk_name, - mock.Mock())) - - def test_first_partition_returned_if_existent_disk_is_partitioned(self): - disk_name, partition_part = '/dev/sda', '1' - actual_partition_path = disk_name + partition_part - self.mock_existence_of_partition(disk_name, partition_part) - self.assertEqual( - actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0', - lambda x: disk_name, - mock.Mock())) - - def test_nth_partition_returned_if_requested(self): - disk_name, partition_part = '/dev/sda', '3' - actual_partition_path = disk_name + partition_part - self.mock_existence_of_partition(disk_name, partition_part) - self.assertEqual( - actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.3', - lambda x: disk_name, - mock.Mock())) - - def test_transformer_returning_none_returns_none(self): - self.assertIsNone( - cc_mounts.sanitize_devname( - 'ephemeral0', lambda x: None, mock.Mock())) - - def test_missing_device_returns_none(self): - self.assertIsNone( - cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock())) - - def test_missing_sys_returns_none(self): - disk_path = '/dev/sda' - self._makedirs(disk_path) - self.assertIsNone( - cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) - - def test_existent_disk_but_missing_partition_returns_none(self): - disk_path = '/dev/sda' - self.mock_existence_of_disk(disk_path) - self.assertIsNone( - cc_mounts.sanitize_devname( - 'ephemeral0.1', lambda x: disk_path, mock.Mock())) diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py deleted file mode 100644 index feff319d..00000000 --- a/tests/unittests/test_handler/test_handler_power_state.py +++ /dev/null @@ -1,127 +0,0 @@ -import sys - -from cloudinit.config import cc_power_state_change as psc - -from .. import helpers as t_help -from ..helpers import mock - - -class TestLoadPowerState(t_help.TestCase): - def setUp(self): - super(self.__class__, self).setUp() - - def test_no_config(self): - # completely empty config should mean do nothing - (cmd, _timeout, _condition) = psc.load_power_state({}) - self.assertEqual(cmd, None) - - def test_irrelevant_config(self): - # no power_state field in config should return None for cmd - (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'}) - self.assertEqual(cmd, None) - - def test_invalid_mode(self): - cfg = {'power_state': {'mode': 'gibberish'}} - self.assertRaises(TypeError, psc.load_power_state, cfg) - - cfg = {'power_state': {'mode': ''}} - self.assertRaises(TypeError, psc.load_power_state, cfg) - - def test_empty_mode(self): - cfg = {'power_state': {'message': 'goodbye'}} - self.assertRaises(TypeError, psc.load_power_state, cfg) - - def test_valid_modes(self): - cfg = {'power_state': {}} - for mode in ('halt', 'poweroff', 'reboot'): - cfg['power_state']['mode'] = mode - check_lps_ret(psc.load_power_state(cfg), mode=mode) - - def test_invalid_delay(self): - cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}} - self.assertRaises(TypeError, psc.load_power_state, cfg) - - def test_valid_delay(self): - cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}} - for delay in ("now", "+1", "+30"): - cfg['power_state']['delay'] = delay - check_lps_ret(psc.load_power_state(cfg)) - - def test_message_present(self): - cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}} - ret = psc.load_power_state(cfg) - check_lps_ret(psc.load_power_state(cfg)) - self.assertIn(cfg['power_state']['message'], ret[0]) - - def test_no_message(self): - # if message is not present, then no argument should be passed for it - cfg = {'power_state': {'mode': 'poweroff'}} - (cmd, _timeout, _condition) = psc.load_power_state(cfg) - self.assertNotIn("", cmd) - check_lps_ret(psc.load_power_state(cfg)) - self.assertTrue(len(cmd) == 3) - - def test_condition_null_raises(self): - cfg = {'power_state': {'mode': 'poweroff', 'condition': None}} - self.assertRaises(TypeError, psc.load_power_state, cfg) - - def test_condition_default_is_true(self): - cfg = {'power_state': {'mode': 'poweroff'}} - _cmd, _timeout, cond = psc.load_power_state(cfg) - self.assertEqual(cond, True) - - -class TestCheckCondition(t_help.TestCase): - def cmd_with_exit(self, rc): - return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc]) - - def test_true_is_true(self): - self.assertEqual(psc.check_condition(True), True) - - def test_false_is_false(self): - self.assertEqual(psc.check_condition(False), False) - - def test_cmd_exit_zero_true(self): - self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True) - - def test_cmd_exit_one_false(self): - self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False) - - def test_cmd_exit_nonzero_warns(self): - mocklog = mock.Mock() - self.assertEqual( - psc.check_condition(self.cmd_with_exit(2), mocklog), False) - self.assertEqual(mocklog.warn.call_count, 1) - - -def check_lps_ret(psc_return, mode=None): - if len(psc_return) != 3: - raise TypeError("length returned = %d" % len(psc_return)) - - errs = [] - cmd = psc_return[0] - timeout = psc_return[1] - condition = psc_return[2] - - if 'shutdown' not in psc_return[0][0]: - errs.append("string 'shutdown' not in cmd") - - if condition is None: - errs.append("condition was not returned") - - if mode is not None: - opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode] - if opt not in psc_return[0]: - errs.append("opt '%s' not in cmd: %s" % (opt, cmd)) - - if len(cmd) != 3 and len(cmd) != 4: - errs.append("Invalid command length: %s" % len(cmd)) - - try: - float(timeout) - except Exception: - errs.append("timeout failed convert to float") - - if len(errs): - lines = ["Errors in result: %s" % str(psc_return)] + errs - raise Exception('\n'.join(lines)) diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py deleted file mode 100644 index 38636063..00000000 --- a/tests/unittests/test_handler/test_handler_rsyslog.py +++ /dev/null @@ -1,174 +0,0 @@ -import os -import shutil -import tempfile - -from cloudinit.config.cc_rsyslog import ( - apply_rsyslog_changes, DEF_DIR, DEF_FILENAME, DEF_RELOAD, load_config, - parse_remotes_line, remotes_to_rsyslog_cfg) -from cloudinit import util - -from .. import helpers as t_help - - -class TestLoadConfig(t_help.TestCase): - def setUp(self): - super(TestLoadConfig, self).setUp() - self.basecfg = { - 'config_filename': DEF_FILENAME, - 'config_dir': DEF_DIR, - 'service_reload_command': DEF_RELOAD, - 'configs': [], - 'remotes': {}, - } - - def test_legacy_full(self): - found = load_config({ - 'rsyslog': ['*.* @192.168.1.1'], - 'rsyslog_dir': "mydir", - 'rsyslog_filename': "myfilename"}) - self.basecfg.update({ - 'configs': ['*.* @192.168.1.1'], - 'config_dir': "mydir", - 'config_filename': 'myfilename', - 'service_reload_command': 'auto'} - ) - - self.assertEqual(found, self.basecfg) - - def test_legacy_defaults(self): - found = load_config({ - 'rsyslog': ['*.* @192.168.1.1']}) - self.basecfg.update({ - 'configs': ['*.* @192.168.1.1']}) - self.assertEqual(found, self.basecfg) - - def test_new_defaults(self): - self.assertEqual(load_config({}), self.basecfg) - - def test_new_configs(self): - cfgs = ['*.* myhost', '*.* my2host'] - self.basecfg.update({'configs': cfgs}) - self.assertEqual( - load_config({'rsyslog': {'configs': cfgs}}), - self.basecfg) - - -class TestApplyChanges(t_help.TestCase): - def setUp(self): - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def test_simple(self): - cfgline = "*.* foohost" - changed = apply_rsyslog_changes( - configs=[cfgline], def_fname="foo.cfg", cfg_dir=self.tmp) - - fname = os.path.join(self.tmp, "foo.cfg") - self.assertEqual([fname], changed) - self.assertEqual( - util.load_file(fname), cfgline + "\n") - - def test_multiple_files(self): - configs = [ - '*.* foohost', - {'content': 'abc', 'filename': 'my.cfg'}, - {'content': 'filefoo-content', - 'filename': os.path.join(self.tmp, 'mydir/mycfg')}, - ] - - changed = apply_rsyslog_changes( - configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) - - expected = [ - (os.path.join(self.tmp, "default.cfg"), - "*.* foohost\n"), - (os.path.join(self.tmp, "my.cfg"), "abc\n"), - (os.path.join(self.tmp, "mydir/mycfg"), "filefoo-content\n"), - ] - self.assertEqual([f[0] for f in expected], changed) - actual = [] - for fname, _content in expected: - util.load_file(fname) - actual.append((fname, util.load_file(fname),)) - self.assertEqual(expected, actual) - - def test_repeat_def(self): - configs = ['*.* foohost', "*.warn otherhost"] - - changed = apply_rsyslog_changes( - configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) - - fname = os.path.join(self.tmp, "default.cfg") - self.assertEqual([fname], changed) - - expected_content = '\n'.join([c for c in configs]) + '\n' - found_content = util.load_file(fname) - self.assertEqual(expected_content, found_content) - - def test_multiline_content(self): - configs = ['line1', 'line2\nline3\n'] - - apply_rsyslog_changes( - configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) - - fname = os.path.join(self.tmp, "default.cfg") - expected_content = '\n'.join([c for c in configs]) - found_content = util.load_file(fname) - self.assertEqual(expected_content, found_content) - - -class TestParseRemotesLine(t_help.TestCase): - def test_valid_port(self): - r = parse_remotes_line("foo:9") - self.assertEqual(9, r.port) - - def test_invalid_port(self): - with self.assertRaises(ValueError): - parse_remotes_line("*.* foo:abc") - - def test_valid_ipv6(self): - r = parse_remotes_line("*.* [::1]") - self.assertEqual("*.* @[::1]", str(r)) - - def test_valid_ipv6_with_port(self): - r = parse_remotes_line("*.* [::1]:100") - self.assertEqual(r.port, 100) - self.assertEqual(r.addr, "::1") - self.assertEqual("*.* @[::1]:100", str(r)) - - def test_invalid_multiple_colon(self): - with self.assertRaises(ValueError): - parse_remotes_line("*.* ::1:100") - - def test_name_in_string(self): - r = parse_remotes_line("syslog.host", name="foobar") - self.assertEqual("*.* @syslog.host # foobar", str(r)) - - -class TestRemotesToSyslog(t_help.TestCase): - def test_simple(self): - # str rendered line must appear in remotes_to_ryslog_cfg return - mycfg = "*.* myhost" - myline = str(parse_remotes_line(mycfg, name="myname")) - r = remotes_to_rsyslog_cfg({'myname': mycfg}) - lines = r.splitlines() - self.assertEqual(1, len(lines)) - self.assertTrue(myline in r.splitlines()) - - def test_header_footer(self): - header = "#foo head" - footer = "#foo foot" - r = remotes_to_rsyslog_cfg( - {'myname': "*.* myhost"}, header=header, footer=footer) - lines = r.splitlines() - self.assertTrue(header, lines[0]) - self.assertTrue(footer, lines[-1]) - - def test_with_empty_or_null(self): - mycfg = "*.* myhost" - myline = str(parse_remotes_line(mycfg, name="myname")) - r = remotes_to_rsyslog_cfg( - {'myname': mycfg, 'removed': None, 'removed2': ""}) - lines = r.splitlines() - self.assertEqual(1, len(lines)) - self.assertTrue(myline in r.splitlines()) diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py deleted file mode 100644 index a0390da9..00000000 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ /dev/null @@ -1,227 +0,0 @@ -# Copyright (C) 2013 Hewlett-Packard Development Company, L.P. -# -# Author: Juerg Haefliger <juerg.haefliger@hp.com> -# -# Based on test_handler_set_hostname.py -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from cloudinit.config import cc_seed_random - -import gzip -import tempfile - -from six import BytesIO - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import util - -from cloudinit.sources import DataSourceNone - -from .. import helpers as t_help - -import logging - -LOG = logging.getLogger(__name__) - - -class TestRandomSeed(t_help.TestCase): - def setUp(self): - super(TestRandomSeed, self).setUp() - self._seed_file = tempfile.mktemp() - self.unapply = [] - - # by default 'which' has nothing in its path - self.apply_patches([(util, 'which', self._which)]) - self.apply_patches([(util, 'subp', self._subp)]) - self.subp_called = [] - self.whichdata = {} - - def tearDown(self): - apply_patches([i for i in reversed(self.unapply)]) - util.del_file(self._seed_file) - - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def _which(self, program): - return self.whichdata.get(program) - - def _subp(self, *args, **kwargs): - # supports subp calling with cmd as args or kwargs - if 'args' not in kwargs: - kwargs['args'] = args[0] - self.subp_called.append(kwargs) - return - - def _compress(self, text): - contents = BytesIO() - gz_fh = gzip.GzipFile(mode='wb', fileobj=contents) - gz_fh.write(text) - gz_fh.close() - return contents.getvalue() - - def _get_cloud(self, distro, metadata=None): - paths = helpers.Paths({}) - cls = distros.fetch(distro) - ubuntu_distro = cls(distro, {}, paths) - ds = DataSourceNone.DataSourceNone({}, ubuntu_distro, paths) - if metadata: - ds.metadata = metadata - return cloud.Cloud(ds, paths, {}, ubuntu_distro, None) - - def test_append_random(self): - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': 'tiny-tim-was-here', - } - } - cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual("tiny-tim-was-here", contents) - - def test_append_random_unknown_encoding(self): - data = self._compress(b"tiny-toe") - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': data, - 'encoding': 'special_encoding', - } - } - self.assertRaises(IOError, cc_seed_random.handle, 'test', cfg, - self._get_cloud('ubuntu'), LOG, []) - - def test_append_random_gzip(self): - data = self._compress(b"tiny-toe") - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': data, - 'encoding': 'gzip', - } - } - cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual("tiny-toe", contents) - - def test_append_random_gz(self): - data = self._compress(b"big-toe") - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': data, - 'encoding': 'gz', - } - } - cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual("big-toe", contents) - - def test_append_random_base64(self): - data = util.b64e('bubbles') - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': data, - 'encoding': 'base64', - } - } - cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual("bubbles", contents) - - def test_append_random_b64(self): - data = util.b64e('kit-kat') - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': data, - 'encoding': 'b64', - } - } - cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual("kit-kat", contents) - - def test_append_random_metadata(self): - cfg = { - 'random_seed': { - 'file': self._seed_file, - 'data': 'tiny-tim-was-here', - } - } - c = self._get_cloud('ubuntu', {'random_seed': '-so-was-josh'}) - cc_seed_random.handle('test', cfg, c, LOG, []) - contents = util.load_file(self._seed_file) - self.assertEqual('tiny-tim-was-here-so-was-josh', contents) - - def test_seed_command_provided_and_available(self): - c = self._get_cloud('ubuntu', {}) - self.whichdata = {'pollinate': '/usr/bin/pollinate'} - cfg = {'random_seed': {'command': ['pollinate', '-q']}} - cc_seed_random.handle('test', cfg, c, LOG, []) - - subp_args = [f['args'] for f in self.subp_called] - self.assertIn(['pollinate', '-q'], subp_args) - - def test_seed_command_not_provided(self): - c = self._get_cloud('ubuntu', {}) - self.whichdata = {} - cc_seed_random.handle('test', {}, c, LOG, []) - - # subp should not have been called as which would say not available - self.assertFalse(self.subp_called) - - def test_unavailable_seed_command_and_required_raises_error(self): - c = self._get_cloud('ubuntu', {}) - self.whichdata = {} - cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'], - 'command_required': True}} - self.assertRaises(ValueError, cc_seed_random.handle, - 'test', cfg, c, LOG, []) - - def test_seed_command_and_required(self): - c = self._get_cloud('ubuntu', {}) - self.whichdata = {'foo': 'foo'} - cfg = {'random_seed': {'command_required': True, 'command': ['foo']}} - cc_seed_random.handle('test', cfg, c, LOG, []) - - self.assertIn(['foo'], [f['args'] for f in self.subp_called]) - - def test_file_in_environment_for_command(self): - c = self._get_cloud('ubuntu', {}) - self.whichdata = {'foo': 'foo'} - cfg = {'random_seed': {'command_required': True, 'command': ['foo'], - 'file': self._seed_file}} - cc_seed_random.handle('test', cfg, c, LOG, []) - - # this just instists that the first time subp was called, - # RANDOM_SEED_FILE was in the environment set up correctly - subp_env = [f['env'] for f in self.subp_called] - self.assertEqual(subp_env[0].get('RANDOM_SEED_FILE'), self._seed_file) - - -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py deleted file mode 100644 index 7effa124..00000000 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ /dev/null @@ -1,72 +0,0 @@ -from cloudinit.config import cc_set_hostname - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import util - -from .. import helpers as t_help - -from configobj import ConfigObj -import logging -import shutil -from six import BytesIO -import tempfile - -LOG = logging.getLogger(__name__) - - -class TestHostname(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestHostname, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def _fetch_distro(self, kind): - cls = distros.fetch(kind) - paths = helpers.Paths({}) - return cls(kind, {}, paths) - - def test_write_hostname_rhel(self): - cfg = { - 'hostname': 'blah.blah.blah.yahoo.com', - } - distro = self._fetch_distro('rhel') - paths = helpers.Paths({}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle('cc_set_hostname', - cfg, cc, LOG, []) - if not distro.uses_systemd(): - contents = util.load_file("/etc/sysconfig/network", decode=False) - n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, - dict(n_cfg)) - - def test_write_hostname_debian(self): - cfg = { - 'hostname': 'blah.blah.blah.yahoo.com', - } - distro = self._fetch_distro('debian') - paths = helpers.Paths({}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle('cc_set_hostname', - cfg, cc, LOG, []) - contents = util.load_file("/etc/hostname") - self.assertEqual('blah', contents.strip()) - - def test_write_hostname_sles(self): - cfg = { - 'hostname': 'blah.blah.blah.suse.com', - } - distro = self._fetch_distro('sles') - paths = helpers.Paths({}) - ds = None - cc = cloud.Cloud(ds, paths, {}, distro, None) - self.patchUtils(self.tmp) - cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) - contents = util.load_file("/etc/HOSTNAME") - self.assertEqual('blah', contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py deleted file mode 100644 index 57dce1bc..00000000 --- a/tests/unittests/test_handler/test_handler_snappy.py +++ /dev/null @@ -1,306 +0,0 @@ -from cloudinit.config.cc_snappy import ( - makeop, get_package_ops, render_snap_op) -from cloudinit import util - -from .. import helpers as t_help - -import os -import shutil -import tempfile -import yaml - -ALLOWED = (dict, list, int, str) - - -class TestInstallPackages(t_help.TestCase): - def setUp(self): - super(TestInstallPackages, self).setUp() - self.unapply = [] - - # by default 'which' has nothing in its path - self.apply_patches([(util, 'subp', self._subp)]) - self.subp_called = [] - self.snapcmds = [] - self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages") - - def tearDown(self): - apply_patches([i for i in reversed(self.unapply)]) - shutil.rmtree(self.tmp) - - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def populate_tmp(self, files): - return t_help.populate_dir(self.tmp, files) - - def _subp(self, *args, **kwargs): - # supports subp calling with cmd as args or kwargs - if 'args' not in kwargs: - kwargs['args'] = args[0] - self.subp_called.append(kwargs) - args = kwargs['args'] - # here we basically parse the snappy command invoked - # and append to snapcmds a list of (mode, pkg, config) - if args[0:2] == ['snappy', 'config']: - if args[3] == "-": - config = kwargs.get('data', '') - else: - with open(args[3], "rb") as fp: - config = yaml.safe_load(fp.read()) - self.snapcmds.append(['config', args[2], config]) - elif args[0:2] == ['snappy', 'install']: - config = None - pkg = None - for arg in args[2:]: - if arg.startswith("-"): - continue - if not pkg: - pkg = arg - elif not config: - cfgfile = arg - if cfgfile == "-": - config = kwargs.get('data', '') - elif cfgfile: - with open(cfgfile, "rb") as fp: - config = yaml.safe_load(fp.read()) - self.snapcmds.append(['install', pkg, config]) - - def test_package_ops_1(self): - ret = get_package_ops( - packages=['pkg1', 'pkg2', 'pkg3'], - configs={'pkg2': b'mycfg2'}, installed=[]) - self.assertEqual( - ret, [makeop('install', 'pkg1', None, None), - makeop('install', 'pkg2', b'mycfg2', None), - makeop('install', 'pkg3', None, None)]) - - def test_package_ops_config_only(self): - ret = get_package_ops( - packages=None, - configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2']) - self.assertEqual( - ret, [makeop('config', 'pkg2', b'mycfg2')]) - - def test_package_ops_install_and_config(self): - ret = get_package_ops( - packages=['pkg3', 'pkg2'], - configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'}, - installed=['xinstalled']) - self.assertEqual( - ret, [makeop('install', 'pkg3'), - makeop('install', 'pkg2', b'mycfg2'), - makeop('config', 'xinstalled', b'xcfg')]) - - def test_package_ops_install_long_config_short(self): - # a package can be installed by full name, but have config by short - cfg = {'k1': 'k2'} - ret = get_package_ops( - packages=['config-example.canonical'], - configs={'config-example': cfg}, installed=[]) - self.assertEqual( - ret, [makeop('install', 'config-example.canonical', cfg)]) - - def test_package_ops_with_file(self): - self.populate_tmp( - {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg", - "snapf2.snap": b"foo2", "foo.bar": "ignored"}) - ret = get_package_ops( - packages=['pkg1'], configs={}, installed=[], fspath=self.tmp) - self.assertEqual( - ret, - [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap", - cfgfile="snapf1.config"), - makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"), - makeop('install', 'pkg1')]) - - def test_package_ops_common_filename(self): - # fish package name from filename - # package names likely look like: pkgname.namespace_version_arch.snap - - # find filenames - self.populate_tmp( - {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata", - "pkg-ws.config": "pkg-ws-config", - "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata", - "pkg1.smoser.config": "pkg1.smoser.config-data", - "pkg1.config": "pkg1.config-data", - "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata", - "pkg2.smoser_0.0_amd64.config": "pkg2.config"}) - - ret = get_package_ops( - packages=[], configs={}, installed=[], fspath=self.tmp) - self.assertEqual( - ret, - [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser', - path="pkg-ws.smoser_0.3.4_all.snap", - cfgfile="pkg-ws.config"), - makeop_tmpd(self.tmp, 'install', 'pkg1.smoser', - path="pkg1.smoser_1.2.3_all.snap", - cfgfile="pkg1.smoser.config"), - makeop_tmpd(self.tmp, 'install', 'pkg2.smoser', - path="pkg2.smoser_0.0_amd64.snap", - cfgfile="pkg2.smoser_0.0_amd64.config"), - ]) - - def test_package_ops_config_overrides_file(self): - # config data overrides local file .config - self.populate_tmp( - {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"}) - ret = get_package_ops( - packages=[], configs={'snapf1': 'snapf1cfg-config'}, - installed=[], fspath=self.tmp) - self.assertEqual( - ret, [makeop_tmpd(self.tmp, 'install', 'snapf1', - path="snapf1.snap", config="snapf1cfg-config")]) - - def test_package_ops_namespacing(self): - cfgs = { - 'config-example': {'k1': 'v1'}, - 'pkg1': {'p1': 'p2'}, - 'ubuntu-core': {'c1': 'c2'}, - 'notinstalled.smoser': {'s1': 's2'}, - } - ret = get_package_ops( - packages=['config-example.canonical'], configs=cfgs, - installed=['config-example.smoser', 'pkg1.canonical', - 'ubuntu-core']) - - expected_configs = [ - makeop('config', 'pkg1', config=cfgs['pkg1']), - makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])] - expected_installs = [ - makeop('install', 'config-example.canonical', - config=cfgs['config-example'])] - - installs = [i for i in ret if i['op'] == 'install'] - configs = [c for c in ret if c['op'] == 'config'] - - self.assertEqual(installs, expected_installs) - # configs are not ordered - self.assertEqual(len(configs), len(expected_configs)) - self.assertTrue(all(found in expected_configs for found in configs)) - - def test_render_op_localsnap(self): - self.populate_tmp({"snapf1.snap": b"foo1"}) - op = makeop_tmpd(self.tmp, 'install', 'snapf1', - path='snapf1.snap') - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['install', op['path'], None]]) - - def test_render_op_localsnap_localconfig(self): - self.populate_tmp( - {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'}) - op = makeop_tmpd(self.tmp, 'install', 'snapf1', - path='snapf1.snap', cfgfile='snapf1.config') - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['install', op['path'], 'snapf1cfg']]) - - def test_render_op_snap(self): - op = makeop('install', 'snapf1') - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['install', 'snapf1', None]]) - - def test_render_op_snap_config(self): - mycfg = {'key1': 'value1'} - name = "snapf1" - op = makeop('install', name, config=mycfg) - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['install', name, {'config': {name: mycfg}}]]) - - def test_render_op_config_bytes(self): - name = "snapf1" - mycfg = b'myconfig' - op = makeop('config', name, config=mycfg) - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]]) - - def test_render_op_config_string(self): - name = 'snapf1' - mycfg = 'myconfig: foo\nhisconfig: bar\n' - op = makeop('config', name, config=mycfg) - render_snap_op(**op) - self.assertEqual( - self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]]) - - def test_render_op_config_dict(self): - # config entry for package can be a dict, not a string blob - mycfg = {'foo': 'bar'} - name = 'snapf1' - op = makeop('config', name, config=mycfg) - render_snap_op(**op) - # snapcmds is a list of 3-entry lists. data_found will be the - # blob of data in the file in 'snappy install --config=<file>' - data_found = self.snapcmds[0][2] - self.assertEqual(mycfg, data_found['config'][name]) - - def test_render_op_config_list(self): - # config entry for package can be a list, not a string blob - mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}] - name = "snapf1" - op = makeop('config', name, config=mycfg) - render_snap_op(**op) - data_found = self.snapcmds[0][2] - self.assertEqual(mycfg, data_found['config'][name]) - - def test_render_op_config_int(self): - # config entry for package can be a list, not a string blob - mycfg = 1 - name = 'snapf1' - op = makeop('config', name, config=mycfg) - render_snap_op(**op) - data_found = self.snapcmds[0][2] - self.assertEqual(mycfg, data_found['config'][name]) - - def test_render_long_configs_short(self): - # install a namespaced package should have un-namespaced config - mycfg = {'k1': 'k2'} - name = 'snapf1' - op = makeop('install', name + ".smoser", config=mycfg) - render_snap_op(**op) - data_found = self.snapcmds[0][2] - self.assertEqual(mycfg, data_found['config'][name]) - - def test_render_does_not_pad_cfgfile(self): - # package_ops with cfgfile should not modify --file= content. - mydata = "foo1: bar1\nk: [l1, l2, l3]\n" - self.populate_tmp( - {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()}) - ret = get_package_ops( - packages=[], configs={}, installed=[], fspath=self.tmp) - self.assertEqual( - ret, - [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap", - cfgfile="snapf1.config")]) - - # now the op was ok, but test that render didn't mess it up. - render_snap_op(**ret[0]) - data_found = self.snapcmds[0][2] - # the data found gets loaded in the snapcmd interpretation - # so this comparison is a bit lossy, but input to snappy config - # is expected to be yaml loadable, so it should be OK. - self.assertEqual(yaml.safe_load(mydata), data_found) - - -def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None): - if cfgfile: - cfgfile = os.path.sep.join([tmpd, cfgfile]) - if path: - path = os.path.sep.join([tmpd, path]) - return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile)) - - -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py deleted file mode 100644 index b7e6b03d..00000000 --- a/tests/unittests/test_handler/test_handler_timezone.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (C) 2013 Hewlett-Packard Development Company, L.P. -# -# Author: Juerg Haefliger <juerg.haefliger@hp.com> -# -# Based on test_handler_set_hostname.py -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from cloudinit.config import cc_timezone - -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers -from cloudinit import util - -from cloudinit.sources import DataSourceNoCloud - -from .. import helpers as t_help - -from configobj import ConfigObj -import logging -import shutil -from six import BytesIO -import tempfile - -LOG = logging.getLogger(__name__) - - -class TestTimezone(t_help.FilesystemMockingTestCase): - def setUp(self): - super(TestTimezone, self).setUp() - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - - def _get_cloud(self, distro): - self.patchUtils(self.new_root) - self.patchOS(self.new_root) - - paths = helpers.Paths({}) - - cls = distros.fetch(distro) - d = cls(distro, {}, paths) - ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) - cc = cloud.Cloud(ds, paths, {}, d, None) - return cc - - def test_set_timezone_sles(self): - - cfg = { - 'timezone': 'Tatooine/Bestine', - } - cc = self._get_cloud('sles') - - # Create a dummy timezone file - dummy_contents = '0123456789abcdefgh' - util.write_file('/usr/share/zoneinfo/%s' % cfg['timezone'], - dummy_contents) - - cc_timezone.handle('cc_timezone', cfg, cc, LOG, []) - - contents = util.load_file('/etc/sysconfig/clock', decode=False) - n_cfg = ConfigObj(BytesIO(contents)) - self.assertEqual({'TIMEZONE': cfg['timezone']}, dict(n_cfg)) - - contents = util.load_file('/etc/localtime') - self.assertEqual(dummy_contents, contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py deleted file mode 100644 index 466e45f8..00000000 --- a/tests/unittests/test_handler/test_handler_write_files.py +++ /dev/null @@ -1,112 +0,0 @@ -from cloudinit.config.cc_write_files import write_files -from cloudinit import log as logging -from cloudinit import util - -from ..helpers import FilesystemMockingTestCase - -import base64 -import gzip -import shutil -import six -import tempfile - -LOG = logging.getLogger(__name__) - -YAML_TEXT = """ -write_files: - - encoding: gzip - content: !!binary | - H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA= - path: /usr/bin/hello - permissions: '0755' - - content: !!binary | - Zm9vYmFyCg== - path: /wark - permissions: '0755' - - content: | - hi mom line 1 - hi mom line 2 - path: /tmp/message -""" - -YAML_CONTENT_EXPECTED = { - '/usr/bin/hello': "#!/bin/sh\necho hello world\n", - '/wark': "foobar\n", - '/tmp/message': "hi mom line 1\nhi mom line 2\n", -} - - -class TestWriteFiles(FilesystemMockingTestCase): - def setUp(self): - super(TestWriteFiles, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def test_simple(self): - self.patchUtils(self.tmp) - expected = "hello world\n" - filename = "/tmp/my.file" - write_files( - "test_simple", [{"content": expected, "path": filename}], LOG) - self.assertEqual(util.load_file(filename), expected) - - def test_yaml_binary(self): - self.patchUtils(self.tmp) - data = util.load_yaml(YAML_TEXT) - write_files("testname", data['write_files'], LOG) - for path, content in YAML_CONTENT_EXPECTED.items(): - self.assertEqual(util.load_file(path), content) - - def test_all_decodings(self): - self.patchUtils(self.tmp) - - # build a 'files' array that has a dictionary of encodings - # for 'gz', 'gzip', 'gz+base64' ... - data = b"foobzr" - utf8_valid = b"foobzr" - utf8_invalid = b'ab\xaadef' - files = [] - expected = [] - - gz_aliases = ('gz', 'gzip') - gz_b64_aliases = ('gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64') - b64_aliases = ('base64', 'b64') - - datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid)) - for name, data in datum: - gz = (_gzip_bytes(data), gz_aliases) - gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases) - b64 = (base64.b64encode(data), b64_aliases) - for content, aliases in (gz, gz_b64, b64): - for enc in aliases: - cur = {'content': content, - 'path': '/tmp/file-%s-%s' % (name, enc), - 'encoding': enc} - files.append(cur) - expected.append((cur['path'], data)) - - write_files("test_decoding", files, LOG) - - for path, content in expected: - self.assertEqual(util.load_file(path, decode=False), content) - - # make sure we actually wrote *some* files. - flen_expected = ( - len(gz_aliases + gz_b64_aliases + b64_aliases) * len(datum)) - self.assertEqual(len(expected), flen_expected) - - -def _gzip_bytes(data): - buf = six.BytesIO() - fp = None - try: - fp = gzip.GzipFile(fileobj=buf, mode="wb") - fp.write(data) - fp.close() - return buf.getvalue() - finally: - if fp: - fp.close() - - -# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py deleted file mode 100644 index 28b060f8..00000000 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ /dev/null @@ -1,68 +0,0 @@ -from cloudinit.config import cc_yum_add_repo -from cloudinit import util - -from .. import helpers - -import configobj -import logging -import shutil -from six import BytesIO -import tempfile - -LOG = logging.getLogger(__name__) - - -class TestConfig(helpers.FilesystemMockingTestCase): - def setUp(self): - super(TestConfig, self).setUp() - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - - def test_bad_config(self): - cfg = { - 'yum_repos': { - 'epel-testing': { - 'name': 'Extra Packages for Enterprise Linux 5 - Testing', - # Missing this should cause the repo not to be written - # 'baseurl': 'http://blah.org/pub/epel/testing/5/$barch', - 'enabled': False, - 'gpgcheck': True, - 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', - 'failovermethod': 'priority', - }, - }, - } - self.patchUtils(self.tmp) - cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) - self.assertRaises(IOError, util.load_file, - "/etc/yum.repos.d/epel_testing.repo") - - def test_write_config(self): - cfg = { - 'yum_repos': { - 'epel-testing': { - 'name': 'Extra Packages for Enterprise Linux 5 - Testing', - 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch', - 'enabled': False, - 'gpgcheck': True, - 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', - 'failovermethod': 'priority', - }, - }, - } - self.patchUtils(self.tmp) - cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) - contents = util.load_file("/etc/yum.repos.d/epel_testing.repo", - decode=False) - contents = configobj.ConfigObj(BytesIO(contents)) - expected = { - 'epel_testing': { - 'name': 'Extra Packages for Enterprise Linux 5 - Testing', - 'failovermethod': 'priority', - 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', - 'enabled': '0', - 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch', - 'gpgcheck': '1', - } - } - self.assertEqual(expected, dict(contents)) |