diff options
Diffstat (limited to 'composer.py')
-rw-r--r-- | composer.py | 68 |
1 files changed, 24 insertions, 44 deletions
diff --git a/composer.py b/composer.py index bad132a..c943c1b 100644 --- a/composer.py +++ b/composer.py @@ -3,7 +3,7 @@ import warnings from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning -from ruamel.yaml.compat import _F, nprint, nprintf # NOQA +from ruamel.yaml.compat import nprint, nprintf # NOQA from ruamel.yaml.events import ( StreamStartEvent, @@ -17,8 +17,7 @@ from ruamel.yaml.events import ( ) from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode -if False: # MYPY - from typing import Any, Dict, Optional, List # NOQA +from typing import Any, Dict, Optional, List # NOQA __all__ = ['Composer', 'ComposerError'] @@ -28,30 +27,26 @@ class ComposerError(MarkedYAMLError): class Composer: - def __init__(self, loader=None): - # type: (Any) -> None + def __init__(self, loader: Any = None) -> None: self.loader = loader if self.loader is not None and getattr(self.loader, '_composer', None) is None: self.loader._composer = self - self.anchors = {} # type: Dict[Any, Any] + self.anchors: Dict[Any, Any] = {} @property - def parser(self): - # type: () -> Any + def parser(self) -> Any: if hasattr(self.loader, 'typ'): self.loader.parser return self.loader._parser @property - def resolver(self): - # type: () -> Any + def resolver(self) -> Any: # assert self.loader._resolver is not None if hasattr(self.loader, 'typ'): self.loader.resolver return self.loader._resolver - def check_node(self): - # type: () -> Any + def check_node(self) -> Any: # Drop the STREAM-START event. if self.parser.check_event(StreamStartEvent): self.parser.get_event() @@ -59,19 +54,17 @@ class Composer: # If there are more documents available? return not self.parser.check_event(StreamEndEvent) - def get_node(self): - # type: () -> Any + def get_node(self) -> Any: # Get the root node of the next document. if not self.parser.check_event(StreamEndEvent): return self.compose_document() - def get_single_node(self): - # type: () -> Any + def get_single_node(self) -> Any: # Drop the STREAM-START event. self.parser.get_event() # Compose a document if the stream is not empty. - document = None # type: Any + document: Any = None if not self.parser.check_event(StreamEndEvent): document = self.compose_document() @@ -90,8 +83,7 @@ class Composer: return document - def compose_document(self): - # type: (Any) -> Any + def compose_document(self: Any) -> Any: # Drop the DOCUMENT-START event. self.parser.get_event() @@ -104,36 +96,28 @@ class Composer: self.anchors = {} return node - def return_alias(self, a): - # type: (Any) -> Any + def return_alias(self, a: Any) -> Any: return a - def compose_node(self, parent, index): - # type: (Any, Any) -> Any + def compose_node(self, parent: Any, index: Any) -> Any: if self.parser.check_event(AliasEvent): event = self.parser.get_event() alias = event.anchor if alias not in self.anchors: raise ComposerError( - None, - None, - _F('found undefined alias {alias!r}', alias=alias), - event.start_mark, + None, None, f'found undefined alias {alias!r}', event.start_mark, ) return self.return_alias(self.anchors[alias]) event = self.parser.peek_event() anchor = event.anchor if anchor is not None: # have an anchor if anchor in self.anchors: - # raise ComposerError( - # "found duplicate anchor %r; first occurrence" - # % (anchor), self.anchors[anchor].start_mark, - # "second occurrence", event.start_mark) ws = ( - '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence ' - '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark) + f'\nfound duplicate anchor {anchor!r}\n' + f'first occurrence {self.anchors[anchor].start_mark}\n' + f'second occurrence {event.start_mark}' ) - warnings.warn(ws, ReusedAnchorWarning) + warnings.warn(ws, ReusedAnchorWarning, stacklevel=2) self.resolver.descend_resolver(parent, index) if self.parser.check_event(ScalarEvent): node = self.compose_scalar_node(anchor) @@ -144,8 +128,7 @@ class Composer: self.resolver.ascend_resolver() return node - def compose_scalar_node(self, anchor): - # type: (Any) -> Any + def compose_scalar_node(self, anchor: Any) -> Any: event = self.parser.get_event() tag = event.tag if tag is None or tag == '!': @@ -163,8 +146,7 @@ class Composer: self.anchors[anchor] = node return node - def compose_sequence_node(self, anchor): - # type: (Any) -> Any + def compose_sequence_node(self, anchor: Any) -> Any: start_event = self.parser.get_event() tag = start_event.tag if tag is None or tag == '!': @@ -187,17 +169,16 @@ class Composer: end_event = self.parser.get_event() if node.flow_style is True and end_event.comment is not None: if node.comment is not None: + x = node.flow_style nprint( - 'Warning: unexpected end_event commment in sequence ' - 'node {}'.format(node.flow_style) + f'Warning: unexpected end_event commment in sequence node {x}' ) node.comment = end_event.comment node.end_mark = end_event.end_mark self.check_end_doc_comment(end_event, node) return node - def compose_mapping_node(self, anchor): - # type: (Any) -> Any + def compose_mapping_node(self, anchor: Any) -> Any: start_event = self.parser.get_event() tag = start_event.tag if tag is None or tag == '!': @@ -230,8 +211,7 @@ class Composer: self.check_end_doc_comment(end_event, node) return node - def check_end_doc_comment(self, end_event, node): - # type: (Any, Any) -> None + def check_end_doc_comment(self, end_event: Any, node: Any) -> None: if end_event.comment and end_event.comment[1]: # pre comments on an end_event, no following to move to if node.comment is None: |