summaryrefslogtreecommitdiff
path: root/constructor.py
diff options
context:
space:
mode:
Diffstat (limited to 'constructor.py')
-rw-r--r--constructor.py463
1 files changed, 171 insertions, 292 deletions
diff --git a/constructor.py b/constructor.py
index a67ca55..dc7e5ed 100644
--- a/constructor.py
+++ b/constructor.py
@@ -13,7 +13,7 @@ from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning,
MantissaNoDotYAML1_1Warning)
from ruamel.yaml.nodes import * # NOQA
from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode)
-from ruamel.yaml.compat import (_F, builtins_module, # NOQA
+from ruamel.yaml.compat import (builtins_module, # NOQA
nprint, nprintf, version_tnf)
from ruamel.yaml.compat import ordereddict
@@ -33,8 +33,7 @@ from ruamel.yaml.scalarbool import ScalarBoolean
from ruamel.yaml.timestamp import TimeStamp
from ruamel.yaml.util import timestamp_regexp, create_timestamp
-if False: # MYPY
- from typing import Any, Dict, List, Set, Generator, Union, Optional # NOQA
+from typing import Any, Dict, List, Set, Iterator, Union, Optional # NOQA
__all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor',
@@ -59,70 +58,62 @@ class BaseConstructor:
yaml_constructors = {} # type: Dict[Any, Any]
yaml_multi_constructors = {} # type: Dict[Any, Any]
- def __init__(self, preserve_quotes=None, loader=None):
- # type: (Optional[bool], Any) -> None
+ def __init__(self, preserve_quotes: Optional[bool] = None, loader: Any = None) -> None:
self.loader = loader
if self.loader is not None and getattr(self.loader, '_constructor', None) is None:
self.loader._constructor = self
self.loader = loader
self.yaml_base_dict_type = dict
self.yaml_base_list_type = list
- self.constructed_objects = {} # type: Dict[Any, Any]
- self.recursive_objects = {} # type: Dict[Any, Any]
- self.state_generators = [] # type: List[Any]
+ self.constructed_objects: Dict[Any, Any] = {}
+ self.recursive_objects: Dict[Any, Any] = {}
+ self.state_generators: List[Any] = []
self.deep_construct = False
self._preserve_quotes = preserve_quotes
self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16))
@property
- def composer(self):
- # type: () -> Any
+ def composer(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.composer
try:
return self.loader._composer
except AttributeError:
- sys.stdout.write('slt {}\n'.format(type(self)))
- sys.stdout.write('slc {}\n'.format(self.loader._composer))
- sys.stdout.write('{}\n'.format(dir(self)))
+ sys.stdout.write(f'slt {type(self)}\n')
+ sys.stdout.write(f'slc {self.loader._composer}\n')
+ sys.stdout.write(f'{dir(self)}\n')
raise
@property
- def resolver(self):
- # type: () -> Any
+ def resolver(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.resolver
return self.loader._resolver
@property
- def scanner(self):
- # type: () -> Any
+ def scanner(self) -> Any:
# needed to get to the expanded comments
if hasattr(self.loader, 'typ'):
return self.loader.scanner
return self.loader._scanner
- def check_data(self):
- # type: () -> Any
+ def check_data(self) -> Any:
# If there are more documents available?
return self.composer.check_node()
- def get_data(self):
- # type: () -> Any
+ def get_data(self) -> Any:
# Construct and return the next document.
if self.composer.check_node():
return self.construct_document(self.composer.get_node())
- def get_single_data(self):
- # type: () -> Any
+ def get_single_data(self) -> Any:
# Ensure that the stream contains a single document and construct it.
node = self.composer.get_single_node()
if node is not None:
return self.construct_document(node)
return None
- def construct_document(self, node):
- # type: (Any) -> Any
+ def construct_document(self, node: Any) -> Any:
data = self.construct_object(node)
while bool(self.state_generators):
state_generators = self.state_generators
@@ -135,8 +126,7 @@ class BaseConstructor:
self.deep_construct = False
return data
- def construct_object(self, node, deep=False):
- # type: (Any, bool) -> Any
+ def construct_object(self, node: Any, deep: bool = False) -> Any:
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
@@ -159,9 +149,8 @@ class BaseConstructor:
self.deep_construct = old_deep
return data
- def construct_non_recursive_object(self, node, tag=None):
- # type: (Any, Optional[str]) -> Any
- constructor = None # type: Any
+ def construct_non_recursive_object(self, node: Any, tag: Optional[str] = None) -> Any:
+ constructor: Any = None
tag_suffix = None
if tag is None:
tag = node.tag
@@ -199,19 +188,14 @@ class BaseConstructor:
self.state_generators.append(generator)
return data
- def construct_scalar(self, node):
- # type: (Any) -> Any
+ def construct_scalar(self, node: Any) -> Any:
if not isinstance(node, ScalarNode):
raise ConstructorError(
- None,
- None,
- _F('expected a scalar node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark,
)
return node.value
- def construct_sequence(self, node, deep=False):
- # type: (Any, bool) -> Any
+ def construct_sequence(self, node: Any, deep: bool = False) -> Any:
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
@@ -219,22 +203,18 @@ class BaseConstructor:
raise ConstructorError(
None,
None,
- _F('expected a sequence node, but found {node_id!s}', node_id=node.id),
+ f'expected a sequence node, but found {node.id!s}',
node.start_mark,
)
return [self.construct_object(child, deep=deep) for child in node.value]
- def construct_mapping(self, node, deep=False):
- # type: (Any, bool) -> Any
+ def construct_mapping(self, node: Any, deep: bool = False) -> Any:
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
if not isinstance(node, MappingNode):
raise ConstructorError(
- None,
- None,
- _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
)
total_mapping = self.yaml_base_dict_type()
if getattr(node, 'merge', None) is not None:
@@ -242,7 +222,7 @@ class BaseConstructor:
else:
todo = [(node.value, True)]
for values, check in todo:
- mapping = self.yaml_base_dict_type() # type: Dict[Any, Any]
+ mapping: Dict[Any, Any] = self.yaml_base_dict_type()
for key_node, value_node in values:
# keys can be list -> deep
key = self.construct_object(key_node, deep=True)
@@ -267,8 +247,9 @@ class BaseConstructor:
total_mapping.update(mapping)
return total_mapping
- def check_mapping_key(self, node, key_node, mapping, key, value):
- # type: (Any, Any, Any, Any, Any) -> bool
+ def check_mapping_key(
+ self, node: Any, key_node: Any, mapping: Any, key: Any, value: Any
+ ) -> bool:
"""return True if key is unique"""
if key in mapping:
if not self.allow_duplicate_keys:
@@ -276,8 +257,8 @@ class BaseConstructor:
args = [
'while constructing a mapping',
node.start_mark,
- 'found duplicate key "{}" with value "{}" '
- '(original value: "{}")'.format(key, value, mk),
+ f'found duplicate key "{key}" with value "{value}" '
+ f'(original value: "{mk}")',
key_node.start_mark,
"""
To suppress this check see:
@@ -289,20 +270,19 @@ class BaseConstructor:
""",
]
if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
+ warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
else:
raise DuplicateKeyError(*args)
return False
return True
- def check_set_key(self, node, key_node, setting, key):
- # type: (Any, Any, Any, Any, Any) -> None
+ def check_set_key(self: Any, node: Any, key_node: Any, setting: Any, key: Any) -> None:
if key in setting:
if not self.allow_duplicate_keys:
args = [
'while constructing a set',
node.start_mark,
- 'found duplicate key "{}"'.format(key),
+ f'found duplicate key "{key}"',
key_node.start_mark,
"""
To suppress this check see:
@@ -314,18 +294,14 @@ class BaseConstructor:
""",
]
if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
+ warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
else:
raise DuplicateKeyError(*args)
- def construct_pairs(self, node, deep=False):
- # type: (Any, bool) -> Any
+ def construct_pairs(self, node: Any, deep: bool = False) -> Any:
if not isinstance(node, MappingNode):
raise ConstructorError(
- None,
- None,
- _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
)
pairs = []
for key_node, value_node in node.value:
@@ -335,37 +311,33 @@ class BaseConstructor:
return pairs
@classmethod
- def add_constructor(cls, tag, constructor):
- # type: (Any, Any) -> None
+ def add_constructor(cls, tag: Any, constructor: Any) -> None:
if 'yaml_constructors' not in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
@classmethod
- def add_multi_constructor(cls, tag_prefix, multi_constructor):
- # type: (Any, Any) -> None
+ def add_multi_constructor(cls, tag_prefix: Any, multi_constructor: Any) -> None:
if 'yaml_multi_constructors' not in cls.__dict__:
cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy()
cls.yaml_multi_constructors[tag_prefix] = multi_constructor
class SafeConstructor(BaseConstructor):
- def construct_scalar(self, node):
- # type: (Any) -> Any
+ def construct_scalar(self, node: Any) -> Any:
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == 'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
- def flatten_mapping(self, node):
- # type: (Any) -> Any
+ def flatten_mapping(self, node: Any) -> Any:
"""
This implements the merge key feature http://yaml.org/type/merge.html
by inserting keys from the merge dict/list of dicts if not yet
available in this node
"""
- merge = [] # type: List[Any]
+ merge: List[Any] = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
@@ -378,7 +350,7 @@ class SafeConstructor(BaseConstructor):
args = [
'while constructing a mapping',
node.start_mark,
- 'found duplicate key "{}"'.format(key_node.value),
+ f'found duplicate key "{key_node.value}"',
key_node.start_mark,
"""
To suppress this check see:
@@ -390,7 +362,7 @@ class SafeConstructor(BaseConstructor):
""",
]
if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
+ warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
else:
raise DuplicateKeyError(*args)
del node.value[index]
@@ -404,10 +376,7 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
- _F(
- 'expected a mapping for merging, but found {subnode_id!s}',
- subnode_id=subnode.id,
- ),
+ f'expected a mapping for merging, but found {subnode.id!s}',
subnode.start_mark,
)
self.flatten_mapping(subnode)
@@ -419,11 +388,8 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
- _F(
- 'expected a mapping or list of mappings for merging, '
- 'but found {value_node_id!s}',
- value_node_id=value_node.id,
- ),
+ 'expected a mapping or list of mappings for merging, '
+ f'but found {value_node.id!s}',
value_node.start_mark,
)
elif key_node.tag == 'tag:yaml.org,2002:value':
@@ -435,8 +401,7 @@ class SafeConstructor(BaseConstructor):
node.merge = merge # separate merge keys to be able to update without duplicate
node.value = merge + node.value
- def construct_mapping(self, node, deep=False):
- # type: (Any, bool) -> Any
+ def construct_mapping(self, node: Any, deep: bool = False) -> Any:
"""deep is True when creating an object/mapping recursively,
in that case want the underlying elements available during construction
"""
@@ -444,8 +409,7 @@ class SafeConstructor(BaseConstructor):
self.flatten_mapping(node)
return BaseConstructor.construct_mapping(self, node, deep=deep)
- def construct_yaml_null(self, node):
- # type: (Any) -> Any
+ def construct_yaml_null(self, node: Any) -> Any:
self.construct_scalar(node)
return None
@@ -461,13 +425,11 @@ class SafeConstructor(BaseConstructor):
'off': False,
}
- def construct_yaml_bool(self, node):
- # type: (Any) -> bool
+ def construct_yaml_bool(self, node: Any) -> bool:
value = self.construct_scalar(node)
return self.bool_values[value.lower()]
- def construct_yaml_int(self, node):
- # type: (Any) -> int
+ def construct_yaml_int(self, node: Any) -> int:
value_s = self.construct_scalar(node)
value_s = value_s.replace('_', "")
sign = +1
@@ -502,8 +464,7 @@ class SafeConstructor(BaseConstructor):
inf_value *= inf_value
nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99).
- def construct_yaml_float(self, node):
- # type: (Any) -> float
+ def construct_yaml_float(self, node: Any) -> float:
value_so = self.construct_scalar(node)
value_s = value_so.replace('_', "").lower()
sign = +1
@@ -529,34 +490,29 @@ class SafeConstructor(BaseConstructor):
# value_s is lower case independent of input
mantissa, exponent = value_s.split('e')
if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
+ warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1)
return sign * float(value_s)
- def construct_yaml_binary(self, node):
- # type: (Any) -> Any
+ def construct_yaml_binary(self, node: Any) -> Any:
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(
None,
None,
- _F('failed to convert base64 data into ascii: {exc!s}', exc=exc),
+ f'failed to convert base64 data into ascii: {exc!s}',
node.start_mark,
)
try:
return base64.decodebytes(value)
except binascii.Error as exc:
raise ConstructorError(
- None,
- None,
- _F('failed to decode base64 data: {exc!s}', exc=exc),
- node.start_mark,
+ None, None, f'failed to decode base64 data: {exc!s}', node.start_mark,
)
timestamp_regexp = timestamp_regexp # moved to util 0.17.17
- def construct_yaml_timestamp(self, node, values=None):
- # type: (Any, Any) -> Any
+ def construct_yaml_timestamp(self, node: Any, values: Any = None) -> Any:
if values is None:
try:
match = self.timestamp_regexp.match(node.value)
@@ -566,14 +522,13 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
None,
None,
- 'failed to construct timestamp from "{}"'.format(node.value),
+ f'failed to construct timestamp from "{node.value}"',
node.start_mark,
)
values = match.groupdict()
return create_timestamp(**values)
- def construct_yaml_omap(self, node):
- # type: (Any) -> Any
+ def construct_yaml_omap(self, node: Any) -> Any:
# Note: we do now check for duplicate keys
omap = ordereddict()
yield omap
@@ -581,7 +536,7 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ f'expected a sequence, but found {node.id!s}',
node.start_mark,
)
for subnode in node.value:
@@ -589,20 +544,14 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F(
- 'expected a mapping of length 1, but found {subnode_id!s}',
- subnode_id=subnode.id,
- ),
+ f'expected a mapping of length 1, but found {subnode.id!s}',
subnode.start_mark,
)
if len(subnode.value) != 1:
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F(
- 'expected a single mapping item, but found {len_subnode_val:d} items',
- len_subnode_val=len(subnode.value),
- ),
+ f'expected a single mapping item, but found {len(subnode.value):d} items',
subnode.start_mark,
)
key_node, value_node = subnode.value[0]
@@ -611,16 +560,15 @@ class SafeConstructor(BaseConstructor):
value = self.construct_object(value_node)
omap[key] = value
- def construct_yaml_pairs(self, node):
- # type: (Any) -> Any
+ def construct_yaml_pairs(self, node: Any) -> Any:
# Note: the same code as `construct_yaml_omap`.
- pairs = [] # type: List[Any]
+ pairs: List[Any] = []
yield pairs
if not isinstance(node, SequenceNode):
raise ConstructorError(
'while constructing pairs',
node.start_mark,
- _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ f'expected a sequence, but found {node.id!s}',
node.start_mark,
)
for subnode in node.value:
@@ -628,20 +576,14 @@ class SafeConstructor(BaseConstructor):
raise ConstructorError(
'while constructing pairs',
node.start_mark,
- _F(
- 'expected a mapping of length 1, but found {subnode_id!s}',
- subnode_id=subnode.id,
- ),
+ f'expected a mapping of length 1, but found {subnode.id!s}',
subnode.start_mark,
)
if len(subnode.value) != 1:
raise ConstructorError(
'while constructing pairs',
node.start_mark,
- _F(
- 'expected a single mapping item, but found {len_subnode_val:d} items',
- len_subnode_val=len(subnode.value),
- ),
+ f'expected a single mapping item, but found {len(subnode.value):d} items',
subnode.start_mark,
)
key_node, value_node = subnode.value[0]
@@ -649,33 +591,28 @@ class SafeConstructor(BaseConstructor):
value = self.construct_object(value_node)
pairs.append((key, value))
- def construct_yaml_set(self, node):
- # type: (Any) -> Any
- data = set() # type: Set[Any]
+ def construct_yaml_set(self, node: Any) -> Any:
+ data: Set[Any] = set()
yield data
value = self.construct_mapping(node)
data.update(value)
- def construct_yaml_str(self, node):
- # type: (Any) -> Any
+ def construct_yaml_str(self, node: Any) -> Any:
value = self.construct_scalar(node)
return value
- def construct_yaml_seq(self, node):
- # type: (Any) -> Any
- data = self.yaml_base_list_type() # type: List[Any]
+ def construct_yaml_seq(self, node: Any) -> Any:
+ data: List[Any] = self.yaml_base_list_type()
yield data
data.extend(self.construct_sequence(node))
- def construct_yaml_map(self, node):
- # type: (Any) -> Any
- data = self.yaml_base_dict_type() # type: Dict[Any, Any]
+ def construct_yaml_map(self, node: Any) -> Any:
+ data: Dict[Any, Any] = self.yaml_base_dict_type()
yield data
value = self.construct_mapping(node)
data.update(value)
- def construct_yaml_object(self, node, cls):
- # type: (Any, Any) -> Any
+ def construct_yaml_object(self, node: Any, cls: Any) -> Any:
data = cls.__new__(cls)
yield data
if hasattr(data, '__setstate__'):
@@ -685,14 +622,11 @@ class SafeConstructor(BaseConstructor):
state = self.construct_mapping(node)
data.__dict__.update(state)
- def construct_undefined(self, node):
- # type: (Any) -> None
+ def construct_undefined(self, node: Any) -> None:
raise ConstructorError(
None,
None,
- _F(
- 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag
- ),
+ f'could not determine a constructor for the tag {node.tag!r}',
node.start_mark,
)
@@ -733,50 +667,40 @@ SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined)
class Constructor(SafeConstructor):
- def construct_python_str(self, node):
- # type: (Any) -> Any
+ def construct_python_str(self, node: Any) -> Any:
return self.construct_scalar(node)
- def construct_python_unicode(self, node):
- # type: (Any) -> Any
+ def construct_python_unicode(self, node: Any) -> Any:
return self.construct_scalar(node)
- def construct_python_bytes(self, node):
- # type: (Any) -> Any
+ def construct_python_bytes(self, node: Any) -> Any:
try:
value = self.construct_scalar(node).encode('ascii')
except UnicodeEncodeError as exc:
raise ConstructorError(
None,
None,
- _F('failed to convert base64 data into ascii: {exc!s}', exc=exc),
+ f'failed to convert base64 data into ascii: {exc!s}',
node.start_mark,
)
try:
return base64.decodebytes(value)
except binascii.Error as exc:
raise ConstructorError(
- None,
- None,
- _F('failed to decode base64 data: {exc!s}', exc=exc),
- node.start_mark,
+ None, None, f'failed to decode base64 data: {exc!s}', node.start_mark,
)
- def construct_python_long(self, node):
- # type: (Any) -> int
+ def construct_python_long(self, node: Any) -> int:
val = self.construct_yaml_int(node)
return val
- def construct_python_complex(self, node):
- # type: (Any) -> Any
+ def construct_python_complex(self, node: Any) -> Any:
return complex(self.construct_scalar(node))
- def construct_python_tuple(self, node):
- # type: (Any) -> Any
+ def construct_python_tuple(self, node: Any) -> Any:
return tuple(self.construct_sequence(node))
- def find_python_module(self, name, mark):
- # type: (Any, Any) -> Any
+ def find_python_module(self, name: Any, mark: Any) -> Any:
if not name:
raise ConstructorError(
'while constructing a Python module',
@@ -790,13 +714,12 @@ class Constructor(SafeConstructor):
raise ConstructorError(
'while constructing a Python module',
mark,
- _F('cannot find module {name!r} ({exc!s})', name=name, exc=exc),
+ f'cannot find module {name!r} ({exc!s})',
mark,
)
return sys.modules[name]
- def find_python_name(self, name, mark):
- # type: (Any, Any) -> Any
+ def find_python_name(self, name: Any, mark: Any) -> Any:
if not name:
raise ConstructorError(
'while constructing a Python object',
@@ -807,7 +730,7 @@ class Constructor(SafeConstructor):
if '.' in name:
lname = name.split('.')
lmodule_name = lname
- lobject_name = [] # type: List[Any]
+ lobject_name: List[Any] = []
while len(lmodule_name) > 1:
lobject_name.insert(0, lmodule_name.pop())
module_name = '.'.join(lmodule_name)
@@ -826,11 +749,7 @@ class Constructor(SafeConstructor):
raise ConstructorError(
'while constructing a Python object',
mark,
- _F(
- 'cannot find module {module_name!r} ({exc!s})',
- module_name=module_name,
- exc=exc,
- ),
+ f'cannot find module {module_name!r} ({exc!s})',
mark,
)
module = sys.modules[module_name]
@@ -842,42 +761,37 @@ class Constructor(SafeConstructor):
raise ConstructorError(
'while constructing a Python object',
mark,
- _F(
- 'cannot find {object_name!r} in the module {module_name!r}',
- object_name=object_name,
- module_name=module.__name__,
- ),
+ f'cannot find {object_name!r} in the module {module.__name__!r}',
mark,
)
obj = getattr(obj, lobject_name.pop(0))
return obj
- def construct_python_name(self, suffix, node):
- # type: (Any, Any) -> Any
+ def construct_python_name(self, suffix: Any, node: Any) -> Any:
value = self.construct_scalar(node)
if value:
raise ConstructorError(
'while constructing a Python name',
node.start_mark,
- _F('expected the empty value, but found {value!r}', value=value),
+ f'expected the empty value, but found {value!r}',
node.start_mark,
)
return self.find_python_name(suffix, node.start_mark)
- def construct_python_module(self, suffix, node):
- # type: (Any, Any) -> Any
+ def construct_python_module(self, suffix: Any, node: Any) -> Any:
value = self.construct_scalar(node)
if value:
raise ConstructorError(
'while constructing a Python module',
node.start_mark,
- _F('expected the empty value, but found {value!r}', value=value),
+ f'expected the empty value, but found {value!r}',
node.start_mark,
)
return self.find_python_module(suffix, node.start_mark)
- def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False):
- # type: (Any, Any, Any, Any, bool) -> Any
+ def make_python_instance(
+ self, suffix: Any, node: Any, args: Any = None, kwds: Any = None, newobj: bool = False
+ ) -> Any:
if not args:
args = []
if not kwds:
@@ -888,12 +802,11 @@ class Constructor(SafeConstructor):
else:
return cls(*args, **kwds)
- def set_python_instance_state(self, instance, state):
- # type: (Any, Any) -> None
+ def set_python_instance_state(self, instance: Any, state: Any) -> None:
if hasattr(instance, '__setstate__'):
instance.__setstate__(state)
else:
- slotstate = {} # type: Dict[Any, Any]
+ slotstate: Dict[Any, Any] = {}
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if hasattr(instance, '__dict__'):
@@ -903,8 +816,7 @@ class Constructor(SafeConstructor):
for key, value in slotstate.items():
setattr(instance, key, value)
- def construct_python_object(self, suffix, node):
- # type: (Any, Any) -> Any
+ def construct_python_object(self, suffix: Any, node: Any) -> Any:
# Format:
# !!python/object:module.name { ... state ... }
instance = self.make_python_instance(suffix, node, newobj=True)
@@ -914,8 +826,9 @@ class Constructor(SafeConstructor):
state = self.construct_mapping(node, deep=deep)
self.set_python_instance_state(instance, state)
- def construct_python_object_apply(self, suffix, node, newobj=False):
- # type: (Any, Any, bool) -> Any
+ def construct_python_object_apply(
+ self, suffix: Any, node: Any, newobj: bool = False
+ ) -> Any:
# Format:
# !!python/object/apply # (or !!python/object/new)
# args: [ ... arguments ... ]
@@ -929,10 +842,10 @@ class Constructor(SafeConstructor):
# is how an object is created, check make_python_instance for details.
if isinstance(node, SequenceNode):
args = self.construct_sequence(node, deep=True)
- kwds = {} # type: Dict[Any, Any]
- state = {} # type: Dict[Any, Any]
- listitems = [] # type: List[Any]
- dictitems = {} # type: Dict[Any, Any]
+ kwds: Dict[Any, Any] = {}
+ state: Dict[Any, Any] = {}
+ listitems: List[Any] = []
+ dictitems: Dict[Any, Any] = {}
else:
value = self.construct_mapping(node, deep=True)
args = value.get('args', [])
@@ -950,8 +863,7 @@ class Constructor(SafeConstructor):
instance[key] = dictitems[key]
return instance
- def construct_python_object_new(self, suffix, node):
- # type: (Any, Any) -> Any
+ def construct_python_object_new(self, suffix: Any, node: Any) -> Any:
return self.construct_python_object_apply(suffix, node, newobj=True)
@@ -1013,15 +925,13 @@ class RoundTripConstructor(SafeConstructor):
as well as on the items
"""
- def comment(self, idx):
- # type: (Any) -> Any
+ def comment(self, idx: Any) -> Any:
assert self.loader.comment_handling is not None
x = self.scanner.comments[idx]
x.set_assigned()
return x
- def comments(self, list_of_comments, idx=None):
- # type: (Any, Optional[Any]) -> Any
+ def comments(self, list_of_comments: Any, idx: Optional[Any] = None) -> Any:
# hand in the comment and optional pre, eol, post segment
if list_of_comments is None:
return []
@@ -1032,14 +942,10 @@ class RoundTripConstructor(SafeConstructor):
for x in list_of_comments:
yield self.comment(x)
- def construct_scalar(self, node):
- # type: (Any) -> Any
+ def construct_scalar(self, node: Any) -> Any:
if not isinstance(node, ScalarNode):
raise ConstructorError(
- None,
- None,
- _F('expected a scalar node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark,
)
if node.style == '|' and isinstance(node.value, str):
@@ -1055,7 +961,7 @@ class RoundTripConstructor(SafeConstructor):
lss.comment = self.comment(node.comment[1][0]) # type: ignore
return lss
if node.style == '>' and isinstance(node.value, str):
- fold_positions = [] # type: List[int]
+ fold_positions: List[int] = []
idx = -1
while True:
idx = node.value.find('\a', idx + 1)
@@ -1084,13 +990,12 @@ class RoundTripConstructor(SafeConstructor):
return PlainScalarString(node.value, anchor=node.anchor)
return node.value
- def construct_yaml_int(self, node):
- # type: (Any) -> Any
- width = None # type: Any
+ def construct_yaml_int(self, node: Any) -> Any:
+ width: Any = None
value_su = self.construct_scalar(node)
try:
sx = value_su.rstrip('_')
- underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any
+ underscore: Any = [len(sx) - sx.rindex('_') - 1, False, False]
except ValueError:
underscore = None
except IndexError:
@@ -1119,7 +1024,7 @@ class RoundTripConstructor(SafeConstructor):
# default to lower-case if no a-fA-F in string
if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
width = len(value_s[2:])
- hex_fun = HexInt # type: Any
+ hex_fun: Any = HexInt
for ch in value_s[2:]:
if ch in 'ABCDEF': # first non-digit is capital
hex_fun = HexCapsInt
@@ -1180,10 +1085,8 @@ class RoundTripConstructor(SafeConstructor):
else:
return sign * int(value_s)
- def construct_yaml_float(self, node):
- # type: (Any) -> Any
- def leading_zeros(v):
- # type: (Any) -> int
+ def construct_yaml_float(self, node: Any) -> Any:
+ def leading_zeros(v: Any) -> int:
lead0 = 0
idx = 0
while idx < len(v) and v[idx] in '0.':
@@ -1193,7 +1096,7 @@ class RoundTripConstructor(SafeConstructor):
return lead0
# underscore = None
- m_sign = False # type: Any
+ m_sign: Any = False
value_so = self.construct_scalar(node)
value_s = value_so.replace('_', "").lower()
sign = +1
@@ -1225,7 +1128,7 @@ class RoundTripConstructor(SafeConstructor):
if self.resolver.processing_version != (1, 2):
# value_s is lower case independent of input
if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
+ warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1)
lead0 = leading_zeros(mantissa)
width = len(mantissa)
prec = mantissa.find('.')
@@ -1246,7 +1149,8 @@ class RoundTripConstructor(SafeConstructor):
anchor=node.anchor,
)
width = len(value_so)
- prec = value_so.index('.') # you can use index, this would not be float without dot
+ # you can't use index, !!float 42 would be a float without a dot
+ prec = value_so.find('.')
lead0 = leading_zeros(value_so)
return ScalarFloat(
sign * float(value_s),
@@ -1257,20 +1161,18 @@ class RoundTripConstructor(SafeConstructor):
anchor=node.anchor,
)
- def construct_yaml_str(self, node):
- # type: (Any) -> Any
+ def construct_yaml_str(self, node: Any) -> Any:
value = self.construct_scalar(node)
if isinstance(value, ScalarString):
return value
return value
- def construct_rt_sequence(self, node, seqtyp, deep=False):
- # type: (Any, Any, bool) -> Any
+ def construct_rt_sequence(self, node: Any, seqtyp: Any, deep: bool = False) -> Any:
if not isinstance(node, SequenceNode):
raise ConstructorError(
None,
None,
- _F('expected a sequence node, but found {node_id!s}', node_id=node.id),
+ f'expected a sequence node, but found {node.id!s}',
node.start_mark,
)
ret_val = []
@@ -1301,16 +1203,14 @@ class RoundTripConstructor(SafeConstructor):
)
return ret_val
- def flatten_mapping(self, node):
- # type: (Any) -> Any
+ def flatten_mapping(self, node: Any) -> Any:
"""
This implements the merge key feature http://yaml.org/type/merge.html
by inserting keys from the merge dict/list of dicts if not yet
available in this node
"""
- def constructed(value_node):
- # type: (Any) -> Any
+ def constructed(value_node: Any) -> Any:
# If the contents of a merge are defined within the
# merge marker, then they won't have been constructed
# yet. But if they were already constructed, we need to use
@@ -1322,7 +1222,7 @@ class RoundTripConstructor(SafeConstructor):
return value
# merge = []
- merge_map_list = [] # type: List[Any]
+ merge_map_list: List[Any] = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
@@ -1335,7 +1235,7 @@ class RoundTripConstructor(SafeConstructor):
args = [
'while constructing a mapping',
node.start_mark,
- 'found duplicate key "{}"'.format(key_node.value),
+ f'found duplicate key "{key_node.value}"',
key_node.start_mark,
"""
To suppress this check see:
@@ -1347,7 +1247,7 @@ class RoundTripConstructor(SafeConstructor):
""",
]
if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
+ warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
else:
raise DuplicateKeyError(*args)
del node.value[index]
@@ -1362,10 +1262,7 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
- _F(
- 'expected a mapping for merging, but found {subnode_id!s}',
- subnode_id=subnode.id,
- ),
+ f'expected a mapping for merging, but found {subnode.id!s}',
subnode.start_mark,
)
merge_map_list.append((index, constructed(subnode)))
@@ -1378,11 +1275,8 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
'while constructing a mapping',
node.start_mark,
- _F(
- 'expected a mapping or list of mappings for merging, '
- 'but found {value_node_id!s}',
- value_node_id=value_node.id,
- ),
+ 'expected a mapping or list of mappings for merging, '
+ f'but found {value_node.id!s}',
value_node.start_mark,
)
elif key_node.tag == 'tag:yaml.org,2002:value':
@@ -1394,18 +1288,13 @@ class RoundTripConstructor(SafeConstructor):
# if merge:
# node.value = merge + node.value
- def _sentinel(self):
- # type: () -> None
+ def _sentinel(self) -> None:
pass
- def construct_mapping(self, node, maptyp, deep=False): # type: ignore
- # type: (Any, Any, bool) -> Any
+ def construct_mapping(self, node: Any, maptyp: Any, deep: bool = False) -> Any: # type: ignore # NOQA
if not isinstance(node, MappingNode):
raise ConstructorError(
- None,
- None,
- _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
)
merge_map = self.flatten_mapping(node)
# mapping = {}
@@ -1439,6 +1328,7 @@ class RoundTripConstructor(SafeConstructor):
key_s.fa.set_flow_style()
elif key_node.flow_style is False:
key_s.fa.set_block_style()
+ key_s._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore
key = key_s
elif isinstance(key, MutableMapping):
key_m = CommentedKeyMap(key)
@@ -1446,6 +1336,7 @@ class RoundTripConstructor(SafeConstructor):
key_m.fa.set_flow_style()
elif key_node.flow_style is False:
key_m.fa.set_block_style()
+ key_m._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore
key = key_m
if not isinstance(key, Hashable):
raise ConstructorError(
@@ -1503,14 +1394,10 @@ class RoundTripConstructor(SafeConstructor):
if merge_map:
maptyp.add_yaml_merge(merge_map)
- def construct_setting(self, node, typ, deep=False):
- # type: (Any, Any, bool) -> Any
+ def construct_setting(self, node: Any, typ: Any, deep: bool = False) -> Any:
if not isinstance(node, MappingNode):
raise ConstructorError(
- None,
- None,
- _F('expected a mapping node, but found {node_id!s}', node_id=node.id),
- node.start_mark,
+ None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
)
if self.loader and self.loader.comment_handling is None:
if node.comment:
@@ -1556,8 +1443,7 @@ class RoundTripConstructor(SafeConstructor):
nprintf('nc7b', value_node.comment)
typ.add(key)
- def construct_yaml_seq(self, node):
- # type: (Any) -> Any
+ def construct_yaml_seq(self, node: Any) -> Iterator[CommentedSeq]:
data = CommentedSeq()
data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
# if node.comment:
@@ -1566,16 +1452,14 @@ class RoundTripConstructor(SafeConstructor):
data.extend(self.construct_rt_sequence(node, data))
self.set_collection_style(data, node)
- def construct_yaml_map(self, node):
- # type: (Any) -> Any
+ def construct_yaml_map(self, node: Any) -> Iterator[CommentedMap]:
data = CommentedMap()
data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
yield data
self.construct_mapping(node, data, deep=True)
self.set_collection_style(data, node)
- def set_collection_style(self, data, node):
- # type: (Any, Any) -> None
+ def set_collection_style(self, data: Any, node: Any) -> None:
if len(data) == 0:
return
if node.flow_style is True:
@@ -1583,8 +1467,7 @@ class RoundTripConstructor(SafeConstructor):
elif node.flow_style is False:
data.fa.set_block_style()
- def construct_yaml_object(self, node, cls):
- # type: (Any, Any) -> Any
+ def construct_yaml_object(self, node: Any, cls: Any) -> Any:
data = cls.__new__(cls)
yield data
if hasattr(data, '__setstate__'):
@@ -1608,8 +1491,7 @@ class RoundTripConstructor(SafeConstructor):
a = getattr(data, Anchor.attrib)
a.value = node.anchor
- def construct_yaml_omap(self, node):
- # type: (Any) -> Any
+ def construct_yaml_omap(self, node: Any) -> Iterator[CommentedOrderedMap]:
# Note: we do now check for duplicate keys
omap = CommentedOrderedMap()
omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
@@ -1631,7 +1513,7 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F('expected a sequence, but found {node_id!s}', node_id=node.id),
+ f'expected a sequence, but found {node.id!s}',
node.start_mark,
)
for subnode in node.value:
@@ -1639,20 +1521,14 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F(
- 'expected a mapping of length 1, but found {subnode_id!s}',
- subnode_id=subnode.id,
- ),
+ f'expected a mapping of length 1, but found {subnode.id!s}',
subnode.start_mark,
)
if len(subnode.value) != 1:
raise ConstructorError(
'while constructing an ordered map',
node.start_mark,
- _F(
- 'expected a single mapping item, but found {len_subnode_val:d} items',
- len_subnode_val=len(subnode.value),
- ),
+ f'expected a single mapping item, but found {len(subnode.value):d} items',
subnode.start_mark,
)
key_node, value_node = subnode.value[0]
@@ -1676,15 +1552,15 @@ class RoundTripConstructor(SafeConstructor):
nprintf('nc9c', value_node.comment)
omap[key] = value
- def construct_yaml_set(self, node):
- # type: (Any) -> Any
+ def construct_yaml_set(self, node: Any) -> Iterator[CommentedSet]:
data = CommentedSet()
data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
yield data
self.construct_setting(node, data)
- def construct_undefined(self, node):
- # type: (Any) -> Any
+ def construct_unknown(
+ self, node: Any
+ ) -> Iterator[Union[CommentedMap, TaggedScalar, CommentedSeq]]:
try:
if isinstance(node, MappingNode):
data = CommentedMap()
@@ -1735,14 +1611,13 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
None,
None,
- _F(
- 'could not determine a constructor for the tag {node_tag!r}', node_tag=node.tag
- ),
+ f'could not determine a constructor for the tag {node.tag!r}',
node.start_mark,
)
- def construct_yaml_timestamp(self, node, values=None):
- # type: (Any, Any) -> Any
+ def construct_yaml_timestamp(
+ self, node: Any, values: Any = None
+ ) -> Union[datetime.date, datetime.datetime, TimeStamp]:
try:
match = self.timestamp_regexp.match(node.value)
except TypeError:
@@ -1751,7 +1626,7 @@ class RoundTripConstructor(SafeConstructor):
raise ConstructorError(
None,
None,
- 'failed to construct timestamp from "{}"'.format(node.value),
+ f'failed to construct timestamp from "{node.value}"',
node.start_mark,
)
values = match.groupdict()
@@ -1774,9 +1649,15 @@ class RoundTripConstructor(SafeConstructor):
if values['tz_sign'] == '-':
delta = -delta
# should check for None and solve issue 366 should be tzinfo=delta)
- data = TimeStamp(
- dd.year, dd.month, dd.day, dd.hour, dd.minute, dd.second, dd.microsecond
- )
+ # isinstance(datetime.datetime.now, datetime.date) is true)
+ if isinstance(dd, datetime.datetime):
+ data = TimeStamp(
+ dd.year, dd.month, dd.day, dd.hour, dd.minute, dd.second, dd.microsecond
+ )
+ else:
+ # ToDo: make this into a DateStamp?
+ data = TimeStamp(dd.year, dd.month, dd.day, 0, 0, 0, 0)
+ return data
if delta:
data._yaml['delta'] = delta
tz = values['tz_sign'] + values['tz_hour']
@@ -1786,13 +1667,11 @@ class RoundTripConstructor(SafeConstructor):
else:
if values['tz']: # no delta
data._yaml['tz'] = values['tz']
-
if values['t']:
data._yaml['t'] = True
return data
- def construct_yaml_bool(self, node):
- # type: (Any) -> Any
+ def construct_yaml_sbool(self, node: Any) -> Union[bool, ScalarBoolean]:
b = SafeConstructor.construct_yaml_bool(self, node)
if node.anchor:
return ScalarBoolean(b, anchor=node.anchor)
@@ -1804,7 +1683,7 @@ RoundTripConstructor.add_constructor(
)
RoundTripConstructor.add_constructor(
- 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool
+ 'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_sbool
)
RoundTripConstructor.add_constructor(
@@ -1847,4 +1726,4 @@ RoundTripConstructor.add_constructor(
'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map
)
-RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined)
+RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_unknown)