diff options
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r-- | cloudinit/config/schema.py | 220 |
1 files changed, 153 insertions, 67 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py index 42792985..b0b5fccf 100644 --- a/cloudinit/config/schema.py +++ b/cloudinit/config/schema.py @@ -1,6 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. """schema.py: Set of module functions for processing cloud-config schema.""" - import argparse import json import logging @@ -29,7 +28,6 @@ except ImportError: ValidationError = Exception # type: ignore -error = partial(error, sys_exit=True) LOG = logging.getLogger(__name__) VERSIONED_USERDATA_SCHEMA_FILE = "versions.schema.cloud-config.json" @@ -164,17 +162,47 @@ def is_schema_byte_string(checker, instance): ) or isinstance(instance, (bytes,)) -def _add_deprecation_msg(description: Optional[str] = None) -> str: - if description: - return f"{DEPRECATED_PREFIX}{description}" - return DEPRECATED_PREFIX.replace(":", ".").strip() +def _add_deprecated_changed_or_new_msg( + config: dict, annotate=False, filter_key=None +) -> str: + """combine description with new/changed/deprecated message + + deprecated/changed/new keys require a _version key (this is verified + in a unittest), a _description key is optional + """ + + def format_message(key: str): + if not config.get(f"{key}"): + return "" + key_description = config.get(f"{key}_description", "") + v = config.get( + f"{key}_version", + f"<missing {key}_version key, please file a bug report>", + ) + msg = f"{key.capitalize()} in version {v}. {key_description}" + if annotate: + return f" {msg}" + # italicised RST - no whitespace between astrisk and text + return f"\n\n*{msg.strip()}*" -def _validator_deprecated( + # define print order + filter_keys = ( + filter_key if filter_key else ["deprecated", "changed", "new"] + ) + + # build a deprecation/new/changed string + changed_new_deprecated = "".join(map(format_message, filter_keys)) + description = config.get("description", "") + return f"{description}{changed_new_deprecated}".rstrip() + + +def _validator( _validator, deprecated: bool, _instance, schema: dict, + filter_key: str, error_type: Type[Exception] = SchemaDeprecationError, ): """Jsonschema validator for `deprecated` items. @@ -183,11 +211,16 @@ def _validator_deprecated( otherwise the instance is consider faulty. """ if deprecated: - description = schema.get("description") - msg = _add_deprecation_msg(description) + msg = _add_deprecated_changed_or_new_msg( + schema, annotate=True, filter_key=[filter_key] + ) yield error_type(msg) +_validator_deprecated = partial(_validator, filter_key="deprecated") +_validator_changed = partial(_validator, filter_key="changed") + + def _anyOf( validator, anyOf, @@ -316,6 +349,7 @@ def get_jsonschema_validator(): # Add deprecation handling validators = dict(Draft4Validator.VALIDATORS) validators[DEPRECATED_KEY] = _validator_deprecated + validators["changed"] = _validator_changed validators["oneOf"] = _oneOf validators["anyOf"] = _anyOf @@ -382,7 +416,7 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True): def validate_cloudconfig_schema( config: dict, - schema: dict = None, + schema: Optional[dict] = None, strict: bool = False, strict_metaschema: bool = False, log_details: bool = True, @@ -424,10 +458,14 @@ def validate_cloudconfig_schema( errors: SchemaProblems = [] deprecations: SchemaProblems = [] - for error in sorted(validator.iter_errors(config), key=lambda e: e.path): - path = ".".join([str(p) for p in error.path]) - problem = (SchemaProblem(path, error.message),) - if isinstance(error, SchemaDeprecationError): # pylint: disable=W1116 + for schema_error in sorted( + validator.iter_errors(config), key=lambda e: e.path + ): + path = ".".join([str(p) for p in schema_error.path]) + problem = (SchemaProblem(path, schema_error.message),) + if isinstance( + schema_error, SchemaDeprecationError + ): # pylint: disable=W1116 deprecations += problem else: errors += problem @@ -610,23 +648,7 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): @raises SchemaValidationError containing any of schema_errors encountered. @raises RuntimeError when config_path does not exist. """ - if config_path is None: - # Use system's raw userdata path - if os.getuid() != 0: - raise RuntimeError( - "Unable to read system userdata as non-root user." - " Try using sudo" - ) - init = Init(ds_deps=[]) - init.fetch(existing="trust") - init.consume_data() - content = load_file(init.paths.get_ipath("cloud_config"), decode=False) - else: - if not os.path.exists(config_path): - raise RuntimeError( - "Configfile {0} does not exist".format(config_path) - ) - content = load_file(config_path, decode=False) + content = load_file(config_path, decode=False) if not content.startswith(CLOUD_CONFIG_HEADER): errors = [ SchemaProblem( @@ -693,7 +715,7 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): schema_deprecations=e.schema_deprecations, ) ) - else: + elif e.schema_deprecations: message = _format_schema_problems( e.schema_deprecations, prefix="Cloud config schema deprecations: ", @@ -857,32 +879,35 @@ def _get_property_description(prop_config: dict) -> str: Order and deprecated property description after active descriptions. Add a trailing stop "." to any description not ending with ":". """ - prop_descr = prop_config.get("description", "") + + def assign_descriptions( + config: dict, descriptions: list, deprecated_descriptions: list + ): + if any( + map( + config.get, + ("deprecated_version", "changed_version", "new_version"), + ) + ): + deprecated_descriptions.append( + _add_deprecated_changed_or_new_msg(config) + ) + elif config.get("description"): + descriptions.append(_add_deprecated_changed_or_new_msg(config)) + oneOf = prop_config.get("oneOf", {}) anyOf = prop_config.get("anyOf", {}) - descriptions = [] - deprecated_descriptions = [] - if prop_descr: - prop_descr = prop_descr.rstrip(".") - if not prop_config.get(DEPRECATED_KEY): - descriptions.append(prop_descr) - else: - deprecated_descriptions.append(_add_deprecation_msg(prop_descr)) + descriptions: list = [] + deprecated_descriptions: list = [] + + assign_descriptions(prop_config, descriptions, deprecated_descriptions) for sub_item in chain(oneOf, anyOf): - if not sub_item.get("description"): - continue - if not sub_item.get(DEPRECATED_KEY): - descriptions.append(sub_item["description"].rstrip(".")) - else: - deprecated_descriptions.append( - f"{DEPRECATED_PREFIX}{sub_item['description'].rstrip('.')}" - ) + assign_descriptions(sub_item, descriptions, deprecated_descriptions) + # order deprecated descrs last description = ". ".join(chain(descriptions, deprecated_descriptions)) if description: description = f" {description}" - if description[-1] != ":": - description += "." return description @@ -1063,7 +1088,8 @@ def load_doc(requested_modules: list) -> str: "Invalid --docs value {}. Must be one of: {}".format( list(invalid_docs), ", ".join(all_modules), - ) + ), + sys_exit=True, ) for mod_name in all_modules: if "all" in requested_modules or mod_name in requested_modules: @@ -1164,28 +1190,88 @@ def handle_schema_args(name, args): """Handle provided schema args and perform the appropriate actions.""" exclusive_args = [args.config_file, args.docs, args.system] if len([arg for arg in exclusive_args if arg]) != 1: - error("Expected one of --config-file, --system or --docs arguments") + error( + "Expected one of --config-file, --system or --docs arguments", + sys_exit=True, + ) if args.annotate and args.docs: - error("Invalid flag combination. Cannot use --annotate with --docs") + error( + "Invalid flag combination. Cannot use --annotate with --docs", + sys_exit=True, + ) full_schema = get_schema() - if args.config_file or args.system: - try: - validate_cloudconfig_file( - args.config_file, full_schema, args.annotate + if args.docs: + print(load_doc(args.docs)) + return + if args.config_file: + config_files = (("user-data", args.config_file),) + else: + if os.getuid() != 0: + error( + "Unable to read system userdata or vendordata as non-root" + " user. Try using sudo.", + sys_exit=True, ) + init = Init(ds_deps=[]) + init.fetch(existing="trust") + userdata_file = init.paths.get_ipath("cloud_config") + if not userdata_file: + error( + "Unable to obtain user data file. No instance data available", + sys_exit=True, + ) + return # Helps typing + config_files = (("user-data", userdata_file),) + vendor_config_files = ( + ("vendor-data", init.paths.get_ipath("vendor_cloud_config")), + ("vendor2-data", init.paths.get_ipath("vendor2_cloud_config")), + ) + for cfg_type, vendor_file in vendor_config_files: + if vendor_file and os.path.exists(vendor_file): + config_files += ((cfg_type, vendor_file),) + if not os.path.exists(config_files[0][1]): + error( + f"Config file {config_files[0][1]} does not exist", + fmt="Error: {}", + sys_exit=True, + ) + + nested_output_prefix = "" + multi_config_output = bool(len(config_files) > 1) + if multi_config_output: + print( + "Found cloud-config data types: %s" + % ", ".join(cfg_type for cfg_type, _ in config_files) + ) + nested_output_prefix = " " + + error_types = [] + for idx, (cfg_type, cfg_file) in enumerate(config_files, 1): + if multi_config_output: + print(f"\n{idx}. {cfg_type} at {cfg_file}:") + try: + validate_cloudconfig_file(cfg_file, full_schema, args.annotate) except SchemaValidationError as e: if not args.annotate: - error(str(e)) + print(f"{nested_output_prefix}Invalid cloud-config {cfg_file}") + error( + str(e), + fmt=nested_output_prefix + "Error: {}\n", + ) + error_types.append(cfg_type) except RuntimeError as e: - error(str(e)) + print(f"{nested_output_prefix}Invalid cloud-config {cfg_type}") + error(str(e), fmt=nested_output_prefix + "Error: {}\n") + error_types.append(cfg_type) else: - if args.config_file is None: - cfg_name = "system userdata" - else: - cfg_name = args.config_file - print("Valid cloud-config:", cfg_name) - elif args.docs: - print(load_doc(args.docs)) + cfg = cfg_file if args.config_file else cfg_type + print(f"{nested_output_prefix}Valid cloud-config: {cfg}") + if error_types: + error( + ", ".join(error_type for error_type in error_types), + fmt="Error: Invalid cloud-config schema: {}\n", + sys_exit=True, + ) def main(): |