summaryrefslogtreecommitdiff
path: root/cloudinit/config/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r--cloudinit/config/schema.py158
1 files changed, 118 insertions, 40 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index 9886bde6..9005e924 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -17,8 +17,14 @@ from typing import TYPE_CHECKING, List, NamedTuple, Optional, Type, Union, cast
import yaml
from cloudinit import importer, safeyaml
-from cloudinit.stages import Init
-from cloudinit.util import error, get_modules_from_dir, load_file
+from cloudinit.cmd.devel import read_cfg_paths
+from cloudinit.handlers import INCLUSION_TYPES_MAP, type_from_starts_with
+from cloudinit.util import (
+ decode_binary,
+ error,
+ get_modules_from_dir,
+ load_file,
+)
try:
from jsonschema import ValidationError as _ValidationError
@@ -35,7 +41,6 @@ VERSIONED_USERDATA_SCHEMA_FILE = "versions.schema.cloud-config.json"
# Also add new version definition to versions.schema.json.
USERDATA_SCHEMA_FILE = "schema-cloud-config-v1.json"
_YAML_MAP = {True: "true", False: "false", None: "null"}
-CLOUD_CONFIG_HEADER = b"#cloud-config"
SCHEMA_DOC_TMPL = """
{name}
{title_underbar}
@@ -64,6 +69,10 @@ SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n # --- Example{0} ---"
DEPRECATED_KEY = "deprecated"
DEPRECATED_PREFIX = "DEPRECATED: "
+# user-data files typically must begin with a leading '#'
+USERDATA_VALID_HEADERS = sorted(
+ [t for t in INCLUSION_TYPES_MAP.keys() if t[0] == "#"]
+)
# type-annotate only if type-checking.
# Consider to add `type_extensions` as a dependency when Bionic is EOL.
@@ -129,20 +138,26 @@ class SchemaValidationError(ValueError):
((flat.config.key, msg),)
"""
message = ""
- if schema_errors:
- message += _format_schema_problems(
- schema_errors, prefix="Cloud config schema errors: "
- )
- if schema_deprecations:
+
+ def handle_problems(problems, prefix):
+ if not problems:
+ return problems
+ nonlocal message
if message:
message += "\n\n"
- message += _format_schema_problems(
- schema_deprecations,
- prefix="Cloud config schema deprecations: ",
- )
+ problems = sorted(list(set(problems)))
+ message += _format_schema_problems(problems, prefix=prefix)
+ return problems
+
+ self.schema_errors = handle_problems(
+ schema_errors,
+ prefix="Cloud config schema errors: ",
+ )
+ self.schema_deprecations = handle_problems(
+ schema_deprecations,
+ prefix="Cloud config schema deprecations: ",
+ )
super().__init__(message)
- self.schema_errors = schema_errors
- self.schema_deprecations = schema_deprecations
def has_errors(self) -> bool:
return bool(self.schema_errors)
@@ -639,7 +654,12 @@ def annotated_cloudconfig_file(
)
-def validate_cloudconfig_file(config_path, schema, annotate=False):
+def validate_cloudconfig_file(
+ config_path: str,
+ schema: dict,
+ annotate: bool = False,
+ instance_data_path: str = None,
+):
"""Validate cloudconfig file adheres to a specific jsonschema.
@param config_path: Path to the yaml cloud-config file to parse, or None
@@ -647,28 +667,69 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
@param schema: Dict describing a valid jsonschema to validate against.
@param annotate: Boolean set True to print original config file with error
annotations on the offending lines.
+ @param instance_data_path: Path to instance_data JSON, used for text/jinja
+ rendering.
@raises SchemaValidationError containing any of schema_errors encountered.
@raises RuntimeError when config_path does not exist.
"""
+ from cloudinit.handlers.jinja_template import (
+ JinjaLoadError,
+ NotJinjaError,
+ render_jinja_payload_from_file,
+ )
+
content = load_file(config_path, decode=False)
- if not content.startswith(CLOUD_CONFIG_HEADER):
- errors = [
- SchemaProblem(
- "format-l1.c1",
- 'File {0} needs to begin with "{1}"'.format(
- config_path, CLOUD_CONFIG_HEADER.decode()
- ),
- ),
- ]
- error = SchemaValidationError(errors)
- if annotate:
- print(
- annotated_cloudconfig_file(
- {}, content, {}, schema_errors=error.schema_errors
+ user_data_type = type_from_starts_with(content)
+ schema_position = "format-l1.c1"
+ if not user_data_type:
+ raise SchemaValidationError(
+ [
+ SchemaProblem(
+ schema_position,
+ f"No valid cloud-init user-data header in {config_path}.\n"
+ "Expected first line to be one of: "
+ f"{', '.join(USERDATA_VALID_HEADERS)}",
)
- )
- raise error
+ ]
+ )
+ if user_data_type not in ("text/cloud-config", "text/jinja2"):
+ print(
+ f"User-data type '{user_data_type}' not currently evaluated"
+ " by cloud-init schema"
+ )
+ return
+ if user_data_type == "text/jinja2":
+ try:
+ content = render_jinja_payload_from_file(
+ decode_binary(content), config_path, instance_data_path
+ ).encode()
+ except NotJinjaError as e:
+ raise SchemaValidationError(
+ [
+ SchemaProblem(
+ schema_position,
+ "Detected type '{user_data_type}' from header. "
+ "But, content is not a jinja template",
+ )
+ ]
+ ) from e
+ except JinjaLoadError as e:
+ error(str(e), sys_exit=True)
+ schema_position = "format-l2.c1"
+ user_data_type = type_from_starts_with(content)
+ if not user_data_type:
+ content_header = content[: decode_binary(content).find("\n")]
+ raise SchemaValidationError(
+ [
+ SchemaProblem(
+ schema_position,
+ f"Unrecognized user-data header in {config_path}:"
+ f" {content_header}. Expected one of the following "
+ f"headers: {', '.join(USERDATA_VALID_HEADERS)}",
+ )
+ ]
+ )
try:
if annotate:
cloudconfig, marks = safeyaml.load_with_marks(content)
@@ -691,14 +752,14 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
"File {0} is not valid yaml. {1}".format(config_path, str(e)),
),
]
- error = SchemaValidationError(errors)
+ schema_error = SchemaValidationError(errors)
if annotate:
print(
annotated_cloudconfig_file(
- {}, content, {}, schema_errors=error.schema_errors
+ {}, content, {}, schema_errors=schema_error.schema_errors
)
)
- raise error from e
+ raise schema_error from e
if not isinstance(cloudconfig, dict):
# Return a meaningful message on empty cloud-config
if not annotate:
@@ -1153,6 +1214,16 @@ def get_parser(parser=None):
help="Path of the cloud-config yaml file to validate",
)
parser.add_argument(
+ "-i",
+ "--instance-data",
+ type=str,
+ help=(
+ "Path to instance-data.json file for variable expansion "
+ "of '##template: jinja' user-data. Default: "
+ f"{read_cfg_paths().get_runpath('instance_data')}"
+ ),
+ )
+ parser.add_argument(
"--system",
action="store_true",
default=False,
@@ -1193,6 +1264,13 @@ def handle_schema_args(name, args):
if args.docs:
print(load_doc(args.docs))
return
+ paths = read_cfg_paths()
+ if args.instance_data:
+ instance_data_path = args.instance_data
+ elif os.getuid() != 0:
+ instance_data_path = paths.get_runpath("instance_data")
+ else:
+ instance_data_path = paths.get_runpath("instance_data_sensitive")
if args.config_file:
config_files = (("user-data", args.config_file),)
else:
@@ -1202,9 +1280,7 @@ def handle_schema_args(name, args):
" user. Try using sudo.",
sys_exit=True,
)
- init = Init(ds_deps=[])
- init.fetch(existing="trust")
- userdata_file = init.paths.get_ipath("cloud_config")
+ userdata_file = paths.get_ipath("cloud_config")
if not userdata_file:
error(
"Unable to obtain user data file. No instance data available",
@@ -1213,8 +1289,8 @@ def handle_schema_args(name, args):
return # Helps typing
config_files = (("user-data", userdata_file),)
vendor_config_files = (
- ("vendor-data", init.paths.get_ipath("vendor_cloud_config")),
- ("vendor2-data", init.paths.get_ipath("vendor2_cloud_config")),
+ ("vendor-data", paths.get_ipath("vendor_cloud_config")),
+ ("vendor2-data", paths.get_ipath("vendor2_cloud_config")),
)
for cfg_type, vendor_file in vendor_config_files:
if vendor_file and os.path.exists(vendor_file):
@@ -1240,7 +1316,9 @@ def handle_schema_args(name, args):
if multi_config_output:
print(f"\n{idx}. {cfg_type} at {cfg_file}:")
try:
- validate_cloudconfig_file(cfg_file, full_schema, args.annotate)
+ validate_cloudconfig_file(
+ cfg_file, full_schema, args.annotate, instance_data_path
+ )
except SchemaValidationError as e:
if not args.annotate:
print(f"{nested_output_prefix}Invalid cloud-config {cfg_file}")