# coding: utf-8 from __future__ import print_function, absolute_import, division import datetime import base64 import binascii import re import sys import types import warnings # fmt: off from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning, MantissaNoDotYAML1_1Warning) from ruamel.yaml.nodes import * # NOQA from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode) from ruamel.yaml.compat import (utf8, builtins_module, to_str, PY2, PY3, # NOQA ordereddict, text_type, nprint, version_tnf, Hashable) from ruamel.yaml.comments import * # NOQA from ruamel.yaml.comments import (CommentedMap, CommentedOrderedMap, CommentedSet, CommentedKeySeq, CommentedSeq, TaggedScalar) from ruamel.yaml.scalarstring import * # NOQA from ruamel.yaml.scalarstring import (PreservedScalarString, SingleQuotedScalarString, DoubleQuotedScalarString, ScalarString) from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt from ruamel.yaml.scalarfloat import ScalarFloat from ruamel.yaml.timestamp import TimeStamp from ruamel.yaml.util import RegExp if False: # MYPY from typing import Any, Dict, List, Set, Generator, Union # NOQA __all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor', 'ConstructorError', 'RoundTripConstructor'] # fmt: on class ConstructorError(MarkedYAMLError): pass class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning): pass class DuplicateKeyError(MarkedYAMLFutureWarning): pass class BaseConstructor(object): yaml_constructors = {} # type: Dict[Any, Any] yaml_multi_constructors = {} # type: Dict[Any, Any] def __init__(self, preserve_quotes=None, loader=None): # type: (Union[None, bool], Any) -> None self.loader = loader if self.loader is not None and getattr(self.loader, '_constructor', None) is None: self.loader._constructor = self self.loader = loader self.constructed_objects = {} # type: Dict[Any, Any] self.recursive_objects = {} # type: Dict[Any, Any] self.state_generators = [] # type: List[Any] self.deep_construct = False self._preserve_quotes = preserve_quotes self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16)) @property def composer(self): # type: () -> Any if hasattr(self.loader, 'typ'): return self.loader.composer try: return self.loader._composer except AttributeError: print('slt', type(self)) print('slc', self.loader._composer) print(dir(self)) raise @property def resolver(self): # type: () -> Any if hasattr(self.loader, 'typ'): return self.loader.resolver return self.loader._resolver def check_data(self): # type: () -> Any # If there are more documents available? return self.composer.check_node() def get_data(self): # type: () -> Any # Construct and return the next document. if self.composer.check_node(): return self.construct_document(self.composer.get_node()) def get_single_data(self): # type: () -> Any # Ensure that the stream contains a single document and construct it. node = self.composer.get_single_node() if node is not None: return self.construct_document(node) return None def construct_document(self, node): # type: (Any) -> Any data = self.construct_object(node) while bool(self.state_generators): state_generators = self.state_generators self.state_generators = [] for generator in state_generators: for _dummy in generator: pass self.constructed_objects = {} self.recursive_objects = {} self.deep_construct = False return data def construct_object(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if node in self.constructed_objects: return self.constructed_objects[node] if deep: old_deep = self.deep_construct self.deep_construct = True if node in self.recursive_objects: raise ConstructorError( None, None, 'found unconstructable recursive node', node.start_mark ) self.recursive_objects[node] = None constructor = None # type: Any tag_suffix = None if node.tag in self.yaml_constructors: constructor = self.yaml_constructors[node.tag] else: for tag_prefix in self.yaml_multi_constructors: if node.tag.startswith(tag_prefix): tag_suffix = node.tag[len(tag_prefix) :] constructor = self.yaml_multi_constructors[tag_prefix] break else: if None in self.yaml_multi_constructors: tag_suffix = node.tag constructor = self.yaml_multi_constructors[None] elif None in self.yaml_constructors: constructor = self.yaml_constructors[None] elif isinstance(node, ScalarNode): constructor = self.__class__.construct_scalar # type: ignore elif isinstance(node, SequenceNode): constructor = self.__class__.construct_sequence # type: ignore elif isinstance(node, MappingNode): constructor = self.__class__.construct_mapping # type: ignore if tag_suffix is None: data = constructor(self, node) else: data = constructor(self, tag_suffix, node) if isinstance(data, types.GeneratorType): generator = data data = next(generator) if self.deep_construct: for _dummy in generator: pass else: self.state_generators.append(generator) self.constructed_objects[node] = data del self.recursive_objects[node] if deep: self.deep_construct = old_deep return data def construct_scalar(self, node): # type: (Any) -> Any if not isinstance(node, ScalarNode): raise ConstructorError( None, None, 'expected a scalar node, but found %s' % node.id, node.start_mark ) return node.value def construct_sequence(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if not isinstance(node, SequenceNode): raise ConstructorError( None, None, 'expected a sequence node, but found %s' % node.id, node.start_mark ) return [self.construct_object(child, deep=deep) for child in node.value] def construct_mapping(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if not isinstance(node, MappingNode): raise ConstructorError( None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark ) total_mapping = {} if getattr(node, 'merge', None) is not None: todo = [(node.merge, False), (node.value, False)] else: todo = [(node.value, True)] for values, check in todo: mapping = {} # type: Dict[Any, Any] for key_node, value_node in values: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, list): key = tuple(key) if PY2: try: hash(key) except TypeError as exc: raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unacceptable key (%s)' % exc, key_node.start_mark, ) else: if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) value = self.construct_object(value_node, deep=deep) if check: self.check_mapping_key(node, key_node, mapping, key, value) mapping[key] = value total_mapping.update(mapping) return total_mapping def check_mapping_key(self, node, key_node, mapping, key, value): # type: (Any, Any, Any, Any, Any) -> None if key in mapping: if not self.allow_duplicate_keys: args = [ 'while constructing a mapping', node.start_mark, 'found duplicate key "{}" with value "{}" ' '(original value: "{}")'.format(key, value, mapping.get(key)), key_node.start_mark, """ To suppress this check see: http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys """, """\ Duplicate keys will become an error in future releases, and are errors by default when using the new API. """, ] if self.allow_duplicate_keys is None: warnings.warn(DuplicateKeyFutureWarning(*args)) else: raise DuplicateKeyError(*args) def construct_pairs(self, node, deep=False): # type: (Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark ) pairs = [] for key_node, value_node in node.value: key = self.construct_object(key_node, deep=deep) value = self.construct_object(value_node, deep=deep) pairs.append((key, value)) return pairs @classmethod def add_constructor(cls, tag, constructor): # type: (Any, Any) -> None if 'yaml_constructors' not in cls.__dict__: cls.yaml_constructors = cls.yaml_constructors.copy() cls.yaml_constructors[tag] = constructor @classmethod def add_multi_constructor(cls, tag_prefix, multi_constructor): # type: (Any, Any) -> None if 'yaml_multi_constructors' not in cls.__dict__: cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy() cls.yaml_multi_constructors[tag_prefix] = multi_constructor class SafeConstructor(BaseConstructor): def construct_scalar(self, node): # type: (Any) -> Any if isinstance(node, MappingNode): for key_node, value_node in node.value: if key_node.tag == u'tag:yaml.org,2002:value': return self.construct_scalar(value_node) return BaseConstructor.construct_scalar(self, node) def flatten_mapping(self, node): # type: (Any) -> Any """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ merge = [] # type: List[Any] index = 0 while index < len(node.value): key_node, value_node = node.value[index] if key_node.tag == u'tag:yaml.org,2002:merge': del node.value[index] if isinstance(value_node, MappingNode): self.flatten_mapping(value_node) merge.extend(value_node.value) elif isinstance(value_node, SequenceNode): submerge = [] for subnode in value_node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'expected a mapping for merging, but found %s' % subnode.id, subnode.start_mark, ) self.flatten_mapping(subnode) submerge.append(subnode.value) submerge.reverse() for value in submerge: merge.extend(value) else: raise ConstructorError( 'while constructing a mapping', node.start_mark, 'expected a mapping or list of mappings for merging, ' 'but found %s' % value_node.id, value_node.start_mark, ) elif key_node.tag == u'tag:yaml.org,2002:value': key_node.tag = u'tag:yaml.org,2002:str' index += 1 else: index += 1 if bool(merge): node.merge = merge # separate merge keys to be able to update without duplicate node.value = merge + node.value def construct_mapping(self, node, deep=False): # type: (Any, bool) -> Any """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if isinstance(node, MappingNode): self.flatten_mapping(node) return BaseConstructor.construct_mapping(self, node, deep=deep) def construct_yaml_null(self, node): # type: (Any) -> Any self.construct_scalar(node) return None # YAML 1.2 spec doesn't mention yes/no etc any more, 1.1 does bool_values = { u'yes': True, u'no': False, u'true': True, u'false': False, u'on': True, u'off': False, } def construct_yaml_bool(self, node): # type: (Any) -> bool value = self.construct_scalar(node) return self.bool_values[value.lower()] def construct_yaml_int(self, node): # type: (Any) -> int value_s = to_str(self.construct_scalar(node)) value_s = value_s.replace('_', "") sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '0': return 0 elif value_s.startswith('0b'): return sign * int(value_s[2:], 2) elif value_s.startswith('0x'): return sign * int(value_s[2:], 16) elif value_s.startswith('0o'): return sign * int(value_s[2:], 8) elif self.resolver.processing_version != (1, 2) and value_s[0] == '0': return sign * int(value_s, 8) elif self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [int(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0 for digit in digits: value += digit * base base *= 60 return sign * value else: return sign * int(value_s) inf_value = 1e300 while inf_value != inf_value * inf_value: inf_value *= inf_value nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99). def construct_yaml_float(self, node): # type: (Any) -> float value_so = to_str(self.construct_scalar(node)) value_s = value_so.replace('_', "").lower() sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '.inf': return sign * self.inf_value elif value_s == '.nan': return self.nan_value elif self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [float(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0.0 for digit in digits: value += digit * base base *= 60 return sign * value else: if self.resolver.processing_version != (1, 2) and 'e' in value_s: # value_s is lower case independent of input mantissa, exponent = value_s.split('e') if '.' not in mantissa: warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) return sign * float(value_s) if PY3: def construct_yaml_binary(self, node): # type: (Any) -> Any try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, 'failed to convert base64 data into ascii: %s' % exc, node.start_mark, ) try: if hasattr(base64, 'decodebytes'): return base64.decodebytes(value) else: return base64.decodestring(value) except binascii.Error as exc: raise ConstructorError( None, None, 'failed to decode base64 data: %s' % exc, node.start_mark ) else: def construct_yaml_binary(self, node): # type: (Any) -> Any value = self.construct_scalar(node) try: return to_str(value).decode('base64') except (binascii.Error, UnicodeEncodeError) as exc: raise ConstructorError( None, None, 'failed to decode base64 data: %s' % exc, node.start_mark ) timestamp_regexp = RegExp( u"""^(?P[0-9][0-9][0-9][0-9]) -(?P[0-9][0-9]?) -(?P[0-9][0-9]?) (?:((?P[Tt])|[ \\t]+) # explictly not retaining extra spaces (?P[0-9][0-9]?) :(?P[0-9][0-9]) :(?P[0-9][0-9]) (?:\\.(?P[0-9]*))? (?:[ \\t]*(?PZ|(?P[-+])(?P[0-9][0-9]?) (?::(?P[0-9][0-9]))?))?)?$""", re.X, ) def construct_yaml_timestamp(self, node, values=None): # type: (Any, Any) -> Any if values is None: try: match = self.timestamp_regexp.match(node.value) except TypeError: match = None if match is None: raise ConstructorError( None, None, 'failed to construct timestamp from "{}"'.format(node.value), node.start_mark, ) values = match.groupdict() year = int(values['year']) month = int(values['month']) day = int(values['day']) if not values['hour']: return datetime.date(year, month, day) hour = int(values['hour']) minute = int(values['minute']) second = int(values['second']) fraction = 0 if values['fraction']: fraction_s = values['fraction'][:6] while len(fraction_s) < 6: fraction_s += '0' fraction = int(fraction_s) if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4: fraction += 1 delta = None if values['tz_sign']: tz_hour = int(values['tz_hour']) minutes = values['tz_minute'] tz_minute = int(minutes) if minutes else 0 delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute) if values['tz_sign'] == '-': delta = -delta data = datetime.datetime(year, month, day, hour, minute, second, fraction) if delta: data -= delta return data def construct_yaml_omap(self, node): # type: (Any) -> Any # Note: we do now check for duplicate keys omap = ordereddict() yield omap if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a sequence, but found %s' % node.id, node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a mapping of length 1, but found %s' % subnode.id, subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a single mapping item, but found %d items' % len(subnode.value), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) assert key not in omap value = self.construct_object(value_node) omap[key] = value def construct_yaml_pairs(self, node): # type: (Any) -> Any # Note: the same code as `construct_yaml_omap`. pairs = [] # type: List[Any] yield pairs if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing pairs', node.start_mark, 'expected a sequence, but found %s' % node.id, node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing pairs', node.start_mark, 'expected a mapping of length 1, but found %s' % subnode.id, subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing pairs', node.start_mark, 'expected a single mapping item, but found %d items' % len(subnode.value), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) value = self.construct_object(value_node) pairs.append((key, value)) def construct_yaml_set(self, node): # type: (Any) -> Any data = set() # type: Set[Any] yield data value = self.construct_mapping(node) data.update(value) def construct_yaml_str(self, node): # type: (Any) -> Any value = self.construct_scalar(node) if PY3: return value try: return value.encode('ascii') except UnicodeEncodeError: return value def construct_yaml_seq(self, node): # type: (Any) -> Any data = [] # type: List[Any] yield data data.extend(self.construct_sequence(node)) def construct_yaml_map(self, node): # type: (Any) -> Any data = {} # type: Dict[Any, Any] yield data value = self.construct_mapping(node) data.update(value) def construct_yaml_object(self, node, cls): # type: (Any, Any) -> Any data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): state = self.construct_mapping(node, deep=True) data.__setstate__(state) else: state = self.construct_mapping(node) data.__dict__.update(state) def construct_undefined(self, node): # type: (Any) -> None raise ConstructorError( None, None, 'could not determine a constructor for the tag %r' % utf8(node.tag), node.start_mark, ) SafeConstructor.add_constructor(u'tag:yaml.org,2002:null', SafeConstructor.construct_yaml_null) SafeConstructor.add_constructor(u'tag:yaml.org,2002:bool', SafeConstructor.construct_yaml_bool) SafeConstructor.add_constructor(u'tag:yaml.org,2002:int', SafeConstructor.construct_yaml_int) SafeConstructor.add_constructor( u'tag:yaml.org,2002:float', SafeConstructor.construct_yaml_float ) SafeConstructor.add_constructor( u'tag:yaml.org,2002:binary', SafeConstructor.construct_yaml_binary ) SafeConstructor.add_constructor( u'tag:yaml.org,2002:timestamp', SafeConstructor.construct_yaml_timestamp ) SafeConstructor.add_constructor(u'tag:yaml.org,2002:omap', SafeConstructor.construct_yaml_omap) SafeConstructor.add_constructor( u'tag:yaml.org,2002:pairs', SafeConstructor.construct_yaml_pairs ) SafeConstructor.add_constructor(u'tag:yaml.org,2002:set', SafeConstructor.construct_yaml_set) SafeConstructor.add_constructor(u'tag:yaml.org,2002:str', SafeConstructor.construct_yaml_str) SafeConstructor.add_constructor(u'tag:yaml.org,2002:seq', SafeConstructor.construct_yaml_seq) SafeConstructor.add_constructor(u'tag:yaml.org,2002:map', SafeConstructor.construct_yaml_map) SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined) if PY2: class classobj: pass class Constructor(SafeConstructor): def construct_python_str(self, node): # type: (Any) -> Any return utf8(self.construct_scalar(node)) def construct_python_unicode(self, node): # type: (Any) -> Any return self.construct_scalar(node) if PY3: def construct_python_bytes(self, node): # type: (Any) -> Any try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, 'failed to convert base64 data into ascii: %s' % exc, node.start_mark, ) try: if hasattr(base64, 'decodebytes'): return base64.decodebytes(value) else: return base64.decodestring(value) except binascii.Error as exc: raise ConstructorError( None, None, 'failed to decode base64 data: %s' % exc, node.start_mark ) def construct_python_long(self, node): # type: (Any) -> int val = self.construct_yaml_int(node) if PY3: return val return int(val) def construct_python_complex(self, node): # type: (Any) -> Any return complex(self.construct_scalar(node)) def construct_python_tuple(self, node): # type: (Any) -> Any return tuple(self.construct_sequence(node)) def find_python_module(self, name, mark): # type: (Any, Any) -> Any if not name: raise ConstructorError( 'while constructing a Python module', mark, 'expected non-empty name appended to the tag', mark, ) try: __import__(name) except ImportError as exc: raise ConstructorError( 'while constructing a Python module', mark, 'cannot find module %r (%s)' % (utf8(name), exc), mark, ) return sys.modules[name] def find_python_name(self, name, mark): # type: (Any, Any) -> Any if not name: raise ConstructorError( 'while constructing a Python object', mark, 'expected non-empty name appended to the tag', mark, ) if u'.' in name: module_name, object_name = name.rsplit('.', 1) else: module_name = builtins_module object_name = name try: __import__(module_name) except ImportError as exc: raise ConstructorError( 'while constructing a Python object', mark, 'cannot find module %r (%s)' % (utf8(module_name), exc), mark, ) module = sys.modules[module_name] if not hasattr(module, object_name): raise ConstructorError( 'while constructing a Python object', mark, 'cannot find %r in the module %r' % (utf8(object_name), module.__name__), mark, ) return getattr(module, object_name) def construct_python_name(self, suffix, node): # type: (Any, Any) -> Any value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python name', node.start_mark, 'expected the empty value, but found %r' % utf8(value), node.start_mark, ) return self.find_python_name(suffix, node.start_mark) def construct_python_module(self, suffix, node): # type: (Any, Any) -> Any value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python module', node.start_mark, 'expected the empty value, but found %r' % utf8(value), node.start_mark, ) return self.find_python_module(suffix, node.start_mark) def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False): # type: (Any, Any, Any, Any, bool) -> Any if not args: args = [] if not kwds: kwds = {} cls = self.find_python_name(suffix, node.start_mark) if PY3: if newobj and isinstance(cls, type): return cls.__new__(cls, *args, **kwds) else: return cls(*args, **kwds) else: if newobj and isinstance(cls, type(classobj)) and not args and not kwds: instance = classobj() instance.__class__ = cls return instance elif newobj and isinstance(cls, type): return cls.__new__(cls, *args, **kwds) else: return cls(*args, **kwds) def set_python_instance_state(self, instance, state): # type: (Any, Any) -> None if hasattr(instance, '__setstate__'): instance.__setstate__(state) else: slotstate = {} # type: Dict[Any, Any] if isinstance(state, tuple) and len(state) == 2: state, slotstate = state if hasattr(instance, '__dict__'): instance.__dict__.update(state) elif state: slotstate.update(state) for key, value in slotstate.items(): setattr(object, key, value) def construct_python_object(self, suffix, node): # type: (Any, Any) -> Any # Format: # !!python/object:module.name { ... state ... } instance = self.make_python_instance(suffix, node, newobj=True) yield instance deep = hasattr(instance, '__setstate__') state = self.construct_mapping(node, deep=deep) self.set_python_instance_state(instance, state) def construct_python_object_apply(self, suffix, node, newobj=False): # type: (Any, Any, bool) -> Any # Format: # !!python/object/apply # (or !!python/object/new) # args: [ ... arguments ... ] # kwds: { ... keywords ... } # state: ... state ... # listitems: [ ... listitems ... ] # dictitems: { ... dictitems ... } # or short format: # !!python/object/apply [ ... arguments ... ] # The difference between !!python/object/apply and !!python/object/new # is how an object is created, check make_python_instance for details. if isinstance(node, SequenceNode): args = self.construct_sequence(node, deep=True) kwds = {} # type: Dict[Any, Any] state = {} # type: Dict[Any, Any] listitems = [] # type: List[Any] dictitems = {} # type: Dict[Any, Any] else: value = self.construct_mapping(node, deep=True) args = value.get('args', []) kwds = value.get('kwds', {}) state = value.get('state', {}) listitems = value.get('listitems', []) dictitems = value.get('dictitems', {}) instance = self.make_python_instance(suffix, node, args, kwds, newobj) if bool(state): self.set_python_instance_state(instance, state) if bool(listitems): instance.extend(listitems) if bool(dictitems): for key in dictitems: instance[key] = dictitems[key] return instance def construct_python_object_new(self, suffix, node): # type: (Any, Any) -> Any return self.construct_python_object_apply(suffix, node, newobj=True) Constructor.add_constructor(u'tag:yaml.org,2002:python/none', Constructor.construct_yaml_null) Constructor.add_constructor(u'tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool) Constructor.add_constructor(u'tag:yaml.org,2002:python/str', Constructor.construct_python_str) Constructor.add_constructor( u'tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode ) if PY3: Constructor.add_constructor( u'tag:yaml.org,2002:python/bytes', Constructor.construct_python_bytes ) Constructor.add_constructor(u'tag:yaml.org,2002:python/int', Constructor.construct_yaml_int) Constructor.add_constructor( u'tag:yaml.org,2002:python/long', Constructor.construct_python_long ) Constructor.add_constructor( u'tag:yaml.org,2002:python/float', Constructor.construct_yaml_float ) Constructor.add_constructor( u'tag:yaml.org,2002:python/complex', Constructor.construct_python_complex ) Constructor.add_constructor(u'tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq) Constructor.add_constructor( u'tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple ) Constructor.add_constructor(u'tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map) Constructor.add_multi_constructor( u'tag:yaml.org,2002:python/name:', Constructor.construct_python_name ) Constructor.add_multi_constructor( u'tag:yaml.org,2002:python/module:', Constructor.construct_python_module ) Constructor.add_multi_constructor( u'tag:yaml.org,2002:python/object:', Constructor.construct_python_object ) Constructor.add_multi_constructor( u'tag:yaml.org,2002:python/object/apply:', Constructor.construct_python_object_apply ) Constructor.add_multi_constructor( u'tag:yaml.org,2002:python/object/new:', Constructor.construct_python_object_new ) class RoundTripConstructor(SafeConstructor): """need to store the comments on the node itself, as well as on the items """ def construct_scalar(self, node): # type: (Any) -> Any if not isinstance(node, ScalarNode): raise ConstructorError( None, None, 'expected a scalar node, but found %s' % node.id, node.start_mark ) if node.style == '|' and isinstance(node.value, text_type): return PreservedScalarString(node.value) elif bool(self._preserve_quotes) and isinstance(node.value, text_type): if node.style == "'": return SingleQuotedScalarString(node.value) if node.style == '"': return DoubleQuotedScalarString(node.value) return node.value def construct_yaml_int(self, node): # type: (Any) -> Any width = None # type: Any value_su = to_str(self.construct_scalar(node)) try: sx = value_su.rstrip('_') underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any except ValueError: underscore = None except IndexError: underscore = None value_s = value_su.replace('_', "") sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': value_s = value_s[1:] if value_s == '0': return 0 elif value_s.startswith('0b'): if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return BinaryInt( # type: ignore sign * int(value_s[2:], 2), width=width, underscore=underscore ) elif value_s.startswith('0x'): # default to lower-case if no a-fA-F in string if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) hex_fun = HexInt # type: Any for ch in value_s[2:]: if ch in 'ABCDEF': # first non-digit is capital hex_fun = HexCapsInt break if ch in 'abcdef': break if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return hex_fun(sign * int(value_s[2:], 16), width=width, underscore=underscore) elif value_s.startswith('0o'): if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) if underscore is not None: underscore[1] = value_su[2] == '_' underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_' return OctalInt( # type: ignore sign * int(value_s[2:], 8), width=width, underscore=underscore ) elif self.resolver.processing_version != (1, 2) and value_s[0] == '0': return sign * int(value_s, 8) elif self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [int(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0 for digit in digits: value += digit * base base *= 60 return sign * value elif self.resolver.processing_version > (1, 1) and value_s[0] == '0': # not an octal, an integer with leading zero(s) if underscore is not None: # cannot have a leading underscore underscore[2] = len(value_su) > 1 and value_su[-1] == '_' return ScalarInt( # type: ignore sign * int(value_s), width=len(value_s), underscore=underscore ) elif underscore: # cannot have a leading underscore underscore[2] = len(value_su) > 1 and value_su[-1] == '_' return ScalarInt( # type: ignore sign * int(value_s), width=None, underscore=underscore ) else: return sign * int(value_s) def construct_yaml_float(self, node): # type: (Any) -> Any def leading_zeros(v): # type: (Any) -> int lead0 = 0 idx = 0 while idx < len(v) and v[idx] in '0.': if v[idx] == '0': lead0 += 1 idx += 1 return lead0 # underscore = None m_sign = False # type: Any value_so = to_str(self.construct_scalar(node)) value_s = value_so.replace('_', "").lower() sign = +1 if value_s[0] == '-': sign = -1 if value_s[0] in '+-': m_sign = value_s[0] value_s = value_s[1:] if value_s == '.inf': return sign * self.inf_value if value_s == '.nan': return self.nan_value if self.resolver.processing_version != (1, 2) and ':' in value_s: digits = [float(part) for part in value_s.split(':')] digits.reverse() base = 1 value = 0.0 for digit in digits: value += digit * base base *= 60 return sign * value if 'e' in value_s: try: mantissa, exponent = value_so.split('e') exp = 'e' except ValueError: mantissa, exponent = value_so.split('E') exp = 'E' if self.resolver.processing_version != (1, 2): # value_s is lower case independent of input if '.' not in mantissa: warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) lead0 = leading_zeros(mantissa) width = len(mantissa) prec = mantissa.find('.') if m_sign: width -= 1 e_width = len(exponent) e_sign = exponent[0] in '+-' # print('sf', width, prec, m_sign, exp, e_width, e_sign) return ScalarFloat( # type: ignore sign * float(value_s), width=width, prec=prec, m_sign=m_sign, m_lead0=lead0, exp=exp, e_width=e_width, e_sign=e_sign, ) width = len(value_so) prec = value_so.index('.') # you can use index, this would not be float without dot lead0 = leading_zeros(value_so) return ScalarFloat( # type: ignore sign * float(value_s), width=width, prec=prec, m_sign=m_sign, m_lead0=lead0 ) def construct_yaml_str(self, node): # type: (Any) -> Any value = self.construct_scalar(node) if isinstance(value, ScalarString): return value if PY3: return value try: return value.encode('ascii') except AttributeError: # in case you replace the node dynamically e.g. with a dict return value except UnicodeEncodeError: return value def construct_rt_sequence(self, node, seqtyp, deep=False): # type: (Any, Any, bool) -> Any if not isinstance(node, SequenceNode): raise ConstructorError( None, None, 'expected a sequence node, but found %s' % node.id, node.start_mark ) ret_val = [] if node.comment: seqtyp._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: seqtyp.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): seqtyp.yaml_set_anchor(node.anchor) for idx, child in enumerate(node.value): ret_val.append(self.construct_object(child, deep=deep)) if child.comment: seqtyp._yaml_add_comment(child.comment, key=idx) seqtyp._yaml_set_idx_line_col( idx, [child.start_mark.line, child.start_mark.column] ) return ret_val def flatten_mapping(self, node): # type: (Any) -> Any """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ def constructed(value_node): # type: (Any) -> Any # If the contents of a merge are defined within the # merge marker, then they won't have been constructed # yet. But if they were already constructed, we need to use # the existing object. if value_node in self.constructed_objects: value = self.constructed_objects[value_node] else: value = self.construct_object(value_node, deep=False) return value # merge = [] merge_map_list = [] index = 0 while index < len(node.value): key_node, value_node = node.value[index] if key_node.tag == u'tag:yaml.org,2002:merge': del node.value[index] if isinstance(value_node, MappingNode): merge_map_list.append((index, constructed(value_node))) # self.flatten_mapping(value_node) # merge.extend(value_node.value) elif isinstance(value_node, SequenceNode): # submerge = [] for subnode in value_node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'expected a mapping for merging, but found %s' % subnode.id, subnode.start_mark, ) merge_map_list.append((index, constructed(subnode))) # self.flatten_mapping(subnode) # submerge.append(subnode.value) # submerge.reverse() # for value in submerge: # merge.extend(value) else: raise ConstructorError( 'while constructing a mapping', node.start_mark, 'expected a mapping or list of mappings for merging, ' 'but found %s' % value_node.id, value_node.start_mark, ) elif key_node.tag == u'tag:yaml.org,2002:value': key_node.tag = u'tag:yaml.org,2002:str' index += 1 else: index += 1 return merge_map_list # if merge: # node.value = merge + node.value def _sentinel(self): # type: () -> None pass def construct_mapping(self, node, maptyp=None, deep=False): # type: ignore # type: (Any, Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark ) merge_map = self.flatten_mapping(node) # mapping = {} if node.comment: maptyp._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: maptyp.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): maptyp.yaml_set_anchor(node.anchor) last_key, last_value = None, self._sentinel for key_node, value_node in node.value: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, list): key_a = CommentedKeySeq(key) if key_node.flow_style is True: key_a.fa.set_flow_style() elif key_node.flow_style is False: key_a.fa.set_block_style() key = key_a # key = tuple(key) if PY2: try: hash(key) except TypeError as exc: raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unacceptable key (%s)' % exc, key_node.start_mark, ) else: if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) value = self.construct_object(value_node, deep=deep) self.check_mapping_key(node, key_node, maptyp, key, value) if key_node.comment and len(key_node.comment) > 4 and key_node.comment[4]: if last_value is None: key_node.comment[0] = key_node.comment.pop(4) maptyp._yaml_add_comment(key_node.comment, value=last_key) else: key_node.comment[2] = key_node.comment.pop(4) maptyp._yaml_add_comment(key_node.comment, key=key) key_node.comment = None if key_node.comment: maptyp._yaml_add_comment(key_node.comment, key=key) if value_node.comment: maptyp._yaml_add_comment(value_node.comment, value=key) maptyp._yaml_set_kv_line_col( key, [ key_node.start_mark.line, key_node.start_mark.column, value_node.start_mark.line, value_node.start_mark.column, ], ) maptyp[key] = value last_key, last_value = key, value # could use indexing # do this last, or <<: before a key will prevent insertion in instances # of collections.OrderedDict (as they have no __contains__ if merge_map: maptyp.add_yaml_merge(merge_map) def construct_setting(self, node, typ, deep=False): # type: (Any, Any, bool) -> Any if not isinstance(node, MappingNode): raise ConstructorError( None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark ) if node.comment: typ._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: typ.yaml_end_comment_extend(node.comment[2], clear=True) if node.anchor: from ruamel.yaml.serializer import templated_id if not templated_id(node.anchor): typ.yaml_set_anchor(node.anchor) for key_node, value_node in node.value: # keys can be list -> deep key = self.construct_object(key_node, deep=True) # lists are not hashable, but tuples are if not isinstance(key, Hashable): if isinstance(key, list): key = tuple(key) if PY2: try: hash(key) except TypeError as exc: raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unacceptable key (%s)' % exc, key_node.start_mark, ) else: if not isinstance(key, Hashable): raise ConstructorError( 'while constructing a mapping', node.start_mark, 'found unhashable key', key_node.start_mark, ) # construct but should be null value = self.construct_object(value_node, deep=deep) # NOQA self.check_mapping_key(node, key_node, typ, key, value) if key_node.comment: typ._yaml_add_comment(key_node.comment, key=key) if value_node.comment: typ._yaml_add_comment(value_node.comment, value=key) typ.add(key) def construct_yaml_seq(self, node): # type: (Any) -> Any data = CommentedSeq() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: data.fa.set_flow_style() elif node.flow_style is False: data.fa.set_block_style() if node.comment: data._yaml_add_comment(node.comment) yield data data.extend(self.construct_rt_sequence(node, data)) def construct_yaml_map(self, node): # type: (Any) -> Any data = CommentedMap() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: data.fa.set_flow_style() elif node.flow_style is False: data.fa.set_block_style() yield data self.construct_mapping(node, data) def construct_yaml_object(self, node, cls): # type: (Any, Any) -> Any data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): state = SafeConstructor.construct_mapping(self, node, deep=True) data.__setstate__(state) else: state = SafeConstructor.construct_mapping(self, node) data.__dict__.update(state) def construct_yaml_omap(self, node): # type: (Any) -> Any # Note: we do now check for duplicate keys omap = CommentedOrderedMap() omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: omap.fa.set_flow_style() elif node.flow_style is False: omap.fa.set_block_style() yield omap if node.comment: omap._yaml_add_comment(node.comment[:2]) if len(node.comment) > 2: omap.yaml_end_comment_extend(node.comment[2], clear=True) if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a sequence, but found %s' % node.id, node.start_mark, ) for subnode in node.value: if not isinstance(subnode, MappingNode): raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a mapping of length 1, but found %s' % subnode.id, subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, 'expected a single mapping item, but found %d items' % len(subnode.value), subnode.start_mark, ) key_node, value_node = subnode.value[0] key = self.construct_object(key_node) assert key not in omap value = self.construct_object(value_node) if key_node.comment: omap._yaml_add_comment(key_node.comment, key=key) if subnode.comment: omap._yaml_add_comment(subnode.comment, key=key) if value_node.comment: omap._yaml_add_comment(value_node.comment, value=key) omap[key] = value def construct_yaml_set(self, node): # type: (Any) -> Any data = CommentedSet() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) yield data self.construct_setting(node, data) def construct_undefined(self, node): # type: (Any) -> Any try: if isinstance(node, MappingNode): data = CommentedMap() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) if node.flow_style is True: data.fa.set_flow_style() elif node.flow_style is False: data.fa.set_block_style() data.yaml_set_tag(node.tag) yield data self.construct_mapping(node, data) return elif isinstance(node, ScalarNode): data2 = TaggedScalar() data2.value = self.construct_scalar(node) data2.style = node.style data2.yaml_set_tag(node.tag) yield data2 return except: # NOQA pass raise ConstructorError( None, None, 'could not determine a constructor for the tag %r' % utf8(node.tag), node.start_mark, ) def construct_yaml_timestamp(self, node, values=None): # type: (Any, Any) -> Any try: match = self.timestamp_regexp.match(node.value) except TypeError: match = None if match is None: raise ConstructorError( None, None, 'failed to construct timestamp from "{}"'.format(node.value), node.start_mark, ) values = match.groupdict() if not values['hour']: return SafeConstructor.construct_yaml_timestamp(self, node, values) for part in ['t', 'tz_sign', 'tz_hour', 'tz_minute']: if values[part]: break else: return SafeConstructor.construct_yaml_timestamp(self, node, values) year = int(values['year']) month = int(values['month']) day = int(values['day']) hour = int(values['hour']) minute = int(values['minute']) second = int(values['second']) fraction = 0 if values['fraction']: fraction_s = values['fraction'][:6] while len(fraction_s) < 6: fraction_s += '0' fraction = int(fraction_s) if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4: fraction += 1 delta = None if values['tz_sign']: tz_hour = int(values['tz_hour']) minutes = values['tz_minute'] tz_minute = int(minutes) if minutes else 0 delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute) if values['tz_sign'] == '-': delta = -delta if delta: dt = datetime.datetime(year, month, day, hour, minute) dt -= delta data = TimeStamp(dt.year, dt.month, dt.day, dt.hour, dt.minute, second, fraction) data._yaml['delta'] = delta tz = values['tz_sign'] + values['tz_hour'] if values['tz_minute']: tz += ':' + values['tz_minute'] data._yaml['tz'] = tz else: data = TimeStamp(year, month, day, hour, minute, second, fraction) if values['tz']: # no delta data._yaml['tz'] = values['tz'] if values['t']: data._yaml['t'] = True return data RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:null', RoundTripConstructor.construct_yaml_null ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:int', RoundTripConstructor.construct_yaml_int ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:float', RoundTripConstructor.construct_yaml_float ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:binary', RoundTripConstructor.construct_yaml_binary ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:timestamp', RoundTripConstructor.construct_yaml_timestamp ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:omap', RoundTripConstructor.construct_yaml_omap ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:pairs', RoundTripConstructor.construct_yaml_pairs ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:set', RoundTripConstructor.construct_yaml_set ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:str', RoundTripConstructor.construct_yaml_str ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:seq', RoundTripConstructor.construct_yaml_seq ) RoundTripConstructor.add_constructor( u'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map ) RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined)