# coding: utf-8 import datetime import base64 import binascii import re import sys import types import warnings from collections.abc import Hashable, MutableSequence, MutableMapping # type: ignore # fmt: off from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning, MantissaNoDotYAML1_1Warning) from ruamel.yaml.nodes import * # NOQA from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode) from ruamel.yaml.compat import (_F, builtins_module, # NOQA nprint, nprintf, version_tnf) from ruamel.yaml.compat import ordereddict # type: ignore from ruamel.yaml.comments import * # NOQA from ruamel.yaml.comments import (CommentedMap, CommentedOrderedMap, CommentedSet, CommentedKeySeq, CommentedSeq, TaggedScalar, CommentedKeyMap) from ruamel.yaml.scalarstring import (SingleQuotedScalarString, DoubleQuotedScalarString, LiteralScalarString, FoldedScalarString, PlainScalarString, ScalarString,) from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt from ruamel.yaml.scalarfloat import ScalarFloat from ruamel.yaml.scalarbool import ScalarBoolean from ruamel.yaml.timestamp import TimeStamp from ruamel.yaml.util import RegExp if False: # MYPY from typing import Any, Dict, List, Set, Generator, Union, Optional # NOQA __all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor', 'ConstructorError', 'RoundTripConstructor'] # fmt: on class ConstructorError(MarkedYAMLError): pass class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning): pass class DuplicateKeyError(MarkedYAMLFutureWarning): pass class BaseConstructor(object): yaml_constructors = {} # type: Dict[Any, Any] yaml_multi_constructors = {} # type: Dict[Any, Any] def __init__(self, preserve_quotes=None, loader=None): # type: (Optional[bool], Any) -> None self.loader = loader if self.loader is not None and getattr(self.loader, '_constructor', None) is None: self.loader._constructor = self self.loader = loader self.yaml_base_dict_type = dict self.yaml_base_list_type = list self.constructed_objects = {} # type: Dict[Any, Any] self.recursive_objects = {} # type: Dict[Any, Any] self.state_generators = [] # type: List[Any] self.deep_construct = False self._preserve_quotes = preserve_quotes self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16)) @property def composer(self): # type: () -> Any if hasattr(self.loader, 'typ'): return self.loader.composer try: return self.loader._composer except AttributeError: sys.stdout.write('slt {}\n'.format(type(self))) sys.stdout.write('slc {}\n'.format(self.loader._composer)) sys.stdout.write('{}\n'.format(dir(self))) raise @property def resolver(self): # type: () -> Any if hasattr(self.loader, 'typ'): return self.loader.resolver return self.loader._resolver def check_data(self): # type: () -> Any # If there are more documents available? return self.composer.check_node() def get_data(self): # type: () -> Any # Construct and return the next document. if self.composer.check_node(): return self.construct_document(self.composer.get_node()) def get_single_data(self): # type: () -> Any # Ensure that the stream contains a single document and construct it. node = self.composer.get_single_node() if node is not None: return self.construct_document(node) return None def construct_document(self, node): # type: (Any) -> Any data = self.construct_object(node) while bool(self.state_generators): state_generators = self.state_generators self.state_generators = [] for generator in state_generators: for _dummy in generator: pass self.constructed_objects = {} self.recursive_objects = {} self.deep_construct = False return data def construct_object(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if node in self.constructed_objects: return self.constructed_objects[node] if deep: old_deep = self.deep_construct self.deep_construct = True if node in self.recursive_objects: return self.recursive_objects[node] # raise ConstructorError( # None, None, 'found unconstructable recursive node', node.start_mark # ) self.recursive_objects[node] = None data = self.construct_non_recursive_object(node) self.constructed_objects[node] = data del self.recursive_objects[node] if deep: self.deep_construct = old_deep return data def construct_non_recursive_object(self, node, tag=None): # type: (Any, Optional[str]) -> Any constructor = None # type: Any tag_suffix = None if tag is None: tag = node.tag if tag in self.yaml_constructors: constructor = self.yaml_constructors[tag] else: for tag_prefix in self.yaml_multi_constructors: if tag.startswith(tag_prefix): tag_suffix = tag[len(tag_prefix) :] constructor = self.yaml_multi_constructors[tag_prefix] break else: if None in self.yaml_multi_constructors: tag_suffix = tag constructor = self.yaml_multi_constructors[None] elif None in self.yaml_constructors: constructor = self.yaml_constructors[None] elif isinstance(node, ScalarNode): constructor = self.__class__.construct_scalar elif isinstance(node, SequenceNode): constructor = self.__class__.construct_sequence elif isinstance(node, MappingNode): constructor = self.__class__.construct_mapping if tag_suffix is None: data = constructor(self, node) else: data = constructor(self, tag_suffix, node) if isinstance(data, types.GeneratorType): generator = data data = next(generator) if self.deep_construct: for _dummy in generator: pass else: self.state_generators.append(generator) return data def construct_scalar(self, node): # type: (Any) -> Any if not isinstance(node, ScalarNode): raise ConstructorError( None, None, _F('expected a scalar node, but found {node_id!s}', node_id=node.id), node.start_mark, ) return node.value def construct_sequence(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if not isinstance(node, SequenceNode): raise ConstructorError( None, None, _F('expected a sequence node, but found {node_id!s}', node_id=node.id), node.start_mark, ) return [self.construct_object(child, deep=deep) for child in node.value] def construct_mapping(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if not isinstance(node, MappingNode): raise ConstructorError( None, None, _F('expected a mapping node, but found {node_id!s}', node_id=node.id), node.start_mark, ) total_mapping = self.yaml_base_dict_type() if getattr(node, 'merge', None) is not None: todo = [(node.merge, False), (node.value, False)] else: todo = [(node.value, True)] for values, check in todo: mapping = self.yaml_base_dict_type() # type: Dict[Any, Any] for key_node, value_node in values: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, list): key = tuple(key) if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) value = self.construct_object(value_node, deep=deep) if check: if self.check_mapping_key(node, key_node, mapping, key, value): mapping[key] = value else: mapping[key] = value total_mapping.update(mapping) return total_mapping def check_mapping_key(self, node, key_node, mapping, key, value): # type: (Any, Any, Any, Any, Any) -> bool """return True if key is unique""" if key in mapping: if not self.allow_duplicate_keys: mk = mapping.get(key) args = [ 'while constructing a mapping', node.start_mark, 'found duplicate key "{}" with value "{}" ' '(original value: "{}")'.format(key, value, mk), key_node.start_mark, """ To suppress this check see: http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys """, """\ Duplicate keys will become an error in future releases, and are errors by default when using the new API. """, ] if self.allow_duplicate_keys is None: warnings.warn(DuplicateKeyFutureWarning(*args)) else: raise DuplicateKeyError(*args) return False return True def check_set_key(self, node, key_node, setting, key): # type: (Any, Any, Any, Any, Any) -> None if key in setting: if not self.allow_duplicate_keys: args = [ 'while constructing a set', node.start_mark, 'found duplicate key "{}"'.format(key), key_node.start_mark, """ To suppress this check see: http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys """, """\ Duplicate keys will become an error in future releases, and are errors by default when using the new API. """, ] if self.allow_duplicate_keys is None: warnings.warn(DuplicateKeyFutureWarning(*args)) else: raise DuplicateKeyError(*args) def construct_pairs(self, node, deep=False): # type: (Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, _F('expected a mapping node, but found {node_id!s}', node_id=node.id), node.start_mark, ) pairs = [] for key_node, value_node in node.value: key = self.construct_object(key_node, deep=deep) value = self.construct_object(value_node, deep=deep) pairs.append((key, value)) return pairs @classmethod def add_constructor(cls, tag, constructor): # type: (Any, Any) -> None if 'yaml_constructors' not in cls.__dict__: cls.yaml_constructors = cls.yaml_constructors.copy() cls.yaml_constructors[tag] = constructor @classmethod def add_multi_constructor(cls, tag_prefix, multi_constructor): # type: (Any, Any) -> None if 'yaml_multi_constructors' not in cls.__dict__: cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy() cls.yaml_multi_constructors[tag_prefix] = multi_constructor class SafeConstructor(BaseConstructor): def construct_scalar(self, node): # type: (Any) -> Any if isinstance(node, MappingNode): for key_node, value_node in node.value: if key_node.tag == 'tag:yaml.org,2002:value': return self.construct_scalar(value_node) return BaseConstructor.construct_scalar(self, node) def flatten_mapping(self, node): # type: (Any) -> Any """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ merge = [] # type: List[Any] index = 0 while index < len(node.value): key_node, value_node = node.value[index] if key_node.tag == 'tag:yaml.org,2002:merge': if merge: # double << key if self.allow_duplicate_keys: del node.value[index] index += 1 continue args = [ 'while constructing a mapping', node.start_mark, 'found duplicate key "{}"'.format(key_node.value), key_node.start_mark, """ To suppress this check see: http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys """, """\ Duplicate keys will become an error in future releases, and are errors by default when using the new API. """, ] if self.allow_duplicate_keys is None: warnings.warn(DuplicateKeyFutureWarning(*args)) else: raise DuplicateKeyError(*args) del node.value[index] if isinstance(value_node, MappingNode): self.flatten_mapping(value_node) merge.extend(value_node.value) elif isinstance(value_node, SequenceNode): submerge = [] for subnode in value_node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing a mapping', node.start_mark, _F( 'expected a mapping for merging, but found {subnode_id!s}', subnode_id=subnode.id, ), subnode.start_mark, ) self.flatten_mapping(subnode) submerge.append(subnode.value) submerge.reverse() for value in submerge: merge.extend(value) else: raise ConstructorError( 'while constructing a mapping', node.start_mark, _F( 'expected a mapping or list of mappings for merging, ' 'but found {value_node_id!s}', value_node_id=value_node.id, ), value_node.start_mark, ) elif key_node.tag == 'tag:yaml.org,2002:value': key_node.tag = 'tag:yaml.org,2002:str' index += 1 else: index += 1 if bool(merge): node.merge = merge # separate merge keys to be able to update without duplicate node.value = merge + node.value def construct_mapping(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if isinstance(node, MappingNode): self.flatten_mapping(node) return BaseConstructor.construct_mapping(self, node, deep=deep) def construct_yaml_null(self, node): # type: (Any) -> Any self.construct_scalar(node) return None # YAML 1.2 spec doesn't mention yes/no etc any more, 1.1 does bool_values = { 'yes': True, 'no': False, 'y': True, 'n': False, 'true': True, 'false': False, 'on': True, 'off': False, } def construct_yaml_bool(self, node): # type: (Any) -> bool value = self.construct_scalar(node) return self.bool_values[value.lower()] def construct_yaml_int(self, node): # type: (Any) -> int value_s = self.construct_scalar(node) value_s = value_s.replace('_', "") sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '0': return 0 elif value_s.startswith('0b'): return sign * int(value_s[2:], 2) elif value_s.startswith('0x'): return sign * int(value_s[2:], 16) elif value_s.startswith('0o'): return sign * int(value_s[2:], 8) elif self.resolver.processing_version == (1, 1) and value_s[0] == '0': return sign * int(value_s, 8) elif self.resolver.processing_version == (1, 1) and ':' in value_s: digits = [int(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0 for digit in digits: value += digit * base base *= 60 return sign * value else: return sign * int(value_s) inf_value = 1e300 while inf_value != inf_value * inf_value: inf_value *= inf_value nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99). def construct_yaml_float(self, node): # type: (Any) -> float value_so = self.construct_scalar(node) value_s = value_so.replace('_', "").lower() sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '.inf': return sign * self.inf_value elif value_s == '.nan': return self.nan_value elif self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [float(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0.0 for digit in digits: value += digit * base base *= 60 return sign * value else: if self.resolver.processing_version != (1, 2) and 'e' in value_s: # value_s is lower case independent of input mantissa, exponent = value_s.split('e') if '.' not in mantissa: warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) return sign * float(value_s) def construct_yaml_binary(self, node): # type: (Any) -> Any try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, _F('failed to convert base64 data into ascii: {exc!s}', exc=exc), node.start_mark, ) try: if hasattr(base64, 'decodebytes'): return base64.decodebytes(value) else: return base64.decodestring(value) except binascii.Error as exc: raise ConstructorError( None, None, _F('failed to decode base64 data: {exc!s}', exc=exc), node.start_mark, ) timestamp_regexp = RegExp( """^(?P[0-9][0-9][0-9][0-9]) -(?P[0-9][0-9]?) -(?P[0-9][0-9]?) (?:((?P[Tt])|[ \\t]+) # explictly not retaining extra spaces (?P[0-9][0-9]?) :(?P[0-9][0-9]) :(?P[0-9][0-9]) (?:\\.(?P[0-9]*))? (?:[ \\t]*(?PZ|(?P[-+])(?P[0-9][0-9]?) (?::(?P[0-9][0-9]))?))?)?$""", re.X, ) def construct_yaml_timestamp(self, node, values=None): # type: (Any, Any) -> Any if values is None: try: match = self.timestamp_regexp.match(node.value) except TypeError: match = None if match is None: raise ConstructorError( None, None, 'failed to construct timestamp from "{}"'.format(node.value), node.start_mark, ) values = match.groupdict() year = int(values['year']) month = int(values['month']) day = int(values['day']) if not values['hour']: return datetime.date(year, month, day) hour = int(values['hour']) minute = int(values['minute']) second = int(values['second']) fraction = 0 if values['fraction']: fraction_s = values['fraction'][:6] while len(fraction_s) < 6: fraction_s += '0' fraction = int(fraction_s) if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4: fraction += 1 delta = None if values['tz_sign']: tz_hour = int(values['tz_hour']) minutes = values['tz_minute'] tz_minute = int(minutes) if minutes else 0 delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute) if values['tz_sign'] == '-': delta = -delta # should do something else instead (or hook this up to the preceding if statement # in reverse # if delta is None: # return datetime.datetime(year, month, day, hour, minute, second, fraction) # return datetime.datetime(year, month, day, hour, minute, second, fraction, # datetime.timezone.utc) # the above is not good enough though, should provide tzinfo. In Python3 that is easily # doable drop that kind of support for Python2 as it has not native tzinfo data = datetime.datetime(year, month, day, hour, minute, second, fraction) if delta: data -= delta return data def construct_yaml_omap(self, node): # type: (Any) -> Any # Note: we do now check for duplicate keys omap = ordereddict() yield omap if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F('expected a sequence, but found {node_id!s}', node_id=node.id), node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F( 'expected a mapping of length 1, but found {subnode_id!s}', subnode_id=subnode.id, ), subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F( 'expected a single mapping item, but found {len_subnode_val:d} items', len_subnode_val=len(subnode.value), ), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) assert key not in omap value = self.construct_object(value_node) omap[key] = value def construct_yaml_pairs(self, node): # type: (Any) -> Any # Note: the same code as `construct_yaml_omap`. pairs = [] # type: List[Any] yield pairs if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing pairs', node.start_mark, _F('expected a sequence, but found {node_id!s}', node_id=node.id), node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing pairs', node.start_mark, _F( 'expected a mapping of length 1, but found {subnode_id!s}', subnode_id=subnode.id, ), subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing pairs', node.start_mark, _F( 'expected a single mapping item, but found {len_subnode_val:d} items', len_subnode_val=len(subnode.value), ), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) value = self.construct_object(value_node) pairs.append((key, value)) def construct_yaml_set(self, node): # type: (Any) -> Any data = set() # type: Set[Any] yield data value = self.construct_mapping(node) data.update(value) def construct_yaml_str(self, node): # type: (Any) -> Any value = self.construct_scalar(node) return value def construct_yaml_seq(self, node): # type: (Any) -> Any data = self.yaml_base_list_type() # type: List[Any] yield data data.extend(self.construct_sequence(node)) def construct_yaml_map(self, node): # type: (Any) -> Any data = self.yaml_base_dict_type() # type: Dict[Any, Any] yield data value = self.construct_mapping(node) data.update(value) def construct_yaml_object(self, node, cls): # type: (Any, Any) -> Any data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): state = self.construct_mapping(node, deep=True) data.__setstate__(state) else: state = self.construct_mapping(node) data.__dict__.update(state) def construct_undefined(self, node): # type: (Any) -> None raise ConstructorError( None, None, _F( 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag ), node.start_mark, ) SafeConstructor.add_constructor('tag:yaml.org,2002:null', SafeConstructor.construct_yaml_null) SafeConstructor.add_constructor('tag:yaml.org,2002:bool', SafeConstructor.construct_yaml_bool) SafeConstructor.add_constructor('tag:yaml.org,2002:int', SafeConstructor.construct_yaml_int) SafeConstructor.add_constructor( 'tag:yaml.org,2002:float', SafeConstructor.construct_yaml_float ) SafeConstructor.add_constructor( 'tag:yaml.org,2002:binary', SafeConstructor.construct_yaml_binary ) SafeConstructor.add_constructor( 'tag:yaml.org,2002:timestamp', SafeConstructor.construct_yaml_timestamp ) SafeConstructor.add_constructor('tag:yaml.org,2002:omap', SafeConstructor.construct_yaml_omap) SafeConstructor.add_constructor( 'tag:yaml.org,2002:pairs', SafeConstructor.construct_yaml_pairs ) SafeConstructor.add_constructor('tag:yaml.org,2002:set', SafeConstructor.construct_yaml_set) SafeConstructor.add_constructor('tag:yaml.org,2002:str', SafeConstructor.construct_yaml_str) SafeConstructor.add_constructor('tag:yaml.org,2002:seq', SafeConstructor.construct_yaml_seq) SafeConstructor.add_constructor('tag:yaml.org,2002:map', SafeConstructor.construct_yaml_map) SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined) class Constructor(SafeConstructor): def construct_python_str(self, node): # type: (Any) -> Any return self.construct_scalar(node) def construct_python_unicode(self, node): # type: (Any) -> Any return self.construct_scalar(node) def construct_python_bytes(self, node): # type: (Any) -> Any try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, _F('failed to convert base64 data into ascii: {exc!s}', exc=exc), node.start_mark, ) try: if hasattr(base64, 'decodebytes'): return base64.decodebytes(value) else: return base64.decodestring(value) except binascii.Error as exc: raise ConstructorError( None, None, _F('failed to decode base64 data: {exc!s}', exc=exc), node.start_mark, ) def construct_python_long(self, node): # type: (Any) -> int val = self.construct_yaml_int(node) return val def construct_python_complex(self, node): # type: (Any) -> Any return complex(self.construct_scalar(node)) def construct_python_tuple(self, node): # type: (Any) -> Any return tuple(self.construct_sequence(node)) def find_python_module(self, name, mark): # type: (Any, Any) -> Any if not name: raise ConstructorError( 'while constructing a Python module', mark, 'expected non-empty name appended to the tag', mark, ) try: __import__(name) except ImportError as exc: raise ConstructorError( 'while constructing a Python module', mark, _F('cannot find module {name!r} ({exc!s})', name=name, exc=exc), mark, ) return sys.modules[name] def find_python_name(self, name, mark): # type: (Any, Any) -> Any if not name: raise ConstructorError( 'while constructing a Python object', mark, 'expected non-empty name appended to the tag', mark, ) if '.' in name: lname = name.split('.') lmodule_name = lname lobject_name = [] # type: List[Any] while len(lmodule_name) > 1: lobject_name.insert(0, lmodule_name.pop()) module_name = '.'.join(lmodule_name) try: __import__(module_name) # object_name = '.'.join(object_name) break except ImportError: continue else: module_name = builtins_module lobject_name = [name] try: __import__(module_name) except ImportError as exc: raise ConstructorError( 'while constructing a Python object', mark, _F( 'cannot find module {module_name!r} ({exc!s})', module_name=module_name, exc=exc, ), mark, ) module = sys.modules[module_name] object_name = '.'.join(lobject_name) obj = module while lobject_name: if not hasattr(obj, lobject_name[0]): raise ConstructorError( 'while constructing a Python object', mark, _F( 'cannot find {object_name!r} in the module {module_name!r}', object_name=object_name, module_name=module.__name__, ), mark, ) obj = getattr(obj, lobject_name.pop(0)) return obj def construct_python_name(self, suffix, node): # type: (Any, Any) -> Any value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python name', node.start_mark, _F('expected the empty value, but found {value!r}', value=value), node.start_mark, ) return self.find_python_name(suffix, node.start_mark) def construct_python_module(self, suffix, node): # type: (Any, Any) -> Any value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python module', node.start_mark, _F('expected the empty value, but found {value!r}', value=value), node.start_mark, ) return self.find_python_module(suffix, node.start_mark) def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False): # type: (Any, Any, Any, Any, bool) -> Any if not args: args = [] if not kwds: kwds = {} cls = self.find_python_name(suffix, node.start_mark) if newobj and isinstance(cls, type): return cls.__new__(cls, *args, **kwds) else: return cls(*args, **kwds) def set_python_instance_state(self, instance, state): # type: (Any, Any) -> None if hasattr(instance, '__setstate__'): instance.__setstate__(state) else: slotstate = {} # type: Dict[Any, Any] if isinstance(state, tuple) and len(state) == 2: state, slotstate = state if hasattr(instance, '__dict__'): instance.__dict__.update(state) elif state: slotstate.update(state) for key, value in slotstate.items(): setattr(instance, key, value) def construct_python_object(self, suffix, node): # type: (Any, Any) -> Any # Format: # !!python/object:module.name { ... state ... } instance = self.make_python_instance(suffix, node, newobj=True) self.recursive_objects[node] = instance yield instance deep = hasattr(instance, '__setstate__') state = self.construct_mapping(node, deep=deep) self.set_python_instance_state(instance, state) def construct_python_object_apply(self, suffix, node, newobj=False): # type: (Any, Any, bool) -> Any # Format: # !!python/object/apply # (or !!python/object/new) # args: [ ... arguments ... ] # kwds: { ... keywords ... } # state: ... state ... # listitems: [ ... listitems ... ] # dictitems: { ... dictitems ... } # or short format: # !!python/object/apply [ ... arguments ... ] # The difference between !!python/object/apply and !!python/object/new # is how an object is created, check make_python_instance for details. if isinstance(node, SequenceNode): args = self.construct_sequence(node, deep=True) kwds = {} # type: Dict[Any, Any] state = {} # type: Dict[Any, Any] listitems = [] # type: List[Any] dictitems = {} # type: Dict[Any, Any] else: value = self.construct_mapping(node, deep=True) args = value.get('args', []) kwds = value.get('kwds', {}) state = value.get('state', {}) listitems = value.get('listitems', []) dictitems = value.get('dictitems', {}) instance = self.make_python_instance(suffix, node, args, kwds, newobj) if bool(state): self.set_python_instance_state(instance, state) if bool(listitems): instance.extend(listitems) if bool(dictitems): for key in dictitems: instance[key] = dictitems[key] return instance def construct_python_object_new(self, suffix, node): # type: (Any, Any) -> Any return self.construct_python_object_apply(suffix, node, newobj=True) Constructor.add_constructor('tag:yaml.org,2002:python/none', Constructor.construct_yaml_null) Constructor.add_constructor('tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool) Constructor.add_constructor('tag:yaml.org,2002:python/str', Constructor.construct_python_str) Constructor.add_constructor( 'tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode ) Constructor.add_constructor( 'tag:yaml.org,2002:python/bytes', Constructor.construct_python_bytes ) Constructor.add_constructor('tag:yaml.org,2002:python/int', Constructor.construct_yaml_int) Constructor.add_constructor('tag:yaml.org,2002:python/long', Constructor.construct_python_long) Constructor.add_constructor('tag:yaml.org,2002:python/float', Constructor.construct_yaml_float) Constructor.add_constructor( 'tag:yaml.org,2002:python/complex', Constructor.construct_python_complex ) Constructor.add_constructor('tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq) Constructor.add_constructor( 'tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple ) Constructor.add_constructor('tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map) Constructor.add_multi_constructor( 'tag:yaml.org,2002:python/name:', Constructor.construct_python_name ) Constructor.add_multi_constructor( 'tag:yaml.org,2002:python/module:', Constructor.construct_python_module ) Constructor.add_multi_constructor( 'tag:yaml.org,2002:python/object:', Constructor.construct_python_object ) Constructor.add_multi_constructor( 'tag:yaml.org,2002:python/object/apply:', Constructor.construct_python_object_apply ) Constructor.add_multi_constructor( 'tag:yaml.org,2002:python/object/new:', Constructor.construct_python_object_new ) class RoundTripConstructor(SafeConstructor): """need to store the comments on the node itself, as well as on the items """ def construct_scalar(self, node): # type: (Any) -> Any if not isinstance(node, ScalarNode): raise ConstructorError( None, None, _F('expected a scalar node, but found {node_id!s}', node_id=node.id), node.start_mark, ) if node.style == '|' and isinstance(node.value, str): lss = LiteralScalarString(node.value, anchor=node.anchor) if node.comment and node.comment[1]: lss.comment = node.comment[1][0] # type: ignore return lss if node.style == '>' and isinstance(node.value, str): fold_positions = [] # type: List[int] idx = -1 while True: idx = node.value.find('\a', idx + 1) if idx < 0: break fold_positions.append(idx - len(fold_positions)) fss = FoldedScalarString(node.value.replace('\a', ''), anchor=node.anchor) if node.comment and node.comment[1]: fss.comment = node.comment[1][0] # type: ignore if fold_positions: fss.fold_pos = fold_positions # type: ignore return fss elif bool(self._preserve_quotes) and isinstance(node.value, str): if node.style == "'": return SingleQuotedScalarString(node.value, anchor=node.anchor) if node.style == '"': return DoubleQuotedScalarString(node.value, anchor=node.anchor) if node.anchor: return PlainScalarString(node.value, anchor=node.anchor) return node.value def construct_yaml_int(self, node): # type: (Any) -> Any width = None # type: Any value_su = self.construct_scalar(node) try: sx = value_su.rstrip('_') underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any except ValueError: underscore = None except IndexError: underscore = None value_s = value_su.replace('_', "") sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '0': return 0 elif value_s.startswith('0b'): if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return BinaryInt( sign * int(value_s[2:], 2), width=width, underscore=underscore, anchor=node.anchor, ) elif value_s.startswith('0x'): # default to lower-case if no a-fA-F in string if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) hex_fun = HexInt # type: Any for ch in value_s[2:]: if ch in 'ABCDEF': # first non-digit is capital hex_fun = HexCapsInt break if ch in 'abcdef': break if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return hex_fun( sign * int(value_s[2:], 16), width=width, underscore=underscore, anchor=node.anchor, ) elif value_s.startswith('0o'): if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return OctalInt( sign * int(value_s[2:], 8), width=width, underscore=underscore, anchor=node.anchor, ) elif self.resolver.processing_version != (1, 2) and value_s[0] == '0': return sign * int(value_s, 8) elif self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [int(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0 for digit in digits: value += digit * base base *= 60 return sign * value elif self.resolver.processing_version > (1, 1) and value_s[0] == '0': # not an octal, an integer with leading zero(s) if underscore is not None: # cannot have a leading underscore underscore[2] = len(value_su) > 1 and value_su[-1] == '_' return ScalarInt(sign * int(value_s), width=len(value_s), underscore=underscore) elif underscore: # cannot have a leading underscore underscore[2] = len(value_su) > 1 and value_su[-1] == '_' return ScalarInt( sign * int(value_s), width=None, underscore=underscore, anchor=node.anchor ) elif node.anchor: return ScalarInt(sign * int(value_s), width=None, anchor=node.anchor) else: return sign * int(value_s) def construct_yaml_float(self, node): # type: (Any) -> Any def leading_zeros(v): # type: (Any) -> int lead0 = 0 idx = 0 while idx < len(v) and v[idx] in '0.': if v[idx] == '0': lead0 += 1 idx += 1 return lead0 # underscore = None m_sign = False # type: Any value_so = self.construct_scalar(node) value_s = value_so.replace('_', "").lower() sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': m_sign = value_s[0] value_s = value_s[1:] if value_s == '.inf': return sign * self.inf_value if value_s == '.nan': return self.nan_value if self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [float(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0.0 for digit in digits: value += digit * base base *= 60 return sign * value if 'e' in value_s: try: mantissa, exponent = value_so.split('e') exp = 'e' except ValueError: mantissa, exponent = value_so.split('E') exp = 'E' if self.resolver.processing_version != (1, 2): # value_s is lower case independent of input if '.' not in mantissa: warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) lead0 = leading_zeros(mantissa) width = len(mantissa) prec = mantissa.find('.') if m_sign: width -= 1 e_width = len(exponent) e_sign = exponent[0] in '+-' # nprint('sf', width, prec, m_sign, exp, e_width, e_sign) return ScalarFloat( sign * float(value_s), width=width, prec=prec, m_sign=m_sign, m_lead0=lead0, exp=exp, e_width=e_width, e_sign=e_sign, anchor=node.anchor, ) width = len(value_so) prec = value_so.index('.') # you can use index, this would not be float without dot lead0 = leading_zeros(value_so) return ScalarFloat( sign * float(value_s), width=width, prec=prec, m_sign=m_sign, m_lead0=lead0, anchor=node.anchor, ) def construct_yaml_str(self, node): # type: (Any) -> Any value = self.construct_scalar(node) if isinstance(value, ScalarString): return value return value def construct_rt_sequence(self, node, seqtyp, deep=False): # type: (Any, Any, bool) -> Any if not isinstance(node, SequenceNode): raise ConstructorError( None, None, _F('expected a sequence node, but found {node_id!s}', node_id=node.id), node.start_mark, ) ret_val = [] if node.comment: seqtyp._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: # this happens e.g. if you have a sequence element that is a flow-style mapping # and that has no EOL comment but a following commentline or empty line seqtyp.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): seqtyp.yaml_set_anchor(node.anchor) for idx, child in enumerate(node.value): if child.comment: seqtyp._yaml_add_comment(child.comment, key=idx) child.comment = None # if moved to sequence remove from child ret_val.append(self.construct_object(child, deep=deep)) seqtyp._yaml_set_idx_line_col( idx, [child.start_mark.line, child.start_mark.column] ) return ret_val def flatten_mapping(self, node): # type: (Any) -> Any """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ def constructed(value_node): # type: (Any) -> Any # If the contents of a merge are defined within the # merge marker, then they won't have been constructed # yet. But if they were already constructed, we need to use # the existing object. if value_node in self.constructed_objects: value = self.constructed_objects[value_node] else: value = self.construct_object(value_node, deep=False) return value # merge = [] merge_map_list = [] # type: List[Any] index = 0 while index < len(node.value): key_node, value_node = node.value[index] if key_node.tag == 'tag:yaml.org,2002:merge': if merge_map_list: # double << key if self.allow_duplicate_keys: del node.value[index] index += 1 continue args = [ 'while constructing a mapping', node.start_mark, 'found duplicate key "{}"'.format(key_node.value), key_node.start_mark, """ To suppress this check see: http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys """, """\ Duplicate keys will become an error in future releases, and are errors by default when using the new API. """, ] if self.allow_duplicate_keys is None: warnings.warn(DuplicateKeyFutureWarning(*args)) else: raise DuplicateKeyError(*args) del node.value[index] if isinstance(value_node, MappingNode): merge_map_list.append((index, constructed(value_node))) # self.flatten_mapping(value_node) # merge.extend(value_node.value) elif isinstance(value_node, SequenceNode): # submerge = [] for subnode in value_node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing a mapping', node.start_mark, _F( 'expected a mapping for merging, but found {subnode_id!s}', subnode_id=subnode.id, ), subnode.start_mark, ) merge_map_list.append((index, constructed(subnode))) # self.flatten_mapping(subnode) # submerge.append(subnode.value) # submerge.reverse() # for value in submerge: # merge.extend(value) else: raise ConstructorError( 'while constructing a mapping', node.start_mark, _F( 'expected a mapping or list of mappings for merging, ' 'but found {value_node_id!s}', value_node_id=value_node.id, ), value_node.start_mark, ) elif key_node.tag == 'tag:yaml.org,2002:value': key_node.tag = 'tag:yaml.org,2002:str' index += 1 else: index += 1 return merge_map_list # if merge: # node.value = merge + node.value def _sentinel(self): # type: () -> None pass def construct_mapping(self, node, maptyp, deep=False): # type: ignore # type: (Any, Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, _F('expected a mapping node, but found {node_id!s}', node_id=node.id), node.start_mark, ) merge_map = self.flatten_mapping(node) # mapping = {} if node.comment: maptyp._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: maptyp.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): maptyp.yaml_set_anchor(node.anchor) last_key, last_value = None, self._sentinel for key_node, value_node in node.value: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, MutableSequence): key_s = CommentedKeySeq(key) if key_node.flow_style is True: key_s.fa.set_flow_style() elif key_node.flow_style is False: key_s.fa.set_block_style() key = key_s elif isinstance(key, MutableMapping): key_m = CommentedKeyMap(key) if key_node.flow_style is True: key_m.fa.set_flow_style() elif key_node.flow_style is False: key_m.fa.set_block_style() key = key_m if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) value = self.construct_object(value_node, deep=deep) if self.check_mapping_key(node, key_node, maptyp, key, value): if key_node.comment and len(key_node.comment) > 4 and key_node.comment[4]: if last_value is None: key_node.comment[0] = key_node.comment.pop(4) maptyp._yaml_add_comment(key_node.comment, value=last_key) else: key_node.comment[2] = key_node.comment.pop(4) maptyp._yaml_add_comment(key_node.comment, key=key) key_node.comment = None if key_node.comment: maptyp._yaml_add_comment(key_node.comment, key=key) if value_node.comment: maptyp._yaml_add_comment(value_node.comment, value=key) maptyp._yaml_set_kv_line_col( key, [ key_node.start_mark.line, key_node.start_mark.column, value_node.start_mark.line, value_node.start_mark.column, ], ) maptyp[key] = value last_key, last_value = key, value # could use indexing # do this last, or <<: before a key will prevent insertion in instances # of collections.OrderedDict (as they have no __contains__ if merge_map: maptyp.add_yaml_merge(merge_map) def construct_setting(self, node, typ, deep=False): # type: (Any, Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, _F('expected a mapping node, but found {node_id!s}', node_id=node.id), node.start_mark, ) if node.comment: typ._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: typ.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): typ.yaml_set_anchor(node.anchor) for key_node, value_node in node.value: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, list): key = tuple(key) if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) # construct but should be null value = self.construct_object(value_node, deep=deep) # NOQA self.check_set_key(node, key_node, typ, key) if key_node.comment: typ._yaml_add_comment(key_node.comment, key=key) if value_node.comment: typ._yaml_add_comment(value_node.comment, value=key) typ.add(key) def construct_yaml_seq(self, node): # type: (Any) -> Any data = CommentedSeq() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) # if node.comment: # data._yaml_add_comment(node.comment) yield data data.extend(self.construct_rt_sequence(node, data)) self.set_collection_style(data, node) def construct_yaml_map(self, node): # type: (Any) -> Any data = CommentedMap() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) yield data self.construct_mapping(node, data, deep=True) self.set_collection_style(data, node) def set_collection_style(self, data, node): # type: (Any, Any) -> None if len(data) == 0: return if node.flow_style is True: data.fa.set_flow_style() elif node.flow_style is False: data.fa.set_block_style() def construct_yaml_object(self, node, cls): # type: (Any, Any) -> Any data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): state = SafeConstructor.construct_mapping(self, node, deep=True) data.__setstate__(state) else: state = SafeConstructor.construct_mapping(self, node) data.__dict__.update(state) def construct_yaml_omap(self, node): # type: (Any) -> Any # Note: we do now check for duplicate keys omap = CommentedOrderedMap() omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: omap.fa.set_flow_style() elif node.flow_style is False: omap.fa.set_block_style() yield omap if node.comment: omap._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: omap.yaml_end_comment_extend(node.comment[2], clear=True) if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F('expected a sequence, but found {node_id!s}', node_id=node.id), node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F( 'expected a mapping of length 1, but found {subnode_id!s}', subnode_id=subnode.id, ), subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, _F( 'expected a single mapping item, but found {len_subnode_val:d} items', len_subnode_val=len(subnode.value), ), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) assert key not in omap value = self.construct_object(value_node) if key_node.comment: omap._yaml_add_comment(key_node.comment, key=key) if subnode.comment: omap._yaml_add_comment(subnode.comment, key=key) if value_node.comment: omap._yaml_add_comment(value_node.comment, value=key) omap[key] = value def construct_yaml_set(self, node): # type: (Any) -> Any data = CommentedSet() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) yield data self.construct_setting(node, data) def construct_undefined(self, node): # type: (Any) -> Any try: if isinstance(node, MappingNode): data = CommentedMap() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: data.fa.set_flow_style() elif node.flow_style is False: data.fa.set_block_style() data.yaml_set_tag(node.tag) yield data if node.anchor: data.yaml_set_anchor(node.anchor) self.construct_mapping(node, data) return elif isinstance(node, ScalarNode): data2 = TaggedScalar() data2.value = self.construct_scalar(node) data2.style = node.style data2.yaml_set_tag(node.tag) yield data2 if node.anchor: data2.yaml_set_anchor(node.anchor, always_dump=True) return elif isinstance(node, SequenceNode): data3 = CommentedSeq() data3._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: data3.fa.set_flow_style() elif node.flow_style is False: data3.fa.set_block_style() data3.yaml_set_tag(node.tag) yield data3 if node.anchor: data3.yaml_set_anchor(node.anchor) data3.extend(self.construct_sequence(node)) return except: # NOQA pass raise ConstructorError( None, None, _F( 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag ), node.start_mark, ) def construct_yaml_timestamp(self, node, values=None): # type: (Any, Any) -> Any try: match = self.timestamp_regexp.match(node.value) except TypeError: match = None if match is None: raise ConstructorError( None, None, 'failed to construct timestamp from "{}"'.format(node.value), node.start_mark, ) values = match.groupdict() if not values['hour']: return SafeConstructor.construct_yaml_timestamp(self, node, values) for part in ['t', 'tz_sign', 'tz_hour', 'tz_minute']: if values[part]: break else: return SafeConstructor.construct_yaml_timestamp(self, node, values) year = int(values['year']) month = int(values['month']) day = int(values['day']) hour = int(values['hour']) minute = int(values['minute']) second = int(values['second']) fraction = 0 if values['fraction']: fraction_s = values['fraction'][:6] while len(fraction_s) < 6: fraction_s += '0' fraction = int(fraction_s) if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4: fraction += 1 delta = None if values['tz_sign']: tz_hour = int(values['tz_hour']) minutes = values['tz_minute'] tz_minute = int(minutes) if minutes else 0 delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute) if values['tz_sign'] == '-': delta = -delta # shold check for NOne and solve issue 366 should be tzinfo=delta) if delta: dt = datetime.datetime(year, month, day, hour, minute) dt -= delta data = TimeStamp(dt.year, dt.month, dt.day, dt.hour, dt.minute, second, fraction) data._yaml['delta'] = delta tz = values['tz_sign'] + values['tz_hour'] if values['tz_minute']: tz += ':' + values['tz_minute'] data._yaml['tz'] = tz else: data = TimeStamp(year, month, day, hour, minute, second, fraction) if values['tz']: # no delta data._yaml['tz'] = values['tz'] if values['t']: data._yaml['t'] = True return data def construct_yaml_bool(self, node): # type: (Any) -> Any b = SafeConstructor.construct_yaml_bool(self, node) if node.anchor: return ScalarBoolean(b, anchor=node.anchor) return b RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:null', RoundTripConstructor.construct_yaml_null ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:int', RoundTripConstructor.construct_yaml_int ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:float', RoundTripConstructor.construct_yaml_float ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:binary', RoundTripConstructor.construct_yaml_binary ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:timestamp', RoundTripConstructor.construct_yaml_timestamp ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:omap', RoundTripConstructor.construct_yaml_omap ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:pairs', RoundTripConstructor.construct_yaml_pairs ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:set', RoundTripConstructor.construct_yaml_set ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:str', RoundTripConstructor.construct_yaml_str ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:seq', RoundTripConstructor.construct_yaml_seq ) RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map ) RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined)