diff options
author | Matt Martz <matt@sivel.net> | 2020-11-02 10:35:13 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-02 10:35:13 -0600 |
commit | 6543c7bc5d7cb0b4719004b9a791b4ed22d9c5af (patch) | |
tree | 51edf8bb8ca0800bb211a521c6e2927db20dbb8b /test/support | |
parent | 2ee5af514b2585f90833c1e06a6c347b53c24930 (diff) | |
download | ansible-6543c7bc5d7cb0b4719004b9a791b4ed22d9c5af.tar.gz |
Remove incidental_cs_role_permission (#72380)
* Add explicit argspec tests for choices
* ci_complete ci_coverage
* Remove incidental_cs_role_permission
* ci_complete ci_coverage
* ci_complete ci_coverage
Diffstat (limited to 'test/support')
3 files changed, 0 insertions, 1226 deletions
diff --git a/test/support/integration/plugins/module_utils/cloudstack.py b/test/support/integration/plugins/module_utils/cloudstack.py deleted file mode 100644 index 85a53b6b6e..0000000000 --- a/test/support/integration/plugins/module_utils/cloudstack.py +++ /dev/null @@ -1,664 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (c) 2015, René Moser <mail@renemoser.net> -# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause) - -from __future__ import absolute_import, division, print_function -__metaclass__ = type - - -import os -import sys -import time -import traceback - -from ansible.module_utils._text import to_text, to_native -from ansible.module_utils.basic import missing_required_lib - -CS_IMP_ERR = None -try: - from cs import CloudStack, CloudStackException, read_config - HAS_LIB_CS = True -except ImportError: - CS_IMP_ERR = traceback.format_exc() - HAS_LIB_CS = False - - -if sys.version_info > (3,): - long = int - - -def cs_argument_spec(): - return dict( - api_key=dict(default=os.environ.get('CLOUDSTACK_KEY')), - api_secret=dict(default=os.environ.get('CLOUDSTACK_SECRET'), no_log=True), - api_url=dict(default=os.environ.get('CLOUDSTACK_ENDPOINT')), - api_http_method=dict(choices=['get', 'post'], default=os.environ.get('CLOUDSTACK_METHOD')), - api_timeout=dict(type='int', default=os.environ.get('CLOUDSTACK_TIMEOUT')), - api_region=dict(default=os.environ.get('CLOUDSTACK_REGION') or 'cloudstack'), - ) - - -def cs_required_together(): - return [['api_key', 'api_secret']] - - -class AnsibleCloudStack: - - def __init__(self, module): - if not HAS_LIB_CS: - module.fail_json(msg=missing_required_lib('cs'), exception=CS_IMP_ERR) - - self.result = { - 'changed': False, - 'diff': { - 'before': dict(), - 'after': dict() - } - } - - # Common returns, will be merged with self.returns - # search_for_key: replace_with_key - self.common_returns = { - 'id': 'id', - 'name': 'name', - 'created': 'created', - 'zonename': 'zone', - 'state': 'state', - 'project': 'project', - 'account': 'account', - 'domain': 'domain', - 'displaytext': 'display_text', - 'displayname': 'display_name', - 'description': 'description', - } - - # Init returns dict for use in subclasses - self.returns = {} - # these values will be casted to int - self.returns_to_int = {} - # these keys will be compared case sensitive in self.has_changed() - self.case_sensitive_keys = [ - 'id', - 'displaytext', - 'displayname', - 'description', - ] - - self.module = module - self._cs = None - - # Helper for VPCs - self._vpc_networks_ids = None - - self.domain = None - self.account = None - self.project = None - self.ip_address = None - self.network = None - self.physical_network = None - self.vpc = None - self.zone = None - self.vm = None - self.vm_default_nic = None - self.os_type = None - self.hypervisor = None - self.capabilities = None - self.network_acl = None - - @property - def cs(self): - if self._cs is None: - api_config = self.get_api_config() - self._cs = CloudStack(**api_config) - return self._cs - - def get_api_config(self): - api_region = self.module.params.get('api_region') or os.environ.get('CLOUDSTACK_REGION') - try: - config = read_config(api_region) - except KeyError: - config = {} - - api_config = { - 'endpoint': self.module.params.get('api_url') or config.get('endpoint'), - 'key': self.module.params.get('api_key') or config.get('key'), - 'secret': self.module.params.get('api_secret') or config.get('secret'), - 'timeout': self.module.params.get('api_timeout') or config.get('timeout') or 10, - 'method': self.module.params.get('api_http_method') or config.get('method') or 'get', - } - self.result.update({ - 'api_region': api_region, - 'api_url': api_config['endpoint'], - 'api_key': api_config['key'], - 'api_timeout': int(api_config['timeout']), - 'api_http_method': api_config['method'], - }) - if not all([api_config['endpoint'], api_config['key'], api_config['secret']]): - self.fail_json(msg="Missing api credentials: can not authenticate") - return api_config - - def fail_json(self, **kwargs): - self.result.update(kwargs) - self.module.fail_json(**self.result) - - def get_or_fallback(self, key=None, fallback_key=None): - value = self.module.params.get(key) - if not value: - value = self.module.params.get(fallback_key) - return value - - def has_changed(self, want_dict, current_dict, only_keys=None, skip_diff_for_keys=None): - result = False - for key, value in want_dict.items(): - - # Optionally limit by a list of keys - if only_keys and key not in only_keys: - continue - - # Skip None values - if value is None: - continue - - if key in current_dict: - if isinstance(value, (int, float, long, complex)): - - # ensure we compare the same type - if isinstance(value, int): - current_dict[key] = int(current_dict[key]) - elif isinstance(value, float): - current_dict[key] = float(current_dict[key]) - elif isinstance(value, long): - current_dict[key] = long(current_dict[key]) - elif isinstance(value, complex): - current_dict[key] = complex(current_dict[key]) - - if value != current_dict[key]: - if skip_diff_for_keys and key not in skip_diff_for_keys: - self.result['diff']['before'][key] = current_dict[key] - self.result['diff']['after'][key] = value - result = True - else: - before_value = to_text(current_dict[key]) - after_value = to_text(value) - - if self.case_sensitive_keys and key in self.case_sensitive_keys: - if before_value != after_value: - if skip_diff_for_keys and key not in skip_diff_for_keys: - self.result['diff']['before'][key] = before_value - self.result['diff']['after'][key] = after_value - result = True - - # Test for diff in case insensitive way - elif before_value.lower() != after_value.lower(): - if skip_diff_for_keys and key not in skip_diff_for_keys: - self.result['diff']['before'][key] = before_value - self.result['diff']['after'][key] = after_value - result = True - else: - if skip_diff_for_keys and key not in skip_diff_for_keys: - self.result['diff']['before'][key] = None - self.result['diff']['after'][key] = to_text(value) - result = True - return result - - def _get_by_key(self, key=None, my_dict=None): - if my_dict is None: - my_dict = {} - if key: - if key in my_dict: - return my_dict[key] - self.fail_json(msg="Something went wrong: %s not found" % key) - return my_dict - - def query_api(self, command, **args): - try: - res = getattr(self.cs, command)(**args) - - if 'errortext' in res: - self.fail_json(msg="Failed: '%s'" % res['errortext']) - - except CloudStackException as e: - self.fail_json(msg='CloudStackException: %s' % to_native(e)) - - except Exception as e: - self.fail_json(msg=to_native(e)) - - return res - - def get_network_acl(self, key=None): - if self.network_acl is None: - args = { - 'name': self.module.params.get('network_acl'), - 'vpcid': self.get_vpc(key='id'), - } - network_acls = self.query_api('listNetworkACLLists', **args) - if network_acls: - self.network_acl = network_acls['networkacllist'][0] - self.result['network_acl'] = self.network_acl['name'] - if self.network_acl: - return self._get_by_key(key, self.network_acl) - else: - self.fail_json(msg="Network ACL %s not found" % self.module.params.get('network_acl')) - - def get_vpc(self, key=None): - """Return a VPC dictionary or the value of given key of.""" - if self.vpc: - return self._get_by_key(key, self.vpc) - - vpc = self.module.params.get('vpc') - if not vpc: - vpc = os.environ.get('CLOUDSTACK_VPC') - if not vpc: - return None - - args = { - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id'), - 'projectid': self.get_project(key='id'), - 'zoneid': self.get_zone(key='id'), - } - vpcs = self.query_api('listVPCs', **args) - if not vpcs: - self.fail_json(msg="No VPCs available.") - - for v in vpcs['vpc']: - if vpc in [v['name'], v['displaytext'], v['id']]: - # Fail if the identifyer matches more than one VPC - if self.vpc: - self.fail_json(msg="More than one VPC found with the provided identifyer '%s'" % vpc) - else: - self.vpc = v - self.result['vpc'] = v['name'] - if self.vpc: - return self._get_by_key(key, self.vpc) - self.fail_json(msg="VPC '%s' not found" % vpc) - - def is_vpc_network(self, network_id): - """Returns True if network is in VPC.""" - # This is an efficient way to query a lot of networks at a time - if self._vpc_networks_ids is None: - args = { - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id'), - 'projectid': self.get_project(key='id'), - 'zoneid': self.get_zone(key='id'), - } - vpcs = self.query_api('listVPCs', **args) - self._vpc_networks_ids = [] - if vpcs: - for vpc in vpcs['vpc']: - for n in vpc.get('network', []): - self._vpc_networks_ids.append(n['id']) - return network_id in self._vpc_networks_ids - - def get_physical_network(self, key=None): - if self.physical_network: - return self._get_by_key(key, self.physical_network) - physical_network = self.module.params.get('physical_network') - args = { - 'zoneid': self.get_zone(key='id') - } - physical_networks = self.query_api('listPhysicalNetworks', **args) - if not physical_networks: - self.fail_json(msg="No physical networks available.") - - for net in physical_networks['physicalnetwork']: - if physical_network in [net['name'], net['id']]: - self.physical_network = net - self.result['physical_network'] = net['name'] - return self._get_by_key(key, self.physical_network) - self.fail_json(msg="Physical Network '%s' not found" % physical_network) - - def get_network(self, key=None): - """Return a network dictionary or the value of given key of.""" - if self.network: - return self._get_by_key(key, self.network) - - network = self.module.params.get('network') - if not network: - vpc_name = self.get_vpc(key='name') - if vpc_name: - self.fail_json(msg="Could not find network for VPC '%s' due missing argument: network" % vpc_name) - return None - - args = { - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id'), - 'projectid': self.get_project(key='id'), - 'zoneid': self.get_zone(key='id'), - 'vpcid': self.get_vpc(key='id') - } - networks = self.query_api('listNetworks', **args) - if not networks: - self.fail_json(msg="No networks available.") - - for n in networks['network']: - # ignore any VPC network if vpc param is not given - if 'vpcid' in n and not self.get_vpc(key='id'): - continue - if network in [n['displaytext'], n['name'], n['id']]: - self.result['network'] = n['name'] - self.network = n - return self._get_by_key(key, self.network) - self.fail_json(msg="Network '%s' not found" % network) - - def get_project(self, key=None): - if self.project: - return self._get_by_key(key, self.project) - - project = self.module.params.get('project') - if not project: - project = os.environ.get('CLOUDSTACK_PROJECT') - if not project: - return None - args = { - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id') - } - projects = self.query_api('listProjects', **args) - if projects: - for p in projects['project']: - if project.lower() in [p['name'].lower(), p['id']]: - self.result['project'] = p['name'] - self.project = p - return self._get_by_key(key, self.project) - self.fail_json(msg="project '%s' not found" % project) - - def get_ip_address(self, key=None): - if self.ip_address: - return self._get_by_key(key, self.ip_address) - - ip_address = self.module.params.get('ip_address') - if not ip_address: - self.fail_json(msg="IP address param 'ip_address' is required") - - args = { - 'ipaddress': ip_address, - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id'), - 'projectid': self.get_project(key='id'), - 'vpcid': self.get_vpc(key='id'), - } - - ip_addresses = self.query_api('listPublicIpAddresses', **args) - - if not ip_addresses: - self.fail_json(msg="IP address '%s' not found" % args['ipaddress']) - - self.ip_address = ip_addresses['publicipaddress'][0] - return self._get_by_key(key, self.ip_address) - - def get_vm_guest_ip(self): - vm_guest_ip = self.module.params.get('vm_guest_ip') - default_nic = self.get_vm_default_nic() - - if not vm_guest_ip: - return default_nic['ipaddress'] - - for secondary_ip in default_nic['secondaryip']: - if vm_guest_ip == secondary_ip['ipaddress']: - return vm_guest_ip - self.fail_json(msg="Secondary IP '%s' not assigned to VM" % vm_guest_ip) - - def get_vm_default_nic(self): - if self.vm_default_nic: - return self.vm_default_nic - - nics = self.query_api('listNics', virtualmachineid=self.get_vm(key='id')) - if nics: - for n in nics['nic']: - if n['isdefault']: - self.vm_default_nic = n - return self.vm_default_nic - self.fail_json(msg="No default IP address of VM '%s' found" % self.module.params.get('vm')) - - def get_vm(self, key=None, filter_zone=True): - if self.vm: - return self._get_by_key(key, self.vm) - - vm = self.module.params.get('vm') - if not vm: - self.fail_json(msg="Virtual machine param 'vm' is required") - - args = { - 'account': self.get_account(key='name'), - 'domainid': self.get_domain(key='id'), - 'projectid': self.get_project(key='id'), - 'zoneid': self.get_zone(key='id') if filter_zone else None, - 'fetch_list': True, - } - vms = self.query_api('listVirtualMachines', **args) - if vms: - for v in vms: - if vm.lower() in [v['name'].lower(), v['displayname'].lower(), v['id']]: - self.vm = v - return self._get_by_key(key, self.vm) - self.fail_json(msg="Virtual machine '%s' not found" % vm) - - def get_disk_offering(self, key=None): - disk_offering = self.module.params.get('disk_offering') - if not disk_offering: - return None - - # Do not add domain filter for disk offering listing. - disk_offerings = self.query_api('listDiskOfferings') - if disk_offerings: - for d in disk_offerings['diskoffering']: - if disk_offering in [d['displaytext'], d['name'], d['id']]: - return self._get_by_key(key, d) - self.fail_json(msg="Disk offering '%s' not found" % disk_offering) - - def get_zone(self, key=None): - if self.zone: - return self._get_by_key(key, self.zone) - - zone = self.module.params.get('zone') - if not zone: - zone = os.environ.get('CLOUDSTACK_ZONE') - zones = self.query_api('listZones') - - if not zones: - self.fail_json(msg="No zones available. Please create a zone first") - - # use the first zone if no zone param given - if not zone: - self.zone = zones['zone'][0] - self.result['zone'] = self.zone['name'] - return self._get_by_key(key, self.zone) - - if zones: - for z in zones['zone']: - if zone.lower() in [z['name'].lower(), z['id']]: - self.result['zone'] = z['name'] - self.zone = z - return self._get_by_key(key, self.zone) - self.fail_json(msg="zone '%s' not found" % zone) - - def get_os_type(self, key=None): - if self.os_type: - return self._get_by_key(key, self.zone) - - os_type = self.module.params.get('os_type') - if not os_type: - return None - - os_types = self.query_api('listOsTypes') - if os_types: - for o in os_types['ostype']: - if os_type in [o['description'], o['id']]: - self.os_type = o - return self._get_by_key(key, self.os_type) - self.fail_json(msg="OS type '%s' not found" % os_type) - - def get_hypervisor(self): - if self.hypervisor: - return self.hypervisor - - hypervisor = self.module.params.get('hypervisor') - hypervisors = self.query_api('listHypervisors') - - # use the first hypervisor if no hypervisor param given - if not hypervisor: - self.hypervisor = hypervisors['hypervisor'][0]['name'] - return self.hypervisor - - for h in hypervisors['hypervisor']: - if hypervisor.lower() == h['name'].lower(): - self.hypervisor = h['name'] - return self.hypervisor - self.fail_json(msg="Hypervisor '%s' not found" % hypervisor) - - def get_account(self, key=None): - if self.account: - return self._get_by_key(key, self.account) - - account = self.module.params.get('account') - if not account: - account = os.environ.get('CLOUDSTACK_ACCOUNT') - if not account: - return None - - domain = self.module.params.get('domain') - if not domain: - self.fail_json(msg="Account must be specified with Domain") - - args = { - 'name': account, - 'domainid': self.get_domain(key='id'), - 'listall': True - } - accounts = self.query_api('listAccounts', **args) - if accounts: - self.account = accounts['account'][0] - self.result['account'] = self.account['name'] - return self._get_by_key(key, self.account) - self.fail_json(msg="Account '%s' not found" % account) - - def get_domain(self, key=None): - if self.domain: - return self._get_by_key(key, self.domain) - - domain = self.module.params.get('domain') - if not domain: - domain = os.environ.get('CLOUDSTACK_DOMAIN') - if not domain: - return None - - args = { - 'listall': True, - } - domains = self.query_api('listDomains', **args) - if domains: - for d in domains['domain']: - if d['path'].lower() in [domain.lower(), "root/" + domain.lower(), "root" + domain.lower()]: - self.domain = d - self.result['domain'] = d['path'] - return self._get_by_key(key, self.domain) - self.fail_json(msg="Domain '%s' not found" % domain) - - def query_tags(self, resource, resource_type): - args = { - 'resourceid': resource['id'], - 'resourcetype': resource_type, - } - tags = self.query_api('listTags', **args) - return self.get_tags(resource=tags, key='tag') - - def get_tags(self, resource=None, key='tags'): - existing_tags = [] - for tag in resource.get(key) or []: - existing_tags.append({'key': tag['key'], 'value': tag['value']}) - return existing_tags - - def _process_tags(self, resource, resource_type, tags, operation="create"): - if tags: - self.result['changed'] = True - if not self.module.check_mode: - args = { - 'resourceids': resource['id'], - 'resourcetype': resource_type, - 'tags': tags, - } - if operation == "create": - response = self.query_api('createTags', **args) - else: - response = self.query_api('deleteTags', **args) - self.poll_job(response) - - def _tags_that_should_exist_or_be_updated(self, resource, tags): - existing_tags = self.get_tags(resource) - return [tag for tag in tags if tag not in existing_tags] - - def _tags_that_should_not_exist(self, resource, tags): - existing_tags = self.get_tags(resource) - return [tag for tag in existing_tags if tag not in tags] - - def ensure_tags(self, resource, resource_type=None): - if not resource_type or not resource: - self.fail_json(msg="Error: Missing resource or resource_type for tags.") - - if 'tags' in resource: - tags = self.module.params.get('tags') - if tags is not None: - self._process_tags(resource, resource_type, self._tags_that_should_not_exist(resource, tags), operation="delete") - self._process_tags(resource, resource_type, self._tags_that_should_exist_or_be_updated(resource, tags)) - resource['tags'] = self.query_tags(resource=resource, resource_type=resource_type) - return resource - - def get_capabilities(self, key=None): - if self.capabilities: - return self._get_by_key(key, self.capabilities) - capabilities = self.query_api('listCapabilities') - self.capabilities = capabilities['capability'] - return self._get_by_key(key, self.capabilities) - - def poll_job(self, job=None, key=None): - if 'jobid' in job: - while True: - res = self.query_api('queryAsyncJobResult', jobid=job['jobid']) - if res['jobstatus'] != 0 and 'jobresult' in res: - - if 'errortext' in res['jobresult']: - self.fail_json(msg="Failed: '%s'" % res['jobresult']['errortext']) - - if key and key in res['jobresult']: - job = res['jobresult'][key] - - break - time.sleep(2) - return job - - def update_result(self, resource, result=None): - if result is None: - result = dict() - if resource: - returns = self.common_returns.copy() - returns.update(self.returns) - for search_key, return_key in returns.items(): - if search_key in resource: - result[return_key] = resource[search_key] - - # Bad bad API does not always return int when it should. - for search_key, return_key in self.returns_to_int.items(): - if search_key in resource: - result[return_key] = int(resource[search_key]) - - if 'tags' in resource: - result['tags'] = resource['tags'] - return result - - def get_result(self, resource): - return self.update_result(resource, self.result) - - def get_result_and_facts(self, facts_name, resource): - result = self.get_result(resource) - - ansible_facts = { - facts_name: result.copy() - } - for k in ['diff', 'changed']: - if k in ansible_facts[facts_name]: - del ansible_facts[facts_name][k] - - result.update(ansible_facts=ansible_facts) - return result diff --git a/test/support/integration/plugins/modules/cs_role.py b/test/support/integration/plugins/modules/cs_role.py deleted file mode 100644 index 6db295bd81..0000000000 --- a/test/support/integration/plugins/modules/cs_role.py +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# (c) 2016, René Moser <mail@renemoser.net> -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function -__metaclass__ = type - -ANSIBLE_METADATA = {'metadata_version': '1.1', - 'status': ['preview'], - 'supported_by': 'community'} - - -DOCUMENTATION = ''' ---- -module: cs_role -short_description: Manages user roles on Apache CloudStack based clouds. -description: - - Create, update, delete user roles. -version_added: '2.3' -author: René Moser (@resmo) -options: - name: - description: - - Name of the role. - type: str - required: true - uuid: - description: - - ID of the role. - - If provided, I(uuid) is used as key. - type: str - aliases: [ id ] - role_type: - description: - - Type of the role. - - Only considered for creation. - type: str - default: User - choices: [ User, DomainAdmin, ResourceAdmin, Admin ] - description: - description: - - Description of the role. - type: str - state: - description: - - State of the role. - type: str - default: present - choices: [ present, absent ] -extends_documentation_fragment: cloudstack -''' - -EXAMPLES = ''' -- name: Ensure an user role is present - cs_role: - name: myrole_user - delegate_to: localhost - -- name: Ensure a role having particular ID is named as myrole_user - cs_role: - name: myrole_user - id: 04589590-ac63-4ffc-93f5-b698b8ac38b6 - delegate_to: localhost - -- name: Ensure a role is absent - cs_role: - name: myrole_user - state: absent - delegate_to: localhost -''' - -RETURN = ''' ---- -id: - description: UUID of the role. - returned: success - type: str - sample: 04589590-ac63-4ffc-93f5-b698b8ac38b6 -name: - description: Name of the role. - returned: success - type: str - sample: myrole -description: - description: Description of the role. - returned: success - type: str - sample: "This is my role description" -role_type: - description: Type of the role. - returned: success - type: str - sample: User -''' - -from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.cloudstack import ( - AnsibleCloudStack, - cs_argument_spec, - cs_required_together, -) - - -class AnsibleCloudStackRole(AnsibleCloudStack): - - def __init__(self, module): - super(AnsibleCloudStackRole, self).__init__(module) - self.returns = { - 'type': 'role_type', - } - - def get_role(self): - uuid = self.module.params.get('uuid') - if uuid: - args = { - 'id': uuid, - } - roles = self.query_api('listRoles', **args) - if roles: - return roles['role'][0] - else: - args = { - 'name': self.module.params.get('name'), - } - roles = self.query_api('listRoles', **args) - if roles: - return roles['role'][0] - return None - - def present_role(self): - role = self.get_role() - if role: - role = self._update_role(role) - else: - role = self._create_role(role) - return role - - def _create_role(self, role): - self.result['changed'] = True - args = { - 'name': self.module.params.get('name'), - 'type': self.module.params.get('role_type'), - 'description': self.module.params.get('description'), - } - if not self.module.check_mode: - res = self.query_api('createRole', **args) - role = res['role'] - return role - - def _update_role(self, role): - args = { - 'id': role['id'], - 'name': self.module.params.get('name'), - 'description': self.module.params.get('description'), - } - if self.has_changed(args, role): - self.result['changed'] = True - if not self.module.check_mode: - res = self.query_api('updateRole', **args) - - # The API as in 4.9 does not return an updated role yet - if 'role' not in res: - role = self.get_role() - else: - role = res['role'] - return role - - def absent_role(self): - role = self.get_role() - if role: - self.result['changed'] = True - args = { - 'id': role['id'], - } - if not self.module.check_mode: - self.query_api('deleteRole', **args) - return role - - -def main(): - argument_spec = cs_argument_spec() - argument_spec.update(dict( - uuid=dict(aliases=['id']), - name=dict(required=True), - description=dict(), - role_type=dict(choices=['User', 'DomainAdmin', 'ResourceAdmin', 'Admin'], default='User'), - state=dict(choices=['present', 'absent'], default='present'), - )) - - module = AnsibleModule( - argument_spec=argument_spec, - required_together=cs_required_together(), - supports_check_mode=True - ) - - acs_role = AnsibleCloudStackRole(module) - state = module.params.get('state') - if state == 'absent': - role = acs_role.absent_role() - else: - role = acs_role.present_role() - - result = acs_role.get_result(role) - - module.exit_json(**result) - - -if __name__ == '__main__': - main() diff --git a/test/support/integration/plugins/modules/cs_role_permission.py b/test/support/integration/plugins/modules/cs_role_permission.py deleted file mode 100644 index 30392b2f87..0000000000 --- a/test/support/integration/plugins/modules/cs_role_permission.py +++ /dev/null @@ -1,351 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# -# Copyright (c) 2017, David Passante (@dpassante) -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import absolute_import, division, print_function -__metaclass__ = type - -ANSIBLE_METADATA = {'metadata_version': '1.1', - 'status': ['preview'], - 'supported_by': 'community'} - -DOCUMENTATION = ''' ---- -module: cs_role_permission -short_description: Manages role permissions on Apache CloudStack based clouds. -description: - - Create, update and remove CloudStack role permissions. - - Managing role permissions only supported in CloudStack >= 4.9. -version_added: '2.6' -author: David Passante (@dpassante) -options: - name: - description: - - The API name of the permission. - type: str - required: true - role: - description: - - Name or ID of the role. - type: str - required: true - permission: - description: - - The rule permission, allow or deny. Defaulted to deny. - type: str - choices: [ allow, deny ] - default: deny - state: - description: - - State of the role permission. - type: str - choices: [ present, absent ] - default: present - description: - description: - - The description of the role permission. - type: str - parent: - description: - - The parent role permission uuid. use 0 to move this rule at the top of the list. - type: str -extends_documentation_fragment: cloudstack -''' - -EXAMPLES = ''' -- name: Create a role permission - cs_role_permission: - role: My_Custom_role - name: createVPC - permission: allow - description: My comments - delegate_to: localhost - -- name: Remove a role permission - cs_role_permission: - state: absent - role: My_Custom_role - name: createVPC - delegate_to: localhost - -- name: Update a system role permission - cs_role_permission: - role: Domain Admin - name: createVPC - permission: deny - delegate_to: localhost - -- name: Update rules order. Move the rule at the top of list - cs_role_permission: - role: Domain Admin - name: createVPC - parent: 0 - delegate_to: localhost -''' - -RETURN = ''' ---- -id: - description: The ID of the role permission. - returned: success - type: str - sample: a6f7a5fc-43f8-11e5-a151-feff819cdc9f -name: - description: The API name of the permission. - returned: success - type: str - sample: createVPC -permission: - description: The permission type of the api name. - returned: success - type: str - sample: allow -role_id: - description: The ID of the role to which the role permission belongs. - returned: success - type: str - sample: c6f7a5fc-43f8-11e5-a151-feff819cdc7f -description: - description: The description of the role permission - returned: success - type: str - sample: Deny createVPC for users -''' - -from distutils.version import LooseVersion - -from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.cloudstack import ( - AnsibleCloudStack, - cs_argument_spec, - cs_required_together, -) - - -class AnsibleCloudStackRolePermission(AnsibleCloudStack): - - def __init__(self, module): - super(AnsibleCloudStackRolePermission, self).__init__(module) - cloudstack_min_version = LooseVersion('4.9.2') - - self.returns = { - 'id': 'id', - 'roleid': 'role_id', - 'rule': 'name', - 'permission': 'permission', - 'description': 'description', - } - self.role_permission = None - - self.cloudstack_version = self._cloudstack_ver() - - if self.cloudstack_version < cloudstack_min_version: - self.fail_json(msg="This module requires CloudStack >= %s." % cloudstack_min_version) - - def _cloudstack_ver(self): - capabilities = self.get_capabilities() - return LooseVersion(capabilities['cloudstackversion']) - - def _get_role_id(self): - role = self.module.params.get('role') - if not role: - return None - - res = self.query_api('listRoles') - roles = res['role'] - if roles: - for r in roles: - if role in [r['name'], r['id']]: - return r['id'] - self.fail_json(msg="Role '%s' not found" % role) - - def _get_role_perm(self): - role_permission = self.role_permission - - args = { - 'roleid': self._get_role_id(), - } - - rp = self.query_api('listRolePermissions', **args) - - if rp: - role_permission = rp['rolepermission'] - - return role_permission - - def _get_rule(self, rule=None): - if not rule: - rule = self.module.params.get('name') - - if self._get_role_perm(): - for _rule in self._get_role_perm(): - if rule == _rule['rule'] or rule == _rule['id']: - return _rule - - return None - - def _get_rule_order(self): - perms = self._get_role_perm() - rules = [] - - if perms: - for i, rule in enumerate(perms): - rules.append(rule['id']) - - return rules - - def replace_rule(self): - old_rule = self._get_rule() - - if old_rule: - rules_order = self._get_rule_order() - old_pos = rules_order.index(old_rule['id']) - - self.remove_role_perm() - - new_rule = self.create_role_perm() - - if new_rule: - perm_order = self.order_permissions(int(old_pos - 1), new_rule['id']) - - return perm_order - - return None - - def order_permissions(self, parent, rule_id): - rules = self._get_rule_order() - - if isinstance(parent, int): - parent_pos = parent - elif parent == '0': - parent_pos = -1 - else: - parent_rule = self._get_rule(parent) - if not parent_rule: - self.fail_json(msg="Parent rule '%s' not found" % parent) - - parent_pos = rules.index(parent_rule['id']) - - r_id = rules.pop(rules.index(rule_id)) - - rules.insert((parent_pos + 1), r_id) - rules = ','.join(map(str, rules)) - - return rules - - def create_or_update_role_perm(self): - role_permission = self._get_rule() - - if not role_permission: - role_permission = self.create_role_perm() - else: - role_permission = self.update_role_perm(role_permission) - - return role_permission - - def create_role_perm(self): - role_permission = None - - self.result['changed'] = True - - args = { - 'rule': self.module.params.get('name'), - 'description': self.module.params.get('description'), - 'roleid': self._get_role_id(), - 'permission': self.module.params.get('permission'), - } - - if not self.module.check_mode: - res = self.query_api('createRolePermission', **args) - role_permission = res['rolepermission'] - - return role_permission - - def update_role_perm(self, role_perm): - perm_order = None - - if not self.module.params.get('parent'): - args = { - 'ruleid': role_perm['id'], - 'roleid': role_perm['roleid'], - 'permission': self.module.params.get('permission'), - } - - if self.has_changed(args, role_perm, only_keys=['permission']): - self.result['changed'] = True - - if not self.module.check_mode: - if self.cloudstack_version >= LooseVersion('4.11.0'): - self.query_api('updateRolePermission', **args) - role_perm = self._get_rule() - else: - perm_order = self.replace_rule() - else: - perm_order = self.order_permissions(self.module.params.get('parent'), role_perm['id']) - - if perm_order: - args = { - 'roleid': role_perm['roleid'], - 'ruleorder': perm_order, - } - - self.result['changed'] = True - - if not self.module.check_mode: - self.query_api('updateRolePermission', **args) - role_perm = self._get_rule() - - return role_perm - - def remove_role_perm(self): - role_permission = self._get_rule() - - if role_permission: - self.result['changed'] = True - - args = { - 'id': role_permission['id'], - } - - if not self.module.check_mode: - self.query_api('deleteRolePermission', **args) - - return role_permission - - -def main(): - argument_spec = cs_argument_spec() - argument_spec.update(dict( - role=dict(required=True), - name=dict(required=True), - permission=dict(choices=['allow', 'deny'], default='deny'), - description=dict(), - state=dict(choices=['present', 'absent'], default='present'), - parent=dict(), - )) - - module = AnsibleModule( - argument_spec=argument_spec, - required_together=cs_required_together(), - mutually_exclusive=( - ['permission', 'parent'], - ), - supports_check_mode=True - ) - - acs_role_perm = AnsibleCloudStackRolePermission(module) - - state = module.params.get('state') - if state in ['absent']: - role_permission = acs_role_perm.remove_role_perm() - else: - role_permission = acs_role_perm.create_or_update_role_perm() - - result = acs_role_perm.get_result(role_permission) - module.exit_json(**result) - - -if __name__ == '__main__': - main() |