summaryrefslogtreecommitdiff
path: root/cloudinit/config/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r--cloudinit/config/schema.py220
1 files changed, 153 insertions, 67 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index 42792985..b0b5fccf 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -1,6 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""schema.py: Set of module functions for processing cloud-config schema."""
-
import argparse
import json
import logging
@@ -29,7 +28,6 @@ except ImportError:
ValidationError = Exception # type: ignore
-error = partial(error, sys_exit=True)
LOG = logging.getLogger(__name__)
VERSIONED_USERDATA_SCHEMA_FILE = "versions.schema.cloud-config.json"
@@ -164,17 +162,47 @@ def is_schema_byte_string(checker, instance):
) or isinstance(instance, (bytes,))
-def _add_deprecation_msg(description: Optional[str] = None) -> str:
- if description:
- return f"{DEPRECATED_PREFIX}{description}"
- return DEPRECATED_PREFIX.replace(":", ".").strip()
+def _add_deprecated_changed_or_new_msg(
+ config: dict, annotate=False, filter_key=None
+) -> str:
+ """combine description with new/changed/deprecated message
+
+ deprecated/changed/new keys require a _version key (this is verified
+ in a unittest), a _description key is optional
+ """
+
+ def format_message(key: str):
+ if not config.get(f"{key}"):
+ return ""
+ key_description = config.get(f"{key}_description", "")
+ v = config.get(
+ f"{key}_version",
+ f"<missing {key}_version key, please file a bug report>",
+ )
+ msg = f"{key.capitalize()} in version {v}. {key_description}"
+ if annotate:
+ return f" {msg}"
+ # italicised RST - no whitespace between astrisk and text
+ return f"\n\n*{msg.strip()}*"
-def _validator_deprecated(
+ # define print order
+ filter_keys = (
+ filter_key if filter_key else ["deprecated", "changed", "new"]
+ )
+
+ # build a deprecation/new/changed string
+ changed_new_deprecated = "".join(map(format_message, filter_keys))
+ description = config.get("description", "")
+ return f"{description}{changed_new_deprecated}".rstrip()
+
+
+def _validator(
_validator,
deprecated: bool,
_instance,
schema: dict,
+ filter_key: str,
error_type: Type[Exception] = SchemaDeprecationError,
):
"""Jsonschema validator for `deprecated` items.
@@ -183,11 +211,16 @@ def _validator_deprecated(
otherwise the instance is consider faulty.
"""
if deprecated:
- description = schema.get("description")
- msg = _add_deprecation_msg(description)
+ msg = _add_deprecated_changed_or_new_msg(
+ schema, annotate=True, filter_key=[filter_key]
+ )
yield error_type(msg)
+_validator_deprecated = partial(_validator, filter_key="deprecated")
+_validator_changed = partial(_validator, filter_key="changed")
+
+
def _anyOf(
validator,
anyOf,
@@ -316,6 +349,7 @@ def get_jsonschema_validator():
# Add deprecation handling
validators = dict(Draft4Validator.VALIDATORS)
validators[DEPRECATED_KEY] = _validator_deprecated
+ validators["changed"] = _validator_changed
validators["oneOf"] = _oneOf
validators["anyOf"] = _anyOf
@@ -382,7 +416,7 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
def validate_cloudconfig_schema(
config: dict,
- schema: dict = None,
+ schema: Optional[dict] = None,
strict: bool = False,
strict_metaschema: bool = False,
log_details: bool = True,
@@ -424,10 +458,14 @@ def validate_cloudconfig_schema(
errors: SchemaProblems = []
deprecations: SchemaProblems = []
- for error in sorted(validator.iter_errors(config), key=lambda e: e.path):
- path = ".".join([str(p) for p in error.path])
- problem = (SchemaProblem(path, error.message),)
- if isinstance(error, SchemaDeprecationError): # pylint: disable=W1116
+ for schema_error in sorted(
+ validator.iter_errors(config), key=lambda e: e.path
+ ):
+ path = ".".join([str(p) for p in schema_error.path])
+ problem = (SchemaProblem(path, schema_error.message),)
+ if isinstance(
+ schema_error, SchemaDeprecationError
+ ): # pylint: disable=W1116
deprecations += problem
else:
errors += problem
@@ -610,23 +648,7 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
@raises SchemaValidationError containing any of schema_errors encountered.
@raises RuntimeError when config_path does not exist.
"""
- if config_path is None:
- # Use system's raw userdata path
- if os.getuid() != 0:
- raise RuntimeError(
- "Unable to read system userdata as non-root user."
- " Try using sudo"
- )
- init = Init(ds_deps=[])
- init.fetch(existing="trust")
- init.consume_data()
- content = load_file(init.paths.get_ipath("cloud_config"), decode=False)
- else:
- if not os.path.exists(config_path):
- raise RuntimeError(
- "Configfile {0} does not exist".format(config_path)
- )
- content = load_file(config_path, decode=False)
+ content = load_file(config_path, decode=False)
if not content.startswith(CLOUD_CONFIG_HEADER):
errors = [
SchemaProblem(
@@ -693,7 +715,7 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
schema_deprecations=e.schema_deprecations,
)
)
- else:
+ elif e.schema_deprecations:
message = _format_schema_problems(
e.schema_deprecations,
prefix="Cloud config schema deprecations: ",
@@ -857,32 +879,35 @@ def _get_property_description(prop_config: dict) -> str:
Order and deprecated property description after active descriptions.
Add a trailing stop "." to any description not ending with ":".
"""
- prop_descr = prop_config.get("description", "")
+
+ def assign_descriptions(
+ config: dict, descriptions: list, deprecated_descriptions: list
+ ):
+ if any(
+ map(
+ config.get,
+ ("deprecated_version", "changed_version", "new_version"),
+ )
+ ):
+ deprecated_descriptions.append(
+ _add_deprecated_changed_or_new_msg(config)
+ )
+ elif config.get("description"):
+ descriptions.append(_add_deprecated_changed_or_new_msg(config))
+
oneOf = prop_config.get("oneOf", {})
anyOf = prop_config.get("anyOf", {})
- descriptions = []
- deprecated_descriptions = []
- if prop_descr:
- prop_descr = prop_descr.rstrip(".")
- if not prop_config.get(DEPRECATED_KEY):
- descriptions.append(prop_descr)
- else:
- deprecated_descriptions.append(_add_deprecation_msg(prop_descr))
+ descriptions: list = []
+ deprecated_descriptions: list = []
+
+ assign_descriptions(prop_config, descriptions, deprecated_descriptions)
for sub_item in chain(oneOf, anyOf):
- if not sub_item.get("description"):
- continue
- if not sub_item.get(DEPRECATED_KEY):
- descriptions.append(sub_item["description"].rstrip("."))
- else:
- deprecated_descriptions.append(
- f"{DEPRECATED_PREFIX}{sub_item['description'].rstrip('.')}"
- )
+ assign_descriptions(sub_item, descriptions, deprecated_descriptions)
+
# order deprecated descrs last
description = ". ".join(chain(descriptions, deprecated_descriptions))
if description:
description = f" {description}"
- if description[-1] != ":":
- description += "."
return description
@@ -1063,7 +1088,8 @@ def load_doc(requested_modules: list) -> str:
"Invalid --docs value {}. Must be one of: {}".format(
list(invalid_docs),
", ".join(all_modules),
- )
+ ),
+ sys_exit=True,
)
for mod_name in all_modules:
if "all" in requested_modules or mod_name in requested_modules:
@@ -1164,28 +1190,88 @@ def handle_schema_args(name, args):
"""Handle provided schema args and perform the appropriate actions."""
exclusive_args = [args.config_file, args.docs, args.system]
if len([arg for arg in exclusive_args if arg]) != 1:
- error("Expected one of --config-file, --system or --docs arguments")
+ error(
+ "Expected one of --config-file, --system or --docs arguments",
+ sys_exit=True,
+ )
if args.annotate and args.docs:
- error("Invalid flag combination. Cannot use --annotate with --docs")
+ error(
+ "Invalid flag combination. Cannot use --annotate with --docs",
+ sys_exit=True,
+ )
full_schema = get_schema()
- if args.config_file or args.system:
- try:
- validate_cloudconfig_file(
- args.config_file, full_schema, args.annotate
+ if args.docs:
+ print(load_doc(args.docs))
+ return
+ if args.config_file:
+ config_files = (("user-data", args.config_file),)
+ else:
+ if os.getuid() != 0:
+ error(
+ "Unable to read system userdata or vendordata as non-root"
+ " user. Try using sudo.",
+ sys_exit=True,
)
+ init = Init(ds_deps=[])
+ init.fetch(existing="trust")
+ userdata_file = init.paths.get_ipath("cloud_config")
+ if not userdata_file:
+ error(
+ "Unable to obtain user data file. No instance data available",
+ sys_exit=True,
+ )
+ return # Helps typing
+ config_files = (("user-data", userdata_file),)
+ vendor_config_files = (
+ ("vendor-data", init.paths.get_ipath("vendor_cloud_config")),
+ ("vendor2-data", init.paths.get_ipath("vendor2_cloud_config")),
+ )
+ for cfg_type, vendor_file in vendor_config_files:
+ if vendor_file and os.path.exists(vendor_file):
+ config_files += ((cfg_type, vendor_file),)
+ if not os.path.exists(config_files[0][1]):
+ error(
+ f"Config file {config_files[0][1]} does not exist",
+ fmt="Error: {}",
+ sys_exit=True,
+ )
+
+ nested_output_prefix = ""
+ multi_config_output = bool(len(config_files) > 1)
+ if multi_config_output:
+ print(
+ "Found cloud-config data types: %s"
+ % ", ".join(cfg_type for cfg_type, _ in config_files)
+ )
+ nested_output_prefix = " "
+
+ error_types = []
+ for idx, (cfg_type, cfg_file) in enumerate(config_files, 1):
+ if multi_config_output:
+ print(f"\n{idx}. {cfg_type} at {cfg_file}:")
+ try:
+ validate_cloudconfig_file(cfg_file, full_schema, args.annotate)
except SchemaValidationError as e:
if not args.annotate:
- error(str(e))
+ print(f"{nested_output_prefix}Invalid cloud-config {cfg_file}")
+ error(
+ str(e),
+ fmt=nested_output_prefix + "Error: {}\n",
+ )
+ error_types.append(cfg_type)
except RuntimeError as e:
- error(str(e))
+ print(f"{nested_output_prefix}Invalid cloud-config {cfg_type}")
+ error(str(e), fmt=nested_output_prefix + "Error: {}\n")
+ error_types.append(cfg_type)
else:
- if args.config_file is None:
- cfg_name = "system userdata"
- else:
- cfg_name = args.config_file
- print("Valid cloud-config:", cfg_name)
- elif args.docs:
- print(load_doc(args.docs))
+ cfg = cfg_file if args.config_file else cfg_type
+ print(f"{nested_output_prefix}Valid cloud-config: {cfg}")
+ if error_types:
+ error(
+ ", ".join(error_type for error_type in error_types),
+ fmt="Error: Invalid cloud-config schema: {}\n",
+ sys_exit=True,
+ )
def main():