diff options
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r-- | cloudinit/config/schema.py | 158 |
1 files changed, 118 insertions, 40 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py index 9886bde6..9005e924 100644 --- a/cloudinit/config/schema.py +++ b/cloudinit/config/schema.py @@ -17,8 +17,14 @@ from typing import TYPE_CHECKING, List, NamedTuple, Optional, Type, Union, cast import yaml from cloudinit import importer, safeyaml -from cloudinit.stages import Init -from cloudinit.util import error, get_modules_from_dir, load_file +from cloudinit.cmd.devel import read_cfg_paths +from cloudinit.handlers import INCLUSION_TYPES_MAP, type_from_starts_with +from cloudinit.util import ( + decode_binary, + error, + get_modules_from_dir, + load_file, +) try: from jsonschema import ValidationError as _ValidationError @@ -35,7 +41,6 @@ VERSIONED_USERDATA_SCHEMA_FILE = "versions.schema.cloud-config.json" # Also add new version definition to versions.schema.json. USERDATA_SCHEMA_FILE = "schema-cloud-config-v1.json" _YAML_MAP = {True: "true", False: "false", None: "null"} -CLOUD_CONFIG_HEADER = b"#cloud-config" SCHEMA_DOC_TMPL = """ {name} {title_underbar} @@ -64,6 +69,10 @@ SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n # --- Example{0} ---" DEPRECATED_KEY = "deprecated" DEPRECATED_PREFIX = "DEPRECATED: " +# user-data files typically must begin with a leading '#' +USERDATA_VALID_HEADERS = sorted( + [t for t in INCLUSION_TYPES_MAP.keys() if t[0] == "#"] +) # type-annotate only if type-checking. # Consider to add `type_extensions` as a dependency when Bionic is EOL. @@ -129,20 +138,26 @@ class SchemaValidationError(ValueError): ((flat.config.key, msg),) """ message = "" - if schema_errors: - message += _format_schema_problems( - schema_errors, prefix="Cloud config schema errors: " - ) - if schema_deprecations: + + def handle_problems(problems, prefix): + if not problems: + return problems + nonlocal message if message: message += "\n\n" - message += _format_schema_problems( - schema_deprecations, - prefix="Cloud config schema deprecations: ", - ) + problems = sorted(list(set(problems))) + message += _format_schema_problems(problems, prefix=prefix) + return problems + + self.schema_errors = handle_problems( + schema_errors, + prefix="Cloud config schema errors: ", + ) + self.schema_deprecations = handle_problems( + schema_deprecations, + prefix="Cloud config schema deprecations: ", + ) super().__init__(message) - self.schema_errors = schema_errors - self.schema_deprecations = schema_deprecations def has_errors(self) -> bool: return bool(self.schema_errors) @@ -639,7 +654,12 @@ def annotated_cloudconfig_file( ) -def validate_cloudconfig_file(config_path, schema, annotate=False): +def validate_cloudconfig_file( + config_path: str, + schema: dict, + annotate: bool = False, + instance_data_path: str = None, +): """Validate cloudconfig file adheres to a specific jsonschema. @param config_path: Path to the yaml cloud-config file to parse, or None @@ -647,28 +667,69 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): @param schema: Dict describing a valid jsonschema to validate against. @param annotate: Boolean set True to print original config file with error annotations on the offending lines. + @param instance_data_path: Path to instance_data JSON, used for text/jinja + rendering. @raises SchemaValidationError containing any of schema_errors encountered. @raises RuntimeError when config_path does not exist. """ + from cloudinit.handlers.jinja_template import ( + JinjaLoadError, + NotJinjaError, + render_jinja_payload_from_file, + ) + content = load_file(config_path, decode=False) - if not content.startswith(CLOUD_CONFIG_HEADER): - errors = [ - SchemaProblem( - "format-l1.c1", - 'File {0} needs to begin with "{1}"'.format( - config_path, CLOUD_CONFIG_HEADER.decode() - ), - ), - ] - error = SchemaValidationError(errors) - if annotate: - print( - annotated_cloudconfig_file( - {}, content, {}, schema_errors=error.schema_errors + user_data_type = type_from_starts_with(content) + schema_position = "format-l1.c1" + if not user_data_type: + raise SchemaValidationError( + [ + SchemaProblem( + schema_position, + f"No valid cloud-init user-data header in {config_path}.\n" + "Expected first line to be one of: " + f"{', '.join(USERDATA_VALID_HEADERS)}", ) - ) - raise error + ] + ) + if user_data_type not in ("text/cloud-config", "text/jinja2"): + print( + f"User-data type '{user_data_type}' not currently evaluated" + " by cloud-init schema" + ) + return + if user_data_type == "text/jinja2": + try: + content = render_jinja_payload_from_file( + decode_binary(content), config_path, instance_data_path + ).encode() + except NotJinjaError as e: + raise SchemaValidationError( + [ + SchemaProblem( + schema_position, + "Detected type '{user_data_type}' from header. " + "But, content is not a jinja template", + ) + ] + ) from e + except JinjaLoadError as e: + error(str(e), sys_exit=True) + schema_position = "format-l2.c1" + user_data_type = type_from_starts_with(content) + if not user_data_type: + content_header = content[: decode_binary(content).find("\n")] + raise SchemaValidationError( + [ + SchemaProblem( + schema_position, + f"Unrecognized user-data header in {config_path}:" + f" {content_header}. Expected one of the following " + f"headers: {', '.join(USERDATA_VALID_HEADERS)}", + ) + ] + ) try: if annotate: cloudconfig, marks = safeyaml.load_with_marks(content) @@ -691,14 +752,14 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): "File {0} is not valid yaml. {1}".format(config_path, str(e)), ), ] - error = SchemaValidationError(errors) + schema_error = SchemaValidationError(errors) if annotate: print( annotated_cloudconfig_file( - {}, content, {}, schema_errors=error.schema_errors + {}, content, {}, schema_errors=schema_error.schema_errors ) ) - raise error from e + raise schema_error from e if not isinstance(cloudconfig, dict): # Return a meaningful message on empty cloud-config if not annotate: @@ -1153,6 +1214,16 @@ def get_parser(parser=None): help="Path of the cloud-config yaml file to validate", ) parser.add_argument( + "-i", + "--instance-data", + type=str, + help=( + "Path to instance-data.json file for variable expansion " + "of '##template: jinja' user-data. Default: " + f"{read_cfg_paths().get_runpath('instance_data')}" + ), + ) + parser.add_argument( "--system", action="store_true", default=False, @@ -1193,6 +1264,13 @@ def handle_schema_args(name, args): if args.docs: print(load_doc(args.docs)) return + paths = read_cfg_paths() + if args.instance_data: + instance_data_path = args.instance_data + elif os.getuid() != 0: + instance_data_path = paths.get_runpath("instance_data") + else: + instance_data_path = paths.get_runpath("instance_data_sensitive") if args.config_file: config_files = (("user-data", args.config_file),) else: @@ -1202,9 +1280,7 @@ def handle_schema_args(name, args): " user. Try using sudo.", sys_exit=True, ) - init = Init(ds_deps=[]) - init.fetch(existing="trust") - userdata_file = init.paths.get_ipath("cloud_config") + userdata_file = paths.get_ipath("cloud_config") if not userdata_file: error( "Unable to obtain user data file. No instance data available", @@ -1213,8 +1289,8 @@ def handle_schema_args(name, args): return # Helps typing config_files = (("user-data", userdata_file),) vendor_config_files = ( - ("vendor-data", init.paths.get_ipath("vendor_cloud_config")), - ("vendor2-data", init.paths.get_ipath("vendor2_cloud_config")), + ("vendor-data", paths.get_ipath("vendor_cloud_config")), + ("vendor2-data", paths.get_ipath("vendor2_cloud_config")), ) for cfg_type, vendor_file in vendor_config_files: if vendor_file and os.path.exists(vendor_file): @@ -1240,7 +1316,9 @@ def handle_schema_args(name, args): if multi_config_output: print(f"\n{idx}. {cfg_type} at {cfg_file}:") try: - validate_cloudconfig_file(cfg_file, full_schema, args.annotate) + validate_cloudconfig_file( + cfg_file, full_schema, args.annotate, instance_data_path + ) except SchemaValidationError as e: if not args.annotate: print(f"{nested_output_prefix}Invalid cloud-config {cfg_file}") |