diff options
Diffstat (limited to 'test/units')
67 files changed, 2240 insertions, 3314 deletions
diff --git a/test/units/README.md b/test/units/README.md deleted file mode 100644 index d0b3dd5abd..0000000000 --- a/test/units/README.md +++ /dev/null @@ -1,5 +0,0 @@ -Unit tests -========== - -Tests at code level. Should be concise and to the point, and organized by subject. - diff --git a/test/units/TestConstants.py b/test/units/TestConstants.py deleted file mode 100644 index f3b96e8abc..0000000000 --- a/test/units/TestConstants.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest - -from ansible.constants import get_config -import ConfigParser -import random -import string -import os - - -def random_string(length): - return ''.join(random.choice(string.ascii_uppercase) for x in range(6)) - -p = ConfigParser.ConfigParser() -p.read(os.path.join(os.path.dirname(__file__), 'ansible.cfg')) - -class TestConstants(unittest.TestCase): - - ##################################### - ### get_config unit tests - - - def test_configfile_and_env_both_set(self): - r = random_string(6) - env_var = 'ANSIBLE_TEST_%s' % r - os.environ[env_var] = r - - res = get_config(p, 'defaults', 'test_key', env_var, 'default') - del os.environ[env_var] - - assert res == r - - - def test_configfile_set_env_not_set(self): - r = random_string(6) - env_var = 'ANSIBLE_TEST_%s' % r - assert env_var not in os.environ - - res = get_config(p, 'defaults', 'test_key', env_var, 'default') - - print res - assert res == 'test_value' - - - def test_configfile_not_set_env_set(self): - r = random_string(6) - env_var = 'ANSIBLE_TEST_%s' % r - os.environ[env_var] = r - - res = get_config(p, 'defaults', 'doesnt_exist', env_var, 'default') - del os.environ[env_var] - - assert res == r - - - def test_configfile_not_set_env_not_set(self): - r = random_string(6) - env_var = 'ANSIBLE_TEST_%s' % r - assert env_var not in os.environ - - res = get_config(p, 'defaults', 'doesnt_exist', env_var, 'default') - - assert res == 'default' diff --git a/test/units/TestFilters.py b/test/units/TestFilters.py deleted file mode 100644 index 3c7eb4506e..0000000000 --- a/test/units/TestFilters.py +++ /dev/null @@ -1,191 +0,0 @@ -''' -Test bundled filters -''' - -import os.path -import unittest, tempfile, shutil -from ansible import playbook, inventory, callbacks -import ansible.runner.filter_plugins.core -import ansible.runner.filter_plugins.mathstuff - -INVENTORY = inventory.Inventory(['localhost']) - -BOOK = ''' -- hosts: localhost - vars: - var: { a: [1,2,3] } - tasks: - - template: src=%s dest=%s -''' - -SRC = ''' -- -{{ var|to_json }} -- -{{ var|to_nice_json }} -- -{{ var|to_yaml }} -- -{{ var|to_nice_yaml }} -''' - -DEST = ''' -- -{"a": [1, 2, 3]} -- -{ - "a": [ - 1, - 2, - 3 - ] -} -- -a: [1, 2, 3] - -- -a: -- 1 -- 2 -- 3 -''' - -class TestFilters(unittest.TestCase): - - def setUp(self): - self.tmpdir = tempfile.mkdtemp(dir='/tmp') - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - def temp(self, name, data=''): - '''write a temporary file and return the name''' - name = self.tmpdir + '/' + name - with open(name, 'w') as f: - f.write(data) - return name - - def test_bool_none(self): - a = ansible.runner.filter_plugins.core.bool(None) - assert a == None - - def test_bool_true(self): - a = ansible.runner.filter_plugins.core.bool(True) - assert a == True - - def test_bool_yes(self): - a = ansible.runner.filter_plugins.core.bool('Yes') - assert a == True - - def test_bool_no(self): - a = ansible.runner.filter_plugins.core.bool('Foo') - assert a == False - - def test_quotes(self): - a = ansible.runner.filter_plugins.core.quote('ls | wc -l') - assert a == "'ls | wc -l'" - - def test_fileglob(self): - pathname = os.path.join(os.path.dirname(__file__), '*') - a = ansible.runner.filter_plugins.core.fileglob(pathname) - assert __file__ in a - - def test_regex(self): - a = ansible.runner.filter_plugins.core.regex('ansible', 'ansible', - match_type='findall') - assert a == True - - def test_match_case_sensitive(self): - a = ansible.runner.filter_plugins.core.match('ansible', 'ansible') - assert a == True - - def test_match_case_insensitive(self): - a = ansible.runner.filter_plugins.core.match('ANSIBLE', 'ansible', - True) - assert a == True - - def test_match_no_match(self): - a = ansible.runner.filter_plugins.core.match(' ansible', 'ansible') - assert a == False - - def test_search_case_sensitive(self): - a = ansible.runner.filter_plugins.core.search(' ansible ', 'ansible') - assert a == True - - def test_search_case_insensitive(self): - a = ansible.runner.filter_plugins.core.search(' ANSIBLE ', 'ansible', - True) - assert a == True - - def test_regex_replace_case_sensitive(self): - a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^a.*i(.*)$', - 'a\\1') - assert a == 'able' - - def test_regex_replace_case_insensitive(self): - a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^A.*I(.*)$', - 'a\\1', True) - assert a == 'able' - - def test_regex_replace_no_match(self): - a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^b.*i(.*)$', - 'a\\1') - assert a == 'ansible' - - def test_to_uuid(self): - a = ansible.runner.filter_plugins.core.to_uuid('example.com') - - assert a == 'ae780c3a-a3ab-53c2-bfb4-098da300b3fe' - - #def test_filters(self): - - # this test is pretty low level using a playbook, hence I am disabling it for now -- MPD. - #return - - #src = self.temp('src.j2', SRC) - #dest = self.temp('dest.txt') - #book = self.temp('book', BOOK % (src, dest)) - - #playbook.PlayBook( - # playbook = book, - # inventory = INVENTORY, - # transport = 'local', - # callbacks = callbacks.PlaybookCallbacks(), - # runner_callbacks = callbacks.DefaultRunnerCallbacks(), - # stats = callbacks.AggregateStats(), - #).run() - - #out = open(dest).read() - #self.assertEqual(DEST, out) - - def test_version_compare(self): - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(0, 1.1, 'lt', False)) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.2, '<')) - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, '==')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, '=')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, 'eq')) - - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, 'gt')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '>')) - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, 'ne')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '!=')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '<>')) - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.1, 'ge')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.1, '>=')) - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.1, 'le')) - self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.0, 1.1, '<=')) - - self.assertTrue(ansible.runner.filter_plugins.core.version_compare('12.04', 12, 'ge')) - - def test_min(self): - a = ansible.runner.filter_plugins.mathstuff.min([3, 2, 5, 4]) - assert a == 2 - - def test_max(self): - a = ansible.runner.filter_plugins.mathstuff.max([3, 2, 5, 4]) - assert a == 5 diff --git a/test/units/TestInventory.py b/test/units/TestInventory.py deleted file mode 100644 index dc3a0ce6d6..0000000000 --- a/test/units/TestInventory.py +++ /dev/null @@ -1,505 +0,0 @@ -import os -import unittest -from nose.tools import raises - -from ansible import errors -from ansible.inventory import Inventory - -class TestInventory(unittest.TestCase): - - def setUp(self): - - self.cwd = os.getcwd() - self.test_dir = os.path.join(self.cwd, 'inventory_test_data') - - self.inventory_file = os.path.join(self.test_dir, 'simple_hosts') - self.large_range_inventory_file = os.path.join(self.test_dir, 'large_range') - self.complex_inventory_file = os.path.join(self.test_dir, 'complex_hosts') - self.inventory_script = os.path.join(self.test_dir, 'inventory_api.py') - self.inventory_dir = os.path.join(self.test_dir, 'inventory_dir') - - os.chmod(self.inventory_script, 0755) - - def tearDown(self): - os.chmod(self.inventory_script, 0644) - - def compare(self, left, right, sort=True): - if sort: - left = sorted(left) - right = sorted(right) - print left - print right - assert left == right - - def empty_inventory(self): - return Inventory(None) - - def simple_inventory(self): - return Inventory(self.inventory_file) - - def large_range_inventory(self): - return Inventory(self.large_range_inventory_file) - - def script_inventory(self): - return Inventory(self.inventory_script) - - def complex_inventory(self): - return Inventory(self.complex_inventory_file) - - def dir_inventory(self): - return Inventory(self.inventory_dir) - - all_simple_hosts=['jupiter', 'saturn', 'zeus', 'hera', - 'cerberus001','cerberus002','cerberus003', - 'cottus99', 'cottus100', - 'poseidon', 'thor', 'odin', 'loki', - 'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2', - 'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5', - 'Hotep-a', 'Hotep-b', 'Hotep-c', - 'BastC', 'BastD', 'neptun', ] - - ##################################### - ### Empty inventory format tests - - def test_empty(self): - inventory = self.empty_inventory() - hosts = inventory.list_hosts() - self.assertEqual(hosts, []) - - ##################################### - ### Simple inventory format tests - - def test_simple(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts)) - - def test_simple_all(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts('all') - self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts)) - - def test_get_hosts(self): - inventory = Inventory('127.0.0.1,192.168.1.1') - hosts = inventory.get_hosts('!10.0.0.1') - hosts_all = inventory.get_hosts('all') - self.assertEqual(sorted(hosts), sorted(hosts_all)) - - def test_no_src(self): - inventory = Inventory('127.0.0.1,') - self.assertEqual(inventory.src(), None) - - def test_simple_norse(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts("norse") - - expected_hosts=['thor', 'odin', 'loki'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_simple_ungrouped(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts("ungrouped") - - expected_hosts=['jupiter', 'saturn', - 'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2', - 'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_simple_combined(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts("norse:greek") - - expected_hosts=['zeus', 'hera', 'poseidon', - 'cerberus001','cerberus002','cerberus003', - 'cottus99','cottus100', - 'thor', 'odin', 'loki'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_simple_restrict(self): - inventory = self.simple_inventory() - - restricted_hosts = ['hera', 'poseidon', 'thor'] - expected_hosts=['zeus', 'hera', 'poseidon', - 'cerberus001','cerberus002','cerberus003', - 'cottus99', 'cottus100', - 'thor', 'odin', 'loki'] - - inventory.restrict_to(restricted_hosts) - hosts = inventory.list_hosts("norse:greek") - - assert sorted(hosts) == sorted(restricted_hosts) - - inventory.lift_restriction() - hosts = inventory.list_hosts("norse:greek") - - assert sorted(hosts) == sorted(expected_hosts) - - def test_simple_string_ipv4(self): - inventory = Inventory('127.0.0.1,192.168.1.1') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1'])) - - def test_simple_string_ipv4_port(self): - inventory = Inventory('127.0.0.1:2222,192.168.1.1') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1'])) - - def test_simple_string_ipv4_vars(self): - inventory = Inventory('127.0.0.1:2222,192.168.1.1') - var = inventory.get_variables('127.0.0.1') - self.assertEqual(var['ansible_ssh_port'], 2222) - - def test_simple_string_ipv6(self): - inventory = Inventory('FE80:EF45::12:1,192.168.1.1') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1'])) - - def test_simple_string_ipv6_port(self): - inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1'])) - - def test_simple_string_ipv6_vars(self): - inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1') - var = inventory.get_variables('FE80:EF45::12:1') - self.assertEqual(var['ansible_ssh_port'], 2222) - - def test_simple_string_fqdn(self): - inventory = Inventory('foo.example.com,bar.example.com') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com'])) - - def test_simple_string_fqdn_port(self): - inventory = Inventory('foo.example.com:2222,bar.example.com') - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com'])) - - def test_simple_string_fqdn_vars(self): - inventory = Inventory('foo.example.com:2222,bar.example.com') - var = inventory.get_variables('foo.example.com') - self.assertEqual(var['ansible_ssh_port'], 2222) - - def test_simple_vars(self): - inventory = self.simple_inventory() - vars = inventory.get_variables('thor') - - assert vars == {'group_names': ['norse'], - 'inventory_hostname': 'thor', - 'inventory_hostname_short': 'thor'} - - def test_simple_port(self): - inventory = self.simple_inventory() - vars = inventory.get_variables('hera') - - expected = { 'ansible_ssh_port': 3000, - 'group_names': ['greek'], - 'inventory_hostname': 'hera', - 'inventory_hostname_short': 'hera' } - assert vars == expected - - def test_large_range(self): - inventory = self.large_range_inventory() - hosts = inventory.list_hosts() - self.assertEqual(sorted(hosts), sorted('bob%03i' %i for i in range(0, 143))) - - def test_subset(self): - inventory = self.simple_inventory() - inventory.subset('odin;thor,loki') - self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin','loki'])) - - def test_subset_range(self): - inventory = self.simple_inventory() - inventory.subset('greek[0-2];norse[0]') - self.assertEqual(sorted(inventory.list_hosts()), sorted(['zeus','hera','thor'])) - - def test_subet_range_empty_group(self): - inventory = self.simple_inventory() - inventory.subset('missing[0]') - self.assertEqual(sorted(inventory.list_hosts()), sorted([])) - - def test_subset_filename(self): - inventory = self.simple_inventory() - inventory.subset('@' + os.path.join(self.test_dir, 'restrict_pattern')) - self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin'])) - - @raises(errors.AnsibleError) - def testinvalid_entry(self): - Inventory('1234') - - ################################################### - ### INI file advanced tests - - def test_complex_vars(self): - inventory = self.complex_inventory() - - vars = inventory.get_variables('rtp_a') - print vars - - expected = dict( - a=1, b=2, c=3, d=10002, e=10003, f='10004 != 10005', - g=' g ', h=' h ', i="' i \"", j='" j', - k=[ 'k1', 'k2' ], - rga=1, rgb=2, rgc=3, - inventory_hostname='rtp_a', inventory_hostname_short='rtp_a', - group_names=[ 'eastcoast', 'nc', 'redundantgroup', 'redundantgroup2', 'redundantgroup3', 'rtp', 'us' ] - ) - print vars - print expected - assert vars == expected - - def test_complex_group_names(self): - inventory = self.complex_inventory() - tests = { - 'host1': [ 'role1', 'role3' ], - 'host2': [ 'role1', 'role2' ], - 'host3': [ 'role2', 'role3' ] - } - for host, roles in tests.iteritems(): - group_names = inventory.get_variables(host)['group_names'] - assert sorted(group_names) == sorted(roles) - - def test_complex_exclude(self): - inventory = self.complex_inventory() - hosts = inventory.list_hosts("nc:florida:!triangle:!orlando") - expected_hosts = ['miami', 'rtp_a', 'rtp_b', 'rtp_c'] - print "HOSTS=%s" % sorted(hosts) - print "EXPECTED=%s" % sorted(expected_hosts) - assert sorted(hosts) == sorted(expected_hosts) - - def test_regex_exclude(self): - inventory = self.complex_inventory() - hosts = inventory.list_hosts("~rtp_[ac]") - expected_hosts = ['rtp_a', 'rtp_c'] - print "HOSTS=%s" % sorted(hosts) - print "EXPECTED=%s" % sorted(expected_hosts) - assert sorted(hosts) == sorted(expected_hosts) - - def test_regex_grouping(self): - inventory = self.simple_inventory() - hosts = inventory.list_hosts("~(cer[a-z]|berc)(erus00[13])") - expected_hosts = ['cerberus001', 'cerberus003'] - print "HOSTS=%s" % sorted(hosts) - print "EXPECTED=%s" % sorted(expected_hosts) - assert sorted(hosts) == sorted(expected_hosts) - - def test_complex_enumeration(self): - - - expected1 = ['rtp_b'] - expected2 = ['rtp_a', 'rtp_b'] - expected3 = ['rtp_a', 'rtp_b', 'rtp_c', 'tri_a', 'tri_b', 'tri_c'] - expected4 = ['rtp_b', 'orlando' ] - expected5 = ['blade-a-1'] - - inventory = self.complex_inventory() - hosts = inventory.list_hosts("nc[1]") - self.compare(hosts, expected1, sort=False) - hosts = inventory.list_hosts("nc[0-2]") - self.compare(hosts, expected2, sort=False) - hosts = inventory.list_hosts("nc[0-99999]") - self.compare(hosts, expected3, sort=False) - hosts = inventory.list_hosts("nc[1-2]:florida[0-1]") - self.compare(hosts, expected4, sort=False) - hosts = inventory.list_hosts("blade-a-1") - self.compare(hosts, expected5, sort=False) - - def test_complex_intersect(self): - inventory = self.complex_inventory() - hosts = inventory.list_hosts("nc:&redundantgroup:!rtp_c") - self.compare(hosts, ['rtp_a']) - hosts = inventory.list_hosts("nc:&triangle:!tri_c") - self.compare(hosts, ['tri_a', 'tri_b']) - - @raises(errors.AnsibleError) - def test_invalid_range(self): - Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_range')) - - @raises(errors.AnsibleError) - def test_missing_end(self): - Inventory(os.path.join(self.test_dir, 'inventory','test_missing_end')) - - @raises(errors.AnsibleError) - def test_incorrect_format(self): - Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_format')) - - @raises(errors.AnsibleError) - def test_alpha_end_before_beg(self): - Inventory(os.path.join(self.test_dir, 'inventory','test_alpha_end_before_beg')) - - def test_combined_range(self): - i = Inventory(os.path.join(self.test_dir, 'inventory','test_combined_range')) - hosts = i.list_hosts('test') - expected_hosts=['host1A','host2A','host1B','host2B'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_leading_range(self): - i = Inventory(os.path.join(self.test_dir, 'inventory','test_leading_range')) - hosts = i.list_hosts('test') - expected_hosts=['1.host','2.host','A.host','B.host'] - assert sorted(hosts) == sorted(expected_hosts) - - hosts2 = i.list_hosts('test2') - expected_hosts2=['1.host','2.host','3.host'] - assert sorted(hosts2) == sorted(expected_hosts2) - - ################################################### - ### Inventory API tests - - def test_script(self): - inventory = self.script_inventory() - hosts = inventory.list_hosts() - - expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki'] - - print "Expected: %s"%(expected_hosts) - print "Got : %s"%(hosts) - assert sorted(hosts) == sorted(expected_hosts) - - def test_script_all(self): - inventory = self.script_inventory() - hosts = inventory.list_hosts('all') - - expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_script_norse(self): - inventory = self.script_inventory() - hosts = inventory.list_hosts("norse") - - expected_hosts=['thor', 'odin', 'loki'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_script_combined(self): - inventory = self.script_inventory() - hosts = inventory.list_hosts("norse:greek") - - expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki'] - assert sorted(hosts) == sorted(expected_hosts) - - def test_script_restrict(self): - inventory = self.script_inventory() - - restricted_hosts = ['hera', 'poseidon', 'thor'] - expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki'] - - inventory.restrict_to(restricted_hosts) - hosts = inventory.list_hosts("norse:greek") - - assert sorted(hosts) == sorted(restricted_hosts) - - inventory.lift_restriction() - hosts = inventory.list_hosts("norse:greek") - - assert sorted(hosts) == sorted(expected_hosts) - - def test_script_vars(self): - inventory = self.script_inventory() - vars = inventory.get_variables('thor') - - print "VARS=%s" % vars - - assert vars == {'hammer':True, - 'group_names': ['norse'], - 'inventory_hostname': 'thor', - 'inventory_hostname_short': 'thor'} - - def test_hosts_list(self): - # Test the case when playbook 'hosts' var is a list. - inventory = self.script_inventory() - host_names = sorted(['thor', 'loki', 'odin']) # Not sure if sorting is in the contract or not - actual_hosts = inventory.get_hosts(host_names) - actual_host_names = [host.name for host in actual_hosts] - assert host_names == actual_host_names - - def test_script_multiple_groups(self): - inventory = self.script_inventory() - vars = inventory.get_variables('zeus') - - print "VARS=%s" % vars - - assert vars == {'inventory_hostname': 'zeus', - 'inventory_hostname_short': 'zeus', - 'group_names': ['greek', 'major-god']} - - def test_allows_equals_sign_in_var(self): - inventory = self.simple_inventory() - auth = inventory.get_variables('neptun')['auth'] - assert auth == 'YWRtaW46YWRtaW4=' - - def test_dir_inventory(self): - inventory = self.dir_inventory() - - host_vars = inventory.get_variables('zeus') - - expected_vars = {'inventory_hostname': 'zeus', - 'inventory_hostname_short': 'zeus', - 'group_names': ['greek', 'major-god'], - 'var_a': '3#4'} - - print "HOST VARS=%s" % host_vars - print "EXPECTED VARS=%s" % expected_vars - - assert host_vars == expected_vars - - def test_dir_inventory_multiple_groups(self): - inventory = self.dir_inventory() - group_greek = inventory.get_hosts('greek') - actual_host_names = [host.name for host in group_greek] - print "greek : %s " % actual_host_names - assert actual_host_names == ['zeus', 'morpheus'] - - def test_dir_inventory_skip_extension(self): - inventory = self.dir_inventory() - assert 'skipme' not in [h.name for h in inventory.get_hosts()] - - def test_dir_inventory_group_hosts(self): - inventory = self.dir_inventory() - expected_groups = {'all': ['morpheus', 'thor', 'zeus'], - 'major-god': ['thor', 'zeus'], - 'minor-god': ['morpheus'], - 'norse': ['thor'], - 'greek': ['morpheus', 'zeus'], - 'ungrouped': []} - - actual_groups = {} - for group in inventory.get_groups(): - actual_groups[group.name] = sorted([h.name for h in group.get_hosts()]) - print "INVENTORY groups[%s].hosts=%s" % (group.name, actual_groups[group.name]) - print "EXPECTED groups[%s].hosts=%s" % (group.name, expected_groups[group.name]) - - assert actual_groups == expected_groups - - def test_dir_inventory_groups_for_host(self): - inventory = self.dir_inventory() - expected_groups_for_host = {'morpheus': ['all', 'greek', 'minor-god'], - 'thor': ['all', 'major-god', 'norse'], - 'zeus': ['all', 'greek', 'major-god']} - - actual_groups_for_host = {} - for (host, expected) in expected_groups_for_host.iteritems(): - groups = inventory.groups_for_host(host) - names = sorted([g.name for g in groups]) - actual_groups_for_host[host] = names - print "INVENTORY groups_for_host(%s)=%s" % (host, names) - print "EXPECTED groups_for_host(%s)=%s" % (host, expected) - - assert actual_groups_for_host == expected_groups_for_host - - def test_dir_inventory_groups_list(self): - inventory = self.dir_inventory() - inventory_groups = inventory.groups_list() - - expected_groups = {'all': ['morpheus', 'thor', 'zeus'], - 'major-god': ['thor', 'zeus'], - 'minor-god': ['morpheus'], - 'norse': ['thor'], - 'greek': ['morpheus', 'zeus'], - 'ungrouped': []} - - for (name, expected_hosts) in expected_groups.iteritems(): - inventory_groups[name] = sorted(inventory_groups.get(name, [])) - print "INVENTORY groups_list['%s']=%s" % (name, inventory_groups[name]) - print "EXPECTED groups_list['%s']=%s" % (name, expected_hosts) - - assert inventory_groups == expected_groups - diff --git a/test/units/TestModuleUtilsBasic.py b/test/units/TestModuleUtilsBasic.py deleted file mode 100644 index 5b8be28307..0000000000 --- a/test/units/TestModuleUtilsBasic.py +++ /dev/null @@ -1,334 +0,0 @@ -import os -import tempfile - -import unittest -from nose.tools import raises -from nose.tools import timed - -from ansible import errors -from ansible.module_common import ModuleReplacer -from ansible.module_utils.basic import heuristic_log_sanitize -from ansible.utils import checksum as utils_checksum - -TEST_MODULE_DATA = """ -from ansible.module_utils.basic import * - -def get_module(): - return AnsibleModule( - argument_spec = dict(), - supports_check_mode = True, - no_log = True, - ) - -get_module() - -""" - -class TestModuleUtilsBasic(unittest.TestCase): - - def cleanup_temp_file(self, fd, path): - try: - os.close(fd) - os.remove(path) - except: - pass - - def cleanup_temp_dir(self, path): - try: - os.rmdir(path) - except: - pass - - def setUp(self): - # create a temporary file for the test module - # we're about to generate - self.tmp_fd, self.tmp_path = tempfile.mkstemp() - os.write(self.tmp_fd, TEST_MODULE_DATA) - - # template the module code and eval it - module_data, module_style, shebang = ModuleReplacer().modify_module(self.tmp_path, {}, "", {}) - - d = {} - exec(module_data, d, d) - self.module = d['get_module']() - - # module_utils/basic.py screws with CWD, let's save it and reset - self.cwd = os.getcwd() - - def tearDown(self): - self.cleanup_temp_file(self.tmp_fd, self.tmp_path) - # Reset CWD back to what it was before basic.py changed it - os.chdir(self.cwd) - - ################################################################################# - # run_command() tests - - # test run_command with a string command - def test_run_command_string(self): - (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'") - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'", use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with an array of args (with both use_unsafe_shell=True|False) - def test_run_command_args(self): - (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"]) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"], use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with leading environment variables - @raises(SystemExit) - def test_run_command_string_with_env_variables(self): - self.module.run_command('FOO=bar /bin/echo -n "foo bar"') - - @raises(SystemExit) - def test_run_command_args_with_env_variables(self): - self.module.run_command(['FOO=bar', '/bin/echo', '-n', 'foo bar']) - - def test_run_command_string_unsafe_with_env_variables(self): - (rc, out, err) = self.module.run_command('FOO=bar /bin/echo -n "foo bar"', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar') - - # test run_command with a command pipe (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_pipe(self): - (rc, out, err) = self.module.run_command('echo "foo bar" | cat', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with a shell redirect in (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_redirect_in(self): - (rc, out, err) = self.module.run_command('cat << EOF\nfoo bar\nEOF', use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with a shell redirect out (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_redirect_out(self): - tmp_fd, tmp_path = tempfile.mkstemp() - try: - (rc, out, err) = self.module.run_command('echo "foo bar" > %s' % tmp_path, use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - checksum = utils_checksum(tmp_path) - self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') - except: - raise - finally: - self.cleanup_temp_file(tmp_fd, tmp_path) - - # test run_command with a double shell redirect out (append) (with both use_unsafe_shell=True|False) - def test_run_command_string_unsafe_with_double_redirect_out(self): - tmp_fd, tmp_path = tempfile.mkstemp() - try: - (rc, out, err) = self.module.run_command('echo "foo bar" >> %s' % tmp_path, use_unsafe_shell=True) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - checksum = utils_checksum(tmp_path) - self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec') - except: - raise - finally: - self.cleanup_temp_file(tmp_fd, tmp_path) - - # test run_command with data - def test_run_command_string_with_data(self): - (rc, out, err) = self.module.run_command('cat', data='foo bar') - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo bar\n') - - # test run_command with binary data - def test_run_command_string_with_binary_data(self): - (rc, out, err) = self.module.run_command('cat', data='\x41\x42\x43\x44', binary_data=True) - self.assertEqual(rc, 0) - self.assertEqual(out, 'ABCD') - - # test run_command with a cwd set - def test_run_command_string_with_cwd(self): - tmp_path = tempfile.mkdtemp() - try: - (rc, out, err) = self.module.run_command('pwd', cwd=tmp_path) - self.assertEqual(rc, 0) - self.assertTrue(os.path.exists(tmp_path)) - self.assertEqual(out.strip(), os.path.realpath(tmp_path)) - except: - raise - finally: - self.cleanup_temp_dir(tmp_path) - - -class TestModuleUtilsBasicHelpers(unittest.TestCase): - ''' Test some implementation details of AnsibleModule - - Some pieces of AnsibleModule are implementation details but they have - potential cornercases that we need to check. Go ahead and test at - this level that the functions are behaving even though their API may - change and we'd have to rewrite these tests so that we know that we - need to check for those problems in any rewrite. - - In the future we might want to restructure higher level code to be - friendlier to unittests so that we can test at the level that the public - is interacting with the APIs. - ''' - - MANY_RECORDS = 7000 - URL_SECRET = 'http://username:pas:word@foo.com/data' - SSH_SECRET = 'username:pas:word@foo.com/data' - - def cleanup_temp_file(self, fd, path): - try: - os.close(fd) - os.remove(path) - except: - pass - - def cleanup_temp_dir(self, path): - try: - os.rmdir(path) - except: - pass - - def _gen_data(self, records, per_rec, top_level, secret_text): - hostvars = {'hostvars': {}} - for i in range(1, records, 1): - host_facts = {'host%s' % i: - {'pstack': - {'running': '875.1', - 'symlinked': '880.0', - 'tars': [], - 'versions': ['885.0']}, - }} - - if per_rec: - host_facts['host%s' % i]['secret'] = secret_text - hostvars['hostvars'].update(host_facts) - if top_level: - hostvars['secret'] = secret_text - return hostvars - - def setUp(self): - self.many_url = repr(self._gen_data(self.MANY_RECORDS, True, True, - self.URL_SECRET)) - self.many_ssh = repr(self._gen_data(self.MANY_RECORDS, True, True, - self.SSH_SECRET)) - self.one_url = repr(self._gen_data(self.MANY_RECORDS, False, True, - self.URL_SECRET)) - self.one_ssh = repr(self._gen_data(self.MANY_RECORDS, False, True, - self.SSH_SECRET)) - self.zero_secrets = repr(self._gen_data(self.MANY_RECORDS, False, - False, '')) - self.few_url = repr(self._gen_data(2, True, True, self.URL_SECRET)) - self.few_ssh = repr(self._gen_data(2, True, True, self.SSH_SECRET)) - - # create a temporary file for the test module - # we're about to generate - self.tmp_fd, self.tmp_path = tempfile.mkstemp() - os.write(self.tmp_fd, TEST_MODULE_DATA) - - # template the module code and eval it - module_data, module_style, shebang = ModuleReplacer().modify_module(self.tmp_path, {}, "", {}) - - d = {} - exec(module_data, d, d) - self.module = d['get_module']() - - # module_utils/basic.py screws with CWD, let's save it and reset - self.cwd = os.getcwd() - - def tearDown(self): - self.cleanup_temp_file(self.tmp_fd, self.tmp_path) - # Reset CWD back to what it was before basic.py changed it - os.chdir(self.cwd) - - - ################################################################################# - - # - # Speed tests - # - - # Previously, we used regexes which had some pathologically slow cases for - # parameters with large amounts of data with many ':' but no '@'. The - # present function gets slower when there are many replacements so we may - # want to explore regexes in the future (for the speed when substituting - # or flexibility). These speed tests will hopefully tell us if we're - # introducing code that has cases that are simply too slow. - # - # Some regex notes: - # * re.sub() is faster than re.match() + str.join(). - # * We may be able to detect a large number of '@' symbols and then use - # a regex else use the present function. - - @timed(5) - def test_log_sanitize_speed_many_url(self): - heuristic_log_sanitize(self.many_url) - - @timed(5) - def test_log_sanitize_speed_many_ssh(self): - heuristic_log_sanitize(self.many_ssh) - - @timed(5) - def test_log_sanitize_speed_one_url(self): - heuristic_log_sanitize(self.one_url) - - @timed(5) - def test_log_sanitize_speed_one_ssh(self): - heuristic_log_sanitize(self.one_ssh) - - @timed(5) - def test_log_sanitize_speed_zero_secrets(self): - heuristic_log_sanitize(self.zero_secrets) - - # - # Test that the password obfuscation sanitizes somewhat cleanly. - # - - def test_log_sanitize_correctness(self): - url_data = repr(self._gen_data(3, True, True, self.URL_SECRET)) - ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET)) - - url_output = heuristic_log_sanitize(url_data) - ssh_output = heuristic_log_sanitize(ssh_data) - - # Basic functionality: Successfully hid the password - try: - self.assertNotIn('pas:word', url_output) - self.assertNotIn('pas:word', ssh_output) - - # Slightly more advanced, we hid all of the password despite the ":" - self.assertNotIn('pas', url_output) - self.assertNotIn('pas', ssh_output) - except AttributeError: - # python2.6 or less's unittest - self.assertFalse('pas:word' in url_output, '%s is present in %s' % ('"pas:word"', url_output)) - self.assertFalse('pas:word' in ssh_output, '%s is present in %s' % ('"pas:word"', ssh_output)) - - self.assertFalse('pas' in url_output, '%s is present in %s' % ('"pas"', url_output)) - self.assertFalse('pas' in ssh_output, '%s is present in %s' % ('"pas"', ssh_output)) - - # In this implementation we replace the password with 8 "*" which is - # also the length of our password. The url fields should be able to - # accurately detect where the password ends so the length should be - # the same: - self.assertEqual(len(url_output), len(url_data)) - - # ssh checking is harder as the heuristic is overzealous in many - # cases. Since the input will have at least one ":" present before - # the password we can tell some things about the beginning and end of - # the data, though: - self.assertTrue(ssh_output.startswith("{'")) - self.assertTrue(ssh_output.endswith("}")) - try: - self.assertIn(":********@foo.com/data'", ssh_output) - except AttributeError: - # python2.6 or less's unittest - self.assertTrue(":********@foo.com/data'" in ssh_output, '%s is not present in %s' % (":********@foo.com/data'", ssh_output)) - - # The overzealous-ness here may lead to us changing the algorithm in - # the future. We could make it consume less of the data (with the - # possibility of leaving partial passwords exposed) and encourage - # people to use no_log instead of relying on this obfuscation. diff --git a/test/units/TestModuleUtilsDatabase.py b/test/units/TestModuleUtilsDatabase.py deleted file mode 100644 index 67da0b60e0..0000000000 --- a/test/units/TestModuleUtilsDatabase.py +++ /dev/null @@ -1,118 +0,0 @@ -import collections -import mock -import os -import re - -from nose.tools import eq_ -try: - from nose.tools import assert_raises_regexp -except ImportError: - # Python < 2.7 - def assert_raises_regexp(expected, regexp, callable, *a, **kw): - try: - callable(*a, **kw) - except expected as e: - if isinstance(regexp, basestring): - regexp = re.compile(regexp) - if not regexp.search(str(e)): - raise Exception('"%s" does not match "%s"' % - (regexp.pattern, str(e))) - else: - if hasattr(expected,'__name__'): excName = expected.__name__ - else: excName = str(expected) - raise AssertionError("%s not raised" % excName) - -from ansible.module_utils.database import ( - pg_quote_identifier, - SQLParseError, -) - - -# Note: Using nose's generator test cases here so we can't inherit from -# unittest.TestCase -class TestQuotePgIdentifier(object): - - # These are all valid strings - # The results are based on interpreting the identifier as a table name - valid = { - # User quoted - '"public.table"': '"public.table"', - '"public"."table"': '"public"."table"', - '"schema test"."table test"': '"schema test"."table test"', - - # We quote part - 'public.table': '"public"."table"', - '"public".table': '"public"."table"', - 'public."table"': '"public"."table"', - 'schema test.table test': '"schema test"."table test"', - '"schema test".table test': '"schema test"."table test"', - 'schema test."table test"': '"schema test"."table test"', - - # Embedded double quotes - 'table "test"': '"table ""test"""', - 'public."table ""test"""': '"public"."table ""test"""', - 'public.table "test"': '"public"."table ""test"""', - 'schema "test".table': '"schema ""test"""."table"', - '"schema ""test""".table': '"schema ""test"""."table"', - '"""wat"""."""test"""': '"""wat"""."""test"""', - # Sigh, handle these as well: - '"no end quote': '"""no end quote"', - 'schema."table': '"schema"."""table"', - '"schema.table': '"""schema"."table"', - 'schema."table.something': '"schema"."""table"."something"', - - # Embedded dots - '"schema.test"."table.test"': '"schema.test"."table.test"', - '"schema.".table': '"schema."."table"', - '"schema."."table"': '"schema."."table"', - 'schema.".table"': '"schema".".table"', - '"schema".".table"': '"schema".".table"', - '"schema.".".table"': '"schema.".".table"', - # These are valid but maybe not what the user intended - '."table"': '".""table"""', - 'table.': '"table."', - } - - invalid = { - ('test.too.many.dots', 'table'): 'PostgreSQL does not support table with more than 3 dots', - ('"test.too".many.dots', 'database'): 'PostgreSQL does not support database with more than 1 dots', - ('test.too."many.dots"', 'database'): 'PostgreSQL does not support database with more than 1 dots', - ('"test"."too"."many"."dots"', 'database'): "PostgreSQL does not support database with more than 1 dots", - ('"test"."too"."many"."dots"', 'schema'): "PostgreSQL does not support schema with more than 2 dots", - ('"test"."too"."many"."dots"', 'table'): "PostgreSQL does not support table with more than 3 dots", - ('"test"."too"."many"."dots"."for"."column"', 'column'): "PostgreSQL does not support column with more than 4 dots", - ('"table "invalid" double quote"', 'table'): 'User escaped identifiers must escape extra quotes', - ('"schema "invalid"""."table "invalid"', 'table'): 'User escaped identifiers must escape extra quotes', - ('"schema."table"','table'): 'User escaped identifiers must escape extra quotes', - ('"schema".', 'table'): 'Identifier name unspecified or unquoted trailing dot', - } - - def check_valid_quotes(self, identifier, quoted_identifier): - eq_(pg_quote_identifier(identifier, 'table'), quoted_identifier) - - def test_valid_quotes(self): - for identifier in self.valid: - yield self.check_valid_quotes, identifier, self.valid[identifier] - - def check_invalid_quotes(self, identifier, id_type, msg): - assert_raises_regexp(SQLParseError, msg, pg_quote_identifier, *(identifier, id_type)) - - def test_invalid_quotes(self): - for test in self.invalid: - yield self.check_invalid_quotes, test[0], test[1], self.invalid[test] - - def test_how_many_dots(self): - eq_(pg_quote_identifier('role', 'role'), '"role"') - assert_raises_regexp(SQLParseError, "PostgreSQL does not support role with more than 1 dots", pg_quote_identifier, *('role.more', 'role')) - - eq_(pg_quote_identifier('db', 'database'), '"db"') - assert_raises_regexp(SQLParseError, "PostgreSQL does not support database with more than 1 dots", pg_quote_identifier, *('db.more', 'database')) - - eq_(pg_quote_identifier('db.schema', 'schema'), '"db"."schema"') - assert_raises_regexp(SQLParseError, "PostgreSQL does not support schema with more than 2 dots", pg_quote_identifier, *('db.schema.more', 'schema')) - - eq_(pg_quote_identifier('db.schema.table', 'table'), '"db"."schema"."table"') - assert_raises_regexp(SQLParseError, "PostgreSQL does not support table with more than 3 dots", pg_quote_identifier, *('db.schema.table.more', 'table')) - - eq_(pg_quote_identifier('db.schema.table.column', 'column'), '"db"."schema"."table"."column"') - assert_raises_regexp(SQLParseError, "PostgreSQL does not support column with more than 4 dots", pg_quote_identifier, *('db.schema.table.column.more', 'column')) diff --git a/test/units/TestModules.py b/test/units/TestModules.py deleted file mode 100644 index aef2e83ed6..0000000000 --- a/test/units/TestModules.py +++ /dev/null @@ -1,32 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -import ast -import unittest -from ansible import utils - - -class TestModules(unittest.TestCase): - - def list_all_modules(self): - paths = utils.plugins.module_finder._get_paths() - paths = [x for x in paths if os.path.isdir(x)] - module_list = [] - for path in paths: - for (dirpath, dirnames, filenames) in os.walk(path): - for filename in filenames: - (path, ext) = os.path.splitext(filename) - if ext == ".py": - module_list.append(os.path.join(dirpath, filename)) - return module_list - - def test_ast_parse(self): - module_list = self.list_all_modules() - ERRORS = [] - # attempt to parse each module with ast - for m in module_list: - try: - ast.parse(''.join(open(m))) - except Exception, e: - ERRORS.append((m, e)) - assert len(ERRORS) == 0, "get_docstring errors: %s" % ERRORS diff --git a/test/units/TestPlayVarsFiles.py b/test/units/TestPlayVarsFiles.py deleted file mode 100644 index 9d42b73e8b..0000000000 --- a/test/units/TestPlayVarsFiles.py +++ /dev/null @@ -1,390 +0,0 @@ -#!/usr/bin/env python - -import os -import shutil -from tempfile import mkstemp -from tempfile import mkdtemp -from ansible.playbook.play import Play -import ansible - -import unittest -from nose.plugins.skip import SkipTest - - -class FakeCallBacks(object): - def __init__(self): - pass - def on_vars_prompt(self): - pass - def on_import_for_host(self, host, filename): - pass - -class FakeInventory(object): - def __init__(self): - self.hosts = {} - def basedir(self): - return "." - def src(self): - return "fakeinventory" - def get_variables(self, host, vault_password=None): - if host in self.hosts: - return self.hosts[host] - else: - return {} - -class FakePlayBook(object): - def __init__(self): - self.extra_vars = {} - self.remote_user = None - self.remote_port = None - self.sudo = None - self.sudo_user = None - self.su = None - self.su_user = None - self.become = None - self.become_method = None - self.become_user = None - self.transport = None - self.only_tags = None - self.skip_tags = None - self.force_handlers = None - self.VARS_CACHE = {} - self.SETUP_CACHE = {} - self.inventory = FakeInventory() - self.callbacks = FakeCallBacks() - - self.VARS_CACHE['localhost'] = {} - - -class TestMe(unittest.TestCase): - - ######################################## - # BASIC FILE LOADING BEHAVIOR TESTS - ######################################## - - def test_play_constructor(self): - # __init__(self, playbook, ds, basedir, vault_password=None) - playbook = FakePlayBook() - ds = { "hosts": "localhost"} - basedir = "." - play = Play(playbook, ds, basedir) - - def test_vars_file(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # create a play with a vars_file - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": [temp_path]} - basedir = "." - play = Play(playbook, ds, basedir) - os.remove(temp_path) - - # make sure the variable was loaded - assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars" - assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars" - - def test_vars_file_nonlist_error(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # create a play with a string for vars_files - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": temp_path} - basedir = "." - error_hit = False - try: - play = Play(playbook, ds, basedir) - except: - error_hit = True - os.remove(temp_path) - - assert error_hit == True, "no error was thrown when vars_files was not a list" - - - def test_multiple_vars_files(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # make a second vars file - fd, temp_path2 = mkstemp() - f = open(temp_path2, "wb") - f.write("baz: bang\n") - f.close() - - - # create a play with two vars_files - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": [temp_path, temp_path2]} - basedir = "." - play = Play(playbook, ds, basedir) - os.remove(temp_path) - os.remove(temp_path2) - - # make sure the variables were loaded - assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars" - assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars" - assert 'baz' in play.vars_file_vars, "vars_file2 was not loaded into play.vars_file_vars" - assert play.vars_file_vars['baz'] == 'bang', "baz was not set to bang in play.vars_file_vars" - - def test_vars_files_first_found(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # get a random file path - fd, temp_path2 = mkstemp() - # make sure this file doesn't exist - os.remove(temp_path2) - - # create a play - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": [[temp_path2, temp_path]]} - basedir = "." - play = Play(playbook, ds, basedir) - os.remove(temp_path) - - # make sure the variable was loaded - assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars" - assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars" - - def test_vars_files_multiple_found(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # make a second vars file - fd, temp_path2 = mkstemp() - f = open(temp_path2, "wb") - f.write("baz: bang\n") - f.close() - - # create a play - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": [[temp_path, temp_path2]]} - basedir = "." - play = Play(playbook, ds, basedir) - os.remove(temp_path) - os.remove(temp_path2) - - # make sure the variables were loaded - assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars" - assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars" - assert 'baz' not in play.vars_file_vars, "vars_file2 was loaded after vars_file1 was loaded" - - def test_vars_files_assert_all_found(self): - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # make a second vars file - fd, temp_path2 = mkstemp() - # make sure it doesn't exist - os.remove(temp_path2) - - # create a play - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": [temp_path, temp_path2]} - basedir = "." - - error_hit = False - error_msg = None - - try: - play = Play(playbook, ds, basedir) - except ansible.errors.AnsibleError, e: - error_hit = True - error_msg = e - - os.remove(temp_path) - assert error_hit == True, "no error was thrown for missing vars_file" - - - ######################################## - # VARIABLE PRECEDENCE TESTS - ######################################## - - # On the first run vars_files are loaded into play.vars_file_vars by host == None - # * only files with vars from host==None will work here - # On the secondary run(s), a host is given and the vars_files are loaded into VARS_CACHE - # * this only occurs if host is not None, filename2 has vars in the name, and filename3 does not - - # filename -- the original string - # filename2 -- filename templated with play vars - # filename3 -- filename2 template with inject (hostvars + setup_cache + vars_cache) - # filename4 -- path_dwim(filename3) - - def test_vars_files_for_host(self): - - # host != None - # vars in filename2 - # no vars in filename3 - - # make a vars file - fd, temp_path = mkstemp() - f = open(temp_path, "wb") - f.write("foo: bar\n") - f.close() - - # build play attributes - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars_files": ["{{ temp_path }}"]} - basedir = "." - playbook.VARS_CACHE['localhost']['temp_path'] = temp_path - - # create play and do first run - play = Play(playbook, ds, basedir) - - # the second run is started by calling update_vars_files - play.update_vars_files(['localhost']) - os.remove(temp_path) - - assert 'foo' in play.playbook.VARS_CACHE['localhost'], "vars_file vars were not loaded into vars_cache" - assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', "foo does not equal bar" - - - ######################################## - # COMPLEX FILENAME TEMPLATING TESTS - ######################################## - - def test_vars_files_two_vars_in_name(self): - - # self.vars_file_vars = ds['vars'] - # self.vars_file_vars += _get_vars() ... aka extra_vars - - # make a temp dir - temp_dir = mkdtemp() - - # make a temp file - fd, temp_file = mkstemp(dir=temp_dir) - f = open(temp_file, "wb") - f.write("foo: bar\n") - f.close() - - # build play attributes - playbook = FakePlayBook() - ds = { "hosts": "localhost", - "vars": { "temp_dir": os.path.dirname(temp_file), - "temp_file": os.path.basename(temp_file) }, - "vars_files": ["{{ temp_dir + '/' + temp_file }}"]} - basedir = "." - - # create play and do first run - play = Play(playbook, ds, basedir) - - # cleanup - shutil.rmtree(temp_dir) - - assert 'foo' in play.vars_file_vars, "double var templated vars_files filename not loaded" - - def test_vars_files_two_vars_different_scope(self): - - # - # Use a play var and an inventory var to create the filename - # - - # self.playbook.inventory.get_variables(host) - # {'group_names': ['ungrouped'], 'inventory_hostname': 'localhost', - # 'ansible_ssh_user': 'root', 'inventory_hostname_short': 'localhost'} - - # make a temp dir - temp_dir = mkdtemp() - - # make a temp file - fd, temp_file = mkstemp(dir=temp_dir) - f = open(temp_file, "wb") - f.write("foo: bar\n") - f.close() - - # build play attributes - playbook = FakePlayBook() - playbook.inventory.hosts['localhost'] = {'inventory_hostname': os.path.basename(temp_file)} - ds = { "hosts": "localhost", - "vars": { "temp_dir": os.path.dirname(temp_file)}, - "vars_files": ["{{ temp_dir + '/' + inventory_hostname }}"]} - basedir = "." - - # create play and do first run - play = Play(playbook, ds, basedir) - - # do the host run - play.update_vars_files(['localhost']) - - # cleanup - shutil.rmtree(temp_dir) - - assert 'foo' not in play.vars_file_vars, \ - "mixed scope vars_file loaded into play vars" - assert 'foo' in play.playbook.VARS_CACHE['localhost'], \ - "differently scoped templated vars_files filename not loaded" - assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', \ - "foo is not bar" - - def test_vars_files_two_vars_different_scope_first_found(self): - - # - # Use a play var and an inventory var to create the filename - # - - # make a temp dir - temp_dir = mkdtemp() - - # make a temp file - fd, temp_file = mkstemp(dir=temp_dir) - f = open(temp_file, "wb") - f.write("foo: bar\n") - f.close() - - # build play attributes - playbook = FakePlayBook() - playbook.inventory.hosts['localhost'] = {'inventory_hostname': os.path.basename(temp_file)} - ds = { "hosts": "localhost", - "vars": { "temp_dir": os.path.dirname(temp_file)}, - "vars_files": [["{{ temp_dir + '/' + inventory_hostname }}"]]} - basedir = "." - - # create play and do first run - play = Play(playbook, ds, basedir) - - # do the host run - play.update_vars_files(['localhost']) - - # cleanup - shutil.rmtree(temp_dir) - - assert 'foo' not in play.vars_file_vars, \ - "mixed scope vars_file loaded into play vars" - assert 'foo' in play.playbook.VARS_CACHE['localhost'], \ - "differently scoped templated vars_files filename not loaded" - assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', \ - "foo is not bar" - - diff --git a/test/units/TestSynchronize.py b/test/units/TestSynchronize.py deleted file mode 100644 index cf28ea5d80..0000000000 --- a/test/units/TestSynchronize.py +++ /dev/null @@ -1,176 +0,0 @@ - -import unittest -import getpass -import os -import shutil -import time -import tempfile -from nose.plugins.skip import SkipTest - -from ansible.runner.action_plugins.synchronize import ActionModule as Synchronize - -class FakeRunner(object): - def __init__(self): - self.connection = None - self.transport = None - self.basedir = None - self.sudo = None - self.remote_user = None - self.private_key_file = None - self.check = False - self.become = False - self.become_method = 'sudo' - self.become_user = False - - def _execute_module(self, conn, tmp, module_name, args, - async_jid=None, async_module=None, async_limit=None, inject=None, - persist_files=False, complex_args=None, delete_remote_tmp=True): - self.executed_conn = conn - self.executed_tmp = tmp - self.executed_module_name = module_name - self.executed_args = args - self.executed_async_jid = async_jid - self.executed_async_module = async_module - self.executed_async_limit = async_limit - self.executed_inject = inject - self.executed_persist_files = persist_files - self.executed_complex_args = complex_args - self.executed_delete_remote_tmp = delete_remote_tmp - - def noop_on_check(self, inject): - return self.check - -class FakeConn(object): - def __init__(self): - self.host = None - self.delegate = None - -class TestSynchronize(unittest.TestCase): - - - def test_synchronize_action_basic(self): - - """ verify the synchronize action plugin sets - the delegate to 127.0.0.1 and remote path to user@host:/path """ - - runner = FakeRunner() - runner.remote_user = "root" - runner.transport = "ssh" - conn = FakeConn() - inject = { - 'inventory_hostname': "el6.lab.net", - 'inventory_hostname_short': "el6", - 'ansible_connection': None, - 'ansible_ssh_user': 'root', - 'delegate_to': None, - 'playbook_dir': '.', - } - - x = Synchronize(runner) - x.setup("synchronize", inject) - x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject) - - assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1" - assert runner.executed_complex_args == {"dest":"root@el6.lab.net:/tmp/bar", "src":"/tmp/foo"}, "wrong args used" - assert runner.sudo == None, "sudo was not reset to None" - - def test_synchronize_action_sudo(self): - - """ verify the synchronize action plugin unsets and then sets sudo """ - - runner = FakeRunner() - runner.become = True - runner.remote_user = "root" - runner.transport = "ssh" - conn = FakeConn() - inject = { - 'inventory_hostname': "el6.lab.net", - 'inventory_hostname_short': "el6", - 'ansible_connection': None, - 'ansible_ssh_user': 'root', - 'delegate_to': None, - 'playbook_dir': '.', - } - - x = Synchronize(runner) - x.setup("synchronize", inject) - x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject) - - assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1" - assert runner.executed_complex_args == {'dest':'root@el6.lab.net:/tmp/bar', - 'src':'/tmp/foo', - 'rsync_path':'"sudo rsync"'}, "wrong args used" - assert runner.become == True, "sudo was not reset to True" - - - def test_synchronize_action_local(self): - - """ verify the synchronize action plugin sets - the delegate to 127.0.0.1 and does not alter the dest """ - - runner = FakeRunner() - runner.remote_user = "jtanner" - runner.transport = "paramiko" - conn = FakeConn() - conn.host = "127.0.0.1" - conn.delegate = "thishost" - inject = { - 'inventory_hostname': "thishost", - 'ansible_ssh_host': '127.0.0.1', - 'ansible_connection': 'local', - 'delegate_to': None, - 'playbook_dir': '.', - } - - x = Synchronize(runner) - x.setup("synchronize", inject) - x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject) - - assert runner.transport == "paramiko", "runner transport was changed" - assert runner.remote_user == "jtanner", "runner remote_user was changed" - assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1" - assert "dest_port" not in runner.executed_complex_args, "dest_port should not have been set" - assert runner.executed_complex_args.get("src") == "/tmp/foo", "source was set incorrectly" - assert runner.executed_complex_args.get("dest") == "/tmp/bar", "dest was set incorrectly" - - - def test_synchronize_action_vagrant(self): - - """ Verify the action plugin accommodates the common - scenarios for vagrant boxes. """ - - runner = FakeRunner() - runner.remote_user = "jtanner" - runner.transport = "ssh" - conn = FakeConn() - conn.host = "127.0.0.1" - conn.delegate = "thishost" - inject = { - 'inventory_hostname': "thishost", - 'ansible_ssh_user': 'vagrant', - 'ansible_ssh_host': '127.0.0.1', - 'ansible_ssh_port': '2222', - 'delegate_to': None, - 'playbook_dir': '.', - 'hostvars': { - 'thishost': { - 'inventory_hostname': 'thishost', - 'ansible_ssh_port': '2222', - 'ansible_ssh_host': '127.0.0.1', - 'ansible_ssh_user': 'vagrant' - } - } - } - - x = Synchronize(runner) - x.setup("synchronize", inject) - x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject) - - assert runner.transport == "ssh", "runner transport was changed" - assert runner.remote_user == "jtanner", "runner remote_user was changed" - assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1" - assert runner.executed_inject['ansible_ssh_user'] == "vagrant", "runner user was changed" - assert runner.executed_complex_args.get("dest_port") == "2222", "remote port was not set to 2222" - assert runner.executed_complex_args.get("src") == "/tmp/foo", "source was set incorrectly" - assert runner.executed_complex_args.get("dest") == "vagrant@127.0.0.1:/tmp/bar", "dest was set incorrectly" - diff --git a/test/units/TestUtils.py b/test/units/TestUtils.py deleted file mode 100644 index c0ca9ba538..0000000000 --- a/test/units/TestUtils.py +++ /dev/null @@ -1,945 +0,0 @@ -# -*- coding: utf-8 -*- - -import traceback -import unittest -import os -import os.path -import re -import tempfile -import yaml -import passlib.hash -import string -import StringIO -import copy -import tempfile -import shutil - -from nose.plugins.skip import SkipTest -from mock import patch - -import ansible.utils -import ansible.errors -import ansible.constants as C -import ansible.utils.template as template2 -from ansible.module_utils.splitter import split_args - -from ansible import __version__ - -import sys -reload(sys) -sys.setdefaultencoding("utf8") - -class TestUtils(unittest.TestCase): - - def _is_fips(self): - try: - data = open('/proc/sys/crypto/fips_enabled').read().strip() - except: - return False - if data != '1': - return False - return True - - def test_before_comment(self): - ''' see if we can detect the part of a string before a comment. Used by INI parser in inventory ''' - - input = "before # comment" - expected = "before " - actual = ansible.utils.before_comment(input) - self.assertEqual(expected, actual) - - input = "before \# not a comment" - expected = "before # not a comment" - actual = ansible.utils.before_comment(input) - self.assertEqual(expected, actual) - - input = "" - expected = "" - actual = ansible.utils.before_comment(input) - self.assertEqual(expected, actual) - - input = "#" - expected = "" - actual = ansible.utils.before_comment(input) - self.assertEqual(expected, actual) - - ##################################### - ### check_conditional tests - - def test_check_conditional_jinja2_literals(self): - # see http://jinja.pocoo.org/docs/templates/#literals - - # none - self.assertEqual(ansible.utils.check_conditional( - None, '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - '', '/', {}), True) - - # list - self.assertEqual(ansible.utils.check_conditional( - ['true'], '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - ['false'], '/', {}), False) - - # non basestring or list - self.assertEqual(ansible.utils.check_conditional( - {}, '/', {}), {}) - - # boolean - self.assertEqual(ansible.utils.check_conditional( - 'true', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - 'false', '/', {}), False) - self.assertEqual(ansible.utils.check_conditional( - 'True', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - 'False', '/', {}), False) - - # integer - self.assertEqual(ansible.utils.check_conditional( - '1', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - '0', '/', {}), False) - - # string, beware, a string is truthy unless empty - self.assertEqual(ansible.utils.check_conditional( - '"yes"', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - '"no"', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - '""', '/', {}), False) - - - def test_check_conditional_jinja2_variable_literals(self): - # see http://jinja.pocoo.org/docs/templates/#literals - - # boolean - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'True'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'true'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'False'}), False) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'false'}), False) - - # integer - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '1'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 1}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '0'}), False) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 0}), False) - - # string, beware, a string is truthy unless empty - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '"yes"'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '"no"'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '""'}), False) - - # Python boolean in Jinja2 expression - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': True}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': False}), False) - - - def test_check_conditional_jinja2_expression(self): - self.assertEqual(ansible.utils.check_conditional( - '1 == 1', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - 'bar == 42', '/', {'bar': 42}), True) - self.assertEqual(ansible.utils.check_conditional( - 'bar != 42', '/', {'bar': 42}), False) - - - def test_check_conditional_jinja2_expression_in_variable(self): - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': '1 == 1'}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'bar == 42', 'bar': 42}), True) - self.assertEqual(ansible.utils.check_conditional( - 'var', '/', {'var': 'bar != 42', 'bar': 42}), False) - - def test_check_conditional_jinja2_unicode(self): - self.assertEqual(ansible.utils.check_conditional( - u'"\u00df"', '/', {}), True) - self.assertEqual(ansible.utils.check_conditional( - u'var == "\u00df"', '/', {'var': u'\u00df'}), True) - - - ##################################### - ### key-value parsing - - def test_parse_kv_basic(self): - self.assertEqual(ansible.utils.parse_kv('a=simple b="with space" c="this=that"'), - {'a': 'simple', 'b': 'with space', 'c': 'this=that'}) - self.assertEqual(ansible.utils.parse_kv('msg=АБВГД'), - {'msg': 'АБВГД'}) - - - def test_jsonify(self): - self.assertEqual(ansible.utils.jsonify(None), '{}') - self.assertEqual(ansible.utils.jsonify(dict(foo='bar', baz=['qux'])), '{"baz": ["qux"], "foo": "bar"}') - expected = u'{"baz":["qux"],"foo":"bar"}' - self.assertEqual("".join(ansible.utils.jsonify(dict(foo='bar', baz=['qux']), format=True).split()), expected) - - def test_is_failed(self): - self.assertEqual(ansible.utils.is_failed(dict(rc=0)), False) - self.assertEqual(ansible.utils.is_failed(dict(rc=1)), True) - self.assertEqual(ansible.utils.is_failed(dict()), False) - self.assertEqual(ansible.utils.is_failed(dict(failed=False)), False) - self.assertEqual(ansible.utils.is_failed(dict(failed=True)), True) - self.assertEqual(ansible.utils.is_failed(dict(failed='True')), True) - self.assertEqual(ansible.utils.is_failed(dict(failed='true')), True) - - def test_is_changed(self): - self.assertEqual(ansible.utils.is_changed(dict()), False) - self.assertEqual(ansible.utils.is_changed(dict(changed=False)), False) - self.assertEqual(ansible.utils.is_changed(dict(changed=True)), True) - self.assertEqual(ansible.utils.is_changed(dict(changed='True')), True) - self.assertEqual(ansible.utils.is_changed(dict(changed='true')), True) - - def test_path_dwim(self): - self.assertEqual(ansible.utils.path_dwim(None, __file__), - __file__) - self.assertEqual(ansible.utils.path_dwim(None, '~'), - os.path.expanduser('~')) - self.assertEqual(ansible.utils.path_dwim(None, 'TestUtils.py'), - __file__.rstrip('c')) - - def test_path_dwim_relative(self): - self.assertEqual(ansible.utils.path_dwim_relative(__file__, 'units', 'TestUtils.py', - os.path.dirname(os.path.dirname(__file__))), - __file__.rstrip('c')) - - def test_json_loads(self): - self.assertEqual(ansible.utils.json_loads('{"foo": "bar"}'), dict(foo='bar')) - - def test_parse_json(self): - # leading junk - self.assertEqual(ansible.utils.parse_json('ansible\n{"foo": "bar"}'), dict(foo="bar")) - - # No closing quotation - try: - rc = ansible.utils.parse_json('foo=bar "') - print rc - except ValueError: - pass - else: - traceback.print_exc() - raise AssertionError('Incorrect exception, expected ValueError') - - # Failed to parse - try: - ansible.utils.parse_json('{') - except ValueError: - pass - else: - raise AssertionError('Incorrect exception, expected ValueError') - - def test_parse_yaml(self): - #json - self.assertEqual(ansible.utils.parse_yaml('{"foo": "bar"}'), dict(foo='bar')) - - # broken json - try: - ansible.utils.parse_yaml('{') - except ansible.errors.AnsibleError: - pass - else: - raise AssertionError - - # broken json with path_hint - try: - ansible.utils.parse_yaml('{', path_hint='foo') - except ansible.errors.AnsibleError: - pass - else: - raise AssertionError - - # yaml with front-matter - self.assertEqual(ansible.utils.parse_yaml("---\nfoo: bar"), dict(foo='bar')) - # yaml no front-matter - self.assertEqual(ansible.utils.parse_yaml('foo: bar'), dict(foo='bar')) - # yaml indented first line (See #6348) - self.assertEqual(ansible.utils.parse_yaml(' - foo: bar\n baz: qux'), [dict(foo='bar', baz='qux')]) - - def test_process_common_errors(self): - # no quote - self.assertTrue('YAML thought it' in ansible.utils.process_common_errors('', 'foo: {{bar}}', 6)) - - # extra colon - self.assertTrue('an extra unquoted colon' in ansible.utils.process_common_errors('', 'foo: bar:', 8)) - - # match - self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', 'foo: "{{bar}}"baz', 6)) - self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', "foo: '{{bar}}'baz", 6)) - - # unbalanced - self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', 'foo: "bad" "wolf"', 6)) - self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', "foo: 'bad' 'wolf'", 6)) - - - def test_process_yaml_error(self): - data = 'foo: bar\n baz: qux' - try: - ansible.utils.parse_yaml(data) - except yaml.YAMLError, exc: - try: - ansible.utils.process_yaml_error(exc, data, __file__) - except ansible.errors.AnsibleYAMLValidationFailed, e: - self.assertTrue('Syntax Error while loading' in str(e)) - else: - raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed') - - data = 'foo: bar\n baz: {{qux}}' - try: - ansible.utils.parse_yaml(data) - except yaml.YAMLError, exc: - try: - ansible.utils.process_yaml_error(exc, data, __file__) - except ansible.errors.AnsibleYAMLValidationFailed, e: - self.assertTrue('Syntax Error while loading' in str(e)) - else: - raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed') - - data = '\xFF' - try: - ansible.utils.parse_yaml(data) - except yaml.YAMLError, exc: - try: - ansible.utils.process_yaml_error(exc, data, __file__) - except ansible.errors.AnsibleYAMLValidationFailed, e: - self.assertTrue('Check over' in str(e)) - else: - raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed') - - data = '\xFF' - try: - ansible.utils.parse_yaml(data) - except yaml.YAMLError, exc: - try: - ansible.utils.process_yaml_error(exc, data, None) - except ansible.errors.AnsibleYAMLValidationFailed, e: - self.assertTrue('Could not parse YAML.' in str(e)) - else: - raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed') - - def test_parse_yaml_from_file(self): - test = os.path.join(os.path.dirname(__file__), 'inventory_test_data', - 'common_vars.yml') - encrypted = os.path.join(os.path.dirname(__file__), 'inventory_test_data', - 'encrypted.yml') - broken = os.path.join(os.path.dirname(__file__), 'inventory_test_data', - 'broken.yml') - - try: - ansible.utils.parse_yaml_from_file(os.path.dirname(__file__)) - except ansible.errors.AnsibleError: - pass - else: - raise AssertionError('Incorrect exception, expected AnsibleError') - - self.assertEqual(ansible.utils.parse_yaml_from_file(test), yaml.safe_load(open(test))) - - self.assertEqual(ansible.utils.parse_yaml_from_file(encrypted, 'ansible'), dict(foo='bar')) - - try: - ansible.utils.parse_yaml_from_file(broken) - except ansible.errors.AnsibleYAMLValidationFailed, e: - self.assertTrue('Syntax Error while loading' in str(e)) - else: - raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed') - - def test_merge_hash(self): - self.assertEqual(ansible.utils.merge_hash(dict(foo='bar', baz='qux'), dict(foo='baz')), - dict(foo='baz', baz='qux')) - self.assertEqual(ansible.utils.merge_hash(dict(foo=dict(bar='baz')), dict(foo=dict(bar='qux'))), - dict(foo=dict(bar='qux'))) - - def test_md5s(self): - if self._is_fips(): - raise SkipTest('MD5 unavailable on FIPs enabled systems') - self.assertEqual(ansible.utils.md5s('ansible'), '640c8a5376aa12fa15cf02130ce239a6') - # Need a test that causes UnicodeEncodeError See 4221 - - def test_md5(self): - if self._is_fips(): - raise SkipTest('MD5 unavailable on FIPs enabled systems') - self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cfg')), - 'fb7b5b90ea63f04bde33e804b6fad42c') - self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cf')), - None) - - def test_checksum_s(self): - self.assertEqual(ansible.utils.checksum_s('ansible'), 'bef45157a43c9e5f469d188810814a4a8ab9f2ed') - # Need a test that causes UnicodeEncodeError See 4221 - - def test_checksum(self): - self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cfg')), - '658b67c8ac7595adde7048425ff1f9aba270721a') - self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cf')), - None) - - def test_default(self): - self.assertEqual(ansible.utils.default(None, lambda: {}), {}) - self.assertEqual(ansible.utils.default(dict(foo='bar'), lambda: {}), dict(foo='bar')) - - def test__gitinfo(self): - # this fails if not run from git clone - # self.assertEqual('last updated' in ansible.utils._gitinfo()) - # missing test for git submodule - # missing test outside of git clone - pass - - def test_version(self): - version = ansible.utils.version('ansible') - self.assertTrue(version.startswith('ansible %s' % __version__)) - # this fails if not run from git clone - # self.assertEqual('last updated' in version) - - def test_getch(self): - # figure out how to test this - pass - - def test_sanitize_output(self): - self.assertEqual(ansible.utils.sanitize_output('password=foo'), 'password=VALUE_HIDDEN') - self.assertEqual(ansible.utils.sanitize_output('foo=user:pass@foo/whatever'), - 'foo=user:********@foo/whatever') - self.assertEqual(ansible.utils.sanitize_output('foo=http://username:pass@wherever/foo'), - 'foo=http://username:********@wherever/foo') - self.assertEqual(ansible.utils.sanitize_output('foo=http://wherever/foo'), - 'foo=http://wherever/foo') - - def test_increment_debug(self): - ansible.utils.VERBOSITY = 0 - ansible.utils.increment_debug(None, None, None, None) - self.assertEqual(ansible.utils.VERBOSITY, 1) - - def test_base_parser(self): - output = ansible.utils.base_parser(output_opts=True) - self.assertTrue(output.has_option('--one-line') and output.has_option('--tree')) - - runas = ansible.utils.base_parser(runas_opts=True) - for opt in ['--sudo', '--sudo-user', '--user', '--su', '--su-user']: - self.assertTrue(runas.has_option(opt)) - - async = ansible.utils.base_parser(async_opts=True) - self.assertTrue(async.has_option('--poll') and async.has_option('--background')) - - connect = ansible.utils.base_parser(connect_opts=True) - self.assertTrue(connect.has_option('--connection')) - - subset = ansible.utils.base_parser(subset_opts=True) - self.assertTrue(subset.has_option('--limit')) - - check = ansible.utils.base_parser(check_opts=True) - self.assertTrue(check.has_option('--check')) - - diff = ansible.utils.base_parser(diff_opts=True) - self.assertTrue(diff.has_option('--diff')) - - def test_do_encrypt(self): - salt_chars = string.ascii_letters + string.digits + './' - salt = ansible.utils.random_password(length=8, chars=salt_chars) - hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt', salt=salt) - self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash)) - - hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt') - self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash)) - - try: - ansible.utils.do_encrypt('ansible', 'ansible') - except ansible.errors.AnsibleError: - pass - else: - raise AssertionError('Incorrect exception, expected AnsibleError') - - def test_do_encrypt_md5(self): - if self._is_fips(): - raise SkipTest('MD5 unavailable on FIPS systems') - hash = ansible.utils.do_encrypt('ansible', 'md5_crypt', salt_size=4) - self.assertTrue(passlib.hash.md5_crypt.verify('ansible', hash)) - - def test_last_non_blank_line(self): - self.assertEqual(ansible.utils.last_non_blank_line('a\n\nb\n\nc'), 'c') - self.assertEqual(ansible.utils.last_non_blank_line(''), '') - - def test_filter_leading_non_json_lines(self): - self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n{"foo": "bar"}'), - '{"foo": "bar"}\n') - self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n["foo", "bar"]'), - '["foo", "bar"]\n') - - def test_boolean(self): - self.assertEqual(ansible.utils.boolean("true"), True) - self.assertEqual(ansible.utils.boolean("True"), True) - self.assertEqual(ansible.utils.boolean("TRUE"), True) - self.assertEqual(ansible.utils.boolean("t"), True) - self.assertEqual(ansible.utils.boolean("T"), True) - self.assertEqual(ansible.utils.boolean("Y"), True) - self.assertEqual(ansible.utils.boolean("y"), True) - self.assertEqual(ansible.utils.boolean("1"), True) - self.assertEqual(ansible.utils.boolean(1), True) - self.assertEqual(ansible.utils.boolean("false"), False) - self.assertEqual(ansible.utils.boolean("False"), False) - self.assertEqual(ansible.utils.boolean("0"), False) - self.assertEqual(ansible.utils.boolean(0), False) - self.assertEqual(ansible.utils.boolean("foo"), False) - - def test_make_sudo_cmd(self): - cmd = ansible.utils.make_sudo_cmd(C.DEFAULT_SUDO_EXE, 'root', '/bin/sh', '/bin/ls') - self.assertTrue(isinstance(cmd, tuple)) - self.assertEqual(len(cmd), 3) - self.assertTrue('-u root' in cmd[0]) - self.assertTrue('-p "[sudo via ansible, key=' in cmd[0] and cmd[1].startswith('[sudo via ansible, key')) - self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-')) - self.assertTrue('sudo -k' in cmd[0]) - - def test_make_su_cmd(self): - cmd = ansible.utils.make_su_cmd('root', '/bin/sh', '/bin/ls') - self.assertTrue(isinstance(cmd, tuple)) - self.assertEqual(len(cmd), 3) - self.assertTrue('root -c "/bin/sh' in cmd[0] or ' root -c /bin/sh' in cmd[0]) - self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-')) - - def test_to_unicode(self): - uni = ansible.utils.unicode.to_unicode(u'ansible') - self.assertTrue(isinstance(uni, unicode)) - self.assertEqual(uni, u'ansible') - - none = ansible.utils.unicode.to_unicode(None, nonstring='passthru') - self.assertTrue(isinstance(none, type(None))) - self.assertTrue(none is None) - - utf8 = ansible.utils.unicode.to_unicode('ansible') - self.assertTrue(isinstance(utf8, unicode)) - self.assertEqual(utf8, u'ansible') - - def test_is_list_of_strings(self): - self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', u'baz']), True) - self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', True]), False) - self.assertEqual(ansible.utils.is_list_of_strings(['one', 2, 'three']), False) - - def test_contains_vars(self): - self.assertTrue(ansible.utils.contains_vars('{{foo}}')) - self.assertTrue(ansible.utils.contains_vars('$foo')) - self.assertFalse(ansible.utils.contains_vars('foo')) - - def test_safe_eval(self): - # Not basestring - self.assertEqual(ansible.utils.safe_eval(len), len) - self.assertEqual(ansible.utils.safe_eval(1), 1) - self.assertEqual(ansible.utils.safe_eval(len, include_exceptions=True), (len, None)) - self.assertEqual(ansible.utils.safe_eval(1, include_exceptions=True), (1, None)) - - # module - self.assertEqual(ansible.utils.safe_eval('foo.bar('), 'foo.bar(') - self.assertEqual(ansible.utils.safe_eval('foo.bar(', include_exceptions=True), ('foo.bar(', None)) - - # import - self.assertEqual(ansible.utils.safe_eval('import foo'), 'import foo') - self.assertEqual(ansible.utils.safe_eval('import foo', include_exceptions=True), ('import foo', None)) - - # valid simple eval - self.assertEqual(ansible.utils.safe_eval('True'), True) - self.assertEqual(ansible.utils.safe_eval('True', include_exceptions=True), (True, None)) - - # valid eval with lookup - self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2)), 3) - self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2), include_exceptions=True), (3, None)) - - # invalid eval - self.assertEqual(ansible.utils.safe_eval('foo'), 'foo') - nameerror = ansible.utils.safe_eval('foo', include_exceptions=True) - self.assertTrue(isinstance(nameerror, tuple)) - self.assertEqual(nameerror[0], 'foo') - self.assertTrue(isinstance(nameerror[1], NameError)) - - def test_listify_lookup_plugin_terms(self): - basedir = os.path.dirname(__file__) - # Straight lookups - #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=[])), []) - #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=['one', 'two'])), ['one', 'two']) - - def test_deprecated(self): - sys_stderr = sys.stderr - sys.stderr = StringIO.StringIO() - ansible.utils.deprecated('Ack!', '0.0') - out = sys.stderr.getvalue() - self.assertTrue('0.0' in out) - self.assertTrue('[DEPRECATION WARNING]' in out) - - sys.stderr = StringIO.StringIO() - ansible.utils.deprecated('Ack!', None) - out = sys.stderr.getvalue() - self.assertTrue('0.0' not in out) - self.assertTrue('[DEPRECATION WARNING]' in out) - - sys.stderr = StringIO.StringIO() - warnings = C.DEPRECATION_WARNINGS - C.DEPRECATION_WARNINGS = False - ansible.utils.deprecated('Ack!', None) - out = sys.stderr.getvalue() - self.assertTrue(not out) - C.DEPRECATION_WARNINGS = warnings - - sys.stderr = sys_stderr - - try: - ansible.utils.deprecated('Ack!', '0.0', True) - except ansible.errors.AnsibleError, e: - self.assertTrue('0.0' not in str(e)) - self.assertTrue('[DEPRECATED]' in str(e)) - else: - raise AssertionError("Incorrect exception, expected AnsibleError") - - def test_warning(self): - sys_stderr = sys.stderr - sys.stderr = StringIO.StringIO() - ansible.utils.warning('ANSIBLE') - out = sys.stderr.getvalue() - sys.stderr = sys_stderr - self.assertTrue('[WARNING]: ANSIBLE' in out) - - def test_combine_vars(self): - one = {'foo': {'bar': True}, 'baz': {'one': 'qux'}} - two = {'baz': {'two': 'qux'}} - replace = {'baz': {'two': 'qux'}, 'foo': {'bar': True}} - merge = {'baz': {'two': 'qux', 'one': 'qux'}, 'foo': {'bar': True}} - - C.DEFAULT_HASH_BEHAVIOUR = 'replace' - self.assertEqual(ansible.utils.combine_vars(one, two), replace) - - C.DEFAULT_HASH_BEHAVIOUR = 'merge' - self.assertEqual(ansible.utils.combine_vars(one, two), merge) - - def test_err(self): - sys_stderr = sys.stderr - sys.stderr = StringIO.StringIO() - ansible.utils.err('ANSIBLE') - out = sys.stderr.getvalue() - sys.stderr = sys_stderr - self.assertEqual(out, 'ANSIBLE\n') - - def test_exit(self): - sys_stderr = sys.stderr - sys.stderr = StringIO.StringIO() - try: - ansible.utils.exit('ansible') - except SystemExit, e: - self.assertEqual(e.code, 1) - self.assertEqual(sys.stderr.getvalue(), 'ansible\n') - else: - raise AssertionError('Incorrect exception, expected SystemExit') - finally: - sys.stderr = sys_stderr - - def test_unfrackpath(self): - os.environ['TEST_ROOT'] = os.path.dirname(os.path.dirname(__file__)) - self.assertEqual(ansible.utils.unfrackpath('$TEST_ROOT/units/../units/TestUtils.py'), __file__.rstrip('c')) - - def test_is_executable(self): - self.assertEqual(ansible.utils.is_executable(__file__), 0) - - bin_ansible = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), - 'bin', 'ansible') - self.assertNotEqual(ansible.utils.is_executable(bin_ansible), 0) - - def test_get_diff(self): - standard = dict( - before_header='foo', - after_header='bar', - before='fooo', - after='foo' - ) - - standard_expected = """--- before: foo -+++ after: bar -@@ -1 +1 @@ --fooo+foo""" - - # workaround py26 and py27 difflib differences - standard_expected = """-fooo+foo""" - diff = ansible.utils.get_diff(standard) - diff = diff.split('\n') - del diff[0] - del diff[0] - del diff[0] - diff = '\n'.join(diff) - self.assertEqual(diff, unicode(standard_expected)) - - def test_split_args(self): - # split_args is a smarter shlex.split for the needs of the way ansible uses it - - def _split_info(input, desired, actual): - print "SENT: ", input - print "WANT: ", desired - print "GOT: ", actual - - def _test_combo(input, desired): - actual = split_args(input) - _split_info(input, desired, actual) - assert actual == desired - - # trivial splitting - _test_combo('a b=c d=f', ['a', 'b=c', 'd=f' ]) - - # mixed quotes - _test_combo('a b=\'c\' d="e" f=\'g\'', ['a', "b='c'", 'd="e"', "f='g'" ]) - - # with spaces - # FIXME: this fails, commenting out only for now - # _test_combo('a "\'one two three\'"', ['a', "'one two three'" ]) - - # TODO: ... - # jinja2 preservation - _test_combo('a {{ y }} z', ['a', '{{ y }}', 'z' ]) - - # jinja2 preservation with spaces and filters and other hard things - _test_combo( - 'a {{ x | filter(\'moo\', \'param\') }} z {{ chicken }} "waffles"', - ['a', "{{ x | filter('moo', 'param') }}", 'z', '{{ chicken }}', '"waffles"'] - ) - - # invalid quote detection - self.assertRaises(Exception, split_args, 'hey I started a quote"') - self.assertRaises(Exception, split_args, 'hey I started a\' quote') - - # jinja2 loop blocks with lots of complexity - _test_combo( - # in memory of neighbors cat - # we preserve line breaks unless a line continuation character precedes them - 'a {% if x %} y {%else %} {{meow}} {% endif %} "cookie\nchip" \\\ndone\nand done', - ['a', '{% if x %}', 'y', '{%else %}', '{{meow}}', '{% endif %}', '"cookie\nchip"', 'done\n', 'and', 'done'] - ) - - # test space preservation within quotes - _test_combo( - 'content="1 2 3 4 " foo=bar', - ['content="1 2 3 4 "', 'foo=bar'] - ) - - # invalid jinja2 nesting detection - # invalid quote nesting detection - - def test_clean_data(self): - # clean data removes jinja2 tags from data - self.assertEqual( - ansible.utils._clean_data('this is a normal string', from_remote=True), - 'this is a normal string' - ) - self.assertEqual( - ansible.utils._clean_data('this string has a {{variable}}', from_remote=True), - 'this string has a {#variable#}' - ) - self.assertEqual( - ansible.utils._clean_data('this string {{has}} two {{variables}} in it', from_remote=True), - 'this string {#has#} two {#variables#} in it' - ) - self.assertEqual( - ansible.utils._clean_data('this string has a {{variable with a\nnewline}}', from_remote=True), - 'this string has a {#variable with a\nnewline#}' - ) - self.assertEqual( - ansible.utils._clean_data('this string is from inventory {{variable}}', from_inventory=True), - 'this string is from inventory {{variable}}' - ) - self.assertEqual( - ansible.utils._clean_data('this string is from inventory too but uses lookup {{lookup("foo","bar")}}', from_inventory=True), - 'this string is from inventory too but uses lookup {#lookup("foo","bar")#}' - ) - self.assertEqual( - ansible.utils._clean_data('this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}', from_remote=True), - 'this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}' - ) - self.assertEqual( - ansible.utils._clean_data('this string contains unicode: ¢ £ ¤ ¥', from_remote=True), - 'this string contains unicode: ¢ £ ¤ ¥' - ) - - - def test_censor_unlogged_data(self): - ''' used by the no_log attribute ''' - input = dict( - password='sekrit', - rc=12, - failed=True, - changed=False, - skipped=True, - msg='moo', - ) - data = ansible.utils.censor_unlogged_data(input) - assert 'password' not in data - assert 'rc' in data - assert 'failed' in data - assert 'changed' in data - assert 'skipped' in data - assert 'msg' not in data - assert data['censored'] == 'results hidden due to no_log parameter' - - def test_repo_url_to_role_name(self): - tests = [("http://git.example.com/repos/repo.git", "repo"), - ("ssh://git@git.example.com:repos/role-name", "role-name"), - ("ssh://git@git.example.com:repos/role-name,v0.1", "role-name"), - ("directory/role/is/installed/in", "directory/role/is/installed/in")] - for (url, result) in tests: - self.assertEqual(ansible.utils.repo_url_to_role_name(url), result) - - def test_role_spec_parse(self): - tests = [ - ( - "git+http://git.example.com/repos/repo.git,v1.0", - { - 'scm': 'git', - 'src': 'http://git.example.com/repos/repo.git', - 'version': 'v1.0', - 'name': 'repo' - } - ), - ( - "http://repo.example.com/download/tarfile.tar.gz", - { - 'scm': None, - 'src': 'http://repo.example.com/download/tarfile.tar.gz', - 'version': '', - 'name': 'tarfile' - } - ), - ( - "http://repo.example.com/download/tarfile.tar.gz,,nicename", - { - 'scm': None, - 'src': 'http://repo.example.com/download/tarfile.tar.gz', - 'version': '', - 'name': 'nicename' - } - ), - ( - "git+http://git.example.com/repos/repo.git,v1.0,awesome", - { - 'scm': 'git', - 'src': 'http://git.example.com/repos/repo.git', - 'version': 'v1.0', - 'name': 'awesome' - } - ), - ( - # test that http://github URLs are assumed git+http:// unless they end in .tar.gz - "http://github.com/ansible/fakerole/fake", - { - 'scm' : 'git', - 'src' : 'http://github.com/ansible/fakerole/fake', - 'version' : 'master', - 'name' : 'fake' - } - ), - ( - # test that http://github URLs are assumed git+http:// unless they end in .tar.gz - "http://github.com/ansible/fakerole/fake/archive/master.tar.gz", - { - 'scm' : None, - 'src' : 'http://github.com/ansible/fakerole/fake/archive/master.tar.gz', - 'version' : '', - 'name' : 'master' - } - ) - ] - for (spec, result) in tests: - self.assertEqual(ansible.utils.role_spec_parse(spec), result) - - def test_role_yaml_parse(self): - tests = ( - ( - # Old style - { - 'role': 'debops.elasticsearch', - 'name': 'elks' - }, - { - 'role': 'debops.elasticsearch', - 'name': 'elks', - 'scm': None, - 'src': 'debops.elasticsearch', - 'version': '', - } - ), - ( - { - 'role': 'debops.elasticsearch,1.0,elks', - 'my_param': 'foo' - }, - { - 'role': 'debops.elasticsearch,1.0,elks', - 'name': 'elks', - 'scm': None, - 'src': 'debops.elasticsearch', - 'version': '1.0', - 'my_param': 'foo', - } - ), - ( - { - 'role': 'debops.elasticsearch,1.0', - 'my_param': 'foo' - }, - { - 'role': 'debops.elasticsearch,1.0', - 'name': 'debops.elasticsearch', - 'scm': None, - 'src': 'debops.elasticsearch', - 'version': '1.0', - 'my_param': 'foo', - } - ), - # New style - ( - { - 'src': 'debops.elasticsearch', - 'name': 'elks', - 'my_param': 'foo' - }, - { - 'name': 'elks', - 'scm': None, - 'src': 'debops.elasticsearch', - 'version': '', - 'my_param': 'foo' - } - ), - ) - - for (role, result) in tests: - self.assertEqual(ansible.utils.role_yaml_parse(role), result) - - @patch('ansible.utils.plugins.module_finder._get_paths') - def test_find_plugin(self, mock_get_paths): - - tmp_path = tempfile.mkdtemp() - mock_get_paths.return_value = [tmp_path,] - right_module_1 = 'module.py' - right_module_2 = 'module_without_extension' - wrong_module_1 = 'folder' - wrong_module_2 = 'inexistent' - path_right_module_1 = os.path.join(tmp_path, right_module_1) - path_right_module_2 = os.path.join(tmp_path, right_module_2) - path_wrong_module_1 = os.path.join(tmp_path, wrong_module_1) - open(path_right_module_1, 'w').close() - open(path_right_module_2, 'w').close() - os.mkdir(path_wrong_module_1) - - self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_1), - path_right_module_1) - self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_2), - path_right_module_2) - self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_1), - None) - self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_2), - None) - - shutil.rmtree(tmp_path) diff --git a/test/units/TestUtilsStringFunctions.py b/test/units/TestUtilsStringFunctions.py deleted file mode 100644 index cccedf280d..0000000000 --- a/test/units/TestUtilsStringFunctions.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest -import os -import os.path -import tempfile -import yaml -import passlib.hash -import string -import StringIO -import copy - -from nose.plugins.skip import SkipTest - -from ansible.utils import string_functions -import ansible.errors -import ansible.constants as C -import ansible.utils.template as template2 - -from ansible import __version__ - -import sys -reload(sys) -sys.setdefaultencoding("utf8") - -class TestUtilsStringFunctions(unittest.TestCase): - def test_isprintable(self): - self.assertFalse(string_functions.isprintable(chr(7))) - self.assertTrue(string_functions.isprintable('hello')) - - def test_count_newlines_from_end(self): - self.assertEqual(string_functions.count_newlines_from_end('foo\n\n\n\n'), 4) - self.assertEqual(string_functions.count_newlines_from_end('\nfoo'), 0) diff --git a/test/units/TestVaultEditor.py b/test/units/TestVaultEditor.py deleted file mode 100644 index cfa5bc13e6..0000000000 --- a/test/units/TestVaultEditor.py +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python - -from unittest import TestCase -import getpass -import os -import shutil -import time -import tempfile -from binascii import unhexlify -from binascii import hexlify -from nose.plugins.skip import SkipTest - -from ansible import errors -from ansible.utils.vault import VaultLib -from ansible.utils.vault import VaultEditor - -# Counter import fails for 2.0.1, requires >= 2.6.1 from pip -try: - from Crypto.Util import Counter - HAS_COUNTER = True -except ImportError: - HAS_COUNTER = False - -# KDF import fails for 2.0.1, requires >= 2.6.1 from pip -try: - from Crypto.Protocol.KDF import PBKDF2 - HAS_PBKDF2 = True -except ImportError: - HAS_PBKDF2 = False - -# AES IMPORTS -try: - from Crypto.Cipher import AES as AES - HAS_AES = True -except ImportError: - HAS_AES = False - -class TestVaultEditor(TestCase): - - def _is_fips(self): - try: - data = open('/proc/sys/crypto/fips_enabled').read().strip() - except: - return False - if data != '1': - return False - return True - - def test_methods_exist(self): - v = VaultEditor(None, None, None) - slots = ['create_file', - 'decrypt_file', - 'edit_file', - 'encrypt_file', - 'rekey_file', - 'read_data', - 'write_data', - 'shuffle_files'] - for slot in slots: - assert hasattr(v, slot), "VaultLib is missing the %s method" % slot - - def test_decrypt_1_0(self): - if self._is_fips(): - raise SkipTest('Vault-1.0 will not function on FIPS enabled systems') - if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: - raise SkipTest - dirpath = tempfile.mkdtemp() - filename = os.path.join(dirpath, "foo-ansible-1.0.yml") - shutil.rmtree(dirpath) - shutil.copytree("vault_test_data", dirpath) - ve = VaultEditor(None, "ansible", filename) - - # make sure the password functions for the cipher - error_hit = False - try: - ve.decrypt_file() - except errors.AnsibleError, e: - error_hit = True - - # verify decrypted content - f = open(filename, "rb") - fdata = f.read() - f.close() - - shutil.rmtree(dirpath) - assert error_hit == False, "error decrypting 1.0 file" - assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() - - def test_decrypt_1_1_newline(self): - if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: - raise SkipTest - dirpath = tempfile.mkdtemp() - filename = os.path.join(dirpath, "foo-ansible-1.1-ansible-newline-ansible.yml") - shutil.rmtree(dirpath) - shutil.copytree("vault_test_data", dirpath) - ve = VaultEditor(None, "ansible\nansible\n", filename) - - # make sure the password functions for the cipher - error_hit = False - try: - ve.decrypt_file() - except errors.AnsibleError, e: - error_hit = True - - # verify decrypted content - f = open(filename, "rb") - fdata = f.read() - f.close() - - shutil.rmtree(dirpath) - assert error_hit == False, "error decrypting 1.1 file with newline in password" - #assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip() - - - def test_decrypt_1_1(self): - if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: - raise SkipTest - dirpath = tempfile.mkdtemp() - filename = os.path.join(dirpath, "foo-ansible-1.1.yml") - shutil.rmtree(dirpath) - shutil.copytree("vault_test_data", dirpath) - ve = VaultEditor(None, "ansible", filename) - - # make sure the password functions for the cipher - error_hit = False - try: - ve.decrypt_file() - except errors.AnsibleError, e: - error_hit = True - - # verify decrypted content - f = open(filename, "rb") - fdata = f.read() - f.close() - - shutil.rmtree(dirpath) - assert error_hit == False, "error decrypting 1.1 file" - assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip() - - - def test_rekey_migration(self): - if self._is_fips(): - raise SkipTest('Vault-1.0 will not function on FIPS enabled systems') - if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: - raise SkipTest - dirpath = tempfile.mkdtemp() - filename = os.path.join(dirpath, "foo-ansible-1.0.yml") - shutil.rmtree(dirpath) - shutil.copytree("vault_test_data", dirpath) - ve = VaultEditor(None, "ansible", filename) - - # make sure the password functions for the cipher - error_hit = False - try: - ve.rekey_file('ansible2') - except errors.AnsibleError, e: - error_hit = True - - # verify decrypted content - f = open(filename, "rb") - fdata = f.read() - f.close() - - shutil.rmtree(dirpath) - assert error_hit == False, "error rekeying 1.0 file to 1.1" - - # ensure filedata can be decrypted, is 1.1 and is AES256 - vl = VaultLib("ansible2") - dec_data = None - error_hit = False - try: - dec_data = vl.decrypt(fdata) - except errors.AnsibleError, e: - error_hit = True - - assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name - assert error_hit == False, "error decrypting migrated 1.0 file" - assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data - - diff --git a/test/units/__init__.py b/test/units/__init__.py new file mode 100644 index 0000000000..e7489db6fb --- /dev/null +++ b/test/units/__init__.py @@ -0,0 +1,5 @@ + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/ansible.cfg b/test/units/ansible.cfg deleted file mode 100644 index dd99b8102d..0000000000 --- a/test/units/ansible.cfg +++ /dev/null @@ -1,3 +0,0 @@ -[defaults] - -test_key = test_value diff --git a/test/units/errors/__init__.py b/test/units/errors/__init__.py new file mode 100644 index 0000000000..20207b272d --- /dev/null +++ b/test/units/errors/__init__.py @@ -0,0 +1,22 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + diff --git a/test/units/errors/test_errors.py b/test/units/errors/test_errors.py new file mode 100644 index 0000000000..3993ea5061 --- /dev/null +++ b/test/units/errors/test_errors.py @@ -0,0 +1,68 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest + +from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject +from ansible.errors import AnsibleError + +from ansible.compat.tests import BUILTINS +from ansible.compat.tests.mock import mock_open, patch + +class TestErrors(unittest.TestCase): + + def setUp(self): + self.message = 'This is the error message' + + self.obj = AnsibleBaseYAMLObject() + + def tearDown(self): + pass + + def test_basic_error(self): + e = AnsibleError(self.message) + self.assertEqual(e.message, 'ERROR! ' + self.message) + self.assertEqual(e.__repr__(), 'ERROR! ' + self.message) + + @patch.object(AnsibleError, '_get_error_lines_from_file') + def test_error_with_object(self, mock_method): + self.obj.ansible_pos = ('foo.yml', 1, 1) + + mock_method.return_value = ('this is line 1\n', '') + e = AnsibleError(self.message, self.obj) + + self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 1, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n\nthis is line 1\n^ here\n") + + def test_get_error_lines_from_file(self): + m = mock_open() + m.return_value.readlines.return_value = ['this is line 1\n'] + + with patch('{0}.open'.format(BUILTINS), m): + # this line will be found in the file + self.obj.ansible_pos = ('foo.yml', 1, 1) + e = AnsibleError(self.message, self.obj) + self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 1, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n\nthis is line 1\n^ here\n") + + # this line will not be found, as it is out of the index range + self.obj.ansible_pos = ('foo.yml', 2, 1) + e = AnsibleError(self.message, self.obj) + self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 2, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\n(specified line no longer in file, maybe it changed?)") + diff --git a/test/units/executor/__init__.py b/test/units/executor/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/executor/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/executor/test_play_iterator.py b/test/units/executor/test_play_iterator.py new file mode 100644 index 0000000000..47c0352b25 --- /dev/null +++ b/test/units/executor/test_play_iterator.py @@ -0,0 +1,85 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.executor.play_iterator import PlayIterator +from ansible.playbook import Playbook + +from test.mock.loader import DictDataLoader + +class TestPlayIterator(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_play_iterator(self): + fake_loader = DictDataLoader({ + "test_play.yml": """ + - hosts: all + gather_facts: false + roles: + - test_role + pre_tasks: + - debug: msg="this is a pre_task" + tasks: + - debug: msg="this is a regular task" + post_tasks: + - debug: msg="this is a post_task" + """, + '/etc/ansible/roles/test_role/tasks/main.yml': """ + - debug: msg="this is a role task" + """, + }) + + p = Playbook.load('test_play.yml', loader=fake_loader) + + hosts = [] + for i in range(0, 10): + host = MagicMock() + host.get_name.return_value = 'host%02d' % i + hosts.append(host) + + inventory = MagicMock() + inventory.get_hosts.return_value = hosts + inventory.filter_hosts.return_value = hosts + + itr = PlayIterator(inventory, p._entries[0]) + task = itr.get_next_task_for_host(hosts[0]) + print(task) + self.assertIsNotNone(task) + task = itr.get_next_task_for_host(hosts[0]) + print(task) + self.assertIsNotNone(task) + task = itr.get_next_task_for_host(hosts[0]) + print(task) + self.assertIsNotNone(task) + task = itr.get_next_task_for_host(hosts[0]) + print(task) + self.assertIsNotNone(task) + task = itr.get_next_task_for_host(hosts[0]) + print(task) + self.assertIsNone(task) diff --git a/test/units/inventory_test_data/ansible_hosts b/test/units/inventory_test_data/ansible_hosts deleted file mode 100644 index 94074edc3c..0000000000 --- a/test/units/inventory_test_data/ansible_hosts +++ /dev/null @@ -1,2 +0,0 @@ -[somegroup] -localhost diff --git a/test/units/inventory_test_data/broken.yml b/test/units/inventory_test_data/broken.yml deleted file mode 100644 index 0eccc1ba78..0000000000 --- a/test/units/inventory_test_data/broken.yml +++ /dev/null @@ -1,2 +0,0 @@ -foo: bar - baz: qux diff --git a/test/units/inventory_test_data/common_vars.yml b/test/units/inventory_test_data/common_vars.yml deleted file mode 100644 index c4c09b67f2..0000000000 --- a/test/units/inventory_test_data/common_vars.yml +++ /dev/null @@ -1,4 +0,0 @@ ---- -duck: quack -cow: moo -extguard: " '$favcolor' == 'blue' " diff --git a/test/units/inventory_test_data/complex_hosts b/test/units/inventory_test_data/complex_hosts deleted file mode 100644 index 34935c6330..0000000000 --- a/test/units/inventory_test_data/complex_hosts +++ /dev/null @@ -1,96 +0,0 @@ -# order of groups, children, and vars is not significant -# so this example mixes them up for maximum testing - -[nc:children] -rtp -triangle - -[eastcoast:children] -nc -florida - -[us:children] -eastcoast - -[redundantgroup] -rtp_a - -[redundantgroup2] -rtp_a - -[redundantgroup3:children] -rtp - -[redundantgroup:vars] -rga=1 - -[redundantgroup2:vars] -rgb=2 - -[redundantgroup3:vars] -rgc=3 - -[nc:vars] -b=10000 -c=10001 -d=10002 -e = 10003 - f = 10004 != 10005 - g = " g " - h = ' h ' - i = ' i " - j = " j - k = ['k1', 'k2'] - -[rtp] -rtp_a -rtp_b -rtp_c - -[rtp:vars] -a=1 -b=2 -c=3 - -[triangle] -tri_a -tri_b -tri_c - -[triangle:vars] -a=11 -b=12 -c=13 - -[florida] -orlando -miami - -[florida:vars] -a=100 -b=101 -c=102 - - -[eastcoast:vars] -b=100000 -c=100001 -d=100002 - -[us:vars] -c=1000000 - -[role1] -host[1:2] - -[role2] -host[2:3] - -[role3] -host[1:3:2] - -[role4] -blade-[a:c]-[1:16] -blade-[d:z]-[01:16].example.com -blade-[1:10]-[1:16] -host-e-[10:16].example.net:1234 diff --git a/test/units/inventory_test_data/encrypted.yml b/test/units/inventory_test_data/encrypted.yml deleted file mode 100644 index ca33ab25cb..0000000000 --- a/test/units/inventory_test_data/encrypted.yml +++ /dev/null @@ -1,6 +0,0 @@ -$ANSIBLE_VAULT;1.1;AES256 -33343734386261666161626433386662623039356366656637303939306563376130623138626165 -6436333766346533353463636566313332623130383662340a393835656134633665333861393331 -37666233346464636263636530626332623035633135363732623332313534306438393366323966 -3135306561356164310a343937653834643433343734653137383339323330626437313562306630 -3035 diff --git a/test/units/inventory_test_data/hosts_list.yml b/test/units/inventory_test_data/hosts_list.yml deleted file mode 100644 index 09c5ca7c17..0000000000 --- a/test/units/inventory_test_data/hosts_list.yml +++ /dev/null @@ -1,6 +0,0 @@ -# Test that playbooks support YAML lists of hosts. ---- -- hosts: [host1, host2, host3] - connection: local - tasks: - - action: command true diff --git a/test/units/inventory_test_data/inventory/test_alpha_end_before_beg b/test/units/inventory_test_data/inventory/test_alpha_end_before_beg deleted file mode 100644 index 1b7a478d87..0000000000 --- a/test/units/inventory_test_data/inventory/test_alpha_end_before_beg +++ /dev/null @@ -1,2 +0,0 @@ -[test] -host[Z:T] diff --git a/test/units/inventory_test_data/inventory/test_combined_range b/test/units/inventory_test_data/inventory/test_combined_range deleted file mode 100644 index cbcb41753e..0000000000 --- a/test/units/inventory_test_data/inventory/test_combined_range +++ /dev/null @@ -1,2 +0,0 @@ -[test] -host[1:2][A:B] diff --git a/test/units/inventory_test_data/inventory/test_incorrect_format b/test/units/inventory_test_data/inventory/test_incorrect_format deleted file mode 100644 index 339bd59edf..0000000000 --- a/test/units/inventory_test_data/inventory/test_incorrect_format +++ /dev/null @@ -1,2 +0,0 @@ -[test] -host[001:10] diff --git a/test/units/inventory_test_data/inventory/test_incorrect_range b/test/units/inventory_test_data/inventory/test_incorrect_range deleted file mode 100644 index 272ca7be71..0000000000 --- a/test/units/inventory_test_data/inventory/test_incorrect_range +++ /dev/null @@ -1,2 +0,0 @@ -[test] -host[1:2:3:4] diff --git a/test/units/inventory_test_data/inventory/test_leading_range b/test/units/inventory_test_data/inventory/test_leading_range deleted file mode 100644 index bf390de42a..0000000000 --- a/test/units/inventory_test_data/inventory/test_leading_range +++ /dev/null @@ -1,6 +0,0 @@ -[test] -[1:2].host -[A:B].host - -[test2] # comment -[1:3].host diff --git a/test/units/inventory_test_data/inventory/test_missing_end b/test/units/inventory_test_data/inventory/test_missing_end deleted file mode 100644 index ff32042402..0000000000 --- a/test/units/inventory_test_data/inventory/test_missing_end +++ /dev/null @@ -1,2 +0,0 @@ -[test] -host[1:] diff --git a/test/units/inventory_test_data/inventory_api.py b/test/units/inventory_test_data/inventory_api.py deleted file mode 100644 index 9bdca22ed3..0000000000 --- a/test/units/inventory_test_data/inventory_api.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -import json -import sys - -from optparse import OptionParser - -parser = OptionParser() -parser.add_option('-l', '--list', default=False, dest="list_hosts", action="store_true") -parser.add_option('-H', '--host', default=None, dest="host") -parser.add_option('-e', '--extra-vars', default=None, dest="extra") - -options, args = parser.parse_args() - -systems = { - "ungrouped": [ "jupiter", "saturn" ], - "greek": [ "zeus", "hera", "poseidon" ], - "norse": [ "thor", "odin", "loki" ], - "major-god": [ "zeus", "odin" ], -} - -variables = { - "thor": { - "hammer": True - }, - "zeus": {}, -} - -if options.list_hosts == True: - print json.dumps(systems) - sys.exit(0) - -if options.host is not None: - if options.extra: - k,v = options.extra.split("=") - variables[options.host][k] = v - if options.host in variables: - print json.dumps(variables[options.host]) - else: - print "{}" - sys.exit(0) - -parser.print_help() -sys.exit(1) diff --git a/test/units/inventory_test_data/inventory_dir/0hosts b/test/units/inventory_test_data/inventory_dir/0hosts deleted file mode 100644 index 6f78a33a22..0000000000 --- a/test/units/inventory_test_data/inventory_dir/0hosts +++ /dev/null @@ -1,3 +0,0 @@ -zeus var_a=0 -morpheus -thor diff --git a/test/units/inventory_test_data/inventory_dir/1mythology b/test/units/inventory_test_data/inventory_dir/1mythology deleted file mode 100644 index 43fa181bd5..0000000000 --- a/test/units/inventory_test_data/inventory_dir/1mythology +++ /dev/null @@ -1,6 +0,0 @@ -[greek] -zeus -morpheus - -[norse] -thor diff --git a/test/units/inventory_test_data/inventory_dir/2levels b/test/units/inventory_test_data/inventory_dir/2levels deleted file mode 100644 index 363294923e..0000000000 --- a/test/units/inventory_test_data/inventory_dir/2levels +++ /dev/null @@ -1,6 +0,0 @@ -[major-god] -zeus var_a=2 -thor - -[minor-god] -morpheus diff --git a/test/units/inventory_test_data/inventory_dir/3comments b/test/units/inventory_test_data/inventory_dir/3comments deleted file mode 100644 index e11b5e416b..0000000000 --- a/test/units/inventory_test_data/inventory_dir/3comments +++ /dev/null @@ -1,8 +0,0 @@ -[major-god] # group with inline comments -zeus var_a="3\#4" # host with inline comments and "#" in the var string -# A comment -thor - -[minor-god] # group with inline comment and unbalanced quotes: ' " -morpheus # host with inline comments and unbalanced quotes: ' " -# A comment with unbalanced quotes: ' " diff --git a/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini b/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini deleted file mode 100644 index a30afe5fcc..0000000000 --- a/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini +++ /dev/null @@ -1,2 +0,0 @@ -[skip] -skipme
\ No newline at end of file diff --git a/test/units/inventory_test_data/large_range b/test/units/inventory_test_data/large_range deleted file mode 100644 index 18cfc22078..0000000000 --- a/test/units/inventory_test_data/large_range +++ /dev/null @@ -1 +0,0 @@ -bob[000:142] diff --git a/test/units/inventory_test_data/restrict_pattern b/test/units/inventory_test_data/restrict_pattern deleted file mode 100644 index fb16b4dda5..0000000000 --- a/test/units/inventory_test_data/restrict_pattern +++ /dev/null @@ -1,2 +0,0 @@ -odin -thor diff --git a/test/units/inventory_test_data/simple_hosts b/test/units/inventory_test_data/simple_hosts deleted file mode 100644 index 4625b3dbab..0000000000 --- a/test/units/inventory_test_data/simple_hosts +++ /dev/null @@ -1,22 +0,0 @@ -jupiter -saturn -thrudgelmir[:5] - -[greek] -zeus -hera:3000 -poseidon -cerberus[001:003] -cottus[99:100] - -[norse] -thor -odin -loki - -[egyptian] -Hotep-[a:c] -Bast[C:D] - -[auth] -neptun auth="YWRtaW46YWRtaW4=" diff --git a/test/units/mock/__init__.py b/test/units/mock/__init__.py new file mode 100644 index 0000000000..ae8ccff595 --- /dev/null +++ b/test/units/mock/__init__.py @@ -0,0 +1,20 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type diff --git a/test/units/mock/loader.py b/test/units/mock/loader.py new file mode 100644 index 0000000000..cf9d7ea72d --- /dev/null +++ b/test/units/mock/loader.py @@ -0,0 +1,83 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os + +from ansible.parsing import DataLoader + +class DictDataLoader(DataLoader): + + def __init__(self, file_mapping=dict()): + assert type(file_mapping) == dict + + self._file_mapping = file_mapping + self._build_known_directories() + + super(DictDataLoader, self).__init__() + + def load_from_file(self, path): + if path in self._file_mapping: + return self.load(self._file_mapping[path], path) + return None + + def path_exists(self, path): + return path in self._file_mapping or path in self._known_directories + + def is_file(self, path): + return path in self._file_mapping + + def is_directory(self, path): + return path in self._known_directories + + def list_directory(self, path): + return [x for x in self._known_directories] + + def _add_known_directory(self, directory): + if directory not in self._known_directories: + self._known_directories.append(directory) + + def _build_known_directories(self): + self._known_directories = [] + for path in self._file_mapping: + dirname = os.path.dirname(path) + while dirname not in ('/', ''): + self._add_known_directory(dirname) + dirname = os.path.dirname(dirname) + + def push(self, path, content): + rebuild_dirs = False + if path not in self._file_mapping: + rebuild_dirs = True + + self._file_mapping[path] = content + + if rebuild_dirs: + self._build_known_directories() + + def pop(self, path): + if path in self._file_mapping: + del self._file_mapping[path] + self._build_known_directories() + + def clear(self): + self._file_mapping = dict() + self._known_directories = [] + diff --git a/test/units/module_tests/TestApt.py b/test/units/module_tests/TestApt.py deleted file mode 100644 index e7f2dafc95..0000000000 --- a/test/units/module_tests/TestApt.py +++ /dev/null @@ -1,42 +0,0 @@ -import collections -import mock -import os -import unittest - -from ansible.modules.core.packaging.os.apt import ( - expand_pkgspec_from_fnmatches, -) - - -class AptExpandPkgspecTestCase(unittest.TestCase): - - def setUp(self): - FakePackage = collections.namedtuple("Package", ("name",)) - self.fake_cache = [ FakePackage("apt"), - FakePackage("apt-utils"), - FakePackage("not-selected"), - ] - - def test_trivial(self): - foo = ["apt"] - self.assertEqual( - expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) - - def test_version_wildcard(self): - foo = ["apt=1.0*"] - self.assertEqual( - expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) - - def test_pkgname_wildcard_version_wildcard(self): - foo = ["apt*=1.0*"] - m_mock = mock.Mock() - self.assertEqual( - expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), - ['apt', 'apt-utils']) - - def test_pkgname_expands(self): - foo = ["apt*"] - m_mock = mock.Mock() - self.assertEqual( - expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), - ["apt", "apt-utils"]) diff --git a/test/units/module_tests/TestDocker.py b/test/units/module_tests/TestDocker.py deleted file mode 100644 index b8c8cf1e23..0000000000 --- a/test/units/module_tests/TestDocker.py +++ /dev/null @@ -1,19 +0,0 @@ -import collections -import os -import unittest - -from ansible.modules.core.cloud.docker.docker import get_split_image_tag - -class DockerSplitImageTagTestCase(unittest.TestCase): - - def test_trivial(self): - self.assertEqual(get_split_image_tag('test'), ('test', 'latest')) - - def test_with_org_name(self): - self.assertEqual(get_split_image_tag('ansible/centos7-ansible'), ('ansible/centos7-ansible', 'latest')) - - def test_with_tag(self): - self.assertEqual(get_split_image_tag('test:devel'), ('test', 'devel')) - - def test_with_tag_and_org_name(self): - self.assertEqual(get_split_image_tag('ansible/centos7-ansible:devel'), ('ansible/centos7-ansible', 'devel')) diff --git a/test/units/parsing/__init__.py b/test/units/parsing/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/parsing/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/parsing/test_data_loader.py b/test/units/parsing/test_data_loader.py new file mode 100644 index 0000000000..b9c37cdd0c --- /dev/null +++ b/test/units/parsing/test_data_loader.py @@ -0,0 +1,90 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from six import PY2 +from yaml.scanner import ScannerError + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, mock_open +from ansible.errors import AnsibleParserError + +from ansible.parsing import DataLoader +from ansible.parsing.yaml.objects import AnsibleMapping + +class TestDataLoader(unittest.TestCase): + + def setUp(self): + # FIXME: need to add tests that utilize vault_password + self._loader = DataLoader() + + def tearDown(self): + pass + + @patch.object(DataLoader, '_get_file_contents') + def test_parse_json_from_file(self, mock_def): + mock_def.return_value = ("""{"a": 1, "b": 2, "c": 3}""", True) + output = self._loader.load_from_file('dummy_json.txt') + self.assertEqual(output, dict(a=1,b=2,c=3)) + + @patch.object(DataLoader, '_get_file_contents') + def test_parse_yaml_from_file(self, mock_def): + mock_def.return_value = (""" + a: 1 + b: 2 + c: 3 + """, True) + output = self._loader.load_from_file('dummy_yaml.txt') + self.assertEqual(output, dict(a=1,b=2,c=3)) + + @patch.object(DataLoader, '_get_file_contents') + def test_parse_fail_from_file(self, mock_def): + mock_def.return_value = (""" + TEXT: + *** + NOT VALID + """, True) + self.assertRaises(AnsibleParserError, self._loader.load_from_file, 'dummy_yaml_bad.txt') + +class TestDataLoaderWithVault(unittest.TestCase): + + def setUp(self): + self._loader = DataLoader(vault_password='ansible') + + def tearDown(self): + pass + + @patch.multiple(DataLoader, path_exists=lambda s, x: True, is_file=lambda s, x: True) + def test_parse_from_vault_1_1_file(self): + vaulted_data = """$ANSIBLE_VAULT;1.1;AES256 +33343734386261666161626433386662623039356366656637303939306563376130623138626165 +6436333766346533353463636566313332623130383662340a393835656134633665333861393331 +37666233346464636263636530626332623035633135363732623332313534306438393366323966 +3135306561356164310a343937653834643433343734653137383339323330626437313562306630 +3035 +""" + if PY2: + builtins_name = '__builtin__' + else: + builtins_name = 'builtins' + + with patch(builtins_name + '.open', mock_open(read_data=vaulted_data)): + output = self._loader.load_from_file('dummy_vault.txt') + self.assertEqual(output, dict(foo='bar')) diff --git a/test/units/parsing/test_mod_args.py b/test/units/parsing/test_mod_args.py new file mode 100644 index 0000000000..187edfa03c --- /dev/null +++ b/test/units/parsing/test_mod_args.py @@ -0,0 +1,130 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.parsing.mod_args import ModuleArgsParser +from ansible.errors import AnsibleParserError + +from ansible.compat.tests import unittest + +class TestModArgsDwim(unittest.TestCase): + + # TODO: add tests that construct ModuleArgsParser with a task reference + # TODO: verify the AnsibleError raised on failure knows the task + # and the task knows the line numbers + + def setUp(self): + pass + + def _debug(self, mod, args, to): + print("RETURNED module = {0}".format(mod)) + print(" args = {0}".format(args)) + print(" to = {0}".format(to)) + + def tearDown(self): + pass + + def test_basic_shell(self): + m = ModuleArgsParser(dict(shell='echo hi')) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'command') + self.assertEqual(args, dict( + _raw_params = 'echo hi', + _uses_shell = True, + )) + self.assertIsNone(to) + + def test_basic_command(self): + m = ModuleArgsParser(dict(command='echo hi')) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'command') + self.assertEqual(args, dict( + _raw_params = 'echo hi', + )) + self.assertIsNone(to) + + def test_shell_with_modifiers(self): + m = ModuleArgsParser(dict(shell='/bin/foo creates=/tmp/baz removes=/tmp/bleep')) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'command') + self.assertEqual(args, dict( + creates = '/tmp/baz', + removes = '/tmp/bleep', + _raw_params = '/bin/foo', + _uses_shell = True, + )) + self.assertIsNone(to) + + def test_normal_usage(self): + m = ModuleArgsParser(dict(copy='src=a dest=b')) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'copy') + self.assertEqual(args, dict(src='a', dest='b')) + self.assertIsNone(to) + + def test_complex_args(self): + m = ModuleArgsParser(dict(copy=dict(src='a', dest='b'))) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'copy') + self.assertEqual(args, dict(src='a', dest='b')) + self.assertIsNone(to) + + def test_action_with_complex(self): + m = ModuleArgsParser(dict(action=dict(module='copy', src='a', dest='b'))) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'copy') + self.assertEqual(args, dict(src='a', dest='b')) + self.assertIsNone(to) + + def test_action_with_complex_and_complex_args(self): + m = ModuleArgsParser(dict(action=dict(module='copy', args=dict(src='a', dest='b')))) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'copy') + self.assertEqual(args, dict(src='a', dest='b')) + self.assertIsNone(to) + + def test_local_action_string(self): + m = ModuleArgsParser(dict(local_action='copy src=a dest=b')) + mod, args, to = m.parse() + self._debug(mod, args, to) + self.assertEqual(mod, 'copy') + self.assertEqual(args, dict(src='a', dest='b')) + self.assertIs(to, 'localhost') + + def test_multiple_actions(self): + m = ModuleArgsParser(dict(action='shell echo hi', local_action='shell echo hi')) + self.assertRaises(AnsibleParserError, m.parse) + + m = ModuleArgsParser(dict(action='shell echo hi', shell='echo hi')) + self.assertRaises(AnsibleParserError, m.parse) + + m = ModuleArgsParser(dict(local_action='shell echo hi', shell='echo hi')) + self.assertRaises(AnsibleParserError, m.parse) + + m = ModuleArgsParser(dict(ping='data=hi', shell='echo hi')) + self.assertRaises(AnsibleParserError, m.parse) + diff --git a/test/units/parsing/test_splitter.py b/test/units/parsing/test_splitter.py new file mode 100644 index 0000000000..1f648c8f6a --- /dev/null +++ b/test/units/parsing/test_splitter.py @@ -0,0 +1,112 @@ +# coding: utf-8 +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from nose import tools +from ansible.compat.tests import unittest + +from ansible.parsing.splitter import split_args, parse_kv + + +# Tests using nose's test generators cannot use unittest base class. +# http://nose.readthedocs.org/en/latest/writing_tests.html#test-generators +class TestSplitter_Gen: + SPLIT_DATA = ( + (u'a', + [u'a'], + {u'_raw_params': u'a'}), + (u'a=b', + [u'a=b'], + {u'a': u'b'}), + (u'a="foo bar"', + [u'a="foo bar"'], + {u'a': u'foo bar'}), + (u'"foo bar baz"', + [u'"foo bar baz"'], + {u'_raw_params': '"foo bar baz"'}), + (u'foo bar baz', + [u'foo', u'bar', u'baz'], + {u'_raw_params': u'foo bar baz'}), + (u'a=b c="foo bar"', + [u'a=b', u'c="foo bar"'], + {u'a': u'b', u'c': u'foo bar'}), + (u'a="echo \\"hello world\\"" b=bar', + [u'a="echo \\"hello world\\""', u'b=bar'], + {u'a': u'echo "hello world"', u'b': u'bar'}), + (u'a="multi\nline"', + [u'a="multi\nline"'], + {u'a': u'multi\nline'}), + (u'a="blank\n\nline"', + [u'a="blank\n\nline"'], + {u'a': u'blank\n\nline'}), + (u'a="blank\n\n\nlines"', + [u'a="blank\n\n\nlines"'], + {u'a': u'blank\n\n\nlines'}), + (u'a="a long\nmessage\\\nabout a thing\n"', + [u'a="a long\nmessage\\\nabout a thing\n"'], + {u'a': u'a long\nmessage\\\nabout a thing\n'}), + (u'a="multiline\nmessage1\\\n" b="multiline\nmessage2\\\n"', + [u'a="multiline\nmessage1\\\n"', u'b="multiline\nmessage2\\\n"'], + {u'a': 'multiline\nmessage1\\\n', u'b': u'multiline\nmessage2\\\n'}), + (u'a={{jinja}}', + [u'a={{jinja}}'], + {u'a': u'{{jinja}}'}), + (u'a={{ jinja }}', + [u'a={{ jinja }}'], + {u'a': u'{{ jinja }}'}), + (u'a="{{jinja}}"', + [u'a="{{jinja}}"'], + {u'a': u'{{jinja}}'}), + (u'a={{ jinja }}{{jinja2}}', + [u'a={{ jinja }}{{jinja2}}'], + {u'a': u'{{ jinja }}{{jinja2}}'}), + (u'a="{{ jinja }}{{jinja2}}"', + [u'a="{{ jinja }}{{jinja2}}"'], + {u'a': u'{{ jinja }}{{jinja2}}'}), + (u'a={{jinja}} b={{jinja2}}', + [u'a={{jinja}}', u'b={{jinja2}}'], + {u'a': u'{{jinja}}', u'b': u'{{jinja2}}'}), + (u'a="{{jinja}}\n" b="{{jinja2}}\n"', + [u'a="{{jinja}}\n"', u'b="{{jinja2}}\n"'], + {u'a': u'{{jinja}}\n', u'b': u'{{jinja2}}\n'}), + (u'a="café eñyei"', + [u'a="café eñyei"'], + {u'a': u'café eñyei'}), + (u'a=café b=eñyei', + [u'a=café', u'b=eñyei'], + {u'a': u'café', u'b': u'eñyei'}), + ) + + def check_split_args(self, args, expected): + tools.eq_(split_args(args), expected) + + def test_split_args(self): + for datapoint in self.SPLIT_DATA: + yield self.check_split_args, datapoint[0], datapoint[1] + + def check_parse_kv(self, args, expected): + tools.eq_(parse_kv(args), expected) + + def test_parse_kv(self): + for datapoint in self.SPLIT_DATA: + try: + yield self.check_parse_kv, datapoint[0], datapoint[2] + except: pass diff --git a/test/units/parsing/vault/__init__.py b/test/units/parsing/vault/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/parsing/vault/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/TestVault.py b/test/units/parsing/vault/test_vault.py index b720d72e84..2aaac27fc7 100644 --- a/test/units/TestVault.py +++ b/test/units/parsing/vault/test_vault.py @@ -1,17 +1,40 @@ -#!/usr/bin/env python +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type -from unittest import TestCase import getpass import os import shutil import time import tempfile +import six + from binascii import unhexlify from binascii import hexlify from nose.plugins.skip import SkipTest +from ansible.compat.tests import unittest +from ansible.utils.unicode import to_bytes, to_unicode + from ansible import errors -from ansible.utils.vault import VaultLib +from ansible.parsing.vault import VaultLib # Counter import fails for 2.0.1, requires >= 2.6.1 from pip try: @@ -34,16 +57,7 @@ try: except ImportError: HAS_AES = False -class TestVaultLib(TestCase): - - def _is_fips(self): - try: - data = open('/proc/sys/crypto/fips_enabled').read().strip() - except: - return False - if data != '1': - return False - return True +class TestVaultLib(unittest.TestCase): def test_methods_exist(self): v = VaultLib('ansible') @@ -52,13 +66,13 @@ class TestVaultLib(TestCase): 'decrypt', '_add_header', '_split_header',] - for slot in slots: + for slot in slots: assert hasattr(v, slot), "VaultLib is missing the %s method" % slot def test_is_encrypted(self): v = VaultLib(None) - assert not v.is_encrypted("foobar"), "encryption check on plaintext failed" - data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify("ansible") + assert not v.is_encrypted(u"foobar"), "encryption check on plaintext failed" + data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible") assert v.is_encrypted(data), "encryption check on headered text failed" def test_add_header(self): @@ -66,32 +80,30 @@ class TestVaultLib(TestCase): v.cipher_name = "TEST" sensitive_data = "ansible" data = v._add_header(sensitive_data) - lines = data.split('\n') + lines = data.split(b'\n') assert len(lines) > 1, "failed to properly add header" - header = lines[0] + header = to_unicode(lines[0]) assert header.endswith(';TEST'), "header does end with cipher name" header_parts = header.split(';') - assert len(header_parts) == 3, "header has the wrong number of parts" + assert len(header_parts) == 3, "header has the wrong number of parts" assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT" assert header_parts[1] == v.version, "header version is incorrect" assert header_parts[2] == 'TEST', "header does end with cipher name" def test_split_header(self): v = VaultLib('ansible') - data = "$ANSIBLE_VAULT;9.9;TEST\nansible" - rdata = v._split_header(data) - lines = rdata.split('\n') - assert lines[0] == "ansible" + data = b"$ANSIBLE_VAULT;9.9;TEST\nansible" + rdata = v._split_header(data) + lines = rdata.split(b'\n') + assert lines[0] == b"ansible" assert v.cipher_name == 'TEST', "cipher name was not set" assert v.version == "9.9" def test_encrypt_decrypt_aes(self): - if self._is_fips(): - raise SkipTest('MD5 not available on FIPS enabled systems') if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: raise SkipTest v = VaultLib('ansible') - v.cipher_name = 'AES' + v.cipher_name = u'AES' enc_data = v.encrypt("foobar") dec_data = v.decrypt(enc_data) assert enc_data != "foobar", "encryption failed" @@ -105,20 +117,20 @@ class TestVaultLib(TestCase): enc_data = v.encrypt("foobar") dec_data = v.decrypt(enc_data) assert enc_data != "foobar", "encryption failed" - assert dec_data == "foobar", "decryption failed" + assert dec_data == "foobar", "decryption failed" def test_encrypt_encrypted(self): if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: raise SkipTest v = VaultLib('ansible') v.cipher_name = 'AES' - data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify("ansible") + data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(six.b("ansible")) error_hit = False try: enc_data = v.encrypt(data) - except errors.AnsibleError, e: + except errors.AnsibleError as e: error_hit = True - assert error_hit, "No error was thrown when trying to encrypt data with a header" + assert error_hit, "No error was thrown when trying to encrypt data with a header" def test_decrypt_decrypted(self): if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: @@ -128,9 +140,9 @@ class TestVaultLib(TestCase): error_hit = False try: dec_data = v.decrypt(data) - except errors.AnsibleError, e: + except errors.AnsibleError as e: error_hit = True - assert error_hit, "No error was thrown when trying to decrypt data without a header" + assert error_hit, "No error was thrown when trying to decrypt data without a header" def test_cipher_not_set(self): # not setting the cipher should default to AES256 @@ -141,7 +153,7 @@ class TestVaultLib(TestCase): error_hit = False try: enc_data = v.encrypt(data) - except errors.AnsibleError, e: + except errors.AnsibleError as e: error_hit = True - assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set" - assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name + assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set" + assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py new file mode 100644 index 0000000000..2ddf3de27a --- /dev/null +++ b/test/units/parsing/vault/test_vault_editor.py @@ -0,0 +1,214 @@ +# (c) 2014, James Tanner <tanner.jc@gmail.com> +# (c) 2014, James Cammarata, <jcammarata@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type +#!/usr/bin/env python + +import sys +import getpass +import os +import shutil +import time +import tempfile +from binascii import unhexlify +from binascii import hexlify +from nose.plugins.skip import SkipTest + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch +from ansible.utils.unicode import to_bytes, to_unicode + +from ansible import errors +from ansible.parsing.vault import VaultLib +from ansible.parsing.vault import VaultEditor + +# Counter import fails for 2.0.1, requires >= 2.6.1 from pip +try: + from Crypto.Util import Counter + HAS_COUNTER = True +except ImportError: + HAS_COUNTER = False + +# KDF import fails for 2.0.1, requires >= 2.6.1 from pip +try: + from Crypto.Protocol.KDF import PBKDF2 + HAS_PBKDF2 = True +except ImportError: + HAS_PBKDF2 = False + +# AES IMPORTS +try: + from Crypto.Cipher import AES as AES + HAS_AES = True +except ImportError: + HAS_AES = False + +v10_data = """$ANSIBLE_VAULT;1.0;AES +53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9 +9ad98d59f61d06a4b66718d855f16fb7bdfe54d1ec8aeaa4d06c2dc1fa630ae1846a029877f0eeb1 +83c62ffb04c2512995e815de4b4d29ed""" + +v11_data = """$ANSIBLE_VAULT;1.1;AES256 +62303130653266653331306264616235333735323636616539316433666463323964623162386137 +3961616263373033353631316333623566303532663065310a393036623466376263393961326530 +64336561613965383835646464623865663966323464653236343638373165343863623638316664 +3631633031323837340a396530313963373030343933616133393566366137363761373930663833 +3739""" + +class TestVaultEditor(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_methods_exist(self): + v = VaultEditor(None, None, None) + slots = ['create_file', + 'decrypt_file', + 'edit_file', + 'encrypt_file', + 'rekey_file', + 'read_data', + 'write_data', + 'shuffle_files'] + for slot in slots: + assert hasattr(v, slot), "VaultLib is missing the %s method" % slot + + @patch.object(VaultEditor, '_editor_shell_command') + def test_create_file(self, mock_editor_shell_command): + + def sc_side_effect(filename): + return ['touch', filename] + mock_editor_shell_command.side_effect = sc_side_effect + + tmp_file = tempfile.NamedTemporaryFile() + os.unlink(tmp_file.name) + + ve = VaultEditor(None, "ansible", tmp_file.name) + ve.create_file() + + self.assertTrue(os.path.exists(tmp_file.name)) + + def test_decrypt_1_0(self): + """ + Skip testing decrypting 1.0 files if we don't have access to AES, KDF or + Counter, or we are running on python3 since VaultAES hasn't been backported. + """ + if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys.version > '3': + raise SkipTest + + v10_file = tempfile.NamedTemporaryFile(delete=False) + with v10_file as f: + f.write(to_bytes(v10_data)) + + ve = VaultEditor(None, "ansible", v10_file.name) + + # make sure the password functions for the cipher + error_hit = False + try: + ve.decrypt_file() + except errors.AnsibleError as e: + error_hit = True + + # verify decrypted content + f = open(v10_file.name, "rb") + fdata = to_unicode(f.read()) + f.close() + + os.unlink(v10_file.name) + + assert error_hit == False, "error decrypting 1.0 file" + assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() + + + def test_decrypt_1_1(self): + if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2: + raise SkipTest + + v11_file = tempfile.NamedTemporaryFile(delete=False) + with v11_file as f: + f.write(to_bytes(v11_data)) + + ve = VaultEditor(None, "ansible", v11_file.name) + + # make sure the password functions for the cipher + error_hit = False + try: + ve.decrypt_file() + except errors.AnsibleError as e: + error_hit = True + + # verify decrypted content + f = open(v11_file.name, "rb") + fdata = to_unicode(f.read()) + f.close() + + os.unlink(v11_file.name) + + assert error_hit == False, "error decrypting 1.0 file" + assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() + + + def test_rekey_migration(self): + """ + Skip testing rekeying files if we don't have access to AES, KDF or + Counter, or we are running on python3 since VaultAES hasn't been backported. + """ + if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys.version > '3': + raise SkipTest + + v10_file = tempfile.NamedTemporaryFile(delete=False) + with v10_file as f: + f.write(to_bytes(v10_data)) + + ve = VaultEditor(None, "ansible", v10_file.name) + + # make sure the password functions for the cipher + error_hit = False + try: + ve.rekey_file('ansible2') + except errors.AnsibleError as e: + error_hit = True + + # verify decrypted content + f = open(v10_file.name, "rb") + fdata = f.read() + f.close() + + assert error_hit == False, "error rekeying 1.0 file to 1.1" + + # ensure filedata can be decrypted, is 1.1 and is AES256 + vl = VaultLib("ansible2") + dec_data = None + error_hit = False + try: + dec_data = vl.decrypt(fdata) + except errors.AnsibleError as e: + error_hit = True + + os.unlink(v10_file.name) + + assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name + assert error_hit == False, "error decrypting migrated 1.0 file" + assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data + + diff --git a/test/units/parsing/yaml/__init__.py b/test/units/parsing/yaml/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/test/units/parsing/yaml/__init__.py diff --git a/test/units/parsing/yaml/test_loader.py b/test/units/parsing/yaml/test_loader.py new file mode 100644 index 0000000000..37eeabff83 --- /dev/null +++ b/test/units/parsing/yaml/test_loader.py @@ -0,0 +1,283 @@ +# coding: utf-8 +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from six import text_type, binary_type +from six.moves import StringIO +from collections import Sequence, Set, Mapping + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch + +from ansible.parsing.yaml.loader import AnsibleLoader + + +class TestAnsibleLoaderBasic(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_parse_number(self): + stream = StringIO(""" + 1 + """) + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, 1) + # No line/column info saved yet + + def test_parse_string(self): + stream = StringIO(""" + Ansible + """) + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, u'Ansible') + self.assertIsInstance(data, text_type) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17)) + + def test_parse_utf8_string(self): + stream = StringIO(""" + Cafè Eñyei + """) + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, u'Cafè Eñyei') + self.assertIsInstance(data, text_type) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17)) + + def test_parse_dict(self): + stream = StringIO(""" + webster: daniel + oed: oxford + """) + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, {'webster': 'daniel', 'oed': 'oxford'}) + self.assertEqual(len(data), 2) + self.assertIsInstance(list(data.keys())[0], text_type) + self.assertIsInstance(list(data.values())[0], text_type) + + # Beginning of the first key + self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17)) + + self.assertEqual(data[u'webster'].ansible_pos, ('myfile.yml', 2, 26)) + self.assertEqual(data[u'oed'].ansible_pos, ('myfile.yml', 3, 22)) + + def test_parse_list(self): + stream = StringIO(""" + - a + - b + """) + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, [u'a', u'b']) + self.assertEqual(len(data), 2) + self.assertIsInstance(data[0], text_type) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17)) + + self.assertEqual(data[0].ansible_pos, ('myfile.yml', 2, 19)) + self.assertEqual(data[1].ansible_pos, ('myfile.yml', 3, 19)) + + def test_parse_short_dict(self): + stream = StringIO("""{"foo": "bar"}""") + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, dict(foo=u'bar')) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 1)) + self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 1, 9)) + + stream = StringIO("""foo: bar""") + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, dict(foo=u'bar')) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 1)) + self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 1, 6)) + + def test_error_conditions(self): + stream = StringIO("""{""") + loader = AnsibleLoader(stream, 'myfile.yml') + self.assertRaises(loader.get_single_data) + + def test_front_matter(self): + stream = StringIO("""---\nfoo: bar""") + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, dict(foo=u'bar')) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 1)) + self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 2, 6)) + + # Initial indent (See: #6348) + stream = StringIO(""" - foo: bar\n baz: qux""") + loader = AnsibleLoader(stream, 'myfile.yml') + data = loader.get_single_data() + self.assertEqual(data, [{u'foo': u'bar', u'baz': u'qux'}]) + + self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 2)) + self.assertEqual(data[0].ansible_pos, ('myfile.yml', 1, 4)) + self.assertEqual(data[0][u'foo'].ansible_pos, ('myfile.yml', 1, 9)) + self.assertEqual(data[0][u'baz'].ansible_pos, ('myfile.yml', 2, 9)) + + +class TestAnsibleLoaderPlay(unittest.TestCase): + + def setUp(self): + stream = StringIO(""" + - hosts: localhost + vars: + number: 1 + string: Ansible + utf8_string: Cafè Eñyei + dictionary: + webster: daniel + oed: oxford + list: + - a + - b + - 1 + - 2 + tasks: + - name: Test case + ping: + data: "{{ utf8_string }}" + + - name: Test 2 + ping: + data: "Cafè Eñyei" + + - name: Test 3 + command: "printf 'Cafè Eñyei\\n'" + """) + self.play_filename = '/path/to/myplay.yml' + stream.name = self.play_filename + self.loader = AnsibleLoader(stream) + self.data = self.loader.get_single_data() + + def tearDown(self): + pass + + def test_data_complete(self): + self.assertEqual(len(self.data), 1) + self.assertIsInstance(self.data, list) + self.assertEqual(frozenset(self.data[0].keys()), frozenset((u'hosts', u'vars', u'tasks'))) + + self.assertEqual(self.data[0][u'hosts'], u'localhost') + + self.assertEqual(self.data[0][u'vars'][u'number'], 1) + self.assertEqual(self.data[0][u'vars'][u'string'], u'Ansible') + self.assertEqual(self.data[0][u'vars'][u'utf8_string'], u'Cafè Eñyei') + self.assertEqual(self.data[0][u'vars'][u'dictionary'], + {u'webster': u'daniel', + u'oed': u'oxford'}) + self.assertEqual(self.data[0][u'vars'][u'list'], [u'a', u'b', 1, 2]) + + self.assertEqual(self.data[0][u'tasks'], + [{u'name': u'Test case', u'ping': {u'data': u'{{ utf8_string }}'}}, + {u'name': u'Test 2', u'ping': {u'data': u'Cafè Eñyei'}}, + {u'name': u'Test 3', u'command': u'printf \'Cafè Eñyei\n\''}, + ]) + + def walk(self, data): + # Make sure there's no str in the data + self.assertNotIsInstance(data, binary_type) + + # Descend into various container types + if isinstance(data, text_type): + # strings are a sequence so we have to be explicit here + return + elif isinstance(data, (Sequence, Set)): + for element in data: + self.walk(element) + elif isinstance(data, Mapping): + for k, v in data.items(): + self.walk(k) + self.walk(v) + + # Scalars were all checked so we're good to go + return + + def test_no_str_in_data(self): + # Checks that no strings are str type + self.walk(self.data) + + def check_vars(self): + # Numbers don't have line/col information yet + #self.assertEqual(self.data[0][u'vars'][u'number'].ansible_pos, (self.play_filename, 4, 21)) + + self.assertEqual(self.data[0][u'vars'][u'string'].ansible_pos, (self.play_filename, 5, 29)) + self.assertEqual(self.data[0][u'vars'][u'utf8_string'].ansible_pos, (self.play_filename, 6, 34)) + + self.assertEqual(self.data[0][u'vars'][u'dictionary'].ansible_pos, (self.play_filename, 8, 23)) + self.assertEqual(self.data[0][u'vars'][u'dictionary'][u'webster'].ansible_pos, (self.play_filename, 8, 32)) + self.assertEqual(self.data[0][u'vars'][u'dictionary'][u'oed'].ansible_pos, (self.play_filename, 9, 28)) + + self.assertEqual(self.data[0][u'vars'][u'list'].ansible_pos, (self.play_filename, 11, 23)) + self.assertEqual(self.data[0][u'vars'][u'list'][0].ansible_pos, (self.play_filename, 11, 25)) + self.assertEqual(self.data[0][u'vars'][u'list'][1].ansible_pos, (self.play_filename, 12, 25)) + # Numbers don't have line/col info yet + #self.assertEqual(self.data[0][u'vars'][u'list'][2].ansible_pos, (self.play_filename, 13, 25)) + #self.assertEqual(self.data[0][u'vars'][u'list'][3].ansible_pos, (self.play_filename, 14, 25)) + + def check_tasks(self): + # + # First Task + # + self.assertEqual(self.data[0][u'tasks'][0].ansible_pos, (self.play_filename, 16, 23)) + self.assertEqual(self.data[0][u'tasks'][0][u'name'].ansible_pos, (self.play_filename, 16, 29)) + self.assertEqual(self.data[0][u'tasks'][0][u'ping'].ansible_pos, (self.play_filename, 18, 25)) + self.assertEqual(self.data[0][u'tasks'][0][u'ping'][u'data'].ansible_pos, (self.play_filename, 18, 31)) + + # + # Second Task + # + self.assertEqual(self.data[0][u'tasks'][1].ansible_pos, (self.play_filename, 20, 23)) + self.assertEqual(self.data[0][u'tasks'][1][u'name'].ansible_pos, (self.play_filename, 20, 29)) + self.assertEqual(self.data[0][u'tasks'][1][u'ping'].ansible_pos, (self.play_filename, 22, 25)) + self.assertEqual(self.data[0][u'tasks'][1][u'ping'][u'data'].ansible_pos, (self.play_filename, 22, 31)) + + # + # Third Task + # + self.assertEqual(self.data[0][u'tasks'][2].ansible_pos, (self.play_filename, 24, 23)) + self.assertEqual(self.data[0][u'tasks'][2][u'name'].ansible_pos, (self.play_filename, 24, 29)) + self.assertEqual(self.data[0][u'tasks'][2][u'command'].ansible_pos, (self.play_filename, 25, 32)) + + def test_line_numbers(self): + # Check the line/column numbers are correct + # Note: Remember, currently dicts begin at the start of their first entry + self.assertEqual(self.data[0].ansible_pos, (self.play_filename, 2, 19)) + self.assertEqual(self.data[0][u'hosts'].ansible_pos, (self.play_filename, 2, 26)) + self.assertEqual(self.data[0][u'vars'].ansible_pos, (self.play_filename, 4, 21)) + + self.check_vars() + + self.assertEqual(self.data[0][u'tasks'].ansible_pos, (self.play_filename, 16, 21)) + + self.check_tasks() diff --git a/test/units/playbook/__init__.py b/test/units/playbook/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/playbook/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/playbook/test_block.py b/test/units/playbook/test_block.py new file mode 100644 index 0000000000..348681527b --- /dev/null +++ b/test/units/playbook/test_block.py @@ -0,0 +1,77 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.playbook.block import Block +from ansible.playbook.task import Task +from ansible.compat.tests import unittest + +class TestBlock(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_construct_empty_block(self): + b = Block() + + def test_construct_block_with_role(self): + pass + + def test_load_block_simple(self): + ds = dict( + block = [], + rescue = [], + always = [], + #otherwise = [], + ) + b = Block.load(ds) + self.assertEqual(b.block, []) + self.assertEqual(b.rescue, []) + self.assertEqual(b.always, []) + # not currently used + #self.assertEqual(b.otherwise, []) + + def test_load_block_with_tasks(self): + ds = dict( + block = [dict(action='block')], + rescue = [dict(action='rescue')], + always = [dict(action='always')], + #otherwise = [dict(action='otherwise')], + ) + b = Block.load(ds) + self.assertEqual(len(b.block), 1) + assert isinstance(b.block[0], Task) + self.assertEqual(len(b.rescue), 1) + assert isinstance(b.rescue[0], Task) + self.assertEqual(len(b.always), 1) + assert isinstance(b.always[0], Task) + # not currently used + #self.assertEqual(len(b.otherwise), 1) + #assert isinstance(b.otherwise[0], Task) + + def test_load_implicit_block(self): + ds = [dict(action='foo')] + b = Block.load(ds) + self.assertEqual(len(b.block), 1) + assert isinstance(b.block[0], Task) + diff --git a/test/units/playbook/test_play.py b/test/units/playbook/test_play.py new file mode 100644 index 0000000000..22486f4129 --- /dev/null +++ b/test/units/playbook/test_play.py @@ -0,0 +1,132 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.playbook.play import Play +from ansible.playbook.role import Role +from ansible.playbook.task import Task + +from test.mock.loader import DictDataLoader + +class TestPlay(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_empty_play(self): + p = Play.load(dict()) + self.assertEqual(str(p), "PLAY: <no name specified>") + + def test_basic_play(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + connection='local', + remote_user="root", + sudo=True, + sudo_user="testing", + )) + + def test_play_with_user_conflict(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + user="testing", + gather_facts=False, + )) + self.assertEqual(p.remote_user, "testing") + + def test_play_with_user_conflict(self): + play_data = dict( + name="test play", + hosts=['foo'], + user="testing", + remote_user="testing", + ) + self.assertRaises(AnsibleParserError, Play.load, play_data) + + def test_play_with_tasks(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[dict(action='shell echo "hello world"')], + )) + + def test_play_with_handlers(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + handlers=[dict(action='shell echo "hello world"')], + )) + + def test_play_with_pre_tasks(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + pre_tasks=[dict(action='shell echo "hello world"')], + )) + + def test_play_with_post_tasks(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + post_tasks=[dict(action='shell echo "hello world"')], + )) + + def test_play_with_roles(self): + fake_loader = DictDataLoader({ + '/etc/ansible/roles/foo/tasks.yml': """ + - name: role task + shell: echo "hello world" + """, + }) + + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + roles=['foo'], + ), loader=fake_loader) + + tasks = p.compile() + + def test_play_compile(self): + p = Play.load(dict( + name="test play", + hosts=['foo'], + gather_facts=False, + tasks=[dict(action='shell echo "hello world"')], + )) + + tasks = p.compile() + self.assertEqual(len(tasks), 1) + self.assertIsInstance(tasks[0], Task) diff --git a/test/units/playbook/test_playbook.py b/test/units/playbook/test_playbook.py new file mode 100644 index 0000000000..dfb52dc7b1 --- /dev/null +++ b/test/units/playbook/test_playbook.py @@ -0,0 +1,69 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.playbook import Playbook +from ansible.vars import VariableManager + +from test.mock.loader import DictDataLoader + +class TestPlaybook(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_empty_playbook(self): + fake_loader = DictDataLoader({}) + p = Playbook(loader=fake_loader) + + def test_basic_playbook(self): + fake_loader = DictDataLoader({ + "test_file.yml":""" + - hosts: all + """, + }) + p = Playbook.load("test_file.yml", loader=fake_loader) + plays = p.get_plays() + + def test_bad_playbook_files(self): + fake_loader = DictDataLoader({ + # represents a playbook which is not a list of plays + "bad_list.yml": """ + foo: bar + + """, + # represents a playbook where a play entry is mis-formatted + "bad_entry.yml": """ + - + - "This should be a mapping..." + + """, + }) + vm = VariableManager() + self.assertRaises(AnsibleParserError, Playbook.load, "bad_list.yml", vm, fake_loader) + self.assertRaises(AnsibleParserError, Playbook.load, "bad_entry.yml", vm, fake_loader) + diff --git a/test/units/playbook/test_role.py b/test/units/playbook/test_role.py new file mode 100644 index 0000000000..d0f3708898 --- /dev/null +++ b/test/units/playbook/test_role.py @@ -0,0 +1,167 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.playbook.block import Block +from ansible.playbook.role import Role +from ansible.playbook.role.include import RoleInclude +from ansible.playbook.task import Task + +from test.mock.loader import DictDataLoader + +class TestRole(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_load_role_with_tasks(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo/tasks/main.yml": """ + - shell: echo 'hello world' + """, + }) + + i = RoleInclude.load('foo', loader=fake_loader) + r = Role.load(i) + + self.assertEqual(str(r), 'foo') + self.assertEqual(len(r._task_blocks), 1) + assert isinstance(r._task_blocks[0], Block) + + def test_load_role_with_handlers(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo/handlers/main.yml": """ + - name: test handler + shell: echo 'hello world' + """, + }) + + i = RoleInclude.load('foo', loader=fake_loader) + r = Role.load(i) + + self.assertEqual(len(r._handler_blocks), 1) + assert isinstance(r._handler_blocks[0], Block) + + def test_load_role_with_vars(self): + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo/defaults/main.yml": """ + foo: bar + """, + "/etc/ansible/roles/foo/vars/main.yml": """ + foo: bam + """, + }) + + i = RoleInclude.load('foo', loader=fake_loader) + r = Role.load(i) + + self.assertEqual(r._default_vars, dict(foo='bar')) + self.assertEqual(r._role_vars, dict(foo='bam')) + + def test_load_role_with_metadata(self): + + fake_loader = DictDataLoader({ + '/etc/ansible/roles/foo/meta/main.yml': """ + allow_duplicates: true + dependencies: + - bar + galaxy_info: + a: 1 + b: 2 + c: 3 + """, + '/etc/ansible/roles/bar/meta/main.yml': """ + dependencies: + - baz + """, + '/etc/ansible/roles/baz/meta/main.yml': """ + dependencies: + - bam + """, + '/etc/ansible/roles/bam/meta/main.yml': """ + dependencies: [] + """, + '/etc/ansible/roles/bad1/meta/main.yml': """ + 1 + """, + '/etc/ansible/roles/bad2/meta/main.yml': """ + foo: bar + """, + '/etc/ansible/roles/recursive1/meta/main.yml': """ + dependencies: ['recursive2'] + """, + '/etc/ansible/roles/recursive2/meta/main.yml': """ + dependencies: ['recursive1'] + """, + }) + + i = RoleInclude.load('foo', loader=fake_loader) + r = Role.load(i) + + role_deps = r.get_direct_dependencies() + + self.assertEqual(len(role_deps), 1) + self.assertEqual(type(role_deps[0]), Role) + self.assertEqual(len(role_deps[0].get_parents()), 1) + self.assertEqual(role_deps[0].get_parents()[0], r) + self.assertEqual(r._metadata.allow_duplicates, True) + self.assertEqual(r._metadata.galaxy_info, dict(a=1, b=2, c=3)) + + all_deps = r.get_all_dependencies() + self.assertEqual(len(all_deps), 3) + self.assertEqual(all_deps[0].get_name(), 'bar') + self.assertEqual(all_deps[1].get_name(), 'baz') + self.assertEqual(all_deps[2].get_name(), 'bam') + + i = RoleInclude.load('bad1', loader=fake_loader) + self.assertRaises(AnsibleParserError, Role.load, i) + + i = RoleInclude.load('bad2', loader=fake_loader) + self.assertRaises(AnsibleParserError, Role.load, i) + + i = RoleInclude.load('recursive1', loader=fake_loader) + self.assertRaises(AnsibleError, Role.load, i) + + def test_load_role_complex(self): + + # FIXME: add tests for the more complex uses of + # params and tags/when statements + + fake_loader = DictDataLoader({ + "/etc/ansible/roles/foo/tasks/main.yml": """ + - shell: echo 'hello world' + """, + }) + + i = RoleInclude.load(dict(role='foo'), loader=fake_loader) + r = Role.load(i) + + self.assertEqual(r.get_name(), "foo") + diff --git a/test/units/playbook/test_task.py b/test/units/playbook/test_task.py new file mode 100644 index 0000000000..b2160e0dd2 --- /dev/null +++ b/test/units/playbook/test_task.py @@ -0,0 +1,87 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.playbook.task import Task +from ansible.compat.tests import unittest + +basic_shell_task = dict( + name = 'Test Task', + shell = 'echo hi' +) + +kv_shell_task = dict( + action = 'shell echo hi' +) + +class TestTask(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_construct_empty_task(self): + t = Task() + + def test_construct_task_with_role(self): + pass + + def test_construct_task_with_block(self): + pass + + def test_construct_task_with_role_and_block(self): + pass + + def test_load_task_simple(self): + t = Task.load(basic_shell_task) + assert t is not None + self.assertEqual(t.name, basic_shell_task['name']) + self.assertEqual(t.action, 'command') + self.assertEqual(t.args, dict(_raw_params='echo hi', _uses_shell=True)) + + def test_load_task_kv_form(self): + t = Task.load(kv_shell_task) + self.assertEqual(t.action, 'command') + self.assertEqual(t.args, dict(_raw_params='echo hi', _uses_shell=True)) + + def test_task_auto_name(self): + assert 'name' not in kv_shell_task + t = Task.load(kv_shell_task) + #self.assertEqual(t.name, 'shell echo hi') + + def test_task_auto_name_with_role(self): + pass + + def test_load_task_complex_form(self): + pass + + def test_can_load_module_complex_form(self): + pass + + def test_local_action_implies_delegate(self): + pass + + def test_local_action_conflicts_with_delegate(self): + pass + + def test_delegate_to_parses(self): + pass diff --git a/test/units/plugins/__init__.py b/test/units/plugins/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/plugins/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/plugins/test_cache.py b/test/units/plugins/test_cache.py new file mode 100644 index 0000000000..f3cfe6a38c --- /dev/null +++ b/test/units/plugins/test_cache.py @@ -0,0 +1,100 @@ +# (c) 2012-2015, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.plugins.cache.base import BaseCacheModule +from ansible.plugins.cache.memory import CacheModule as MemoryCache + +HAVE_MEMCACHED = True +try: + import memcached +except ImportError: + HAVE_MEMCACHED = False +else: + # Use an else so that the only reason we skip this is for lack of + # memcached, not errors importing the plugin + from ansible.plugins.cache.memcached import CacheModule as MemcachedCache + +HAVE_REDIS = True +try: + import redis +except ImportError: + HAVE_REDIS = False +else: + from ansible.plugins.cache.redis import CacheModule as RedisCache + + +class TestAbstractClass(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_subclass_error(self): + class CacheModule1(BaseCacheModule): + pass + with self.assertRaises(TypeError): + CacheModule1() + + class CacheModule2(BaseCacheModule): + def get(self, key): + super(CacheModule2, self).get(key) + + with self.assertRaises(TypeError): + CacheModule2() + + def test_subclass_success(self): + class CacheModule3(BaseCacheModule): + def get(self, key): + super(CacheModule3, self).get(key) + + def set(self, key, value): + super(CacheModule3, self).set(key, value) + + def keys(self): + super(CacheModule3, self).keys() + + def contains(self, key): + super(CacheModule3, self).contains(key) + + def delete(self, key): + super(CacheModule3, self).delete(key) + + def flush(self): + super(CacheModule3, self).flush() + + def copy(self): + super(CacheModule3, self).copy() + + self.assertIsInstance(CacheModule3(), CacheModule3) + + @unittest.skipUnless(HAVE_MEMCACHED, 'python-memcached module not installed') + def test_memcached_cachemodule(self): + self.assertIsInstance(MemcachedCache(), MemcachedCache) + + def test_memory_cachemodule(self): + self.assertIsInstance(MemoryCache(), MemoryCache) + + @unittest.skipUnless(HAVE_REDIS, 'Redis pyhton module not installed') + def test_redis_cachemodule(self): + self.assertIsInstance(RedisCache(), RedisCache) diff --git a/test/units/plugins/test_connection.py b/test/units/plugins/test_connection.py new file mode 100644 index 0000000000..0ed888ac95 --- /dev/null +++ b/test/units/plugins/test_connection.py @@ -0,0 +1,102 @@ +# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from six import StringIO + +from ansible.compat.tests import unittest +from ansible.executor.connection_info import ConnectionInformation + +from ansible.plugins.connections import ConnectionBase +#from ansible.plugins.connections.accelerate import Connection as AccelerateConnection +#from ansible.plugins.connections.chroot import Connection as ChrootConnection +#from ansible.plugins.connections.funcd import Connection as FuncdConnection +#from ansible.plugins.connections.jail import Connection as JailConnection +#from ansible.plugins.connections.libvirt_lxc import Connection as LibvirtLXCConnection +from ansible.plugins.connections.local import Connection as LocalConnection +from ansible.plugins.connections.paramiko_ssh import Connection as ParamikoConnection +from ansible.plugins.connections.ssh import Connection as SSHConnection +#from ansible.plugins.connections.winrm import Connection as WinRmConnection + +class TestConnectionBaseClass(unittest.TestCase): + + def setUp(self): + self.conn_info = ConnectionInformation() + self.in_stream = StringIO() + + def tearDown(self): + pass + + def test_subclass_error(self): + class ConnectionModule1(ConnectionBase): + pass + with self.assertRaises(TypeError): + ConnectionModule1() + + class ConnectionModule2(ConnectionBase): + def get(self, key): + super(ConnectionModule2, self).get(key) + + with self.assertRaises(TypeError): + ConnectionModule2() + + def test_subclass_success(self): + class ConnectionModule3(ConnectionBase): + @property + def transport(self): + pass + def _connect(self): + pass + def exec_command(self): + pass + def put_file(self): + pass + def fetch_file(self): + pass + def close(self): + pass + self.assertIsInstance(ConnectionModule3(self.conn_info, self.in_stream), ConnectionModule3) + +# def test_accelerate_connection_module(self): +# self.assertIsInstance(AccelerateConnection(), AccelerateConnection) +# +# def test_chroot_connection_module(self): +# self.assertIsInstance(ChrootConnection(), ChrootConnection) +# +# def test_funcd_connection_module(self): +# self.assertIsInstance(FuncdConnection(), FuncdConnection) +# +# def test_jail_connection_module(self): +# self.assertIsInstance(JailConnection(), JailConnection) +# +# def test_libvirt_lxc_connection_module(self): +# self.assertIsInstance(LibvirtLXCConnection(), LibvirtLXCConnection) + + def test_local_connection_module(self): + self.assertIsInstance(LocalConnection(self.conn_info, self.in_stream), LocalConnection) + + def test_paramiko_connection_module(self): + self.assertIsInstance(ParamikoConnection(self.conn_info, self.in_stream), ParamikoConnection) + + def test_ssh_connection_module(self): + self.assertIsInstance(SSHConnection(self.conn_info, self.in_stream), SSHConnection) + +# def test_winrm_connection_module(self): +# self.assertIsInstance(WinRmConnection(), WinRmConnection) diff --git a/test/units/plugins/test_plugins.py b/test/units/plugins/test_plugins.py new file mode 100644 index 0000000000..0d0fe400d0 --- /dev/null +++ b/test/units/plugins/test_plugins.py @@ -0,0 +1,77 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import os +from ansible.compat.tests import unittest +from ansible.compat.tests import BUILTINS + +from ansible.compat.tests.mock import mock_open, patch, MagicMock + +from ansible.plugins import MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE, _basedirs, push_basedir, PluginLoader + +class TestErrors(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + @patch.object(PluginLoader, '_get_paths') + def test_print_paths(self, mock_method): + mock_method.return_value = ['/path/one', '/path/two', '/path/three'] + pl = PluginLoader('foo', 'foo', '', 'test_plugins') + paths = pl.print_paths() + expected_paths = os.pathsep.join(['/path/one', '/path/two', '/path/three']) + self.assertEqual(paths, expected_paths) + + def test_plugins__get_package_paths_no_package(self): + pl = PluginLoader('test', '', 'test', 'test_plugin') + self.assertEqual(pl._get_package_paths(), []) + + def test_plugins__get_package_paths_with_package(self): + # the _get_package_paths() call uses __import__ to load a + # python library, and then uses the __file__ attribute of + # the result for that to get the library path, so we mock + # that here and patch the builtin to use our mocked result + m = MagicMock() + m.return_value.__file__ = '/path/to/my/test.py' + pl = PluginLoader('test', 'foo.bar.bam', 'test', 'test_plugin') + with patch('{0}.__import__'.format(BUILTINS), m): + self.assertEqual(pl._get_package_paths(), ['/path/to/my/bar/bam']) + + def test_plugins__get_paths(self): + pl = PluginLoader('test', '', 'test', 'test_plugin') + pl._paths = ['/path/one', '/path/two'] + self.assertEqual(pl._get_paths(), ['/path/one', '/path/two']) + + # NOT YET WORKING + #def fake_glob(path): + # if path == 'test/*': + # return ['test/foo', 'test/bar', 'test/bam'] + # elif path == 'test/*/*' + #m._paths = None + #mock_glob = MagicMock() + #mock_glob.return_value = [] + #with patch('glob.glob', mock_glob): + # pass + diff --git a/test/units/vars/__init__.py b/test/units/vars/__init__.py new file mode 100644 index 0000000000..785fc45992 --- /dev/null +++ b/test/units/vars/__init__.py @@ -0,0 +1,21 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + diff --git a/test/units/vars/test_variable_manager.py b/test/units/vars/test_variable_manager.py new file mode 100644 index 0000000000..f8d815eb6f --- /dev/null +++ b/test/units/vars/test_variable_manager.py @@ -0,0 +1,144 @@ +# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from ansible.vars import VariableManager + +from test.mock.loader import DictDataLoader + +class TestVariableManager(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_basic_manager(self): + fake_loader = DictDataLoader({}) + + v = VariableManager() + self.assertEqual(v.get_vars(loader=fake_loader), dict()) + + self.assertEqual( + v._merge_dicts( + dict(a=1), + dict(b=2) + ), dict(a=1, b=2) + ) + self.assertEqual( + v._merge_dicts( + dict(a=1, c=dict(foo='bar')), + dict(b=2, c=dict(baz='bam')) + ), dict(a=1, b=2, c=dict(foo='bar', baz='bam')) + ) + + + def test_variable_manager_extra_vars(self): + fake_loader = DictDataLoader({}) + + extra_vars = dict(a=1, b=2, c=3) + v = VariableManager() + v.set_extra_vars(extra_vars) + + for (key, val) in extra_vars.iteritems(): + self.assertEqual(v.get_vars(loader=fake_loader).get(key), val) + self.assertIsNot(v.extra_vars.get(key), val) + + def test_variable_manager_host_vars_file(self): + fake_loader = DictDataLoader({ + "host_vars/hostname1.yml": """ + foo: bar + """ + }) + + v = VariableManager() + v.add_host_vars_file("host_vars/hostname1.yml", loader=fake_loader) + self.assertIn("hostname1", v._host_vars_files) + self.assertEqual(v._host_vars_files["hostname1"], dict(foo="bar")) + + mock_host = MagicMock() + mock_host.get_name.return_value = "hostname1" + mock_host.get_vars.return_value = dict() + mock_host.get_groups.return_value = () + + self.assertEqual(v.get_vars(loader=fake_loader, host=mock_host).get("foo"), "bar") + + def test_variable_manager_group_vars_file(self): + fake_loader = DictDataLoader({ + "group_vars/somegroup.yml": """ + foo: bar + """ + }) + + v = VariableManager() + v.add_group_vars_file("group_vars/somegroup.yml", loader=fake_loader) + self.assertIn("somegroup", v._group_vars_files) + self.assertEqual(v._group_vars_files["somegroup"], dict(foo="bar")) + + mock_group = MagicMock() + mock_group.name.return_value = "somegroup" + mock_group.get_ancestors.return_value = () + + mock_host = MagicMock() + mock_host.get_name.return_value = "hostname1" + mock_host.get_vars.return_value = dict() + mock_host.get_groups.return_value = (mock_group) + + self.assertEqual(v.get_vars(loader=fake_loader, host=mock_host).get("foo"), "bar") + + def test_variable_manager_play_vars(self): + fake_loader = DictDataLoader({}) + + mock_play = MagicMock() + mock_play.get_vars.return_value = dict(foo="bar") + mock_play.get_roles.return_value = [] + mock_play.get_vars_files.return_value = [] + + v = VariableManager() + self.assertEqual(v.get_vars(loader=fake_loader, play=mock_play).get("foo"), "bar") + + def test_variable_manager_play_vars_files(self): + fake_loader = DictDataLoader({ + "/path/to/somefile.yml": """ + foo: bar + """ + }) + + mock_play = MagicMock() + mock_play.get_vars.return_value = dict() + mock_play.get_roles.return_value = [] + mock_play.get_vars_files.return_value = ['/path/to/somefile.yml'] + + v = VariableManager() + self.assertEqual(v.get_vars(loader=fake_loader, play=mock_play).get("foo"), "bar") + + def test_variable_manager_task_vars(self): + fake_loader = DictDataLoader({}) + + mock_task = MagicMock() + mock_task.get_vars.return_value = dict(foo="bar") + + v = VariableManager() + self.assertEqual(v.get_vars(loader=fake_loader, task=mock_task).get("foo"), "bar") + diff --git a/test/units/vault_test_data/foo-ansible-1.0.yml b/test/units/vault_test_data/foo-ansible-1.0.yml deleted file mode 100644 index f71ddf10ce..0000000000 --- a/test/units/vault_test_data/foo-ansible-1.0.yml +++ /dev/null @@ -1,4 +0,0 @@ -$ANSIBLE_VAULT;1.0;AES -53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9 -9ad98d59f61d06a4b66718d855f16fb7bdfe54d1ec8aeaa4d06c2dc1fa630ae1846a029877f0eeb1 -83c62ffb04c2512995e815de4b4d29ed diff --git a/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml b/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml deleted file mode 100644 index 6e025a1c40..0000000000 --- a/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml +++ /dev/null @@ -1,6 +0,0 @@ -$ANSIBLE_VAULT;1.1;AES256 -61333063333663376535373431643063613232393438623732643966613962363563383132363631 -3235363730623635323039623439343561313566313361630a313632643338613636303637623765 -64356531643630303636323064336439393335313836366235336464633635376339663830333232 -6338353337663139320a646632386131646431656165656338633535386535623236393265373634 -37656134633661333935346434363237613435323865356234323264663838643931 diff --git a/test/units/vault_test_data/foo-ansible-1.1.yml b/test/units/vault_test_data/foo-ansible-1.1.yml deleted file mode 100644 index d9a4a448a6..0000000000 --- a/test/units/vault_test_data/foo-ansible-1.1.yml +++ /dev/null @@ -1,6 +0,0 @@ -$ANSIBLE_VAULT;1.1;AES256 -62303130653266653331306264616235333735323636616539316433666463323964623162386137 -3961616263373033353631316333623566303532663065310a393036623466376263393961326530 -64336561613965383835646464623865663966323464653236343638373165343863623638316664 -3631633031323837340a396530313963373030343933616133393566366137363761373930663833 -3739 |