# # Copyright (C) 2018 Codethink Limited # Copyright (C) 2019 Bloomberg LLP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Tristan Van Berkom # Daniel Silverstone # James Ennis # Benjamin Schubert import sys import string from contextlib import ExitStack from collections import OrderedDict from collections.abc import Mapping, Sequence from copy import deepcopy from ruamel import yaml from ._exceptions import LoadError, LoadErrorReason # Without this, pylint complains about all the `type(foo) is blah` checks # because it feels isinstance() is more idiomatic. Sadly, it is much slower to # do `isinstance(foo, blah)` for reasons I am unable to fathom. As such, we # blanket disable the check for this module. # # pylint: disable=unidiomatic-typecheck # Node() # # Container for YAML loaded data and its provenance # # All nodes returned (and all internal lists/strings) have this type (rather # than a plain tuple, to distinguish them in things like node_sanitize) # # Members: # value (str/list/dict): The loaded value. # file_index (int): Index within _FILE_LIST (a list of loaded file paths). # Negative indices indicate synthetic nodes so that # they can be referenced. # line (int): The line number within the file where the value appears. # col (int): The column number within the file where the value appears. 
#
cdef class Node:

    # Wrap a loaded value together with the coordinates it came from.
    def __init__(self, object value, int file_index, int line, int column):
        self.column = column
        self.line = line
        self.file_index = file_index
        self.value = value

    def __contains__(self, item):
        # Membership tests are answered by the wrapped value. This is
        # only really meaningful for mapping nodes; code holding list
        # or string nodes is not expected to use `in` on them.
        return item in self.value


# FileInfo
#
# Metadata kept for every toplevel yaml node, so that the provenance
# of any node can be traced back to the file it belongs to.
#
cdef class FileInfo:

    cdef str filename
    cdef str shortname
    cdef str displayname
    cdef Node toplevel
    cdef object project

    # Record the names of the file, its toplevel node and its project.
    def __init__(self, str filename, str shortname, str displayname, Node toplevel, object project):
        self.project = project
        self.toplevel = toplevel
        self.displayname = displayname
        self.shortname = shortname
        self.filename = filename


# File name handling: Node.file_index is an index into this list
cdef _FILE_LIST = []


# Purely synthetic node will have _SYNTHETIC_FILE_INDEX for the file number, have line number
# zero, and a negative column number which comes from inverting the next value
# out of this counter.  Synthetic nodes created with a reference node will
# have a file number from the reference node, some unknown line number, and
# a negative column number from this counter.
cdef int _SYNTHETIC_FILE_INDEX = -1
cdef int __counter = 0


# Return the next (strictly decreasing, always negative) synthetic counter
# value, used as the column number of synthetic nodes.
cdef int next_synthetic_counter():
    global __counter
    __counter -= 1
    return __counter


# Returned from node_get_provenance
#
# Resolves a node's file index into the names recorded in _FILE_LIST and
# exposes them, together with a human readable line/column, for error
# reporting.
#
# Args:
#    nodeish (Node): The node to describe, or None
#
cdef class ProvenanceInformation:

    def __init__(self, Node nodeish):
        cdef FileInfo fileinfo

        self.node = nodeish
        # file_index is a C int and can never be None, so purely synthetic
        # nodes must be detected by their index. Looking such a node up in
        # _FILE_LIST would wrongly attribute it to the last loaded file
        # (or raise IndexError when nothing has been loaded yet).
        if (nodeish is None) or (nodeish.file_index == _SYNTHETIC_FILE_INDEX):
            self.filename = ""
            self.shortname = ""
            self.displayname = ""
            self.line = 1
            self.col = 0
            self.toplevel = None
            self.project = None
        else:
            fileinfo = _FILE_LIST[nodeish.file_index]
            self.filename = fileinfo.filename
            self.shortname = fileinfo.shortname
            self.displayname = fileinfo.displayname
            # We add 1 here to convert from computerish to humanish
            self.line = nodeish.line + 1
            self.col = nodeish.column
            self.toplevel = fileinfo.toplevel
            self.project = fileinfo.project

        # Nodes without a backing file, or with a negative (counter
        # generated) column, do not point at a real location.
        self.is_synthetic = (self.filename == '') or (self.col < 0)

    # Convert a Provenance to a string for error reporting
    def __str__(self):
        if self.is_synthetic:
            return "{} [synthetic node]".format(self.displayname)
        else:
            return "{} [line {:d} column {:d}]".format(self.displayname, self.line, self.col)


# These exceptions are intended to be caught entirely within
# the BuildStream framework, hence they do not reside in the
# public exceptions.py
class CompositeError(Exception):
    # Args:
    #    path (list): The key path at which composition failed
    #    message (str): A description of the failure
    def __init__(self, path, message):
        super().__init__(message)
        self.path = path
        self.message = message


class YAMLLoadError(Exception):
    pass


# Represents the various states in which the Representer can be
# while parsing yaml.
cdef enum RepresenterState:
    doc
    init
    stream
    wait_key
    wait_list_item
    wait_value


# Signature shared by all of the Representer event handlers: they take the
# representer and the event, and return the state to move to.
ctypedef RepresenterState (*representer_action)(Representer, object)

