diff options
Diffstat (limited to 'os_client_config/config.py')
-rw-r--r-- | os_client_config/config.py | 1224 |
1 files changed, 7 insertions, 1217 deletions
diff --git a/os_client_config/config.py b/os_client_config/config.py index f80e158..d506a04 100644 --- a/os_client_config/config.py +++ b/os_client_config/config.py @@ -12,1230 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. +from openstack.config import loader +from openstack.config.loader import * # noqa -# alias because we already had an option named argparse -import argparse as argparse_mod -import collections -import copy -import json -import os -import re -import sys -import warnings - -import appdirs -from keystoneauth1 import adapter -from keystoneauth1 import loading -import yaml - -from os_client_config import _log from os_client_config import cloud_config from os_client_config import defaults -from os_client_config import exceptions -from os_client_config import vendors - -APPDIRS = appdirs.AppDirs('openstack', 'OpenStack', multipath='/etc') -CONFIG_HOME = APPDIRS.user_config_dir -CACHE_PATH = APPDIRS.user_cache_dir - -UNIX_CONFIG_HOME = os.path.join( - os.path.expanduser(os.path.join('~', '.config')), 'openstack') -UNIX_SITE_CONFIG_HOME = '/etc/openstack' - -SITE_CONFIG_HOME = APPDIRS.site_config_dir - -CONFIG_SEARCH_PATH = [ - os.getcwd(), - CONFIG_HOME, UNIX_CONFIG_HOME, - SITE_CONFIG_HOME, UNIX_SITE_CONFIG_HOME -] -YAML_SUFFIXES = ('.yaml', '.yml') -JSON_SUFFIXES = ('.json',) -CONFIG_FILES = [ - os.path.join(d, 'clouds' + s) - for d in CONFIG_SEARCH_PATH - for s in YAML_SUFFIXES + JSON_SUFFIXES -] -SECURE_FILES = [ - os.path.join(d, 'secure' + s) - for d in CONFIG_SEARCH_PATH - for s in YAML_SUFFIXES + JSON_SUFFIXES -] -VENDOR_FILES = [ - os.path.join(d, 'clouds-public' + s) - for d in CONFIG_SEARCH_PATH - for s in YAML_SUFFIXES + JSON_SUFFIXES -] - -BOOL_KEYS = ('insecure', 'cache') - -FORMAT_EXCLUSIONS = frozenset(['password']) - - -# NOTE(dtroyer): This turns out to be not the best idea so let's move -# overriding defaults to a kwarg to OpenStackConfig.__init__() -# Remove this sometime in June 2015 once OSC is comfortably -# changed-over and global-defaults is updated. -def set_default(key, value): - warnings.warn( - "Use of set_default() is deprecated. Defaults should be set with the " - "`override_defaults` parameter of OpenStackConfig." - ) - defaults.get_defaults() # make sure the dict is initialized - defaults._defaults[key] = value - - -def get_boolean(value): - if value is None: - return False - if type(value) is bool: - return value - if value.lower() == 'true': - return True - return False - - -def _get_os_environ(envvar_prefix=None): - ret = defaults.get_defaults() - if not envvar_prefix: - # This makes the or below be OS_ or OS_ which is a no-op - envvar_prefix = 'OS_' - environkeys = [k for k in os.environ.keys() - if (k.startswith('OS_') or k.startswith(envvar_prefix)) - and not k.startswith('OS_TEST') # infra CI var - and not k.startswith('OS_STD') # infra CI var - ] - for k in environkeys: - newkey = k.split('_', 1)[-1].lower() - ret[newkey] = os.environ[k] - # If the only environ keys are selectors or behavior modification, don't - # return anything - selectors = set([ - 'OS_CLOUD', 'OS_REGION_NAME', - 'OS_CLIENT_CONFIG_FILE', 'OS_CLIENT_SECURE_FILE', 'OS_CLOUD_NAME']) - if set(environkeys) - selectors: - return ret - return None - - -def _merge_clouds(old_dict, new_dict): - """Like dict.update, except handling nested dicts.""" - ret = old_dict.copy() - for (k, v) in new_dict.items(): - if isinstance(v, dict): - if k in ret: - ret[k] = _merge_clouds(ret[k], v) - else: - ret[k] = v.copy() - else: - ret[k] = v - return ret - - -def _auth_update(old_dict, new_dict_source): - """Like dict.update, except handling the nested dict called auth.""" - new_dict = copy.deepcopy(new_dict_source) - for (k, v) in new_dict.items(): - if k == 'auth': - if k in old_dict: - old_dict[k].update(v) - else: - old_dict[k] = v.copy() - else: - old_dict[k] = v - return old_dict - - -def _fix_argv(argv): - # Transform any _ characters in arg names to - so that we don't - # have to throw billions of compat argparse arguments around all - # over the place. - processed = collections.defaultdict(list) - for index in range(0, len(argv)): - # If the value starts with '--' and has '-' or '_' in it, then - # it's worth looking at it - if re.match('^--.*(_|-)+.*', argv[index]): - split_args = argv[index].split('=') - orig = split_args[0] - new = orig.replace('_', '-') - if orig != new: - split_args[0] = new - argv[index] = "=".join(split_args) - # Save both for later so we can throw an error about dupes - processed[new].append(orig) - overlap = [] - for new, old in processed.items(): - if len(old) > 1: - overlap.extend(old) - if overlap: - raise exceptions.OpenStackConfigException( - "The following options were given: '{options}' which contain" - " duplicates except that one has _ and one has -. There is" - " no sane way for us to know what you're doing. Remove the" - " duplicate option and try again".format( - options=','.join(overlap))) - - -class OpenStackConfig(object): - - def __init__(self, config_files=None, vendor_files=None, - override_defaults=None, force_ipv4=None, - envvar_prefix=None, secure_files=None, - pw_func=None, session_constructor=None, - app_name=None, app_version=None, - load_yaml_config=True): - self.log = _log.setup_logging(__name__) - self._session_constructor = session_constructor - self._app_name = app_name - self._app_version = app_version - - if load_yaml_config: - self._config_files = config_files or CONFIG_FILES - self._secure_files = secure_files or SECURE_FILES - self._vendor_files = vendor_files or VENDOR_FILES - else: - self._config_files = [] - self._secure_files = [] - self._vendor_files = [] - - config_file_override = os.environ.get('OS_CLIENT_CONFIG_FILE') - if config_file_override: - self._config_files.insert(0, config_file_override) - - secure_file_override = os.environ.get('OS_CLIENT_SECURE_FILE') - if secure_file_override: - self._secure_files.insert(0, secure_file_override) - - self.defaults = defaults.get_defaults() - if override_defaults: - self.defaults.update(override_defaults) - - # First, use a config file if it exists where expected - self.config_filename, self.cloud_config = self._load_config_file() - _, secure_config = self._load_secure_file() - if secure_config: - self.cloud_config = _merge_clouds( - self.cloud_config, secure_config) - - if not self.cloud_config: - self.cloud_config = {'clouds': {}} - if 'clouds' not in self.cloud_config: - self.cloud_config['clouds'] = {} - - # Grab ipv6 preference settings from env - client_config = self.cloud_config.get('client', {}) - - if force_ipv4 is not None: - # If it's passed in to the constructor, honor it. - self.force_ipv4 = force_ipv4 - else: - # Get the backwards compat value - prefer_ipv6 = get_boolean( - os.environ.get( - 'OS_PREFER_IPV6', client_config.get( - 'prefer_ipv6', client_config.get( - 'prefer-ipv6', True)))) - force_ipv4 = get_boolean( - os.environ.get( - 'OS_FORCE_IPV4', client_config.get( - 'force_ipv4', client_config.get( - 'broken-ipv6', False)))) - - self.force_ipv4 = force_ipv4 - if not prefer_ipv6: - # this will only be false if someone set it explicitly - # honor their wishes - self.force_ipv4 = True - - # Next, process environment variables and add them to the mix - self.envvar_key = os.environ.get('OS_CLOUD_NAME', 'envvars') - if self.envvar_key in self.cloud_config['clouds']: - raise exceptions.OpenStackConfigException( - '"{0}" defines a cloud named "{1}", but' - ' OS_CLOUD_NAME is also set to "{1}". Please rename' - ' either your environment based cloud, or one of your' - ' file-based clouds.'.format(self.config_filename, - self.envvar_key)) - - self.default_cloud = os.environ.get('OS_CLOUD') - - envvars = _get_os_environ(envvar_prefix=envvar_prefix) - if envvars: - self.cloud_config['clouds'][self.envvar_key] = envvars - if not self.default_cloud: - self.default_cloud = self.envvar_key - - if not self.default_cloud and self.cloud_config['clouds']: - if len(self.cloud_config['clouds'].keys()) == 1: - # If there is only one cloud just use it. This matches envvars - # behavior and allows for much less typing. - # TODO(mordred) allow someone to mark a cloud as "default" in - # clouds.yaml. - # The next/iter thing is for python3 compat where dict.keys - # returns an iterator but in python2 it's a list. - self.default_cloud = next(iter( - self.cloud_config['clouds'].keys())) - - # Finally, fall through and make a cloud that starts with defaults - # because we need somewhere to put arguments, and there are neither - # config files or env vars - if not self.cloud_config['clouds']: - self.cloud_config = dict( - clouds=dict(defaults=dict(self.defaults))) - self.default_cloud = 'defaults' - - self._cache_expiration_time = 0 - self._cache_path = CACHE_PATH - self._cache_class = 'dogpile.cache.null' - self._cache_arguments = {} - self._cache_expiration = {} - if 'cache' in self.cloud_config: - cache_settings = self._normalize_keys(self.cloud_config['cache']) - - # expiration_time used to be 'max_age' but the dogpile setting - # is expiration_time. Support max_age for backwards compat. - self._cache_expiration_time = cache_settings.get( - 'expiration_time', cache_settings.get( - 'max_age', self._cache_expiration_time)) - - # If cache class is given, use that. If not, but if cache time - # is given, default to memory. Otherwise, default to nothing. - # to memory. - if self._cache_expiration_time: - self._cache_class = 'dogpile.cache.memory' - self._cache_class = self.cloud_config['cache'].get( - 'class', self._cache_class) - - self._cache_path = os.path.expanduser( - cache_settings.get('path', self._cache_path)) - self._cache_arguments = cache_settings.get( - 'arguments', self._cache_arguments) - self._cache_expiration = cache_settings.get( - 'expiration', self._cache_expiration) - - # Flag location to hold the peeked value of an argparse timeout value - self._argv_timeout = False - - # Save the password callback - # password = self._pw_callback(prompt="Password: ") - self._pw_callback = pw_func - - def get_extra_config(self, key, defaults=None): - """Fetch an arbitrary extra chunk of config, laying in defaults. - - :param string key: name of the config section to fetch - :param dict defaults: (optional) default values to merge under the - found config - """ - if not defaults: - defaults = {} - return _merge_clouds( - self._normalize_keys(defaults), - self._normalize_keys(self.cloud_config.get(key, {}))) - - def _load_config_file(self): - return self._load_yaml_json_file(self._config_files) - - def _load_secure_file(self): - return self._load_yaml_json_file(self._secure_files) - - def _load_vendor_file(self): - return self._load_yaml_json_file(self._vendor_files) - - def _load_yaml_json_file(self, filelist): - for path in filelist: - if os.path.exists(path): - with open(path, 'r') as f: - if path.endswith('json'): - return path, json.load(f) - else: - return path, yaml.safe_load(f) - return (None, {}) - - def _normalize_keys(self, config): - new_config = {} - for key, value in config.items(): - key = key.replace('-', '_') - if isinstance(value, dict): - new_config[key] = self._normalize_keys(value) - elif isinstance(value, bool): - new_config[key] = value - elif isinstance(value, int) and key != 'verbose_level': - new_config[key] = str(value) - elif isinstance(value, float): - new_config[key] = str(value) - else: - new_config[key] = value - return new_config - - def get_cache_expiration_time(self): - return int(self._cache_expiration_time) - - def get_cache_interval(self): - return self.get_cache_expiration_time() - - def get_cache_max_age(self): - return self.get_cache_expiration_time() - - def get_cache_path(self): - return self._cache_path - - def get_cache_class(self): - return self._cache_class - - def get_cache_arguments(self): - return copy.deepcopy(self._cache_arguments) - - def get_cache_expiration(self): - return copy.deepcopy(self._cache_expiration) - - def _expand_region_name(self, region_name): - return {'name': region_name, 'values': {}} - - def _expand_regions(self, regions): - ret = [] - for region in regions: - if isinstance(region, dict): - ret.append(copy.deepcopy(region)) - else: - ret.append(self._expand_region_name(region)) - return ret - - def _get_regions(self, cloud): - if cloud not in self.cloud_config['clouds']: - return [self._expand_region_name('')] - regions = self._get_known_regions(cloud) - if not regions: - # We don't know of any regions use a workable default. - regions = [self._expand_region_name('')] - return regions - - def _get_known_regions(self, cloud): - config = self._normalize_keys(self.cloud_config['clouds'][cloud]) - if 'regions' in config: - return self._expand_regions(config['regions']) - elif 'region_name' in config: - if isinstance(config['region_name'], list): - regions = config['region_name'] - else: - regions = config['region_name'].split(',') - if len(regions) > 1: - warnings.warn( - "Comma separated lists in region_name are deprecated." - " Please use a yaml list in the regions" - " parameter in {0} instead.".format(self.config_filename)) - return self._expand_regions(regions) - else: - # crappit. we don't have a region defined. - new_cloud = dict() - our_cloud = self.cloud_config['clouds'].get(cloud, dict()) - self._expand_vendor_profile(cloud, new_cloud, our_cloud) - if 'regions' in new_cloud and new_cloud['regions']: - return self._expand_regions(new_cloud['regions']) - elif 'region_name' in new_cloud and new_cloud['region_name']: - return [self._expand_region_name(new_cloud['region_name'])] - - def _get_region(self, cloud=None, region_name=''): - if region_name is None: - region_name = '' - if not cloud: - return self._expand_region_name(region_name) - - regions = self._get_known_regions(cloud) - if not regions: - return self._expand_region_name(region_name) - - if not region_name: - return regions[0] - - for region in regions: - if region['name'] == region_name: - return region - - raise exceptions.OpenStackConfigException( - 'Region {region_name} is not a valid region name for cloud' - ' {cloud}. Valid choices are {region_list}. Please note that' - ' region names are case sensitive.'.format( - region_name=region_name, - region_list=','.join([r['name'] for r in regions]), - cloud=cloud)) - - def get_cloud_names(self): - return self.cloud_config['clouds'].keys() - - def _get_base_cloud_config(self, name): - cloud = dict() - - # Only validate cloud name if one was given - if name and name not in self.cloud_config['clouds']: - raise exceptions.OpenStackConfigException( - "Cloud {name} was not found.".format( - name=name)) - - our_cloud = self.cloud_config['clouds'].get(name, dict()) - - # Get the defaults - cloud.update(self.defaults) - self._expand_vendor_profile(name, cloud, our_cloud) - - if 'auth' not in cloud: - cloud['auth'] = dict() - - _auth_update(cloud, our_cloud) - if 'cloud' in cloud: - del cloud['cloud'] - - return cloud - - def _expand_vendor_profile(self, name, cloud, our_cloud): - # Expand a profile if it exists. 'cloud' is an old confusing name - # for this. - profile_name = our_cloud.get('profile', our_cloud.get('cloud', None)) - if profile_name and profile_name != self.envvar_key: - if 'cloud' in our_cloud: - warnings.warn( - "{0} use the keyword 'cloud' to reference a known " - "vendor profile. This has been deprecated in favor of the " - "'profile' keyword.".format(self.config_filename)) - vendor_filename, vendor_file = self._load_vendor_file() - if vendor_file and profile_name in vendor_file['public-clouds']: - _auth_update(cloud, vendor_file['public-clouds'][profile_name]) - else: - profile_data = vendors.get_profile(profile_name) - if profile_data: - status = profile_data.pop('status', 'active') - message = profile_data.pop('message', '') - if status == 'deprecated': - warnings.warn( - "{profile_name} is deprecated: {message}".format( - profile_name=profile_name, message=message)) - elif status == 'shutdown': - raise exceptions.OpenStackConfigException( - "{profile_name} references a cloud that no longer" - " exists: {message}".format( - profile_name=profile_name, message=message)) - _auth_update(cloud, profile_data) - else: - # Can't find the requested vendor config, go about business - warnings.warn("Couldn't find the vendor profile '{0}', for" - " the cloud '{1}'".format(profile_name, - name)) - - def _project_scoped(self, cloud): - return ('project_id' in cloud or 'project_name' in cloud - or 'project_id' in cloud['auth'] - or 'project_name' in cloud['auth']) - - def _validate_networks(self, networks, key): - value = None - for net in networks: - if value and net[key]: - raise exceptions.OpenStackConfigException( - "Duplicate network entries for {key}: {net1} and {net2}." - " Only one network can be flagged with {key}".format( - key=key, - net1=value['name'], - net2=net['name'])) - if not value and net[key]: - value = net - - def _fix_backwards_networks(self, cloud): - # Leave the external_network and internal_network keys in the - # dict because consuming code might be expecting them. - networks = [] - # Normalize existing network entries - for net in cloud.get('networks', []): - name = net.get('name') - if not name: - raise exceptions.OpenStackConfigException( - 'Entry in network list is missing required field "name".') - network = dict( - name=name, - routes_externally=get_boolean(net.get('routes_externally')), - nat_source=get_boolean(net.get('nat_source')), - nat_destination=get_boolean(net.get('nat_destination')), - default_interface=get_boolean(net.get('default_interface')), - ) - # routes_ipv4_externally defaults to the value of routes_externally - network['routes_ipv4_externally'] = get_boolean( - net.get( - 'routes_ipv4_externally', network['routes_externally'])) - # routes_ipv6_externally defaults to the value of routes_externally - network['routes_ipv6_externally'] = get_boolean( - net.get( - 'routes_ipv6_externally', network['routes_externally'])) - networks.append(network) - - for key in ('external_network', 'internal_network'): - external = key.startswith('external') - if key in cloud and 'networks' in cloud: - raise exceptions.OpenStackConfigException( - "Both {key} and networks were specified in the config." - " Please remove {key} from the config and use the network" - " list to configure network behavior.".format(key=key)) - if key in cloud: - warnings.warn( - "{key} is deprecated. Please replace with an entry in" - " a dict inside of the networks list with name: {name}" - " and routes_externally: {external}".format( - key=key, name=cloud[key], external=external)) - networks.append(dict( - name=cloud[key], - routes_externally=external, - nat_destination=not external, - default_interface=external)) - - # Validate that we don't have duplicates - self._validate_networks(networks, 'nat_destination') - self._validate_networks(networks, 'default_interface') - - cloud['networks'] = networks - return cloud - - def _handle_domain_id(self, cloud): - # Allow people to just specify domain once if it's the same - mappings = { - 'domain_id': ('user_domain_id', 'project_domain_id'), - 'domain_name': ('user_domain_name', 'project_domain_name'), - } - for target_key, possible_values in mappings.items(): - if not self._project_scoped(cloud): - if target_key in cloud and target_key not in cloud['auth']: - cloud['auth'][target_key] = cloud.pop(target_key) - continue - for key in possible_values: - if target_key in cloud['auth'] and key not in cloud['auth']: - cloud['auth'][key] = cloud['auth'][target_key] - cloud.pop(target_key, None) - cloud['auth'].pop(target_key, None) - return cloud - - def _fix_backwards_project(self, cloud): - # Do the lists backwards so that project_name is the ultimate winner - # Also handle moving domain names into auth so that domain mapping - # is easier - mappings = { - 'domain_id': ('domain_id', 'domain-id'), - 'domain_name': ('domain_name', 'domain-name'), - 'user_domain_id': ('user_domain_id', 'user-domain-id'), - 'user_domain_name': ('user_domain_name', 'user-domain-name'), - 'project_domain_id': ('project_domain_id', 'project-domain-id'), - 'project_domain_name': ( - 'project_domain_name', 'project-domain-name'), - 'token': ('auth-token', 'auth_token', 'token'), - } - if cloud.get('auth_type', None) == 'v2password': - # If v2password is explcitly requested, this is to deal with old - # clouds. That's fine - we need to map settings in the opposite - # direction - mappings['tenant_id'] = ( - 'project_id', 'project-id', 'tenant_id', 'tenant-id') - mappings['tenant_name'] = ( - 'project_name', 'project-name', 'tenant_name', 'tenant-name') - else: - mappings['project_id'] = ( - 'tenant_id', 'tenant-id', 'project_id', 'project-id') - mappings['project_name'] = ( - 'tenant_name', 'tenant-name', 'project_name', 'project-name') - for target_key, possible_values in mappings.items(): - target = None - for key in possible_values: - if key in cloud: - target = str(cloud[key]) - del cloud[key] - if key in cloud['auth']: - target = str(cloud['auth'][key]) - del cloud['auth'][key] - if target: - cloud['auth'][target_key] = target - return cloud - - def _fix_backwards_auth_plugin(self, cloud): - # Do the lists backwards so that auth_type is the ultimate winner - mappings = { - 'auth_type': ('auth_plugin', 'auth_type'), - } - for target_key, possible_values in mappings.items(): - target = None - for key in possible_values: - if key in cloud: - target = cloud[key] - del cloud[key] - cloud[target_key] = target - # Because we force alignment to v3 nouns, we want to force - # use of the auth plugin that can do auto-selection and dealing - # with that based on auth parameters. v2password is basically - # completely broken - return cloud - - def register_argparse_arguments(self, parser, argv, service_keys=None): - """Register all of the common argparse options needed. - - Given an argparse parser, register the keystoneauth Session arguments, - the keystoneauth Auth Plugin Options and os-cloud. Also, peek in the - argv to see if all of the auth plugin options should be registered - or merely the ones already configured. - - :param argparse.ArgumentParser: parser to attach argparse options to - :param list argv: the arguments provided to the application - :param string service_keys: Service or list of services this argparse - should be specialized for, if known. - The first item in the list will be used - as the default value for service_type - (optional) - - :raises exceptions.OpenStackConfigException if an invalid auth-type - is requested - """ - - if service_keys is None: - service_keys = [] - - # Fix argv in place - mapping any keys with embedded _ in them to - - _fix_argv(argv) - - local_parser = argparse_mod.ArgumentParser(add_help=False) - - for p in (parser, local_parser): - p.add_argument( - '--os-cloud', - metavar='<name>', - default=os.environ.get('OS_CLOUD', None), - help='Named cloud to connect to') - - # we need to peek to see if timeout was actually passed, since - # the keystoneauth declaration of it has a default, which means - # we have no clue if the value we get is from the ksa default - # for from the user passing it explicitly. We'll stash it for later - local_parser.add_argument('--timeout', metavar='<timeout>') - - # We need for get_one_cloud to be able to peek at whether a token - # was passed so that we can swap the default from password to - # token if it was. And we need to also peek for --os-auth-token - # for novaclient backwards compat - local_parser.add_argument('--os-token') - local_parser.add_argument('--os-auth-token') - - # Peek into the future and see if we have an auth-type set in - # config AND a cloud set, so that we know which command line - # arguments to register and show to the user (the user may want - # to say something like: - # openstack --os-cloud=foo --os-oidctoken=bar - # although I think that user is the cause of my personal pain - options, _args = local_parser.parse_known_args(argv) - if options.timeout: - self._argv_timeout = True - - # validate = False because we're not _actually_ loading here - # we're only peeking, so it's the wrong time to assert that - # the rest of the arguments given are invalid for the plugin - # chosen (for instance, --help may be requested, so that the - # user can see what options he may want to give - cloud = self.get_one_cloud(argparse=options, validate=False) - default_auth_type = cloud.config['auth_type'] - - try: - loading.register_auth_argparse_arguments( - parser, argv, default=default_auth_type) - except Exception: - # Hidiing the keystoneauth exception because we're not actually - # loading the auth plugin at this point, so the error message - # from it doesn't actually make sense to os-client-config users - options, _args = parser.parse_known_args(argv) - plugin_names = loading.get_available_plugin_names() - raise exceptions.OpenStackConfigException( - "An invalid auth-type was specified: {auth_type}." - " Valid choices are: {plugin_names}.".format( - auth_type=options.os_auth_type, - plugin_names=",".join(plugin_names))) - - if service_keys: - primary_service = service_keys[0] - else: - primary_service = None - loading.register_session_argparse_arguments(parser) - adapter.register_adapter_argparse_arguments( - parser, service_type=primary_service) - for service_key in service_keys: - # legacy clients have un-prefixed api-version options - parser.add_argument( - '--{service_key}-api-version'.format( - service_key=service_key.replace('_', '-'), - help=argparse_mod.SUPPRESS)) - adapter.register_service_adapter_argparse_arguments( - parser, service_type=service_key) - - # Backwards compat options for legacy clients - parser.add_argument('--http-timeout', help=argparse_mod.SUPPRESS) - parser.add_argument('--os-endpoint-type', help=argparse_mod.SUPPRESS) - parser.add_argument('--endpoint-type', help=argparse_mod.SUPPRESS) - - def _fix_backwards_interface(self, cloud): - new_cloud = {} - for key in cloud.keys(): - if key.endswith('endpoint_type'): - target_key = key.replace('endpoint_type', 'interface') - else: - target_key = key - new_cloud[target_key] = cloud[key] - return new_cloud - - def _fix_backwards_api_timeout(self, cloud): - new_cloud = {} - # requests can only have one timeout, which means that in a single - # cloud there is no point in different timeout values. However, - # for some reason many of the legacy clients decided to shove their - # service name in to the arg name for reasons surpassin sanity. If - # we find any values that are not api_timeout, overwrite api_timeout - # with the value - service_timeout = None - for key in cloud.keys(): - if key.endswith('timeout') and not ( - key == 'timeout' or key == 'api_timeout'): - service_timeout = cloud[key] - else: - new_cloud[key] = cloud[key] - if service_timeout is not None: - new_cloud['api_timeout'] = service_timeout - # The common argparse arg from keystoneauth is called timeout, but - # os-client-config expects it to be called api_timeout - if self._argv_timeout: - if 'timeout' in new_cloud and new_cloud['timeout']: - new_cloud['api_timeout'] = new_cloud.pop('timeout') - return new_cloud - - def get_all_clouds(self): - - clouds = [] - - for cloud in self.get_cloud_names(): - for region in self._get_regions(cloud): - if region: - clouds.append(self.get_one_cloud( - cloud, region_name=region['name'])) - return clouds - - def _fix_args(self, args=None, argparse=None): - """Massage the passed-in options - - Replace - with _ and strip os_ prefixes. - - Convert an argparse Namespace object to a dict, removing values - that are either None or ''. - """ - if not args: - args = {} - - if argparse: - # Convert the passed-in Namespace - o_dict = vars(argparse) - parsed_args = dict() - for k in o_dict: - if o_dict[k] is not None and o_dict[k] != '': - parsed_args[k] = o_dict[k] - args.update(parsed_args) - - os_args = dict() - new_args = dict() - for (key, val) in iter(args.items()): - if type(args[key]) == dict: - # dive into the auth dict - new_args[key] = self._fix_args(args[key]) - continue - - key = key.replace('-', '_') - if key.startswith('os_'): - os_args[key[3:]] = val - else: - new_args[key] = val - new_args.update(os_args) - return new_args - - def _find_winning_auth_value(self, opt, config): - opt_name = opt.name.replace('-', '_') - if opt_name in config: - return config[opt_name] - else: - deprecated = getattr(opt, 'deprecated', getattr( - opt, 'deprecated_opts', [])) - for d_opt in deprecated: - d_opt_name = d_opt.name.replace('-', '_') - if d_opt_name in config: - return config[d_opt_name] - - def auth_config_hook(self, config): - """Allow examination of config values before loading auth plugin - - OpenStackClient will override this to perform additional checks - on auth_type. - """ - return config - - def _get_auth_loader(self, config): - # Use the 'none' plugin for variants of None specified, - # since it does not look up endpoints or tokens but rather - # does a passthrough. This is useful for things like Ironic - # that have a keystoneless operational mode, but means we're - # still dealing with a keystoneauth Session object, so all the - # _other_ things (SSL arg handling, timeout) all work consistently - if config['auth_type'] in (None, "None", ''): - # 'none' auth_plugin has token as 'notused' so validate_auth will - # not strip the value (which it does for actual python None) - config['auth_type'] = 'none' - elif config['auth_type'] == 'token_endpoint': - # Humans have been trained to use a thing called token_endpoint - # That it does not exist in keystoneauth is irrelvant- it not - # doing what they want causes them sorrow. - config['auth_type'] = 'admin_token' - return loading.get_plugin_loader(config['auth_type']) - - def _validate_auth(self, config, loader): - # May throw a keystoneauth1.exceptions.NoMatchingPlugin - - plugin_options = loader.get_options() - - for p_opt in plugin_options: - # if it's in config.auth, win, kill it from config dict - # if it's in config and not in config.auth, move it - # deprecated loses to current - # provided beats default, deprecated or not - winning_value = self._find_winning_auth_value( - p_opt, - config['auth'], - ) - if not winning_value: - winning_value = self._find_winning_auth_value( - p_opt, - config, - ) - - config = self._clean_up_after_ourselves( - config, - p_opt, - winning_value, - ) - - if winning_value: - # Prefer the plugin configuration dest value if the value's key - # is marked as deprecated. - if p_opt.dest is None: - good_name = p_opt.name.replace('-', '_') - config['auth'][good_name] = winning_value - else: - config['auth'][p_opt.dest] = winning_value - - # See if this needs a prompting - config = self.option_prompt(config, p_opt) - - return config - - def _validate_auth_correctly(self, config, loader): - # May throw a keystoneauth1.exceptions.NoMatchingPlugin - - plugin_options = loader.get_options() - - for p_opt in plugin_options: - # if it's in config, win, move it and kill it from config dict - # if it's in config.auth but not in config it's good - # deprecated loses to current - # provided beats default, deprecated or not - winning_value = self._find_winning_auth_value( - p_opt, - config, - ) - if not winning_value: - winning_value = self._find_winning_auth_value( - p_opt, - config['auth'], - ) - - config = self._clean_up_after_ourselves( - config, - p_opt, - winning_value, - ) - - # See if this needs a prompting - config = self.option_prompt(config, p_opt) - - return config - - def option_prompt(self, config, p_opt): - """Prompt user for option that requires a value""" - if ( - getattr(p_opt, 'prompt', None) is not None and - p_opt.dest not in config['auth'] and - self._pw_callback is not None - ): - config['auth'][p_opt.dest] = self._pw_callback(p_opt.prompt) - return config - - def _clean_up_after_ourselves(self, config, p_opt, winning_value): - - # Clean up after ourselves - for opt in [p_opt.name] + [o.name for o in p_opt.deprecated]: - opt = opt.replace('-', '_') - config.pop(opt, None) - config['auth'].pop(opt, None) - - if winning_value: - # Prefer the plugin configuration dest value if the value's key - # is marked as depreciated. - if p_opt.dest is None: - config['auth'][p_opt.name.replace('-', '_')] = ( - winning_value) - else: - config['auth'][p_opt.dest] = winning_value - return config - - def magic_fixes(self, config): - """Perform the set of magic argument fixups""" - - # Infer token plugin if a token was given - if (('auth' in config and 'token' in config['auth']) or - ('auth_token' in config and config['auth_token']) or - ('token' in config and config['token'])): - config.setdefault('token', config.pop('auth_token', None)) - - # These backwards compat values are only set via argparse. If it's - # there, it's because it was passed in explicitly, and should win - config = self._fix_backwards_api_timeout(config) - if 'endpoint_type' in config: - config['interface'] = config.pop('endpoint_type') - - config = self._fix_backwards_auth_plugin(config) - config = self._fix_backwards_project(config) - config = self._fix_backwards_interface(config) - config = self._fix_backwards_networks(config) - config = self._handle_domain_id(config) - - for key in BOOL_KEYS: - if key in config: - if type(config[key]) is not bool: - config[key] = get_boolean(config[key]) - - # TODO(mordred): Special casing auth_url here. We should - # come back to this betterer later so that it's - # more generalized - if 'auth' in config and 'auth_url' in config['auth']: - config['auth']['auth_url'] = config['auth']['auth_url'].format( - **config) - - return config - - def get_one_cloud(self, cloud=None, validate=True, - argparse=None, **kwargs): - """Retrieve a single cloud configuration and merge additional options - - :param string cloud: - The name of the configuration to load from clouds.yaml - :param boolean validate: - Validate the config. Setting this to False causes no auth plugin - to be created. It's really only useful for testing. - :param Namespace argparse: - An argparse Namespace object; allows direct passing in of - argparse options to be added to the cloud config. Values - of None and '' will be removed. - :param region_name: Name of the region of the cloud. - :param kwargs: Additional configuration options - - :raises: keystoneauth1.exceptions.MissingRequiredOptions - on missing required auth parameters - """ - - args = self._fix_args(kwargs, argparse=argparse) - - if cloud is None: - if 'cloud' in args: - cloud = args['cloud'] - else: - cloud = self.default_cloud - - config = self._get_base_cloud_config(cloud) - - # Get region specific settings - if 'region_name' not in args: - args['region_name'] = '' - region = self._get_region(cloud=cloud, region_name=args['region_name']) - args['region_name'] = region['name'] - region_args = copy.deepcopy(region['values']) - - # Regions is a list that we can use to create a list of cloud/region - # objects. It does not belong in the single-cloud dict - config.pop('regions', None) - - # Can't just do update, because None values take over - for arg_list in region_args, args: - for (key, val) in iter(arg_list.items()): - if val is not None: - if key == 'auth' and config[key] is not None: - config[key] = _auth_update(config[key], val) - else: - config[key] = val - - config = self.magic_fixes(config) - config = self._normalize_keys(config) - - # NOTE(dtroyer): OSC needs a hook into the auth args before the - # plugin is loaded in order to maintain backward- - # compatible behaviour - config = self.auth_config_hook(config) - - if validate: - loader = self._get_auth_loader(config) - config = self._validate_auth(config, loader) - auth_plugin = loader.load_from_options(**config['auth']) - else: - auth_plugin = None - - # If any of the defaults reference other values, we need to expand - for (key, value) in config.items(): - if hasattr(value, 'format') and key not in FORMAT_EXCLUSIONS: - config[key] = value.format(**config) - - force_ipv4 = config.pop('force_ipv4', self.force_ipv4) - prefer_ipv6 = config.pop('prefer_ipv6', True) - if not prefer_ipv6: - force_ipv4 = True - - if cloud is None: - cloud_name = '' - else: - cloud_name = str(cloud) - return cloud_config.CloudConfig( - name=cloud_name, - region=config['region_name'], - config=config, - force_ipv4=force_ipv4, - auth_plugin=auth_plugin, - openstack_config=self, - session_constructor=self._session_constructor, - app_name=self._app_name, - app_version=self._app_version, - ) - - def get_one_cloud_osc( - self, - cloud=None, - validate=True, - argparse=None, - **kwargs - ): - """Retrieve a single cloud configuration and merge additional options - - :param string cloud: - The name of the configuration to load from clouds.yaml - :param boolean validate: - Validate the config. Setting this to False causes no auth plugin - to be created. It's really only useful for testing. - :param Namespace argparse: - An argparse Namespace object; allows direct passing in of - argparse options to be added to the cloud config. Values - of None and '' will be removed. - :param region_name: Name of the region of the cloud. - :param kwargs: Additional configuration options - - :raises: keystoneauth1.exceptions.MissingRequiredOptions - on missing required auth parameters - """ - - args = self._fix_args(kwargs, argparse=argparse) - - if cloud is None: - if 'cloud' in args: - cloud = args['cloud'] - else: - cloud = self.default_cloud - - config = self._get_base_cloud_config(cloud) - - # Get region specific settings - if 'region_name' not in args: - args['region_name'] = '' - region = self._get_region(cloud=cloud, region_name=args['region_name']) - args['region_name'] = region['name'] - region_args = copy.deepcopy(region['values']) - - # Regions is a list that we can use to create a list of cloud/region - # objects. It does not belong in the single-cloud dict - config.pop('regions', None) - - # Can't just do update, because None values take over - for arg_list in region_args, args: - for (key, val) in iter(arg_list.items()): - if val is not None: - if key == 'auth' and config[key] is not None: - config[key] = _auth_update(config[key], val) - else: - config[key] = val - - config = self.magic_fixes(config) - - # NOTE(dtroyer): OSC needs a hook into the auth args before the - # plugin is loaded in order to maintain backward- - # compatible behaviour - config = self.auth_config_hook(config) - - if validate: - loader = self._get_auth_loader(config) - config = self._validate_auth_correctly(config, loader) - auth_plugin = loader.load_from_options(**config['auth']) - else: - auth_plugin = None - - # If any of the defaults reference other values, we need to expand - for (key, value) in config.items(): - if hasattr(value, 'format') and key not in FORMAT_EXCLUSIONS: - config[key] = value.format(**config) - - force_ipv4 = config.pop('force_ipv4', self.force_ipv4) - prefer_ipv6 = config.pop('prefer_ipv6', True) - if not prefer_ipv6: - force_ipv4 = True - - if cloud is None: - cloud_name = '' - else: - cloud_name = str(cloud) - return cloud_config.CloudConfig( - name=cloud_name, - region=config['region_name'], - config=self._normalize_keys(config), - force_ipv4=force_ipv4, - auth_plugin=auth_plugin, - openstack_config=self, - ) - - @staticmethod - def set_one_cloud(config_file, cloud, set_config=None): - """Set a single cloud configuration. - :param string config_file: - The path to the config file to edit. If this file does not exist - it will be created. - :param string cloud: - The name of the configuration to save to clouds.yaml - :param dict set_config: Configuration options to be set - """ - set_config = set_config or {} - cur_config = {} - try: - with open(config_file) as fh: - cur_config = yaml.safe_load(fh) - except IOError as e: - # Not no such file - if e.errno != 2: - raise - pass +class OpenStackConfig(loader.OpenStackConfig): - clouds_config = cur_config.get('clouds', {}) - cloud_config = _auth_update(clouds_config.get(cloud, {}), set_config) - clouds_config[cloud] = cloud_config - cur_config['clouds'] = clouds_config + _cloud_region_class = cloud_config.CloudConfig + _defaults_module = defaults - with open(config_file, 'w') as fh: - yaml.safe_dump(cur_config, fh, default_flow_style=False) + get_one_cloud = loader.OpenStackConfig.get_one + get_all_clouds = loader.OpenStackConfig.get_all if __name__ == '__main__': |