summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/__init__.py0
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py452
-rw-r--r--tests/unittests/test_datasource/test_azure.py640
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py412
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py99
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py78
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py597
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py127
-rw-r--r--tests/unittests/test_datasource/test_gce.py166
-rw-r--r--tests/unittests/test_datasource/test_maas.py163
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py178
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py300
-rw-r--r--tests/unittests/test_datasource/test_openstack.py347
-rw-r--r--tests/unittests/test_datasource/test_smartos.py543
14 files changed, 0 insertions, 4102 deletions
diff --git a/tests/unittests/test_datasource/__init__.py b/tests/unittests/test_datasource/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/tests/unittests/test_datasource/__init__.py
+++ /dev/null
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
deleted file mode 100644
index 12966563..00000000
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ /dev/null
@@ -1,452 +0,0 @@
-# vi: ts=4 expandtab
-#
-# Copyright (C) 2009-2010 Canonical Ltd.
-# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
-# Copyright (C) 2012 Yahoo! Inc.
-#
-# Author: Joe VLcek <JVLcek@RedHat.com>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-'''
-This test file exercises the code in sources DataSourceAltCloud.py
-'''
-
-import os
-import shutil
-import tempfile
-
-from cloudinit import helpers
-from cloudinit import util
-from unittest import TestCase
-
-# Get the cloudinit.sources.DataSourceAltCloud import items needed.
-import cloudinit.sources.DataSourceAltCloud
-from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
-from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
-
-OS_UNAME_ORIG = getattr(os, 'uname')
-
-
-def _write_cloud_info_file(value):
- '''
- Populate the CLOUD_INFO_FILE which would be populated
- with a cloud backend identifier ImageFactory when building
- an image with ImageFactory.
- '''
- cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
- cifile.write(value)
- cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
-
-
-def _remove_cloud_info_file():
- '''
- Remove the test CLOUD_INFO_FILE
- '''
- os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
-
-
-def _write_user_data_files(mount_dir, value):
- '''
- Populate the deltacloud_user_data_file the user_data_file
- which would be populated with user data.
- '''
- deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
- user_data_file = mount_dir + '/user-data.txt'
-
- udfile = open(deltacloud_user_data_file, 'w')
- udfile.write(value)
- udfile.close()
- os.chmod(deltacloud_user_data_file, 0o664)
-
- udfile = open(user_data_file, 'w')
- udfile.write(value)
- udfile.close()
- os.chmod(user_data_file, 0o664)
-
-
-def _remove_user_data_files(mount_dir,
- dc_file=True,
- non_dc_file=True):
- '''
- Remove the test files: deltacloud_user_data_file and
- user_data_file
- '''
- deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
- user_data_file = mount_dir + '/user-data.txt'
-
- # Ignore any failures removeing files that are already gone.
- if dc_file:
- try:
- os.remove(deltacloud_user_data_file)
- except OSError:
- pass
-
- if non_dc_file:
- try:
- os.remove(user_data_file)
- except OSError:
- pass
-
-
-def _dmi_data(expected):
- '''
- Spoof the data received over DMI
- '''
- def _data(key):
- return expected
-
- return _data
-
-
-class TestGetCloudType(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.get_cloud_type()
- '''
-
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.dmi_data = util.read_dmi_data
- # We have a different code path for arm to deal with LP1243287
- # We have to switch arch to x86_64 to avoid test failure
- force_arch('x86_64')
-
- def tearDown(self):
- # Reset
- util.read_dmi_data = self.dmi_data
- force_arch()
-
- def test_rhev(self):
- '''
- Test method get_cloud_type() for RHEVm systems.
- Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor
- '''
- util.read_dmi_data = _dmi_data('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEqual('RHEV', dsrc.get_cloud_type())
-
- def test_vsphere(self):
- '''
- Test method get_cloud_type() for vSphere systems.
- Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor
- '''
- util.read_dmi_data = _dmi_data('VMware Virtual Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEqual('VSPHERE', dsrc.get_cloud_type())
-
- def test_unknown(self):
- '''
- Test method get_cloud_type() for unknown systems.
- Forcing read_dmi_data return to match an unrecognized return.
- '''
- util.read_dmi_data = _dmi_data('Unrecognized Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
-
-
-class TestGetDataCloudInfoFile(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.get_data()
- With a contrived CLOUD_INFO_FILE
- '''
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.cloud_info_file = tempfile.mkstemp()[1]
- self.dmi_data = util.read_dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- self.cloud_info_file
-
- def tearDown(self):
- # Reset
-
- # Attempt to remove the temp file ignoring errors
- try:
- os.remove(self.cloud_info_file)
- except OSError:
- pass
-
- util.read_dmi_data = self.dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
-
- def test_rhev(self):
- '''Success Test module get_data() forcing RHEV.'''
-
- _write_cloud_info_file('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_rhevm = lambda: True
- self.assertEqual(True, dsrc.get_data())
-
- def test_vsphere(self):
- '''Success Test module get_data() forcing VSPHERE.'''
-
- _write_cloud_info_file('VSPHERE')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_vsphere = lambda: True
- self.assertEqual(True, dsrc.get_data())
-
- def test_fail_rhev(self):
- '''Failure Test module get_data() forcing RHEV.'''
-
- _write_cloud_info_file('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_rhevm = lambda: False
- self.assertEqual(False, dsrc.get_data())
-
- def test_fail_vsphere(self):
- '''Failure Test module get_data() forcing VSPHERE.'''
-
- _write_cloud_info_file('VSPHERE')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_vsphere = lambda: False
- self.assertEqual(False, dsrc.get_data())
-
- def test_unrecognized(self):
- '''Failure Test module get_data() forcing unrecognized.'''
-
- _write_cloud_info_file('unrecognized')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEqual(False, dsrc.get_data())
-
-
-class TestGetDataNoCloudInfoFile(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.get_data()
- Without a CLOUD_INFO_FILE
- '''
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.dmi_data = util.read_dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- 'no such file'
- # We have a different code path for arm to deal with LP1243287
- # We have to switch arch to x86_64 to avoid test failure
- force_arch('x86_64')
-
- def tearDown(self):
- # Reset
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
- util.read_dmi_data = self.dmi_data
- # Return back to original arch
- force_arch()
-
- def test_rhev_no_cloud_file(self):
- '''Test No cloud info file module get_data() forcing RHEV.'''
-
- util.read_dmi_data = _dmi_data('RHEV Hypervisor')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_rhevm = lambda: True
- self.assertEqual(True, dsrc.get_data())
-
- def test_vsphere_no_cloud_file(self):
- '''Test No cloud info file module get_data() forcing VSPHERE.'''
-
- util.read_dmi_data = _dmi_data('VMware Virtual Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- dsrc.user_data_vsphere = lambda: True
- self.assertEqual(True, dsrc.get_data())
-
- def test_failure_no_cloud_file(self):
- '''Test No cloud info file module get_data() forcing unrecognized.'''
-
- util.read_dmi_data = _dmi_data('Unrecognized Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEqual(False, dsrc.get_data())
-
-
-class TestUserDataRhevm(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.user_data_rhevm()
- '''
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
-
- _write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['/sbin/modprobe', 'floppy']
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
-
- def test_mount_cb_fails(self):
- '''Test user_data_rhevm() where mount_cb fails.'''
-
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['echo', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_rhevm())
-
- def test_modprobe_fails(self):
- '''Test user_data_rhevm() where modprobe fails.'''
-
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['ls', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_rhevm())
-
- def test_no_modprobe_cmd(self):
- '''Test user_data_rhevm() with no modprobe command.'''
-
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['bad command', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_rhevm())
-
- def test_udevadm_fails(self):
- '''Test user_data_rhevm() where udevadm fails.'''
-
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['ls', 'udevadm floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_rhevm())
-
- def test_no_udevadm_cmd(self):
- '''Test user_data_rhevm() with no udevadm command.'''
-
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['bad command', 'udevadm floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_rhevm())
-
-
-class TestUserDataVsphere(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.user_data_vsphere()
- '''
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
-
- _write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
-
- def test_user_data_vsphere(self):
- '''Test user_data_vsphere() where mount_cb fails.'''
-
- cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
- self.assertEqual(False, dsrc.user_data_vsphere())
-
-
-class TestReadUserDataCallback(TestCase):
- '''
- Test to exercise method: DataSourceAltCloud.read_user_data_callback()
- '''
- def setUp(self):
- '''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
-
- _write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- def test_callback_both(self):
- '''Test read_user_data_callback() with both files.'''
-
- self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
-
- def test_callback_dc(self):
- '''Test read_user_data_callback() with only DC file.'''
-
- _remove_user_data_files(self.mount_dir,
- dc_file=False,
- non_dc_file=True)
-
- self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
-
- def test_callback_non_dc(self):
- '''Test read_user_data_callback() with only non-DC file.'''
-
- _remove_user_data_files(self.mount_dir,
- dc_file=True,
- non_dc_file=False)
-
- self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
-
- def test_callback_none(self):
- '''Test read_user_data_callback() no files are found.'''
-
- _remove_user_data_files(self.mount_dir)
- self.assertEqual(None, read_user_data_callback(self.mount_dir))
-
-
-def force_arch(arch=None):
-
- def _os_uname():
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', arch)
-
- if arch:
- setattr(os, 'uname', _os_uname)
- elif arch is None:
- setattr(os, 'uname', OS_UNAME_ORIG)
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
deleted file mode 100644
index e90e903c..00000000
--- a/tests/unittests/test_datasource/test_azure.py
+++ /dev/null
@@ -1,640 +0,0 @@
-from cloudinit import helpers
-from cloudinit.util import b64e, decode_binary, load_file
-from cloudinit.sources import DataSourceAzure
-
-from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
-
-import crypt
-import os
-import shutil
-import stat
-import tempfile
-import xml.etree.ElementTree as ET
-import yaml
-
-
-def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
- if data is None:
- data = {'HostName': 'FOOHOST'}
- if pubkeys is None:
- pubkeys = {}
-
- content = """<?xml version="1.0" encoding="utf-8"?>
-<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
- xmlns:oe="http://schemas.dmtf.org/ovf/environment/1"
- xmlns:wa="http://schemas.microsoft.com/windowsazure"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-
- <wa:ProvisioningSection><wa:Version>1.0</wa:Version>
- <LinuxProvisioningConfigurationSet
- xmlns="http://schemas.microsoft.com/windowsazure"
- xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
- <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType>
- """
- for key, dval in data.items():
- if isinstance(dval, dict):
- val = dval.get('text')
- attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
- if k != 'text'])
- else:
- val = dval
- attrs = ""
- content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
-
- if userdata:
- content += "<UserData>%s</UserData>\n" % (b64e(userdata))
-
- if pubkeys:
- content += "<SSH><PublicKeys>\n"
- for fp, path, value in pubkeys:
- content += " <PublicKey>"
- if fp and path:
- content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
- (fp, path))
- if value:
- content += "<Value>%s</Value>" % value
- content += "</PublicKey>\n"
- content += "</PublicKeys></SSH>"
- content += """
- </LinuxProvisioningConfigurationSet>
- </wa:ProvisioningSection>
- <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version>
- <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure"
- xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
- <KmsServerHostname>kms.core.windows.net</KmsServerHostname>
- <ProvisionGuestAgent>false</ProvisionGuestAgent>
- <GuestAgentPackageName i:nil="true" />
- </PlatformSettings></wa:PlatformSettingsSection>
-</Environment>
- """
-
- return content
-
-
-class TestAzureDataSource(TestCase):
-
- def setUp(self):
- super(TestAzureDataSource, self).setUp()
- if PY26:
- raise SkipTest("Does not work on python 2.6")
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- # patch cloud_dir, so our 'seed_dir' is guaranteed empty
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
-
- self.patches = ExitStack()
- self.addCleanup(self.patches.close)
-
- super(TestAzureDataSource, self).setUp()
-
- def apply_patches(self, patches):
- for module, name, new in patches:
- self.patches.enter_context(mock.patch.object(module, name, new))
-
- def _get_ds(self, data):
-
- def dsdevs():
- return data.get('dsdevs', [])
-
- def _invoke_agent(cmd):
- data['agent_invoked'] = cmd
-
- def _wait_for_files(flist, _maxwait=None, _naplen=None):
- data['waited'] = flist
- return []
-
- def _pubkeys_from_crt_files(flist):
- data['pubkey_files'] = flist
- return ["pubkey_from: %s" % f for f in flist]
-
- if data.get('ovfcontent') is not None:
- populate_dir(os.path.join(self.paths.seed_dir, "azure"),
- {'ovf-env.xml': data['ovfcontent']})
-
- mod = DataSourceAzure
- mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
-
- self.get_metadata_from_fabric = mock.MagicMock(return_value={
- 'public-keys': [],
- })
-
- self.instance_id = 'test-instance-id'
-
- self.apply_patches([
- (mod, 'list_possible_azure_ds_devs', dsdevs),
- (mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
- (mod, 'perform_hostname_bounce', mock.MagicMock()),
- (mod, 'get_hostname', mock.MagicMock()),
- (mod, 'set_hostname', mock.MagicMock()),
- (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
- (mod.util, 'read_dmi_data', mock.MagicMock(
- return_value=self.instance_id)),
- ])
-
- dsrc = mod.DataSourceAzureNet(
- data.get('sys_cfg', {}), distro=None, paths=self.paths)
-
- return dsrc
-
- def xml_equals(self, oxml, nxml):
- """Compare two sets of XML to make sure they are equal"""
-
- def create_tag_index(xml):
- et = ET.fromstring(xml)
- ret = {}
- for x in et.iter():
- ret[x.tag] = x
- return ret
-
- def tags_exists(x, y):
- for tag in x.keys():
- self.assertIn(tag, y)
- for tag in y.keys():
- self.assertIn(tag, x)
-
- def tags_equal(x, y):
- for x_tag, x_val in x.items():
- y_val = y.get(x_val.tag)
- self.assertEqual(x_val.text, y_val.text)
-
- old_cnt = create_tag_index(oxml)
- new_cnt = create_tag_index(nxml)
- tags_exists(old_cnt, new_cnt)
- tags_equal(old_cnt, new_cnt)
-
- def xml_notequals(self, oxml, nxml):
- try:
- self.xml_equals(oxml, nxml)
- except AssertionError:
- return
- raise AssertionError("XML is the same")
-
- def test_basic_seed_dir(self):
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, "")
- self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
- self.assertTrue(os.path.isfile(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
-
- def test_waagent_d_has_0700_perms(self):
- # we expect /var/lib/waagent to be created 0700
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(os.path.isdir(self.waagent_d))
- self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
-
- def test_user_cfg_set_agent_command_plain(self):
- # set dscfg in via plaintext
- # we must have friendly-to-xml formatted plaintext in yaml_cfg
- # not all plaintext is expected to work.
- yaml_cfg = "{agent_command: my_command}\n"
- cfg = yaml.safe_load(yaml_cfg)
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(data['agent_invoked'], cfg['agent_command'])
-
- def test_user_cfg_set_agent_command(self):
- # set dscfg in via base64 encoded yaml
- cfg = {'agent_command': "my_command"}
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(data['agent_invoked'], cfg['agent_command'])
-
- def test_sys_cfg_set_agent_command(self):
- sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}}
- data = {'ovfcontent': construct_valid_ovf_env(data={}),
- 'sys_cfg': sys_cfg}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(data['agent_invoked'], '_COMMAND')
-
- def test_username_used(self):
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(dsrc.cfg['system_info']['default_user']['name'],
- "myuser")
-
- def test_password_given(self):
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserPassword': "mypass"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue('default_user' in dsrc.cfg['system_info'])
- defuser = dsrc.cfg['system_info']['default_user']
-
- # default user should be updated username and should not be locked.
- self.assertEqual(defuser['name'], odata['UserName'])
- self.assertFalse(defuser['lock_passwd'])
- # passwd is crypt formated string $id$salt$encrypted
- # encrypting plaintext with salt value of everything up to final '$'
- # should equal that after the '$'
- pos = defuser['passwd'].rfind("$") + 1
- self.assertEqual(defuser['passwd'],
- crypt.crypt(odata['UserPassword'],
- defuser['passwd'][0:pos]))
-
- def test_userdata_plain(self):
- mydata = "FOOBAR"
- odata = {'UserData': {'text': mydata, 'encoding': 'plain'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(decode_binary(dsrc.userdata_raw), mydata)
-
- def test_userdata_found(self):
- mydata = "FOOBAR"
- odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
-
- def test_no_datasource_expected(self):
- # no source should be found if no seed_dir and no devs
- data = {}
- dsrc = self._get_ds({})
- ret = dsrc.get_data()
- self.assertFalse(ret)
- self.assertFalse('agent_invoked' in data)
-
- def test_cfg_has_pubkeys_fingerprint(self):
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
- pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
- data = {'ovfcontent': construct_valid_ovf_env(data=odata,
- pubkeys=pubkeys)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- for mypk in mypklist:
- self.assertIn(mypk, dsrc.cfg['_pubkeys'])
- self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1])
-
- def test_cfg_has_pubkeys_value(self):
- # make sure that provided key is used over fingerprint
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}]
- pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
- data = {'ovfcontent': construct_valid_ovf_env(data=odata,
- pubkeys=pubkeys)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
-
- for mypk in mypklist:
- self.assertIn(mypk, dsrc.cfg['_pubkeys'])
- self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
-
- def test_cfg_has_no_fingerprint_has_value(self):
- # test value is used when fingerprint not provided
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}]
- pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
- data = {'ovfcontent': construct_valid_ovf_env(data=odata,
- pubkeys=pubkeys)}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
-
- for mypk in mypklist:
- self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
-
- def test_default_ephemeral(self):
- # make sure the ephemeral device works
- odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- cfg = dsrc.get_config_obj()
-
- self.assertEqual(dsrc.device_name_to_device("ephemeral0"),
- "/dev/sdb")
- assert 'disk_setup' in cfg
- assert 'fs_setup' in cfg
- self.assertIsInstance(cfg['disk_setup'], dict)
- self.assertIsInstance(cfg['fs_setup'], list)
-
- def test_provide_disk_aliases(self):
- # Make sure that user can affect disk aliases
- dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64e(yaml.dump(dscfg)),
- 'encoding': 'base64'}}
- usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
- 'ephemeral0': False}}
- userdata = '#cloud-config' + yaml.dump(usercfg) + "\n"
-
- ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata)
- data = {'ovfcontent': ovfcontent, 'sys_cfg': {}}
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- cfg = dsrc.get_config_obj()
- self.assertTrue(cfg)
-
- def test_userdata_arrives(self):
- userdata = "This is my user-data"
- xml = construct_valid_ovf_env(data={}, userdata=userdata)
- data = {'ovfcontent': xml}
- dsrc = self._get_ds(data)
- dsrc.get_data()
-
- self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
-
- def test_password_redacted_in_ovf(self):
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserPassword': "mypass"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
-
- self.assertTrue(ret)
- ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
-
- # The XML should not be same since the user password is redacted
- on_disk_ovf = load_file(ovf_env_path)
- self.xml_notequals(data['ovfcontent'], on_disk_ovf)
-
- # Make sure that the redacted password on disk is not used by CI
- self.assertNotEqual(dsrc.cfg.get('password'),
- DataSourceAzure.DEF_PASSWD_REDACTION)
-
- # Make sure that the password was really encrypted
- et = ET.fromstring(on_disk_ovf)
- for elem in et.iter():
- if 'UserPassword' in elem.tag:
- self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION,
- elem.text)
-
- def test_ovf_env_arrives_in_waagent_dir(self):
- xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
- dsrc = self._get_ds({'ovfcontent': xml})
- dsrc.get_data()
-
- # 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir)
- # we expect that the ovf-env.xml file is copied there.
- ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
- self.assertTrue(os.path.exists(ovf_env_path))
- self.xml_equals(xml, load_file(ovf_env_path))
-
- def test_ovf_can_include_unicode(self):
- xml = construct_valid_ovf_env(data={})
- xml = u'\ufeff{0}'.format(xml)
- dsrc = self._get_ds({'ovfcontent': xml})
- dsrc.get_data()
-
- def test_exception_fetching_fabric_data_doesnt_propagate(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- ds.ds_cfg['agent_command'] = '__builtin__'
- self.get_metadata_from_fabric.side_effect = Exception
- self.assertFalse(ds.get_data())
-
- def test_fabric_data_included_in_metadata(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- ds.ds_cfg['agent_command'] = '__builtin__'
- self.get_metadata_from_fabric.return_value = {'test': 'value'}
- ret = ds.get_data()
- self.assertTrue(ret)
- self.assertEqual('value', ds.metadata['test'])
-
- def test_instance_id_from_dmidecode_used(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- ds.get_data()
- self.assertEqual(self.instance_id, ds.metadata['instance-id'])
-
- def test_instance_id_from_dmidecode_used_for_builtin(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- ds.ds_cfg['agent_command'] = '__builtin__'
- ds.get_data()
- self.assertEqual(self.instance_id, ds.metadata['instance-id'])
-
-
-class TestAzureBounce(TestCase):
-
- def mock_out_azure_moving_parts(self):
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'invoke_agent'))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'wait_for_files'))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
- mock.MagicMock(return_value=[])))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure,
- 'find_fabric_formatted_ephemeral_disk',
- mock.MagicMock(return_value=None)))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure,
- 'find_fabric_formatted_ephemeral_part',
- mock.MagicMock(return_value=None)))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric',
- mock.MagicMock(return_value={})))
- self.patches.enter_context(
- mock.patch.object(DataSourceAzure.util, 'read_dmi_data',
- mock.MagicMock(return_value='test-instance-id')))
-
- def setUp(self):
- super(TestAzureBounce, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- self.addCleanup(shutil.rmtree, self.tmp)
- DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.patches = ExitStack()
- self.mock_out_azure_moving_parts()
- self.get_hostname = self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'get_hostname'))
- self.set_hostname = self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'set_hostname'))
- self.subp = self.patches.enter_context(
- mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
-
- def tearDown(self):
- self.patches.close()
-
- def _get_ds(self, ovfcontent=None):
- if ovfcontent is not None:
- populate_dir(os.path.join(self.paths.seed_dir, "azure"),
- {'ovf-env.xml': ovfcontent})
- return DataSourceAzure.DataSourceAzureNet(
- {}, distro=None, paths=self.paths)
-
- def get_ovf_env_with_dscfg(self, hostname, cfg):
- odata = {
- 'HostName': hostname,
- 'dscfg': {
- 'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'
- }
- }
- return construct_valid_ovf_env(data=odata)
-
- def test_disabled_bounce_does_not_change_hostname(self):
- cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
- self.assertEqual(0, self.set_hostname.call_count)
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_disabled_bounce_does_not_perform_bounce(
- self, perform_hostname_bounce):
- cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
- self.assertEqual(0, perform_hostname_bounce.call_count)
-
- def test_same_hostname_does_not_change_hostname(self):
- host_name = 'unchanged-host-name'
- self.get_hostname.return_value = host_name
- cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
- self.assertEqual(0, self.set_hostname.call_count)
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_unchanged_hostname_does_not_perform_bounce(
- self, perform_hostname_bounce):
- host_name = 'unchanged-host-name'
- self.get_hostname.return_value = host_name
- cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
- self.assertEqual(0, perform_hostname_bounce.call_count)
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_force_performs_bounce_regardless(self, perform_hostname_bounce):
- host_name = 'unchanged-host-name'
- self.get_hostname.return_value = host_name
- cfg = {'hostname_bounce': {'policy': 'force'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
- self.assertEqual(1, perform_hostname_bounce.call_count)
-
- def test_different_hostnames_sets_hostname(self):
- expected_hostname = 'azure-expected-host-name'
- self.get_hostname.return_value = 'default-host-name'
- self._get_ds(
- self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
- self.assertEqual(expected_hostname,
- self.set_hostname.call_args_list[0][0][0])
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_different_hostnames_performs_bounce(
- self, perform_hostname_bounce):
- expected_hostname = 'azure-expected-host-name'
- self.get_hostname.return_value = 'default-host-name'
- self._get_ds(
- self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
- self.assertEqual(1, perform_hostname_bounce.call_count)
-
- def test_different_hostnames_sets_hostname_back(self):
- initial_host_name = 'default-host-name'
- self.get_hostname.return_value = initial_host_name
- self._get_ds(
- self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
- self.assertEqual(initial_host_name,
- self.set_hostname.call_args_list[-1][0][0])
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_failure_in_bounce_still_resets_host_name(
- self, perform_hostname_bounce):
- perform_hostname_bounce.side_effect = Exception
- initial_host_name = 'default-host-name'
- self.get_hostname.return_value = initial_host_name
- self._get_ds(
- self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
- self.assertEqual(initial_host_name,
- self.set_hostname.call_args_list[-1][0][0])
-
- def test_environment_correct_for_bounce_command(self):
- interface = 'int0'
- hostname = 'my-new-host'
- old_hostname = 'my-old-host'
- self.get_hostname.return_value = old_hostname
- cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}}
- data = self.get_ovf_env_with_dscfg(hostname, cfg)
- self._get_ds(data).get_data()
- self.assertEqual(1, self.subp.call_count)
- bounce_env = self.subp.call_args[1]['env']
- self.assertEqual(interface, bounce_env['interface'])
- self.assertEqual(hostname, bounce_env['hostname'])
- self.assertEqual(old_hostname, bounce_env['old_hostname'])
-
- def test_default_bounce_command_used_by_default(self):
- cmd = 'default-bounce-command'
- DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
- cfg = {'hostname_bounce': {'policy': 'force'}}
- data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
- self._get_ds(data).get_data()
- self.assertEqual(1, self.subp.call_count)
- bounce_args = self.subp.call_args[1]['args']
- self.assertEqual(cmd, bounce_args)
-
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
- def test_set_hostname_option_can_disable_bounce(
- self, perform_hostname_bounce):
- cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
- data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
- self._get_ds(data).get_data()
-
- self.assertEqual(0, perform_hostname_bounce.call_count)
-
- def test_set_hostname_option_can_disable_hostname_set(self):
- cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
- data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
- self._get_ds(data).get_data()
-
- self.assertEqual(0, self.set_hostname.call_count)
-
-
-class TestReadAzureOvf(TestCase):
- def test_invalid_xml_raises_non_azure_ds(self):
- invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
- self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
- DataSourceAzure.read_azure_ovf, invalid_xml)
-
- def test_load_with_pubkeys(self):
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
- pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
- content = construct_valid_ovf_env(pubkeys=pubkeys)
- (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
- for mypk in mypklist:
- self.assertIn(mypk, cfg['_pubkeys'])
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
deleted file mode 100644
index 65202ff0..00000000
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ /dev/null
@@ -1,412 +0,0 @@
-import os
-
-from cloudinit.sources.helpers import azure as azure_helper
-
-from ..helpers import ExitStack, mock, TestCase
-
-
-GOAL_STATE_TEMPLATE = """\
-<?xml version="1.0" encoding="utf-8"?>
-<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:noNamespaceSchemaLocation="goalstate10.xsd">
- <Version>2012-11-30</Version>
- <Incarnation>{incarnation}</Incarnation>
- <Machine>
- <ExpectedState>Started</ExpectedState>
- <StopRolesDeadlineHint>300000</StopRolesDeadlineHint>
- <LBProbePorts>
- <Port>16001</Port>
- </LBProbePorts>
- <ExpectHealthReport>FALSE</ExpectHealthReport>
- </Machine>
- <Container>
- <ContainerId>{container_id}</ContainerId>
- <RoleInstanceList>
- <RoleInstance>
- <InstanceId>{instance_id}</InstanceId>
- <State>Started</State>
- <Configuration>
- <HostingEnvironmentConfig>
- http://100.86.192.70:80/...hostingEnvironmentConfig...
- </HostingEnvironmentConfig>
- <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig>
- <ExtensionsConfig>
- http://100.86.192.70:80/...extensionsConfig...
- </ExtensionsConfig>
- <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig>
- <Certificates>{certificates_url}</Certificates>
- <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName>
- </Configuration>
- </RoleInstance>
- </RoleInstanceList>
- </Container>
-</GoalState>
-"""
-
-
-class TestFindEndpoint(TestCase):
-
- def setUp(self):
- super(TestFindEndpoint, self).setUp()
- patches = ExitStack()
- self.addCleanup(patches.close)
-
- self.load_file = patches.enter_context(
- mock.patch.object(azure_helper.util, 'load_file'))
-
- def test_missing_file(self):
- self.load_file.side_effect = IOError
- self.assertRaises(IOError,
- azure_helper.WALinuxAgentShim.find_endpoint)
-
- def test_missing_special_azure_line(self):
- self.load_file.return_value = ''
- self.assertRaises(ValueError,
- azure_helper.WALinuxAgentShim.find_endpoint)
-
- @staticmethod
- def _build_lease_content(encoded_address):
- return '\n'.join([
- 'lease {',
- ' interface "eth0";',
- ' option unknown-245 {0};'.format(encoded_address),
- '}'])
-
- def test_latest_lease_used(self):
- encoded_addresses = ['5:4:3:2', '4:3:2:1']
- file_content = '\n'.join([self._build_lease_content(encoded_address)
- for encoded_address in encoded_addresses])
- self.load_file.return_value = file_content
- self.assertEqual(encoded_addresses[-1].replace(':', '.'),
- azure_helper.WALinuxAgentShim.find_endpoint())
-
-
-class TestExtractIpAddressFromLeaseValue(TestCase):
-
- def test_hex_string(self):
- ip_address, encoded_address = '98.76.54.32', '62:4c:36:20'
- self.assertEqual(
- ip_address,
- azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
- encoded_address
- ))
-
- def test_hex_string_with_single_character_part(self):
- ip_address, encoded_address = '4.3.2.1', '4:3:2:1'
- self.assertEqual(
- ip_address,
- azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
- encoded_address
- ))
-
- def test_packed_string(self):
- ip_address, encoded_address = '98.76.54.32', 'bL6 '
- self.assertEqual(
- ip_address,
- azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
- encoded_address
- ))
-
- def test_packed_string_with_escaped_quote(self):
- ip_address, encoded_address = '100.72.34.108', 'dH\\"l'
- self.assertEqual(
- ip_address,
- azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
- encoded_address
- ))
-
- def test_packed_string_containing_a_colon(self):
- ip_address, encoded_address = '100.72.58.108', 'dH:l'
- self.assertEqual(
- ip_address,
- azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
- encoded_address
- ))
-
-
-class TestGoalStateParsing(TestCase):
-
- default_parameters = {
- 'incarnation': 1,
- 'container_id': 'MyContainerId',
- 'instance_id': 'MyInstanceId',
- 'certificates_url': 'MyCertificatesUrl',
- }
-
- def _get_goal_state(self, http_client=None, **kwargs):
- if http_client is None:
- http_client = mock.MagicMock()
- parameters = self.default_parameters.copy()
- parameters.update(kwargs)
- xml = GOAL_STATE_TEMPLATE.format(**parameters)
- if parameters['certificates_url'] is None:
- new_xml_lines = []
- for line in xml.splitlines():
- if 'Certificates' in line:
- continue
- new_xml_lines.append(line)
- xml = '\n'.join(new_xml_lines)
- return azure_helper.GoalState(xml, http_client)
-
- def test_incarnation_parsed_correctly(self):
- incarnation = '123'
- goal_state = self._get_goal_state(incarnation=incarnation)
- self.assertEqual(incarnation, goal_state.incarnation)
-
- def test_container_id_parsed_correctly(self):
- container_id = 'TestContainerId'
- goal_state = self._get_goal_state(container_id=container_id)
- self.assertEqual(container_id, goal_state.container_id)
-
- def test_instance_id_parsed_correctly(self):
- instance_id = 'TestInstanceId'
- goal_state = self._get_goal_state(instance_id=instance_id)
- self.assertEqual(instance_id, goal_state.instance_id)
-
- def test_certificates_xml_parsed_and_fetched_correctly(self):
- http_client = mock.MagicMock()
- certificates_url = 'TestCertificatesUrl'
- goal_state = self._get_goal_state(
- http_client=http_client, certificates_url=certificates_url)
- certificates_xml = goal_state.certificates_xml
- self.assertEqual(1, http_client.get.call_count)
- self.assertEqual(certificates_url, http_client.get.call_args[0][0])
- self.assertTrue(http_client.get.call_args[1].get('secure', False))
- self.assertEqual(http_client.get.return_value.contents,
- certificates_xml)
-
- def test_missing_certificates_skips_http_get(self):
- http_client = mock.MagicMock()
- goal_state = self._get_goal_state(
- http_client=http_client, certificates_url=None)
- certificates_xml = goal_state.certificates_xml
- self.assertEqual(0, http_client.get.call_count)
- self.assertIsNone(certificates_xml)
-
-
-class TestAzureEndpointHttpClient(TestCase):
-
- regular_headers = {
- 'x-ms-agent-name': 'WALinuxAgent',
- 'x-ms-version': '2012-11-30',
- }
-
- def setUp(self):
- super(TestAzureEndpointHttpClient, self).setUp()
- patches = ExitStack()
- self.addCleanup(patches.close)
-
- self.read_file_or_url = patches.enter_context(
- mock.patch.object(azure_helper.util, 'read_file_or_url'))
-
- def test_non_secure_get(self):
- client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
- url = 'MyTestUrl'
- response = client.get(url, secure=False)
- self.assertEqual(1, self.read_file_or_url.call_count)
- self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=self.regular_headers),
- self.read_file_or_url.call_args)
-
- def test_secure_get(self):
- url = 'MyTestUrl'
- certificate = mock.MagicMock()
- expected_headers = self.regular_headers.copy()
- expected_headers.update({
- "x-ms-cipher-name": "DES_EDE3_CBC",
- "x-ms-guest-agent-public-x509-cert": certificate,
- })
- client = azure_helper.AzureEndpointHttpClient(certificate)
- response = client.get(url, secure=True)
- self.assertEqual(1, self.read_file_or_url.call_count)
- self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=expected_headers),
- self.read_file_or_url.call_args)
-
- def test_post(self):
- data = mock.MagicMock()
- url = 'MyTestUrl'
- client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
- response = client.post(url, data=data)
- self.assertEqual(1, self.read_file_or_url.call_count)
- self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(
- mock.call(url, data=data, headers=self.regular_headers),
- self.read_file_or_url.call_args)
-
- def test_post_with_extra_headers(self):
- url = 'MyTestUrl'
- client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
- extra_headers = {'test': 'header'}
- client.post(url, extra_headers=extra_headers)
- self.assertEqual(1, self.read_file_or_url.call_count)
- expected_headers = self.regular_headers.copy()
- expected_headers.update(extra_headers)
- self.assertEqual(
- mock.call(mock.ANY, data=mock.ANY, headers=expected_headers),
- self.read_file_or_url.call_args)
-
-
-class TestOpenSSLManager(TestCase):
-
- def setUp(self):
- super(TestOpenSSLManager, self).setUp()
- patches = ExitStack()
- self.addCleanup(patches.close)
-
- self.subp = patches.enter_context(
- mock.patch.object(azure_helper.util, 'subp'))
- try:
- self.open = patches.enter_context(
- mock.patch('__builtin__.open'))
- except ImportError:
- self.open = patches.enter_context(
- mock.patch('builtins.open'))
-
- @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
- @mock.patch.object(azure_helper.tempfile, 'mkdtemp')
- def test_openssl_manager_creates_a_tmpdir(self, mkdtemp):
- manager = azure_helper.OpenSSLManager()
- self.assertEqual(mkdtemp.return_value, manager.tmpdir)
-
- def test_generate_certificate_uses_tmpdir(self):
- subp_directory = {}
-
- def capture_directory(*args, **kwargs):
- subp_directory['path'] = os.getcwd()
-
- self.subp.side_effect = capture_directory
- manager = azure_helper.OpenSSLManager()
- self.assertEqual(manager.tmpdir, subp_directory['path'])
- manager.clean_up()
-
- @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
- @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock())
- @mock.patch.object(azure_helper.util, 'del_dir')
- def test_clean_up(self, del_dir):
- manager = azure_helper.OpenSSLManager()
- manager.clean_up()
- self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
-
-
-class TestWALinuxAgentShim(TestCase):
-
- def setUp(self):
- super(TestWALinuxAgentShim, self).setUp()
- patches = ExitStack()
- self.addCleanup(patches.close)
-
- self.AzureEndpointHttpClient = patches.enter_context(
- mock.patch.object(azure_helper, 'AzureEndpointHttpClient'))
- self.find_endpoint = patches.enter_context(
- mock.patch.object(
- azure_helper.WALinuxAgentShim, 'find_endpoint'))
- self.GoalState = patches.enter_context(
- mock.patch.object(azure_helper, 'GoalState'))
- self.OpenSSLManager = patches.enter_context(
- mock.patch.object(azure_helper, 'OpenSSLManager'))
- patches.enter_context(
- mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock()))
-
- def test_http_client_uses_certificate(self):
- shim = azure_helper.WALinuxAgentShim()
- shim.register_with_azure_and_fetch_data()
- self.assertEqual(
- [mock.call(self.OpenSSLManager.return_value.certificate)],
- self.AzureEndpointHttpClient.call_args_list)
-
- def test_correct_url_used_for_goalstate(self):
- self.find_endpoint.return_value = 'test_endpoint'
- shim = azure_helper.WALinuxAgentShim()
- shim.register_with_azure_and_fetch_data()
- get = self.AzureEndpointHttpClient.return_value.get
- self.assertEqual(
- [mock.call('http://test_endpoint/machine/?comp=goalstate')],
- get.call_args_list)
- self.assertEqual(
- [mock.call(get.return_value.contents,
- self.AzureEndpointHttpClient.return_value)],
- self.GoalState.call_args_list)
-
- def test_certificates_used_to_determine_public_keys(self):
- shim = azure_helper.WALinuxAgentShim()
- data = shim.register_with_azure_and_fetch_data()
- self.assertEqual(
- [mock.call(self.GoalState.return_value.certificates_xml)],
- self.OpenSSLManager.return_value.parse_certificates.call_args_list)
- self.assertEqual(
- self.OpenSSLManager.return_value.parse_certificates.return_value,
- data['public-keys'])
-
- def test_absent_certificates_produces_empty_public_keys(self):
- self.GoalState.return_value.certificates_xml = None
- shim = azure_helper.WALinuxAgentShim()
- data = shim.register_with_azure_and_fetch_data()
- self.assertEqual([], data['public-keys'])
-
- def test_correct_url_used_for_report_ready(self):
- self.find_endpoint.return_value = 'test_endpoint'
- shim = azure_helper.WALinuxAgentShim()
- shim.register_with_azure_and_fetch_data()
- expected_url = 'http://test_endpoint/machine?comp=health'
- self.assertEqual(
- [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)],
- self.AzureEndpointHttpClient.return_value.post.call_args_list)
-
- def test_goal_state_values_used_for_report_ready(self):
- self.GoalState.return_value.incarnation = 'TestIncarnation'
- self.GoalState.return_value.container_id = 'TestContainerId'
- self.GoalState.return_value.instance_id = 'TestInstanceId'
- shim = azure_helper.WALinuxAgentShim()
- shim.register_with_azure_and_fetch_data()
- posted_document = (
- self.AzureEndpointHttpClient.return_value.post.call_args[1]['data']
- )
- self.assertIn('TestIncarnation', posted_document)
- self.assertIn('TestContainerId', posted_document)
- self.assertIn('TestInstanceId', posted_document)
-
- def test_clean_up_can_be_called_at_any_time(self):
- shim = azure_helper.WALinuxAgentShim()
- shim.clean_up()
-
- def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self):
- shim = azure_helper.WALinuxAgentShim()
- shim.register_with_azure_and_fetch_data()
- shim.clean_up()
- self.assertEqual(
- 1, self.OpenSSLManager.return_value.clean_up.call_count)
-
- def test_failure_to_fetch_goalstate_bubbles_up(self):
- class SentinelException(Exception):
- pass
- self.AzureEndpointHttpClient.return_value.get.side_effect = (
- SentinelException)
- shim = azure_helper.WALinuxAgentShim()
- self.assertRaises(SentinelException,
- shim.register_with_azure_and_fetch_data)
-
-
-class TestGetMetadataFromFabric(TestCase):
-
- @mock.patch.object(azure_helper, 'WALinuxAgentShim')
- def test_data_from_shim_returned(self, shim):
- ret = azure_helper.get_metadata_from_fabric()
- self.assertEqual(
- shim.return_value.register_with_azure_and_fetch_data.return_value,
- ret)
-
- @mock.patch.object(azure_helper, 'WALinuxAgentShim')
- def test_success_calls_clean_up(self, shim):
- azure_helper.get_metadata_from_fabric()
- self.assertEqual(1, shim.return_value.clean_up.call_count)
-
- @mock.patch.object(azure_helper, 'WALinuxAgentShim')
- def test_failure_in_registration_calls_clean_up(self, shim):
- class SentinelException(Exception):
- pass
- shim.return_value.register_with_azure_and_fetch_data.side_effect = (
- SentinelException)
- self.assertRaises(SentinelException,
- azure_helper.get_metadata_from_fabric)
- self.assertEqual(1, shim.return_value.clean_up.call_count)
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
deleted file mode 100644
index 2a42ce0c..00000000
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# coding: utf-8
-
-import copy
-
-from cloudinit.cs_utils import Cepko
-from cloudinit.sources import DataSourceCloudSigma
-
-from .. import helpers as test_helpers
-
-SERVER_CONTEXT = {
- "cpu": 1000,
- "cpus_instead_of_cores": False,
- "global_context": {"some_global_key": "some_global_val"},
- "mem": 1073741824,
- "meta": {
- "ssh_public_key": "ssh-rsa AAAAB3NzaC1yc2E.../hQ5D5 john@doe",
- "cloudinit-user-data": "#cloud-config\n\n...",
- },
- "name": "test_server",
- "requirements": [],
- "smp": 1,
- "tags": ["much server", "very performance"],
- "uuid": "65b2fb23-8c03-4187-a3ba-8b7c919e8890",
- "vnc_password": "9e84d6cb49e46379",
- "vendor_data": {
- "location": "zrh",
- "cloudinit": "#cloud-config\n\n...",
- }
-}
-
-
-class CepkoMock(Cepko):
- def __init__(self, mocked_context):
- self.result = mocked_context
-
- def all(self):
- return self
-
-
-class DataSourceCloudSigmaTest(test_helpers.TestCase):
- def setUp(self):
- super(DataSourceCloudSigmaTest, self).setUp()
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
- self.datasource.is_running_in_cloudsigma = lambda: True
- self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
- self.datasource.get_data()
-
- def test_get_hostname(self):
- self.assertEqual("test_server", self.datasource.get_hostname())
- self.datasource.metadata['name'] = ''
- self.assertEqual("65b2fb23", self.datasource.get_hostname())
- self.datasource.metadata['name'] = u'тест'
- self.assertEqual("65b2fb23", self.datasource.get_hostname())
-
- def test_get_public_ssh_keys(self):
- self.assertEqual([SERVER_CONTEXT['meta']['ssh_public_key']],
- self.datasource.get_public_ssh_keys())
-
- def test_get_instance_id(self):
- self.assertEqual(SERVER_CONTEXT['uuid'],
- self.datasource.get_instance_id())
-
- def test_metadata(self):
- self.assertEqual(self.datasource.metadata, SERVER_CONTEXT)
-
- def test_user_data(self):
- self.assertEqual(self.datasource.userdata_raw,
- SERVER_CONTEXT['meta']['cloudinit-user-data'])
-
- def test_encoded_user_data(self):
- encoded_context = copy.deepcopy(SERVER_CONTEXT)
- encoded_context['meta']['base64_fields'] = 'cloudinit-user-data'
- encoded_context['meta']['cloudinit-user-data'] = 'aGkgd29ybGQK'
- self.datasource.cepko = CepkoMock(encoded_context)
- self.datasource.get_data()
-
- self.assertEqual(self.datasource.userdata_raw, b'hi world\n')
-
- def test_vendor_data(self):
- self.assertEqual(self.datasource.vendordata_raw,
- SERVER_CONTEXT['vendor_data']['cloudinit'])
-
- def test_lack_of_vendor_data(self):
- stripped_context = copy.deepcopy(SERVER_CONTEXT)
- del stripped_context["vendor_data"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
- self.datasource.cepko = CepkoMock(stripped_context)
- self.datasource.get_data()
-
- self.assertIsNone(self.datasource.vendordata_raw)
-
- def test_lack_of_cloudinit_key_in_vendor_data(self):
- stripped_context = copy.deepcopy(SERVER_CONTEXT)
- del stripped_context["vendor_data"]["cloudinit"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
- self.datasource.cepko = CepkoMock(stripped_context)
- self.datasource.get_data()
-
- self.assertIsNone(self.datasource.vendordata_raw)
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
deleted file mode 100644
index b1aab17b..00000000
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ /dev/null
@@ -1,78 +0,0 @@
-from cloudinit import helpers
-from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
-
-from ..helpers import TestCase, mock, ExitStack
-
-
-class TestCloudStackPasswordFetching(TestCase):
-
- def setUp(self):
- super(TestCloudStackPasswordFetching, self).setUp()
- self.patches = ExitStack()
- self.addCleanup(self.patches.close)
- mod_name = 'cloudinit.sources.DataSourceCloudStack'
- self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name)))
- self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name)))
-
- def _set_password_server_response(self, response_string):
- subp = mock.MagicMock(return_value=(response_string, ''))
- self.patches.enter_context(
- mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp',
- subp))
- return subp
-
- def test_empty_password_doesnt_create_config(self):
- self._set_password_server_response('')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- ds.get_data()
- self.assertEqual({}, ds.get_config_obj())
-
- def test_saved_password_doesnt_create_config(self):
- self._set_password_server_response('saved_password')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- ds.get_data()
- self.assertEqual({}, ds.get_config_obj())
-
- def test_password_sets_password(self):
- password = 'SekritSquirrel'
- self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- ds.get_data()
- self.assertEqual(password, ds.get_config_obj()['password'])
-
- def test_bad_request_doesnt_stop_ds_from_working(self):
- self._set_password_server_response('bad_request')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- self.assertTrue(ds.get_data())
-
- def assertRequestTypesSent(self, subp, expected_request_types):
- request_types = []
- for call in subp.call_args_list:
- args = call[0][0]
- for arg in args:
- if arg.startswith('DomU_Request'):
- request_types.append(arg.split()[1])
- self.assertEqual(expected_request_types, request_types)
-
- def test_valid_response_means_password_marked_as_saved(self):
- password = 'SekritSquirrel'
- subp = self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- ds.get_data()
- self.assertRequestTypesSent(subp,
- ['send_my_password', 'saved_password'])
-
- def _check_password_not_saved_for(self, response_string):
- subp = self._set_password_server_response(response_string)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
- ds.get_data()
- self.assertRequestTypesSent(subp, ['send_my_password'])
-
- def test_password_not_saved_if_empty(self):
- self._check_password_not_saved_for('')
-
- def test_password_not_saved_if_already_saved(self):
- self._check_password_not_saved_for('saved_password')
-
- def test_password_not_saved_if_bad_request(self):
- self._check_password_not_saved_for('bad_request')
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
deleted file mode 100644
index 18551b92..00000000
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ /dev/null
@@ -1,597 +0,0 @@
-from copy import copy
-import json
-import os
-import shutil
-import six
-import tempfile
-
-from cloudinit import helpers
-from cloudinit.net import eni
-from cloudinit.net import network_state
-from cloudinit import settings
-from cloudinit.sources import DataSourceConfigDrive as ds
-from cloudinit.sources.helpers import openstack
-from cloudinit import util
-
-from ..helpers import TestCase, ExitStack, mock
-
-
-PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
-EC2_META = {
- 'ami-id': 'ami-00000001',
- 'ami-launch-index': 0,
- 'ami-manifest-path': 'FIXME',
- 'block-device-mapping': {
- 'ami': 'sda1',
- 'ephemeral0': 'sda2',
- 'root': '/dev/sda1',
- 'swap': 'sda3'},
- 'hostname': 'sm-foo-test.novalocal',
- 'instance-action': 'none',
- 'instance-id': 'i-00000001',
- 'instance-type': 'm1.tiny',
- 'local-hostname': 'sm-foo-test.novalocal',
- 'local-ipv4': None,
- 'placement': {'availability-zone': 'nova'},
- 'public-hostname': 'sm-foo-test.novalocal',
- 'public-ipv4': '',
- 'public-keys': {'0': {'openssh-key': PUBKEY}},
- 'reservation-id': 'r-iru5qm4m',
- 'security-groups': ['default']
-}
-USER_DATA = b'#!/bin/sh\necho This is user data\n'
-OSTACK_META = {
- 'availability_zone': 'nova',
- 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
- {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
- 'hostname': 'sm-foo-test.novalocal',
- 'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
- 'name': 'sm-foo-test',
- 'public_keys': {'mykey': PUBKEY},
- 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
-
-CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
-NETWORK_DATA = {
- 'services': [
- {'type': 'dns', 'address': '199.204.44.24'},
- {'type': 'dns', 'address': '199.204.47.54'}
- ],
- 'links': [
- {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd',
- 'ethernet_mac_address': 'fa:16:3e:69:b0:58',
- 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'},
- {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33',
- 'ethernet_mac_address': 'fa:16:3e:d4:57:ad',
- 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
- {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc',
- 'ethernet_mac_address': 'fa:16:3e:05:30:fe',
- 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'}
- ],
- 'networks': [
- {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp',
- 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235',
- 'id': 'network0'},
- {'link': 'tap2f88d109-5b', 'type': 'ipv4_dhcp',
- 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54',
- 'id': 'network1'},
- {'link': 'tap1a5382f8-04', 'type': 'ipv4_dhcp',
- 'network_id': 'dab2ba57-cae2-4311-a5ed-010b263891f5',
- 'id': 'network2'}
- ]
-}
-
-NETWORK_DATA_2 = {
- "services": [
- {"type": "dns", "address": "1.1.1.191"},
- {"type": "dns", "address": "1.1.1.4"}],
- "networks": [
- {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4",
- "netmask": "255.255.255.248", "link": "eth0",
- "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
- "gateway": "2.2.2.9"}],
- "ip_address": "2.2.2.10", "id": "network0-ipv4"},
- {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4",
- "netmask": "255.255.255.224", "link": "eth1",
- "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}],
- "links": [
- {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500,
- "type": "vif", "id": "eth0", "vif_id": "vif-foo1"},
- {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500,
- "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}]
-}
-
-
-KNOWN_MACS = {
- 'fa:16:3e:69:b0:58': 'enp0s1',
- 'fa:16:3e:d4:57:ad': 'enp0s2',
- 'fa:16:3e:dd:50:9a': 'foo1',
- 'fa:16:3e:a8:14:69': 'foo2',
- 'fa:16:3e:ed:9a:59': 'foo3',
-}
-
-CFG_DRIVE_FILES_V2 = {
- 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
- 'ec2/2009-04-04/user-data': USER_DATA,
- 'ec2/latest/meta-data.json': json.dumps(EC2_META),
- 'ec2/latest/user-data': USER_DATA,
- 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/2012-08-10/user_data': USER_DATA,
- 'openstack/content/0000': CONTENT_0,
- 'openstack/content/0001': CONTENT_1,
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA,
- 'openstack/latest/network_data.json': json.dumps(NETWORK_DATA),
- 'openstack/2015-10-15/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/2015-10-15/user_data': USER_DATA,
- 'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
-
-
-class TestConfigDriveDataSource(TestCase):
-
- def setUp(self):
- super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def test_ec2_metadata(self):
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- found = ds.read_config_drive(self.tmp)
- self.assertTrue('ec2-metadata' in found)
- ec2_md = found['ec2-metadata']
- self.assertEqual(EC2_META, ec2_md)
-
- def test_dev_os_remap(self):
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- found = ds.read_config_drive(self.tmp)
- cfg_ds.metadata = found['metadata']
- name_tests = {
- 'ami': '/dev/vda1',
- 'root': '/dev/vda1',
- 'ephemeral0': '/dev/vda2',
- 'swap': '/dev/vda3',
- }
- for name, dev_name in name_tests.items():
- with ExitStack() as mocks:
- provided_name = dev_name[len('/dev/'):]
- provided_name = "s" + provided_name[1:]
- find_mock = mocks.enter_context(
- mock.patch.object(util, 'find_devs_with',
- return_value=[provided_name]))
- # We want os.path.exists() to return False on its first call,
- # and True on its second call. We use a handy generator as
- # the mock side effect for this. The mocked function returns
- # what the side effect returns.
-
- def exists_side_effect():
- yield False
- yield True
- exists_mock = mocks.enter_context(
- mock.patch.object(os.path, 'exists',
- side_effect=exists_side_effect()))
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
-
- find_mock.assert_called_once_with(mock.ANY)
- self.assertEqual(exists_mock.call_count, 2)
-
- def test_dev_os_map(self):
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- found = ds.read_config_drive(self.tmp)
- os_md = found['metadata']
- cfg_ds.metadata = os_md
- name_tests = {
- 'ami': '/dev/vda1',
- 'root': '/dev/vda1',
- 'ephemeral0': '/dev/vda2',
- 'swap': '/dev/vda3',
- }
- for name, dev_name in name_tests.items():
- with ExitStack() as mocks:
- find_mock = mocks.enter_context(
- mock.patch.object(util, 'find_devs_with',
- return_value=[dev_name]))
- exists_mock = mocks.enter_context(
- mock.patch.object(os.path, 'exists',
- return_value=True))
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
-
- find_mock.assert_called_once_with(mock.ANY)
- exists_mock.assert_called_once_with(mock.ANY)
-
- def test_dev_ec2_remap(self):
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- found = ds.read_config_drive(self.tmp)
- ec2_md = found['ec2-metadata']
- os_md = found['metadata']
- cfg_ds.ec2_metadata = ec2_md
- cfg_ds.metadata = os_md
- name_tests = {
- 'ami': '/dev/vda1',
- 'root': '/dev/vda1',
- 'ephemeral0': '/dev/vda2',
- 'swap': '/dev/vda3',
- None: None,
- 'bob': None,
- 'root2k': None,
- }
- for name, dev_name in name_tests.items():
- # We want os.path.exists() to return False on its first call,
- # and True on its second call. We use a handy generator as
- # the mock side effect for this. The mocked function returns
- # what the side effect returns.
- def exists_side_effect():
- yield False
- yield True
- with mock.patch.object(os.path, 'exists',
- side_effect=exists_side_effect()):
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
- # We don't assert the call count for os.path.exists() because
- # not all of the entries in name_tests results in two calls to
- # that function. Specifically, 'root2k' doesn't seem to call
- # it at all.
-
- def test_dev_ec2_map(self):
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- found = ds.read_config_drive(self.tmp)
- ec2_md = found['ec2-metadata']
- os_md = found['metadata']
- cfg_ds.ec2_metadata = ec2_md
- cfg_ds.metadata = os_md
- name_tests = {
- 'ami': '/dev/sda1',
- 'root': '/dev/sda1',
- 'ephemeral0': '/dev/sda2',
- 'swap': '/dev/sda3',
- None: None,
- 'bob': None,
- 'root2k': None,
- }
- for name, dev_name in name_tests.items():
- with mock.patch.object(os.path, 'exists', return_value=True):
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
-
- def test_dir_valid(self):
- """Verify a dir is read as such."""
-
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
-
- found = ds.read_config_drive(self.tmp)
-
- expected_md = copy(OSTACK_META)
- expected_md['instance-id'] = expected_md['uuid']
- expected_md['local-hostname'] = expected_md['hostname']
-
- self.assertEqual(USER_DATA, found['userdata'])
- self.assertEqual(expected_md, found['metadata'])
- self.assertEqual(NETWORK_DATA, found['networkdata'])
- self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
- self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
-
- def test_seed_dir_valid_extra(self):
- """Verify extra files do not affect datasource validity."""
-
- data = copy(CFG_DRIVE_FILES_V2)
- data["myfoofile.txt"] = "myfoocontent"
- data["openstack/latest/random-file.txt"] = "random-content"
-
- populate_dir(self.tmp, data)
-
- found = ds.read_config_drive(self.tmp)
-
- expected_md = copy(OSTACK_META)
- expected_md['instance-id'] = expected_md['uuid']
- expected_md['local-hostname'] = expected_md['hostname']
-
- self.assertEqual(expected_md, found['metadata'])
-
- def test_seed_dir_bad_json_metadata(self):
- """Verify that bad json in metadata raises BrokenConfigDriveDir."""
- data = copy(CFG_DRIVE_FILES_V2)
-
- data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
- data["openstack/2015-10-15/meta_data.json"] = "non-json garbage {}"
- data["openstack/latest/meta_data.json"] = "non-json garbage {}"
-
- populate_dir(self.tmp, data)
-
- self.assertRaises(openstack.BrokenMetadata,
- ds.read_config_drive, self.tmp)
-
- def test_seed_dir_no_configdrive(self):
- """Verify that no metadata raises NonConfigDriveDir."""
-
- my_d = os.path.join(self.tmp, "non-configdrive")
- data = copy(CFG_DRIVE_FILES_V2)
- data["myfoofile.txt"] = "myfoocontent"
- data["openstack/latest/random-file.txt"] = "random-content"
- data["content/foo"] = "foocontent"
-
- self.assertRaises(openstack.NonReadable,
- ds.read_config_drive, my_d)
-
- def test_seed_dir_missing(self):
- """Verify that missing seed_dir raises NonConfigDriveDir."""
- my_d = os.path.join(self.tmp, "nonexistantdirectory")
- self.assertRaises(openstack.NonReadable,
- ds.read_config_drive, my_d)
-
- def test_find_candidates(self):
- devs_with_answers = {}
-
- def my_devs_with(*args, **kwargs):
- criteria = args[0] if len(args) else kwargs.pop('criteria', None)
- return devs_with_answers.get(criteria, [])
-
- def my_is_partition(dev):
- return dev[-1] in "0123456789" and not dev.startswith("sr")
-
- try:
- orig_find_devs_with = util.find_devs_with
- util.find_devs_with = my_devs_with
-
- orig_is_partition = util.is_partition
- util.is_partition = my_is_partition
-
- devs_with_answers = {"TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"]}
- self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
-
- # add a vfat item
- # zdd reverse sorts after vdb, but config-2 label is preferred
- devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
- self.assertEqual(["/dev/vdb", "/dev/zdd"],
- ds.find_candidate_devs())
-
- # verify that partitions are considered, that have correct label.
- devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
- "TYPE=iso9660": [],
- "LABEL=config-2": ["/dev/vdb3"]}
- self.assertEqual(["/dev/vdb3"],
- ds.find_candidate_devs())
-
- finally:
- util.find_devs_with = orig_find_devs_with
- util.is_partition = orig_is_partition
-
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
- def test_pubkeys_v2(self, on_first_boot):
- """Verify that public-keys work in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
- self.assertEqual(myds.get_public_ssh_keys(),
- [OSTACK_META['public_keys']['mykey']])
-
-
-class TestNetJson(TestCase):
- def setUp(self):
- super(TestNetJson, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.maxDiff = None
-
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
- def test_network_data_is_found(self, on_first_boot):
- """Verify that network_data is present in ds in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
- self.assertIsNotNone(myds.network_json)
-
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
- def test_network_config_is_converted(self, on_first_boot):
- """Verify that network_data is converted and present on ds object."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
- network_config = openstack.convert_net_json(NETWORK_DATA,
- known_macs=KNOWN_MACS)
- self.assertEqual(myds.network_config, network_config)
-
- def test_network_config_conversions(self):
- """Tests a bunch of input network json and checks the
- expected conversions."""
- in_datas = [
- NETWORK_DATA,
- {
- 'services': [{'type': 'dns', 'address': '172.19.0.12'}],
- 'networks': [{
- 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4',
- 'type': 'ipv4',
- 'netmask': '255.255.252.0',
- 'link': 'tap1a81968a-79',
- 'routes': [{
- 'netmask': '0.0.0.0',
- 'network': '0.0.0.0',
- 'gateway': '172.19.3.254',
- }],
- 'ip_address': '172.19.1.34',
- 'id': 'network0',
- }],
- 'links': [{
- 'type': 'bridge',
- 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f',
- 'ethernet_mac_address': 'fa:16:3e:ed:9a:59',
- 'id': 'tap1a81968a-79',
- 'mtu': None,
- }],
- },
- ]
- out_datas = [
- {
- 'version': 1,
- 'config': [
- {
- 'subnets': [{'type': 'dhcp4'}],
- 'type': 'physical',
- 'mac_address': 'fa:16:3e:69:b0:58',
- 'name': 'enp0s1',
- 'mtu': None,
- },
- {
- 'subnets': [{'type': 'dhcp4'}],
- 'type': 'physical',
- 'mac_address': 'fa:16:3e:d4:57:ad',
- 'name': 'enp0s2',
- 'mtu': None,
- },
- {
- 'subnets': [{'type': 'dhcp4'}],
- 'type': 'physical',
- 'mac_address': 'fa:16:3e:05:30:fe',
- 'name': 'nic0',
- 'mtu': None,
- },
- {
- 'type': 'nameserver',
- 'address': '199.204.44.24',
- },
- {
- 'type': 'nameserver',
- 'address': '199.204.47.54',
- }
- ],
-
- },
- {
- 'version': 1,
- 'config': [
- {
- 'name': 'foo3',
- 'mac_address': 'fa:16:3e:ed:9a:59',
- 'mtu': None,
- 'type': 'physical',
- 'subnets': [
- {
- 'address': '172.19.1.34',
- 'netmask': '255.255.252.0',
- 'type': 'static',
- 'ipv4': True,
- 'routes': [{
- 'gateway': '172.19.3.254',
- 'netmask': '0.0.0.0',
- 'network': '0.0.0.0',
- }],
- }
- ]
- },
- {
- 'type': 'nameserver',
- 'address': '172.19.0.12',
- }
- ],
- },
- ]
- for in_data, out_data in zip(in_datas, out_datas):
- conv_data = openstack.convert_net_json(in_data,
- known_macs=KNOWN_MACS)
- self.assertEqual(out_data, conv_data)
-
-
-class TestConvertNetworkData(TestCase):
- def setUp(self):
- super(TestConvertNetworkData, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def _getnames_in_config(self, ncfg):
- return set([n['name'] for n in ncfg['config']
- if n['type'] == 'physical'])
-
- def test_conversion_fills_names(self):
- ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS)
- expected = set(['nic0', 'enp0s1', 'enp0s2'])
- found = self._getnames_in_config(ncfg)
- self.assertEqual(found, expected)
-
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
- def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac):
- macs = KNOWN_MACS.copy()
- macs.update({'fa:16:3e:05:30:fe': 'foonic1',
- 'fa:16:3e:69:b0:58': 'ens1'})
- get_interfaces_by_mac.return_value = macs
-
- ncfg = openstack.convert_net_json(NETWORK_DATA)
- expected = set(['nic0', 'ens1', 'enp0s2'])
- found = self._getnames_in_config(ncfg)
- self.assertEqual(found, expected)
-
- def test_convert_raises_value_error_on_missing_name(self):
- macs = {'aa:aa:aa:aa:aa:00': 'ens1'}
- self.assertRaises(ValueError, openstack.convert_net_json,
- NETWORK_DATA, known_macs=macs)
-
- def test_conversion_with_route(self):
- ncfg = openstack.convert_net_json(NETWORK_DATA_2,
- known_macs=KNOWN_MACS)
- # not the best test, but see that we get a route in the
- # network config and that it gets rendered to an ENI file
- routes = []
- for n in ncfg['config']:
- for s in n.get('subnets', []):
- routes.extend(s.get('routes', []))
- self.assertIn(
- {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'},
- routes)
- eni_renderer = eni.Renderer()
- eni_renderer.render_network_state(
- self.tmp, network_state.parse_net_config_data(ncfg))
- with open(os.path.join(self.tmp, "etc",
- "network", "interfaces"), 'r') as f:
- eni_rendering = f.read()
- self.assertIn("route add default gw 2.2.2.9", eni_rendering)
-
-
-def cfg_ds_from_dir(seed_d):
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- cfg_ds.seed_dir = seed_d
- cfg_ds.known_macs = KNOWN_MACS.copy()
- if not cfg_ds.get_data():
- raise RuntimeError("Data source did not extract itself from"
- " seed directory %s" % seed_d)
- return cfg_ds
-
-
-def populate_ds_from_read_config(cfg_ds, source, results):
- """Patch the DataSourceConfigDrive from the results of
- read_config_drive_dir hopefully in line with what it would have
- if cfg_ds.get_data had been successfully called"""
- cfg_ds.source = source
- cfg_ds.metadata = results.get('metadata')
- cfg_ds.ec2_metadata = results.get('ec2-metadata')
- cfg_ds.userdata_raw = results.get('userdata')
- cfg_ds.version = results.get('version')
- cfg_ds.network_json = results.get('networkdata')
- cfg_ds._network_config = openstack.convert_net_json(
- cfg_ds.network_json, known_macs=KNOWN_MACS)
-
-
-def populate_dir(seed_dir, files):
- for (name, content) in files.items():
- path = os.path.join(seed_dir, name)
- dirname = os.path.dirname(path)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
- if isinstance(content, six.text_type):
- mode = "w"
- else:
- mode = "wb"
- with open(path, mode) as fp:
- fp.write(content)
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
deleted file mode 100644
index 8936a1e3..00000000
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#
-# Copyright (C) 2014 Neal Shrader
-#
-# Author: Neal Shrader <neal@digitalocean.com>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-
-from six.moves.urllib_parse import urlparse
-
-from cloudinit import helpers
-from cloudinit import settings
-from cloudinit.sources import DataSourceDigitalOcean
-
-from .. import helpers as test_helpers
-
-httpretty = test_helpers.import_httpretty()
-
-# Abbreviated for the test
-DO_INDEX = """id
- hostname
- user-data
- vendor-data
- public-keys
- region"""
-
-DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com
- ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com"""
-DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com"
-
-DO_META = {
- '': DO_INDEX,
- 'user-data': '#!/bin/bash\necho "user-data"',
- 'vendor-data': '#!/bin/bash\necho "vendor-data"',
- 'public-keys': DO_SINGLE_KEY,
- 'region': 'nyc3',
- 'id': '2000000',
- 'hostname': 'cloudinit-test',
-}
-
-MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*')
-
-
-def _request_callback(method, uri, headers):
- url_path = urlparse(uri).path
- if url_path.startswith('/metadata/v1/'):
- path = url_path.split('/metadata/v1/')[1:][0]
- else:
- path = None
- if path in DO_META:
- return (200, headers, DO_META.get(path))
- else:
- return (404, headers, '')
-
-
-class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
-
- def setUp(self):
- self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
- settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- super(TestDataSourceDigitalOcean, self).setUp()
-
- @httpretty.activate
- def test_connection(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_request_callback)
-
- success = self.ds.get_data()
- self.assertTrue(success)
-
- @httpretty.activate
- def test_metadata(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_request_callback)
- self.ds.get_data()
-
- self.assertEqual(DO_META.get('user-data'),
- self.ds.get_userdata_raw())
-
- self.assertEqual(DO_META.get('vendor-data'),
- self.ds.get_vendordata_raw())
-
- self.assertEqual(DO_META.get('region'),
- self.ds.availability_zone)
-
- self.assertEqual(DO_META.get('id'),
- self.ds.get_instance_id())
-
- self.assertEqual(DO_META.get('hostname'),
- self.ds.get_hostname())
-
- self.assertEqual('http://mirrors.digitalocean.com/',
- self.ds.get_package_mirror_info())
-
- # Single key
- self.assertEqual([DO_META.get('public-keys')],
- self.ds.get_public_ssh_keys())
-
- self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
-
- @httpretty.activate
- def test_multiple_ssh_keys(self):
- DO_META['public_keys'] = DO_MULTIPLE_KEYS
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_request_callback)
- self.ds.get_data()
-
- # Multiple keys
- self.assertEqual(DO_META.get('public-keys').splitlines(),
- self.ds.get_public_ssh_keys())
-
- self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
deleted file mode 100644
index 6e62a4d2..00000000
--- a/tests/unittests/test_datasource/test_gce.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#
-# Copyright (C) 2014 Vaidas Jablonskis
-#
-# Author: Vaidas Jablonskis <jablonskis@gmail.com>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-
-from base64 import b64encode, b64decode
-from six.moves.urllib_parse import urlparse
-
-from cloudinit import helpers
-from cloudinit import settings
-from cloudinit.sources import DataSourceGCE
-
-from .. import helpers as test_helpers
-
-httpretty = test_helpers.import_httpretty()
-
-GCE_META = {
- 'instance/id': '123',
- 'instance/zone': 'foo/bar',
- 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
- 'instance/hostname': 'server.project-foo.local',
- 'instance/attributes/user-data': b'/bin/echo foo\n',
-}
-
-GCE_META_PARTIAL = {
- 'instance/id': '1234',
- 'instance/hostname': 'server.project-bar.local',
- 'instance/zone': 'bar/baz',
-}
-
-GCE_META_ENCODING = {
- 'instance/id': '12345',
- 'instance/hostname': 'server.project-baz.local',
- 'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
- 'instance/attributes/user-data-encoding': 'base64',
-}
-
-HEADERS = {'X-Google-Metadata-Request': 'True'}
-MD_URL_RE = re.compile(
- r'http://metadata.google.internal/computeMetadata/v1/.*')
-
-
-def _set_mock_metadata(gce_meta=None):
- if gce_meta is None:
- gce_meta = GCE_META
-
- def _request_callback(method, uri, headers):
- url_path = urlparse(uri).path
- if url_path.startswith('/computeMetadata/v1/'):
- path = url_path.split('/computeMetadata/v1/')[1:][0]
- else:
- path = None
- if path in gce_meta:
- return (200, headers, gce_meta.get(path))
- else:
- return (404, headers, '')
-
- httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback)
-
-
-@httpretty.activate
-class TestDataSourceGCE(test_helpers.HttprettyTestCase):
-
- def setUp(self):
- self.ds = DataSourceGCE.DataSourceGCE(
- settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- super(TestDataSourceGCE, self).setUp()
-
- def test_connection(self):
- _set_mock_metadata()
- success = self.ds.get_data()
- self.assertTrue(success)
-
- req_header = httpretty.last_request().headers
- self.assertDictContainsSubset(HEADERS, req_header)
-
- def test_metadata(self):
- _set_mock_metadata()
- self.ds.get_data()
-
- shostname = GCE_META.get('instance/hostname').split('.')[0]
- self.assertEqual(shostname,
- self.ds.get_hostname())
-
- self.assertEqual(GCE_META.get('instance/id'),
- self.ds.get_instance_id())
-
- self.assertEqual(GCE_META.get('instance/attributes/user-data'),
- self.ds.get_userdata_raw())
-
- # test partial metadata (missing user-data in particular)
- def test_metadata_partial(self):
- _set_mock_metadata(GCE_META_PARTIAL)
- self.ds.get_data()
-
- self.assertEqual(GCE_META_PARTIAL.get('instance/id'),
- self.ds.get_instance_id())
-
- shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0]
- self.assertEqual(shostname, self.ds.get_hostname())
-
- def test_metadata_encoding(self):
- _set_mock_metadata(GCE_META_ENCODING)
- self.ds.get_data()
-
- decoded = b64decode(
- GCE_META_ENCODING.get('instance/attributes/user-data'))
- self.assertEqual(decoded, self.ds.get_userdata_raw())
-
- def test_missing_required_keys_return_false(self):
- for required_key in ['instance/id', 'instance/zone',
- 'instance/hostname']:
- meta = GCE_META_PARTIAL.copy()
- del meta[required_key]
- _set_mock_metadata(meta)
- self.assertEqual(False, self.ds.get_data())
- httpretty.reset()
-
- def test_project_level_ssh_keys_are_used(self):
- _set_mock_metadata()
- self.ds.get_data()
-
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
-
- def test_instance_level_ssh_keys_are_used(self):
- key_content = 'ssh-rsa JustAUser root@server'
- meta = GCE_META.copy()
- meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
-
- _set_mock_metadata(meta)
- self.ds.get_data()
-
- self.assertIn(key_content, self.ds.get_public_ssh_keys())
-
- def test_instance_level_keys_replace_project_level_keys(self):
- key_content = 'ssh-rsa JustAUser root@server'
- meta = GCE_META.copy()
- meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
-
- _set_mock_metadata(meta)
- self.ds.get_data()
-
- self.assertEqual([key_content], self.ds.get_public_ssh_keys())
-
- def test_only_last_part_of_zone_used_for_availability_zone(self):
- _set_mock_metadata()
- self.ds.get_data()
- self.assertEqual('bar', self.ds.availability_zone)
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
deleted file mode 100644
index f66f1c6d..00000000
--- a/tests/unittests/test_datasource/test_maas.py
+++ /dev/null
@@ -1,163 +0,0 @@
-from copy import copy
-import os
-import shutil
-import tempfile
-
-from cloudinit.sources import DataSourceMAAS
-from cloudinit import url_helper
-from ..helpers import TestCase, populate_dir
-
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-
-class TestMAASDataSource(TestCase):
-
- def setUp(self):
- super(TestMAASDataSource, self).setUp()
- # Make a temp directoy for tests to use.
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
-
- def test_seed_dir_valid(self):
- """Verify a valid seeddir is read as such."""
-
- data = {'instance-id': 'i-valid01',
- 'local-hostname': 'valid01-hostname',
- 'user-data': b'valid01-userdata',
- 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
-
- my_d = os.path.join(self.tmp, "valid")
- populate_dir(my_d, data)
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
-
- self.assertEqual(userdata, data['user-data'])
- for key in ('instance-id', 'local-hostname'):
- self.assertEqual(data[key], metadata[key])
-
- # verify that 'userdata' is not returned as part of the metadata
- self.assertFalse(('user-data' in metadata))
-
- def test_seed_dir_valid_extra(self):
- """Verify extra files do not affect seed_dir validity."""
-
- data = {'instance-id': 'i-valid-extra',
- 'local-hostname': 'valid-extra-hostname',
- 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
-
- my_d = os.path.join(self.tmp, "valid_extra")
- populate_dir(my_d, data)
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_dir(my_d)
-
- self.assertEqual(userdata, data['user-data'])
- for key in ('instance-id', 'local-hostname'):
- self.assertEqual(data[key], metadata[key])
-
- # additional files should not just appear as keys in metadata atm
- self.assertFalse(('foo' in metadata))
-
- def test_seed_dir_invalid(self):
- """Verify that invalid seed_dir raises MAASSeedDirMalformed."""
-
- valid = {'instance-id': 'i-instanceid',
- 'local-hostname': 'test-hostname', 'user-data': ''}
-
- my_based = os.path.join(self.tmp, "valid_extra")
-
- # missing 'userdata' file
- my_d = "%s-01" % my_based
- invalid_data = copy(valid)
- del invalid_data['local-hostname']
- populate_dir(my_d, invalid_data)
- self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
- DataSourceMAAS.read_maas_seed_dir, my_d)
-
- # missing 'instance-id'
- my_d = "%s-02" % my_based
- invalid_data = copy(valid)
- del invalid_data['instance-id']
- populate_dir(my_d, invalid_data)
- self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
- DataSourceMAAS.read_maas_seed_dir, my_d)
-
- def test_seed_dir_none(self):
- """Verify that empty seed_dir raises MAASSeedDirNone."""
-
- my_d = os.path.join(self.tmp, "valid_empty")
- self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
- DataSourceMAAS.read_maas_seed_dir, my_d)
-
- def test_seed_dir_missing(self):
- """Verify that missing seed_dir raises MAASSeedDirNone."""
- self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
- DataSourceMAAS.read_maas_seed_dir,
- os.path.join(self.tmp, "nonexistantdirectory"))
-
- def test_seed_url_valid(self):
- """Verify that valid seed_url is read as such."""
- valid = {
- 'meta-data/instance-id': 'i-instanceid',
- 'meta-data/local-hostname': 'test-hostname',
- 'meta-data/public-keys': 'test-hostname',
- 'user-data': b'foodata',
- }
- valid_order = [
- 'meta-data/local-hostname',
- 'meta-data/instance-id',
- 'meta-data/public-keys',
- 'user-data',
- ]
- my_seed = "http://example.com/xmeta"
- my_ver = "1999-99-99"
- my_headers = {'header1': 'value1', 'header2': 'value2'}
-
- def my_headers_cb(url):
- return my_headers
-
- # Each time url_helper.readurl() is called, something different is
- # returned based on the canned data above. We need to build up a list
- # of side effect return values, which the mock will return. At the
- # same time, we'll build up a list of expected call arguments for
- # asserting after the code under test is run.
- calls = []
-
- def side_effect():
- for key in valid_order:
- resp = valid.get(key)
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- calls.append(
- mock.call(url, headers=None, timeout=mock.ANY,
- data=mock.ANY, sec_between=mock.ANY,
- ssl_details=mock.ANY, retries=mock.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mock.ANY))
- yield url_helper.StringResponse(resp)
-
- # Now do the actual call of the code under test.
- with mock.patch.object(url_helper, 'readurl',
- side_effect=side_effect()) as mockobj:
- userdata, metadata = DataSourceMAAS.read_maas_seed_url(
- my_seed, version=my_ver)
-
- self.assertEqual(b"foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
-
- mockobj.has_calls(calls)
-
- def test_seed_url_invalid(self):
- """Verify that invalid seed_url raises MAASSeedDirMalformed."""
- pass
-
- def test_seed_url_missing(self):
- """Verify seed_url with no found entries raises MAASSeedDirNone."""
- pass
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
deleted file mode 100644
index b0fa1130..00000000
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ /dev/null
@@ -1,178 +0,0 @@
-from cloudinit import helpers
-from cloudinit.sources import DataSourceNoCloud
-from cloudinit import util
-from ..helpers import TestCase, populate_dir, mock, ExitStack
-
-import os
-import shutil
-import tempfile
-
-import yaml
-
-
-class TestNoCloudDataSource(TestCase):
-
- def setUp(self):
- super(TestNoCloudDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
-
- self.cmdline = "root=TESTCMDLINE"
-
- self.mocks = ExitStack()
- self.addCleanup(self.mocks.close)
-
- self.mocks.enter_context(
- mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
-
- def test_nocloud_seed_dir(self):
- md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = b"USER_DATA_HERE"
- populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
- {'user-data': ud, 'meta-data': yaml.safe_dump(md)})
-
- sys_cfg = {
- 'datasource': {'NoCloud': {'fs_label': None}}
- }
-
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, ud)
- self.assertEqual(dsrc.metadata, md)
- self.assertTrue(ret)
-
- def test_fs_label(self):
- # find_devs_with should not be called ff fs_label is None
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- class PsuedoException(Exception):
- pass
-
- def my_find_devs_with(*args, **kwargs):
- raise PsuedoException
-
- self.mocks.enter_context(
- mock.patch.object(util, 'find_devs_with',
- side_effect=PsuedoException))
-
- # by default, NoCloud should search for filesystems by label
- sys_cfg = {'datasource': {'NoCloud': {}}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- self.assertRaises(PsuedoException, dsrc.get_data)
-
- # but disabling searching should just end up with None found
- sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertFalse(ret)
-
- def test_no_datasource_expected(self):
- # no source should be found if no cmdline, config, and fs_label=None
- sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
-
- ds = DataSourceNoCloud.DataSourceNoCloud
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- self.assertFalse(dsrc.get_data())
-
- def test_seed_in_config(self):
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- data = {
- 'fs_label': None,
- 'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
- 'user-data': b"USER_DATA_RAW",
- }
-
- sys_cfg = {'datasource': {'NoCloud': data}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
- self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
- self.assertTrue(ret)
-
- def test_nocloud_seed_with_vendordata(self):
- md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = b"USER_DATA_HERE"
- vd = b"THIS IS MY VENDOR_DATA"
-
- populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
- {'user-data': ud, 'meta-data': yaml.safe_dump(md),
- 'vendor-data': vd})
-
- sys_cfg = {
- 'datasource': {'NoCloud': {'fs_label': None}}
- }
-
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, ud)
- self.assertEqual(dsrc.metadata, md)
- self.assertEqual(dsrc.vendordata_raw, vd)
- self.assertTrue(ret)
-
- def test_nocloud_no_vendordata(self):
- populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
- {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
-
- sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
-
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, b"ud")
- self.assertFalse(dsrc.vendordata)
- self.assertTrue(ret)
-
-
-class TestParseCommandLineData(TestCase):
-
- def test_parse_cmdline_data_valid(self):
- ds_id = "ds=nocloud"
- pairs = (
- ("root=/dev/sda1 %(ds_id)s", {}),
- ("%(ds_id)s; root=/dev/foo", {}),
- ("%(ds_id)s", {}),
- ("%(ds_id)s;", {}),
- ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
- ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
- {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
- ("%(ds_id)s;h=xhost",
- {'local-hostname': 'xhost'}),
- ("%(ds_id)s;h=xhost;i=IID",
- {'local-hostname': 'xhost', 'instance-id': 'IID'}),
- )
-
- for (fmt, expected) in pairs:
- fill = {}
- cmdline = fmt % {'ds_id': ds_id}
- ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
- cmdline=cmdline)
- self.assertEqual(expected, fill)
- self.assertTrue(ret)
-
- def test_parse_cmdline_data_none(self):
- ds_id = "ds=foo"
- cmdlines = (
- "root=/dev/sda1 ro",
- "console=/dev/ttyS0 root=/dev/foo",
- "",
- "ds=foocloud",
- "ds=foo-net",
- "ds=nocloud;s=SEED",
- )
-
- for cmdline in cmdlines:
- fill = {}
- ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
- cmdline=cmdline)
- self.assertEqual(fill, {})
- self.assertFalse(ret)
-
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
deleted file mode 100644
index d796f030..00000000
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ /dev/null
@@ -1,300 +0,0 @@
-from cloudinit import helpers
-from cloudinit.sources import DataSourceOpenNebula as ds
-from cloudinit import util
-from ..helpers import TestCase, populate_dir
-
-import os
-import pwd
-import shutil
-import tempfile
-import unittest
-
-
-TEST_VARS = {
- 'VAR1': 'single',
- 'VAR2': 'double word',
- 'VAR3': 'multi\nline\n',
- 'VAR4': "'single'",
- 'VAR5': "'double word'",
- 'VAR6': "'multi\nline\n'",
- 'VAR7': 'single\\t',
- 'VAR8': 'double\\tword',
- 'VAR9': 'multi\\t\nline\n',
- 'VAR10': '\\', # expect '\'
- 'VAR11': '\'', # expect '
- 'VAR12': '$', # expect $
-}
-
-INVALID_CONTEXT = ';'
-USER_DATA = '#cloud-config\napt_upgrade: true'
-SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
-HOSTNAME = 'foo.example.com'
-PUBLIC_IP = '10.0.0.3'
-
-CMD_IP_OUT = '''\
-1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
- link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
-2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
- link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff
-'''
-
-
-class TestOpenNebulaDataSource(TestCase):
- parsed_user = None
-
- def setUp(self):
- super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
-
- # defaults for few tests
- self.ds = ds.DataSourceOpenNebula
- self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula")
- self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}}
-
- # we don't want 'sudo' called in tests. so we patch switch_user_cmd
- def my_switch_user_cmd(user):
- self.parsed_user = user
- return []
-
- self.switch_user_cmd_real = ds.switch_user_cmd
- ds.switch_user_cmd = my_switch_user_cmd
-
- def tearDown(self):
- ds.switch_user_cmd = self.switch_user_cmd_real
- super(TestOpenNebulaDataSource, self).tearDown()
-
- def test_get_data_non_contextdisk(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertFalse(ret)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data_broken_contextdisk(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data_invalid_identity(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # generate non-existing system user name
- sys_cfg = self.sys_cfg
- invalid_user = 'invalid'
- while not sys_cfg['datasource']['OpenNebula'].get('parseuser'):
- try:
- pwd.getpwnam(invalid_user)
- invalid_user += 'X'
- except KeyError:
- sys_cfg['datasource']['OpenNebula']['parseuser'] = \
- invalid_user
-
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
- dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
- self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_get_data(self):
- orig_find_devs_with = util.find_devs_with
- try:
- # dont' try to lookup for CDs
- util.find_devs_with = lambda n: []
- populate_context_dir(self.seed_dir, {'KEY1': 'val1'})
- dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- finally:
- util.find_devs_with = orig_find_devs_with
-
- def test_seed_dir_non_contextdisk(self):
- self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
- self.seed_dir)
-
- def test_seed_dir_empty1_context(self):
- populate_dir(self.seed_dir, {'context.sh': ''})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertEqual(results['userdata'], None)
- self.assertEqual(results['metadata'], {})
-
- def test_seed_dir_empty2_context(self):
- populate_context_dir(self.seed_dir, {})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertEqual(results['userdata'], None)
- self.assertEqual(results['metadata'], {})
-
- def test_seed_dir_broken_context(self):
- populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT})
-
- self.assertRaises(ds.BrokenContextDiskDir,
- ds.read_context_disk_dir,
- self.seed_dir)
-
- def test_context_parser(self):
- populate_context_dir(self.seed_dir, TEST_VARS)
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('metadata' in results)
- self.assertEqual(TEST_VARS, results['metadata'])
-
- def test_ssh_key(self):
- public_keys = ['first key', 'second key']
- for c in range(4):
- for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'):
- my_d = os.path.join(self.tmp, "%s-%i" % (k, c))
- populate_context_dir(my_d, {k: '\n'.join(public_keys)})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('metadata' in results)
- self.assertTrue('public-keys' in results['metadata'])
- self.assertEqual(public_keys,
- results['metadata']['public-keys'])
-
- public_keys.append(SSH_KEY % (c + 1,))
-
- def test_user_data_plain(self):
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: USER_DATA,
- 'USERDATA_ENCODING': ''})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(USER_DATA, results['userdata'])
-
- def test_user_data_encoding_required_for_decode(self):
- b64userdata = util.b64e(USER_DATA)
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64userdata})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(b64userdata, results['userdata'])
-
- def test_user_data_base64_encoding(self):
- for k in ('USER_DATA', 'USERDATA'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: util.b64e(USER_DATA),
- 'USERDATA_ENCODING': 'base64'})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('userdata' in results)
- self.assertEqual(USER_DATA, results['userdata'])
-
- def test_hostname(self):
- for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: PUBLIC_IP})
- results = ds.read_context_disk_dir(my_d)
-
- self.assertTrue('metadata' in results)
- self.assertTrue('local-hostname' in results['metadata'])
- self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
-
- def test_network_interfaces(self):
- populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('network-interfaces' in results)
-
- def test_find_candidates(self):
- def my_devs_with(criteria):
- return {
- "LABEL=CONTEXT": ["/dev/sdb"],
- "LABEL=CDROM": ["/dev/sr0"],
- "TYPE=iso9660": ["/dev/vdb"],
- }.get(criteria, [])
-
- orig_find_devs_with = util.find_devs_with
- try:
- util.find_devs_with = my_devs_with
- self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"],
- ds.find_candidate_devs())
- finally:
- util.find_devs_with = orig_find_devs_with
-
-
-class TestOpenNebulaNetwork(unittest.TestCase):
-
- def setUp(self):
- super(TestOpenNebulaNetwork, self).setUp()
-
- def test_lo(self):
- net = ds.OpenNebulaNetwork('', {})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-''')
-
- def test_eth0(self):
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, {})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 10.18.1.1
- network 10.18.1.0
- netmask 255.255.255.0
-''')
-
- def test_eth0_override(self):
- context = {
- 'DNS': '1.2.3.8',
- 'ETH0_IP': '1.2.3.4',
- 'ETH0_NETWORK': '1.2.3.0',
- 'ETH0_MASK': '255.255.0.0',
- 'ETH0_GATEWAY': '1.2.3.5',
- 'ETH0_DOMAIN': 'example.com',
- 'ETH0_DNS': '1.2.3.6 1.2.3.7'
- }
-
- net = ds.OpenNebulaNetwork(CMD_IP_OUT, context)
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 1.2.3.4
- network 1.2.3.0
- netmask 255.255.0.0
- gateway 1.2.3.5
- dns-search example.com
- dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
-''')
-
-
-class TestParseShellConfig(unittest.TestCase):
- def test_no_seconds(self):
- cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
- # we could test 'sleep 2', but that would make the test run slower.
- ret = ds.parse_shell_config(cfg)
- self.assertEqual(ret, {"foo": "bar", "xx": "foo"})
-
-
-def populate_context_dir(path, variables):
- data = "# Context variables generated by OpenNebula\n"
- for k, v in variables.items():
- data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
- populate_dir(path, {'context.sh': data})
-
-# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
deleted file mode 100644
index 5c8592c5..00000000
--- a/tests/unittests/test_datasource/test_openstack.py
+++ /dev/null
@@ -1,347 +0,0 @@
-# vi: ts=4 expandtab
-#
-# Copyright (C) 2014 Yahoo! Inc.
-#
-# Author: Joshua Harlow <harlowja@yahoo-inc.com>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import copy
-import json
-import re
-
-from .. import helpers as test_helpers
-
-from six.moves.urllib.parse import urlparse
-from six import StringIO
-
-from cloudinit import helpers
-from cloudinit import settings
-from cloudinit.sources import DataSourceOpenStack as ds
-from cloudinit.sources.helpers import openstack
-from cloudinit import util
-
-hp = test_helpers.import_httpretty()
-
-BASE_URL = "http://169.254.169.254"
-PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
-EC2_META = {
- 'ami-id': 'ami-00000001',
- 'ami-launch-index': '0',
- 'ami-manifest-path': 'FIXME',
- 'hostname': 'sm-foo-test.novalocal',
- 'instance-action': 'none',
- 'instance-id': 'i-00000001',
- 'instance-type': 'm1.tiny',
- 'local-hostname': 'sm-foo-test.novalocal',
- 'local-ipv4': '0.0.0.0',
- 'public-hostname': 'sm-foo-test.novalocal',
- 'public-ipv4': '0.0.0.1',
- 'reservation-id': 'r-iru5qm4m',
-}
-USER_DATA = b'#!/bin/sh\necho This is user data\n'
-VENDOR_DATA = {
- 'magic': '',
-}
-OSTACK_META = {
- 'availability_zone': 'nova',
- 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
- {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
- 'hostname': 'sm-foo-test.novalocal',
- 'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
- 'name': 'sm-foo-test',
- 'public_keys': {'mykey': PUBKEY},
- 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c',
-}
-CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
-OS_FILES = {
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA,
- 'openstack/content/0000': CONTENT_0,
- 'openstack/content/0001': CONTENT_1,
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA,
- 'openstack/latest/vendor_data.json': json.dumps(VENDOR_DATA),
-}
-EC2_FILES = {
- 'latest/user-data': USER_DATA,
-}
-EC2_VERSIONS = [
- 'latest',
-]
-
-
-def _register_uris(version, ec2_files, ec2_meta, os_files):
- """Registers a set of url patterns into httpretty that will mimic the
- same data returned by the openstack metadata service (and ec2 service)."""
-
- def match_ec2_url(uri, headers):
- path = uri.path.strip("/")
- if len(path) == 0:
- return (200, headers, "\n".join(EC2_VERSIONS))
- path = uri.path.lstrip("/")
- if path in ec2_files:
- return (200, headers, ec2_files.get(path))
- if path == 'latest/meta-data/':
- buf = StringIO()
- for (k, v) in ec2_meta.items():
- if isinstance(v, (list, tuple)):
- buf.write("%s/" % (k))
- else:
- buf.write("%s" % (k))
- buf.write("\n")
- return (200, headers, buf.getvalue())
- if path.startswith('latest/meta-data/'):
- value = None
- pieces = path.split("/")
- if path.endswith("/"):
- pieces = pieces[2:-1]
- value = util.get_cfg_by_path(ec2_meta, pieces)
- else:
- pieces = pieces[2:]
- value = util.get_cfg_by_path(ec2_meta, pieces)
- if value is not None:
- return (200, headers, str(value))
- return (404, headers, '')
-
- def match_os_uri(uri, headers):
- path = uri.path.strip("/")
- if path == 'openstack':
- return (200, headers, "\n".join([openstack.OS_LATEST]))
- path = uri.path.lstrip("/")
- if path in os_files:
- return (200, headers, os_files.get(path))
- return (404, headers, '')
-
- def get_request_callback(method, uri, headers):
- uri = urlparse(uri)
- path = uri.path.lstrip("/").split("/")
- if path[0] == 'openstack':
- return match_os_uri(uri, headers)
- return match_ec2_url(uri, headers)
-
- hp.register_uri(hp.GET, re.compile(r'http://169.254.169.254/.*'),
- body=get_request_callback)
-
-
-def _read_metadata_service():
- return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1)
-
-
-class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
- VERSION = 'latest'
-
- @hp.activate
- def test_successful(self):
- _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
- f = _read_metadata_service()
- self.assertEqual(VENDOR_DATA, f.get('vendordata'))
- self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertEqual(2, len(f['files']))
- self.assertEqual(USER_DATA, f.get('userdata'))
- self.assertEqual(EC2_META, f.get('ec2-metadata'))
- self.assertEqual(2, f.get('version'))
- metadata = f['metadata']
- self.assertEqual('nova', metadata.get('availability_zone'))
- self.assertEqual('sm-foo-test.novalocal', metadata.get('hostname'))
- self.assertEqual('sm-foo-test.novalocal',
- metadata.get('local-hostname'))
- self.assertEqual('sm-foo-test', metadata.get('name'))
- self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
- metadata.get('uuid'))
- self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
- metadata.get('instance-id'))
-
- @hp.activate
- def test_no_ec2(self):
- _register_uris(self.VERSION, {}, {}, OS_FILES)
- f = _read_metadata_service()
- self.assertEqual(VENDOR_DATA, f.get('vendordata'))
- self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertEqual(USER_DATA, f.get('userdata'))
- self.assertEqual({}, f.get('ec2-metadata'))
- self.assertEqual(2, f.get('version'))
-
- @hp.activate
- def test_bad_metadata(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files.pop(k, None)
- _register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.NonReadable, _read_metadata_service)
-
- @hp.activate
- def test_bad_uuid(self):
- os_files = copy.deepcopy(OS_FILES)
- os_meta = copy.deepcopy(OSTACK_META)
- os_meta.pop('uuid')
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files[k] = json.dumps(os_meta)
- _register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
-
- @hp.activate
- def test_userdata_empty(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('user_data'):
- os_files.pop(k, None)
- _register_uris(self.VERSION, {}, {}, os_files)
- f = _read_metadata_service()
- self.assertEqual(VENDOR_DATA, f.get('vendordata'))
- self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertFalse(f.get('userdata'))
-
- @hp.activate
- def test_vendordata_empty(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('vendor_data.json'):
- os_files.pop(k, None)
- _register_uris(self.VERSION, {}, {}, os_files)
- f = _read_metadata_service()
- self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertFalse(f.get('vendordata'))
-
- @hp.activate
- def test_vendordata_invalid(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('vendor_data.json'):
- os_files[k] = '{' # some invalid json
- _register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
-
- @hp.activate
- def test_metadata_invalid(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files[k] = '{' # some invalid json
- _register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
-
- @hp.activate
- def test_datasource(self):
- _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
- ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- self.assertIsNone(ds_os.version)
- found = ds_os.get_data(timeout=0.1, retries=0)
- self.assertTrue(found)
- self.assertEqual(2, ds_os.version)
- md = dict(ds_os.metadata)
- md.pop('instance-id', None)
- md.pop('local-hostname', None)
- self.assertEqual(OSTACK_META, md)
- self.assertEqual(EC2_META, ds_os.ec2_metadata)
- self.assertEqual(USER_DATA, ds_os.userdata_raw)
- self.assertEqual(2, len(ds_os.files))
- self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure)
- self.assertEqual(ds_os.vendordata_raw, None)
-
- @hp.activate
- def test_bad_datasource_meta(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files[k] = '{' # some invalid json
- _register_uris(self.VERSION, {}, {}, os_files)
- ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- self.assertIsNone(ds_os.version)
- found = ds_os.get_data(timeout=0.1, retries=0)
- self.assertFalse(found)
- self.assertIsNone(ds_os.version)
-
- @hp.activate
- def test_no_datasource(self):
- os_files = copy.deepcopy(OS_FILES)
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files.pop(k)
- _register_uris(self.VERSION, {}, {}, os_files)
- ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- ds_os.ds_cfg = {
- 'max_wait': 0,
- 'timeout': 0,
- }
- self.assertIsNone(ds_os.version)
- found = ds_os.get_data(timeout=0.1, retries=0)
- self.assertFalse(found)
- self.assertIsNone(ds_os.version)
-
- @hp.activate
- def test_disabled_datasource(self):
- os_files = copy.deepcopy(OS_FILES)
- os_meta = copy.deepcopy(OSTACK_META)
- os_meta['meta'] = {
- 'dsmode': 'disabled',
- }
- for k in list(os_files.keys()):
- if k.endswith('meta_data.json'):
- os_files[k] = json.dumps(os_meta)
- _register_uris(self.VERSION, {}, {}, os_files)
- ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
- None,
- helpers.Paths({}))
- ds_os.ds_cfg = {
- 'max_wait': 0,
- 'timeout': 0,
- }
- self.assertIsNone(ds_os.version)
- found = ds_os.get_data(timeout=0.1, retries=0)
- self.assertFalse(found)
- self.assertIsNone(ds_os.version)
-
-
-class TestVendorDataLoading(test_helpers.TestCase):
- def cvj(self, data):
- return openstack.convert_vendordata_json(data)
-
- def test_vd_load_none(self):
- # non-existant vendor-data should return none
- self.assertIsNone(self.cvj(None))
-
- def test_vd_load_string(self):
- self.assertEqual(self.cvj("foobar"), "foobar")
-
- def test_vd_load_list(self):
- data = [{'foo': 'bar'}, 'mystring', list(['another', 'list'])]
- self.assertEqual(self.cvj(data), data)
-
- def test_vd_load_dict_no_ci(self):
- self.assertEqual(self.cvj({'foo': 'bar'}), None)
-
- def test_vd_load_dict_ci_dict(self):
- self.assertRaises(ValueError, self.cvj,
- {'foo': 'bar', 'cloud-init': {'x': 1}})
-
- def test_vd_load_dict_ci_string(self):
- data = {'foo': 'bar', 'cloud-init': 'VENDOR_DATA'}
- self.assertEqual(self.cvj(data), data['cloud-init'])
-
- def test_vd_load_dict_ci_list(self):
- data = {'foo': 'bar', 'cloud-init': ['VD_1', 'VD_2']}
- self.assertEqual(self.cvj(data), data['cloud-init'])
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
deleted file mode 100644
index 9c6c8768..00000000
--- a/tests/unittests/test_datasource/test_smartos.py
+++ /dev/null
@@ -1,543 +0,0 @@
-# vi: ts=4 expandtab
-#
-# Copyright (C) 2013 Canonical Ltd.
-#
-# Author: Ben Howard <ben.howard@canonical.com>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-#
-# This is a testcase for the SmartOS datasource. It replicates a serial
-# console and acts like the SmartOS console does in order to validate
-# return responses.
-#
-
-from __future__ import print_function
-
-from binascii import crc32
-import json
-import os
-import os.path
-import re
-import shutil
-import stat
-import tempfile
-import uuid
-
-from cloudinit import serial
-from cloudinit.sources import DataSourceSmartOS
-
-import six
-
-from cloudinit import helpers as c_helpers
-from cloudinit.util import b64e
-
-from ..helpers import mock, FilesystemMockingTestCase, TestCase
-
-SDC_NICS = json.loads("""
-[
- {
- "nic_tag": "external",
- "primary": true,
- "mtu": 1500,
- "model": "virtio",
- "gateway": "8.12.42.1",
- "netmask": "255.255.255.0",
- "ip": "8.12.42.102",
- "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
- "gateways": [
- "8.12.42.1"
- ],
- "vlan_id": 324,
- "mac": "90:b8:d0:f5:e4:f5",
- "interface": "net0",
- "ips": [
- "8.12.42.102/24"
- ]
- },
- {
- "nic_tag": "sdc_overlay/16187209",
- "gateway": "192.168.128.1",
- "model": "virtio",
- "mac": "90:b8:d0:a5:ff:cd",
- "netmask": "255.255.252.0",
- "ip": "192.168.128.93",
- "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9",
- "gateways": [
- "192.168.128.1"
- ],
- "vlan_id": 2,
- "mtu": 8500,
- "interface": "net1",
- "ips": [
- "192.168.128.93/22"
- ]
- }
-]
-""")
-
-MOCK_RETURNS = {
- 'hostname': 'test-host',
- 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
- 'disable_iptables_flag': None,
- 'enable_motd_sys_info': None,
- 'test-var1': 'some data',
- 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
- 'sdc:datacenter_name': 'somewhere2',
- 'sdc:operator-script': '\n'.join(['bin/true', '']),
- 'sdc:uuid': str(uuid.uuid4()),
- 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
- 'user-data': '\n'.join(['something', '']),
- 'user-script': '\n'.join(['/bin/true', '']),
- 'sdc:nics': json.dumps(SDC_NICS),
-}
-
-DMI_DATA_RETURN = 'smartdc'
-
-
-class PsuedoJoyentClient(object):
- def __init__(self, data=None):
- if data is None:
- data = MOCK_RETURNS.copy()
- self.data = data
- return
-
- def get(self, key, default=None, strip=False):
- if key in self.data:
- r = self.data[key]
- if strip:
- r = r.strip()
- else:
- r = default
- return r
-
- def get_json(self, key, default=None):
- result = self.get(key, default=default)
- if result is None:
- return default
- return json.loads(result)
-
- def exists(self):
- return True
-
-
-class TestSmartOSDataSource(FilesystemMockingTestCase):
- def setUp(self):
- super(TestSmartOSDataSource, self).setUp()
-
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
- os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
-
- def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
- sys_cfg=None, ds_cfg=None):
- self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
- self.get_smartos_environ.return_value = mode
-
- if sys_cfg is None:
- sys_cfg = {}
-
- if ds_cfg is not None:
- sys_cfg['datasource'] = sys_cfg.get('datasource', {})
- sys_cfg['datasource']['SmartOS'] = ds_cfg
-
- return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
-
- def test_no_base64(self):
- ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
- dsrc = self._get_ds(ds_cfg=ds_cfg)
- ret = dsrc.get_data()
- self.assertTrue(ret)
-
- def test_uuid(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['sdc:uuid'],
- dsrc.metadata['instance-id'])
-
- def test_root_keys(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
-
- def test_hostname_b64(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
-
- def test_hostname(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
-
- def test_userdata(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-data'],
- dsrc.metadata['legacy-user-data'])
- self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
-
- def test_sdc_nics(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
- dsrc.metadata['network-data'])
-
- def test_sdc_scripts(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
-
- legacy_script_f = "%s/user-script" % self.legacy_user_d
- self.assertTrue(os.path.exists(legacy_script_f))
- self.assertTrue(os.path.islink(legacy_script_f))
- user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEqual(user_script_perm, '700')
-
- def test_scripts_shebanged(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
-
- legacy_script_f = "%s/user-script" % self.legacy_user_d
- self.assertTrue(os.path.exists(legacy_script_f))
- self.assertTrue(os.path.islink(legacy_script_f))
- shebang = None
- with open(legacy_script_f, 'r') as f:
- shebang = f.readlines()[0].strip()
- self.assertEqual(shebang, "#!/bin/bash")
- user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEqual(user_script_perm, '700')
-
- def test_scripts_shebang_not_added(self):
- """
- Test that the SmartOS requirement that plain text scripts
- are executable. This test makes sure that plain texts scripts
- with out file magic have it added appropriately by cloud-init.
- """
-
- my_returns = MOCK_RETURNS.copy()
- my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl',
- 'print("hi")', ''])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(my_returns['user-script'],
- dsrc.metadata['user-script'])
-
- legacy_script_f = "%s/user-script" % self.legacy_user_d
- self.assertTrue(os.path.exists(legacy_script_f))
- self.assertTrue(os.path.islink(legacy_script_f))
- shebang = None
- with open(legacy_script_f, 'r') as f:
- shebang = f.readlines()[0].strip()
- self.assertEqual(shebang, "#!/usr/bin/perl")
-
- def test_userdata_removed(self):
- """
- User-data in the SmartOS world is supposed to be written to a file
- each and every boot. This tests to make sure that in the event the
- legacy user-data is removed, the existing user-data is backed-up
- and there is no /var/db/user-data left.
- """
-
- user_data_f = "%s/mdata-user-data" % self.legacy_user_d
- with open(user_data_f, 'w') as f:
- f.write("PREVIOUS")
-
- my_returns = MOCK_RETURNS.copy()
- del my_returns['user-data']
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertFalse(dsrc.metadata.get('legacy-user-data'))
-
- found_new = False
- for root, _dirs, files in os.walk(self.legacy_user_d):
- for name in files:
- name_f = os.path.join(root, name)
- permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
- if re.match(r'.*\/mdata-user-data$', name_f):
- found_new = True
- print(name_f)
- self.assertEqual(permissions, '400')
-
- self.assertFalse(found_new)
-
- def test_vendor_data_not_default(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['sdc:vendor-data'],
- dsrc.metadata['vendor-data'])
-
- def test_default_vendor_data(self):
- my_returns = MOCK_RETURNS.copy()
- def_op_script = my_returns['sdc:vendor-data']
- del my_returns['sdc:vendor-data']
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data'])
-
- # we expect default vendor-data is a boothook
- self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook"))
-
- def test_disable_iptables_flag(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
-
- def test_motd_sys_info(self):
- dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
-
- def test_default_ephemeral(self):
- # Test to make sure that the builtin config has the ephemeral
- # configuration.
- dsrc = self._get_ds()
- cfg = dsrc.get_config_obj()
-
- ret = dsrc.get_data()
- self.assertTrue(ret)
-
- assert 'disk_setup' in cfg
- assert 'fs_setup' in cfg
- self.assertIsInstance(cfg['disk_setup'], dict)
- self.assertIsInstance(cfg['fs_setup'], list)
-
- def test_override_disk_aliases(self):
- # Test to make sure that the built-in DS is overriden
- builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG
-
- mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}}
-
- # expect that these values are in builtin, or this is pointless
- for k in mydscfg:
- self.assertIn(k, builtin)
-
- dsrc = self._get_ds(ds_cfg=mydscfg)
- ret = dsrc.get_data()
- self.assertTrue(ret)
-
- self.assertEqual(mydscfg['disk_aliases']['FOO'],
- dsrc.ds_cfg['disk_aliases']['FOO'])
-
- self.assertEqual(dsrc.device_name_to_device('FOO'),
- mydscfg['disk_aliases']['FOO'])
-
-
-class TestJoyentMetadataClient(FilesystemMockingTestCase):
-
- def setUp(self):
- super(TestJoyentMetadataClient, self).setUp()
-
- self.serial = mock.MagicMock(spec=serial.Serial)
- self.request_id = 0xabcdef12
- self.metadata_value = 'value'
- self.response_parts = {
- 'command': 'SUCCESS',
- 'crc': 'b5a9ff00',
- 'length': 17 + len(b64e(self.metadata_value)),
- 'payload': b64e(self.metadata_value),
- 'request_id': '{0:08x}'.format(self.request_id),
- }
-
- def make_response():
- payloadstr = ''
- if 'payload' in self.response_parts:
- payloadstr = ' {0}'.format(self.response_parts['payload'])
- return ('V2 {length} {crc} {request_id} '
- '{command}{payloadstr}\n'.format(
- payloadstr=payloadstr,
- **self.response_parts).encode('ascii'))
-
- self.metasource_data = None
-
- def read_response(length):
- if not self.metasource_data:
- self.metasource_data = make_response()
- self.metasource_data_len = len(self.metasource_data)
- resp = self.metasource_data[:length]
- self.metasource_data = self.metasource_data[length:]
- return resp
-
- self.serial.read.side_effect = read_response
- self.patched_funcs.enter_context(
- mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
- mock.Mock(return_value=self.request_id)))
-
- def _get_client(self):
- return DataSourceSmartOS.JoyentMetadataClient(
- fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
-
- def assertEndsWith(self, haystack, prefix):
- self.assertTrue(haystack.endswith(prefix),
- "{0} does not end with '{1}'".format(
- repr(haystack), prefix))
-
- def assertStartsWith(self, haystack, prefix):
- self.assertTrue(haystack.startswith(prefix),
- "{0} does not start with '{1}'".format(
- repr(haystack), prefix))
-
- def test_get_metadata_writes_a_single_line(self):
- client = self._get_client()
- client.get('some_key')
- self.assertEqual(1, self.serial.write.call_count)
- written_line = self.serial.write.call_args[0][0]
- print(type(written_line))
- self.assertEndsWith(written_line.decode('ascii'),
- b'\n'.decode('ascii'))
- self.assertEqual(1, written_line.count(b'\n'))
-
- def _get_written_line(self, key='some_key'):
- client = self._get_client()
- client.get(key)
- return self.serial.write.call_args[0][0]
-
- def test_get_metadata_writes_bytes(self):
- self.assertIsInstance(self._get_written_line(), six.binary_type)
-
- def test_get_metadata_line_starts_with_v2(self):
- foo = self._get_written_line()
- self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii'))
-
- def test_get_metadata_uses_get_command(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- self.assertEqual('GET', parts[4])
-
- def test_get_metadata_base64_encodes_argument(self):
- key = 'my_key'
- parts = self._get_written_line(key).decode('ascii').strip().split(' ')
- self.assertEqual(b64e(key), parts[5])
-
- def test_get_metadata_calculates_length_correctly(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- expected_length = len(' '.join(parts[3:]))
- self.assertEqual(expected_length, int(parts[1]))
-
- def test_get_metadata_uses_appropriate_request_id(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- request_id = parts[3]
- self.assertEqual(8, len(request_id))
- self.assertEqual(request_id, request_id.lower())
-
- def test_get_metadata_uses_random_number_for_request_id(self):
- line = self._get_written_line()
- request_id = line.decode('ascii').strip().split(' ')[3]
- self.assertEqual('{0:08x}'.format(self.request_id), request_id)
-
- def test_get_metadata_checksums_correctly(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- expected_checksum = '{0:08x}'.format(
- crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
- checksum = parts[2]
- self.assertEqual(expected_checksum, checksum)
-
- def test_get_metadata_reads_a_line(self):
- client = self._get_client()
- client.get('some_key')
- self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
-
- def test_get_metadata_returns_valid_value(self):
- client = self._get_client()
- value = client.get('some_key')
- self.assertEqual(self.metadata_value, value)
-
- def test_get_metadata_throws_exception_for_incorrect_length(self):
- self.response_parts['length'] = 0
- client = self._get_client()
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
-
- def test_get_metadata_throws_exception_for_incorrect_crc(self):
- self.response_parts['crc'] = 'deadbeef'
- client = self._get_client()
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
-
- def test_get_metadata_throws_exception_for_request_id_mismatch(self):
- self.response_parts['request_id'] = 'deadbeef'
- client = self._get_client()
- client._checksum = lambda _: self.response_parts['crc']
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
-
- def test_get_metadata_returns_None_if_value_not_found(self):
- self.response_parts['payload'] = ''
- self.response_parts['command'] = 'NOTFOUND'
- self.response_parts['length'] = 17
- client = self._get_client()
- client._checksum = lambda _: self.response_parts['crc']
- self.assertIsNone(client.get('some_key'))
-
-
-class TestNetworkConversion(TestCase):
-
- def test_convert_simple(self):
- expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'netmask': '255.255.255.0',
- 'address': '8.12.42.102/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '192.168.128.1',
- 'netmask': '255.255.252.0',
- 'address': '192.168.128.93/22'}],
- 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
- found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS)
- self.assertEqual(expected, found)