summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/units/README.md5
-rw-r--r--test/units/TestConstants.py64
-rw-r--r--test/units/TestFilters.py191
-rw-r--r--test/units/TestInventory.py505
-rw-r--r--test/units/TestModuleUtilsBasic.py334
-rw-r--r--test/units/TestModuleUtilsDatabase.py118
-rw-r--r--test/units/TestModules.py32
-rw-r--r--test/units/TestPlayVarsFiles.py390
-rw-r--r--test/units/TestSynchronize.py176
-rw-r--r--test/units/TestUtils.py945
-rw-r--r--test/units/TestUtilsStringFunctions.py33
-rw-r--r--test/units/TestVaultEditor.py180
-rw-r--r--test/units/__init__.py5
-rw-r--r--test/units/ansible.cfg3
-rw-r--r--test/units/errors/__init__.py22
-rw-r--r--test/units/errors/test_errors.py68
-rw-r--r--test/units/executor/__init__.py21
-rw-r--r--test/units/executor/test_play_iterator.py85
-rw-r--r--test/units/inventory_test_data/ansible_hosts2
-rw-r--r--test/units/inventory_test_data/broken.yml2
-rw-r--r--test/units/inventory_test_data/common_vars.yml4
-rw-r--r--test/units/inventory_test_data/complex_hosts96
-rw-r--r--test/units/inventory_test_data/encrypted.yml6
-rw-r--r--test/units/inventory_test_data/hosts_list.yml6
-rw-r--r--test/units/inventory_test_data/inventory/test_alpha_end_before_beg2
-rw-r--r--test/units/inventory_test_data/inventory/test_combined_range2
-rw-r--r--test/units/inventory_test_data/inventory/test_incorrect_format2
-rw-r--r--test/units/inventory_test_data/inventory/test_incorrect_range2
-rw-r--r--test/units/inventory_test_data/inventory/test_leading_range6
-rw-r--r--test/units/inventory_test_data/inventory/test_missing_end2
-rw-r--r--test/units/inventory_test_data/inventory_api.py44
-rw-r--r--test/units/inventory_test_data/inventory_dir/0hosts3
-rw-r--r--test/units/inventory_test_data/inventory_dir/1mythology6
-rw-r--r--test/units/inventory_test_data/inventory_dir/2levels6
-rw-r--r--test/units/inventory_test_data/inventory_dir/3comments8
-rw-r--r--test/units/inventory_test_data/inventory_dir/4skip_extensions.ini2
-rw-r--r--test/units/inventory_test_data/large_range1
-rw-r--r--test/units/inventory_test_data/restrict_pattern2
-rw-r--r--test/units/inventory_test_data/simple_hosts22
-rw-r--r--test/units/mock/__init__.py20
-rw-r--r--test/units/mock/loader.py83
-rw-r--r--test/units/module_tests/TestApt.py42
-rw-r--r--test/units/module_tests/TestDocker.py19
-rw-r--r--test/units/parsing/__init__.py21
-rw-r--r--test/units/parsing/test_data_loader.py90
-rw-r--r--test/units/parsing/test_mod_args.py130
-rw-r--r--test/units/parsing/test_splitter.py112
-rw-r--r--test/units/parsing/vault/__init__.py21
-rw-r--r--test/units/parsing/vault/test_vault.py (renamed from test/units/TestVault.py)82
-rw-r--r--test/units/parsing/vault/test_vault_editor.py214
-rw-r--r--test/units/parsing/yaml/__init__.py0
-rw-r--r--test/units/parsing/yaml/test_loader.py283
-rw-r--r--test/units/playbook/__init__.py21
-rw-r--r--test/units/playbook/test_block.py77
-rw-r--r--test/units/playbook/test_play.py132
-rw-r--r--test/units/playbook/test_playbook.py69
-rw-r--r--test/units/playbook/test_role.py167
-rw-r--r--test/units/playbook/test_task.py87
-rw-r--r--test/units/plugins/__init__.py21
-rw-r--r--test/units/plugins/test_cache.py100
-rw-r--r--test/units/plugins/test_connection.py102
-rw-r--r--test/units/plugins/test_plugins.py77
-rw-r--r--test/units/vars/__init__.py21
-rw-r--r--test/units/vars/test_variable_manager.py144
-rw-r--r--test/units/vault_test_data/foo-ansible-1.0.yml4
-rw-r--r--test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml6
-rw-r--r--test/units/vault_test_data/foo-ansible-1.1.yml6
67 files changed, 2240 insertions, 3314 deletions
diff --git a/test/units/README.md b/test/units/README.md
deleted file mode 100644
index d0b3dd5abd..0000000000
--- a/test/units/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-Unit tests
-==========
-
-Tests at code level. Should be concise and to the point, and organized by subject.
-
diff --git a/test/units/TestConstants.py b/test/units/TestConstants.py
deleted file mode 100644
index f3b96e8abc..0000000000
--- a/test/units/TestConstants.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-from ansible.constants import get_config
-import ConfigParser
-import random
-import string
-import os
-
-
-def random_string(length):
- return ''.join(random.choice(string.ascii_uppercase) for x in range(6))
-
-p = ConfigParser.ConfigParser()
-p.read(os.path.join(os.path.dirname(__file__), 'ansible.cfg'))
-
-class TestConstants(unittest.TestCase):
-
- #####################################
- ### get_config unit tests
-
-
- def test_configfile_and_env_both_set(self):
- r = random_string(6)
- env_var = 'ANSIBLE_TEST_%s' % r
- os.environ[env_var] = r
-
- res = get_config(p, 'defaults', 'test_key', env_var, 'default')
- del os.environ[env_var]
-
- assert res == r
-
-
- def test_configfile_set_env_not_set(self):
- r = random_string(6)
- env_var = 'ANSIBLE_TEST_%s' % r
- assert env_var not in os.environ
-
- res = get_config(p, 'defaults', 'test_key', env_var, 'default')
-
- print res
- assert res == 'test_value'
-
-
- def test_configfile_not_set_env_set(self):
- r = random_string(6)
- env_var = 'ANSIBLE_TEST_%s' % r
- os.environ[env_var] = r
-
- res = get_config(p, 'defaults', 'doesnt_exist', env_var, 'default')
- del os.environ[env_var]
-
- assert res == r
-
-
- def test_configfile_not_set_env_not_set(self):
- r = random_string(6)
- env_var = 'ANSIBLE_TEST_%s' % r
- assert env_var not in os.environ
-
- res = get_config(p, 'defaults', 'doesnt_exist', env_var, 'default')
-
- assert res == 'default'
diff --git a/test/units/TestFilters.py b/test/units/TestFilters.py
deleted file mode 100644
index 3c7eb4506e..0000000000
--- a/test/units/TestFilters.py
+++ /dev/null
@@ -1,191 +0,0 @@
-'''
-Test bundled filters
-'''
-
-import os.path
-import unittest, tempfile, shutil
-from ansible import playbook, inventory, callbacks
-import ansible.runner.filter_plugins.core
-import ansible.runner.filter_plugins.mathstuff
-
-INVENTORY = inventory.Inventory(['localhost'])
-
-BOOK = '''
-- hosts: localhost
- vars:
- var: { a: [1,2,3] }
- tasks:
- - template: src=%s dest=%s
-'''
-
-SRC = '''
--
-{{ var|to_json }}
--
-{{ var|to_nice_json }}
--
-{{ var|to_yaml }}
--
-{{ var|to_nice_yaml }}
-'''
-
-DEST = '''
--
-{"a": [1, 2, 3]}
--
-{
- "a": [
- 1,
- 2,
- 3
- ]
-}
--
-a: [1, 2, 3]
-
--
-a:
-- 1
-- 2
-- 3
-'''
-
-class TestFilters(unittest.TestCase):
-
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp(dir='/tmp')
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
- def temp(self, name, data=''):
- '''write a temporary file and return the name'''
- name = self.tmpdir + '/' + name
- with open(name, 'w') as f:
- f.write(data)
- return name
-
- def test_bool_none(self):
- a = ansible.runner.filter_plugins.core.bool(None)
- assert a == None
-
- def test_bool_true(self):
- a = ansible.runner.filter_plugins.core.bool(True)
- assert a == True
-
- def test_bool_yes(self):
- a = ansible.runner.filter_plugins.core.bool('Yes')
- assert a == True
-
- def test_bool_no(self):
- a = ansible.runner.filter_plugins.core.bool('Foo')
- assert a == False
-
- def test_quotes(self):
- a = ansible.runner.filter_plugins.core.quote('ls | wc -l')
- assert a == "'ls | wc -l'"
-
- def test_fileglob(self):
- pathname = os.path.join(os.path.dirname(__file__), '*')
- a = ansible.runner.filter_plugins.core.fileglob(pathname)
- assert __file__ in a
-
- def test_regex(self):
- a = ansible.runner.filter_plugins.core.regex('ansible', 'ansible',
- match_type='findall')
- assert a == True
-
- def test_match_case_sensitive(self):
- a = ansible.runner.filter_plugins.core.match('ansible', 'ansible')
- assert a == True
-
- def test_match_case_insensitive(self):
- a = ansible.runner.filter_plugins.core.match('ANSIBLE', 'ansible',
- True)
- assert a == True
-
- def test_match_no_match(self):
- a = ansible.runner.filter_plugins.core.match(' ansible', 'ansible')
- assert a == False
-
- def test_search_case_sensitive(self):
- a = ansible.runner.filter_plugins.core.search(' ansible ', 'ansible')
- assert a == True
-
- def test_search_case_insensitive(self):
- a = ansible.runner.filter_plugins.core.search(' ANSIBLE ', 'ansible',
- True)
- assert a == True
-
- def test_regex_replace_case_sensitive(self):
- a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^a.*i(.*)$',
- 'a\\1')
- assert a == 'able'
-
- def test_regex_replace_case_insensitive(self):
- a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^A.*I(.*)$',
- 'a\\1', True)
- assert a == 'able'
-
- def test_regex_replace_no_match(self):
- a = ansible.runner.filter_plugins.core.regex_replace('ansible', '^b.*i(.*)$',
- 'a\\1')
- assert a == 'ansible'
-
- def test_to_uuid(self):
- a = ansible.runner.filter_plugins.core.to_uuid('example.com')
-
- assert a == 'ae780c3a-a3ab-53c2-bfb4-098da300b3fe'
-
- #def test_filters(self):
-
- # this test is pretty low level using a playbook, hence I am disabling it for now -- MPD.
- #return
-
- #src = self.temp('src.j2', SRC)
- #dest = self.temp('dest.txt')
- #book = self.temp('book', BOOK % (src, dest))
-
- #playbook.PlayBook(
- # playbook = book,
- # inventory = INVENTORY,
- # transport = 'local',
- # callbacks = callbacks.PlaybookCallbacks(),
- # runner_callbacks = callbacks.DefaultRunnerCallbacks(),
- # stats = callbacks.AggregateStats(),
- #).run()
-
- #out = open(dest).read()
- #self.assertEqual(DEST, out)
-
- def test_version_compare(self):
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(0, 1.1, 'lt', False))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.2, '<'))
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, '=='))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, '='))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.2, 'eq'))
-
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, 'gt'))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '>'))
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, 'ne'))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '!='))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.3, 1.2, '<>'))
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.1, 'ge'))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.2, 1.1, '>='))
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.1, 1.1, 'le'))
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare(1.0, 1.1, '<='))
-
- self.assertTrue(ansible.runner.filter_plugins.core.version_compare('12.04', 12, 'ge'))
-
- def test_min(self):
- a = ansible.runner.filter_plugins.mathstuff.min([3, 2, 5, 4])
- assert a == 2
-
- def test_max(self):
- a = ansible.runner.filter_plugins.mathstuff.max([3, 2, 5, 4])
- assert a == 5
diff --git a/test/units/TestInventory.py b/test/units/TestInventory.py
deleted file mode 100644
index dc3a0ce6d6..0000000000
--- a/test/units/TestInventory.py
+++ /dev/null
@@ -1,505 +0,0 @@
-import os
-import unittest
-from nose.tools import raises
-
-from ansible import errors
-from ansible.inventory import Inventory
-
-class TestInventory(unittest.TestCase):
-
- def setUp(self):
-
- self.cwd = os.getcwd()
- self.test_dir = os.path.join(self.cwd, 'inventory_test_data')
-
- self.inventory_file = os.path.join(self.test_dir, 'simple_hosts')
- self.large_range_inventory_file = os.path.join(self.test_dir, 'large_range')
- self.complex_inventory_file = os.path.join(self.test_dir, 'complex_hosts')
- self.inventory_script = os.path.join(self.test_dir, 'inventory_api.py')
- self.inventory_dir = os.path.join(self.test_dir, 'inventory_dir')
-
- os.chmod(self.inventory_script, 0755)
-
- def tearDown(self):
- os.chmod(self.inventory_script, 0644)
-
- def compare(self, left, right, sort=True):
- if sort:
- left = sorted(left)
- right = sorted(right)
- print left
- print right
- assert left == right
-
- def empty_inventory(self):
- return Inventory(None)
-
- def simple_inventory(self):
- return Inventory(self.inventory_file)
-
- def large_range_inventory(self):
- return Inventory(self.large_range_inventory_file)
-
- def script_inventory(self):
- return Inventory(self.inventory_script)
-
- def complex_inventory(self):
- return Inventory(self.complex_inventory_file)
-
- def dir_inventory(self):
- return Inventory(self.inventory_dir)
-
- all_simple_hosts=['jupiter', 'saturn', 'zeus', 'hera',
- 'cerberus001','cerberus002','cerberus003',
- 'cottus99', 'cottus100',
- 'poseidon', 'thor', 'odin', 'loki',
- 'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
- 'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5',
- 'Hotep-a', 'Hotep-b', 'Hotep-c',
- 'BastC', 'BastD', 'neptun', ]
-
- #####################################
- ### Empty inventory format tests
-
- def test_empty(self):
- inventory = self.empty_inventory()
- hosts = inventory.list_hosts()
- self.assertEqual(hosts, [])
-
- #####################################
- ### Simple inventory format tests
-
- def test_simple(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts))
-
- def test_simple_all(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts('all')
- self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts))
-
- def test_get_hosts(self):
- inventory = Inventory('127.0.0.1,192.168.1.1')
- hosts = inventory.get_hosts('!10.0.0.1')
- hosts_all = inventory.get_hosts('all')
- self.assertEqual(sorted(hosts), sorted(hosts_all))
-
- def test_no_src(self):
- inventory = Inventory('127.0.0.1,')
- self.assertEqual(inventory.src(), None)
-
- def test_simple_norse(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts("norse")
-
- expected_hosts=['thor', 'odin', 'loki']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_simple_ungrouped(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts("ungrouped")
-
- expected_hosts=['jupiter', 'saturn',
- 'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',
- 'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_simple_combined(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts("norse:greek")
-
- expected_hosts=['zeus', 'hera', 'poseidon',
- 'cerberus001','cerberus002','cerberus003',
- 'cottus99','cottus100',
- 'thor', 'odin', 'loki']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_simple_restrict(self):
- inventory = self.simple_inventory()
-
- restricted_hosts = ['hera', 'poseidon', 'thor']
- expected_hosts=['zeus', 'hera', 'poseidon',
- 'cerberus001','cerberus002','cerberus003',
- 'cottus99', 'cottus100',
- 'thor', 'odin', 'loki']
-
- inventory.restrict_to(restricted_hosts)
- hosts = inventory.list_hosts("norse:greek")
-
- assert sorted(hosts) == sorted(restricted_hosts)
-
- inventory.lift_restriction()
- hosts = inventory.list_hosts("norse:greek")
-
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_simple_string_ipv4(self):
- inventory = Inventory('127.0.0.1,192.168.1.1')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))
-
- def test_simple_string_ipv4_port(self):
- inventory = Inventory('127.0.0.1:2222,192.168.1.1')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))
-
- def test_simple_string_ipv4_vars(self):
- inventory = Inventory('127.0.0.1:2222,192.168.1.1')
- var = inventory.get_variables('127.0.0.1')
- self.assertEqual(var['ansible_ssh_port'], 2222)
-
- def test_simple_string_ipv6(self):
- inventory = Inventory('FE80:EF45::12:1,192.168.1.1')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))
-
- def test_simple_string_ipv6_port(self):
- inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))
-
- def test_simple_string_ipv6_vars(self):
- inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')
- var = inventory.get_variables('FE80:EF45::12:1')
- self.assertEqual(var['ansible_ssh_port'], 2222)
-
- def test_simple_string_fqdn(self):
- inventory = Inventory('foo.example.com,bar.example.com')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com']))
-
- def test_simple_string_fqdn_port(self):
- inventory = Inventory('foo.example.com:2222,bar.example.com')
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted(['foo.example.com','bar.example.com']))
-
- def test_simple_string_fqdn_vars(self):
- inventory = Inventory('foo.example.com:2222,bar.example.com')
- var = inventory.get_variables('foo.example.com')
- self.assertEqual(var['ansible_ssh_port'], 2222)
-
- def test_simple_vars(self):
- inventory = self.simple_inventory()
- vars = inventory.get_variables('thor')
-
- assert vars == {'group_names': ['norse'],
- 'inventory_hostname': 'thor',
- 'inventory_hostname_short': 'thor'}
-
- def test_simple_port(self):
- inventory = self.simple_inventory()
- vars = inventory.get_variables('hera')
-
- expected = { 'ansible_ssh_port': 3000,
- 'group_names': ['greek'],
- 'inventory_hostname': 'hera',
- 'inventory_hostname_short': 'hera' }
- assert vars == expected
-
- def test_large_range(self):
- inventory = self.large_range_inventory()
- hosts = inventory.list_hosts()
- self.assertEqual(sorted(hosts), sorted('bob%03i' %i for i in range(0, 143)))
-
- def test_subset(self):
- inventory = self.simple_inventory()
- inventory.subset('odin;thor,loki')
- self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin','loki']))
-
- def test_subset_range(self):
- inventory = self.simple_inventory()
- inventory.subset('greek[0-2];norse[0]')
- self.assertEqual(sorted(inventory.list_hosts()), sorted(['zeus','hera','thor']))
-
- def test_subet_range_empty_group(self):
- inventory = self.simple_inventory()
- inventory.subset('missing[0]')
- self.assertEqual(sorted(inventory.list_hosts()), sorted([]))
-
- def test_subset_filename(self):
- inventory = self.simple_inventory()
- inventory.subset('@' + os.path.join(self.test_dir, 'restrict_pattern'))
- self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin']))
-
- @raises(errors.AnsibleError)
- def testinvalid_entry(self):
- Inventory('1234')
-
- ###################################################
- ### INI file advanced tests
-
- def test_complex_vars(self):
- inventory = self.complex_inventory()
-
- vars = inventory.get_variables('rtp_a')
- print vars
-
- expected = dict(
- a=1, b=2, c=3, d=10002, e=10003, f='10004 != 10005',
- g=' g ', h=' h ', i="' i \"", j='" j',
- k=[ 'k1', 'k2' ],
- rga=1, rgb=2, rgc=3,
- inventory_hostname='rtp_a', inventory_hostname_short='rtp_a',
- group_names=[ 'eastcoast', 'nc', 'redundantgroup', 'redundantgroup2', 'redundantgroup3', 'rtp', 'us' ]
- )
- print vars
- print expected
- assert vars == expected
-
- def test_complex_group_names(self):
- inventory = self.complex_inventory()
- tests = {
- 'host1': [ 'role1', 'role3' ],
- 'host2': [ 'role1', 'role2' ],
- 'host3': [ 'role2', 'role3' ]
- }
- for host, roles in tests.iteritems():
- group_names = inventory.get_variables(host)['group_names']
- assert sorted(group_names) == sorted(roles)
-
- def test_complex_exclude(self):
- inventory = self.complex_inventory()
- hosts = inventory.list_hosts("nc:florida:!triangle:!orlando")
- expected_hosts = ['miami', 'rtp_a', 'rtp_b', 'rtp_c']
- print "HOSTS=%s" % sorted(hosts)
- print "EXPECTED=%s" % sorted(expected_hosts)
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_regex_exclude(self):
- inventory = self.complex_inventory()
- hosts = inventory.list_hosts("~rtp_[ac]")
- expected_hosts = ['rtp_a', 'rtp_c']
- print "HOSTS=%s" % sorted(hosts)
- print "EXPECTED=%s" % sorted(expected_hosts)
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_regex_grouping(self):
- inventory = self.simple_inventory()
- hosts = inventory.list_hosts("~(cer[a-z]|berc)(erus00[13])")
- expected_hosts = ['cerberus001', 'cerberus003']
- print "HOSTS=%s" % sorted(hosts)
- print "EXPECTED=%s" % sorted(expected_hosts)
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_complex_enumeration(self):
-
-
- expected1 = ['rtp_b']
- expected2 = ['rtp_a', 'rtp_b']
- expected3 = ['rtp_a', 'rtp_b', 'rtp_c', 'tri_a', 'tri_b', 'tri_c']
- expected4 = ['rtp_b', 'orlando' ]
- expected5 = ['blade-a-1']
-
- inventory = self.complex_inventory()
- hosts = inventory.list_hosts("nc[1]")
- self.compare(hosts, expected1, sort=False)
- hosts = inventory.list_hosts("nc[0-2]")
- self.compare(hosts, expected2, sort=False)
- hosts = inventory.list_hosts("nc[0-99999]")
- self.compare(hosts, expected3, sort=False)
- hosts = inventory.list_hosts("nc[1-2]:florida[0-1]")
- self.compare(hosts, expected4, sort=False)
- hosts = inventory.list_hosts("blade-a-1")
- self.compare(hosts, expected5, sort=False)
-
- def test_complex_intersect(self):
- inventory = self.complex_inventory()
- hosts = inventory.list_hosts("nc:&redundantgroup:!rtp_c")
- self.compare(hosts, ['rtp_a'])
- hosts = inventory.list_hosts("nc:&triangle:!tri_c")
- self.compare(hosts, ['tri_a', 'tri_b'])
-
- @raises(errors.AnsibleError)
- def test_invalid_range(self):
- Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_range'))
-
- @raises(errors.AnsibleError)
- def test_missing_end(self):
- Inventory(os.path.join(self.test_dir, 'inventory','test_missing_end'))
-
- @raises(errors.AnsibleError)
- def test_incorrect_format(self):
- Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_format'))
-
- @raises(errors.AnsibleError)
- def test_alpha_end_before_beg(self):
- Inventory(os.path.join(self.test_dir, 'inventory','test_alpha_end_before_beg'))
-
- def test_combined_range(self):
- i = Inventory(os.path.join(self.test_dir, 'inventory','test_combined_range'))
- hosts = i.list_hosts('test')
- expected_hosts=['host1A','host2A','host1B','host2B']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_leading_range(self):
- i = Inventory(os.path.join(self.test_dir, 'inventory','test_leading_range'))
- hosts = i.list_hosts('test')
- expected_hosts=['1.host','2.host','A.host','B.host']
- assert sorted(hosts) == sorted(expected_hosts)
-
- hosts2 = i.list_hosts('test2')
- expected_hosts2=['1.host','2.host','3.host']
- assert sorted(hosts2) == sorted(expected_hosts2)
-
- ###################################################
- ### Inventory API tests
-
- def test_script(self):
- inventory = self.script_inventory()
- hosts = inventory.list_hosts()
-
- expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
-
- print "Expected: %s"%(expected_hosts)
- print "Got : %s"%(hosts)
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_script_all(self):
- inventory = self.script_inventory()
- hosts = inventory.list_hosts('all')
-
- expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_script_norse(self):
- inventory = self.script_inventory()
- hosts = inventory.list_hosts("norse")
-
- expected_hosts=['thor', 'odin', 'loki']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_script_combined(self):
- inventory = self.script_inventory()
- hosts = inventory.list_hosts("norse:greek")
-
- expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_script_restrict(self):
- inventory = self.script_inventory()
-
- restricted_hosts = ['hera', 'poseidon', 'thor']
- expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']
-
- inventory.restrict_to(restricted_hosts)
- hosts = inventory.list_hosts("norse:greek")
-
- assert sorted(hosts) == sorted(restricted_hosts)
-
- inventory.lift_restriction()
- hosts = inventory.list_hosts("norse:greek")
-
- assert sorted(hosts) == sorted(expected_hosts)
-
- def test_script_vars(self):
- inventory = self.script_inventory()
- vars = inventory.get_variables('thor')
-
- print "VARS=%s" % vars
-
- assert vars == {'hammer':True,
- 'group_names': ['norse'],
- 'inventory_hostname': 'thor',
- 'inventory_hostname_short': 'thor'}
-
- def test_hosts_list(self):
- # Test the case when playbook 'hosts' var is a list.
- inventory = self.script_inventory()
- host_names = sorted(['thor', 'loki', 'odin']) # Not sure if sorting is in the contract or not
- actual_hosts = inventory.get_hosts(host_names)
- actual_host_names = [host.name for host in actual_hosts]
- assert host_names == actual_host_names
-
- def test_script_multiple_groups(self):
- inventory = self.script_inventory()
- vars = inventory.get_variables('zeus')
-
- print "VARS=%s" % vars
-
- assert vars == {'inventory_hostname': 'zeus',
- 'inventory_hostname_short': 'zeus',
- 'group_names': ['greek', 'major-god']}
-
- def test_allows_equals_sign_in_var(self):
- inventory = self.simple_inventory()
- auth = inventory.get_variables('neptun')['auth']
- assert auth == 'YWRtaW46YWRtaW4='
-
- def test_dir_inventory(self):
- inventory = self.dir_inventory()
-
- host_vars = inventory.get_variables('zeus')
-
- expected_vars = {'inventory_hostname': 'zeus',
- 'inventory_hostname_short': 'zeus',
- 'group_names': ['greek', 'major-god'],
- 'var_a': '3#4'}
-
- print "HOST VARS=%s" % host_vars
- print "EXPECTED VARS=%s" % expected_vars
-
- assert host_vars == expected_vars
-
- def test_dir_inventory_multiple_groups(self):
- inventory = self.dir_inventory()
- group_greek = inventory.get_hosts('greek')
- actual_host_names = [host.name for host in group_greek]
- print "greek : %s " % actual_host_names
- assert actual_host_names == ['zeus', 'morpheus']
-
- def test_dir_inventory_skip_extension(self):
- inventory = self.dir_inventory()
- assert 'skipme' not in [h.name for h in inventory.get_hosts()]
-
- def test_dir_inventory_group_hosts(self):
- inventory = self.dir_inventory()
- expected_groups = {'all': ['morpheus', 'thor', 'zeus'],
- 'major-god': ['thor', 'zeus'],
- 'minor-god': ['morpheus'],
- 'norse': ['thor'],
- 'greek': ['morpheus', 'zeus'],
- 'ungrouped': []}
-
- actual_groups = {}
- for group in inventory.get_groups():
- actual_groups[group.name] = sorted([h.name for h in group.get_hosts()])
- print "INVENTORY groups[%s].hosts=%s" % (group.name, actual_groups[group.name])
- print "EXPECTED groups[%s].hosts=%s" % (group.name, expected_groups[group.name])
-
- assert actual_groups == expected_groups
-
- def test_dir_inventory_groups_for_host(self):
- inventory = self.dir_inventory()
- expected_groups_for_host = {'morpheus': ['all', 'greek', 'minor-god'],
- 'thor': ['all', 'major-god', 'norse'],
- 'zeus': ['all', 'greek', 'major-god']}
-
- actual_groups_for_host = {}
- for (host, expected) in expected_groups_for_host.iteritems():
- groups = inventory.groups_for_host(host)
- names = sorted([g.name for g in groups])
- actual_groups_for_host[host] = names
- print "INVENTORY groups_for_host(%s)=%s" % (host, names)
- print "EXPECTED groups_for_host(%s)=%s" % (host, expected)
-
- assert actual_groups_for_host == expected_groups_for_host
-
- def test_dir_inventory_groups_list(self):
- inventory = self.dir_inventory()
- inventory_groups = inventory.groups_list()
-
- expected_groups = {'all': ['morpheus', 'thor', 'zeus'],
- 'major-god': ['thor', 'zeus'],
- 'minor-god': ['morpheus'],
- 'norse': ['thor'],
- 'greek': ['morpheus', 'zeus'],
- 'ungrouped': []}
-
- for (name, expected_hosts) in expected_groups.iteritems():
- inventory_groups[name] = sorted(inventory_groups.get(name, []))
- print "INVENTORY groups_list['%s']=%s" % (name, inventory_groups[name])
- print "EXPECTED groups_list['%s']=%s" % (name, expected_hosts)
-
- assert inventory_groups == expected_groups
-
diff --git a/test/units/TestModuleUtilsBasic.py b/test/units/TestModuleUtilsBasic.py
deleted file mode 100644
index 5b8be28307..0000000000
--- a/test/units/TestModuleUtilsBasic.py
+++ /dev/null
@@ -1,334 +0,0 @@
-import os
-import tempfile
-
-import unittest
-from nose.tools import raises
-from nose.tools import timed
-
-from ansible import errors
-from ansible.module_common import ModuleReplacer
-from ansible.module_utils.basic import heuristic_log_sanitize
-from ansible.utils import checksum as utils_checksum
-
-TEST_MODULE_DATA = """
-from ansible.module_utils.basic import *
-
-def get_module():
- return AnsibleModule(
- argument_spec = dict(),
- supports_check_mode = True,
- no_log = True,
- )
-
-get_module()
-
-"""
-
-class TestModuleUtilsBasic(unittest.TestCase):
-
- def cleanup_temp_file(self, fd, path):
- try:
- os.close(fd)
- os.remove(path)
- except:
- pass
-
- def cleanup_temp_dir(self, path):
- try:
- os.rmdir(path)
- except:
- pass
-
- def setUp(self):
- # create a temporary file for the test module
- # we're about to generate
- self.tmp_fd, self.tmp_path = tempfile.mkstemp()
- os.write(self.tmp_fd, TEST_MODULE_DATA)
-
- # template the module code and eval it
- module_data, module_style, shebang = ModuleReplacer().modify_module(self.tmp_path, {}, "", {})
-
- d = {}
- exec(module_data, d, d)
- self.module = d['get_module']()
-
- # module_utils/basic.py screws with CWD, let's save it and reset
- self.cwd = os.getcwd()
-
- def tearDown(self):
- self.cleanup_temp_file(self.tmp_fd, self.tmp_path)
- # Reset CWD back to what it was before basic.py changed it
- os.chdir(self.cwd)
-
- #################################################################################
- # run_command() tests
-
- # test run_command with a string command
- def test_run_command_string(self):
- (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'")
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar')
- (rc, out, err) = self.module.run_command("/bin/echo -n 'foo bar'", use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar')
-
- # test run_command with an array of args (with both use_unsafe_shell=True|False)
- def test_run_command_args(self):
- (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"])
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar')
- (rc, out, err) = self.module.run_command(['/bin/echo', '-n', "foo bar"], use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar')
-
- # test run_command with leading environment variables
- @raises(SystemExit)
- def test_run_command_string_with_env_variables(self):
- self.module.run_command('FOO=bar /bin/echo -n "foo bar"')
-
- @raises(SystemExit)
- def test_run_command_args_with_env_variables(self):
- self.module.run_command(['FOO=bar', '/bin/echo', '-n', 'foo bar'])
-
- def test_run_command_string_unsafe_with_env_variables(self):
- (rc, out, err) = self.module.run_command('FOO=bar /bin/echo -n "foo bar"', use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar')
-
- # test run_command with a command pipe (with both use_unsafe_shell=True|False)
- def test_run_command_string_unsafe_with_pipe(self):
- (rc, out, err) = self.module.run_command('echo "foo bar" | cat', use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar\n')
-
- # test run_command with a shell redirect in (with both use_unsafe_shell=True|False)
- def test_run_command_string_unsafe_with_redirect_in(self):
- (rc, out, err) = self.module.run_command('cat << EOF\nfoo bar\nEOF', use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar\n')
-
- # test run_command with a shell redirect out (with both use_unsafe_shell=True|False)
- def test_run_command_string_unsafe_with_redirect_out(self):
- tmp_fd, tmp_path = tempfile.mkstemp()
- try:
- (rc, out, err) = self.module.run_command('echo "foo bar" > %s' % tmp_path, use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertTrue(os.path.exists(tmp_path))
- checksum = utils_checksum(tmp_path)
- self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec')
- except:
- raise
- finally:
- self.cleanup_temp_file(tmp_fd, tmp_path)
-
- # test run_command with a double shell redirect out (append) (with both use_unsafe_shell=True|False)
- def test_run_command_string_unsafe_with_double_redirect_out(self):
- tmp_fd, tmp_path = tempfile.mkstemp()
- try:
- (rc, out, err) = self.module.run_command('echo "foo bar" >> %s' % tmp_path, use_unsafe_shell=True)
- self.assertEqual(rc, 0)
- self.assertTrue(os.path.exists(tmp_path))
- checksum = utils_checksum(tmp_path)
- self.assertEqual(checksum, 'd53a205a336e07cf9eac45471b3870f9489288ec')
- except:
- raise
- finally:
- self.cleanup_temp_file(tmp_fd, tmp_path)
-
- # test run_command with data
- def test_run_command_string_with_data(self):
- (rc, out, err) = self.module.run_command('cat', data='foo bar')
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo bar\n')
-
- # test run_command with binary data
- def test_run_command_string_with_binary_data(self):
- (rc, out, err) = self.module.run_command('cat', data='\x41\x42\x43\x44', binary_data=True)
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'ABCD')
-
- # test run_command with a cwd set
- def test_run_command_string_with_cwd(self):
- tmp_path = tempfile.mkdtemp()
- try:
- (rc, out, err) = self.module.run_command('pwd', cwd=tmp_path)
- self.assertEqual(rc, 0)
- self.assertTrue(os.path.exists(tmp_path))
- self.assertEqual(out.strip(), os.path.realpath(tmp_path))
- except:
- raise
- finally:
- self.cleanup_temp_dir(tmp_path)
-
-
-class TestModuleUtilsBasicHelpers(unittest.TestCase):
- ''' Test some implementation details of AnsibleModule
-
- Some pieces of AnsibleModule are implementation details but they have
- potential cornercases that we need to check. Go ahead and test at
- this level that the functions are behaving even though their API may
- change and we'd have to rewrite these tests so that we know that we
- need to check for those problems in any rewrite.
-
- In the future we might want to restructure higher level code to be
- friendlier to unittests so that we can test at the level that the public
- is interacting with the APIs.
- '''
-
- MANY_RECORDS = 7000
- URL_SECRET = 'http://username:pas:word@foo.com/data'
- SSH_SECRET = 'username:pas:word@foo.com/data'
-
- def cleanup_temp_file(self, fd, path):
- try:
- os.close(fd)
- os.remove(path)
- except:
- pass
-
- def cleanup_temp_dir(self, path):
- try:
- os.rmdir(path)
- except:
- pass
-
- def _gen_data(self, records, per_rec, top_level, secret_text):
- hostvars = {'hostvars': {}}
- for i in range(1, records, 1):
- host_facts = {'host%s' % i:
- {'pstack':
- {'running': '875.1',
- 'symlinked': '880.0',
- 'tars': [],
- 'versions': ['885.0']},
- }}
-
- if per_rec:
- host_facts['host%s' % i]['secret'] = secret_text
- hostvars['hostvars'].update(host_facts)
- if top_level:
- hostvars['secret'] = secret_text
- return hostvars
-
- def setUp(self):
- self.many_url = repr(self._gen_data(self.MANY_RECORDS, True, True,
- self.URL_SECRET))
- self.many_ssh = repr(self._gen_data(self.MANY_RECORDS, True, True,
- self.SSH_SECRET))
- self.one_url = repr(self._gen_data(self.MANY_RECORDS, False, True,
- self.URL_SECRET))
- self.one_ssh = repr(self._gen_data(self.MANY_RECORDS, False, True,
- self.SSH_SECRET))
- self.zero_secrets = repr(self._gen_data(self.MANY_RECORDS, False,
- False, ''))
- self.few_url = repr(self._gen_data(2, True, True, self.URL_SECRET))
- self.few_ssh = repr(self._gen_data(2, True, True, self.SSH_SECRET))
-
- # create a temporary file for the test module
- # we're about to generate
- self.tmp_fd, self.tmp_path = tempfile.mkstemp()
- os.write(self.tmp_fd, TEST_MODULE_DATA)
-
- # template the module code and eval it
- module_data, module_style, shebang = ModuleReplacer().modify_module(self.tmp_path, {}, "", {})
-
- d = {}
- exec(module_data, d, d)
- self.module = d['get_module']()
-
- # module_utils/basic.py screws with CWD, let's save it and reset
- self.cwd = os.getcwd()
-
- def tearDown(self):
- self.cleanup_temp_file(self.tmp_fd, self.tmp_path)
- # Reset CWD back to what it was before basic.py changed it
- os.chdir(self.cwd)
-
-
- #################################################################################
-
- #
- # Speed tests
- #
-
- # Previously, we used regexes which had some pathologically slow cases for
- # parameters with large amounts of data with many ':' but no '@'. The
- # present function gets slower when there are many replacements so we may
- # want to explore regexes in the future (for the speed when substituting
- # or flexibility). These speed tests will hopefully tell us if we're
- # introducing code that has cases that are simply too slow.
- #
- # Some regex notes:
- # * re.sub() is faster than re.match() + str.join().
- # * We may be able to detect a large number of '@' symbols and then use
- # a regex else use the present function.
-
- @timed(5)
- def test_log_sanitize_speed_many_url(self):
- heuristic_log_sanitize(self.many_url)
-
- @timed(5)
- def test_log_sanitize_speed_many_ssh(self):
- heuristic_log_sanitize(self.many_ssh)
-
- @timed(5)
- def test_log_sanitize_speed_one_url(self):
- heuristic_log_sanitize(self.one_url)
-
- @timed(5)
- def test_log_sanitize_speed_one_ssh(self):
- heuristic_log_sanitize(self.one_ssh)
-
- @timed(5)
- def test_log_sanitize_speed_zero_secrets(self):
- heuristic_log_sanitize(self.zero_secrets)
-
- #
- # Test that the password obfuscation sanitizes somewhat cleanly.
- #
-
- def test_log_sanitize_correctness(self):
- url_data = repr(self._gen_data(3, True, True, self.URL_SECRET))
- ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET))
-
- url_output = heuristic_log_sanitize(url_data)
- ssh_output = heuristic_log_sanitize(ssh_data)
-
- # Basic functionality: Successfully hid the password
- try:
- self.assertNotIn('pas:word', url_output)
- self.assertNotIn('pas:word', ssh_output)
-
- # Slightly more advanced, we hid all of the password despite the ":"
- self.assertNotIn('pas', url_output)
- self.assertNotIn('pas', ssh_output)
- except AttributeError:
- # python2.6 or less's unittest
- self.assertFalse('pas:word' in url_output, '%s is present in %s' % ('"pas:word"', url_output))
- self.assertFalse('pas:word' in ssh_output, '%s is present in %s' % ('"pas:word"', ssh_output))
-
- self.assertFalse('pas' in url_output, '%s is present in %s' % ('"pas"', url_output))
- self.assertFalse('pas' in ssh_output, '%s is present in %s' % ('"pas"', ssh_output))
-
- # In this implementation we replace the password with 8 "*" which is
- # also the length of our password. The url fields should be able to
- # accurately detect where the password ends so the length should be
- # the same:
- self.assertEqual(len(url_output), len(url_data))
-
- # ssh checking is harder as the heuristic is overzealous in many
- # cases. Since the input will have at least one ":" present before
- # the password we can tell some things about the beginning and end of
- # the data, though:
- self.assertTrue(ssh_output.startswith("{'"))
- self.assertTrue(ssh_output.endswith("}"))
- try:
- self.assertIn(":********@foo.com/data'", ssh_output)
- except AttributeError:
- # python2.6 or less's unittest
- self.assertTrue(":********@foo.com/data'" in ssh_output, '%s is not present in %s' % (":********@foo.com/data'", ssh_output))
-
- # The overzealous-ness here may lead to us changing the algorithm in
- # the future. We could make it consume less of the data (with the
- # possibility of leaving partial passwords exposed) and encourage
- # people to use no_log instead of relying on this obfuscation.
diff --git a/test/units/TestModuleUtilsDatabase.py b/test/units/TestModuleUtilsDatabase.py
deleted file mode 100644
index 67da0b60e0..0000000000
--- a/test/units/TestModuleUtilsDatabase.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import collections
-import mock
-import os
-import re
-
-from nose.tools import eq_
-try:
- from nose.tools import assert_raises_regexp
-except ImportError:
- # Python < 2.7
- def assert_raises_regexp(expected, regexp, callable, *a, **kw):
- try:
- callable(*a, **kw)
- except expected as e:
- if isinstance(regexp, basestring):
- regexp = re.compile(regexp)
- if not regexp.search(str(e)):
- raise Exception('"%s" does not match "%s"' %
- (regexp.pattern, str(e)))
- else:
- if hasattr(expected,'__name__'): excName = expected.__name__
- else: excName = str(expected)
- raise AssertionError("%s not raised" % excName)
-
-from ansible.module_utils.database import (
- pg_quote_identifier,
- SQLParseError,
-)
-
-
-# Note: Using nose's generator test cases here so we can't inherit from
-# unittest.TestCase
-class TestQuotePgIdentifier(object):
-
- # These are all valid strings
- # The results are based on interpreting the identifier as a table name
- valid = {
- # User quoted
- '"public.table"': '"public.table"',
- '"public"."table"': '"public"."table"',
- '"schema test"."table test"': '"schema test"."table test"',
-
- # We quote part
- 'public.table': '"public"."table"',
- '"public".table': '"public"."table"',
- 'public."table"': '"public"."table"',
- 'schema test.table test': '"schema test"."table test"',
- '"schema test".table test': '"schema test"."table test"',
- 'schema test."table test"': '"schema test"."table test"',
-
- # Embedded double quotes
- 'table "test"': '"table ""test"""',
- 'public."table ""test"""': '"public"."table ""test"""',
- 'public.table "test"': '"public"."table ""test"""',
- 'schema "test".table': '"schema ""test"""."table"',
- '"schema ""test""".table': '"schema ""test"""."table"',
- '"""wat"""."""test"""': '"""wat"""."""test"""',
- # Sigh, handle these as well:
- '"no end quote': '"""no end quote"',
- 'schema."table': '"schema"."""table"',
- '"schema.table': '"""schema"."table"',
- 'schema."table.something': '"schema"."""table"."something"',
-
- # Embedded dots
- '"schema.test"."table.test"': '"schema.test"."table.test"',
- '"schema.".table': '"schema."."table"',
- '"schema."."table"': '"schema."."table"',
- 'schema.".table"': '"schema".".table"',
- '"schema".".table"': '"schema".".table"',
- '"schema.".".table"': '"schema.".".table"',
- # These are valid but maybe not what the user intended
- '."table"': '".""table"""',
- 'table.': '"table."',
- }
-
- invalid = {
- ('test.too.many.dots', 'table'): 'PostgreSQL does not support table with more than 3 dots',
- ('"test.too".many.dots', 'database'): 'PostgreSQL does not support database with more than 1 dots',
- ('test.too."many.dots"', 'database'): 'PostgreSQL does not support database with more than 1 dots',
- ('"test"."too"."many"."dots"', 'database'): "PostgreSQL does not support database with more than 1 dots",
- ('"test"."too"."many"."dots"', 'schema'): "PostgreSQL does not support schema with more than 2 dots",
- ('"test"."too"."many"."dots"', 'table'): "PostgreSQL does not support table with more than 3 dots",
- ('"test"."too"."many"."dots"."for"."column"', 'column'): "PostgreSQL does not support column with more than 4 dots",
- ('"table "invalid" double quote"', 'table'): 'User escaped identifiers must escape extra quotes',
- ('"schema "invalid"""."table "invalid"', 'table'): 'User escaped identifiers must escape extra quotes',
- ('"schema."table"','table'): 'User escaped identifiers must escape extra quotes',
- ('"schema".', 'table'): 'Identifier name unspecified or unquoted trailing dot',
- }
-
- def check_valid_quotes(self, identifier, quoted_identifier):
- eq_(pg_quote_identifier(identifier, 'table'), quoted_identifier)
-
- def test_valid_quotes(self):
- for identifier in self.valid:
- yield self.check_valid_quotes, identifier, self.valid[identifier]
-
- def check_invalid_quotes(self, identifier, id_type, msg):
- assert_raises_regexp(SQLParseError, msg, pg_quote_identifier, *(identifier, id_type))
-
- def test_invalid_quotes(self):
- for test in self.invalid:
- yield self.check_invalid_quotes, test[0], test[1], self.invalid[test]
-
- def test_how_many_dots(self):
- eq_(pg_quote_identifier('role', 'role'), '"role"')
- assert_raises_regexp(SQLParseError, "PostgreSQL does not support role with more than 1 dots", pg_quote_identifier, *('role.more', 'role'))
-
- eq_(pg_quote_identifier('db', 'database'), '"db"')
- assert_raises_regexp(SQLParseError, "PostgreSQL does not support database with more than 1 dots", pg_quote_identifier, *('db.more', 'database'))
-
- eq_(pg_quote_identifier('db.schema', 'schema'), '"db"."schema"')
- assert_raises_regexp(SQLParseError, "PostgreSQL does not support schema with more than 2 dots", pg_quote_identifier, *('db.schema.more', 'schema'))
-
- eq_(pg_quote_identifier('db.schema.table', 'table'), '"db"."schema"."table"')
- assert_raises_regexp(SQLParseError, "PostgreSQL does not support table with more than 3 dots", pg_quote_identifier, *('db.schema.table.more', 'table'))
-
- eq_(pg_quote_identifier('db.schema.table.column', 'column'), '"db"."schema"."table"."column"')
- assert_raises_regexp(SQLParseError, "PostgreSQL does not support column with more than 4 dots", pg_quote_identifier, *('db.schema.table.column.more', 'column'))
diff --git a/test/units/TestModules.py b/test/units/TestModules.py
deleted file mode 100644
index aef2e83ed6..0000000000
--- a/test/units/TestModules.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-import ast
-import unittest
-from ansible import utils
-
-
-class TestModules(unittest.TestCase):
-
- def list_all_modules(self):
- paths = utils.plugins.module_finder._get_paths()
- paths = [x for x in paths if os.path.isdir(x)]
- module_list = []
- for path in paths:
- for (dirpath, dirnames, filenames) in os.walk(path):
- for filename in filenames:
- (path, ext) = os.path.splitext(filename)
- if ext == ".py":
- module_list.append(os.path.join(dirpath, filename))
- return module_list
-
- def test_ast_parse(self):
- module_list = self.list_all_modules()
- ERRORS = []
- # attempt to parse each module with ast
- for m in module_list:
- try:
- ast.parse(''.join(open(m)))
- except Exception, e:
- ERRORS.append((m, e))
- assert len(ERRORS) == 0, "get_docstring errors: %s" % ERRORS
diff --git a/test/units/TestPlayVarsFiles.py b/test/units/TestPlayVarsFiles.py
deleted file mode 100644
index 9d42b73e8b..0000000000
--- a/test/units/TestPlayVarsFiles.py
+++ /dev/null
@@ -1,390 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import shutil
-from tempfile import mkstemp
-from tempfile import mkdtemp
-from ansible.playbook.play import Play
-import ansible
-
-import unittest
-from nose.plugins.skip import SkipTest
-
-
-class FakeCallBacks(object):
- def __init__(self):
- pass
- def on_vars_prompt(self):
- pass
- def on_import_for_host(self, host, filename):
- pass
-
-class FakeInventory(object):
- def __init__(self):
- self.hosts = {}
- def basedir(self):
- return "."
- def src(self):
- return "fakeinventory"
- def get_variables(self, host, vault_password=None):
- if host in self.hosts:
- return self.hosts[host]
- else:
- return {}
-
-class FakePlayBook(object):
- def __init__(self):
- self.extra_vars = {}
- self.remote_user = None
- self.remote_port = None
- self.sudo = None
- self.sudo_user = None
- self.su = None
- self.su_user = None
- self.become = None
- self.become_method = None
- self.become_user = None
- self.transport = None
- self.only_tags = None
- self.skip_tags = None
- self.force_handlers = None
- self.VARS_CACHE = {}
- self.SETUP_CACHE = {}
- self.inventory = FakeInventory()
- self.callbacks = FakeCallBacks()
-
- self.VARS_CACHE['localhost'] = {}
-
-
-class TestMe(unittest.TestCase):
-
- ########################################
- # BASIC FILE LOADING BEHAVIOR TESTS
- ########################################
-
- def test_play_constructor(self):
- # __init__(self, playbook, ds, basedir, vault_password=None)
- playbook = FakePlayBook()
- ds = { "hosts": "localhost"}
- basedir = "."
- play = Play(playbook, ds, basedir)
-
- def test_vars_file(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # create a play with a vars_file
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": [temp_path]}
- basedir = "."
- play = Play(playbook, ds, basedir)
- os.remove(temp_path)
-
- # make sure the variable was loaded
- assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars"
- assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars"
-
- def test_vars_file_nonlist_error(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # create a play with a string for vars_files
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": temp_path}
- basedir = "."
- error_hit = False
- try:
- play = Play(playbook, ds, basedir)
- except:
- error_hit = True
- os.remove(temp_path)
-
- assert error_hit == True, "no error was thrown when vars_files was not a list"
-
-
- def test_multiple_vars_files(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # make a second vars file
- fd, temp_path2 = mkstemp()
- f = open(temp_path2, "wb")
- f.write("baz: bang\n")
- f.close()
-
-
- # create a play with two vars_files
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": [temp_path, temp_path2]}
- basedir = "."
- play = Play(playbook, ds, basedir)
- os.remove(temp_path)
- os.remove(temp_path2)
-
- # make sure the variables were loaded
- assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars"
- assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars"
- assert 'baz' in play.vars_file_vars, "vars_file2 was not loaded into play.vars_file_vars"
- assert play.vars_file_vars['baz'] == 'bang', "baz was not set to bang in play.vars_file_vars"
-
- def test_vars_files_first_found(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # get a random file path
- fd, temp_path2 = mkstemp()
- # make sure this file doesn't exist
- os.remove(temp_path2)
-
- # create a play
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": [[temp_path2, temp_path]]}
- basedir = "."
- play = Play(playbook, ds, basedir)
- os.remove(temp_path)
-
- # make sure the variable was loaded
- assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars"
- assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars"
-
- def test_vars_files_multiple_found(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # make a second vars file
- fd, temp_path2 = mkstemp()
- f = open(temp_path2, "wb")
- f.write("baz: bang\n")
- f.close()
-
- # create a play
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": [[temp_path, temp_path2]]}
- basedir = "."
- play = Play(playbook, ds, basedir)
- os.remove(temp_path)
- os.remove(temp_path2)
-
- # make sure the variables were loaded
- assert 'foo' in play.vars_file_vars, "vars_file was not loaded into play.vars_file_vars"
- assert play.vars_file_vars['foo'] == 'bar', "foo was not set to bar in play.vars_file_vars"
- assert 'baz' not in play.vars_file_vars, "vars_file2 was loaded after vars_file1 was loaded"
-
- def test_vars_files_assert_all_found(self):
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # make a second vars file
- fd, temp_path2 = mkstemp()
- # make sure it doesn't exist
- os.remove(temp_path2)
-
- # create a play
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": [temp_path, temp_path2]}
- basedir = "."
-
- error_hit = False
- error_msg = None
-
- try:
- play = Play(playbook, ds, basedir)
- except ansible.errors.AnsibleError, e:
- error_hit = True
- error_msg = e
-
- os.remove(temp_path)
- assert error_hit == True, "no error was thrown for missing vars_file"
-
-
- ########################################
- # VARIABLE PRECEDENCE TESTS
- ########################################
-
- # On the first run vars_files are loaded into play.vars_file_vars by host == None
- # * only files with vars from host==None will work here
- # On the secondary run(s), a host is given and the vars_files are loaded into VARS_CACHE
- # * this only occurs if host is not None, filename2 has vars in the name, and filename3 does not
-
- # filename -- the original string
- # filename2 -- filename templated with play vars
- # filename3 -- filename2 template with inject (hostvars + setup_cache + vars_cache)
- # filename4 -- path_dwim(filename3)
-
- def test_vars_files_for_host(self):
-
- # host != None
- # vars in filename2
- # no vars in filename3
-
- # make a vars file
- fd, temp_path = mkstemp()
- f = open(temp_path, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # build play attributes
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars_files": ["{{ temp_path }}"]}
- basedir = "."
- playbook.VARS_CACHE['localhost']['temp_path'] = temp_path
-
- # create play and do first run
- play = Play(playbook, ds, basedir)
-
- # the second run is started by calling update_vars_files
- play.update_vars_files(['localhost'])
- os.remove(temp_path)
-
- assert 'foo' in play.playbook.VARS_CACHE['localhost'], "vars_file vars were not loaded into vars_cache"
- assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', "foo does not equal bar"
-
-
- ########################################
- # COMPLEX FILENAME TEMPLATING TESTS
- ########################################
-
- def test_vars_files_two_vars_in_name(self):
-
- # self.vars_file_vars = ds['vars']
- # self.vars_file_vars += _get_vars() ... aka extra_vars
-
- # make a temp dir
- temp_dir = mkdtemp()
-
- # make a temp file
- fd, temp_file = mkstemp(dir=temp_dir)
- f = open(temp_file, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # build play attributes
- playbook = FakePlayBook()
- ds = { "hosts": "localhost",
- "vars": { "temp_dir": os.path.dirname(temp_file),
- "temp_file": os.path.basename(temp_file) },
- "vars_files": ["{{ temp_dir + '/' + temp_file }}"]}
- basedir = "."
-
- # create play and do first run
- play = Play(playbook, ds, basedir)
-
- # cleanup
- shutil.rmtree(temp_dir)
-
- assert 'foo' in play.vars_file_vars, "double var templated vars_files filename not loaded"
-
- def test_vars_files_two_vars_different_scope(self):
-
- #
- # Use a play var and an inventory var to create the filename
- #
-
- # self.playbook.inventory.get_variables(host)
- # {'group_names': ['ungrouped'], 'inventory_hostname': 'localhost',
- # 'ansible_ssh_user': 'root', 'inventory_hostname_short': 'localhost'}
-
- # make a temp dir
- temp_dir = mkdtemp()
-
- # make a temp file
- fd, temp_file = mkstemp(dir=temp_dir)
- f = open(temp_file, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # build play attributes
- playbook = FakePlayBook()
- playbook.inventory.hosts['localhost'] = {'inventory_hostname': os.path.basename(temp_file)}
- ds = { "hosts": "localhost",
- "vars": { "temp_dir": os.path.dirname(temp_file)},
- "vars_files": ["{{ temp_dir + '/' + inventory_hostname }}"]}
- basedir = "."
-
- # create play and do first run
- play = Play(playbook, ds, basedir)
-
- # do the host run
- play.update_vars_files(['localhost'])
-
- # cleanup
- shutil.rmtree(temp_dir)
-
- assert 'foo' not in play.vars_file_vars, \
- "mixed scope vars_file loaded into play vars"
- assert 'foo' in play.playbook.VARS_CACHE['localhost'], \
- "differently scoped templated vars_files filename not loaded"
- assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', \
- "foo is not bar"
-
- def test_vars_files_two_vars_different_scope_first_found(self):
-
- #
- # Use a play var and an inventory var to create the filename
- #
-
- # make a temp dir
- temp_dir = mkdtemp()
-
- # make a temp file
- fd, temp_file = mkstemp(dir=temp_dir)
- f = open(temp_file, "wb")
- f.write("foo: bar\n")
- f.close()
-
- # build play attributes
- playbook = FakePlayBook()
- playbook.inventory.hosts['localhost'] = {'inventory_hostname': os.path.basename(temp_file)}
- ds = { "hosts": "localhost",
- "vars": { "temp_dir": os.path.dirname(temp_file)},
- "vars_files": [["{{ temp_dir + '/' + inventory_hostname }}"]]}
- basedir = "."
-
- # create play and do first run
- play = Play(playbook, ds, basedir)
-
- # do the host run
- play.update_vars_files(['localhost'])
-
- # cleanup
- shutil.rmtree(temp_dir)
-
- assert 'foo' not in play.vars_file_vars, \
- "mixed scope vars_file loaded into play vars"
- assert 'foo' in play.playbook.VARS_CACHE['localhost'], \
- "differently scoped templated vars_files filename not loaded"
- assert play.playbook.VARS_CACHE['localhost']['foo'] == 'bar', \
- "foo is not bar"
-
-
diff --git a/test/units/TestSynchronize.py b/test/units/TestSynchronize.py
deleted file mode 100644
index cf28ea5d80..0000000000
--- a/test/units/TestSynchronize.py
+++ /dev/null
@@ -1,176 +0,0 @@
-
-import unittest
-import getpass
-import os
-import shutil
-import time
-import tempfile
-from nose.plugins.skip import SkipTest
-
-from ansible.runner.action_plugins.synchronize import ActionModule as Synchronize
-
-class FakeRunner(object):
- def __init__(self):
- self.connection = None
- self.transport = None
- self.basedir = None
- self.sudo = None
- self.remote_user = None
- self.private_key_file = None
- self.check = False
- self.become = False
- self.become_method = 'sudo'
- self.become_user = False
-
- def _execute_module(self, conn, tmp, module_name, args,
- async_jid=None, async_module=None, async_limit=None, inject=None,
- persist_files=False, complex_args=None, delete_remote_tmp=True):
- self.executed_conn = conn
- self.executed_tmp = tmp
- self.executed_module_name = module_name
- self.executed_args = args
- self.executed_async_jid = async_jid
- self.executed_async_module = async_module
- self.executed_async_limit = async_limit
- self.executed_inject = inject
- self.executed_persist_files = persist_files
- self.executed_complex_args = complex_args
- self.executed_delete_remote_tmp = delete_remote_tmp
-
- def noop_on_check(self, inject):
- return self.check
-
-class FakeConn(object):
- def __init__(self):
- self.host = None
- self.delegate = None
-
-class TestSynchronize(unittest.TestCase):
-
-
- def test_synchronize_action_basic(self):
-
- """ verify the synchronize action plugin sets
- the delegate to 127.0.0.1 and remote path to user@host:/path """
-
- runner = FakeRunner()
- runner.remote_user = "root"
- runner.transport = "ssh"
- conn = FakeConn()
- inject = {
- 'inventory_hostname': "el6.lab.net",
- 'inventory_hostname_short': "el6",
- 'ansible_connection': None,
- 'ansible_ssh_user': 'root',
- 'delegate_to': None,
- 'playbook_dir': '.',
- }
-
- x = Synchronize(runner)
- x.setup("synchronize", inject)
- x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject)
-
- assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1"
- assert runner.executed_complex_args == {"dest":"root@el6.lab.net:/tmp/bar", "src":"/tmp/foo"}, "wrong args used"
- assert runner.sudo == None, "sudo was not reset to None"
-
- def test_synchronize_action_sudo(self):
-
- """ verify the synchronize action plugin unsets and then sets sudo """
-
- runner = FakeRunner()
- runner.become = True
- runner.remote_user = "root"
- runner.transport = "ssh"
- conn = FakeConn()
- inject = {
- 'inventory_hostname': "el6.lab.net",
- 'inventory_hostname_short': "el6",
- 'ansible_connection': None,
- 'ansible_ssh_user': 'root',
- 'delegate_to': None,
- 'playbook_dir': '.',
- }
-
- x = Synchronize(runner)
- x.setup("synchronize", inject)
- x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject)
-
- assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1"
- assert runner.executed_complex_args == {'dest':'root@el6.lab.net:/tmp/bar',
- 'src':'/tmp/foo',
- 'rsync_path':'"sudo rsync"'}, "wrong args used"
- assert runner.become == True, "sudo was not reset to True"
-
-
- def test_synchronize_action_local(self):
-
- """ verify the synchronize action plugin sets
- the delegate to 127.0.0.1 and does not alter the dest """
-
- runner = FakeRunner()
- runner.remote_user = "jtanner"
- runner.transport = "paramiko"
- conn = FakeConn()
- conn.host = "127.0.0.1"
- conn.delegate = "thishost"
- inject = {
- 'inventory_hostname': "thishost",
- 'ansible_ssh_host': '127.0.0.1',
- 'ansible_connection': 'local',
- 'delegate_to': None,
- 'playbook_dir': '.',
- }
-
- x = Synchronize(runner)
- x.setup("synchronize", inject)
- x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject)
-
- assert runner.transport == "paramiko", "runner transport was changed"
- assert runner.remote_user == "jtanner", "runner remote_user was changed"
- assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1"
- assert "dest_port" not in runner.executed_complex_args, "dest_port should not have been set"
- assert runner.executed_complex_args.get("src") == "/tmp/foo", "source was set incorrectly"
- assert runner.executed_complex_args.get("dest") == "/tmp/bar", "dest was set incorrectly"
-
-
- def test_synchronize_action_vagrant(self):
-
- """ Verify the action plugin accommodates the common
- scenarios for vagrant boxes. """
-
- runner = FakeRunner()
- runner.remote_user = "jtanner"
- runner.transport = "ssh"
- conn = FakeConn()
- conn.host = "127.0.0.1"
- conn.delegate = "thishost"
- inject = {
- 'inventory_hostname': "thishost",
- 'ansible_ssh_user': 'vagrant',
- 'ansible_ssh_host': '127.0.0.1',
- 'ansible_ssh_port': '2222',
- 'delegate_to': None,
- 'playbook_dir': '.',
- 'hostvars': {
- 'thishost': {
- 'inventory_hostname': 'thishost',
- 'ansible_ssh_port': '2222',
- 'ansible_ssh_host': '127.0.0.1',
- 'ansible_ssh_user': 'vagrant'
- }
- }
- }
-
- x = Synchronize(runner)
- x.setup("synchronize", inject)
- x.run(conn, "/tmp", "synchronize", "src=/tmp/foo dest=/tmp/bar", inject)
-
- assert runner.transport == "ssh", "runner transport was changed"
- assert runner.remote_user == "jtanner", "runner remote_user was changed"
- assert runner.executed_inject['delegate_to'] == "127.0.0.1", "was not delegated to 127.0.0.1"
- assert runner.executed_inject['ansible_ssh_user'] == "vagrant", "runner user was changed"
- assert runner.executed_complex_args.get("dest_port") == "2222", "remote port was not set to 2222"
- assert runner.executed_complex_args.get("src") == "/tmp/foo", "source was set incorrectly"
- assert runner.executed_complex_args.get("dest") == "vagrant@127.0.0.1:/tmp/bar", "dest was set incorrectly"
-
diff --git a/test/units/TestUtils.py b/test/units/TestUtils.py
deleted file mode 100644
index c0ca9ba538..0000000000
--- a/test/units/TestUtils.py
+++ /dev/null
@@ -1,945 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import traceback
-import unittest
-import os
-import os.path
-import re
-import tempfile
-import yaml
-import passlib.hash
-import string
-import StringIO
-import copy
-import tempfile
-import shutil
-
-from nose.plugins.skip import SkipTest
-from mock import patch
-
-import ansible.utils
-import ansible.errors
-import ansible.constants as C
-import ansible.utils.template as template2
-from ansible.module_utils.splitter import split_args
-
-from ansible import __version__
-
-import sys
-reload(sys)
-sys.setdefaultencoding("utf8")
-
-class TestUtils(unittest.TestCase):
-
- def _is_fips(self):
- try:
- data = open('/proc/sys/crypto/fips_enabled').read().strip()
- except:
- return False
- if data != '1':
- return False
- return True
-
- def test_before_comment(self):
- ''' see if we can detect the part of a string before a comment. Used by INI parser in inventory '''
-
- input = "before # comment"
- expected = "before "
- actual = ansible.utils.before_comment(input)
- self.assertEqual(expected, actual)
-
- input = "before \# not a comment"
- expected = "before # not a comment"
- actual = ansible.utils.before_comment(input)
- self.assertEqual(expected, actual)
-
- input = ""
- expected = ""
- actual = ansible.utils.before_comment(input)
- self.assertEqual(expected, actual)
-
- input = "#"
- expected = ""
- actual = ansible.utils.before_comment(input)
- self.assertEqual(expected, actual)
-
- #####################################
- ### check_conditional tests
-
- def test_check_conditional_jinja2_literals(self):
- # see http://jinja.pocoo.org/docs/templates/#literals
-
- # none
- self.assertEqual(ansible.utils.check_conditional(
- None, '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- '', '/', {}), True)
-
- # list
- self.assertEqual(ansible.utils.check_conditional(
- ['true'], '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- ['false'], '/', {}), False)
-
- # non basestring or list
- self.assertEqual(ansible.utils.check_conditional(
- {}, '/', {}), {})
-
- # boolean
- self.assertEqual(ansible.utils.check_conditional(
- 'true', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'false', '/', {}), False)
- self.assertEqual(ansible.utils.check_conditional(
- 'True', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'False', '/', {}), False)
-
- # integer
- self.assertEqual(ansible.utils.check_conditional(
- '1', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- '0', '/', {}), False)
-
- # string, beware, a string is truthy unless empty
- self.assertEqual(ansible.utils.check_conditional(
- '"yes"', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- '"no"', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- '""', '/', {}), False)
-
-
- def test_check_conditional_jinja2_variable_literals(self):
- # see http://jinja.pocoo.org/docs/templates/#literals
-
- # boolean
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'True'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'true'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'False'}), False)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'false'}), False)
-
- # integer
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '1'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 1}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '0'}), False)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 0}), False)
-
- # string, beware, a string is truthy unless empty
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '"yes"'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '"no"'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '""'}), False)
-
- # Python boolean in Jinja2 expression
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': True}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': False}), False)
-
-
- def test_check_conditional_jinja2_expression(self):
- self.assertEqual(ansible.utils.check_conditional(
- '1 == 1', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'bar == 42', '/', {'bar': 42}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'bar != 42', '/', {'bar': 42}), False)
-
-
- def test_check_conditional_jinja2_expression_in_variable(self):
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': '1 == 1'}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'bar == 42', 'bar': 42}), True)
- self.assertEqual(ansible.utils.check_conditional(
- 'var', '/', {'var': 'bar != 42', 'bar': 42}), False)
-
- def test_check_conditional_jinja2_unicode(self):
- self.assertEqual(ansible.utils.check_conditional(
- u'"\u00df"', '/', {}), True)
- self.assertEqual(ansible.utils.check_conditional(
- u'var == "\u00df"', '/', {'var': u'\u00df'}), True)
-
-
- #####################################
- ### key-value parsing
-
- def test_parse_kv_basic(self):
- self.assertEqual(ansible.utils.parse_kv('a=simple b="with space" c="this=that"'),
- {'a': 'simple', 'b': 'with space', 'c': 'this=that'})
- self.assertEqual(ansible.utils.parse_kv('msg=АБВГД'),
- {'msg': 'АБВГД'})
-
-
- def test_jsonify(self):
- self.assertEqual(ansible.utils.jsonify(None), '{}')
- self.assertEqual(ansible.utils.jsonify(dict(foo='bar', baz=['qux'])), '{"baz": ["qux"], "foo": "bar"}')
- expected = u'{"baz":["qux"],"foo":"bar"}'
- self.assertEqual("".join(ansible.utils.jsonify(dict(foo='bar', baz=['qux']), format=True).split()), expected)
-
- def test_is_failed(self):
- self.assertEqual(ansible.utils.is_failed(dict(rc=0)), False)
- self.assertEqual(ansible.utils.is_failed(dict(rc=1)), True)
- self.assertEqual(ansible.utils.is_failed(dict()), False)
- self.assertEqual(ansible.utils.is_failed(dict(failed=False)), False)
- self.assertEqual(ansible.utils.is_failed(dict(failed=True)), True)
- self.assertEqual(ansible.utils.is_failed(dict(failed='True')), True)
- self.assertEqual(ansible.utils.is_failed(dict(failed='true')), True)
-
- def test_is_changed(self):
- self.assertEqual(ansible.utils.is_changed(dict()), False)
- self.assertEqual(ansible.utils.is_changed(dict(changed=False)), False)
- self.assertEqual(ansible.utils.is_changed(dict(changed=True)), True)
- self.assertEqual(ansible.utils.is_changed(dict(changed='True')), True)
- self.assertEqual(ansible.utils.is_changed(dict(changed='true')), True)
-
- def test_path_dwim(self):
- self.assertEqual(ansible.utils.path_dwim(None, __file__),
- __file__)
- self.assertEqual(ansible.utils.path_dwim(None, '~'),
- os.path.expanduser('~'))
- self.assertEqual(ansible.utils.path_dwim(None, 'TestUtils.py'),
- __file__.rstrip('c'))
-
- def test_path_dwim_relative(self):
- self.assertEqual(ansible.utils.path_dwim_relative(__file__, 'units', 'TestUtils.py',
- os.path.dirname(os.path.dirname(__file__))),
- __file__.rstrip('c'))
-
- def test_json_loads(self):
- self.assertEqual(ansible.utils.json_loads('{"foo": "bar"}'), dict(foo='bar'))
-
- def test_parse_json(self):
- # leading junk
- self.assertEqual(ansible.utils.parse_json('ansible\n{"foo": "bar"}'), dict(foo="bar"))
-
- # No closing quotation
- try:
- rc = ansible.utils.parse_json('foo=bar "')
- print rc
- except ValueError:
- pass
- else:
- traceback.print_exc()
- raise AssertionError('Incorrect exception, expected ValueError')
-
- # Failed to parse
- try:
- ansible.utils.parse_json('{')
- except ValueError:
- pass
- else:
- raise AssertionError('Incorrect exception, expected ValueError')
-
- def test_parse_yaml(self):
- #json
- self.assertEqual(ansible.utils.parse_yaml('{"foo": "bar"}'), dict(foo='bar'))
-
- # broken json
- try:
- ansible.utils.parse_yaml('{')
- except ansible.errors.AnsibleError:
- pass
- else:
- raise AssertionError
-
- # broken json with path_hint
- try:
- ansible.utils.parse_yaml('{', path_hint='foo')
- except ansible.errors.AnsibleError:
- pass
- else:
- raise AssertionError
-
- # yaml with front-matter
- self.assertEqual(ansible.utils.parse_yaml("---\nfoo: bar"), dict(foo='bar'))
- # yaml no front-matter
- self.assertEqual(ansible.utils.parse_yaml('foo: bar'), dict(foo='bar'))
- # yaml indented first line (See #6348)
- self.assertEqual(ansible.utils.parse_yaml(' - foo: bar\n baz: qux'), [dict(foo='bar', baz='qux')])
-
- def test_process_common_errors(self):
- # no quote
- self.assertTrue('YAML thought it' in ansible.utils.process_common_errors('', 'foo: {{bar}}', 6))
-
- # extra colon
- self.assertTrue('an extra unquoted colon' in ansible.utils.process_common_errors('', 'foo: bar:', 8))
-
- # match
- self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', 'foo: "{{bar}}"baz', 6))
- self.assertTrue('same kind of quote' in ansible.utils.process_common_errors('', "foo: '{{bar}}'baz", 6))
-
- # unbalanced
- self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', 'foo: "bad" "wolf"', 6))
- self.assertTrue('We could be wrong' in ansible.utils.process_common_errors('', "foo: 'bad' 'wolf'", 6))
-
-
- def test_process_yaml_error(self):
- data = 'foo: bar\n baz: qux'
- try:
- ansible.utils.parse_yaml(data)
- except yaml.YAMLError, exc:
- try:
- ansible.utils.process_yaml_error(exc, data, __file__)
- except ansible.errors.AnsibleYAMLValidationFailed, e:
- self.assertTrue('Syntax Error while loading' in str(e))
- else:
- raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
-
- data = 'foo: bar\n baz: {{qux}}'
- try:
- ansible.utils.parse_yaml(data)
- except yaml.YAMLError, exc:
- try:
- ansible.utils.process_yaml_error(exc, data, __file__)
- except ansible.errors.AnsibleYAMLValidationFailed, e:
- self.assertTrue('Syntax Error while loading' in str(e))
- else:
- raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
-
- data = '\xFF'
- try:
- ansible.utils.parse_yaml(data)
- except yaml.YAMLError, exc:
- try:
- ansible.utils.process_yaml_error(exc, data, __file__)
- except ansible.errors.AnsibleYAMLValidationFailed, e:
- self.assertTrue('Check over' in str(e))
- else:
- raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
-
- data = '\xFF'
- try:
- ansible.utils.parse_yaml(data)
- except yaml.YAMLError, exc:
- try:
- ansible.utils.process_yaml_error(exc, data, None)
- except ansible.errors.AnsibleYAMLValidationFailed, e:
- self.assertTrue('Could not parse YAML.' in str(e))
- else:
- raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
-
- def test_parse_yaml_from_file(self):
- test = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
- 'common_vars.yml')
- encrypted = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
- 'encrypted.yml')
- broken = os.path.join(os.path.dirname(__file__), 'inventory_test_data',
- 'broken.yml')
-
- try:
- ansible.utils.parse_yaml_from_file(os.path.dirname(__file__))
- except ansible.errors.AnsibleError:
- pass
- else:
- raise AssertionError('Incorrect exception, expected AnsibleError')
-
- self.assertEqual(ansible.utils.parse_yaml_from_file(test), yaml.safe_load(open(test)))
-
- self.assertEqual(ansible.utils.parse_yaml_from_file(encrypted, 'ansible'), dict(foo='bar'))
-
- try:
- ansible.utils.parse_yaml_from_file(broken)
- except ansible.errors.AnsibleYAMLValidationFailed, e:
- self.assertTrue('Syntax Error while loading' in str(e))
- else:
- raise AssertionError('Incorrect exception, expected AnsibleYAMLValidationFailed')
-
- def test_merge_hash(self):
- self.assertEqual(ansible.utils.merge_hash(dict(foo='bar', baz='qux'), dict(foo='baz')),
- dict(foo='baz', baz='qux'))
- self.assertEqual(ansible.utils.merge_hash(dict(foo=dict(bar='baz')), dict(foo=dict(bar='qux'))),
- dict(foo=dict(bar='qux')))
-
- def test_md5s(self):
- if self._is_fips():
- raise SkipTest('MD5 unavailable on FIPs enabled systems')
- self.assertEqual(ansible.utils.md5s('ansible'), '640c8a5376aa12fa15cf02130ce239a6')
- # Need a test that causes UnicodeEncodeError See 4221
-
- def test_md5(self):
- if self._is_fips():
- raise SkipTest('MD5 unavailable on FIPs enabled systems')
- self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
- 'fb7b5b90ea63f04bde33e804b6fad42c')
- self.assertEqual(ansible.utils.md5(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
- None)
-
- def test_checksum_s(self):
- self.assertEqual(ansible.utils.checksum_s('ansible'), 'bef45157a43c9e5f469d188810814a4a8ab9f2ed')
- # Need a test that causes UnicodeEncodeError See 4221
-
- def test_checksum(self):
- self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cfg')),
- '658b67c8ac7595adde7048425ff1f9aba270721a')
- self.assertEqual(ansible.utils.checksum(os.path.join(os.path.dirname(__file__), 'ansible.cf')),
- None)
-
- def test_default(self):
- self.assertEqual(ansible.utils.default(None, lambda: {}), {})
- self.assertEqual(ansible.utils.default(dict(foo='bar'), lambda: {}), dict(foo='bar'))
-
- def test__gitinfo(self):
- # this fails if not run from git clone
- # self.assertEqual('last updated' in ansible.utils._gitinfo())
- # missing test for git submodule
- # missing test outside of git clone
- pass
-
- def test_version(self):
- version = ansible.utils.version('ansible')
- self.assertTrue(version.startswith('ansible %s' % __version__))
- # this fails if not run from git clone
- # self.assertEqual('last updated' in version)
-
- def test_getch(self):
- # figure out how to test this
- pass
-
- def test_sanitize_output(self):
- self.assertEqual(ansible.utils.sanitize_output('password=foo'), 'password=VALUE_HIDDEN')
- self.assertEqual(ansible.utils.sanitize_output('foo=user:pass@foo/whatever'),
- 'foo=user:********@foo/whatever')
- self.assertEqual(ansible.utils.sanitize_output('foo=http://username:pass@wherever/foo'),
- 'foo=http://username:********@wherever/foo')
- self.assertEqual(ansible.utils.sanitize_output('foo=http://wherever/foo'),
- 'foo=http://wherever/foo')
-
- def test_increment_debug(self):
- ansible.utils.VERBOSITY = 0
- ansible.utils.increment_debug(None, None, None, None)
- self.assertEqual(ansible.utils.VERBOSITY, 1)
-
- def test_base_parser(self):
- output = ansible.utils.base_parser(output_opts=True)
- self.assertTrue(output.has_option('--one-line') and output.has_option('--tree'))
-
- runas = ansible.utils.base_parser(runas_opts=True)
- for opt in ['--sudo', '--sudo-user', '--user', '--su', '--su-user']:
- self.assertTrue(runas.has_option(opt))
-
- async = ansible.utils.base_parser(async_opts=True)
- self.assertTrue(async.has_option('--poll') and async.has_option('--background'))
-
- connect = ansible.utils.base_parser(connect_opts=True)
- self.assertTrue(connect.has_option('--connection'))
-
- subset = ansible.utils.base_parser(subset_opts=True)
- self.assertTrue(subset.has_option('--limit'))
-
- check = ansible.utils.base_parser(check_opts=True)
- self.assertTrue(check.has_option('--check'))
-
- diff = ansible.utils.base_parser(diff_opts=True)
- self.assertTrue(diff.has_option('--diff'))
-
- def test_do_encrypt(self):
- salt_chars = string.ascii_letters + string.digits + './'
- salt = ansible.utils.random_password(length=8, chars=salt_chars)
- hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt', salt=salt)
- self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))
-
- hash = ansible.utils.do_encrypt('ansible', 'sha256_crypt')
- self.assertTrue(passlib.hash.sha256_crypt.verify('ansible', hash))
-
- try:
- ansible.utils.do_encrypt('ansible', 'ansible')
- except ansible.errors.AnsibleError:
- pass
- else:
- raise AssertionError('Incorrect exception, expected AnsibleError')
-
- def test_do_encrypt_md5(self):
- if self._is_fips():
- raise SkipTest('MD5 unavailable on FIPS systems')
- hash = ansible.utils.do_encrypt('ansible', 'md5_crypt', salt_size=4)
- self.assertTrue(passlib.hash.md5_crypt.verify('ansible', hash))
-
- def test_last_non_blank_line(self):
- self.assertEqual(ansible.utils.last_non_blank_line('a\n\nb\n\nc'), 'c')
- self.assertEqual(ansible.utils.last_non_blank_line(''), '')
-
- def test_filter_leading_non_json_lines(self):
- self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n{"foo": "bar"}'),
- '{"foo": "bar"}\n')
- self.assertEqual(ansible.utils.filter_leading_non_json_lines('a\nb\nansible!\n["foo", "bar"]'),
- '["foo", "bar"]\n')
-
- def test_boolean(self):
- self.assertEqual(ansible.utils.boolean("true"), True)
- self.assertEqual(ansible.utils.boolean("True"), True)
- self.assertEqual(ansible.utils.boolean("TRUE"), True)
- self.assertEqual(ansible.utils.boolean("t"), True)
- self.assertEqual(ansible.utils.boolean("T"), True)
- self.assertEqual(ansible.utils.boolean("Y"), True)
- self.assertEqual(ansible.utils.boolean("y"), True)
- self.assertEqual(ansible.utils.boolean("1"), True)
- self.assertEqual(ansible.utils.boolean(1), True)
- self.assertEqual(ansible.utils.boolean("false"), False)
- self.assertEqual(ansible.utils.boolean("False"), False)
- self.assertEqual(ansible.utils.boolean("0"), False)
- self.assertEqual(ansible.utils.boolean(0), False)
- self.assertEqual(ansible.utils.boolean("foo"), False)
-
- def test_make_sudo_cmd(self):
- cmd = ansible.utils.make_sudo_cmd(C.DEFAULT_SUDO_EXE, 'root', '/bin/sh', '/bin/ls')
- self.assertTrue(isinstance(cmd, tuple))
- self.assertEqual(len(cmd), 3)
- self.assertTrue('-u root' in cmd[0])
- self.assertTrue('-p "[sudo via ansible, key=' in cmd[0] and cmd[1].startswith('[sudo via ansible, key'))
- self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-'))
- self.assertTrue('sudo -k' in cmd[0])
-
- def test_make_su_cmd(self):
- cmd = ansible.utils.make_su_cmd('root', '/bin/sh', '/bin/ls')
- self.assertTrue(isinstance(cmd, tuple))
- self.assertEqual(len(cmd), 3)
- self.assertTrue('root -c "/bin/sh' in cmd[0] or ' root -c /bin/sh' in cmd[0])
- self.assertTrue('echo BECOME-SUCCESS-' in cmd[0] and cmd[2].startswith('BECOME-SUCCESS-'))
-
- def test_to_unicode(self):
- uni = ansible.utils.unicode.to_unicode(u'ansible')
- self.assertTrue(isinstance(uni, unicode))
- self.assertEqual(uni, u'ansible')
-
- none = ansible.utils.unicode.to_unicode(None, nonstring='passthru')
- self.assertTrue(isinstance(none, type(None)))
- self.assertTrue(none is None)
-
- utf8 = ansible.utils.unicode.to_unicode('ansible')
- self.assertTrue(isinstance(utf8, unicode))
- self.assertEqual(utf8, u'ansible')
-
- def test_is_list_of_strings(self):
- self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', u'baz']), True)
- self.assertEqual(ansible.utils.is_list_of_strings(['foo', 'bar', True]), False)
- self.assertEqual(ansible.utils.is_list_of_strings(['one', 2, 'three']), False)
-
- def test_contains_vars(self):
- self.assertTrue(ansible.utils.contains_vars('{{foo}}'))
- self.assertTrue(ansible.utils.contains_vars('$foo'))
- self.assertFalse(ansible.utils.contains_vars('foo'))
-
- def test_safe_eval(self):
- # Not basestring
- self.assertEqual(ansible.utils.safe_eval(len), len)
- self.assertEqual(ansible.utils.safe_eval(1), 1)
- self.assertEqual(ansible.utils.safe_eval(len, include_exceptions=True), (len, None))
- self.assertEqual(ansible.utils.safe_eval(1, include_exceptions=True), (1, None))
-
- # module
- self.assertEqual(ansible.utils.safe_eval('foo.bar('), 'foo.bar(')
- self.assertEqual(ansible.utils.safe_eval('foo.bar(', include_exceptions=True), ('foo.bar(', None))
-
- # import
- self.assertEqual(ansible.utils.safe_eval('import foo'), 'import foo')
- self.assertEqual(ansible.utils.safe_eval('import foo', include_exceptions=True), ('import foo', None))
-
- # valid simple eval
- self.assertEqual(ansible.utils.safe_eval('True'), True)
- self.assertEqual(ansible.utils.safe_eval('True', include_exceptions=True), (True, None))
-
- # valid eval with lookup
- self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2)), 3)
- self.assertEqual(ansible.utils.safe_eval('foo + bar', dict(foo=1, bar=2), include_exceptions=True), (3, None))
-
- # invalid eval
- self.assertEqual(ansible.utils.safe_eval('foo'), 'foo')
- nameerror = ansible.utils.safe_eval('foo', include_exceptions=True)
- self.assertTrue(isinstance(nameerror, tuple))
- self.assertEqual(nameerror[0], 'foo')
- self.assertTrue(isinstance(nameerror[1], NameError))
-
- def test_listify_lookup_plugin_terms(self):
- basedir = os.path.dirname(__file__)
- # Straight lookups
- #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=[])), [])
- #self.assertEqual(ansible.utils.listify_lookup_plugin_terms('things', basedir, dict(things=['one', 'two'])), ['one', 'two'])
-
- def test_deprecated(self):
- sys_stderr = sys.stderr
- sys.stderr = StringIO.StringIO()
- ansible.utils.deprecated('Ack!', '0.0')
- out = sys.stderr.getvalue()
- self.assertTrue('0.0' in out)
- self.assertTrue('[DEPRECATION WARNING]' in out)
-
- sys.stderr = StringIO.StringIO()
- ansible.utils.deprecated('Ack!', None)
- out = sys.stderr.getvalue()
- self.assertTrue('0.0' not in out)
- self.assertTrue('[DEPRECATION WARNING]' in out)
-
- sys.stderr = StringIO.StringIO()
- warnings = C.DEPRECATION_WARNINGS
- C.DEPRECATION_WARNINGS = False
- ansible.utils.deprecated('Ack!', None)
- out = sys.stderr.getvalue()
- self.assertTrue(not out)
- C.DEPRECATION_WARNINGS = warnings
-
- sys.stderr = sys_stderr
-
- try:
- ansible.utils.deprecated('Ack!', '0.0', True)
- except ansible.errors.AnsibleError, e:
- self.assertTrue('0.0' not in str(e))
- self.assertTrue('[DEPRECATED]' in str(e))
- else:
- raise AssertionError("Incorrect exception, expected AnsibleError")
-
- def test_warning(self):
- sys_stderr = sys.stderr
- sys.stderr = StringIO.StringIO()
- ansible.utils.warning('ANSIBLE')
- out = sys.stderr.getvalue()
- sys.stderr = sys_stderr
- self.assertTrue('[WARNING]: ANSIBLE' in out)
-
- def test_combine_vars(self):
- one = {'foo': {'bar': True}, 'baz': {'one': 'qux'}}
- two = {'baz': {'two': 'qux'}}
- replace = {'baz': {'two': 'qux'}, 'foo': {'bar': True}}
- merge = {'baz': {'two': 'qux', 'one': 'qux'}, 'foo': {'bar': True}}
-
- C.DEFAULT_HASH_BEHAVIOUR = 'replace'
- self.assertEqual(ansible.utils.combine_vars(one, two), replace)
-
- C.DEFAULT_HASH_BEHAVIOUR = 'merge'
- self.assertEqual(ansible.utils.combine_vars(one, two), merge)
-
- def test_err(self):
- sys_stderr = sys.stderr
- sys.stderr = StringIO.StringIO()
- ansible.utils.err('ANSIBLE')
- out = sys.stderr.getvalue()
- sys.stderr = sys_stderr
- self.assertEqual(out, 'ANSIBLE\n')
-
- def test_exit(self):
- sys_stderr = sys.stderr
- sys.stderr = StringIO.StringIO()
- try:
- ansible.utils.exit('ansible')
- except SystemExit, e:
- self.assertEqual(e.code, 1)
- self.assertEqual(sys.stderr.getvalue(), 'ansible\n')
- else:
- raise AssertionError('Incorrect exception, expected SystemExit')
- finally:
- sys.stderr = sys_stderr
-
- def test_unfrackpath(self):
- os.environ['TEST_ROOT'] = os.path.dirname(os.path.dirname(__file__))
- self.assertEqual(ansible.utils.unfrackpath('$TEST_ROOT/units/../units/TestUtils.py'), __file__.rstrip('c'))
-
- def test_is_executable(self):
- self.assertEqual(ansible.utils.is_executable(__file__), 0)
-
- bin_ansible = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
- 'bin', 'ansible')
- self.assertNotEqual(ansible.utils.is_executable(bin_ansible), 0)
-
- def test_get_diff(self):
- standard = dict(
- before_header='foo',
- after_header='bar',
- before='fooo',
- after='foo'
- )
-
- standard_expected = """--- before: foo
-+++ after: bar
-@@ -1 +1 @@
--fooo+foo"""
-
- # workaround py26 and py27 difflib differences
- standard_expected = """-fooo+foo"""
- diff = ansible.utils.get_diff(standard)
- diff = diff.split('\n')
- del diff[0]
- del diff[0]
- del diff[0]
- diff = '\n'.join(diff)
- self.assertEqual(diff, unicode(standard_expected))
-
- def test_split_args(self):
- # split_args is a smarter shlex.split for the needs of the way ansible uses it
-
- def _split_info(input, desired, actual):
- print "SENT: ", input
- print "WANT: ", desired
- print "GOT: ", actual
-
- def _test_combo(input, desired):
- actual = split_args(input)
- _split_info(input, desired, actual)
- assert actual == desired
-
- # trivial splitting
- _test_combo('a b=c d=f', ['a', 'b=c', 'd=f' ])
-
- # mixed quotes
- _test_combo('a b=\'c\' d="e" f=\'g\'', ['a', "b='c'", 'd="e"', "f='g'" ])
-
- # with spaces
- # FIXME: this fails, commenting out only for now
- # _test_combo('a "\'one two three\'"', ['a', "'one two three'" ])
-
- # TODO: ...
- # jinja2 preservation
- _test_combo('a {{ y }} z', ['a', '{{ y }}', 'z' ])
-
- # jinja2 preservation with spaces and filters and other hard things
- _test_combo(
- 'a {{ x | filter(\'moo\', \'param\') }} z {{ chicken }} "waffles"',
- ['a', "{{ x | filter('moo', 'param') }}", 'z', '{{ chicken }}', '"waffles"']
- )
-
- # invalid quote detection
- self.assertRaises(Exception, split_args, 'hey I started a quote"')
- self.assertRaises(Exception, split_args, 'hey I started a\' quote')
-
- # jinja2 loop blocks with lots of complexity
- _test_combo(
- # in memory of neighbors cat
- # we preserve line breaks unless a line continuation character precedes them
- 'a {% if x %} y {%else %} {{meow}} {% endif %} "cookie\nchip" \\\ndone\nand done',
- ['a', '{% if x %}', 'y', '{%else %}', '{{meow}}', '{% endif %}', '"cookie\nchip"', 'done\n', 'and', 'done']
- )
-
- # test space preservation within quotes
- _test_combo(
- 'content="1 2 3 4 " foo=bar',
- ['content="1 2 3 4 "', 'foo=bar']
- )
-
- # invalid jinja2 nesting detection
- # invalid quote nesting detection
-
- def test_clean_data(self):
- # clean data removes jinja2 tags from data
- self.assertEqual(
- ansible.utils._clean_data('this is a normal string', from_remote=True),
- 'this is a normal string'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string has a {{variable}}', from_remote=True),
- 'this string has a {#variable#}'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string {{has}} two {{variables}} in it', from_remote=True),
- 'this string {#has#} two {#variables#} in it'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string has a {{variable with a\nnewline}}', from_remote=True),
- 'this string has a {#variable with a\nnewline#}'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string is from inventory {{variable}}', from_inventory=True),
- 'this string is from inventory {{variable}}'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string is from inventory too but uses lookup {{lookup("foo","bar")}}', from_inventory=True),
- 'this string is from inventory too but uses lookup {#lookup("foo","bar")#}'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}', from_remote=True),
- 'this string has JSON in it: {"foo":{"bar":{"baz":"oops"}}}'
- )
- self.assertEqual(
- ansible.utils._clean_data('this string contains unicode: ¢ £ ¤ ¥', from_remote=True),
- 'this string contains unicode: ¢ £ ¤ ¥'
- )
-
-
- def test_censor_unlogged_data(self):
- ''' used by the no_log attribute '''
- input = dict(
- password='sekrit',
- rc=12,
- failed=True,
- changed=False,
- skipped=True,
- msg='moo',
- )
- data = ansible.utils.censor_unlogged_data(input)
- assert 'password' not in data
- assert 'rc' in data
- assert 'failed' in data
- assert 'changed' in data
- assert 'skipped' in data
- assert 'msg' not in data
- assert data['censored'] == 'results hidden due to no_log parameter'
-
- def test_repo_url_to_role_name(self):
- tests = [("http://git.example.com/repos/repo.git", "repo"),
- ("ssh://git@git.example.com:repos/role-name", "role-name"),
- ("ssh://git@git.example.com:repos/role-name,v0.1", "role-name"),
- ("directory/role/is/installed/in", "directory/role/is/installed/in")]
- for (url, result) in tests:
- self.assertEqual(ansible.utils.repo_url_to_role_name(url), result)
-
- def test_role_spec_parse(self):
- tests = [
- (
- "git+http://git.example.com/repos/repo.git,v1.0",
- {
- 'scm': 'git',
- 'src': 'http://git.example.com/repos/repo.git',
- 'version': 'v1.0',
- 'name': 'repo'
- }
- ),
- (
- "http://repo.example.com/download/tarfile.tar.gz",
- {
- 'scm': None,
- 'src': 'http://repo.example.com/download/tarfile.tar.gz',
- 'version': '',
- 'name': 'tarfile'
- }
- ),
- (
- "http://repo.example.com/download/tarfile.tar.gz,,nicename",
- {
- 'scm': None,
- 'src': 'http://repo.example.com/download/tarfile.tar.gz',
- 'version': '',
- 'name': 'nicename'
- }
- ),
- (
- "git+http://git.example.com/repos/repo.git,v1.0,awesome",
- {
- 'scm': 'git',
- 'src': 'http://git.example.com/repos/repo.git',
- 'version': 'v1.0',
- 'name': 'awesome'
- }
- ),
- (
- # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
- "http://github.com/ansible/fakerole/fake",
- {
- 'scm' : 'git',
- 'src' : 'http://github.com/ansible/fakerole/fake',
- 'version' : 'master',
- 'name' : 'fake'
- }
- ),
- (
- # test that http://github URLs are assumed git+http:// unless they end in .tar.gz
- "http://github.com/ansible/fakerole/fake/archive/master.tar.gz",
- {
- 'scm' : None,
- 'src' : 'http://github.com/ansible/fakerole/fake/archive/master.tar.gz',
- 'version' : '',
- 'name' : 'master'
- }
- )
- ]
- for (spec, result) in tests:
- self.assertEqual(ansible.utils.role_spec_parse(spec), result)
-
- def test_role_yaml_parse(self):
- tests = (
- (
- # Old style
- {
- 'role': 'debops.elasticsearch',
- 'name': 'elks'
- },
- {
- 'role': 'debops.elasticsearch',
- 'name': 'elks',
- 'scm': None,
- 'src': 'debops.elasticsearch',
- 'version': '',
- }
- ),
- (
- {
- 'role': 'debops.elasticsearch,1.0,elks',
- 'my_param': 'foo'
- },
- {
- 'role': 'debops.elasticsearch,1.0,elks',
- 'name': 'elks',
- 'scm': None,
- 'src': 'debops.elasticsearch',
- 'version': '1.0',
- 'my_param': 'foo',
- }
- ),
- (
- {
- 'role': 'debops.elasticsearch,1.0',
- 'my_param': 'foo'
- },
- {
- 'role': 'debops.elasticsearch,1.0',
- 'name': 'debops.elasticsearch',
- 'scm': None,
- 'src': 'debops.elasticsearch',
- 'version': '1.0',
- 'my_param': 'foo',
- }
- ),
- # New style
- (
- {
- 'src': 'debops.elasticsearch',
- 'name': 'elks',
- 'my_param': 'foo'
- },
- {
- 'name': 'elks',
- 'scm': None,
- 'src': 'debops.elasticsearch',
- 'version': '',
- 'my_param': 'foo'
- }
- ),
- )
-
- for (role, result) in tests:
- self.assertEqual(ansible.utils.role_yaml_parse(role), result)
-
- @patch('ansible.utils.plugins.module_finder._get_paths')
- def test_find_plugin(self, mock_get_paths):
-
- tmp_path = tempfile.mkdtemp()
- mock_get_paths.return_value = [tmp_path,]
- right_module_1 = 'module.py'
- right_module_2 = 'module_without_extension'
- wrong_module_1 = 'folder'
- wrong_module_2 = 'inexistent'
- path_right_module_1 = os.path.join(tmp_path, right_module_1)
- path_right_module_2 = os.path.join(tmp_path, right_module_2)
- path_wrong_module_1 = os.path.join(tmp_path, wrong_module_1)
- open(path_right_module_1, 'w').close()
- open(path_right_module_2, 'w').close()
- os.mkdir(path_wrong_module_1)
-
- self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_1),
- path_right_module_1)
- self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(right_module_2),
- path_right_module_2)
- self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_1),
- None)
- self.assertEqual(ansible.utils.plugins.module_finder.find_plugin(wrong_module_2),
- None)
-
- shutil.rmtree(tmp_path)
diff --git a/test/units/TestUtilsStringFunctions.py b/test/units/TestUtilsStringFunctions.py
deleted file mode 100644
index cccedf280d..0000000000
--- a/test/units/TestUtilsStringFunctions.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import os
-import os.path
-import tempfile
-import yaml
-import passlib.hash
-import string
-import StringIO
-import copy
-
-from nose.plugins.skip import SkipTest
-
-from ansible.utils import string_functions
-import ansible.errors
-import ansible.constants as C
-import ansible.utils.template as template2
-
-from ansible import __version__
-
-import sys
-reload(sys)
-sys.setdefaultencoding("utf8")
-
-class TestUtilsStringFunctions(unittest.TestCase):
- def test_isprintable(self):
- self.assertFalse(string_functions.isprintable(chr(7)))
- self.assertTrue(string_functions.isprintable('hello'))
-
- def test_count_newlines_from_end(self):
- self.assertEqual(string_functions.count_newlines_from_end('foo\n\n\n\n'), 4)
- self.assertEqual(string_functions.count_newlines_from_end('\nfoo'), 0)
diff --git a/test/units/TestVaultEditor.py b/test/units/TestVaultEditor.py
deleted file mode 100644
index cfa5bc13e6..0000000000
--- a/test/units/TestVaultEditor.py
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/usr/bin/env python
-
-from unittest import TestCase
-import getpass
-import os
-import shutil
-import time
-import tempfile
-from binascii import unhexlify
-from binascii import hexlify
-from nose.plugins.skip import SkipTest
-
-from ansible import errors
-from ansible.utils.vault import VaultLib
-from ansible.utils.vault import VaultEditor
-
-# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
-try:
- from Crypto.Util import Counter
- HAS_COUNTER = True
-except ImportError:
- HAS_COUNTER = False
-
-# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
-try:
- from Crypto.Protocol.KDF import PBKDF2
- HAS_PBKDF2 = True
-except ImportError:
- HAS_PBKDF2 = False
-
-# AES IMPORTS
-try:
- from Crypto.Cipher import AES as AES
- HAS_AES = True
-except ImportError:
- HAS_AES = False
-
-class TestVaultEditor(TestCase):
-
- def _is_fips(self):
- try:
- data = open('/proc/sys/crypto/fips_enabled').read().strip()
- except:
- return False
- if data != '1':
- return False
- return True
-
- def test_methods_exist(self):
- v = VaultEditor(None, None, None)
- slots = ['create_file',
- 'decrypt_file',
- 'edit_file',
- 'encrypt_file',
- 'rekey_file',
- 'read_data',
- 'write_data',
- 'shuffle_files']
- for slot in slots:
- assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
-
- def test_decrypt_1_0(self):
- if self._is_fips():
- raise SkipTest('Vault-1.0 will not function on FIPS enabled systems')
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
- dirpath = tempfile.mkdtemp()
- filename = os.path.join(dirpath, "foo-ansible-1.0.yml")
- shutil.rmtree(dirpath)
- shutil.copytree("vault_test_data", dirpath)
- ve = VaultEditor(None, "ansible", filename)
-
- # make sure the password functions for the cipher
- error_hit = False
- try:
- ve.decrypt_file()
- except errors.AnsibleError, e:
- error_hit = True
-
- # verify decrypted content
- f = open(filename, "rb")
- fdata = f.read()
- f.close()
-
- shutil.rmtree(dirpath)
- assert error_hit == False, "error decrypting 1.0 file"
- assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
-
- def test_decrypt_1_1_newline(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
- dirpath = tempfile.mkdtemp()
- filename = os.path.join(dirpath, "foo-ansible-1.1-ansible-newline-ansible.yml")
- shutil.rmtree(dirpath)
- shutil.copytree("vault_test_data", dirpath)
- ve = VaultEditor(None, "ansible\nansible\n", filename)
-
- # make sure the password functions for the cipher
- error_hit = False
- try:
- ve.decrypt_file()
- except errors.AnsibleError, e:
- error_hit = True
-
- # verify decrypted content
- f = open(filename, "rb")
- fdata = f.read()
- f.close()
-
- shutil.rmtree(dirpath)
- assert error_hit == False, "error decrypting 1.1 file with newline in password"
- #assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip()
-
-
- def test_decrypt_1_1(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
- dirpath = tempfile.mkdtemp()
- filename = os.path.join(dirpath, "foo-ansible-1.1.yml")
- shutil.rmtree(dirpath)
- shutil.copytree("vault_test_data", dirpath)
- ve = VaultEditor(None, "ansible", filename)
-
- # make sure the password functions for the cipher
- error_hit = False
- try:
- ve.decrypt_file()
- except errors.AnsibleError, e:
- error_hit = True
-
- # verify decrypted content
- f = open(filename, "rb")
- fdata = f.read()
- f.close()
-
- shutil.rmtree(dirpath)
- assert error_hit == False, "error decrypting 1.1 file"
- assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip()
-
-
- def test_rekey_migration(self):
- if self._is_fips():
- raise SkipTest('Vault-1.0 will not function on FIPS enabled systems')
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
- dirpath = tempfile.mkdtemp()
- filename = os.path.join(dirpath, "foo-ansible-1.0.yml")
- shutil.rmtree(dirpath)
- shutil.copytree("vault_test_data", dirpath)
- ve = VaultEditor(None, "ansible", filename)
-
- # make sure the password functions for the cipher
- error_hit = False
- try:
- ve.rekey_file('ansible2')
- except errors.AnsibleError, e:
- error_hit = True
-
- # verify decrypted content
- f = open(filename, "rb")
- fdata = f.read()
- f.close()
-
- shutil.rmtree(dirpath)
- assert error_hit == False, "error rekeying 1.0 file to 1.1"
-
- # ensure filedata can be decrypted, is 1.1 and is AES256
- vl = VaultLib("ansible2")
- dec_data = None
- error_hit = False
- try:
- dec_data = vl.decrypt(fdata)
- except errors.AnsibleError, e:
- error_hit = True
-
- assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name
- assert error_hit == False, "error decrypting migrated 1.0 file"
- assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
-
-
diff --git a/test/units/__init__.py b/test/units/__init__.py
new file mode 100644
index 0000000000..e7489db6fb
--- /dev/null
+++ b/test/units/__init__.py
@@ -0,0 +1,5 @@
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/ansible.cfg b/test/units/ansible.cfg
deleted file mode 100644
index dd99b8102d..0000000000
--- a/test/units/ansible.cfg
+++ /dev/null
@@ -1,3 +0,0 @@
-[defaults]
-
-test_key = test_value
diff --git a/test/units/errors/__init__.py b/test/units/errors/__init__.py
new file mode 100644
index 0000000000..20207b272d
--- /dev/null
+++ b/test/units/errors/__init__.py
@@ -0,0 +1,22 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+
diff --git a/test/units/errors/test_errors.py b/test/units/errors/test_errors.py
new file mode 100644
index 0000000000..3993ea5061
--- /dev/null
+++ b/test/units/errors/test_errors.py
@@ -0,0 +1,68 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+
+from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject
+from ansible.errors import AnsibleError
+
+from ansible.compat.tests import BUILTINS
+from ansible.compat.tests.mock import mock_open, patch
+
+class TestErrors(unittest.TestCase):
+
+ def setUp(self):
+ self.message = 'This is the error message'
+
+ self.obj = AnsibleBaseYAMLObject()
+
+ def tearDown(self):
+ pass
+
+ def test_basic_error(self):
+ e = AnsibleError(self.message)
+ self.assertEqual(e.message, 'ERROR! ' + self.message)
+ self.assertEqual(e.__repr__(), 'ERROR! ' + self.message)
+
+ @patch.object(AnsibleError, '_get_error_lines_from_file')
+ def test_error_with_object(self, mock_method):
+ self.obj.ansible_pos = ('foo.yml', 1, 1)
+
+ mock_method.return_value = ('this is line 1\n', '')
+ e = AnsibleError(self.message, self.obj)
+
+ self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 1, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n\nthis is line 1\n^ here\n")
+
+ def test_get_error_lines_from_file(self):
+ m = mock_open()
+ m.return_value.readlines.return_value = ['this is line 1\n']
+
+ with patch('{0}.open'.format(BUILTINS), m):
+ # this line will be found in the file
+ self.obj.ansible_pos = ('foo.yml', 1, 1)
+ e = AnsibleError(self.message, self.obj)
+ self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 1, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n\nthis is line 1\n^ here\n")
+
+ # this line will not be found, as it is out of the index range
+ self.obj.ansible_pos = ('foo.yml', 2, 1)
+ e = AnsibleError(self.message, self.obj)
+ self.assertEqual(e.message, "ERROR! This is the error message\n\nThe error appears to have been in 'foo.yml': line 2, column 1, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\n(specified line no longer in file, maybe it changed?)")
+
diff --git a/test/units/executor/__init__.py b/test/units/executor/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/executor/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/executor/test_play_iterator.py b/test/units/executor/test_play_iterator.py
new file mode 100644
index 0000000000..47c0352b25
--- /dev/null
+++ b/test/units/executor/test_play_iterator.py
@@ -0,0 +1,85 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.executor.play_iterator import PlayIterator
+from ansible.playbook import Playbook
+
+from test.mock.loader import DictDataLoader
+
+class TestPlayIterator(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_play_iterator(self):
+ fake_loader = DictDataLoader({
+ "test_play.yml": """
+ - hosts: all
+ gather_facts: false
+ roles:
+ - test_role
+ pre_tasks:
+ - debug: msg="this is a pre_task"
+ tasks:
+ - debug: msg="this is a regular task"
+ post_tasks:
+ - debug: msg="this is a post_task"
+ """,
+ '/etc/ansible/roles/test_role/tasks/main.yml': """
+ - debug: msg="this is a role task"
+ """,
+ })
+
+ p = Playbook.load('test_play.yml', loader=fake_loader)
+
+ hosts = []
+ for i in range(0, 10):
+ host = MagicMock()
+ host.get_name.return_value = 'host%02d' % i
+ hosts.append(host)
+
+ inventory = MagicMock()
+ inventory.get_hosts.return_value = hosts
+ inventory.filter_hosts.return_value = hosts
+
+ itr = PlayIterator(inventory, p._entries[0])
+ task = itr.get_next_task_for_host(hosts[0])
+ print(task)
+ self.assertIsNotNone(task)
+ task = itr.get_next_task_for_host(hosts[0])
+ print(task)
+ self.assertIsNotNone(task)
+ task = itr.get_next_task_for_host(hosts[0])
+ print(task)
+ self.assertIsNotNone(task)
+ task = itr.get_next_task_for_host(hosts[0])
+ print(task)
+ self.assertIsNotNone(task)
+ task = itr.get_next_task_for_host(hosts[0])
+ print(task)
+ self.assertIsNone(task)
diff --git a/test/units/inventory_test_data/ansible_hosts b/test/units/inventory_test_data/ansible_hosts
deleted file mode 100644
index 94074edc3c..0000000000
--- a/test/units/inventory_test_data/ansible_hosts
+++ /dev/null
@@ -1,2 +0,0 @@
-[somegroup]
-localhost
diff --git a/test/units/inventory_test_data/broken.yml b/test/units/inventory_test_data/broken.yml
deleted file mode 100644
index 0eccc1ba78..0000000000
--- a/test/units/inventory_test_data/broken.yml
+++ /dev/null
@@ -1,2 +0,0 @@
-foo: bar
- baz: qux
diff --git a/test/units/inventory_test_data/common_vars.yml b/test/units/inventory_test_data/common_vars.yml
deleted file mode 100644
index c4c09b67f2..0000000000
--- a/test/units/inventory_test_data/common_vars.yml
+++ /dev/null
@@ -1,4 +0,0 @@
----
-duck: quack
-cow: moo
-extguard: " '$favcolor' == 'blue' "
diff --git a/test/units/inventory_test_data/complex_hosts b/test/units/inventory_test_data/complex_hosts
deleted file mode 100644
index 34935c6330..0000000000
--- a/test/units/inventory_test_data/complex_hosts
+++ /dev/null
@@ -1,96 +0,0 @@
-# order of groups, children, and vars is not significant
-# so this example mixes them up for maximum testing
-
-[nc:children]
-rtp
-triangle
-
-[eastcoast:children]
-nc
-florida
-
-[us:children]
-eastcoast
-
-[redundantgroup]
-rtp_a
-
-[redundantgroup2]
-rtp_a
-
-[redundantgroup3:children]
-rtp
-
-[redundantgroup:vars]
-rga=1
-
-[redundantgroup2:vars]
-rgb=2
-
-[redundantgroup3:vars]
-rgc=3
-
-[nc:vars]
-b=10000
-c=10001
-d=10002
-e = 10003
- f = 10004 != 10005
- g = " g "
- h = ' h '
- i = ' i "
- j = " j
- k = ['k1', 'k2']
-
-[rtp]
-rtp_a
-rtp_b
-rtp_c
-
-[rtp:vars]
-a=1
-b=2
-c=3
-
-[triangle]
-tri_a
-tri_b
-tri_c
-
-[triangle:vars]
-a=11
-b=12
-c=13
-
-[florida]
-orlando
-miami
-
-[florida:vars]
-a=100
-b=101
-c=102
-
-
-[eastcoast:vars]
-b=100000
-c=100001
-d=100002
-
-[us:vars]
-c=1000000
-
-[role1]
-host[1:2]
-
-[role2]
-host[2:3]
-
-[role3]
-host[1:3:2]
-
-[role4]
-blade-[a:c]-[1:16]
-blade-[d:z]-[01:16].example.com
-blade-[1:10]-[1:16]
-host-e-[10:16].example.net:1234
diff --git a/test/units/inventory_test_data/encrypted.yml b/test/units/inventory_test_data/encrypted.yml
deleted file mode 100644
index ca33ab25cb..0000000000
--- a/test/units/inventory_test_data/encrypted.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-$ANSIBLE_VAULT;1.1;AES256
-33343734386261666161626433386662623039356366656637303939306563376130623138626165
-6436333766346533353463636566313332623130383662340a393835656134633665333861393331
-37666233346464636263636530626332623035633135363732623332313534306438393366323966
-3135306561356164310a343937653834643433343734653137383339323330626437313562306630
-3035
diff --git a/test/units/inventory_test_data/hosts_list.yml b/test/units/inventory_test_data/hosts_list.yml
deleted file mode 100644
index 09c5ca7c17..0000000000
--- a/test/units/inventory_test_data/hosts_list.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-# Test that playbooks support YAML lists of hosts.
----
-- hosts: [host1, host2, host3]
- connection: local
- tasks:
- - action: command true
diff --git a/test/units/inventory_test_data/inventory/test_alpha_end_before_beg b/test/units/inventory_test_data/inventory/test_alpha_end_before_beg
deleted file mode 100644
index 1b7a478d87..0000000000
--- a/test/units/inventory_test_data/inventory/test_alpha_end_before_beg
+++ /dev/null
@@ -1,2 +0,0 @@
-[test]
-host[Z:T]
diff --git a/test/units/inventory_test_data/inventory/test_combined_range b/test/units/inventory_test_data/inventory/test_combined_range
deleted file mode 100644
index cbcb41753e..0000000000
--- a/test/units/inventory_test_data/inventory/test_combined_range
+++ /dev/null
@@ -1,2 +0,0 @@
-[test]
-host[1:2][A:B]
diff --git a/test/units/inventory_test_data/inventory/test_incorrect_format b/test/units/inventory_test_data/inventory/test_incorrect_format
deleted file mode 100644
index 339bd59edf..0000000000
--- a/test/units/inventory_test_data/inventory/test_incorrect_format
+++ /dev/null
@@ -1,2 +0,0 @@
-[test]
-host[001:10]
diff --git a/test/units/inventory_test_data/inventory/test_incorrect_range b/test/units/inventory_test_data/inventory/test_incorrect_range
deleted file mode 100644
index 272ca7be71..0000000000
--- a/test/units/inventory_test_data/inventory/test_incorrect_range
+++ /dev/null
@@ -1,2 +0,0 @@
-[test]
-host[1:2:3:4]
diff --git a/test/units/inventory_test_data/inventory/test_leading_range b/test/units/inventory_test_data/inventory/test_leading_range
deleted file mode 100644
index bf390de42a..0000000000
--- a/test/units/inventory_test_data/inventory/test_leading_range
+++ /dev/null
@@ -1,6 +0,0 @@
-[test]
-[1:2].host
-[A:B].host
-
-[test2] # comment
-[1:3].host
diff --git a/test/units/inventory_test_data/inventory/test_missing_end b/test/units/inventory_test_data/inventory/test_missing_end
deleted file mode 100644
index ff32042402..0000000000
--- a/test/units/inventory_test_data/inventory/test_missing_end
+++ /dev/null
@@ -1,2 +0,0 @@
-[test]
-host[1:]
diff --git a/test/units/inventory_test_data/inventory_api.py b/test/units/inventory_test_data/inventory_api.py
deleted file mode 100644
index 9bdca22ed3..0000000000
--- a/test/units/inventory_test_data/inventory_api.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/usr/bin/env python
-
-import json
-import sys
-
-from optparse import OptionParser
-
-parser = OptionParser()
-parser.add_option('-l', '--list', default=False, dest="list_hosts", action="store_true")
-parser.add_option('-H', '--host', default=None, dest="host")
-parser.add_option('-e', '--extra-vars', default=None, dest="extra")
-
-options, args = parser.parse_args()
-
-systems = {
- "ungrouped": [ "jupiter", "saturn" ],
- "greek": [ "zeus", "hera", "poseidon" ],
- "norse": [ "thor", "odin", "loki" ],
- "major-god": [ "zeus", "odin" ],
-}
-
-variables = {
- "thor": {
- "hammer": True
- },
- "zeus": {},
-}
-
-if options.list_hosts == True:
- print json.dumps(systems)
- sys.exit(0)
-
-if options.host is not None:
- if options.extra:
- k,v = options.extra.split("=")
- variables[options.host][k] = v
- if options.host in variables:
- print json.dumps(variables[options.host])
- else:
- print "{}"
- sys.exit(0)
-
-parser.print_help()
-sys.exit(1)
diff --git a/test/units/inventory_test_data/inventory_dir/0hosts b/test/units/inventory_test_data/inventory_dir/0hosts
deleted file mode 100644
index 6f78a33a22..0000000000
--- a/test/units/inventory_test_data/inventory_dir/0hosts
+++ /dev/null
@@ -1,3 +0,0 @@
-zeus var_a=0
-morpheus
-thor
diff --git a/test/units/inventory_test_data/inventory_dir/1mythology b/test/units/inventory_test_data/inventory_dir/1mythology
deleted file mode 100644
index 43fa181bd5..0000000000
--- a/test/units/inventory_test_data/inventory_dir/1mythology
+++ /dev/null
@@ -1,6 +0,0 @@
-[greek]
-zeus
-morpheus
-
-[norse]
-thor
diff --git a/test/units/inventory_test_data/inventory_dir/2levels b/test/units/inventory_test_data/inventory_dir/2levels
deleted file mode 100644
index 363294923e..0000000000
--- a/test/units/inventory_test_data/inventory_dir/2levels
+++ /dev/null
@@ -1,6 +0,0 @@
-[major-god]
-zeus var_a=2
-thor
-
-[minor-god]
-morpheus
diff --git a/test/units/inventory_test_data/inventory_dir/3comments b/test/units/inventory_test_data/inventory_dir/3comments
deleted file mode 100644
index e11b5e416b..0000000000
--- a/test/units/inventory_test_data/inventory_dir/3comments
+++ /dev/null
@@ -1,8 +0,0 @@
-[major-god] # group with inline comments
-zeus var_a="3\#4" # host with inline comments and "#" in the var string
-# A comment
-thor
-
-[minor-god] # group with inline comment and unbalanced quotes: ' "
-morpheus # host with inline comments and unbalanced quotes: ' "
-# A comment with unbalanced quotes: ' "
diff --git a/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini b/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini
deleted file mode 100644
index a30afe5fcc..0000000000
--- a/test/units/inventory_test_data/inventory_dir/4skip_extensions.ini
+++ /dev/null
@@ -1,2 +0,0 @@
-[skip]
-skipme \ No newline at end of file
diff --git a/test/units/inventory_test_data/large_range b/test/units/inventory_test_data/large_range
deleted file mode 100644
index 18cfc22078..0000000000
--- a/test/units/inventory_test_data/large_range
+++ /dev/null
@@ -1 +0,0 @@
-bob[000:142]
diff --git a/test/units/inventory_test_data/restrict_pattern b/test/units/inventory_test_data/restrict_pattern
deleted file mode 100644
index fb16b4dda5..0000000000
--- a/test/units/inventory_test_data/restrict_pattern
+++ /dev/null
@@ -1,2 +0,0 @@
-odin
-thor
diff --git a/test/units/inventory_test_data/simple_hosts b/test/units/inventory_test_data/simple_hosts
deleted file mode 100644
index 4625b3dbab..0000000000
--- a/test/units/inventory_test_data/simple_hosts
+++ /dev/null
@@ -1,22 +0,0 @@
-jupiter
-saturn
-thrudgelmir[:5]
-
-[greek]
-zeus
-hera:3000
-poseidon
-cerberus[001:003]
-cottus[99:100]
-
-[norse]
-thor
-odin
-loki
-
-[egyptian]
-Hotep-[a:c]
-Bast[C:D]
-
-[auth]
-neptun auth="YWRtaW46YWRtaW4="
diff --git a/test/units/mock/__init__.py b/test/units/mock/__init__.py
new file mode 100644
index 0000000000..ae8ccff595
--- /dev/null
+++ b/test/units/mock/__init__.py
@@ -0,0 +1,20 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
diff --git a/test/units/mock/loader.py b/test/units/mock/loader.py
new file mode 100644
index 0000000000..cf9d7ea72d
--- /dev/null
+++ b/test/units/mock/loader.py
@@ -0,0 +1,83 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+
+from ansible.parsing import DataLoader
+
+class DictDataLoader(DataLoader):
+
+ def __init__(self, file_mapping=dict()):
+ assert type(file_mapping) == dict
+
+ self._file_mapping = file_mapping
+ self._build_known_directories()
+
+ super(DictDataLoader, self).__init__()
+
+ def load_from_file(self, path):
+ if path in self._file_mapping:
+ return self.load(self._file_mapping[path], path)
+ return None
+
+ def path_exists(self, path):
+ return path in self._file_mapping or path in self._known_directories
+
+ def is_file(self, path):
+ return path in self._file_mapping
+
+ def is_directory(self, path):
+ return path in self._known_directories
+
+ def list_directory(self, path):
+ return [x for x in self._known_directories]
+
+ def _add_known_directory(self, directory):
+ if directory not in self._known_directories:
+ self._known_directories.append(directory)
+
+ def _build_known_directories(self):
+ self._known_directories = []
+ for path in self._file_mapping:
+ dirname = os.path.dirname(path)
+ while dirname not in ('/', ''):
+ self._add_known_directory(dirname)
+ dirname = os.path.dirname(dirname)
+
+ def push(self, path, content):
+ rebuild_dirs = False
+ if path not in self._file_mapping:
+ rebuild_dirs = True
+
+ self._file_mapping[path] = content
+
+ if rebuild_dirs:
+ self._build_known_directories()
+
+ def pop(self, path):
+ if path in self._file_mapping:
+ del self._file_mapping[path]
+ self._build_known_directories()
+
+ def clear(self):
+ self._file_mapping = dict()
+ self._known_directories = []
+
diff --git a/test/units/module_tests/TestApt.py b/test/units/module_tests/TestApt.py
deleted file mode 100644
index e7f2dafc95..0000000000
--- a/test/units/module_tests/TestApt.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import collections
-import mock
-import os
-import unittest
-
-from ansible.modules.core.packaging.os.apt import (
- expand_pkgspec_from_fnmatches,
-)
-
-
-class AptExpandPkgspecTestCase(unittest.TestCase):
-
- def setUp(self):
- FakePackage = collections.namedtuple("Package", ("name",))
- self.fake_cache = [ FakePackage("apt"),
- FakePackage("apt-utils"),
- FakePackage("not-selected"),
- ]
-
- def test_trivial(self):
- foo = ["apt"]
- self.assertEqual(
- expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
-
- def test_version_wildcard(self):
- foo = ["apt=1.0*"]
- self.assertEqual(
- expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo)
-
- def test_pkgname_wildcard_version_wildcard(self):
- foo = ["apt*=1.0*"]
- m_mock = mock.Mock()
- self.assertEqual(
- expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
- ['apt', 'apt-utils'])
-
- def test_pkgname_expands(self):
- foo = ["apt*"]
- m_mock = mock.Mock()
- self.assertEqual(
- expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache),
- ["apt", "apt-utils"])
diff --git a/test/units/module_tests/TestDocker.py b/test/units/module_tests/TestDocker.py
deleted file mode 100644
index b8c8cf1e23..0000000000
--- a/test/units/module_tests/TestDocker.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import collections
-import os
-import unittest
-
-from ansible.modules.core.cloud.docker.docker import get_split_image_tag
-
-class DockerSplitImageTagTestCase(unittest.TestCase):
-
- def test_trivial(self):
- self.assertEqual(get_split_image_tag('test'), ('test', 'latest'))
-
- def test_with_org_name(self):
- self.assertEqual(get_split_image_tag('ansible/centos7-ansible'), ('ansible/centos7-ansible', 'latest'))
-
- def test_with_tag(self):
- self.assertEqual(get_split_image_tag('test:devel'), ('test', 'devel'))
-
- def test_with_tag_and_org_name(self):
- self.assertEqual(get_split_image_tag('ansible/centos7-ansible:devel'), ('ansible/centos7-ansible', 'devel'))
diff --git a/test/units/parsing/__init__.py b/test/units/parsing/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/parsing/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/parsing/test_data_loader.py b/test/units/parsing/test_data_loader.py
new file mode 100644
index 0000000000..b9c37cdd0c
--- /dev/null
+++ b/test/units/parsing/test_data_loader.py
@@ -0,0 +1,90 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from six import PY2
+from yaml.scanner import ScannerError
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, mock_open
+from ansible.errors import AnsibleParserError
+
+from ansible.parsing import DataLoader
+from ansible.parsing.yaml.objects import AnsibleMapping
+
+class TestDataLoader(unittest.TestCase):
+
+ def setUp(self):
+ # FIXME: need to add tests that utilize vault_password
+ self._loader = DataLoader()
+
+ def tearDown(self):
+ pass
+
+ @patch.object(DataLoader, '_get_file_contents')
+ def test_parse_json_from_file(self, mock_def):
+ mock_def.return_value = ("""{"a": 1, "b": 2, "c": 3}""", True)
+ output = self._loader.load_from_file('dummy_json.txt')
+ self.assertEqual(output, dict(a=1,b=2,c=3))
+
+ @patch.object(DataLoader, '_get_file_contents')
+ def test_parse_yaml_from_file(self, mock_def):
+ mock_def.return_value = ("""
+ a: 1
+ b: 2
+ c: 3
+ """, True)
+ output = self._loader.load_from_file('dummy_yaml.txt')
+ self.assertEqual(output, dict(a=1,b=2,c=3))
+
+ @patch.object(DataLoader, '_get_file_contents')
+ def test_parse_fail_from_file(self, mock_def):
+ mock_def.return_value = ("""
+ TEXT:
+ ***
+ NOT VALID
+ """, True)
+ self.assertRaises(AnsibleParserError, self._loader.load_from_file, 'dummy_yaml_bad.txt')
+
+class TestDataLoaderWithVault(unittest.TestCase):
+
+ def setUp(self):
+ self._loader = DataLoader(vault_password='ansible')
+
+ def tearDown(self):
+ pass
+
+ @patch.multiple(DataLoader, path_exists=lambda s, x: True, is_file=lambda s, x: True)
+ def test_parse_from_vault_1_1_file(self):
+ vaulted_data = """$ANSIBLE_VAULT;1.1;AES256
+33343734386261666161626433386662623039356366656637303939306563376130623138626165
+6436333766346533353463636566313332623130383662340a393835656134633665333861393331
+37666233346464636263636530626332623035633135363732623332313534306438393366323966
+3135306561356164310a343937653834643433343734653137383339323330626437313562306630
+3035
+"""
+ if PY2:
+ builtins_name = '__builtin__'
+ else:
+ builtins_name = 'builtins'
+
+ with patch(builtins_name + '.open', mock_open(read_data=vaulted_data)):
+ output = self._loader.load_from_file('dummy_vault.txt')
+ self.assertEqual(output, dict(foo='bar'))
diff --git a/test/units/parsing/test_mod_args.py b/test/units/parsing/test_mod_args.py
new file mode 100644
index 0000000000..187edfa03c
--- /dev/null
+++ b/test/units/parsing/test_mod_args.py
@@ -0,0 +1,130 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.parsing.mod_args import ModuleArgsParser
+from ansible.errors import AnsibleParserError
+
+from ansible.compat.tests import unittest
+
+class TestModArgsDwim(unittest.TestCase):
+
+ # TODO: add tests that construct ModuleArgsParser with a task reference
+ # TODO: verify the AnsibleError raised on failure knows the task
+ # and the task knows the line numbers
+
+ def setUp(self):
+ pass
+
+ def _debug(self, mod, args, to):
+ print("RETURNED module = {0}".format(mod))
+ print(" args = {0}".format(args))
+ print(" to = {0}".format(to))
+
+ def tearDown(self):
+ pass
+
+ def test_basic_shell(self):
+ m = ModuleArgsParser(dict(shell='echo hi'))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'command')
+ self.assertEqual(args, dict(
+ _raw_params = 'echo hi',
+ _uses_shell = True,
+ ))
+ self.assertIsNone(to)
+
+ def test_basic_command(self):
+ m = ModuleArgsParser(dict(command='echo hi'))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'command')
+ self.assertEqual(args, dict(
+ _raw_params = 'echo hi',
+ ))
+ self.assertIsNone(to)
+
+ def test_shell_with_modifiers(self):
+ m = ModuleArgsParser(dict(shell='/bin/foo creates=/tmp/baz removes=/tmp/bleep'))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'command')
+ self.assertEqual(args, dict(
+ creates = '/tmp/baz',
+ removes = '/tmp/bleep',
+ _raw_params = '/bin/foo',
+ _uses_shell = True,
+ ))
+ self.assertIsNone(to)
+
+ def test_normal_usage(self):
+ m = ModuleArgsParser(dict(copy='src=a dest=b'))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'copy')
+ self.assertEqual(args, dict(src='a', dest='b'))
+ self.assertIsNone(to)
+
+ def test_complex_args(self):
+ m = ModuleArgsParser(dict(copy=dict(src='a', dest='b')))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'copy')
+ self.assertEqual(args, dict(src='a', dest='b'))
+ self.assertIsNone(to)
+
+ def test_action_with_complex(self):
+ m = ModuleArgsParser(dict(action=dict(module='copy', src='a', dest='b')))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'copy')
+ self.assertEqual(args, dict(src='a', dest='b'))
+ self.assertIsNone(to)
+
+ def test_action_with_complex_and_complex_args(self):
+ m = ModuleArgsParser(dict(action=dict(module='copy', args=dict(src='a', dest='b'))))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'copy')
+ self.assertEqual(args, dict(src='a', dest='b'))
+ self.assertIsNone(to)
+
+ def test_local_action_string(self):
+ m = ModuleArgsParser(dict(local_action='copy src=a dest=b'))
+ mod, args, to = m.parse()
+ self._debug(mod, args, to)
+ self.assertEqual(mod, 'copy')
+ self.assertEqual(args, dict(src='a', dest='b'))
+ self.assertIs(to, 'localhost')
+
+ def test_multiple_actions(self):
+ m = ModuleArgsParser(dict(action='shell echo hi', local_action='shell echo hi'))
+ self.assertRaises(AnsibleParserError, m.parse)
+
+ m = ModuleArgsParser(dict(action='shell echo hi', shell='echo hi'))
+ self.assertRaises(AnsibleParserError, m.parse)
+
+ m = ModuleArgsParser(dict(local_action='shell echo hi', shell='echo hi'))
+ self.assertRaises(AnsibleParserError, m.parse)
+
+ m = ModuleArgsParser(dict(ping='data=hi', shell='echo hi'))
+ self.assertRaises(AnsibleParserError, m.parse)
+
diff --git a/test/units/parsing/test_splitter.py b/test/units/parsing/test_splitter.py
new file mode 100644
index 0000000000..1f648c8f6a
--- /dev/null
+++ b/test/units/parsing/test_splitter.py
@@ -0,0 +1,112 @@
+# coding: utf-8
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from nose import tools
+from ansible.compat.tests import unittest
+
+from ansible.parsing.splitter import split_args, parse_kv
+
+
+# Tests using nose's test generators cannot use unittest base class.
+# http://nose.readthedocs.org/en/latest/writing_tests.html#test-generators
+class TestSplitter_Gen:
+ SPLIT_DATA = (
+ (u'a',
+ [u'a'],
+ {u'_raw_params': u'a'}),
+ (u'a=b',
+ [u'a=b'],
+ {u'a': u'b'}),
+ (u'a="foo bar"',
+ [u'a="foo bar"'],
+ {u'a': u'foo bar'}),
+ (u'"foo bar baz"',
+ [u'"foo bar baz"'],
+ {u'_raw_params': '"foo bar baz"'}),
+ (u'foo bar baz',
+ [u'foo', u'bar', u'baz'],
+ {u'_raw_params': u'foo bar baz'}),
+ (u'a=b c="foo bar"',
+ [u'a=b', u'c="foo bar"'],
+ {u'a': u'b', u'c': u'foo bar'}),
+ (u'a="echo \\"hello world\\"" b=bar',
+ [u'a="echo \\"hello world\\""', u'b=bar'],
+ {u'a': u'echo "hello world"', u'b': u'bar'}),
+ (u'a="multi\nline"',
+ [u'a="multi\nline"'],
+ {u'a': u'multi\nline'}),
+ (u'a="blank\n\nline"',
+ [u'a="blank\n\nline"'],
+ {u'a': u'blank\n\nline'}),
+ (u'a="blank\n\n\nlines"',
+ [u'a="blank\n\n\nlines"'],
+ {u'a': u'blank\n\n\nlines'}),
+ (u'a="a long\nmessage\\\nabout a thing\n"',
+ [u'a="a long\nmessage\\\nabout a thing\n"'],
+ {u'a': u'a long\nmessage\\\nabout a thing\n'}),
+ (u'a="multiline\nmessage1\\\n" b="multiline\nmessage2\\\n"',
+ [u'a="multiline\nmessage1\\\n"', u'b="multiline\nmessage2\\\n"'],
+ {u'a': 'multiline\nmessage1\\\n', u'b': u'multiline\nmessage2\\\n'}),
+ (u'a={{jinja}}',
+ [u'a={{jinja}}'],
+ {u'a': u'{{jinja}}'}),
+ (u'a={{ jinja }}',
+ [u'a={{ jinja }}'],
+ {u'a': u'{{ jinja }}'}),
+ (u'a="{{jinja}}"',
+ [u'a="{{jinja}}"'],
+ {u'a': u'{{jinja}}'}),
+ (u'a={{ jinja }}{{jinja2}}',
+ [u'a={{ jinja }}{{jinja2}}'],
+ {u'a': u'{{ jinja }}{{jinja2}}'}),
+ (u'a="{{ jinja }}{{jinja2}}"',
+ [u'a="{{ jinja }}{{jinja2}}"'],
+ {u'a': u'{{ jinja }}{{jinja2}}'}),
+ (u'a={{jinja}} b={{jinja2}}',
+ [u'a={{jinja}}', u'b={{jinja2}}'],
+ {u'a': u'{{jinja}}', u'b': u'{{jinja2}}'}),
+ (u'a="{{jinja}}\n" b="{{jinja2}}\n"',
+ [u'a="{{jinja}}\n"', u'b="{{jinja2}}\n"'],
+ {u'a': u'{{jinja}}\n', u'b': u'{{jinja2}}\n'}),
+ (u'a="café eñyei"',
+ [u'a="café eñyei"'],
+ {u'a': u'café eñyei'}),
+ (u'a=café b=eñyei',
+ [u'a=café', u'b=eñyei'],
+ {u'a': u'café', u'b': u'eñyei'}),
+ )
+
+ def check_split_args(self, args, expected):
+ tools.eq_(split_args(args), expected)
+
+ def test_split_args(self):
+ for datapoint in self.SPLIT_DATA:
+ yield self.check_split_args, datapoint[0], datapoint[1]
+
+ def check_parse_kv(self, args, expected):
+ tools.eq_(parse_kv(args), expected)
+
+ def test_parse_kv(self):
+ for datapoint in self.SPLIT_DATA:
+ try:
+ yield self.check_parse_kv, datapoint[0], datapoint[2]
+ except: pass
diff --git a/test/units/parsing/vault/__init__.py b/test/units/parsing/vault/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/parsing/vault/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/TestVault.py b/test/units/parsing/vault/test_vault.py
index b720d72e84..2aaac27fc7 100644
--- a/test/units/TestVault.py
+++ b/test/units/parsing/vault/test_vault.py
@@ -1,17 +1,40 @@
-#!/usr/bin/env python
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
-from unittest import TestCase
import getpass
import os
import shutil
import time
import tempfile
+import six
+
from binascii import unhexlify
from binascii import hexlify
from nose.plugins.skip import SkipTest
+from ansible.compat.tests import unittest
+from ansible.utils.unicode import to_bytes, to_unicode
+
from ansible import errors
-from ansible.utils.vault import VaultLib
+from ansible.parsing.vault import VaultLib
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
try:
@@ -34,16 +57,7 @@ try:
except ImportError:
HAS_AES = False
-class TestVaultLib(TestCase):
-
- def _is_fips(self):
- try:
- data = open('/proc/sys/crypto/fips_enabled').read().strip()
- except:
- return False
- if data != '1':
- return False
- return True
+class TestVaultLib(unittest.TestCase):
def test_methods_exist(self):
v = VaultLib('ansible')
@@ -52,13 +66,13 @@ class TestVaultLib(TestCase):
'decrypt',
'_add_header',
'_split_header',]
- for slot in slots:
+ for slot in slots:
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
def test_is_encrypted(self):
v = VaultLib(None)
- assert not v.is_encrypted("foobar"), "encryption check on plaintext failed"
- data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify("ansible")
+ assert not v.is_encrypted(u"foobar"), "encryption check on plaintext failed"
+ data = u"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
assert v.is_encrypted(data), "encryption check on headered text failed"
def test_add_header(self):
@@ -66,32 +80,30 @@ class TestVaultLib(TestCase):
v.cipher_name = "TEST"
sensitive_data = "ansible"
data = v._add_header(sensitive_data)
- lines = data.split('\n')
+ lines = data.split(b'\n')
assert len(lines) > 1, "failed to properly add header"
- header = lines[0]
+ header = to_unicode(lines[0])
assert header.endswith(';TEST'), "header does end with cipher name"
header_parts = header.split(';')
- assert len(header_parts) == 3, "header has the wrong number of parts"
+ assert len(header_parts) == 3, "header has the wrong number of parts"
assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
assert header_parts[1] == v.version, "header version is incorrect"
assert header_parts[2] == 'TEST', "header does end with cipher name"
def test_split_header(self):
v = VaultLib('ansible')
- data = "$ANSIBLE_VAULT;9.9;TEST\nansible"
- rdata = v._split_header(data)
- lines = rdata.split('\n')
- assert lines[0] == "ansible"
+ data = b"$ANSIBLE_VAULT;9.9;TEST\nansible"
+ rdata = v._split_header(data)
+ lines = rdata.split(b'\n')
+ assert lines[0] == b"ansible"
assert v.cipher_name == 'TEST', "cipher name was not set"
assert v.version == "9.9"
def test_encrypt_decrypt_aes(self):
- if self._is_fips():
- raise SkipTest('MD5 not available on FIPS enabled systems')
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
- v.cipher_name = 'AES'
+ v.cipher_name = u'AES'
enc_data = v.encrypt("foobar")
dec_data = v.decrypt(enc_data)
assert enc_data != "foobar", "encryption failed"
@@ -105,20 +117,20 @@ class TestVaultLib(TestCase):
enc_data = v.encrypt("foobar")
dec_data = v.decrypt(enc_data)
assert enc_data != "foobar", "encryption failed"
- assert dec_data == "foobar", "decryption failed"
+ assert dec_data == "foobar", "decryption failed"
def test_encrypt_encrypted(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES'
- data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify("ansible")
+ data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(six.b("ansible"))
error_hit = False
try:
enc_data = v.encrypt(data)
- except errors.AnsibleError, e:
+ except errors.AnsibleError as e:
error_hit = True
- assert error_hit, "No error was thrown when trying to encrypt data with a header"
+ assert error_hit, "No error was thrown when trying to encrypt data with a header"
def test_decrypt_decrypted(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
@@ -128,9 +140,9 @@ class TestVaultLib(TestCase):
error_hit = False
try:
dec_data = v.decrypt(data)
- except errors.AnsibleError, e:
+ except errors.AnsibleError as e:
error_hit = True
- assert error_hit, "No error was thrown when trying to decrypt data without a header"
+ assert error_hit, "No error was thrown when trying to decrypt data without a header"
def test_cipher_not_set(self):
# not setting the cipher should default to AES256
@@ -141,7 +153,7 @@ class TestVaultLib(TestCase):
error_hit = False
try:
enc_data = v.encrypt(data)
- except errors.AnsibleError, e:
+ except errors.AnsibleError as e:
error_hit = True
- assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set"
- assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name
+ assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set"
+ assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name
diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py
new file mode 100644
index 0000000000..2ddf3de27a
--- /dev/null
+++ b/test/units/parsing/vault/test_vault_editor.py
@@ -0,0 +1,214 @@
+# (c) 2014, James Tanner <tanner.jc@gmail.com>
+# (c) 2014, James Cammarata, <jcammarata@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+#!/usr/bin/env python
+
+import sys
+import getpass
+import os
+import shutil
+import time
+import tempfile
+from binascii import unhexlify
+from binascii import hexlify
+from nose.plugins.skip import SkipTest
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch
+from ansible.utils.unicode import to_bytes, to_unicode
+
+from ansible import errors
+from ansible.parsing.vault import VaultLib
+from ansible.parsing.vault import VaultEditor
+
+# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
+try:
+ from Crypto.Util import Counter
+ HAS_COUNTER = True
+except ImportError:
+ HAS_COUNTER = False
+
+# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
+try:
+ from Crypto.Protocol.KDF import PBKDF2
+ HAS_PBKDF2 = True
+except ImportError:
+ HAS_PBKDF2 = False
+
+# AES IMPORTS
+try:
+ from Crypto.Cipher import AES as AES
+ HAS_AES = True
+except ImportError:
+ HAS_AES = False
+
+v10_data = """$ANSIBLE_VAULT;1.0;AES
+53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9
+9ad98d59f61d06a4b66718d855f16fb7bdfe54d1ec8aeaa4d06c2dc1fa630ae1846a029877f0eeb1
+83c62ffb04c2512995e815de4b4d29ed"""
+
+v11_data = """$ANSIBLE_VAULT;1.1;AES256
+62303130653266653331306264616235333735323636616539316433666463323964623162386137
+3961616263373033353631316333623566303532663065310a393036623466376263393961326530
+64336561613965383835646464623865663966323464653236343638373165343863623638316664
+3631633031323837340a396530313963373030343933616133393566366137363761373930663833
+3739"""
+
+class TestVaultEditor(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_methods_exist(self):
+ v = VaultEditor(None, None, None)
+ slots = ['create_file',
+ 'decrypt_file',
+ 'edit_file',
+ 'encrypt_file',
+ 'rekey_file',
+ 'read_data',
+ 'write_data',
+ 'shuffle_files']
+ for slot in slots:
+ assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
+
+ @patch.object(VaultEditor, '_editor_shell_command')
+ def test_create_file(self, mock_editor_shell_command):
+
+ def sc_side_effect(filename):
+ return ['touch', filename]
+ mock_editor_shell_command.side_effect = sc_side_effect
+
+ tmp_file = tempfile.NamedTemporaryFile()
+ os.unlink(tmp_file.name)
+
+ ve = VaultEditor(None, "ansible", tmp_file.name)
+ ve.create_file()
+
+ self.assertTrue(os.path.exists(tmp_file.name))
+
+ def test_decrypt_1_0(self):
+ """
+ Skip testing decrypting 1.0 files if we don't have access to AES, KDF or
+ Counter, or we are running on python3 since VaultAES hasn't been backported.
+ """
+ if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys.version > '3':
+ raise SkipTest
+
+ v10_file = tempfile.NamedTemporaryFile(delete=False)
+ with v10_file as f:
+ f.write(to_bytes(v10_data))
+
+ ve = VaultEditor(None, "ansible", v10_file.name)
+
+ # make sure the password functions for the cipher
+ error_hit = False
+ try:
+ ve.decrypt_file()
+ except errors.AnsibleError as e:
+ error_hit = True
+
+ # verify decrypted content
+ f = open(v10_file.name, "rb")
+ fdata = to_unicode(f.read())
+ f.close()
+
+ os.unlink(v10_file.name)
+
+ assert error_hit == False, "error decrypting 1.0 file"
+ assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
+
+
+ def test_decrypt_1_1(self):
+ if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
+ raise SkipTest
+
+ v11_file = tempfile.NamedTemporaryFile(delete=False)
+ with v11_file as f:
+ f.write(to_bytes(v11_data))
+
+ ve = VaultEditor(None, "ansible", v11_file.name)
+
+ # make sure the password functions for the cipher
+ error_hit = False
+ try:
+ ve.decrypt_file()
+ except errors.AnsibleError as e:
+ error_hit = True
+
+ # verify decrypted content
+ f = open(v11_file.name, "rb")
+ fdata = to_unicode(f.read())
+ f.close()
+
+ os.unlink(v11_file.name)
+
+ assert error_hit == False, "error decrypting 1.0 file"
+ assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
+
+
+ def test_rekey_migration(self):
+ """
+ Skip testing rekeying files if we don't have access to AES, KDF or
+ Counter, or we are running on python3 since VaultAES hasn't been backported.
+ """
+ if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys.version > '3':
+ raise SkipTest
+
+ v10_file = tempfile.NamedTemporaryFile(delete=False)
+ with v10_file as f:
+ f.write(to_bytes(v10_data))
+
+ ve = VaultEditor(None, "ansible", v10_file.name)
+
+ # make sure the password functions for the cipher
+ error_hit = False
+ try:
+ ve.rekey_file('ansible2')
+ except errors.AnsibleError as e:
+ error_hit = True
+
+ # verify decrypted content
+ f = open(v10_file.name, "rb")
+ fdata = f.read()
+ f.close()
+
+ assert error_hit == False, "error rekeying 1.0 file to 1.1"
+
+ # ensure filedata can be decrypted, is 1.1 and is AES256
+ vl = VaultLib("ansible2")
+ dec_data = None
+ error_hit = False
+ try:
+ dec_data = vl.decrypt(fdata)
+ except errors.AnsibleError as e:
+ error_hit = True
+
+ os.unlink(v10_file.name)
+
+ assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name
+ assert error_hit == False, "error decrypting migrated 1.0 file"
+ assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
+
+
diff --git a/test/units/parsing/yaml/__init__.py b/test/units/parsing/yaml/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/test/units/parsing/yaml/__init__.py
diff --git a/test/units/parsing/yaml/test_loader.py b/test/units/parsing/yaml/test_loader.py
new file mode 100644
index 0000000000..37eeabff83
--- /dev/null
+++ b/test/units/parsing/yaml/test_loader.py
@@ -0,0 +1,283 @@
+# coding: utf-8
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from six import text_type, binary_type
+from six.moves import StringIO
+from collections import Sequence, Set, Mapping
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch
+
+from ansible.parsing.yaml.loader import AnsibleLoader
+
+
+class TestAnsibleLoaderBasic(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_parse_number(self):
+ stream = StringIO("""
+ 1
+ """)
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, 1)
+ # No line/column info saved yet
+
+ def test_parse_string(self):
+ stream = StringIO("""
+ Ansible
+ """)
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, u'Ansible')
+ self.assertIsInstance(data, text_type)
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17))
+
+ def test_parse_utf8_string(self):
+ stream = StringIO("""
+ Cafè Eñyei
+ """)
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, u'Cafè Eñyei')
+ self.assertIsInstance(data, text_type)
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17))
+
+ def test_parse_dict(self):
+ stream = StringIO("""
+ webster: daniel
+ oed: oxford
+ """)
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, {'webster': 'daniel', 'oed': 'oxford'})
+ self.assertEqual(len(data), 2)
+ self.assertIsInstance(list(data.keys())[0], text_type)
+ self.assertIsInstance(list(data.values())[0], text_type)
+
+ # Beginning of the first key
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17))
+
+ self.assertEqual(data[u'webster'].ansible_pos, ('myfile.yml', 2, 26))
+ self.assertEqual(data[u'oed'].ansible_pos, ('myfile.yml', 3, 22))
+
+ def test_parse_list(self):
+ stream = StringIO("""
+ - a
+ - b
+ """)
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, [u'a', u'b'])
+ self.assertEqual(len(data), 2)
+ self.assertIsInstance(data[0], text_type)
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 17))
+
+ self.assertEqual(data[0].ansible_pos, ('myfile.yml', 2, 19))
+ self.assertEqual(data[1].ansible_pos, ('myfile.yml', 3, 19))
+
+ def test_parse_short_dict(self):
+ stream = StringIO("""{"foo": "bar"}""")
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, dict(foo=u'bar'))
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 1))
+ self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 1, 9))
+
+ stream = StringIO("""foo: bar""")
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, dict(foo=u'bar'))
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 1))
+ self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 1, 6))
+
+ def test_error_conditions(self):
+ stream = StringIO("""{""")
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ self.assertRaises(loader.get_single_data)
+
+ def test_front_matter(self):
+ stream = StringIO("""---\nfoo: bar""")
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, dict(foo=u'bar'))
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 2, 1))
+ self.assertEqual(data[u'foo'].ansible_pos, ('myfile.yml', 2, 6))
+
+ # Initial indent (See: #6348)
+ stream = StringIO(""" - foo: bar\n baz: qux""")
+ loader = AnsibleLoader(stream, 'myfile.yml')
+ data = loader.get_single_data()
+ self.assertEqual(data, [{u'foo': u'bar', u'baz': u'qux'}])
+
+ self.assertEqual(data.ansible_pos, ('myfile.yml', 1, 2))
+ self.assertEqual(data[0].ansible_pos, ('myfile.yml', 1, 4))
+ self.assertEqual(data[0][u'foo'].ansible_pos, ('myfile.yml', 1, 9))
+ self.assertEqual(data[0][u'baz'].ansible_pos, ('myfile.yml', 2, 9))
+
+
+class TestAnsibleLoaderPlay(unittest.TestCase):
+
+ def setUp(self):
+ stream = StringIO("""
+ - hosts: localhost
+ vars:
+ number: 1
+ string: Ansible
+ utf8_string: Cafè Eñyei
+ dictionary:
+ webster: daniel
+ oed: oxford
+ list:
+ - a
+ - b
+ - 1
+ - 2
+ tasks:
+ - name: Test case
+ ping:
+ data: "{{ utf8_string }}"
+
+ - name: Test 2
+ ping:
+ data: "Cafè Eñyei"
+
+ - name: Test 3
+ command: "printf 'Cafè Eñyei\\n'"
+ """)
+ self.play_filename = '/path/to/myplay.yml'
+ stream.name = self.play_filename
+ self.loader = AnsibleLoader(stream)
+ self.data = self.loader.get_single_data()
+
+ def tearDown(self):
+ pass
+
+ def test_data_complete(self):
+ self.assertEqual(len(self.data), 1)
+ self.assertIsInstance(self.data, list)
+ self.assertEqual(frozenset(self.data[0].keys()), frozenset((u'hosts', u'vars', u'tasks')))
+
+ self.assertEqual(self.data[0][u'hosts'], u'localhost')
+
+ self.assertEqual(self.data[0][u'vars'][u'number'], 1)
+ self.assertEqual(self.data[0][u'vars'][u'string'], u'Ansible')
+ self.assertEqual(self.data[0][u'vars'][u'utf8_string'], u'Cafè Eñyei')
+ self.assertEqual(self.data[0][u'vars'][u'dictionary'],
+ {u'webster': u'daniel',
+ u'oed': u'oxford'})
+ self.assertEqual(self.data[0][u'vars'][u'list'], [u'a', u'b', 1, 2])
+
+ self.assertEqual(self.data[0][u'tasks'],
+ [{u'name': u'Test case', u'ping': {u'data': u'{{ utf8_string }}'}},
+ {u'name': u'Test 2', u'ping': {u'data': u'Cafè Eñyei'}},
+ {u'name': u'Test 3', u'command': u'printf \'Cafè Eñyei\n\''},
+ ])
+
+ def walk(self, data):
+ # Make sure there's no str in the data
+ self.assertNotIsInstance(data, binary_type)
+
+ # Descend into various container types
+ if isinstance(data, text_type):
+ # strings are a sequence so we have to be explicit here
+ return
+ elif isinstance(data, (Sequence, Set)):
+ for element in data:
+ self.walk(element)
+ elif isinstance(data, Mapping):
+ for k, v in data.items():
+ self.walk(k)
+ self.walk(v)
+
+ # Scalars were all checked so we're good to go
+ return
+
+ def test_no_str_in_data(self):
+ # Checks that no strings are str type
+ self.walk(self.data)
+
+ def check_vars(self):
+ # Numbers don't have line/col information yet
+ #self.assertEqual(self.data[0][u'vars'][u'number'].ansible_pos, (self.play_filename, 4, 21))
+
+ self.assertEqual(self.data[0][u'vars'][u'string'].ansible_pos, (self.play_filename, 5, 29))
+ self.assertEqual(self.data[0][u'vars'][u'utf8_string'].ansible_pos, (self.play_filename, 6, 34))
+
+ self.assertEqual(self.data[0][u'vars'][u'dictionary'].ansible_pos, (self.play_filename, 8, 23))
+ self.assertEqual(self.data[0][u'vars'][u'dictionary'][u'webster'].ansible_pos, (self.play_filename, 8, 32))
+ self.assertEqual(self.data[0][u'vars'][u'dictionary'][u'oed'].ansible_pos, (self.play_filename, 9, 28))
+
+ self.assertEqual(self.data[0][u'vars'][u'list'].ansible_pos, (self.play_filename, 11, 23))
+ self.assertEqual(self.data[0][u'vars'][u'list'][0].ansible_pos, (self.play_filename, 11, 25))
+ self.assertEqual(self.data[0][u'vars'][u'list'][1].ansible_pos, (self.play_filename, 12, 25))
+ # Numbers don't have line/col info yet
+ #self.assertEqual(self.data[0][u'vars'][u'list'][2].ansible_pos, (self.play_filename, 13, 25))
+ #self.assertEqual(self.data[0][u'vars'][u'list'][3].ansible_pos, (self.play_filename, 14, 25))
+
+ def check_tasks(self):
+ #
+ # First Task
+ #
+ self.assertEqual(self.data[0][u'tasks'][0].ansible_pos, (self.play_filename, 16, 23))
+ self.assertEqual(self.data[0][u'tasks'][0][u'name'].ansible_pos, (self.play_filename, 16, 29))
+ self.assertEqual(self.data[0][u'tasks'][0][u'ping'].ansible_pos, (self.play_filename, 18, 25))
+ self.assertEqual(self.data[0][u'tasks'][0][u'ping'][u'data'].ansible_pos, (self.play_filename, 18, 31))
+
+ #
+ # Second Task
+ #
+ self.assertEqual(self.data[0][u'tasks'][1].ansible_pos, (self.play_filename, 20, 23))
+ self.assertEqual(self.data[0][u'tasks'][1][u'name'].ansible_pos, (self.play_filename, 20, 29))
+ self.assertEqual(self.data[0][u'tasks'][1][u'ping'].ansible_pos, (self.play_filename, 22, 25))
+ self.assertEqual(self.data[0][u'tasks'][1][u'ping'][u'data'].ansible_pos, (self.play_filename, 22, 31))
+
+ #
+ # Third Task
+ #
+ self.assertEqual(self.data[0][u'tasks'][2].ansible_pos, (self.play_filename, 24, 23))
+ self.assertEqual(self.data[0][u'tasks'][2][u'name'].ansible_pos, (self.play_filename, 24, 29))
+ self.assertEqual(self.data[0][u'tasks'][2][u'command'].ansible_pos, (self.play_filename, 25, 32))
+
+ def test_line_numbers(self):
+ # Check the line/column numbers are correct
+ # Note: Remember, currently dicts begin at the start of their first entry
+ self.assertEqual(self.data[0].ansible_pos, (self.play_filename, 2, 19))
+ self.assertEqual(self.data[0][u'hosts'].ansible_pos, (self.play_filename, 2, 26))
+ self.assertEqual(self.data[0][u'vars'].ansible_pos, (self.play_filename, 4, 21))
+
+ self.check_vars()
+
+ self.assertEqual(self.data[0][u'tasks'].ansible_pos, (self.play_filename, 16, 21))
+
+ self.check_tasks()
diff --git a/test/units/playbook/__init__.py b/test/units/playbook/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/playbook/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/playbook/test_block.py b/test/units/playbook/test_block.py
new file mode 100644
index 0000000000..348681527b
--- /dev/null
+++ b/test/units/playbook/test_block.py
@@ -0,0 +1,77 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.playbook.block import Block
+from ansible.playbook.task import Task
+from ansible.compat.tests import unittest
+
+class TestBlock(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_construct_empty_block(self):
+ b = Block()
+
+ def test_construct_block_with_role(self):
+ pass
+
+ def test_load_block_simple(self):
+ ds = dict(
+ block = [],
+ rescue = [],
+ always = [],
+ #otherwise = [],
+ )
+ b = Block.load(ds)
+ self.assertEqual(b.block, [])
+ self.assertEqual(b.rescue, [])
+ self.assertEqual(b.always, [])
+ # not currently used
+ #self.assertEqual(b.otherwise, [])
+
+ def test_load_block_with_tasks(self):
+ ds = dict(
+ block = [dict(action='block')],
+ rescue = [dict(action='rescue')],
+ always = [dict(action='always')],
+ #otherwise = [dict(action='otherwise')],
+ )
+ b = Block.load(ds)
+ self.assertEqual(len(b.block), 1)
+ assert isinstance(b.block[0], Task)
+ self.assertEqual(len(b.rescue), 1)
+ assert isinstance(b.rescue[0], Task)
+ self.assertEqual(len(b.always), 1)
+ assert isinstance(b.always[0], Task)
+ # not currently used
+ #self.assertEqual(len(b.otherwise), 1)
+ #assert isinstance(b.otherwise[0], Task)
+
+ def test_load_implicit_block(self):
+ ds = [dict(action='foo')]
+ b = Block.load(ds)
+ self.assertEqual(len(b.block), 1)
+ assert isinstance(b.block[0], Task)
+
diff --git a/test/units/playbook/test_play.py b/test/units/playbook/test_play.py
new file mode 100644
index 0000000000..22486f4129
--- /dev/null
+++ b/test/units/playbook/test_play.py
@@ -0,0 +1,132 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.playbook.play import Play
+from ansible.playbook.role import Role
+from ansible.playbook.task import Task
+
+from test.mock.loader import DictDataLoader
+
+class TestPlay(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_empty_play(self):
+ p = Play.load(dict())
+ self.assertEqual(str(p), "PLAY: <no name specified>")
+
+ def test_basic_play(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ connection='local',
+ remote_user="root",
+ sudo=True,
+ sudo_user="testing",
+ ))
+
+ def test_play_with_user_conflict(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ user="testing",
+ gather_facts=False,
+ ))
+ self.assertEqual(p.remote_user, "testing")
+
+ def test_play_with_user_conflict(self):
+ play_data = dict(
+ name="test play",
+ hosts=['foo'],
+ user="testing",
+ remote_user="testing",
+ )
+ self.assertRaises(AnsibleParserError, Play.load, play_data)
+
+ def test_play_with_tasks(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ tasks=[dict(action='shell echo "hello world"')],
+ ))
+
+ def test_play_with_handlers(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ handlers=[dict(action='shell echo "hello world"')],
+ ))
+
+ def test_play_with_pre_tasks(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ pre_tasks=[dict(action='shell echo "hello world"')],
+ ))
+
+ def test_play_with_post_tasks(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ post_tasks=[dict(action='shell echo "hello world"')],
+ ))
+
+ def test_play_with_roles(self):
+ fake_loader = DictDataLoader({
+ '/etc/ansible/roles/foo/tasks.yml': """
+ - name: role task
+ shell: echo "hello world"
+ """,
+ })
+
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ roles=['foo'],
+ ), loader=fake_loader)
+
+ tasks = p.compile()
+
+ def test_play_compile(self):
+ p = Play.load(dict(
+ name="test play",
+ hosts=['foo'],
+ gather_facts=False,
+ tasks=[dict(action='shell echo "hello world"')],
+ ))
+
+ tasks = p.compile()
+ self.assertEqual(len(tasks), 1)
+ self.assertIsInstance(tasks[0], Task)
diff --git a/test/units/playbook/test_playbook.py b/test/units/playbook/test_playbook.py
new file mode 100644
index 0000000000..dfb52dc7b1
--- /dev/null
+++ b/test/units/playbook/test_playbook.py
@@ -0,0 +1,69 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.playbook import Playbook
+from ansible.vars import VariableManager
+
+from test.mock.loader import DictDataLoader
+
+class TestPlaybook(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_empty_playbook(self):
+ fake_loader = DictDataLoader({})
+ p = Playbook(loader=fake_loader)
+
+ def test_basic_playbook(self):
+ fake_loader = DictDataLoader({
+ "test_file.yml":"""
+ - hosts: all
+ """,
+ })
+ p = Playbook.load("test_file.yml", loader=fake_loader)
+ plays = p.get_plays()
+
+ def test_bad_playbook_files(self):
+ fake_loader = DictDataLoader({
+ # represents a playbook which is not a list of plays
+ "bad_list.yml": """
+ foo: bar
+
+ """,
+ # represents a playbook where a play entry is mis-formatted
+ "bad_entry.yml": """
+ -
+ - "This should be a mapping..."
+
+ """,
+ })
+ vm = VariableManager()
+ self.assertRaises(AnsibleParserError, Playbook.load, "bad_list.yml", vm, fake_loader)
+ self.assertRaises(AnsibleParserError, Playbook.load, "bad_entry.yml", vm, fake_loader)
+
diff --git a/test/units/playbook/test_role.py b/test/units/playbook/test_role.py
new file mode 100644
index 0000000000..d0f3708898
--- /dev/null
+++ b/test/units/playbook/test_role.py
@@ -0,0 +1,167 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.errors import AnsibleError, AnsibleParserError
+from ansible.playbook.block import Block
+from ansible.playbook.role import Role
+from ansible.playbook.role.include import RoleInclude
+from ansible.playbook.task import Task
+
+from test.mock.loader import DictDataLoader
+
+class TestRole(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_load_role_with_tasks(self):
+
+ fake_loader = DictDataLoader({
+ "/etc/ansible/roles/foo/tasks/main.yml": """
+ - shell: echo 'hello world'
+ """,
+ })
+
+ i = RoleInclude.load('foo', loader=fake_loader)
+ r = Role.load(i)
+
+ self.assertEqual(str(r), 'foo')
+ self.assertEqual(len(r._task_blocks), 1)
+ assert isinstance(r._task_blocks[0], Block)
+
+ def test_load_role_with_handlers(self):
+
+ fake_loader = DictDataLoader({
+ "/etc/ansible/roles/foo/handlers/main.yml": """
+ - name: test handler
+ shell: echo 'hello world'
+ """,
+ })
+
+ i = RoleInclude.load('foo', loader=fake_loader)
+ r = Role.load(i)
+
+ self.assertEqual(len(r._handler_blocks), 1)
+ assert isinstance(r._handler_blocks[0], Block)
+
+ def test_load_role_with_vars(self):
+
+ fake_loader = DictDataLoader({
+ "/etc/ansible/roles/foo/defaults/main.yml": """
+ foo: bar
+ """,
+ "/etc/ansible/roles/foo/vars/main.yml": """
+ foo: bam
+ """,
+ })
+
+ i = RoleInclude.load('foo', loader=fake_loader)
+ r = Role.load(i)
+
+ self.assertEqual(r._default_vars, dict(foo='bar'))
+ self.assertEqual(r._role_vars, dict(foo='bam'))
+
+ def test_load_role_with_metadata(self):
+
+ fake_loader = DictDataLoader({
+ '/etc/ansible/roles/foo/meta/main.yml': """
+ allow_duplicates: true
+ dependencies:
+ - bar
+ galaxy_info:
+ a: 1
+ b: 2
+ c: 3
+ """,
+ '/etc/ansible/roles/bar/meta/main.yml': """
+ dependencies:
+ - baz
+ """,
+ '/etc/ansible/roles/baz/meta/main.yml': """
+ dependencies:
+ - bam
+ """,
+ '/etc/ansible/roles/bam/meta/main.yml': """
+ dependencies: []
+ """,
+ '/etc/ansible/roles/bad1/meta/main.yml': """
+ 1
+ """,
+ '/etc/ansible/roles/bad2/meta/main.yml': """
+ foo: bar
+ """,
+ '/etc/ansible/roles/recursive1/meta/main.yml': """
+ dependencies: ['recursive2']
+ """,
+ '/etc/ansible/roles/recursive2/meta/main.yml': """
+ dependencies: ['recursive1']
+ """,
+ })
+
+ i = RoleInclude.load('foo', loader=fake_loader)
+ r = Role.load(i)
+
+ role_deps = r.get_direct_dependencies()
+
+ self.assertEqual(len(role_deps), 1)
+ self.assertEqual(type(role_deps[0]), Role)
+ self.assertEqual(len(role_deps[0].get_parents()), 1)
+ self.assertEqual(role_deps[0].get_parents()[0], r)
+ self.assertEqual(r._metadata.allow_duplicates, True)
+ self.assertEqual(r._metadata.galaxy_info, dict(a=1, b=2, c=3))
+
+ all_deps = r.get_all_dependencies()
+ self.assertEqual(len(all_deps), 3)
+ self.assertEqual(all_deps[0].get_name(), 'bar')
+ self.assertEqual(all_deps[1].get_name(), 'baz')
+ self.assertEqual(all_deps[2].get_name(), 'bam')
+
+ i = RoleInclude.load('bad1', loader=fake_loader)
+ self.assertRaises(AnsibleParserError, Role.load, i)
+
+ i = RoleInclude.load('bad2', loader=fake_loader)
+ self.assertRaises(AnsibleParserError, Role.load, i)
+
+ i = RoleInclude.load('recursive1', loader=fake_loader)
+ self.assertRaises(AnsibleError, Role.load, i)
+
+ def test_load_role_complex(self):
+
+ # FIXME: add tests for the more complex uses of
+ # params and tags/when statements
+
+ fake_loader = DictDataLoader({
+ "/etc/ansible/roles/foo/tasks/main.yml": """
+ - shell: echo 'hello world'
+ """,
+ })
+
+ i = RoleInclude.load(dict(role='foo'), loader=fake_loader)
+ r = Role.load(i)
+
+ self.assertEqual(r.get_name(), "foo")
+
diff --git a/test/units/playbook/test_task.py b/test/units/playbook/test_task.py
new file mode 100644
index 0000000000..b2160e0dd2
--- /dev/null
+++ b/test/units/playbook/test_task.py
@@ -0,0 +1,87 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.playbook.task import Task
+from ansible.compat.tests import unittest
+
+basic_shell_task = dict(
+ name = 'Test Task',
+ shell = 'echo hi'
+)
+
+kv_shell_task = dict(
+ action = 'shell echo hi'
+)
+
+class TestTask(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_construct_empty_task(self):
+ t = Task()
+
+ def test_construct_task_with_role(self):
+ pass
+
+ def test_construct_task_with_block(self):
+ pass
+
+ def test_construct_task_with_role_and_block(self):
+ pass
+
+ def test_load_task_simple(self):
+ t = Task.load(basic_shell_task)
+ assert t is not None
+ self.assertEqual(t.name, basic_shell_task['name'])
+ self.assertEqual(t.action, 'command')
+ self.assertEqual(t.args, dict(_raw_params='echo hi', _uses_shell=True))
+
+ def test_load_task_kv_form(self):
+ t = Task.load(kv_shell_task)
+ self.assertEqual(t.action, 'command')
+ self.assertEqual(t.args, dict(_raw_params='echo hi', _uses_shell=True))
+
+ def test_task_auto_name(self):
+ assert 'name' not in kv_shell_task
+ t = Task.load(kv_shell_task)
+ #self.assertEqual(t.name, 'shell echo hi')
+
+ def test_task_auto_name_with_role(self):
+ pass
+
+ def test_load_task_complex_form(self):
+ pass
+
+ def test_can_load_module_complex_form(self):
+ pass
+
+ def test_local_action_implies_delegate(self):
+ pass
+
+ def test_local_action_conflicts_with_delegate(self):
+ pass
+
+ def test_delegate_to_parses(self):
+ pass
diff --git a/test/units/plugins/__init__.py b/test/units/plugins/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/plugins/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/plugins/test_cache.py b/test/units/plugins/test_cache.py
new file mode 100644
index 0000000000..f3cfe6a38c
--- /dev/null
+++ b/test/units/plugins/test_cache.py
@@ -0,0 +1,100 @@
+# (c) 2012-2015, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.plugins.cache.base import BaseCacheModule
+from ansible.plugins.cache.memory import CacheModule as MemoryCache
+
+HAVE_MEMCACHED = True
+try:
+ import memcached
+except ImportError:
+ HAVE_MEMCACHED = False
+else:
+ # Use an else so that the only reason we skip this is for lack of
+ # memcached, not errors importing the plugin
+ from ansible.plugins.cache.memcached import CacheModule as MemcachedCache
+
+HAVE_REDIS = True
+try:
+ import redis
+except ImportError:
+ HAVE_REDIS = False
+else:
+ from ansible.plugins.cache.redis import CacheModule as RedisCache
+
+
+class TestAbstractClass(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_subclass_error(self):
+ class CacheModule1(BaseCacheModule):
+ pass
+ with self.assertRaises(TypeError):
+ CacheModule1()
+
+ class CacheModule2(BaseCacheModule):
+ def get(self, key):
+ super(CacheModule2, self).get(key)
+
+ with self.assertRaises(TypeError):
+ CacheModule2()
+
+ def test_subclass_success(self):
+ class CacheModule3(BaseCacheModule):
+ def get(self, key):
+ super(CacheModule3, self).get(key)
+
+ def set(self, key, value):
+ super(CacheModule3, self).set(key, value)
+
+ def keys(self):
+ super(CacheModule3, self).keys()
+
+ def contains(self, key):
+ super(CacheModule3, self).contains(key)
+
+ def delete(self, key):
+ super(CacheModule3, self).delete(key)
+
+ def flush(self):
+ super(CacheModule3, self).flush()
+
+ def copy(self):
+ super(CacheModule3, self).copy()
+
+ self.assertIsInstance(CacheModule3(), CacheModule3)
+
+ @unittest.skipUnless(HAVE_MEMCACHED, 'python-memcached module not installed')
+ def test_memcached_cachemodule(self):
+ self.assertIsInstance(MemcachedCache(), MemcachedCache)
+
+ def test_memory_cachemodule(self):
+ self.assertIsInstance(MemoryCache(), MemoryCache)
+
+ @unittest.skipUnless(HAVE_REDIS, 'Redis pyhton module not installed')
+ def test_redis_cachemodule(self):
+ self.assertIsInstance(RedisCache(), RedisCache)
diff --git a/test/units/plugins/test_connection.py b/test/units/plugins/test_connection.py
new file mode 100644
index 0000000000..0ed888ac95
--- /dev/null
+++ b/test/units/plugins/test_connection.py
@@ -0,0 +1,102 @@
+# (c) 2015, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from six import StringIO
+
+from ansible.compat.tests import unittest
+from ansible.executor.connection_info import ConnectionInformation
+
+from ansible.plugins.connections import ConnectionBase
+#from ansible.plugins.connections.accelerate import Connection as AccelerateConnection
+#from ansible.plugins.connections.chroot import Connection as ChrootConnection
+#from ansible.plugins.connections.funcd import Connection as FuncdConnection
+#from ansible.plugins.connections.jail import Connection as JailConnection
+#from ansible.plugins.connections.libvirt_lxc import Connection as LibvirtLXCConnection
+from ansible.plugins.connections.local import Connection as LocalConnection
+from ansible.plugins.connections.paramiko_ssh import Connection as ParamikoConnection
+from ansible.plugins.connections.ssh import Connection as SSHConnection
+#from ansible.plugins.connections.winrm import Connection as WinRmConnection
+
+class TestConnectionBaseClass(unittest.TestCase):
+
+ def setUp(self):
+ self.conn_info = ConnectionInformation()
+ self.in_stream = StringIO()
+
+ def tearDown(self):
+ pass
+
+ def test_subclass_error(self):
+ class ConnectionModule1(ConnectionBase):
+ pass
+ with self.assertRaises(TypeError):
+ ConnectionModule1()
+
+ class ConnectionModule2(ConnectionBase):
+ def get(self, key):
+ super(ConnectionModule2, self).get(key)
+
+ with self.assertRaises(TypeError):
+ ConnectionModule2()
+
+ def test_subclass_success(self):
+ class ConnectionModule3(ConnectionBase):
+ @property
+ def transport(self):
+ pass
+ def _connect(self):
+ pass
+ def exec_command(self):
+ pass
+ def put_file(self):
+ pass
+ def fetch_file(self):
+ pass
+ def close(self):
+ pass
+ self.assertIsInstance(ConnectionModule3(self.conn_info, self.in_stream), ConnectionModule3)
+
+# def test_accelerate_connection_module(self):
+# self.assertIsInstance(AccelerateConnection(), AccelerateConnection)
+#
+# def test_chroot_connection_module(self):
+# self.assertIsInstance(ChrootConnection(), ChrootConnection)
+#
+# def test_funcd_connection_module(self):
+# self.assertIsInstance(FuncdConnection(), FuncdConnection)
+#
+# def test_jail_connection_module(self):
+# self.assertIsInstance(JailConnection(), JailConnection)
+#
+# def test_libvirt_lxc_connection_module(self):
+# self.assertIsInstance(LibvirtLXCConnection(), LibvirtLXCConnection)
+
+ def test_local_connection_module(self):
+ self.assertIsInstance(LocalConnection(self.conn_info, self.in_stream), LocalConnection)
+
+ def test_paramiko_connection_module(self):
+ self.assertIsInstance(ParamikoConnection(self.conn_info, self.in_stream), ParamikoConnection)
+
+ def test_ssh_connection_module(self):
+ self.assertIsInstance(SSHConnection(self.conn_info, self.in_stream), SSHConnection)
+
+# def test_winrm_connection_module(self):
+# self.assertIsInstance(WinRmConnection(), WinRmConnection)
diff --git a/test/units/plugins/test_plugins.py b/test/units/plugins/test_plugins.py
new file mode 100644
index 0000000000..0d0fe400d0
--- /dev/null
+++ b/test/units/plugins/test_plugins.py
@@ -0,0 +1,77 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import os
+from ansible.compat.tests import unittest
+from ansible.compat.tests import BUILTINS
+
+from ansible.compat.tests.mock import mock_open, patch, MagicMock
+
+from ansible.plugins import MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE, _basedirs, push_basedir, PluginLoader
+
+class TestErrors(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ @patch.object(PluginLoader, '_get_paths')
+ def test_print_paths(self, mock_method):
+ mock_method.return_value = ['/path/one', '/path/two', '/path/three']
+ pl = PluginLoader('foo', 'foo', '', 'test_plugins')
+ paths = pl.print_paths()
+ expected_paths = os.pathsep.join(['/path/one', '/path/two', '/path/three'])
+ self.assertEqual(paths, expected_paths)
+
+ def test_plugins__get_package_paths_no_package(self):
+ pl = PluginLoader('test', '', 'test', 'test_plugin')
+ self.assertEqual(pl._get_package_paths(), [])
+
+ def test_plugins__get_package_paths_with_package(self):
+ # the _get_package_paths() call uses __import__ to load a
+ # python library, and then uses the __file__ attribute of
+ # the result for that to get the library path, so we mock
+ # that here and patch the builtin to use our mocked result
+ m = MagicMock()
+ m.return_value.__file__ = '/path/to/my/test.py'
+ pl = PluginLoader('test', 'foo.bar.bam', 'test', 'test_plugin')
+ with patch('{0}.__import__'.format(BUILTINS), m):
+ self.assertEqual(pl._get_package_paths(), ['/path/to/my/bar/bam'])
+
+ def test_plugins__get_paths(self):
+ pl = PluginLoader('test', '', 'test', 'test_plugin')
+ pl._paths = ['/path/one', '/path/two']
+ self.assertEqual(pl._get_paths(), ['/path/one', '/path/two'])
+
+ # NOT YET WORKING
+ #def fake_glob(path):
+ # if path == 'test/*':
+ # return ['test/foo', 'test/bar', 'test/bam']
+ # elif path == 'test/*/*'
+ #m._paths = None
+ #mock_glob = MagicMock()
+ #mock_glob.return_value = []
+ #with patch('glob.glob', mock_glob):
+ # pass
+
diff --git a/test/units/vars/__init__.py b/test/units/vars/__init__.py
new file mode 100644
index 0000000000..785fc45992
--- /dev/null
+++ b/test/units/vars/__init__.py
@@ -0,0 +1,21 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
diff --git a/test/units/vars/test_variable_manager.py b/test/units/vars/test_variable_manager.py
new file mode 100644
index 0000000000..f8d815eb6f
--- /dev/null
+++ b/test/units/vars/test_variable_manager.py
@@ -0,0 +1,144 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from ansible.compat.tests import unittest
+from ansible.compat.tests.mock import patch, MagicMock
+
+from ansible.vars import VariableManager
+
+from test.mock.loader import DictDataLoader
+
+class TestVariableManager(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_basic_manager(self):
+ fake_loader = DictDataLoader({})
+
+ v = VariableManager()
+ self.assertEqual(v.get_vars(loader=fake_loader), dict())
+
+ self.assertEqual(
+ v._merge_dicts(
+ dict(a=1),
+ dict(b=2)
+ ), dict(a=1, b=2)
+ )
+ self.assertEqual(
+ v._merge_dicts(
+ dict(a=1, c=dict(foo='bar')),
+ dict(b=2, c=dict(baz='bam'))
+ ), dict(a=1, b=2, c=dict(foo='bar', baz='bam'))
+ )
+
+
+ def test_variable_manager_extra_vars(self):
+ fake_loader = DictDataLoader({})
+
+ extra_vars = dict(a=1, b=2, c=3)
+ v = VariableManager()
+ v.set_extra_vars(extra_vars)
+
+ for (key, val) in extra_vars.iteritems():
+ self.assertEqual(v.get_vars(loader=fake_loader).get(key), val)
+ self.assertIsNot(v.extra_vars.get(key), val)
+
+ def test_variable_manager_host_vars_file(self):
+ fake_loader = DictDataLoader({
+ "host_vars/hostname1.yml": """
+ foo: bar
+ """
+ })
+
+ v = VariableManager()
+ v.add_host_vars_file("host_vars/hostname1.yml", loader=fake_loader)
+ self.assertIn("hostname1", v._host_vars_files)
+ self.assertEqual(v._host_vars_files["hostname1"], dict(foo="bar"))
+
+ mock_host = MagicMock()
+ mock_host.get_name.return_value = "hostname1"
+ mock_host.get_vars.return_value = dict()
+ mock_host.get_groups.return_value = ()
+
+ self.assertEqual(v.get_vars(loader=fake_loader, host=mock_host).get("foo"), "bar")
+
+ def test_variable_manager_group_vars_file(self):
+ fake_loader = DictDataLoader({
+ "group_vars/somegroup.yml": """
+ foo: bar
+ """
+ })
+
+ v = VariableManager()
+ v.add_group_vars_file("group_vars/somegroup.yml", loader=fake_loader)
+ self.assertIn("somegroup", v._group_vars_files)
+ self.assertEqual(v._group_vars_files["somegroup"], dict(foo="bar"))
+
+ mock_group = MagicMock()
+ mock_group.name.return_value = "somegroup"
+ mock_group.get_ancestors.return_value = ()
+
+ mock_host = MagicMock()
+ mock_host.get_name.return_value = "hostname1"
+ mock_host.get_vars.return_value = dict()
+ mock_host.get_groups.return_value = (mock_group)
+
+ self.assertEqual(v.get_vars(loader=fake_loader, host=mock_host).get("foo"), "bar")
+
+ def test_variable_manager_play_vars(self):
+ fake_loader = DictDataLoader({})
+
+ mock_play = MagicMock()
+ mock_play.get_vars.return_value = dict(foo="bar")
+ mock_play.get_roles.return_value = []
+ mock_play.get_vars_files.return_value = []
+
+ v = VariableManager()
+ self.assertEqual(v.get_vars(loader=fake_loader, play=mock_play).get("foo"), "bar")
+
+ def test_variable_manager_play_vars_files(self):
+ fake_loader = DictDataLoader({
+ "/path/to/somefile.yml": """
+ foo: bar
+ """
+ })
+
+ mock_play = MagicMock()
+ mock_play.get_vars.return_value = dict()
+ mock_play.get_roles.return_value = []
+ mock_play.get_vars_files.return_value = ['/path/to/somefile.yml']
+
+ v = VariableManager()
+ self.assertEqual(v.get_vars(loader=fake_loader, play=mock_play).get("foo"), "bar")
+
+ def test_variable_manager_task_vars(self):
+ fake_loader = DictDataLoader({})
+
+ mock_task = MagicMock()
+ mock_task.get_vars.return_value = dict(foo="bar")
+
+ v = VariableManager()
+ self.assertEqual(v.get_vars(loader=fake_loader, task=mock_task).get("foo"), "bar")
+
diff --git a/test/units/vault_test_data/foo-ansible-1.0.yml b/test/units/vault_test_data/foo-ansible-1.0.yml
deleted file mode 100644
index f71ddf10ce..0000000000
--- a/test/units/vault_test_data/foo-ansible-1.0.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-$ANSIBLE_VAULT;1.0;AES
-53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9
-9ad98d59f61d06a4b66718d855f16fb7bdfe54d1ec8aeaa4d06c2dc1fa630ae1846a029877f0eeb1
-83c62ffb04c2512995e815de4b4d29ed
diff --git a/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml b/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml
deleted file mode 100644
index 6e025a1c40..0000000000
--- a/test/units/vault_test_data/foo-ansible-1.1-ansible-newline-ansible.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-$ANSIBLE_VAULT;1.1;AES256
-61333063333663376535373431643063613232393438623732643966613962363563383132363631
-3235363730623635323039623439343561313566313361630a313632643338613636303637623765
-64356531643630303636323064336439393335313836366235336464633635376339663830333232
-6338353337663139320a646632386131646431656165656338633535386535623236393265373634
-37656134633661333935346434363237613435323865356234323264663838643931
diff --git a/test/units/vault_test_data/foo-ansible-1.1.yml b/test/units/vault_test_data/foo-ansible-1.1.yml
deleted file mode 100644
index d9a4a448a6..0000000000
--- a/test/units/vault_test_data/foo-ansible-1.1.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-$ANSIBLE_VAULT;1.1;AES256
-62303130653266653331306264616235333735323636616539316433666463323964623162386137
-3961616263373033353631316333623566303532663065310a393036623466376263393961326530
-64336561613965383835646464623865663966323464653236343638373165343863623638316664
-3631633031323837340a396530313963373030343933616133393566366137363761373930663833
-3739