diff options
Diffstat (limited to 'constructor.py')
-rw-r--r-- | constructor.py | 463 |
1 files changed, 171 insertions, 292 deletions
diff --git a/constructor.py b/constructor.py index a67ca55..dc7e5ed 100644 --- a/constructor.py +++ b/constructor.py @@ -13,7 +13,7 @@ from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning, MantissaNoDotYAML1_1Warning) from ruamel.yaml.nodes import * # NOQA from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode) -from ruamel.yaml.compat import (_F, builtins_module, # NOQA +from ruamel.yaml.compat import (builtins_module, # NOQA nprint, nprintf, version_tnf) from ruamel.yaml.compat import ordereddict @@ -33,8 +33,7 @@ from ruamel.yaml.scalarbool import ScalarBoolean from ruamel.yaml.timestamp import TimeStamp from ruamel.yaml.util import timestamp_regexp, create_timestamp -if False: # MYPY - from typing import Any, Dict, List, Set, Generator, Union, Optional # NOQA +from typing import Any, Dict, List, Set, Iterator, Union, Optional # NOQA __all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor', @@ -59,70 +58,62 @@ class BaseConstructor: yaml_constructors = {} # type: Dict[Any, Any] yaml_multi_constructors = {} # type: Dict[Any, Any] - def __init__(self, preserve_quotes=None, loader=None): - # type: (Optional[bool], Any) -> None + def __init__(self, preserve_quotes: Optional[bool] = None, loader: Any = None) -> None: self.loader = loader if self.loader is not None and getattr(self.loader, '_constructor', None) is None: self.loader._constructor = self self.loader = loader self.yaml_base_dict_type = dict self.yaml_base_list_type = list - self.constructed_objects = {} # type: Dict[Any, Any] - self.recursive_objects = {} # type: Dict[Any, Any] - self.state_generators = [] # type: List[Any] + self.constructed_objects: Dict[Any, Any] = {} + self.recursive_objects: Dict[Any, Any] = {} + self.state_generators: List[Any] = [] self.deep_construct = False self._preserve_quotes = preserve_quotes self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16)) @property - def composer(self): - # type: () -> Any + def composer(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.composer try: return self.loader._composer except AttributeError: - sys.stdout.write('slt {}\n'.format(type(self))) - sys.stdout.write('slc {}\n'.format(self.loader._composer)) - sys.stdout.write('{}\n'.format(dir(self))) + sys.stdout.write(f'slt {type(self)}\n') + sys.stdout.write(f'slc {self.loader._composer}\n') + sys.stdout.write(f'{dir(self)}\n') raise @property - def resolver(self): - # type: () -> Any + def resolver(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.resolver return self.loader._resolver @property - def scanner(self): - # type: () -> Any + def scanner(self) -> Any: # needed to get to the expanded comments if hasattr(self.loader, 'typ'): return self.loader.scanner return self.loader._scanner - def check_data(self): - # type: () -> Any + def check_data(self) -> Any: # If there are more documents available? return self.composer.check_node() - def get_data(self): - # type: () -> Any + def get_data(self) -> Any: # Construct and return the next document. if self.composer.check_node(): return self.construct_document(self.composer.get_node()) - def get_single_data(self): - # type: () -> Any + def get_single_data(self) -> Any: # Ensure that the stream contains a single document and construct it. node = self.composer.get_single_node() if node is not None: return self.construct_document(node) return None - def construct_document(self, node): - # type: (Any) -> Any + def construct_document(self, node: Any) -> Any: data = self.construct_object(node) while bool(self.state_generators): state_generators = self.state_generators @@ -135,8 +126,7 @@ class BaseConstructor: self.deep_construct = False return data - def construct_object(self, node, deep=False): - # type: (Any, bool) -> Any + def construct_object(self, node: Any, deep: bool = False) -> Any: """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ @@ -159,9 +149,8 @@ class BaseConstructor: self.deep_construct = old_deep return data - def construct_non_recursive_object(self, node, tag=None): - # type: (Any, Optional[str]) -> Any - constructor = None # type: Any + def construct_non_recursive_object(self, node: Any, tag: Optional[str] = None) -> Any: + constructor: Any = None tag_suffix = None if tag is None: tag = node.tag @@ -199,19 +188,14 @@ class BaseConstructor: self.state_generators.append(generator) return data - def construct_scalar(self, node): - # type: (Any) -> Any + def construct_scalar(self, node: Any) -> Any: if not isinstance(node, ScalarNode): raise ConstructorError( - None, - None, - _F('expected a scalar node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark, ) return node.value - def construct_sequence(self, node, deep=False): - # type: (Any, bool) -> Any + def construct_sequence(self, node: Any, deep: bool = False) -> Any: """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ @@ -219,22 +203,18 @@ class BaseConstructor: raise ConstructorError( None, None, - _F('expected a sequence node, but found {node_id!s}', node_id=node.id), + f'expected a sequence node, but found {node.id!s}', node.start_mark, ) return [self.construct_object(child, deep=deep) for child in node.value] - def construct_mapping(self, node, deep=False): - # type: (Any, bool) -> Any + def construct_mapping(self, node: Any, deep: bool = False) -> Any: """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ if not isinstance(node, MappingNode): raise ConstructorError( - None, - None, - _F('expected a mapping node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark, ) total_mapping = self.yaml_base_dict_type() if getattr(node, 'merge', None) is not None: @@ -242,7 +222,7 @@ class BaseConstructor: else: todo = [(node.value, True)] for values, check in todo: - mapping = self.yaml_base_dict_type() # type: Dict[Any, Any] + mapping: Dict[Any, Any] = self.yaml_base_dict_type() for key_node, value_node in values: # keys can be list -> deep key = self.construct_object(key_node, deep=True) @@ -267,8 +247,9 @@ class BaseConstructor: total_mapping.update(mapping) return total_mapping - def check_mapping_key(self, node, key_node, mapping, key, value): - # type: (Any, Any, Any, Any, Any) -> bool + def check_mapping_key( + self, node: Any, key_node: Any, mapping: Any, key: Any, value: Any + ) -> bool: """return True if key is unique""" if key in mapping: if not self.allow_duplicate_keys: @@ -276,8 +257,8 @@ class BaseConstructor: args = [ 'while constructing a mapping', node.start_mark, - 'found duplicate key "{}" with value "{}" ' - '(original value: "{}")'.format(key, value, mk), + f'found duplicate key "{key}" with value "{value}" ' + f'(original value: "{mk}")', key_node.start_mark, """ To suppress this check see: @@ -289,20 +270,19 @@ class BaseConstructor: """, ] if self.allow_duplicate_keys is None: - warnings.warn(DuplicateKeyFutureWarning(*args)) + warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1) else: raise DuplicateKeyError(*args) return False return True - def check_set_key(self, node, key_node, setting, key): - # type: (Any, Any, Any, Any, Any) -> None + def check_set_key(self: Any, node: Any, key_node: Any, setting: Any, key: Any) -> None: if key in setting: if not self.allow_duplicate_keys: args = [ 'while constructing a set', node.start_mark, - 'found duplicate key "{}"'.format(key), + f'found duplicate key "{key}"', key_node.start_mark, """ To suppress this check see: @@ -314,18 +294,14 @@ class BaseConstructor: """, ] if self.allow_duplicate_keys is None: - warnings.warn(DuplicateKeyFutureWarning(*args)) + warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1) else: raise DuplicateKeyError(*args) - def construct_pairs(self, node, deep=False): - # type: (Any, bool) -> Any + def construct_pairs(self, node: Any, deep: bool = False) -> Any: if not isinstance(node, MappingNode): raise ConstructorError( - None, - None, - _F('expected a mapping node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark, ) pairs = [] for key_node, value_node in node.value: @@ -335,37 +311,33 @@ class BaseConstructor: return pairs @classmethod - def add_constructor(cls, tag, constructor): - # type: (Any, Any) -> None + def add_constructor(cls, tag: Any, constructor: Any) -> None: if 'yaml_constructors' not in cls.__dict__: cls.yaml_constructors = cls.yaml_constructors.copy() cls.yaml_constructors[tag] = constructor @classmethod - def add_multi_constructor(cls, tag_prefix, multi_constructor): - # type: (Any, Any) -> None + def add_multi_constructor(cls, tag_prefix: Any, multi_constructor: Any) -> None: if 'yaml_multi_constructors' not in cls.__dict__: cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy() cls.yaml_multi_constructors[tag_prefix] = multi_constructor class SafeConstructor(BaseConstructor): - def construct_scalar(self, node): - # type: (Any) -> Any + def construct_scalar(self, node: Any) -> Any: if isinstance(node, MappingNode): for key_node, value_node in node.value: if key_node.tag == 'tag:yaml.org,2002:value': return self.construct_scalar(value_node) return BaseConstructor.construct_scalar(self, node) - def flatten_mapping(self, node): - # type: (Any) -> Any + def flatten_mapping(self, node: Any) -> Any: """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ - merge = [] # type: List[Any] + merge: List[Any] = [] index = 0 while index < len(node.value): key_node, value_node = node.value[index] @@ -378,7 +350,7 @@ class SafeConstructor(BaseConstructor): args = [ 'while constructing a mapping', node.start_mark, - 'found duplicate key "{}"'.format(key_node.value), + f'found duplicate key "{key_node.value}"', key_node.start_mark, """ To suppress this check see: @@ -390,7 +362,7 @@ class SafeConstructor(BaseConstructor): """, ] if self.allow_duplicate_keys is None: - warnings.warn(DuplicateKeyFutureWarning(*args)) + warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1) else: raise DuplicateKeyError(*args) del node.value[index] @@ -404,10 +376,7 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( 'while constructing a mapping', node.start_mark, - _F( - 'expected a mapping for merging, but found {subnode_id!s}', - subnode_id=subnode.id, - ), + f'expected a mapping for merging, but found {subnode.id!s}', subnode.start_mark, ) self.flatten_mapping(subnode) @@ -419,11 +388,8 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( 'while constructing a mapping', node.start_mark, - _F( - 'expected a mapping or list of mappings for merging, ' - 'but found {value_node_id!s}', - value_node_id=value_node.id, - ), + 'expected a mapping or list of mappings for merging, ' + f'but found {value_node.id!s}', value_node.start_mark, ) elif key_node.tag == 'tag:yaml.org,2002:value': @@ -435,8 +401,7 @@ class SafeConstructor(BaseConstructor): node.merge = merge # separate merge keys to be able to update without duplicate node.value = merge + node.value - def construct_mapping(self, node, deep=False): - # type: (Any, bool) -> Any + def construct_mapping(self, node: Any, deep: bool = False) -> Any: """deep is True when creating an object/mapping recursively, in that case want the underlying elements available during construction """ @@ -444,8 +409,7 @@ class SafeConstructor(BaseConstructor): self.flatten_mapping(node) return BaseConstructor.construct_mapping(self, node, deep=deep) - def construct_yaml_null(self, node): - # type: (Any) -> Any + def construct_yaml_null(self, node: Any) -> Any: self.construct_scalar(node) return None @@ -461,13 +425,11 @@ class SafeConstructor(BaseConstructor): 'off': False, } - def construct_yaml_bool(self, node): - # type: (Any) -> bool + def construct_yaml_bool(self, node: Any) -> bool: value = self.construct_scalar(node) return self.bool_values[value.lower()] - def construct_yaml_int(self, node): - # type: (Any) -> int + def construct_yaml_int(self, node: Any) -> int: value_s = self.construct_scalar(node) value_s = value_s.replace('_', "") sign = +1 @@ -502,8 +464,7 @@ class SafeConstructor(BaseConstructor): inf_value *= inf_value nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99). - def construct_yaml_float(self, node): - # type: (Any) -> float + def construct_yaml_float(self, node: Any) -> float: value_so = self.construct_scalar(node) value_s = value_so.replace('_', "").lower() sign = +1 @@ -529,34 +490,29 @@ class SafeConstructor(BaseConstructor): # value_s is lower case independent of input mantissa, exponent = value_s.split('e') if '.' not in mantissa: - warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) + warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1) return sign * float(value_s) - def construct_yaml_binary(self, node): - # type: (Any) -> Any + def construct_yaml_binary(self, node: Any) -> Any: try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, - _F('failed to convert base64 data into ascii: {exc!s}', exc=exc), + f'failed to convert base64 data into ascii: {exc!s}', node.start_mark, ) try: return base64.decodebytes(value) except binascii.Error as exc: raise ConstructorError( - None, - None, - _F('failed to decode base64 data: {exc!s}', exc=exc), - node.start_mark, + None, None, f'failed to decode base64 data: {exc!s}', node.start_mark, ) timestamp_regexp = timestamp_regexp # moved to util 0.17.17 - def construct_yaml_timestamp(self, node, values=None): - # type: (Any, Any) -> Any + def construct_yaml_timestamp(self, node: Any, values: Any = None) -> Any: if values is None: try: match = self.timestamp_regexp.match(node.value) @@ -566,14 +522,13 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( None, None, - 'failed to construct timestamp from "{}"'.format(node.value), + f'failed to construct timestamp from "{node.value}"', node.start_mark, ) values = match.groupdict() return create_timestamp(**values) - def construct_yaml_omap(self, node): - # type: (Any) -> Any + def construct_yaml_omap(self, node: Any) -> Any: # Note: we do now check for duplicate keys omap = ordereddict() yield omap @@ -581,7 +536,7 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F('expected a sequence, but found {node_id!s}', node_id=node.id), + f'expected a sequence, but found {node.id!s}', node.start_mark, ) for subnode in node.value: @@ -589,20 +544,14 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F( - 'expected a mapping of length 1, but found {subnode_id!s}', - subnode_id=subnode.id, - ), + f'expected a mapping of length 1, but found {subnode.id!s}', subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F( - 'expected a single mapping item, but found {len_subnode_val:d} items', - len_subnode_val=len(subnode.value), - ), + f'expected a single mapping item, but found {len(subnode.value):d} items', subnode.start_mark, ) key_node, value_node = subnode.value[0] @@ -611,16 +560,15 @@ class SafeConstructor(BaseConstructor): value = self.construct_object(value_node) omap[key] = value - def construct_yaml_pairs(self, node): - # type: (Any) -> Any + def construct_yaml_pairs(self, node: Any) -> Any: # Note: the same code as `construct_yaml_omap`. - pairs = [] # type: List[Any] + pairs: List[Any] = [] yield pairs if not isinstance(node, SequenceNode): raise ConstructorError( 'while constructing pairs', node.start_mark, - _F('expected a sequence, but found {node_id!s}', node_id=node.id), + f'expected a sequence, but found {node.id!s}', node.start_mark, ) for subnode in node.value: @@ -628,20 +576,14 @@ class SafeConstructor(BaseConstructor): raise ConstructorError( 'while constructing pairs', node.start_mark, - _F( - 'expected a mapping of length 1, but found {subnode_id!s}', - subnode_id=subnode.id, - ), + f'expected a mapping of length 1, but found {subnode.id!s}', subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing pairs', node.start_mark, - _F( - 'expected a single mapping item, but found {len_subnode_val:d} items', - len_subnode_val=len(subnode.value), - ), + f'expected a single mapping item, but found {len(subnode.value):d} items', subnode.start_mark, ) key_node, value_node = subnode.value[0] @@ -649,33 +591,28 @@ class SafeConstructor(BaseConstructor): value = self.construct_object(value_node) pairs.append((key, value)) - def construct_yaml_set(self, node): - # type: (Any) -> Any - data = set() # type: Set[Any] + def construct_yaml_set(self, node: Any) -> Any: + data: Set[Any] = set() yield data value = self.construct_mapping(node) data.update(value) - def construct_yaml_str(self, node): - # type: (Any) -> Any + def construct_yaml_str(self, node: Any) -> Any: value = self.construct_scalar(node) return value - def construct_yaml_seq(self, node): - # type: (Any) -> Any - data = self.yaml_base_list_type() # type: List[Any] + def construct_yaml_seq(self, node: Any) -> Any: + data: List[Any] = self.yaml_base_list_type() yield data data.extend(self.construct_sequence(node)) - def construct_yaml_map(self, node): - # type: (Any) -> Any - data = self.yaml_base_dict_type() # type: Dict[Any, Any] + def construct_yaml_map(self, node: Any) -> Any: + data: Dict[Any, Any] = self.yaml_base_dict_type() yield data value = self.construct_mapping(node) data.update(value) - def construct_yaml_object(self, node, cls): - # type: (Any, Any) -> Any + def construct_yaml_object(self, node: Any, cls: Any) -> Any: data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): @@ -685,14 +622,11 @@ class SafeConstructor(BaseConstructor): state = self.construct_mapping(node) data.__dict__.update(state) - def construct_undefined(self, node): - # type: (Any) -> None + def construct_undefined(self, node: Any) -> None: raise ConstructorError( None, None, - _F( - 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag - ), + f'could not determine a constructor for the tag {node.tag!r}', node.start_mark, ) @@ -733,50 +667,40 @@ SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined) class Constructor(SafeConstructor): - def construct_python_str(self, node): - # type: (Any) -> Any + def construct_python_str(self, node: Any) -> Any: return self.construct_scalar(node) - def construct_python_unicode(self, node): - # type: (Any) -> Any + def construct_python_unicode(self, node: Any) -> Any: return self.construct_scalar(node) - def construct_python_bytes(self, node): - # type: (Any) -> Any + def construct_python_bytes(self, node: Any) -> Any: try: value = self.construct_scalar(node).encode('ascii') except UnicodeEncodeError as exc: raise ConstructorError( None, None, - _F('failed to convert base64 data into ascii: {exc!s}', exc=exc), + f'failed to convert base64 data into ascii: {exc!s}', node.start_mark, ) try: return base64.decodebytes(value) except binascii.Error as exc: raise ConstructorError( - None, - None, - _F('failed to decode base64 data: {exc!s}', exc=exc), - node.start_mark, + None, None, f'failed to decode base64 data: {exc!s}', node.start_mark, ) - def construct_python_long(self, node): - # type: (Any) -> int + def construct_python_long(self, node: Any) -> int: val = self.construct_yaml_int(node) return val - def construct_python_complex(self, node): - # type: (Any) -> Any + def construct_python_complex(self, node: Any) -> Any: return complex(self.construct_scalar(node)) - def construct_python_tuple(self, node): - # type: (Any) -> Any + def construct_python_tuple(self, node: Any) -> Any: return tuple(self.construct_sequence(node)) - def find_python_module(self, name, mark): - # type: (Any, Any) -> Any + def find_python_module(self, name: Any, mark: Any) -> Any: if not name: raise ConstructorError( 'while constructing a Python module', @@ -790,13 +714,12 @@ class Constructor(SafeConstructor): raise ConstructorError( 'while constructing a Python module', mark, - _F('cannot find module {name!r} ({exc!s})', name=name, exc=exc), + f'cannot find module {name!r} ({exc!s})', mark, ) return sys.modules[name] - def find_python_name(self, name, mark): - # type: (Any, Any) -> Any + def find_python_name(self, name: Any, mark: Any) -> Any: if not name: raise ConstructorError( 'while constructing a Python object', @@ -807,7 +730,7 @@ class Constructor(SafeConstructor): if '.' in name: lname = name.split('.') lmodule_name = lname - lobject_name = [] # type: List[Any] + lobject_name: List[Any] = [] while len(lmodule_name) > 1: lobject_name.insert(0, lmodule_name.pop()) module_name = '.'.join(lmodule_name) @@ -826,11 +749,7 @@ class Constructor(SafeConstructor): raise ConstructorError( 'while constructing a Python object', mark, - _F( - 'cannot find module {module_name!r} ({exc!s})', - module_name=module_name, - exc=exc, - ), + f'cannot find module {module_name!r} ({exc!s})', mark, ) module = sys.modules[module_name] @@ -842,42 +761,37 @@ class Constructor(SafeConstructor): raise ConstructorError( 'while constructing a Python object', mark, - _F( - 'cannot find {object_name!r} in the module {module_name!r}', - object_name=object_name, - module_name=module.__name__, - ), + f'cannot find {object_name!r} in the module {module.__name__!r}', mark, ) obj = getattr(obj, lobject_name.pop(0)) return obj - def construct_python_name(self, suffix, node): - # type: (Any, Any) -> Any + def construct_python_name(self, suffix: Any, node: Any) -> Any: value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python name', node.start_mark, - _F('expected the empty value, but found {value!r}', value=value), + f'expected the empty value, but found {value!r}', node.start_mark, ) return self.find_python_name(suffix, node.start_mark) - def construct_python_module(self, suffix, node): - # type: (Any, Any) -> Any + def construct_python_module(self, suffix: Any, node: Any) -> Any: value = self.construct_scalar(node) if value: raise ConstructorError( 'while constructing a Python module', node.start_mark, - _F('expected the empty value, but found {value!r}', value=value), + f'expected the empty value, but found {value!r}', node.start_mark, ) return self.find_python_module(suffix, node.start_mark) - def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False): - # type: (Any, Any, Any, Any, bool) -> Any + def make_python_instance( + self, suffix: Any, node: Any, args: Any = None, kwds: Any = None, newobj: bool = False + ) -> Any: if not args: args = [] if not kwds: @@ -888,12 +802,11 @@ class Constructor(SafeConstructor): else: return cls(*args, **kwds) - def set_python_instance_state(self, instance, state): - # type: (Any, Any) -> None + def set_python_instance_state(self, instance: Any, state: Any) -> None: if hasattr(instance, '__setstate__'): instance.__setstate__(state) else: - slotstate = {} # type: Dict[Any, Any] + slotstate: Dict[Any, Any] = {} if isinstance(state, tuple) and len(state) == 2: state, slotstate = state if hasattr(instance, '__dict__'): @@ -903,8 +816,7 @@ class Constructor(SafeConstructor): for key, value in slotstate.items(): setattr(instance, key, value) - def construct_python_object(self, suffix, node): - # type: (Any, Any) -> Any + def construct_python_object(self, suffix: Any, node: Any) -> Any: # Format: # !!python/object:module.name { ... state ... } instance = self.make_python_instance(suffix, node, newobj=True) @@ -914,8 +826,9 @@ class Constructor(SafeConstructor): state = self.construct_mapping(node, deep=deep) self.set_python_instance_state(instance, state) - def construct_python_object_apply(self, suffix, node, newobj=False): - # type: (Any, Any, bool) -> Any + def construct_python_object_apply( + self, suffix: Any, node: Any, newobj: bool = False + ) -> Any: # Format: # !!python/object/apply # (or !!python/object/new) # args: [ ... arguments ... ] @@ -929,10 +842,10 @@ class Constructor(SafeConstructor): # is how an object is created, check make_python_instance for details. if isinstance(node, SequenceNode): args = self.construct_sequence(node, deep=True) - kwds = {} # type: Dict[Any, Any] - state = {} # type: Dict[Any, Any] - listitems = [] # type: List[Any] - dictitems = {} # type: Dict[Any, Any] + kwds: Dict[Any, Any] = {} + state: Dict[Any, Any] = {} + listitems: List[Any] = [] + dictitems: Dict[Any, Any] = {} else: value = self.construct_mapping(node, deep=True) args = value.get('args', []) @@ -950,8 +863,7 @@ class Constructor(SafeConstructor): instance[key] = dictitems[key] return instance - def construct_python_object_new(self, suffix, node): - # type: (Any, Any) -> Any + def construct_python_object_new(self, suffix: Any, node: Any) -> Any: return self.construct_python_object_apply(suffix, node, newobj=True) @@ -1013,15 +925,13 @@ class RoundTripConstructor(SafeConstructor): as well as on the items """ - def comment(self, idx): - # type: (Any) -> Any + def comment(self, idx: Any) -> Any: assert self.loader.comment_handling is not None x = self.scanner.comments[idx] x.set_assigned() return x - def comments(self, list_of_comments, idx=None): - # type: (Any, Optional[Any]) -> Any + def comments(self, list_of_comments: Any, idx: Optional[Any] = None) -> Any: # hand in the comment and optional pre, eol, post segment if list_of_comments is None: return [] @@ -1032,14 +942,10 @@ class RoundTripConstructor(SafeConstructor): for x in list_of_comments: yield self.comment(x) - def construct_scalar(self, node): - # type: (Any) -> Any + def construct_scalar(self, node: Any) -> Any: if not isinstance(node, ScalarNode): raise ConstructorError( - None, - None, - _F('expected a scalar node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark, ) if node.style == '|' and isinstance(node.value, str): @@ -1055,7 +961,7 @@ class RoundTripConstructor(SafeConstructor): lss.comment = self.comment(node.comment[1][0]) # type: ignore return lss if node.style == '>' and isinstance(node.value, str): - fold_positions = [] # type: List[int] + fold_positions: List[int] = [] idx = -1 while True: idx = node.value.find('\a', idx + 1) @@ -1084,13 +990,12 @@ class RoundTripConstructor(SafeConstructor): return PlainScalarString(node.value, anchor=node.anchor) return node.value - def construct_yaml_int(self, node): - # type: (Any) -> Any - width = None # type: Any + def construct_yaml_int(self, node: Any) -> Any: + width: Any = None value_su = self.construct_scalar(node) try: sx = value_su.rstrip('_') - underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any + underscore: Any = [len(sx) - sx.rindex('_') - 1, False, False] except ValueError: underscore = None except IndexError: @@ -1119,7 +1024,7 @@ class RoundTripConstructor(SafeConstructor): # default to lower-case if no a-fA-F in string if self.resolver.processing_version > (1, 1) and value_s[2] == '0': width = len(value_s[2:]) - hex_fun = HexInt # type: Any + hex_fun: Any = HexInt for ch in value_s[2:]: if ch in 'ABCDEF': # first non-digit is capital hex_fun = HexCapsInt @@ -1180,10 +1085,8 @@ class RoundTripConstructor(SafeConstructor): else: return sign * int(value_s) - def construct_yaml_float(self, node): - # type: (Any) -> Any - def leading_zeros(v): - # type: (Any) -> int + def construct_yaml_float(self, node: Any) -> Any: + def leading_zeros(v: Any) -> int: lead0 = 0 idx = 0 while idx < len(v) and v[idx] in '0.': @@ -1193,7 +1096,7 @@ class RoundTripConstructor(SafeConstructor): return lead0 # underscore = None - m_sign = False # type: Any + m_sign: Any = False value_so = self.construct_scalar(node) value_s = value_so.replace('_', "").lower() sign = +1 @@ -1225,7 +1128,7 @@ class RoundTripConstructor(SafeConstructor): if self.resolver.processing_version != (1, 2): # value_s is lower case independent of input if '.' not in mantissa: - warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so)) + warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1) lead0 = leading_zeros(mantissa) width = len(mantissa) prec = mantissa.find('.') @@ -1246,7 +1149,8 @@ class RoundTripConstructor(SafeConstructor): anchor=node.anchor, ) width = len(value_so) - prec = value_so.index('.') # you can use index, this would not be float without dot + # you can't use index, !!float 42 would be a float without a dot + prec = value_so.find('.') lead0 = leading_zeros(value_so) return ScalarFloat( sign * float(value_s), @@ -1257,20 +1161,18 @@ class RoundTripConstructor(SafeConstructor): anchor=node.anchor, ) - def construct_yaml_str(self, node): - # type: (Any) -> Any + def construct_yaml_str(self, node: Any) -> Any: value = self.construct_scalar(node) if isinstance(value, ScalarString): return value return value - def construct_rt_sequence(self, node, seqtyp, deep=False): - # type: (Any, Any, bool) -> Any + def construct_rt_sequence(self, node: Any, seqtyp: Any, deep: bool = False) -> Any: if not isinstance(node, SequenceNode): raise ConstructorError( None, None, - _F('expected a sequence node, but found {node_id!s}', node_id=node.id), + f'expected a sequence node, but found {node.id!s}', node.start_mark, ) ret_val = [] @@ -1301,16 +1203,14 @@ class RoundTripConstructor(SafeConstructor): ) return ret_val - def flatten_mapping(self, node): - # type: (Any) -> Any + def flatten_mapping(self, node: Any) -> Any: """ This implements the merge key feature http://yaml.org/type/merge.html by inserting keys from the merge dict/list of dicts if not yet available in this node """ - def constructed(value_node): - # type: (Any) -> Any + def constructed(value_node: Any) -> Any: # If the contents of a merge are defined within the # merge marker, then they won't have been constructed # yet. But if they were already constructed, we need to use @@ -1322,7 +1222,7 @@ class RoundTripConstructor(SafeConstructor): return value # merge = [] - merge_map_list = [] # type: List[Any] + merge_map_list: List[Any] = [] index = 0 while index < len(node.value): key_node, value_node = node.value[index] @@ -1335,7 +1235,7 @@ class RoundTripConstructor(SafeConstructor): args = [ 'while constructing a mapping', node.start_mark, - 'found duplicate key "{}"'.format(key_node.value), + f'found duplicate key "{key_node.value}"', key_node.start_mark, """ To suppress this check see: @@ -1347,7 +1247,7 @@ class RoundTripConstructor(SafeConstructor): """, ] if self.allow_duplicate_keys is None: - warnings.warn(DuplicateKeyFutureWarning(*args)) + warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1) else: raise DuplicateKeyError(*args) del node.value[index] @@ -1362,10 +1262,7 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( 'while constructing a mapping', node.start_mark, - _F( - 'expected a mapping for merging, but found {subnode_id!s}', - subnode_id=subnode.id, - ), + f'expected a mapping for merging, but found {subnode.id!s}', subnode.start_mark, ) merge_map_list.append((index, constructed(subnode))) @@ -1378,11 +1275,8 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( 'while constructing a mapping', node.start_mark, - _F( - 'expected a mapping or list of mappings for merging, ' - 'but found {value_node_id!s}', - value_node_id=value_node.id, - ), + 'expected a mapping or list of mappings for merging, ' + f'but found {value_node.id!s}', value_node.start_mark, ) elif key_node.tag == 'tag:yaml.org,2002:value': @@ -1394,18 +1288,13 @@ class RoundTripConstructor(SafeConstructor): # if merge: # node.value = merge + node.value - def _sentinel(self): - # type: () -> None + def _sentinel(self) -> None: pass - def construct_mapping(self, node, maptyp, deep=False): # type: ignore - # type: (Any, Any, bool) -> Any + def construct_mapping(self, node: Any, maptyp: Any, deep: bool = False) -> Any: # type: ignore # NOQA if not isinstance(node, MappingNode): raise ConstructorError( - None, - None, - _F('expected a mapping node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark, ) merge_map = self.flatten_mapping(node) # mapping = {} @@ -1439,6 +1328,7 @@ class RoundTripConstructor(SafeConstructor): key_s.fa.set_flow_style() elif key_node.flow_style is False: key_s.fa.set_block_style() + key_s._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore key = key_s elif isinstance(key, MutableMapping): key_m = CommentedKeyMap(key) @@ -1446,6 +1336,7 @@ class RoundTripConstructor(SafeConstructor): key_m.fa.set_flow_style() elif key_node.flow_style is False: key_m.fa.set_block_style() + key_m._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore key = key_m if not isinstance(key, Hashable): raise ConstructorError( @@ -1503,14 +1394,10 @@ class RoundTripConstructor(SafeConstructor): if merge_map: maptyp.add_yaml_merge(merge_map) - def construct_setting(self, node, typ, deep=False): - # type: (Any, Any, bool) -> Any + def construct_setting(self, node: Any, typ: Any, deep: bool = False) -> Any: if not isinstance(node, MappingNode): raise ConstructorError( - None, - None, - _F('expected a mapping node, but found {node_id!s}', node_id=node.id), - node.start_mark, + None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark, ) if self.loader and self.loader.comment_handling is None: if node.comment: @@ -1556,8 +1443,7 @@ class RoundTripConstructor(SafeConstructor): nprintf('nc7b', value_node.comment) typ.add(key) - def construct_yaml_seq(self, node): - # type: (Any) -> Any + def construct_yaml_seq(self, node: Any) -> Iterator[CommentedSeq]: data = CommentedSeq() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) # if node.comment: @@ -1566,16 +1452,14 @@ class RoundTripConstructor(SafeConstructor): data.extend(self.construct_rt_sequence(node, data)) self.set_collection_style(data, node) - def construct_yaml_map(self, node): - # type: (Any) -> Any + def construct_yaml_map(self, node: Any) -> Iterator[CommentedMap]: data = CommentedMap() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) yield data self.construct_mapping(node, data, deep=True) self.set_collection_style(data, node) - def set_collection_style(self, data, node): - # type: (Any, Any) -> None + def set_collection_style(self, data: Any, node: Any) -> None: if len(data) == 0: return if node.flow_style is True: @@ -1583,8 +1467,7 @@ class RoundTripConstructor(SafeConstructor): elif node.flow_style is False: data.fa.set_block_style() - def construct_yaml_object(self, node, cls): - # type: (Any, Any) -> Any + def construct_yaml_object(self, node: Any, cls: Any) -> Any: data = cls.__new__(cls) yield data if hasattr(data, '__setstate__'): @@ -1608,8 +1491,7 @@ class RoundTripConstructor(SafeConstructor): a = getattr(data, Anchor.attrib) a.value = node.anchor - def construct_yaml_omap(self, node): - # type: (Any) -> Any + def construct_yaml_omap(self, node: Any) -> Iterator[CommentedOrderedMap]: # Note: we do now check for duplicate keys omap = CommentedOrderedMap() omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column) @@ -1631,7 +1513,7 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F('expected a sequence, but found {node_id!s}', node_id=node.id), + f'expected a sequence, but found {node.id!s}', node.start_mark, ) for subnode in node.value: @@ -1639,20 +1521,14 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F( - 'expected a mapping of length 1, but found {subnode_id!s}', - subnode_id=subnode.id, - ), + f'expected a mapping of length 1, but found {subnode.id!s}', subnode.start_mark, ) if len(subnode.value) != 1: raise ConstructorError( 'while constructing an ordered map', node.start_mark, - _F( - 'expected a single mapping item, but found {len_subnode_val:d} items', - len_subnode_val=len(subnode.value), - ), + f'expected a single mapping item, but found {len(subnode.value):d} items', subnode.start_mark, ) key_node, value_node = subnode.value[0] @@ -1676,15 +1552,15 @@ class RoundTripConstructor(SafeConstructor): nprintf('nc9c', value_node.comment) omap[key] = value - def construct_yaml_set(self, node): - # type: (Any) -> Any + def construct_yaml_set(self, node: Any) -> Iterator[CommentedSet]: data = CommentedSet() data._yaml_set_line_col(node.start_mark.line, node.start_mark.column) yield data self.construct_setting(node, data) - def construct_undefined(self, node): - # type: (Any) -> Any + def construct_unknown( + self, node: Any + ) -> Iterator[Union[CommentedMap, TaggedScalar, CommentedSeq]]: try: if isinstance(node, MappingNode): data = CommentedMap() @@ -1735,14 +1611,13 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( None, None, - _F( - 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag - ), + f'could not determine a constructor for the tag {node.tag!r}', node.start_mark, ) - def construct_yaml_timestamp(self, node, values=None): - # type: (Any, Any) -> Any + def construct_yaml_timestamp( + self, node: Any, values: Any = None + ) -> Union[datetime.date, datetime.datetime, TimeStamp]: try: match = self.timestamp_regexp.match(node.value) except TypeError: @@ -1751,7 +1626,7 @@ class RoundTripConstructor(SafeConstructor): raise ConstructorError( None, None, - 'failed to construct timestamp from "{}"'.format(node.value), + f'failed to construct timestamp from "{node.value}"', node.start_mark, ) values = match.groupdict() @@ -1774,9 +1649,15 @@ class RoundTripConstructor(SafeConstructor): if values['tz_sign'] == '-': delta = -delta # should check for None and solve issue 366 should be tzinfo=delta) - data = TimeStamp( - dd.year, dd.month, dd.day, dd.hour, dd.minute, dd.second, dd.microsecond - ) + # isinstance(datetime.datetime.now, datetime.date) is true) + if isinstance(dd, datetime.datetime): + data = TimeStamp( + dd.year, dd.month, dd.day, dd.hour, dd.minute, dd.second, dd.microsecond + ) + else: + # ToDo: make this into a DateStamp? + data = TimeStamp(dd.year, dd.month, dd.day, 0, 0, 0, 0) + return data if delta: data._yaml['delta'] = delta tz = values['tz_sign'] + values['tz_hour'] @@ -1786,13 +1667,11 @@ class RoundTripConstructor(SafeConstructor): else: if values['tz']: # no delta data._yaml['tz'] = values['tz'] - if values['t']: data._yaml['t'] = True return data - def construct_yaml_bool(self, node): - # type: (Any) -> Any + def construct_yaml_sbool(self, node: Any) -> Union[bool, ScalarBoolean]: b = SafeConstructor.construct_yaml_bool(self, node) if node.anchor: return ScalarBoolean(b, anchor=node.anchor) @@ -1804,7 +1683,7 @@ RoundTripConstructor.add_constructor( ) RoundTripConstructor.add_constructor( - 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool + 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_sbool ) RoundTripConstructor.add_constructor( @@ -1847,4 +1726,4 @@ RoundTripConstructor.add_constructor( 'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map ) -RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined) +RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_unknown) |