From e87d147244a349aced2704741aa354c62a783b5d Mon Sep 17 00:00:00 2001 From: Matt Clay Date: Tue, 6 Dec 2016 23:07:58 -0500 Subject: Relocate module unit tests. (#18812) * Relocate module unit tests. * Fix classification of unit test changes. --- test/runner/lib/classification.py | 16 +- test/units/modules/cloud/__init__.py | 0 test/units/modules/cloud/amazon/__init__.py | 0 .../cloud/amazon/test_ec2_vpc_nat_gateway.py | 489 +++++++++++++++++++++ .../modules/cloud/amazon/test_kinesis_stream.py | 287 ++++++++++++ test/units/modules/cloud/docker/__init__.py | 0 test/units/modules/cloud/docker/test_docker.py | 19 + test/units/modules/cloud/openstack/__init__.py | 0 .../modules/cloud/openstack/test_os_server.py | 223 ++++++++++ test/units/modules/core/__init__.py | 0 test/units/modules/core/cloud/__init__.py | 0 test/units/modules/core/cloud/docker/__init__.py | 0 .../units/modules/core/cloud/docker/test_docker.py | 19 - .../units/modules/core/cloud/openstack/__init__.py | 0 .../modules/core/cloud/openstack/test_os_server.py | 223 ---------- test/units/modules/core/packaging/__init__.py | 0 test/units/modules/core/packaging/os/__init__.py | 0 test/units/modules/core/packaging/os/test_apt.py | 50 --- test/units/modules/extras/__init__.py | 0 test/units/modules/extras/cloud/__init__.py | 0 test/units/modules/extras/cloud/amazon/__init__.py | 0 .../cloud/amazon/test_ec2_vpc_nat_gateway.py | 489 --------------------- .../extras/cloud/amazon/test_kinesis_stream.py | 287 ------------ test/units/modules/packaging/__init__.py | 0 test/units/modules/packaging/os/__init__.py | 0 test/units/modules/packaging/os/test_apt.py | 50 +++ 26 files changed, 1080 insertions(+), 1072 deletions(-) create mode 100644 test/units/modules/cloud/__init__.py create mode 100644 test/units/modules/cloud/amazon/__init__.py create mode 100644 test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py create mode 100644 test/units/modules/cloud/amazon/test_kinesis_stream.py create mode 100644 test/units/modules/cloud/docker/__init__.py create mode 100644 test/units/modules/cloud/docker/test_docker.py create mode 100644 test/units/modules/cloud/openstack/__init__.py create mode 100644 test/units/modules/cloud/openstack/test_os_server.py delete mode 100644 test/units/modules/core/__init__.py delete mode 100644 test/units/modules/core/cloud/__init__.py delete mode 100644 test/units/modules/core/cloud/docker/__init__.py delete mode 100644 test/units/modules/core/cloud/docker/test_docker.py delete mode 100644 test/units/modules/core/cloud/openstack/__init__.py delete mode 100644 test/units/modules/core/cloud/openstack/test_os_server.py delete mode 100644 test/units/modules/core/packaging/__init__.py delete mode 100644 test/units/modules/core/packaging/os/__init__.py delete mode 100644 test/units/modules/core/packaging/os/test_apt.py delete mode 100644 test/units/modules/extras/__init__.py delete mode 100644 test/units/modules/extras/cloud/__init__.py delete mode 100644 test/units/modules/extras/cloud/amazon/__init__.py delete mode 100644 test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py delete mode 100644 test/units/modules/extras/cloud/amazon/test_kinesis_stream.py create mode 100644 test/units/modules/packaging/__init__.py create mode 100644 test/units/modules/packaging/os/__init__.py create mode 100644 test/units/modules/packaging/os/test_apt.py diff --git a/test/runner/lib/classification.py b/test/runner/lib/classification.py index a3cce21292..ee56e7853c 100644 --- a/test/runner/lib/classification.py +++ b/test/runner/lib/classification.py @@ -81,7 +81,7 @@ class PathMapper(object): self.compile_paths = set(t.path for t in self.compile_targets) self.units_modules = set(t.module for t in self.units_targets if t.module) - self.units_paths = set(t.path for t in self.units_targets) + self.units_paths = set(a for t in self.units_targets for a in t.aliases) self.sanity_paths = set(t.path for t in self.sanity_targets) self.module_names_by_path = dict((t.path, t.module) for t in self.module_targets) @@ -280,9 +280,17 @@ class PathMapper(object): 'units': path, } - return { - 'units': '%s/' % os.path.dirname(path), - } + # changes to files which are not unit tests should trigger tests from the nearest parent directory + + test_path = os.path.dirname(path) + + while test_path: + if test_path + '/' in self.units_paths: + return { + 'units': test_path + '/', + } + + test_path = os.path.dirname(test_path) if path.startswith('test/runner/'): return all_tests() # test infrastructure, run all tests diff --git a/test/units/modules/cloud/__init__.py b/test/units/modules/cloud/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/cloud/amazon/__init__.py b/test/units/modules/cloud/amazon/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py b/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py new file mode 100644 index 0000000000..f274b70114 --- /dev/null +++ b/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py @@ -0,0 +1,489 @@ +from nose.plugins.skip import SkipTest + +try: + import boto3 + import botocore + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +if not HAS_BOTO3: + raise SkipTest("test_ec2_vpc_nat_gateway.py requires the python module 'boto3' and 'botocore'") + +import unittest + +from collections import namedtuple +from ansible.parsing.dataloader import DataLoader +from ansible.vars import VariableManager +from ansible.inventory import Inventory +from ansible.playbook.play import Play +from ansible.executor.task_queue_manager import TaskQueueManager + +import ansible.modules.cloud.amazon.ec2_vpc_nat_gateway as ng + +Options = ( + namedtuple( + 'Options', [ + 'connection', 'module_path', 'forks', 'become', 'become_method', + 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', + 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', + 'check' + ] + ) +) +# initialize needed objects +variable_manager = VariableManager() +loader = DataLoader() +options = ( + Options( + connection='local', + module_path='cloud/amazon', + forks=1, become=None, become_method=None, become_user=None, check=True, + remote_user=None, private_key_file=None, ssh_common_args=None, + sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, + verbosity=3 + ) +) +passwords = dict(vault_pass='') + +aws_region = 'us-west-2' + +# create inventory and pass to var manager +inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') +variable_manager.set_inventory(inventory) + +def run(play): + tqm = None + results = None + try: + tqm = TaskQueueManager( + inventory=inventory, + variable_manager=variable_manager, + loader=loader, + options=options, + passwords=passwords, + stdout_callback='default', + ) + results = tqm.run(play) + finally: + if tqm is not None: + tqm.cleanup() + return tqm, results + +class AnsibleVpcNatGatewayTasks(unittest.TestCase): + + def test_create_gateway_using_allocation_id(self): + play_source = dict( + name = "Create new nat gateway with eip allocation-id", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-12345678', + allocation_id='eipalloc-12345678', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.failUnless(tqm._stats.changed['localhost'] == 1) + + def test_create_gateway_using_allocation_id_idempotent(self): + play_source = dict( + name = "Create new nat gateway with eip allocation-id", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-123456789', + allocation_id='eipalloc-1234567', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_create_gateway_using_eip_address(self): + play_source = dict( + name = "Create new nat gateway with eip address", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-12345678', + eip_address='55.55.55.55', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.failUnless(tqm._stats.changed['localhost'] == 1) + + def test_create_gateway_using_eip_address_idempotent(self): + play_source = dict( + name = "Create new nat gateway with eip address", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + subnet_id='subnet-123456789', + eip_address='55.55.55.55', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_create_gateway_in_subnet_only_if_one_does_not_exist_already(self): + play_source = dict( + name = "Create new nat gateway only if one does not exist already", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + if_exist_do_not_create='yes', + subnet_id='subnet-123456789', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertFalse('localhost' in tqm._stats.changed) + + def test_delete_gateway(self): + play_source = dict( + name = "Delete Nat Gateway", + hosts = 'localhost', + gather_facts = 'no', + tasks = [ + dict( + action=dict( + module='ec2_vpc_nat_gateway', + args=dict( + nat_gateway_id='nat-123456789', + state='absent', + wait='yes', + region=aws_region, + ) + ), + register='nat_gateway', + ), + dict( + action=dict( + module='debug', + args=dict( + msg='{{nat_gateway}}' + ) + ) + ) + ] + ) + play = Play().load(play_source, variable_manager=variable_manager, loader=loader) + tqm, results = run(play) + self.failUnless(tqm._stats.ok['localhost'] == 2) + self.assertTrue('localhost' in tqm._stats.changed) + +class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = ng.DRY_RUN_GATEWAY_UNCONVERTED + converted_example = ng.convert_to_lower(example[0]) + keys = list(converted_example.keys()) + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'create_time') + if i == 1: + self.assertEqual(keys[i], 'nat_gateway_addresses') + gw_addresses_keys = list(converted_example[keys[i]][0].keys()) + gw_addresses_keys.sort() + for j in range(len(gw_addresses_keys)): + if j == 0: + self.assertEqual(gw_addresses_keys[j], 'allocation_id') + if j == 1: + self.assertEqual(gw_addresses_keys[j], 'network_interface_id') + if j == 2: + self.assertEqual(gw_addresses_keys[j], 'private_ip') + if j == 3: + self.assertEqual(gw_addresses_keys[j], 'public_ip') + if i == 2: + self.assertEqual(keys[i], 'nat_gateway_id') + if i == 3: + self.assertEqual(keys[i], 'state') + if i == 4: + self.assertEqual(keys[i], 'subnet_id') + if i == 5: + self.assertEqual(keys[i], 'vpc_id') + + def test_get_nat_gateways(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, stream = ( + ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_get_nat_gateways_no_gateways_found(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, stream = ( + ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True) + ) + self.assertTrue(success) + self.assertEqual(stream, []) + + def test_wait_for_status(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, gws = ( + ng.wait_for_status( + client, 5, 'nat-123456789', 'available', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS[0] + self.assertTrue(success) + self.assertEqual(gws, should_return) + + def test_wait_for_status_to_timeout(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, gws = ( + ng.wait_for_status( + client, 2, 'nat-12345678', 'available', check_mode=True + ) + ) + self.assertFalse(success) + self.assertEqual(gws, {}) + + def test_gateway_in_subnet_exists_with_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertEqual(gws, should_return) + + def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', 'eipalloc-123', check_mode=True + ) + ) + should_return = list() + self.assertEqual(gws, should_return) + + def test_gateway_in_subnet_exists_without_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + gws, err_msg = ( + ng.gateway_in_subnet_exists( + client, 'subnet-123456789', check_mode=True + ) + ) + should_return = ng.DRY_RUN_GATEWAYS + self.assertEqual(gws, should_return) + + def test_get_eip_allocation_id_by_address(self): + client = boto3.client('ec2', region_name=aws_region) + allocation_id, _ = ( + ng.get_eip_allocation_id_by_address( + client, '55.55.55.55', check_mode=True + ) + ) + should_return = 'eipalloc-1234567' + self.assertEqual(allocation_id, should_return) + + def test_get_eip_allocation_id_by_address_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + allocation_id, err_msg = ( + ng.get_eip_allocation_id_by_address( + client, '52.52.52.52', check_mode=True + ) + ) + self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist') + self.assertTrue(allocation_id is None) + + def test_allocate_eip_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, err_msg, eip_id = ( + ng.allocate_eip_address( + client, check_mode=True + ) + ) + self.assertTrue(success) + + def test_release_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, _ = ( + ng.release_address( + client, 'eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + + def test_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.create( + client, 'subnet-123456', 'eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_pre_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_pre_create_idemptotent_with_allocation_id(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_pre_create_idemptotent_with_eip_address(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_pre_create_idemptotent_if_exist_do_not_create(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, results = ( + ng.pre_create( + client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True + ) + ) + self.assertTrue(success) + self.assertFalse(changed) + + def test_delete(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-123456789', check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_delete_and_release_ip(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-123456789', release_eip=True, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + + def test_delete_if_does_not_exist(self): + client = boto3.client('ec2', region_name=aws_region) + success, changed, err_msg, _ = ( + ng.remove( + client, 'nat-12345', check_mode=True + ) + ) + self.assertFalse(success) + self.assertFalse(changed) diff --git a/test/units/modules/cloud/amazon/test_kinesis_stream.py b/test/units/modules/cloud/amazon/test_kinesis_stream.py new file mode 100644 index 0000000000..efddfece7c --- /dev/null +++ b/test/units/modules/cloud/amazon/test_kinesis_stream.py @@ -0,0 +1,287 @@ +from nose.plugins.skip import SkipTest + +try: + import boto3 + import botocore + HAS_BOTO3 = True +except ImportError: + HAS_BOTO3 = False + +if not HAS_BOTO3: + raise SkipTest("test_kinesis_stream.py requires the python module 'boto3' and 'botocore'") + +import unittest + +import ansible.modules.cloud.amazon.kinesis_stream as kinesis_stream + +aws_region = 'us-west-2' + + +class AnsibleKinesisStreamFunctions(unittest.TestCase): + + def test_convert_to_lower(self): + example = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + converted_example = kinesis_stream.convert_to_lower(example) + keys = list(converted_example.keys()) + keys.sort() + for i in range(len(keys)): + if i == 0: + self.assertEqual(keys[i], 'has_more_shards') + if i == 1: + self.assertEqual(keys[i], 'retention_period_hours') + if i == 2: + self.assertEqual(keys[i], 'stream_arn') + if i == 3: + self.assertEqual(keys[i], 'stream_name') + if i == 4: + self.assertEqual(keys[i], 'stream_status') + + def test_make_tags_in_aws_format(self): + example = { + 'env': 'development' + } + should_return = [ + { + 'Key': 'env', + 'Value': 'development' + } + ] + aws_tags = kinesis_stream.make_tags_in_aws_format(example) + self.assertEqual(aws_tags, should_return) + + def test_make_tags_in_proper_format(self): + example = [ + { + 'Key': 'env', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + should_return = { + 'env': 'development', + 'service': 'web' + } + proper_tags = kinesis_stream.make_tags_in_proper_format(example) + self.assertEqual(proper_tags, should_return) + + def test_recreate_tags_from_list(self): + example = [('environment', 'development'), ('service', 'web')] + should_return = [ + { + 'Key': 'environment', + 'Value': 'development' + }, + { + 'Key': 'service', + 'Value': 'web' + } + ] + aws_tags = kinesis_stream.recreate_tags_from_list(example) + self.assertEqual(aws_tags, should_return) + + def test_get_tags(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) + self.assertTrue(success) + should_return = [ + { + 'Key': 'DryRunMode', + 'Value': 'true' + } + ] + self.assertEqual(tags, should_return) + + def test_find_stream(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, stream = ( + kinesis_stream.find_stream(client, 'test', check_mode=True) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_wait_for_status(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg, stream = ( + kinesis_stream.wait_for_status( + client, 'test', 'ACTIVE', check_mode=True + ) + ) + should_return = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + self.assertTrue(success) + self.assertEqual(stream, should_return) + + def test_tags_action_create(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_delete(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_tags_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, err_msg = ( + kinesis_stream.tags_action( + client, 'test', tags, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update_tags(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update_tags( + client, 'test', tags, check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_create(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'create', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_delete(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'delete', check_mode=True + ) + ) + self.assertTrue(success) + + def test_stream_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.stream_action( + client, 'test', 10, 'append', check_mode=True + ) + ) + self.assertFalse(success) + + def test_retention_action_increase(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 48, 'increase', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_decrease(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'decrease', check_mode=True + ) + ) + self.assertTrue(success) + + def test_retention_action_invalid(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.retention_action( + client, 'test', 24, 'create', check_mode=True + ) + ) + self.assertFalse(success) + + def test_update(self): + client = boto3.client('kinesis', region_name=aws_region) + current_stream = { + 'HasMoreShards': True, + 'RetentionPeriodHours': 24, + 'StreamName': 'test', + 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'StreamStatus': 'ACTIVE' + } + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg = ( + kinesis_stream.update( + client, current_stream, 'test', retention_period=48, + tags=tags, check_mode=True + ) + ) + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') + + def test_create_stream(self): + client = boto3.client('kinesis', region_name=aws_region) + tags = { + 'env': 'development', + 'service': 'web' + } + success, changed, err_msg, results = ( + kinesis_stream.create_stream( + client, 'test', number_of_shards=10, retention_period=48, + tags=tags, check_mode=True + ) + ) + should_return = { + 'has_more_shards': True, + 'retention_period_hours': 24, + 'stream_name': 'test', + 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', + 'stream_status': 'ACTIVE', + 'tags': tags, + } + self.assertTrue(success) + self.assertTrue(changed) + self.assertEqual(results, should_return) + self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') diff --git a/test/units/modules/cloud/docker/__init__.py b/test/units/modules/cloud/docker/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/cloud/docker/test_docker.py b/test/units/modules/cloud/docker/test_docker.py new file mode 100644 index 0000000000..0a40cfb8db --- /dev/null +++ b/test/units/modules/cloud/docker/test_docker.py @@ -0,0 +1,19 @@ +import collections +import os +import unittest + +from ansible.modules.cloud.docker._docker import get_split_image_tag + +class DockerSplitImageTagTestCase(unittest.TestCase): + + def test_trivial(self): + self.assertEqual(get_split_image_tag('test'), ('test', 'latest')) + + def test_with_org_name(self): + self.assertEqual(get_split_image_tag('ansible/centos7-ansible'), ('ansible/centos7-ansible', 'latest')) + + def test_with_tag(self): + self.assertEqual(get_split_image_tag('test:devel'), ('test', 'devel')) + + def test_with_tag_and_org_name(self): + self.assertEqual(get_split_image_tag('ansible/centos7-ansible:devel'), ('ansible/centos7-ansible', 'devel')) diff --git a/test/units/modules/cloud/openstack/__init__.py b/test/units/modules/cloud/openstack/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/cloud/openstack/test_os_server.py b/test/units/modules/cloud/openstack/test_os_server.py new file mode 100644 index 0000000000..3ab3e40580 --- /dev/null +++ b/test/units/modules/cloud/openstack/test_os_server.py @@ -0,0 +1,223 @@ +import mock +import pytest +import yaml +import inspect +import collections + +from ansible.modules.cloud.openstack import os_server + + +class AnsibleFail(Exception): + pass + + +class AnsibleExit(Exception): + pass + + +def params_from_doc(func): + '''This function extracts the docstring from the specified function, + parses it as a YAML document, and returns parameters for the os_server + module.''' + + doc = inspect.getdoc(func) + cfg = yaml.load(doc) + + for task in cfg: + for module, params in task.items(): + for k, v in params.items(): + if k in ['nics'] and type(v) == str: + params[k] = [v] + task[module] = collections.defaultdict(str, + params) + + return cfg[0]['os_server'] + + +class FakeCloud (object): + ports = [ + {'name': 'port1', 'id': '1234'}, + {'name': 'port2', 'id': '4321'}, + ] + + networks = [ + {'name': 'network1', 'id': '5678'}, + {'name': 'network2', 'id': '8765'}, + ] + + images = [ + {'name': 'cirros', 'id': '1'}, + {'name': 'fedora', 'id': '2'}, + ] + + flavors = [ + {'name': 'm1.small', 'id': '1', 'flavor_ram': 1024}, + {'name': 'm1.tiny', 'id': '2', 'flavor_ram': 512}, + ] + + def _find(self, source, name): + for item in source: + if item['name'] == name or item['id'] == name: + return item + + def get_image_id(self, name, exclude=None): + image = self._find(self.images, name) + if image: + return image['id'] + + def get_flavor(self, name): + return self._find(self.flavors, name) + + def get_flavor_by_ram(self, ram, include=None): + for flavor in self.flavors: + if flavor['ram'] >= ram and (include is None or include in + flavor['name']): + return flavor + + def get_port(self, name): + return self._find(self.ports, name) + + def get_network(self, name): + return self._find(self.networks, name) + + create_server = mock.MagicMock() + + +class TestNetworkArgs(object): + '''This class exercises the _network_args function of the + os_server module. For each test, we parse the YAML document + contained in the docstring to retrieve the module parameters for the + test.''' + + def setup_method(self, method): + self.cloud = FakeCloud() + self.module = mock.MagicMock() + self.module.params = params_from_doc(method) + + def test_nics_string_net_id(self): + ''' + - os_server: + nics: net-id=1234 + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['net-id'] == '1234') + + def test_nics_string_net_id_list(self): + ''' + - os_server: + nics: net-id=1234,net-id=4321 + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['net-id'] == '1234') + assert(args[1]['net-id'] == '4321') + + def test_nics_string_port_id(self): + ''' + - os_server: + nics: port-id=1234 + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['port-id'] == '1234') + + def test_nics_string_net_name(self): + ''' + - os_server: + nics: net-name=network1 + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['net-id'] == '5678') + + def test_nics_string_port_name(self): + ''' + - os_server: + nics: port-name=port1 + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['port-id'] == '1234') + + def test_nics_structured_net_id(self): + ''' + - os_server: + nics: + - net-id: '1234' + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['net-id'] == '1234') + + def test_nics_structured_mixed(self): + ''' + - os_server: + nics: + - net-id: '1234' + - port-name: port1 + - 'net-name=network1,port-id=4321' + ''' + args = os_server._network_args(self.module, self.cloud) + assert(args[0]['net-id'] == '1234') + assert(args[1]['port-id'] == '1234') + assert(args[2]['net-id'] == '5678') + assert(args[3]['port-id'] == '4321') + + +class TestCreateServer(object): + def setup_method(self, method): + self.cloud = FakeCloud() + self.module = mock.MagicMock() + self.module.params = params_from_doc(method) + self.module.fail_json.side_effect = AnsibleFail() + self.module.exit_json.side_effect = AnsibleExit() + + self.meta = mock.MagicMock() + self.meta.gett_hostvars_from_server.return_value = { + 'id': '1234' + } + os_server.meta = self.meta + + def test_create_server(self): + ''' + - os_server: + image: cirros + flavor: m1.tiny + nics: + - net-name: network1 + meta: + - key: value + ''' + with pytest.raises(AnsibleExit): + os_server._create_server(self.module, self.cloud) + + assert(self.cloud.create_server.call_count == 1) + assert(self.cloud.create_server.call_args[1]['image'] + == self.cloud.get_image_id('cirros')) + assert(self.cloud.create_server.call_args[1]['flavor'] + == self.cloud.get_flavor('m1.tiny')['id']) + assert(self.cloud.create_server.call_args[1]['nics'][0]['net-id'] + == self.cloud.get_network('network1')['id']) + + def test_create_server_bad_flavor(self): + ''' + - os_server: + image: cirros + flavor: missing_flavor + nics: + - net-name: network1 + ''' + with pytest.raises(AnsibleFail): + os_server._create_server(self.module, self.cloud) + + assert('missing_flavor' in + self.module.fail_json.call_args[1]['msg']) + + def test_create_server_bad_nic(self): + ''' + - os_server: + image: cirros + flavor: m1.tiny + nics: + - net-name: missing_network + ''' + with pytest.raises(AnsibleFail): + os_server._create_server(self.module, self.cloud) + + assert('missing_network' in + self.module.fail_json.call_args[1]['msg']) diff --git a/test/units/modules/core/__init__.py b/test/units/modules/core/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/cloud/__init__.py b/test/units/modules/core/cloud/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/cloud/docker/__init__.py b/test/units/modules/core/cloud/docker/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/cloud/docker/test_docker.py b/test/units/modules/core/cloud/docker/test_docker.py deleted file mode 100644 index 0a40cfb8db..0000000000 --- a/test/units/modules/core/cloud/docker/test_docker.py +++ /dev/null @@ -1,19 +0,0 @@ -import collections -import os -import unittest - -from ansible.modules.cloud.docker._docker import get_split_image_tag - -class DockerSplitImageTagTestCase(unittest.TestCase): - - def test_trivial(self): - self.assertEqual(get_split_image_tag('test'), ('test', 'latest')) - - def test_with_org_name(self): - self.assertEqual(get_split_image_tag('ansible/centos7-ansible'), ('ansible/centos7-ansible', 'latest')) - - def test_with_tag(self): - self.assertEqual(get_split_image_tag('test:devel'), ('test', 'devel')) - - def test_with_tag_and_org_name(self): - self.assertEqual(get_split_image_tag('ansible/centos7-ansible:devel'), ('ansible/centos7-ansible', 'devel')) diff --git a/test/units/modules/core/cloud/openstack/__init__.py b/test/units/modules/core/cloud/openstack/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/cloud/openstack/test_os_server.py b/test/units/modules/core/cloud/openstack/test_os_server.py deleted file mode 100644 index 3ab3e40580..0000000000 --- a/test/units/modules/core/cloud/openstack/test_os_server.py +++ /dev/null @@ -1,223 +0,0 @@ -import mock -import pytest -import yaml -import inspect -import collections - -from ansible.modules.cloud.openstack import os_server - - -class AnsibleFail(Exception): - pass - - -class AnsibleExit(Exception): - pass - - -def params_from_doc(func): - '''This function extracts the docstring from the specified function, - parses it as a YAML document, and returns parameters for the os_server - module.''' - - doc = inspect.getdoc(func) - cfg = yaml.load(doc) - - for task in cfg: - for module, params in task.items(): - for k, v in params.items(): - if k in ['nics'] and type(v) == str: - params[k] = [v] - task[module] = collections.defaultdict(str, - params) - - return cfg[0]['os_server'] - - -class FakeCloud (object): - ports = [ - {'name': 'port1', 'id': '1234'}, - {'name': 'port2', 'id': '4321'}, - ] - - networks = [ - {'name': 'network1', 'id': '5678'}, - {'name': 'network2', 'id': '8765'}, - ] - - images = [ - {'name': 'cirros', 'id': '1'}, - {'name': 'fedora', 'id': '2'}, - ] - - flavors = [ - {'name': 'm1.small', 'id': '1', 'flavor_ram': 1024}, - {'name': 'm1.tiny', 'id': '2', 'flavor_ram': 512}, - ] - - def _find(self, source, name): - for item in source: - if item['name'] == name or item['id'] == name: - return item - - def get_image_id(self, name, exclude=None): - image = self._find(self.images, name) - if image: - return image['id'] - - def get_flavor(self, name): - return self._find(self.flavors, name) - - def get_flavor_by_ram(self, ram, include=None): - for flavor in self.flavors: - if flavor['ram'] >= ram and (include is None or include in - flavor['name']): - return flavor - - def get_port(self, name): - return self._find(self.ports, name) - - def get_network(self, name): - return self._find(self.networks, name) - - create_server = mock.MagicMock() - - -class TestNetworkArgs(object): - '''This class exercises the _network_args function of the - os_server module. For each test, we parse the YAML document - contained in the docstring to retrieve the module parameters for the - test.''' - - def setup_method(self, method): - self.cloud = FakeCloud() - self.module = mock.MagicMock() - self.module.params = params_from_doc(method) - - def test_nics_string_net_id(self): - ''' - - os_server: - nics: net-id=1234 - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['net-id'] == '1234') - - def test_nics_string_net_id_list(self): - ''' - - os_server: - nics: net-id=1234,net-id=4321 - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['net-id'] == '1234') - assert(args[1]['net-id'] == '4321') - - def test_nics_string_port_id(self): - ''' - - os_server: - nics: port-id=1234 - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['port-id'] == '1234') - - def test_nics_string_net_name(self): - ''' - - os_server: - nics: net-name=network1 - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['net-id'] == '5678') - - def test_nics_string_port_name(self): - ''' - - os_server: - nics: port-name=port1 - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['port-id'] == '1234') - - def test_nics_structured_net_id(self): - ''' - - os_server: - nics: - - net-id: '1234' - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['net-id'] == '1234') - - def test_nics_structured_mixed(self): - ''' - - os_server: - nics: - - net-id: '1234' - - port-name: port1 - - 'net-name=network1,port-id=4321' - ''' - args = os_server._network_args(self.module, self.cloud) - assert(args[0]['net-id'] == '1234') - assert(args[1]['port-id'] == '1234') - assert(args[2]['net-id'] == '5678') - assert(args[3]['port-id'] == '4321') - - -class TestCreateServer(object): - def setup_method(self, method): - self.cloud = FakeCloud() - self.module = mock.MagicMock() - self.module.params = params_from_doc(method) - self.module.fail_json.side_effect = AnsibleFail() - self.module.exit_json.side_effect = AnsibleExit() - - self.meta = mock.MagicMock() - self.meta.gett_hostvars_from_server.return_value = { - 'id': '1234' - } - os_server.meta = self.meta - - def test_create_server(self): - ''' - - os_server: - image: cirros - flavor: m1.tiny - nics: - - net-name: network1 - meta: - - key: value - ''' - with pytest.raises(AnsibleExit): - os_server._create_server(self.module, self.cloud) - - assert(self.cloud.create_server.call_count == 1) - assert(self.cloud.create_server.call_args[1]['image'] - == self.cloud.get_image_id('cirros')) - assert(self.cloud.create_server.call_args[1]['flavor'] - == self.cloud.get_flavor('m1.tiny')['id']) - assert(self.cloud.create_server.call_args[1]['nics'][0]['net-id'] - == self.cloud.get_network('network1')['id']) - - def test_create_server_bad_flavor(self): - ''' - - os_server: - image: cirros - flavor: missing_flavor - nics: - - net-name: network1 - ''' - with pytest.raises(AnsibleFail): - os_server._create_server(self.module, self.cloud) - - assert('missing_flavor' in - self.module.fail_json.call_args[1]['msg']) - - def test_create_server_bad_nic(self): - ''' - - os_server: - image: cirros - flavor: m1.tiny - nics: - - net-name: missing_network - ''' - with pytest.raises(AnsibleFail): - os_server._create_server(self.module, self.cloud) - - assert('missing_network' in - self.module.fail_json.call_args[1]['msg']) diff --git a/test/units/modules/core/packaging/__init__.py b/test/units/modules/core/packaging/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/packaging/os/__init__.py b/test/units/modules/core/packaging/os/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/core/packaging/os/test_apt.py b/test/units/modules/core/packaging/os/test_apt.py deleted file mode 100644 index f1b6f170f0..0000000000 --- a/test/units/modules/core/packaging/os/test_apt.py +++ /dev/null @@ -1,50 +0,0 @@ -import collections -import os -import sys - -from ansible.compat.tests import mock -from ansible.compat.tests import unittest - -try: - from ansible.modules.packaging.os.apt import ( - expand_pkgspec_from_fnmatches, - ) -except: - # Need some more module_utils work (porting urls.py) before we can test - # modules. So don't error out in this case. - if sys.version_info[0] >= 3: - pass - - -class AptExpandPkgspecTestCase(unittest.TestCase): - - def setUp(self): - FakePackage = collections.namedtuple("Package", ("name",)) - self.fake_cache = [ FakePackage("apt"), - FakePackage("apt-utils"), - FakePackage("not-selected"), - ] - - def test_trivial(self): - foo = ["apt"] - self.assertEqual( - expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) - - def test_version_wildcard(self): - foo = ["apt=1.0*"] - self.assertEqual( - expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) - - def test_pkgname_wildcard_version_wildcard(self): - foo = ["apt*=1.0*"] - m_mock = mock.Mock() - self.assertEqual( - expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), - ['apt', 'apt-utils']) - - def test_pkgname_expands(self): - foo = ["apt*"] - m_mock = mock.Mock() - self.assertEqual( - expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), - ["apt", "apt-utils"]) diff --git a/test/units/modules/extras/__init__.py b/test/units/modules/extras/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/extras/cloud/__init__.py b/test/units/modules/extras/cloud/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/extras/cloud/amazon/__init__.py b/test/units/modules/extras/cloud/amazon/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py b/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py deleted file mode 100644 index f274b70114..0000000000 --- a/test/units/modules/extras/cloud/amazon/test_ec2_vpc_nat_gateway.py +++ /dev/null @@ -1,489 +0,0 @@ -from nose.plugins.skip import SkipTest - -try: - import boto3 - import botocore - HAS_BOTO3 = True -except ImportError: - HAS_BOTO3 = False - -if not HAS_BOTO3: - raise SkipTest("test_ec2_vpc_nat_gateway.py requires the python module 'boto3' and 'botocore'") - -import unittest - -from collections import namedtuple -from ansible.parsing.dataloader import DataLoader -from ansible.vars import VariableManager -from ansible.inventory import Inventory -from ansible.playbook.play import Play -from ansible.executor.task_queue_manager import TaskQueueManager - -import ansible.modules.cloud.amazon.ec2_vpc_nat_gateway as ng - -Options = ( - namedtuple( - 'Options', [ - 'connection', 'module_path', 'forks', 'become', 'become_method', - 'become_user', 'remote_user', 'private_key_file', 'ssh_common_args', - 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args', 'verbosity', - 'check' - ] - ) -) -# initialize needed objects -variable_manager = VariableManager() -loader = DataLoader() -options = ( - Options( - connection='local', - module_path='cloud/amazon', - forks=1, become=None, become_method=None, become_user=None, check=True, - remote_user=None, private_key_file=None, ssh_common_args=None, - sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, - verbosity=3 - ) -) -passwords = dict(vault_pass='') - -aws_region = 'us-west-2' - -# create inventory and pass to var manager -inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list='localhost') -variable_manager.set_inventory(inventory) - -def run(play): - tqm = None - results = None - try: - tqm = TaskQueueManager( - inventory=inventory, - variable_manager=variable_manager, - loader=loader, - options=options, - passwords=passwords, - stdout_callback='default', - ) - results = tqm.run(play) - finally: - if tqm is not None: - tqm.cleanup() - return tqm, results - -class AnsibleVpcNatGatewayTasks(unittest.TestCase): - - def test_create_gateway_using_allocation_id(self): - play_source = dict( - name = "Create new nat gateway with eip allocation-id", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - subnet_id='subnet-12345678', - allocation_id='eipalloc-12345678', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.failUnless(tqm._stats.changed['localhost'] == 1) - - def test_create_gateway_using_allocation_id_idempotent(self): - play_source = dict( - name = "Create new nat gateway with eip allocation-id", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - subnet_id='subnet-123456789', - allocation_id='eipalloc-1234567', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.assertFalse('localhost' in tqm._stats.changed) - - def test_create_gateway_using_eip_address(self): - play_source = dict( - name = "Create new nat gateway with eip address", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - subnet_id='subnet-12345678', - eip_address='55.55.55.55', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.failUnless(tqm._stats.changed['localhost'] == 1) - - def test_create_gateway_using_eip_address_idempotent(self): - play_source = dict( - name = "Create new nat gateway with eip address", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - subnet_id='subnet-123456789', - eip_address='55.55.55.55', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.assertFalse('localhost' in tqm._stats.changed) - - def test_create_gateway_in_subnet_only_if_one_does_not_exist_already(self): - play_source = dict( - name = "Create new nat gateway only if one does not exist already", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - if_exist_do_not_create='yes', - subnet_id='subnet-123456789', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.assertFalse('localhost' in tqm._stats.changed) - - def test_delete_gateway(self): - play_source = dict( - name = "Delete Nat Gateway", - hosts = 'localhost', - gather_facts = 'no', - tasks = [ - dict( - action=dict( - module='ec2_vpc_nat_gateway', - args=dict( - nat_gateway_id='nat-123456789', - state='absent', - wait='yes', - region=aws_region, - ) - ), - register='nat_gateway', - ), - dict( - action=dict( - module='debug', - args=dict( - msg='{{nat_gateway}}' - ) - ) - ) - ] - ) - play = Play().load(play_source, variable_manager=variable_manager, loader=loader) - tqm, results = run(play) - self.failUnless(tqm._stats.ok['localhost'] == 2) - self.assertTrue('localhost' in tqm._stats.changed) - -class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase): - - def test_convert_to_lower(self): - example = ng.DRY_RUN_GATEWAY_UNCONVERTED - converted_example = ng.convert_to_lower(example[0]) - keys = list(converted_example.keys()) - keys.sort() - for i in range(len(keys)): - if i == 0: - self.assertEqual(keys[i], 'create_time') - if i == 1: - self.assertEqual(keys[i], 'nat_gateway_addresses') - gw_addresses_keys = list(converted_example[keys[i]][0].keys()) - gw_addresses_keys.sort() - for j in range(len(gw_addresses_keys)): - if j == 0: - self.assertEqual(gw_addresses_keys[j], 'allocation_id') - if j == 1: - self.assertEqual(gw_addresses_keys[j], 'network_interface_id') - if j == 2: - self.assertEqual(gw_addresses_keys[j], 'private_ip') - if j == 3: - self.assertEqual(gw_addresses_keys[j], 'public_ip') - if i == 2: - self.assertEqual(keys[i], 'nat_gateway_id') - if i == 3: - self.assertEqual(keys[i], 'state') - if i == 4: - self.assertEqual(keys[i], 'subnet_id') - if i == 5: - self.assertEqual(keys[i], 'vpc_id') - - def test_get_nat_gateways(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, stream = ( - ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_get_nat_gateways_no_gateways_found(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, stream = ( - ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True) - ) - self.assertTrue(success) - self.assertEqual(stream, []) - - def test_wait_for_status(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, gws = ( - ng.wait_for_status( - client, 5, 'nat-123456789', 'available', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS[0] - self.assertTrue(success) - self.assertEqual(gws, should_return) - - def test_wait_for_status_to_timeout(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, gws = ( - ng.wait_for_status( - client, 2, 'nat-12345678', 'available', check_mode=True - ) - ) - self.assertFalse(success) - self.assertEqual(gws, {}) - - def test_gateway_in_subnet_exists_with_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertEqual(gws, should_return) - - def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', 'eipalloc-123', check_mode=True - ) - ) - should_return = list() - self.assertEqual(gws, should_return) - - def test_gateway_in_subnet_exists_without_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertEqual(gws, should_return) - - def test_get_eip_allocation_id_by_address(self): - client = boto3.client('ec2', region_name=aws_region) - allocation_id, _ = ( - ng.get_eip_allocation_id_by_address( - client, '55.55.55.55', check_mode=True - ) - ) - should_return = 'eipalloc-1234567' - self.assertEqual(allocation_id, should_return) - - def test_get_eip_allocation_id_by_address_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - allocation_id, err_msg = ( - ng.get_eip_allocation_id_by_address( - client, '52.52.52.52', check_mode=True - ) - ) - self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist') - self.assertTrue(allocation_id is None) - - def test_allocate_eip_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, eip_id = ( - ng.allocate_eip_address( - client, check_mode=True - ) - ) - self.assertTrue(success) - - def test_release_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, _ = ( - ng.release_address( - client, 'eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - - def test_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.create( - client, 'subnet-123456', 'eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_pre_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_pre_create_idemptotent_with_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_pre_create_idemptotent_with_eip_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_pre_create_idemptotent_if_exist_do_not_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_delete(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, _ = ( - ng.remove( - client, 'nat-123456789', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_delete_and_release_ip(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, _ = ( - ng.remove( - client, 'nat-123456789', release_eip=True, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_delete_if_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, _ = ( - ng.remove( - client, 'nat-12345', check_mode=True - ) - ) - self.assertFalse(success) - self.assertFalse(changed) diff --git a/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py b/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py deleted file mode 100644 index efddfece7c..0000000000 --- a/test/units/modules/extras/cloud/amazon/test_kinesis_stream.py +++ /dev/null @@ -1,287 +0,0 @@ -from nose.plugins.skip import SkipTest - -try: - import boto3 - import botocore - HAS_BOTO3 = True -except ImportError: - HAS_BOTO3 = False - -if not HAS_BOTO3: - raise SkipTest("test_kinesis_stream.py requires the python module 'boto3' and 'botocore'") - -import unittest - -import ansible.modules.cloud.amazon.kinesis_stream as kinesis_stream - -aws_region = 'us-west-2' - - -class AnsibleKinesisStreamFunctions(unittest.TestCase): - - def test_convert_to_lower(self): - example = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - converted_example = kinesis_stream.convert_to_lower(example) - keys = list(converted_example.keys()) - keys.sort() - for i in range(len(keys)): - if i == 0: - self.assertEqual(keys[i], 'has_more_shards') - if i == 1: - self.assertEqual(keys[i], 'retention_period_hours') - if i == 2: - self.assertEqual(keys[i], 'stream_arn') - if i == 3: - self.assertEqual(keys[i], 'stream_name') - if i == 4: - self.assertEqual(keys[i], 'stream_status') - - def test_make_tags_in_aws_format(self): - example = { - 'env': 'development' - } - should_return = [ - { - 'Key': 'env', - 'Value': 'development' - } - ] - aws_tags = kinesis_stream.make_tags_in_aws_format(example) - self.assertEqual(aws_tags, should_return) - - def test_make_tags_in_proper_format(self): - example = [ - { - 'Key': 'env', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - should_return = { - 'env': 'development', - 'service': 'web' - } - proper_tags = kinesis_stream.make_tags_in_proper_format(example) - self.assertEqual(proper_tags, should_return) - - def test_recreate_tags_from_list(self): - example = [('environment', 'development'), ('service', 'web')] - should_return = [ - { - 'Key': 'environment', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - aws_tags = kinesis_stream.recreate_tags_from_list(example) - self.assertEqual(aws_tags, should_return) - - def test_get_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) - self.assertTrue(success) - should_return = [ - { - 'Key': 'DryRunMode', - 'Value': 'true' - } - ] - self.assertEqual(tags, should_return) - - def test_find_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.find_stream(client, 'test', check_mode=True) - ) - should_return = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_wait_for_status(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.wait_for_status( - client, 'test', 'ACTIVE', check_mode=True - ) - ) - should_return = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_tags_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update_tags( - client, 'test', tags, check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_retention_action_increase(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 48, 'increase', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_decrease(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'decrease', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'create', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update(self): - client = boto3.client('kinesis', region_name=aws_region) - current_stream = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update( - client, current_stream, 'test', retention_period=48, - tags=tags, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_create_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg, results = ( - kinesis_stream.create_stream( - client, 'test', number_of_shards=10, retention_period=48, - tags=tags, check_mode=True - ) - ) - should_return = { - 'has_more_shards': True, - 'retention_period_hours': 24, - 'stream_name': 'test', - 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'stream_status': 'ACTIVE', - 'tags': tags, - } - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(results, should_return) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') diff --git a/test/units/modules/packaging/__init__.py b/test/units/modules/packaging/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/packaging/os/__init__.py b/test/units/modules/packaging/os/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/units/modules/packaging/os/test_apt.py b/test/units/modules/packaging/os/test_apt.py new file mode 100644 index 0000000000..f1b6f170f0 --- /dev/null +++ b/test/units/modules/packaging/os/test_apt.py @@ -0,0 +1,50 @@ +import collections +import os +import sys + +from ansible.compat.tests import mock +from ansible.compat.tests import unittest + +try: + from ansible.modules.packaging.os.apt import ( + expand_pkgspec_from_fnmatches, + ) +except: + # Need some more module_utils work (porting urls.py) before we can test + # modules. So don't error out in this case. + if sys.version_info[0] >= 3: + pass + + +class AptExpandPkgspecTestCase(unittest.TestCase): + + def setUp(self): + FakePackage = collections.namedtuple("Package", ("name",)) + self.fake_cache = [ FakePackage("apt"), + FakePackage("apt-utils"), + FakePackage("not-selected"), + ] + + def test_trivial(self): + foo = ["apt"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_version_wildcard(self): + foo = ["apt=1.0*"] + self.assertEqual( + expand_pkgspec_from_fnmatches(None, foo, self.fake_cache), foo) + + def test_pkgname_wildcard_version_wildcard(self): + foo = ["apt*=1.0*"] + m_mock = mock.Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ['apt', 'apt-utils']) + + def test_pkgname_expands(self): + foo = ["apt*"] + m_mock = mock.Mock() + self.assertEqual( + expand_pkgspec_from_fnmatches(m_mock, foo, self.fake_cache), + ["apt", "apt-utils"]) -- cgit v1.2.1