# Representer for YAML events comprising input to the BuildStream format.
#
# All streams MUST represent a single document which must be a Mapping.
# Anything else is considered an error.
#
# Mappings must only have string keys, values are always represented as
# strings if they are scalar, or else as simple dictionaries and lists.
#
cdef class Representer:

    cdef int _file_index
    cdef RepresenterState state
    # `output` is the stack of container nodes currently being filled,
    # `keys` is the stack of pending mapping keys / list positions.
    cdef list output, keys

    # Initialise a new representer
    #
    # The file index is used to store into the Node instances so that the
    # provenance of the YAML can be tracked.
    #
    # Args:
    #   file_index (int): The index of this YAML file
    def __init__(self, int file_index):
        self._file_index = file_index
        self.state = RepresenterState.init
        self.output = []
        self.keys = []

    # Handle a YAML parse event
    #
    # Args:
    #   event (YAML Event): The event to be handled
    #
    # Raises:
    #   YAMLLoadError: Something went wrong.
    cdef void handle_event(self, event) except *:
        cdef str event_name
        cdef representer_action handler

        if getattr(event, "anchor", None) is not None:
            raise YAMLLoadError("Anchors are disallowed in BuildStream at line {} column {}"
                                .format(event.start_mark.line, event.start_mark.column))

        event_name = event.__class__.__name__

        # Only tags from the core YAML namespace are tolerated on scalars
        if event_name == "ScalarEvent":
            if event.tag is not None:
                if not event.tag.startswith("tag:yaml.org,2002:"):
                    raise YAMLLoadError(
                        "Non-core tag expressed in input.  " +
                        "This is disallowed in BuildStream. At line {} column {}"
                        .format(event.start_mark.line, event.start_mark.column))

        handler = self._get_handler_for_event(event_name)
        if not handler:
            raise YAMLLoadError(
                "Invalid input detected. No handler for {} in state {} at line {} column {}"
                .format(event, self.state, event.start_mark.line, event.start_mark.column))

        # Cython weirdness here, we need to pass self to the function
        self.state = handler(self, event)  # pylint: disable=not-callable

    # Get the output of the YAML parse
    #
    # Returns:
    #   (Node or None): Return the Node instance of the top level mapping or
    #                   None if there wasn't one.
    cdef Node get_output(self):
        if len(self.output):
            return self.output[0]
        return None

    # Select the handler for an event given the current state
    #
    # Args:
    #   event_name (str): The class name of the event
    #
    # Returns:
    #   (representer_action): The handler, or NULL if the event is not
    #                         acceptable in the current state
    cdef representer_action _get_handler_for_event(self, str event_name):
        if self.state == RepresenterState.wait_list_item:
            if event_name == "ScalarEvent":
                return self._handle_wait_list_item_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_list_item_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_list_item_SequenceStartEvent
            elif event_name == "SequenceEndEvent":
                return self._handle_wait_list_item_SequenceEndEvent
        elif self.state == RepresenterState.wait_value:
            if event_name == "ScalarEvent":
                return self._handle_wait_value_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_value_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_value_SequenceStartEvent
        elif self.state == RepresenterState.wait_key:
            if event_name == "ScalarEvent":
                return self._handle_wait_key_ScalarEvent
            elif event_name == "MappingEndEvent":
                return self._handle_wait_key_MappingEndEvent
        elif self.state == RepresenterState.stream:
            if event_name == "DocumentStartEvent":
                return self._handle_stream_DocumentStartEvent
            elif event_name == "StreamEndEvent":
                return self._handle_stream_StreamEndEvent
        elif self.state == RepresenterState.doc:
            if event_name == "MappingStartEvent":
                return self._handle_doc_MappingStartEvent
            elif event_name == "DocumentEndEvent":
                return self._handle_doc_DocumentEndEvent
        elif self.state == RepresenterState.init and event_name == "StreamStartEvent":
            return self._handle_init_StreamStartEvent
        return NULL

    cdef RepresenterState _handle_init_StreamStartEvent(self, object ev):
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_DocumentStartEvent(self, object ev):
        return RepresenterState.doc

    # Push a fresh mapping node onto the output stack
    cdef RepresenterState _handle_doc_MappingStartEvent(self, object ev):
        newmap = Node({}, self._file_index, ev.start_mark.line, ev.start_mark.column)
        self.output.append(newmap)
        return RepresenterState.wait_key

    # Remember the key until its value arrives
    cdef RepresenterState _handle_wait_key_ScalarEvent(self, object ev):
        self.keys.append(ev.value)
        return RepresenterState.wait_value

    # Store a scalar value under the pending key of the current mapping
    cdef RepresenterState _handle_wait_value_ScalarEvent(self, object ev):
        key = self.keys.pop()
        self.output[-1].value[key] = \
            Node(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column)
        return RepresenterState.wait_key

    # Start a nested mapping and attach it to its parent under the pending key
    cdef RepresenterState _handle_wait_value_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        key = self.keys.pop()
        self.output[-2].value[key] = self.output[-1]
        return new_state

    cdef RepresenterState _handle_wait_key_MappingEndEvent(self, object ev):
        # We've finished a mapping, so pop it off the output stack
        # unless it's the last one in which case we leave it
        if len(self.output) > 1:
            self.output.pop()
            if type(self.output[-1].value) is list:
                return RepresenterState.wait_list_item
            else:
                return RepresenterState.wait_key
        else:
            return RepresenterState.doc

    # Start a list as a mapping value. The key is deliberately left on the
    # key stack; it is popped again when the sequence ends.
    cdef RepresenterState _handle_wait_value_SequenceStartEvent(self, object ev):
        self.output.append(Node([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        self.output[-2].value[self.keys[-1]] = self.output[-1]
        return RepresenterState.wait_list_item

    # Start a list nested in a list. The position within the parent list is
    # pushed as an (integer) key so that the end of the sequence can tell
    # which kind of container it returns to.
    cdef RepresenterState _handle_wait_list_item_SequenceStartEvent(self, object ev):
        self.keys.append(len(self.output[-1].value))
        self.output.append(Node([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        self.output[-2].value.append(self.output[-1])
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_SequenceEndEvent(self, object ev):
        # When ending a sequence, we need to pop a key because we retain the
        # key until the end so that if we need to mutate the underlying entry
        # we can.
        key = self.keys.pop()
        self.output.pop()
        # An integer key means the parent is a list, otherwise it is a mapping
        if type(key) is int:
            return RepresenterState.wait_list_item
        else:
            return RepresenterState.wait_key

    # Append a scalar to the current list
    cdef RepresenterState _handle_wait_list_item_ScalarEvent(self, object ev):
        self.output[-1].value.append(
            Node(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column))
        return RepresenterState.wait_list_item

    # Start a mapping as a list element
    cdef RepresenterState _handle_wait_list_item_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        self.output[-2].value.append(self.output[-1])
        return new_state

    cdef RepresenterState _handle_doc_DocumentEndEvent(self, object ev):
        if len(self.output) != 1:
            raise YAMLLoadError("Zero, or more than one document found in YAML stream")
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_StreamEndEvent(self, object ev):
        return RepresenterState.init


# Loads a dictionary from some YAML
#
# Args:
#    filename (str): The YAML file to load
#    shortname (str): The filename in shorthand for error reporting (or None)
#    copy_tree (bool): Whether to make a copy, preserving the original toplevels
#                      for later serialization
#    project (Project): The (optional) project to associate the parsed YAML with
#
# Returns (dict): A loaded copy of the YAML file with provenance information
#
# Raises: LoadError
#
cpdef Node load(str filename, str shortname=None, bint copy_tree=False, object project=None):
    cdef str displayname
    cdef Py_ssize_t file_number
    cdef Node data

    if not shortname:
        shortname = filename

    # Files reached through a junction are displayed as "junction:shortname"
    if (project is not None) and (project.junction is not None):
        displayname = "{}:{}".format(project.junction.name, shortname)
    else:
        displayname = shortname

    # Register the file before parsing so that nodes can refer to its index;
    # the toplevel is filled in by load_data() once it is known.
    file_number = len(_FILE_LIST)
    _FILE_LIST.append(FileInfo(filename, shortname, displayname, None, project))

    try:
        with open(filename) as f:
            contents = f.read()

        data = load_data(contents,
                         file_index=file_number,
                         file_name=filename,
                         copy_tree=copy_tree)

        return data
    except FileNotFoundError as e:
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory. bst command expects a .bst file."
                        .format(filename)) from e
    except LoadError as e:
        raise LoadError(e.reason, "{}: {}".format(displayname, e)) from e


# Like load(), but doesnt require the data to be in a file
#
# Args:
#    data (str): The YAML text to parse
#    file_index (int): Index of the file in _FILE_LIST, or
#                      _SYNTHETIC_FILE_INDEX when there is no backing file
#    file_name (str): The file name, only used in error messages
#    copy_tree (bool): Whether to return a copy, leaving the recorded
#                      toplevel untouched
#
# Returns (Node): The toplevel mapping node
#
# Raises: LoadError
#
cpdef Node load_data(str data, int file_index=_SYNTHETIC_FILE_INDEX, str file_name=None, bint copy_tree=False):
    cdef Representer rep
    cdef FileInfo f_info

    try:
        rep = Representer(file_index)
        parser = yaml.CParser(data)

        try:
            while parser.check_event():
                rep.handle_event(parser.get_event())
        finally:
            parser.dispose()

        contents = rep.get_output()
    except YAMLLoadError as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n".format(e)) from e
    except Exception as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Severely malformed YAML:\n\n{}\n\n".format(e)) from e

    if type(contents) != Node:
        # Special case allowance for None, when the loaded file has only comments in it.
        if contents is None:
            contents = Node({}, file_index, 0, 0)
        else:
            # Report the type of the object itself; it is not a sequence
            # and cannot be indexed.
            raise LoadError(LoadErrorReason.INVALID_YAML,
                            "YAML file has content of type '{}' instead of expected type 'dict': {}"
                            .format(type(contents).__name__, file_name))

    # Store this away because we'll use it later for "top level" provenance.
    # Synthetic data has no entry in _FILE_LIST: indexing with the synthetic
    # index (-1) would silently overwrite the toplevel of the last loaded file.
    if file_index != _SYNTHETIC_FILE_INDEX:
        f_info = _FILE_LIST[file_index]

        _FILE_LIST[file_index] = FileInfo(
            f_info.filename,
            f_info.shortname,
            f_info.displayname,
            contents,
            f_info.project,
        )

    if copy_tree:
        contents = node_copy(contents)
    return contents


# dump()
#
# Write a YAML node structure out to disk.
#
# This will always call `node_sanitize` on its input, so if you wanted
# to output something close to what you read in, consider using the
# `roundtrip_load` and `roundtrip_dump` function pair instead.
#
# Args:
#    contents (any): Content to write out
#    filename (str): The (optional) file name to write out to
def dump(object contents, str filename=None):
    roundtrip_dump(node_sanitize(contents), file=filename)


# node_get_provenance()
#
# Gets the provenance for a node
#
# Args:
#   node (Node): a dictionary
#   key (str): key in the dictionary
#   indices (list of indexes): Index path, in the case of list values
#
# Returns: The Provenance of the dict, member or list element
#
cpdef ProvenanceInformation node_get_provenance(Node node, str key=None, list indices=None):
    cdef Node nodeish

    assert type(node.value) is dict

    if key is None:
        # Retrieving the provenance for this node directly
        return ProvenanceInformation(node)

    # Provenance of the member itself (None when the key is absent). Only
    # `key is None` means "the node itself": an empty string is a legitimate
    # key and must not fall through to iterating over missing indices.
    nodeish = node.value.get(key)
    if indices:
        # Descend through nested lists to the addressed element
        for idx in indices:
            nodeish = nodeish.value[idx]

    return ProvenanceInformation(nodeish)


# A sentinel to be used as a default argument for functions that need
# to distinguish between a kwarg set to None and an unset kwarg.
_sentinel = object()


# node_get()
#
# Fetches a value from a dictionary node and checks it for
# an expected value. Use default_value when parsing a value
# which is only optionally supplied.
#
# Args:
#    node (dict): The dictionary node
#    expected_type (type): The expected type for the value being searched
#    key (str): The key to get a value for in node
#    indices (list of ints): Optionally descend into lists of lists
#    default_value: Optionally return this value if the key is not found
#    allow_none: (bool): Allow None to be a valid value
#
# Returns:
#    The value if found in node, otherwise default_value is returned
#
# Raises:
#    LoadError, when the value found is not of the expected type
#
# Note:
#    Returned strings are stripped of leading and trailing whitespace
#
cpdef object node_get(Node node, object expected_type, str key, list indices=None, object default_value=_sentinel, bint allow_none=False):
    if indices is None:
        # _sentinel lets us tell "key absent" apart from any stored value
        value = node.value.get(key, _sentinel)

        if value is _sentinel:
            if default_value is _sentinel:
                provenance = node_get_provenance(node)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Dictionary did not contain expected key '{}'".format(provenance, key))

            # Wrap the default in a synthetic node so that the code below
            # can treat found and defaulted values uniformly
            value = Node(default_value, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())
    else:
        # Implied type check of the element itself
        # No need to synthesise useful node content as we destructure it immediately
        value = Node(node_get(node, list, key), _SYNTHETIC_FILE_INDEX, 0, 0)
        for index in indices:
            value = value.value[index]
            # The recursive node_get() above returns lists with their
            # provenance trimmed, so elements may be bare values; re-wrap
            # them so that `.value` is always available
            if type(value) is not Node:
                value = Node(value, _SYNTHETIC_FILE_INDEX, 0, 0)

    # Optionally allow None as a valid value for any type
    if value.value is None and (allow_none or default_value is None):
        return None

    if (expected_type is not None) and (type(value.value) is not expected_type):
        # Attempt basic conversions if possible, typically we want to
        # be able to specify numeric values and convert them to strings,
        # but we dont want to try converting dicts/lists
        try:
            if expected_type == bool and type(value.value) is str:
                # Dont coerce booleans to string, this makes "False" strings evaluate to True
                # We don't structure into full nodes since there's no need.
                if value.value in ('True', 'true'):
                    value = Node(True, _SYNTHETIC_FILE_INDEX, 0, 0)
                elif value.value in ('False', 'false'):
                    value = Node(False, _SYNTHETIC_FILE_INDEX, 0, 0)
                else:
                    raise ValueError()
            elif not (expected_type == list or
                      expected_type == dict or
                      isinstance(value.value, (list, dict))):
                # Scalar to scalar conversion, by calling the expected type
                value = Node(expected_type(value.value), _SYNTHETIC_FILE_INDEX, 0, 0)
            else:
                # Containers are never converted; reuse the error path below
                raise ValueError()
        except (ValueError, TypeError):
            provenance = node_get_provenance(node, key=key, indices=indices)
            if indices:
                # Render the full path, e.g. key[0][1], for the message
                path = [key, *["[{:d}]".format(i) for i in indices]]
                path = "".join(path)
            else:
                path = key
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, expected_type.__name__))

    # Now collapse lists, and scalars, to their value, leaving nodes as-is
    if type(value.value) is not dict:
        value = value.value

    # Trim it at the bud, let all loaded strings from yaml be stripped of whitespace
    if type(value) is str:
        value = value.strip()

    elif type(value) is list:
        # Now we create a fresh list which unwraps the str and list types
        # semi-recursively.
        value = __trim_list_provenance(value)

    return value


# Build a fresh list from a list of nodes, unwrapping scalar and (recursively)
# list entries to their bare values while leaving mapping nodes wrapped.
#
# Args:
#    value (list): A list of Node instances
#
# Returns:
#    (list): The new, unwrapped list
#
cdef list __trim_list_provenance(list value):
    cdef list ret = []
    cdef Node entry

    for entry in value:
        if type(entry.value) is list:
            ret.append(__trim_list_provenance(entry.value))
        elif type(entry.value) is dict:
            ret.append(entry)
        else:
            ret.append(entry.value)
    return ret


# node_set()
#
# Set an item within the node.  If using `indices` be aware that the entry must
# already exist, or else a KeyError will be raised.
# Use `node_extend_list` to create entries before using `node_set`
#
# Args:
#    node (Node): The node
#    key (str): The key name
#    value: The value
#    indices: Any indices to index into the list referenced by key, like in
#             `node_get` (must be a list of integers)
#
cpdef void node_set(Node node, object key, object value, list indices=None) except *:
    cdef int idx

    # Plain lists are converted into synthetic list nodes before storing
    if type(value) is list:
        value = __new_node_from_list(value)

    if indices:
        # Walk down to the list which holds the entry; the last index is
        # the position to assign to. The caller's list is only read, never
        # modified, so that it can be reused after the call.
        node = node.value[key]
        key = indices[-1]
        for idx in indices[:-1]:
            node = node.value[idx]
    if type(value) is Node:
        node.value[key] = value
    else:
        try:
            # Need to do this just in case we're modifying a list
            old_value = node.value[key]
        except KeyError:
            old_value = None
        if old_value is None:
            # New entry: borrow the container's file and line, and mark
            # the column as synthetic
            node.value[key] = Node(value, node.file_index, node.line, next_synthetic_counter())
        else:
            # Replacement: keep the provenance of the value being replaced
            node.value[key] = Node(value, old_value.file_index, old_value.line, old_value.column)


# node_extend_list()
#
# Extend a list inside a node to a given length, using the passed
# default value to fill it out.
#
# Valid default values are:
#    Any string
#    An empty dict
#    An empty list
#
# Args:
#    node (node): The node
#    key (str): The list name in the node
#    length (int): The length to extend the list to
#    default (any): The default value to extend with.
def node_extend_list(Node node, str key, Py_ssize_t length, object default):
    cdef Node list_node
    cdef list the_list

    assert type(default) is str or default in ([], {})

    # Create the list on demand, with a synthetic column number
    list_node = node.value.get(key)
    if list_node is None:
        list_node = node.value[key] = Node([], node.file_index, node.line, next_synthetic_counter())

    the_list = list_node.value
    def_type = type(default)

    file_index = node.file_index
    if the_list:
        # Continue numbering from the last existing entry. Entries are Node
        # instances, which expose their line as an attribute and cannot be
        # indexed like a tuple.
        line_num = the_list[-1].line
    else:
        line_num = list_node.line

    while length > len(the_list):
        # Every new entry gets its own fresh container, never a shared one
        if def_type is str:
            value = default
        elif def_type is list:
            value = []
        else:
            value = {}

        line_num += 1

        the_list.append(Node(value, file_index, line_num, next_synthetic_counter()))


# node_items()
#
# A convenience generator for iterating over loaded key/value
# tuples in a dictionary loaded from project YAML.
#
# Args:
#    node (Node): The dictionary node
#
# Yields:
#    (str): The key name
#    (anything): The value for the key
#
def node_items(Node node):
    cdef str key
    cdef Node value

    for key, value in node.value.items():
        if type(value.value) is dict:
            # Mappings are yielded as nodes
            yield (key, value)
        elif type(value.value) is list:
            # Lists are yielded with their provenance trimmed
            yield (key, __trim_list_provenance(value.value))
        else:
            # Scalars are yielded as their bare value
            yield (key, value.value)


# node_keys()
#
# Get the keys of a dictionary loaded from project YAML.
#
# Args:
#    node (Node): The dictionary node
#
# Returns:
#    (list): A new list holding the key names
#
cpdef list node_keys(Node node):
    return list(node.value.keys())


# node_del()
#
# Remove a key from a dictionary node.
#
# Args:
#    node (dict): The dictionary node
#    key (str): The key we want to remove
#    safe (bool): If True, a missing key is silently ignored,
#                 otherwise the KeyError is raised
#
cpdef void node_del(Node node, str key, bint safe=False) except *:
    try:
        del node.value[key]
    except KeyError:
        if not safe:
            raise


# is_node()
#
# A test method which returns whether or not the passed in value
# is a valid YAML node.  It is not valid to call this on a Node
# object which is not a Mapping.
#
# Args:
#    maybenode (any): The object to test for nodeness
#
# Returns:
#    (bool): Whether or not maybenode was a Node
#
def is_node(maybenode):
    # It's a programming error to give this a Node which isn't a mapping
    # so assert that.
    assert (type(maybenode) is not Node) or (type(maybenode.value) is dict)
    # Now return the type check
    return type(maybenode) is Node


# new_synthetic_file()
#
# Create a new synthetic mapping node, with an associated file entry
# (in _FILE_LIST) such that later tracking can correctly determine which
# file needs writing to in order to persist the changes.
#
# Args:
#    filename (str): The name of the synthetic file to create
#    project (Project): The optional project to associate this synthetic file with
#
# Returns:
#    (Node): An empty YAML mapping node, whose provenance is to this new
#            synthetic file
#
def new_synthetic_file(str filename, object project=None):
    cdef Py_ssize_t file_index = len(_FILE_LIST)
    cdef Node node = Node({}, file_index, 0, 0)

    # The display name must actually mention the file: formatting an empty
    # string would discard `filename` and leave error messages unlabelled.
    _FILE_LIST.append(FileInfo(filename,
                               filename,
                               "<synthetic {}>".format(filename),
                               node,
                               project))
    return node


# new_empty_node()
#
# Args:
#    ref_node (Node): Optional node whose provenance should be referenced
#
# Returns
#    (Node): A new empty YAML mapping node
#
def new_empty_node(Node ref_node=None):
    if ref_node is not None:
        # Share the file and line of the reference, with a synthetic column
        return Node({}, ref_node.file_index, ref_node.line, next_synthetic_counter())
    else:
        return Node({}, _SYNTHETIC_FILE_INDEX, 0, 0)


# new_node_from_dict()
#
# Nested dictionaries and lists are converted recursively; any other
# value is stored as its string representation.
#
# Args:
#   indict (dict): The input dictionary
#
# Returns:
#   (Node): A new synthetic YAML tree which represents this dictionary
#
cpdef Node new_node_from_dict(dict indict):
    cdef dict ret = {}
    cdef str k

    for k, v in indict.items():
        vtype = type(v)
        if vtype is dict:
            ret[k] = new_node_from_dict(v)
        elif vtype is list:
            ret[k] = __new_node_from_list(v)
        else:
            ret[k] = Node(str(v), _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())
    return Node(ret, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())


# Internal function to help
# new_node_from_dict() to handle lists
#
# Builds a synthetic list node: nested lists and dictionaries are
# converted recursively, anything else is stored as its string form.
cdef Node __new_node_from_list(list inlist):
    cdef list nodes = []

    for item in inlist:
        kind = type(item)
        if kind is list:
            child = __new_node_from_list(item)
        elif kind is dict:
            child = new_node_from_dict(item)
        else:
            child = Node(str(item), _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())
        nodes.append(child)

    # The enclosing node draws its counter after all of its children
    return Node(nodes, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())


# _is_composite_list
#
# Checks if the given node is a Mapping with array composition
# directives.
#
# Args:
#    node (value): Any node
#
# Returns:
#    (bool): True if node was a Mapping containing only
#            list composition directives
#
# Raises:
#    (LoadError): If node was a mapping and contained a mix of
#                 list composition directives and other keys
#
cdef bint _is_composite_list(Node node):
    cdef bint seen_directive = False
    cdef bint seen_other = False
    cdef str key

    # Only mappings can carry composition directives
    if type(node.value) is not dict:
        return False

    for key in node_keys(node):
        if key in ('(>)', '(<)', '(=)'):
            seen_directive = True
        else:
            seen_other = True

        # Directives and ordinary keys must not be mixed
        if seen_directive and seen_other:
            provenance = node_get_provenance(node)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Dictionary contains array composition directives and arbitrary keys"
                            .format(provenance))

    return seen_directive


# _compose_composite_list()
#
# Composes a composite list (i.e. a dict with list composition directives)
# on top of a target list which is a composite list itself.
#
# Args:
#    target (Node): A composite list
#    source (Node): A composite list
#
cdef void _compose_composite_list(Node target, Node source):
    incoming = source.value
    existing = target.value

    clobber = incoming.get("(=)")
    prefix = incoming.get("(<)")
    suffix = incoming.get("(>)")

    if clobber is None:
        # Nothing is being replaced: grow the target's directives instead,
        # putting the new prefix in front and the new suffix at the end.
        if prefix is not None:
            if "(<)" not in existing:
                existing["(<)"] = prefix
            else:
                for v in reversed(prefix.value):
                    existing["(<)"].value.insert(0, v)
        if suffix is not None:
            if "(>)" not in existing:
                existing["(>)"] = suffix
            else:
                existing["(>)"].value.extend(suffix.value)
        return

    # The source replaces the list outright, so the directives of the
    # target are superseded by those of the source. A directive the
    # source does not carry is emptied in the target.
    existing["(=)"] = clobber

    if prefix is not None:
        existing["(<)"] = prefix
    elif "(<)" in existing:
        existing["(<)"].value.clear()

    if suffix is not None:
        existing["(>)"] = suffix
    elif "(>)" in existing:
        existing["(>)"].value.clear()


# _compose_list()
#
# Compose a composite list (a dict with composition directives) on top of a
# simple list.
#
# Args:
#    target (Node): The target list to be composed into
#    source (Node): The composition list to be composed from
#
cdef void _compose_list(Node target, Node source):
    clobber = source.value.get("(=)")
    prefix = source.value.get("(<)")
    suffix = source.value.get("(>)")
    # Replacement happens first, the prefix and suffix are then applied
    # to the replaced content. The target list is modified in place.
    if clobber is not None:
        target.value.clear()
        target.value.extend(clobber.value)
    if prefix is not None:
        for v in reversed(prefix.value):
            target.value.insert(0, v)
    if suffix is not None:
        target.value.extend(suffix.value)


# composite_dict()
#
# Compose one mapping node onto another
#
# Args:
#    target (Node): The target to compose into
#    source (Node): The source to compose from
#    path (list): The path to the current composition node
#
# Raises: CompositeError
#
cpdef void composite_dict(Node target, Node source, list path=None) except *:
    cdef str k
    cdef Node v, target_value

    if path is None:
        path = []
    for k, v in source.value.items():
        path.append(k)
        # What currently lives at this key in the target, if anything
        target_value = target.value.get(k)
        if type(v.value) is list:
            # List clobbers anything list-like
            if not (target_value is None or
                    type(target_value.value) is list or
                    _is_composite_list(target_value)):
                raise CompositeError(path,
                                     "{}: List cannot overwrite {} at: {}"
                                     .format(node_get_provenance(source, k),
                                             k,
                                             node_get_provenance(target, k)))
            # Looks good, clobber it
            target.value[k] = v
        elif _is_composite_list(v):
            if target_value is None:
                # Composite list clobbers empty space
                target.value[k] = v
            elif type(target_value.value) is list:
                # Composite list composes into a list
                _compose_list(target_value, v)
            elif _is_composite_list(target_value):
                # Composite list merges into composite list
                _compose_composite_list(target_value, v)
            else:
                # Else composing on top of normal dict or a scalar, so raise...
                # The target member may be a scalar, so its provenance is
                # looked up through the (mapping) parent.
                raise CompositeError(path,
                                     "{}: Cannot compose lists onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target, k)))
        elif type(v.value) is dict:
            # We're composing a dict into target now
            if target_value is None:
                # Target lacks a dict at that point, make a fresh one with
                # the same provenance as the incoming dict
                target_value = Node({}, v.file_index, v.line, v.column)
                target.value[k] = target_value
            elif type(target_value.value) is not dict:
                # It is the member being composed onto which must be a
                # mapping; recursing into a scalar or a list would fail.
                raise CompositeError(path,
                                     "{}: Cannot compose dictionary onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target, k)))
            composite_dict(target_value, v, path)
        else:
            # Scalars may only replace scalars (or fill empty space). Neither
            # side is a mapping here, so both provenances are looked up
            # through their parents.
            if target_value is not None and type(target_value.value) is not str:
                raise CompositeError(path,
                                     "{}: Cannot compose scalar on non-scalar at {}".format(
                                         node_get_provenance(source, k),
                                         node_get_provenance(target, k)))
            target.value[k] = v
        path.pop()


# Like composite_dict(), but raises an all purpose LoadError for convenience
#
# Args:
#    target (Node): The mapping to compose into
#    source (Node): The mapping to compose from
#
# Raises: LoadError
#
cpdef void composite(Node target, Node source) except *:
    assert type(source.value) is dict
    assert type(target.value) is dict

    try:
        composite_dict(target, source)
    except CompositeError as e:
        source_provenance = node_get_provenance(source)
        error_prefix = ""
        if source_provenance:
            error_prefix = "{}: ".format(source_provenance)
        raise LoadError(LoadErrorReason.ILLEGAL_COMPOSITE,
                        "{}Failure composing {}: {}"
                        .format(error_prefix,
                                e.path,
                                e.message)) from e


# Like composite(target, source), but where target overrides source instead.
#
# The result of composing target onto source is moved into target.
#
def composite_and_move(Node target, Node source):
    cdef str key
    cdef Node value
    cdef list to_delete

    composite(source, target)

    to_delete = [key for key in target.value.keys() if key not in source.value]
    for key, value in source.value.items():
        target.value[key] = value
    for key in to_delete:
        del target.value[key]


# Types we can short-circuit in node_sanitize for speed.
__SANITIZE_SHORT_CIRCUIT_TYPES = (int, float, str, bool)


# node_sanitize()
#
# Produces a recursive copy of the source node from which the internal
# provenance information has been removed, with the keys of every
# dictionary in alphabetical order.
#
# Only dicts are ordered, list elements are left in order.
#
# Args:
#    node (any): The node, or plain value, to sanitize
#    dict_type (type): The mapping type to instantiate for dictionaries
#
# Returns:
#    (any): The sanitized copy
#
cpdef object node_sanitize(object node, object dict_type=OrderedDict):
    kind = type(node)

    # Look through the Node wrapper at what it carries
    if kind is Node:
        node = node.value
        kind = type(node)

    # None and the plain scalar types need no work at all
    if node is None or kind in __SANITIZE_SHORT_CIRCUIT_TYPES:
        return node

    # Lists keep their order, each element is sanitized in turn
    if kind is list:
        return [node_sanitize(item, dict_type=dict_type) for item in node]

    # Dictionaries are rebuilt key by key, in sorted order
    if kind is dict:
        ordered = dict_type()
        for name in sorted(node):
            ordered[name] = node_sanitize(node[name], dict_type=dict_type)
        return ordered

    # We cannot know what a tuple contains, so its elements are
    # sanitized too
    if kind is tuple:
        return tuple([node_sanitize(item, dict_type=dict_type) for item in node])

    # Anything else is handed back untouched
    return node


# node_validate()
#
# Validate the node so as to ensure the user has not specified
# any keys which are unrecognized by buildstream (usually this
# means a typo which would otherwise not trigger an error).
#
# Args:
#    node (Node): A dictionary loaded from YAML
#    valid_keys (list): A list of valid keys for the specified node
#
# Raises:
#    LoadError: In the case that the specified node contained
#               one or more invalid keys
#
cpdef void node_validate(Node node, list valid_keys) except *:
    cdef str key
    # Testing membership against a set is probably the fastest way
    # to do this: https://stackoverflow.com/a/23062482
    cdef set allowed = set(valid_keys)

    for key in node.value:
        if key in allowed:
            continue
        # The first unrecognized key encountered is the one reported
        raise LoadError(LoadErrorReason.INVALID_DATA,
                        "{}: Unexpected key: {}".format(node_get_provenance(node, key=key), key))


# Node copying
#
# Unfortunately we copy nodes a *lot* and `isinstance()` is super-slow when
# things from collections.abc get involved.  The result is the following
# intricate but substantially faster group of tuples and the use of `in`.
#
# If any of the {node,list}_copy routines raise a ValueError
# then it's likely additional types need adding to these tuples.


# These types just have their value copied
__QUICK_TYPES = (str, bool)

# These are the directives used to compose lists, we need this because it's
# slightly faster during the node_final_assertions checks
__NODE_ASSERT_COMPOSITION_DIRECTIVES = ('(>)', '(<)', '(=)')


# node_copy()
#
# Make a deep copy of the given YAML node, preserving provenance.
#
# Args:
#    source (Node): The YAML node to copy
#
# Returns:
#    (Node): A deep copy of source with provenance preserved.
#
# Raises:
#    ValueError: If a member holds a value which is neither a dict,
#                a list, nor one of the __QUICK_TYPES
#
cpdef Node node_copy(Node source):
    cdef dict duplicate = {}
    cdef str key
    cdef Node value

    for key, value in source.value.items():
        value_type = type(value.value)
        if value_type in __QUICK_TYPES:
            # Scalar nodes are placed into the copy as they are
            duplicate[key] = value
        elif value_type is dict:
            duplicate[key] = node_copy(value)
        elif value_type is list:
            duplicate[key] = _list_copy(value)
        else:
            raise ValueError("Unable to be quick about node_copy of {}".format(value_type))

    return Node(duplicate, source.file_index, source.line, source.column)


# Internal function to help node_copy() but for lists.
cdef Node _list_copy(Node source):
    cdef list duplicate = []
    cdef Node item

    for item in source.value:
        item_type = type(item.value)
        if item_type in __QUICK_TYPES:
            # Scalar nodes are placed into the copy as they are
            duplicate.append(item)
        elif item_type is dict:
            duplicate.append(node_copy(item))
        elif item_type is list:
            duplicate.append(_list_copy(item))
        else:
            raise ValueError("Unable to be quick about list_copy of {}".format(item_type))

    return Node(duplicate, source.file_index, source.line, source.column)


# node_final_assertions()
#
# This must be called on a fully loaded and composited node,
# after all composition has completed.
#
# Args:
#    node (Node): The final composited node
#
# Raises:
#    (LoadError): If any assertions fail
#
cpdef void node_final_assertions(Node node) except *:
    cdef str key
    cdef Node value

    for key, value in node.value.items():

        # A list composition directive which is still around at this
        # point means that the user intended to override a list which
        # never existed in the underlying data
        #
        if key in __NODE_ASSERT_COMPOSITION_DIRECTIVES:
            raise LoadError(LoadErrorReason.TRAILING_LIST_DIRECTIVE,
                            "{}: Attempt to override non-existing list".format(
                                node_get_provenance(node, key)))

        # Only containers can hold further directives, descend into those
        member_type = type(value.value)
        if member_type is list:
            _list_final_assertions(value)
        elif member_type is dict:
            node_final_assertions(value)


# Helper function for node_final_assertions(), but for lists.
def _list_final_assertions(Node values):
    for member in values.value:
        member_type = type(member.value)
        if member_type is list:
            _list_final_assertions(member)
        elif member_type is dict:
            node_final_assertions(member)


# assert_symbol_name()
#
# A helper function to check if a loaded string is a valid symbol
# name and to raise a consistent LoadError if not. For strings which
# are required to be symbols.
#
# Args:
#    provenance (Provenance): The provenance of the loaded symbol, or None
#    symbol_name (str): The loaded symbol name
#    purpose (str): The purpose of the string, for an error message
#    allow_dashes (bool): Whether dashes are allowed for this symbol
#
# Raises:
#    LoadError: If the symbol_name is invalid
#
# Note that dashes are generally preferred for variable names and
# usage in YAML, but things such as option names which will be
# evaluated with jinja2 cannot use dashes.
def assert_symbol_name(ProvenanceInformation provenance, str symbol_name, str purpose, *, bint allow_dashes=True):
    cdef str permitted = string.digits + string.ascii_letters + '_'
    if allow_dashes:
        permitted += '-'

    # A valid symbol is non-empty, does not start with a digit and
    # consists of permitted characters only; nothing to report then.
    if (symbol_name
            and symbol_name[0] not in string.digits
            and all(char in permitted for char in symbol_name)):
        return

    detail = ("Symbol names must contain only alphanumeric characters, "
              "may not start with a digit, and may contain underscores")
    if allow_dashes:
        detail += " or dashes"

    message = "Invalid symbol name for {}: '{}'".format(purpose, symbol_name)
    if provenance is not None:
        # Lead with the location of the offending symbol whenever it is known
        message = "{}: {}".format(provenance, message)

    raise LoadError(LoadErrorReason.INVALID_SYMBOL_NAME,
                    message, detail=detail)


# node_find_target()
#
# Searches the given node tree for the given target node.
#
# This is typically used when trying to walk a path to a given node
# for the purpose of then modifying a similar tree of objects elsewhere
#
# If the key is provided, then we actually hunt for the node represented by
# target[key] and return its container, rather than hunting for target directly
#
# Args:
#    node (Node): The node at the root of the tree to search
#    target (Node): The node you are looking for in that tree
#    key (str): Optional string key within target node
#
# Returns:
#    (list): A path from `node` to `target` or None if `target` is not in the subtree
cpdef list node_find_target(Node node, Node target, str key=None):
    if key is not None:
        target = target.value[key]

    cdef list path = []
    if _walk_find_target(node, path, target):
        # Use the very same test as the lookup above, so that an empty
        # string key also has its trailing path element removed and the
        # path to the container is returned as documented.
        if key is not None:
            # Remove key from end of path
            path = path[:-1]
        return path
    return None


# Helper for node_find_target() which walks a value
#
# A node is considered to be the target when its file index, line and
# column all match those of the target; otherwise dictionaries and lists
# are searched recursively.
#
# Args:
#    node (Node): The node currently being examined
#    path (list): The path walked so far, updated in place
#    target (Node): The node being searched for
#
# Returns:
#    (bint): Whether the target was found at or below `node`
#
cdef bint _walk_find_target(Node node, list path, Node target):
    if node.file_index == target.file_index and node.line == target.line and node.column == target.column:
        return True
    elif type(node.value) is dict:
        return _walk_dict_node(node, path, target)
    elif type(node.value) is list:
        return _walk_list_node(node, path, target)
    return False


# Helper for node_find_target() which walks a list
#
# Every index is pushed onto `path` while its element is searched, and
# popped again unless the target was found beneath it.
#
cdef bint _walk_list_node(Node node, list path, Node target):
    cdef int i
    cdef Node v
    for i, v in enumerate(node.value):
        path.append(i)
        if _walk_find_target(v, path, target):
            return True
        del path[-1]
    return False


# Helper for node_find_target() which walks a mapping
#
# Every key is pushed onto `path` while its value is searched, and
# popped again unless the target was found beneath it.
#
cdef bint _walk_dict_node(Node node, list path, Node target):
    cdef str k
    cdef Node v
    for k, v in node.value.items():
        path.append(k)
        if _walk_find_target(v, path, target):
            return True
        del path[-1]
    return False


###############################################################################

# Roundtrip code

# Always represent things consistently:

yaml.RoundTripRepresenter.add_representer(OrderedDict,
                                          yaml.SafeRepresenter.represent_dict)

# Always parse things
# consistently
#
# The string constructor is registered for each of these scalar tags,
# in this order.
for _scalar_tag in (u'tag:yaml.org,2002:int',
                    u'tag:yaml.org,2002:float',
                    u'tag:yaml.org,2002:bool',
                    u'tag:yaml.org,2002:null',
                    u'tag:yaml.org,2002:timestamp'):
    yaml.RoundTripConstructor.add_constructor(_scalar_tag,
                                              yaml.RoundTripConstructor.construct_yaml_str)
# Do not leave the loop variable behind in the module namespace
del _scalar_tag


# HardlineDumper
#
# This is a dumper used during roundtrip_dump which forces every scalar to be
# a plain string, in order to match the output format to the input format.
#
# If you discover something is broken, please add a test case to the roundtrip
# test in tests/internals/yaml/roundtrip-test.yaml
#
class HardlineDumper(yaml.RoundTripDumper):
    def __init__(self, *args, **kwargs):
        yaml.RoundTripDumper.__init__(self, *args, **kwargs)

        # For each of YAML 1.1 and 1.2 (and for no particular version),
        # force everything to be a plain string
        for yaml_version in ((1, 1), (1, 2), None):
            self.add_version_implicit_resolver(yaml_version,
                                               u'tag:yaml.org,2002:str',
                                               yaml.util.RegExp(r'.*'),
                                               None)


# roundtrip_load()
#
# Load a YAML file into memory in a form which allows roundtripping as best
# as ruamel permits.
#
# Note, the returned objects can be treated as Mappings and Lists and Strings
# but replacing content wholesale with plain dicts and lists may result
# in a loss of comments and formatting.
#
# Args:
#    filename (str): The file to load in
#    allow_missing (bool): Optionally set this to True to allow missing files
#
# Returns:
#    (Mapping): The loaded YAML mapping.
#
# Raises:
#    (LoadError): If the file is missing, or a directory, this is raised.
#                 Also if the YAML is malformed.
#
def roundtrip_load(filename, *, allow_missing=False):
    try:
        with open(filename, "r") as fh:
            return roundtrip_load_data(fh.read(), filename=filename)
    except FileNotFoundError as e:
        # Missing files are always empty dictionaries
        if allow_missing:
            return {}
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory.".format(filename)) from e


# roundtrip_load_data()
#
# Parse the given contents as YAML, returning them as a roundtrippable data
# structure.
#
# A lack of content will be returned as an empty mapping.
#
# Args:
#    contents (str): The contents to be parsed as YAML
#    filename (str): Optional filename to be used in error reports
#
# Returns:
#    (Mapping): The loaded YAML mapping
#
# Raises:
#    (LoadError): Raised on invalid YAML, or YAML which parses to something other
#                 than a Mapping
#
def roundtrip_load_data(contents, *, filename=None):
    try:
        parsed = yaml.load(contents, yaml.RoundTripLoader, preserve_quotes=True)
    except (yaml.scanner.ScannerError, yaml.composer.ComposerError, yaml.parser.ParserError) as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n{}\n".format(e.problem, e.problem_mark)) from e

    # Empty files are special cased into empty mappings,
    # like the main Node loader does
    if parsed is None:
        return {}

    if isinstance(parsed, Mapping):
        return parsed

    raise LoadError(LoadErrorReason.INVALID_YAML,
                    "YAML file has content of type '{}' instead of expected type 'dict': {}"
                    .format(type(parsed).__name__, filename))


# roundtrip_dump()
#
# Dumps the given contents as a YAML file. Ideally the contents came from
# parsing with `roundtrip_load` or `roundtrip_load_data` so that they will be
# dumped in the same form as they came from.
#
# If `file` is a string, it is the filename to write to, if `file` has a
# `write` method, it's treated as a stream, otherwise output is to stdout.
#
# Every value which is not already a plain string, a mapping or a sequence
# is converted with str() before dumping; this is done on a deep copy, so
# the contents handed in are left untouched.
#
# Args:
#    contents (Mapping or list): The content to write out as YAML.
#    file (any): The file to write to
#
def roundtrip_dump(contents, file=None):
    assert type(contents) is not Node

    # Recursively stringify the values of a mapping, in place
    def stringify_dict(thing):
        for k, v in thing.items():
            if type(v) is str:
                pass
            elif isinstance(v, Mapping):
                stringify_dict(v)
            elif isinstance(v, Sequence):
                stringify_list(v)
            else:
                thing[k] = str(v)

    # Recursively stringify the elements of a sequence, in place
    def stringify_list(thing):
        for i, v in enumerate(thing):
            if type(v) is str:
                pass
            elif isinstance(v, Mapping):
                stringify_dict(v)
            elif isinstance(v, Sequence):
                stringify_list(v)
            else:
                thing[i] = str(v)

    contents = deepcopy(contents)

    # Toplevel lists are documented as acceptable contents, but they have
    # no items() to offer, so they need stringify_list() instead. Anything
    # else takes the same path as it always did.
    if isinstance(contents, Sequence) and not isinstance(contents, (Mapping, str)):
        stringify_list(contents)
    else:
        stringify_dict(contents)

    with ExitStack() as stack:
        if type(file) is str:
            # Filenames are written through utils.save_file_atomic(), which
            # the exit stack closes once dumping is done
            from . import utils
            f = stack.enter_context(utils.save_file_atomic(file, 'w'))
        elif hasattr(file, 'write'):
            f = file
        else:
            f = sys.stdout
        yaml.round_trip_dump(contents, f, Dumper=HardlineDumper)