summaryrefslogtreecommitdiff
path: root/test/support
diff options
context:
space:
mode:
authorMatt Martz <matt@sivel.net>2020-11-02 10:35:13 -0600
committerGitHub <noreply@github.com>2020-11-02 10:35:13 -0600
commit6543c7bc5d7cb0b4719004b9a791b4ed22d9c5af (patch)
tree51edf8bb8ca0800bb211a521c6e2927db20dbb8b /test/support
parent2ee5af514b2585f90833c1e06a6c347b53c24930 (diff)
downloadansible-6543c7bc5d7cb0b4719004b9a791b4ed22d9c5af.tar.gz
Remove incidental_cs_role_permission (#72380)
* Add explicit argspec tests for choices * ci_complete ci_coverage * Remove incidental_cs_role_permission * ci_complete ci_coverage * ci_complete ci_coverage
Diffstat (limited to 'test/support')
-rw-r--r--test/support/integration/plugins/module_utils/cloudstack.py664
-rw-r--r--test/support/integration/plugins/modules/cs_role.py211
-rw-r--r--test/support/integration/plugins/modules/cs_role_permission.py351
3 files changed, 0 insertions, 1226 deletions
diff --git a/test/support/integration/plugins/module_utils/cloudstack.py b/test/support/integration/plugins/module_utils/cloudstack.py
deleted file mode 100644
index 85a53b6b6e..0000000000
--- a/test/support/integration/plugins/module_utils/cloudstack.py
+++ /dev/null
@@ -1,664 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (c) 2015, René Moser <mail@renemoser.net>
-# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-
-import os
-import sys
-import time
-import traceback
-
-from ansible.module_utils._text import to_text, to_native
-from ansible.module_utils.basic import missing_required_lib
-
-CS_IMP_ERR = None
-try:
- from cs import CloudStack, CloudStackException, read_config
- HAS_LIB_CS = True
-except ImportError:
- CS_IMP_ERR = traceback.format_exc()
- HAS_LIB_CS = False
-
-
-if sys.version_info > (3,):
- long = int
-
-
-def cs_argument_spec():
- return dict(
- api_key=dict(default=os.environ.get('CLOUDSTACK_KEY')),
- api_secret=dict(default=os.environ.get('CLOUDSTACK_SECRET'), no_log=True),
- api_url=dict(default=os.environ.get('CLOUDSTACK_ENDPOINT')),
- api_http_method=dict(choices=['get', 'post'], default=os.environ.get('CLOUDSTACK_METHOD')),
- api_timeout=dict(type='int', default=os.environ.get('CLOUDSTACK_TIMEOUT')),
- api_region=dict(default=os.environ.get('CLOUDSTACK_REGION') or 'cloudstack'),
- )
-
-
-def cs_required_together():
- return [['api_key', 'api_secret']]
-
-
-class AnsibleCloudStack:
-
- def __init__(self, module):
- if not HAS_LIB_CS:
- module.fail_json(msg=missing_required_lib('cs'), exception=CS_IMP_ERR)
-
- self.result = {
- 'changed': False,
- 'diff': {
- 'before': dict(),
- 'after': dict()
- }
- }
-
- # Common returns, will be merged with self.returns
- # search_for_key: replace_with_key
- self.common_returns = {
- 'id': 'id',
- 'name': 'name',
- 'created': 'created',
- 'zonename': 'zone',
- 'state': 'state',
- 'project': 'project',
- 'account': 'account',
- 'domain': 'domain',
- 'displaytext': 'display_text',
- 'displayname': 'display_name',
- 'description': 'description',
- }
-
- # Init returns dict for use in subclasses
- self.returns = {}
- # these values will be casted to int
- self.returns_to_int = {}
- # these keys will be compared case sensitive in self.has_changed()
- self.case_sensitive_keys = [
- 'id',
- 'displaytext',
- 'displayname',
- 'description',
- ]
-
- self.module = module
- self._cs = None
-
- # Helper for VPCs
- self._vpc_networks_ids = None
-
- self.domain = None
- self.account = None
- self.project = None
- self.ip_address = None
- self.network = None
- self.physical_network = None
- self.vpc = None
- self.zone = None
- self.vm = None
- self.vm_default_nic = None
- self.os_type = None
- self.hypervisor = None
- self.capabilities = None
- self.network_acl = None
-
- @property
- def cs(self):
- if self._cs is None:
- api_config = self.get_api_config()
- self._cs = CloudStack(**api_config)
- return self._cs
-
- def get_api_config(self):
- api_region = self.module.params.get('api_region') or os.environ.get('CLOUDSTACK_REGION')
- try:
- config = read_config(api_region)
- except KeyError:
- config = {}
-
- api_config = {
- 'endpoint': self.module.params.get('api_url') or config.get('endpoint'),
- 'key': self.module.params.get('api_key') or config.get('key'),
- 'secret': self.module.params.get('api_secret') or config.get('secret'),
- 'timeout': self.module.params.get('api_timeout') or config.get('timeout') or 10,
- 'method': self.module.params.get('api_http_method') or config.get('method') or 'get',
- }
- self.result.update({
- 'api_region': api_region,
- 'api_url': api_config['endpoint'],
- 'api_key': api_config['key'],
- 'api_timeout': int(api_config['timeout']),
- 'api_http_method': api_config['method'],
- })
- if not all([api_config['endpoint'], api_config['key'], api_config['secret']]):
- self.fail_json(msg="Missing api credentials: can not authenticate")
- return api_config
-
- def fail_json(self, **kwargs):
- self.result.update(kwargs)
- self.module.fail_json(**self.result)
-
- def get_or_fallback(self, key=None, fallback_key=None):
- value = self.module.params.get(key)
- if not value:
- value = self.module.params.get(fallback_key)
- return value
-
- def has_changed(self, want_dict, current_dict, only_keys=None, skip_diff_for_keys=None):
- result = False
- for key, value in want_dict.items():
-
- # Optionally limit by a list of keys
- if only_keys and key not in only_keys:
- continue
-
- # Skip None values
- if value is None:
- continue
-
- if key in current_dict:
- if isinstance(value, (int, float, long, complex)):
-
- # ensure we compare the same type
- if isinstance(value, int):
- current_dict[key] = int(current_dict[key])
- elif isinstance(value, float):
- current_dict[key] = float(current_dict[key])
- elif isinstance(value, long):
- current_dict[key] = long(current_dict[key])
- elif isinstance(value, complex):
- current_dict[key] = complex(current_dict[key])
-
- if value != current_dict[key]:
- if skip_diff_for_keys and key not in skip_diff_for_keys:
- self.result['diff']['before'][key] = current_dict[key]
- self.result['diff']['after'][key] = value
- result = True
- else:
- before_value = to_text(current_dict[key])
- after_value = to_text(value)
-
- if self.case_sensitive_keys and key in self.case_sensitive_keys:
- if before_value != after_value:
- if skip_diff_for_keys and key not in skip_diff_for_keys:
- self.result['diff']['before'][key] = before_value
- self.result['diff']['after'][key] = after_value
- result = True
-
- # Test for diff in case insensitive way
- elif before_value.lower() != after_value.lower():
- if skip_diff_for_keys and key not in skip_diff_for_keys:
- self.result['diff']['before'][key] = before_value
- self.result['diff']['after'][key] = after_value
- result = True
- else:
- if skip_diff_for_keys and key not in skip_diff_for_keys:
- self.result['diff']['before'][key] = None
- self.result['diff']['after'][key] = to_text(value)
- result = True
- return result
-
- def _get_by_key(self, key=None, my_dict=None):
- if my_dict is None:
- my_dict = {}
- if key:
- if key in my_dict:
- return my_dict[key]
- self.fail_json(msg="Something went wrong: %s not found" % key)
- return my_dict
-
- def query_api(self, command, **args):
- try:
- res = getattr(self.cs, command)(**args)
-
- if 'errortext' in res:
- self.fail_json(msg="Failed: '%s'" % res['errortext'])
-
- except CloudStackException as e:
- self.fail_json(msg='CloudStackException: %s' % to_native(e))
-
- except Exception as e:
- self.fail_json(msg=to_native(e))
-
- return res
-
- def get_network_acl(self, key=None):
- if self.network_acl is None:
- args = {
- 'name': self.module.params.get('network_acl'),
- 'vpcid': self.get_vpc(key='id'),
- }
- network_acls = self.query_api('listNetworkACLLists', **args)
- if network_acls:
- self.network_acl = network_acls['networkacllist'][0]
- self.result['network_acl'] = self.network_acl['name']
- if self.network_acl:
- return self._get_by_key(key, self.network_acl)
- else:
- self.fail_json(msg="Network ACL %s not found" % self.module.params.get('network_acl'))
-
- def get_vpc(self, key=None):
- """Return a VPC dictionary or the value of given key of."""
- if self.vpc:
- return self._get_by_key(key, self.vpc)
-
- vpc = self.module.params.get('vpc')
- if not vpc:
- vpc = os.environ.get('CLOUDSTACK_VPC')
- if not vpc:
- return None
-
- args = {
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id'),
- 'projectid': self.get_project(key='id'),
- 'zoneid': self.get_zone(key='id'),
- }
- vpcs = self.query_api('listVPCs', **args)
- if not vpcs:
- self.fail_json(msg="No VPCs available.")
-
- for v in vpcs['vpc']:
- if vpc in [v['name'], v['displaytext'], v['id']]:
- # Fail if the identifyer matches more than one VPC
- if self.vpc:
- self.fail_json(msg="More than one VPC found with the provided identifyer '%s'" % vpc)
- else:
- self.vpc = v
- self.result['vpc'] = v['name']
- if self.vpc:
- return self._get_by_key(key, self.vpc)
- self.fail_json(msg="VPC '%s' not found" % vpc)
-
- def is_vpc_network(self, network_id):
- """Returns True if network is in VPC."""
- # This is an efficient way to query a lot of networks at a time
- if self._vpc_networks_ids is None:
- args = {
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id'),
- 'projectid': self.get_project(key='id'),
- 'zoneid': self.get_zone(key='id'),
- }
- vpcs = self.query_api('listVPCs', **args)
- self._vpc_networks_ids = []
- if vpcs:
- for vpc in vpcs['vpc']:
- for n in vpc.get('network', []):
- self._vpc_networks_ids.append(n['id'])
- return network_id in self._vpc_networks_ids
-
- def get_physical_network(self, key=None):
- if self.physical_network:
- return self._get_by_key(key, self.physical_network)
- physical_network = self.module.params.get('physical_network')
- args = {
- 'zoneid': self.get_zone(key='id')
- }
- physical_networks = self.query_api('listPhysicalNetworks', **args)
- if not physical_networks:
- self.fail_json(msg="No physical networks available.")
-
- for net in physical_networks['physicalnetwork']:
- if physical_network in [net['name'], net['id']]:
- self.physical_network = net
- self.result['physical_network'] = net['name']
- return self._get_by_key(key, self.physical_network)
- self.fail_json(msg="Physical Network '%s' not found" % physical_network)
-
- def get_network(self, key=None):
- """Return a network dictionary or the value of given key of."""
- if self.network:
- return self._get_by_key(key, self.network)
-
- network = self.module.params.get('network')
- if not network:
- vpc_name = self.get_vpc(key='name')
- if vpc_name:
- self.fail_json(msg="Could not find network for VPC '%s' due missing argument: network" % vpc_name)
- return None
-
- args = {
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id'),
- 'projectid': self.get_project(key='id'),
- 'zoneid': self.get_zone(key='id'),
- 'vpcid': self.get_vpc(key='id')
- }
- networks = self.query_api('listNetworks', **args)
- if not networks:
- self.fail_json(msg="No networks available.")
-
- for n in networks['network']:
- # ignore any VPC network if vpc param is not given
- if 'vpcid' in n and not self.get_vpc(key='id'):
- continue
- if network in [n['displaytext'], n['name'], n['id']]:
- self.result['network'] = n['name']
- self.network = n
- return self._get_by_key(key, self.network)
- self.fail_json(msg="Network '%s' not found" % network)
-
- def get_project(self, key=None):
- if self.project:
- return self._get_by_key(key, self.project)
-
- project = self.module.params.get('project')
- if not project:
- project = os.environ.get('CLOUDSTACK_PROJECT')
- if not project:
- return None
- args = {
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id')
- }
- projects = self.query_api('listProjects', **args)
- if projects:
- for p in projects['project']:
- if project.lower() in [p['name'].lower(), p['id']]:
- self.result['project'] = p['name']
- self.project = p
- return self._get_by_key(key, self.project)
- self.fail_json(msg="project '%s' not found" % project)
-
- def get_ip_address(self, key=None):
- if self.ip_address:
- return self._get_by_key(key, self.ip_address)
-
- ip_address = self.module.params.get('ip_address')
- if not ip_address:
- self.fail_json(msg="IP address param 'ip_address' is required")
-
- args = {
- 'ipaddress': ip_address,
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id'),
- 'projectid': self.get_project(key='id'),
- 'vpcid': self.get_vpc(key='id'),
- }
-
- ip_addresses = self.query_api('listPublicIpAddresses', **args)
-
- if not ip_addresses:
- self.fail_json(msg="IP address '%s' not found" % args['ipaddress'])
-
- self.ip_address = ip_addresses['publicipaddress'][0]
- return self._get_by_key(key, self.ip_address)
-
- def get_vm_guest_ip(self):
- vm_guest_ip = self.module.params.get('vm_guest_ip')
- default_nic = self.get_vm_default_nic()
-
- if not vm_guest_ip:
- return default_nic['ipaddress']
-
- for secondary_ip in default_nic['secondaryip']:
- if vm_guest_ip == secondary_ip['ipaddress']:
- return vm_guest_ip
- self.fail_json(msg="Secondary IP '%s' not assigned to VM" % vm_guest_ip)
-
- def get_vm_default_nic(self):
- if self.vm_default_nic:
- return self.vm_default_nic
-
- nics = self.query_api('listNics', virtualmachineid=self.get_vm(key='id'))
- if nics:
- for n in nics['nic']:
- if n['isdefault']:
- self.vm_default_nic = n
- return self.vm_default_nic
- self.fail_json(msg="No default IP address of VM '%s' found" % self.module.params.get('vm'))
-
- def get_vm(self, key=None, filter_zone=True):
- if self.vm:
- return self._get_by_key(key, self.vm)
-
- vm = self.module.params.get('vm')
- if not vm:
- self.fail_json(msg="Virtual machine param 'vm' is required")
-
- args = {
- 'account': self.get_account(key='name'),
- 'domainid': self.get_domain(key='id'),
- 'projectid': self.get_project(key='id'),
- 'zoneid': self.get_zone(key='id') if filter_zone else None,
- 'fetch_list': True,
- }
- vms = self.query_api('listVirtualMachines', **args)
- if vms:
- for v in vms:
- if vm.lower() in [v['name'].lower(), v['displayname'].lower(), v['id']]:
- self.vm = v
- return self._get_by_key(key, self.vm)
- self.fail_json(msg="Virtual machine '%s' not found" % vm)
-
- def get_disk_offering(self, key=None):
- disk_offering = self.module.params.get('disk_offering')
- if not disk_offering:
- return None
-
- # Do not add domain filter for disk offering listing.
- disk_offerings = self.query_api('listDiskOfferings')
- if disk_offerings:
- for d in disk_offerings['diskoffering']:
- if disk_offering in [d['displaytext'], d['name'], d['id']]:
- return self._get_by_key(key, d)
- self.fail_json(msg="Disk offering '%s' not found" % disk_offering)
-
- def get_zone(self, key=None):
- if self.zone:
- return self._get_by_key(key, self.zone)
-
- zone = self.module.params.get('zone')
- if not zone:
- zone = os.environ.get('CLOUDSTACK_ZONE')
- zones = self.query_api('listZones')
-
- if not zones:
- self.fail_json(msg="No zones available. Please create a zone first")
-
- # use the first zone if no zone param given
- if not zone:
- self.zone = zones['zone'][0]
- self.result['zone'] = self.zone['name']
- return self._get_by_key(key, self.zone)
-
- if zones:
- for z in zones['zone']:
- if zone.lower() in [z['name'].lower(), z['id']]:
- self.result['zone'] = z['name']
- self.zone = z
- return self._get_by_key(key, self.zone)
- self.fail_json(msg="zone '%s' not found" % zone)
-
- def get_os_type(self, key=None):
- if self.os_type:
- return self._get_by_key(key, self.zone)
-
- os_type = self.module.params.get('os_type')
- if not os_type:
- return None
-
- os_types = self.query_api('listOsTypes')
- if os_types:
- for o in os_types['ostype']:
- if os_type in [o['description'], o['id']]:
- self.os_type = o
- return self._get_by_key(key, self.os_type)
- self.fail_json(msg="OS type '%s' not found" % os_type)
-
- def get_hypervisor(self):
- if self.hypervisor:
- return self.hypervisor
-
- hypervisor = self.module.params.get('hypervisor')
- hypervisors = self.query_api('listHypervisors')
-
- # use the first hypervisor if no hypervisor param given
- if not hypervisor:
- self.hypervisor = hypervisors['hypervisor'][0]['name']
- return self.hypervisor
-
- for h in hypervisors['hypervisor']:
- if hypervisor.lower() == h['name'].lower():
- self.hypervisor = h['name']
- return self.hypervisor
- self.fail_json(msg="Hypervisor '%s' not found" % hypervisor)
-
- def get_account(self, key=None):
- if self.account:
- return self._get_by_key(key, self.account)
-
- account = self.module.params.get('account')
- if not account:
- account = os.environ.get('CLOUDSTACK_ACCOUNT')
- if not account:
- return None
-
- domain = self.module.params.get('domain')
- if not domain:
- self.fail_json(msg="Account must be specified with Domain")
-
- args = {
- 'name': account,
- 'domainid': self.get_domain(key='id'),
- 'listall': True
- }
- accounts = self.query_api('listAccounts', **args)
- if accounts:
- self.account = accounts['account'][0]
- self.result['account'] = self.account['name']
- return self._get_by_key(key, self.account)
- self.fail_json(msg="Account '%s' not found" % account)
-
- def get_domain(self, key=None):
- if self.domain:
- return self._get_by_key(key, self.domain)
-
- domain = self.module.params.get('domain')
- if not domain:
- domain = os.environ.get('CLOUDSTACK_DOMAIN')
- if not domain:
- return None
-
- args = {
- 'listall': True,
- }
- domains = self.query_api('listDomains', **args)
- if domains:
- for d in domains['domain']:
- if d['path'].lower() in [domain.lower(), "root/" + domain.lower(), "root" + domain.lower()]:
- self.domain = d
- self.result['domain'] = d['path']
- return self._get_by_key(key, self.domain)
- self.fail_json(msg="Domain '%s' not found" % domain)
-
- def query_tags(self, resource, resource_type):
- args = {
- 'resourceid': resource['id'],
- 'resourcetype': resource_type,
- }
- tags = self.query_api('listTags', **args)
- return self.get_tags(resource=tags, key='tag')
-
- def get_tags(self, resource=None, key='tags'):
- existing_tags = []
- for tag in resource.get(key) or []:
- existing_tags.append({'key': tag['key'], 'value': tag['value']})
- return existing_tags
-
- def _process_tags(self, resource, resource_type, tags, operation="create"):
- if tags:
- self.result['changed'] = True
- if not self.module.check_mode:
- args = {
- 'resourceids': resource['id'],
- 'resourcetype': resource_type,
- 'tags': tags,
- }
- if operation == "create":
- response = self.query_api('createTags', **args)
- else:
- response = self.query_api('deleteTags', **args)
- self.poll_job(response)
-
- def _tags_that_should_exist_or_be_updated(self, resource, tags):
- existing_tags = self.get_tags(resource)
- return [tag for tag in tags if tag not in existing_tags]
-
- def _tags_that_should_not_exist(self, resource, tags):
- existing_tags = self.get_tags(resource)
- return [tag for tag in existing_tags if tag not in tags]
-
- def ensure_tags(self, resource, resource_type=None):
- if not resource_type or not resource:
- self.fail_json(msg="Error: Missing resource or resource_type for tags.")
-
- if 'tags' in resource:
- tags = self.module.params.get('tags')
- if tags is not None:
- self._process_tags(resource, resource_type, self._tags_that_should_not_exist(resource, tags), operation="delete")
- self._process_tags(resource, resource_type, self._tags_that_should_exist_or_be_updated(resource, tags))
- resource['tags'] = self.query_tags(resource=resource, resource_type=resource_type)
- return resource
-
- def get_capabilities(self, key=None):
- if self.capabilities:
- return self._get_by_key(key, self.capabilities)
- capabilities = self.query_api('listCapabilities')
- self.capabilities = capabilities['capability']
- return self._get_by_key(key, self.capabilities)
-
- def poll_job(self, job=None, key=None):
- if 'jobid' in job:
- while True:
- res = self.query_api('queryAsyncJobResult', jobid=job['jobid'])
- if res['jobstatus'] != 0 and 'jobresult' in res:
-
- if 'errortext' in res['jobresult']:
- self.fail_json(msg="Failed: '%s'" % res['jobresult']['errortext'])
-
- if key and key in res['jobresult']:
- job = res['jobresult'][key]
-
- break
- time.sleep(2)
- return job
-
- def update_result(self, resource, result=None):
- if result is None:
- result = dict()
- if resource:
- returns = self.common_returns.copy()
- returns.update(self.returns)
- for search_key, return_key in returns.items():
- if search_key in resource:
- result[return_key] = resource[search_key]
-
- # Bad bad API does not always return int when it should.
- for search_key, return_key in self.returns_to_int.items():
- if search_key in resource:
- result[return_key] = int(resource[search_key])
-
- if 'tags' in resource:
- result['tags'] = resource['tags']
- return result
-
- def get_result(self, resource):
- return self.update_result(resource, self.result)
-
- def get_result_and_facts(self, facts_name, resource):
- result = self.get_result(resource)
-
- ansible_facts = {
- facts_name: result.copy()
- }
- for k in ['diff', 'changed']:
- if k in ansible_facts[facts_name]:
- del ansible_facts[facts_name][k]
-
- result.update(ansible_facts=ansible_facts)
- return result
diff --git a/test/support/integration/plugins/modules/cs_role.py b/test/support/integration/plugins/modules/cs_role.py
deleted file mode 100644
index 6db295bd81..0000000000
--- a/test/support/integration/plugins/modules/cs_role.py
+++ /dev/null
@@ -1,211 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-#
-# (c) 2016, René Moser <mail@renemoser.net>
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-
-DOCUMENTATION = '''
----
-module: cs_role
-short_description: Manages user roles on Apache CloudStack based clouds.
-description:
- - Create, update, delete user roles.
-version_added: '2.3'
-author: René Moser (@resmo)
-options:
- name:
- description:
- - Name of the role.
- type: str
- required: true
- uuid:
- description:
- - ID of the role.
- - If provided, I(uuid) is used as key.
- type: str
- aliases: [ id ]
- role_type:
- description:
- - Type of the role.
- - Only considered for creation.
- type: str
- default: User
- choices: [ User, DomainAdmin, ResourceAdmin, Admin ]
- description:
- description:
- - Description of the role.
- type: str
- state:
- description:
- - State of the role.
- type: str
- default: present
- choices: [ present, absent ]
-extends_documentation_fragment: cloudstack
-'''
-
-EXAMPLES = '''
-- name: Ensure an user role is present
- cs_role:
- name: myrole_user
- delegate_to: localhost
-
-- name: Ensure a role having particular ID is named as myrole_user
- cs_role:
- name: myrole_user
- id: 04589590-ac63-4ffc-93f5-b698b8ac38b6
- delegate_to: localhost
-
-- name: Ensure a role is absent
- cs_role:
- name: myrole_user
- state: absent
- delegate_to: localhost
-'''
-
-RETURN = '''
----
-id:
- description: UUID of the role.
- returned: success
- type: str
- sample: 04589590-ac63-4ffc-93f5-b698b8ac38b6
-name:
- description: Name of the role.
- returned: success
- type: str
- sample: myrole
-description:
- description: Description of the role.
- returned: success
- type: str
- sample: "This is my role description"
-role_type:
- description: Type of the role.
- returned: success
- type: str
- sample: User
-'''
-
-from ansible.module_utils.basic import AnsibleModule
-from ansible.module_utils.cloudstack import (
- AnsibleCloudStack,
- cs_argument_spec,
- cs_required_together,
-)
-
-
-class AnsibleCloudStackRole(AnsibleCloudStack):
-
- def __init__(self, module):
- super(AnsibleCloudStackRole, self).__init__(module)
- self.returns = {
- 'type': 'role_type',
- }
-
- def get_role(self):
- uuid = self.module.params.get('uuid')
- if uuid:
- args = {
- 'id': uuid,
- }
- roles = self.query_api('listRoles', **args)
- if roles:
- return roles['role'][0]
- else:
- args = {
- 'name': self.module.params.get('name'),
- }
- roles = self.query_api('listRoles', **args)
- if roles:
- return roles['role'][0]
- return None
-
- def present_role(self):
- role = self.get_role()
- if role:
- role = self._update_role(role)
- else:
- role = self._create_role(role)
- return role
-
- def _create_role(self, role):
- self.result['changed'] = True
- args = {
- 'name': self.module.params.get('name'),
- 'type': self.module.params.get('role_type'),
- 'description': self.module.params.get('description'),
- }
- if not self.module.check_mode:
- res = self.query_api('createRole', **args)
- role = res['role']
- return role
-
- def _update_role(self, role):
- args = {
- 'id': role['id'],
- 'name': self.module.params.get('name'),
- 'description': self.module.params.get('description'),
- }
- if self.has_changed(args, role):
- self.result['changed'] = True
- if not self.module.check_mode:
- res = self.query_api('updateRole', **args)
-
- # The API as in 4.9 does not return an updated role yet
- if 'role' not in res:
- role = self.get_role()
- else:
- role = res['role']
- return role
-
- def absent_role(self):
- role = self.get_role()
- if role:
- self.result['changed'] = True
- args = {
- 'id': role['id'],
- }
- if not self.module.check_mode:
- self.query_api('deleteRole', **args)
- return role
-
-
-def main():
- argument_spec = cs_argument_spec()
- argument_spec.update(dict(
- uuid=dict(aliases=['id']),
- name=dict(required=True),
- description=dict(),
- role_type=dict(choices=['User', 'DomainAdmin', 'ResourceAdmin', 'Admin'], default='User'),
- state=dict(choices=['present', 'absent'], default='present'),
- ))
-
- module = AnsibleModule(
- argument_spec=argument_spec,
- required_together=cs_required_together(),
- supports_check_mode=True
- )
-
- acs_role = AnsibleCloudStackRole(module)
- state = module.params.get('state')
- if state == 'absent':
- role = acs_role.absent_role()
- else:
- role = acs_role.present_role()
-
- result = acs_role.get_result(role)
-
- module.exit_json(**result)
-
-
-if __name__ == '__main__':
- main()
diff --git a/test/support/integration/plugins/modules/cs_role_permission.py b/test/support/integration/plugins/modules/cs_role_permission.py
deleted file mode 100644
index 30392b2f87..0000000000
--- a/test/support/integration/plugins/modules/cs_role_permission.py
+++ /dev/null
@@ -1,351 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2017, David Passante (@dpassante)
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import absolute_import, division, print_function
-__metaclass__ = type
-
-ANSIBLE_METADATA = {'metadata_version': '1.1',
- 'status': ['preview'],
- 'supported_by': 'community'}
-
-DOCUMENTATION = '''
----
-module: cs_role_permission
-short_description: Manages role permissions on Apache CloudStack based clouds.
-description:
- - Create, update and remove CloudStack role permissions.
- - Managing role permissions only supported in CloudStack >= 4.9.
-version_added: '2.6'
-author: David Passante (@dpassante)
-options:
- name:
- description:
- - The API name of the permission.
- type: str
- required: true
- role:
- description:
- - Name or ID of the role.
- type: str
- required: true
- permission:
- description:
- - The rule permission, allow or deny. Defaulted to deny.
- type: str
- choices: [ allow, deny ]
- default: deny
- state:
- description:
- - State of the role permission.
- type: str
- choices: [ present, absent ]
- default: present
- description:
- description:
- - The description of the role permission.
- type: str
- parent:
- description:
- - The parent role permission uuid. use 0 to move this rule at the top of the list.
- type: str
-extends_documentation_fragment: cloudstack
-'''
-
-EXAMPLES = '''
-- name: Create a role permission
- cs_role_permission:
- role: My_Custom_role
- name: createVPC
- permission: allow
- description: My comments
- delegate_to: localhost
-
-- name: Remove a role permission
- cs_role_permission:
- state: absent
- role: My_Custom_role
- name: createVPC
- delegate_to: localhost
-
-- name: Update a system role permission
- cs_role_permission:
- role: Domain Admin
- name: createVPC
- permission: deny
- delegate_to: localhost
-
-- name: Update rules order. Move the rule at the top of list
- cs_role_permission:
- role: Domain Admin
- name: createVPC
- parent: 0
- delegate_to: localhost
-'''
-
-RETURN = '''
----
-id:
- description: The ID of the role permission.
- returned: success
- type: str
- sample: a6f7a5fc-43f8-11e5-a151-feff819cdc9f
-name:
- description: The API name of the permission.
- returned: success
- type: str
- sample: createVPC
-permission:
- description: The permission type of the api name.
- returned: success
- type: str
- sample: allow
-role_id:
- description: The ID of the role to which the role permission belongs.
- returned: success
- type: str
- sample: c6f7a5fc-43f8-11e5-a151-feff819cdc7f
-description:
- description: The description of the role permission
- returned: success
- type: str
- sample: Deny createVPC for users
-'''
-
-from distutils.version import LooseVersion
-
-from ansible.module_utils.basic import AnsibleModule
-from ansible.module_utils.cloudstack import (
- AnsibleCloudStack,
- cs_argument_spec,
- cs_required_together,
-)
-
-
-class AnsibleCloudStackRolePermission(AnsibleCloudStack):
-
- def __init__(self, module):
- super(AnsibleCloudStackRolePermission, self).__init__(module)
- cloudstack_min_version = LooseVersion('4.9.2')
-
- self.returns = {
- 'id': 'id',
- 'roleid': 'role_id',
- 'rule': 'name',
- 'permission': 'permission',
- 'description': 'description',
- }
- self.role_permission = None
-
- self.cloudstack_version = self._cloudstack_ver()
-
- if self.cloudstack_version < cloudstack_min_version:
- self.fail_json(msg="This module requires CloudStack >= %s." % cloudstack_min_version)
-
- def _cloudstack_ver(self):
- capabilities = self.get_capabilities()
- return LooseVersion(capabilities['cloudstackversion'])
-
- def _get_role_id(self):
- role = self.module.params.get('role')
- if not role:
- return None
-
- res = self.query_api('listRoles')
- roles = res['role']
- if roles:
- for r in roles:
- if role in [r['name'], r['id']]:
- return r['id']
- self.fail_json(msg="Role '%s' not found" % role)
-
- def _get_role_perm(self):
- role_permission = self.role_permission
-
- args = {
- 'roleid': self._get_role_id(),
- }
-
- rp = self.query_api('listRolePermissions', **args)
-
- if rp:
- role_permission = rp['rolepermission']
-
- return role_permission
-
- def _get_rule(self, rule=None):
- if not rule:
- rule = self.module.params.get('name')
-
- if self._get_role_perm():
- for _rule in self._get_role_perm():
- if rule == _rule['rule'] or rule == _rule['id']:
- return _rule
-
- return None
-
- def _get_rule_order(self):
- perms = self._get_role_perm()
- rules = []
-
- if perms:
- for i, rule in enumerate(perms):
- rules.append(rule['id'])
-
- return rules
-
- def replace_rule(self):
- old_rule = self._get_rule()
-
- if old_rule:
- rules_order = self._get_rule_order()
- old_pos = rules_order.index(old_rule['id'])
-
- self.remove_role_perm()
-
- new_rule = self.create_role_perm()
-
- if new_rule:
- perm_order = self.order_permissions(int(old_pos - 1), new_rule['id'])
-
- return perm_order
-
- return None
-
- def order_permissions(self, parent, rule_id):
- rules = self._get_rule_order()
-
- if isinstance(parent, int):
- parent_pos = parent
- elif parent == '0':
- parent_pos = -1
- else:
- parent_rule = self._get_rule(parent)
- if not parent_rule:
- self.fail_json(msg="Parent rule '%s' not found" % parent)
-
- parent_pos = rules.index(parent_rule['id'])
-
- r_id = rules.pop(rules.index(rule_id))
-
- rules.insert((parent_pos + 1), r_id)
- rules = ','.join(map(str, rules))
-
- return rules
-
- def create_or_update_role_perm(self):
- role_permission = self._get_rule()
-
- if not role_permission:
- role_permission = self.create_role_perm()
- else:
- role_permission = self.update_role_perm(role_permission)
-
- return role_permission
-
- def create_role_perm(self):
- role_permission = None
-
- self.result['changed'] = True
-
- args = {
- 'rule': self.module.params.get('name'),
- 'description': self.module.params.get('description'),
- 'roleid': self._get_role_id(),
- 'permission': self.module.params.get('permission'),
- }
-
- if not self.module.check_mode:
- res = self.query_api('createRolePermission', **args)
- role_permission = res['rolepermission']
-
- return role_permission
-
- def update_role_perm(self, role_perm):
- perm_order = None
-
- if not self.module.params.get('parent'):
- args = {
- 'ruleid': role_perm['id'],
- 'roleid': role_perm['roleid'],
- 'permission': self.module.params.get('permission'),
- }
-
- if self.has_changed(args, role_perm, only_keys=['permission']):
- self.result['changed'] = True
-
- if not self.module.check_mode:
- if self.cloudstack_version >= LooseVersion('4.11.0'):
- self.query_api('updateRolePermission', **args)
- role_perm = self._get_rule()
- else:
- perm_order = self.replace_rule()
- else:
- perm_order = self.order_permissions(self.module.params.get('parent'), role_perm['id'])
-
- if perm_order:
- args = {
- 'roleid': role_perm['roleid'],
- 'ruleorder': perm_order,
- }
-
- self.result['changed'] = True
-
- if not self.module.check_mode:
- self.query_api('updateRolePermission', **args)
- role_perm = self._get_rule()
-
- return role_perm
-
- def remove_role_perm(self):
- role_permission = self._get_rule()
-
- if role_permission:
- self.result['changed'] = True
-
- args = {
- 'id': role_permission['id'],
- }
-
- if not self.module.check_mode:
- self.query_api('deleteRolePermission', **args)
-
- return role_permission
-
-
-def main():
- argument_spec = cs_argument_spec()
- argument_spec.update(dict(
- role=dict(required=True),
- name=dict(required=True),
- permission=dict(choices=['allow', 'deny'], default='deny'),
- description=dict(),
- state=dict(choices=['present', 'absent'], default='present'),
- parent=dict(),
- ))
-
- module = AnsibleModule(
- argument_spec=argument_spec,
- required_together=cs_required_together(),
- mutually_exclusive=(
- ['permission', 'parent'],
- ),
- supports_check_mode=True
- )
-
- acs_role_perm = AnsibleCloudStackRolePermission(module)
-
- state = module.params.get('state')
- if state in ['absent']:
- role_permission = acs_role_perm.remove_role_perm()
- else:
- role_permission = acs_role_perm.create_or_update_role_perm()
-
- result = acs_role_perm.get_result(role_permission)
- module.exit_json(**result)
-
-
-if __name__ == '__main__':
- main()