summaryrefslogtreecommitdiff
path: root/composer.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
committerAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
commit8b731994b1543d7886af85f926d9eea5a22d0732 (patch)
tree3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /composer.py
parent45111ba0b67e8619265d89f3202635e62c13cde6 (diff)
downloadruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz
retrofitted 0.18 changes
Diffstat (limited to 'composer.py')
-rw-r--r--composer.py68
1 files changed, 24 insertions, 44 deletions
diff --git a/composer.py b/composer.py
index bad132a..c943c1b 100644
--- a/composer.py
+++ b/composer.py
@@ -3,7 +3,7 @@
import warnings
from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
-from ruamel.yaml.compat import _F, nprint, nprintf # NOQA
+from ruamel.yaml.compat import nprint, nprintf # NOQA
from ruamel.yaml.events import (
StreamStartEvent,
@@ -17,8 +17,7 @@ from ruamel.yaml.events import (
)
from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
-if False: # MYPY
- from typing import Any, Dict, Optional, List # NOQA
+from typing import Any, Dict, Optional, List # NOQA
__all__ = ['Composer', 'ComposerError']
@@ -28,30 +27,26 @@ class ComposerError(MarkedYAMLError):
class Composer:
- def __init__(self, loader=None):
- # type: (Any) -> None
+ def __init__(self, loader: Any = None) -> None:
self.loader = loader
if self.loader is not None and getattr(self.loader, '_composer', None) is None:
self.loader._composer = self
- self.anchors = {} # type: Dict[Any, Any]
+ self.anchors: Dict[Any, Any] = {}
@property
- def parser(self):
- # type: () -> Any
+ def parser(self) -> Any:
if hasattr(self.loader, 'typ'):
self.loader.parser
return self.loader._parser
@property
- def resolver(self):
- # type: () -> Any
+ def resolver(self) -> Any:
# assert self.loader._resolver is not None
if hasattr(self.loader, 'typ'):
self.loader.resolver
return self.loader._resolver
- def check_node(self):
- # type: () -> Any
+ def check_node(self) -> Any:
# Drop the STREAM-START event.
if self.parser.check_event(StreamStartEvent):
self.parser.get_event()
@@ -59,19 +54,17 @@ class Composer:
# If there are more documents available?
return not self.parser.check_event(StreamEndEvent)
- def get_node(self):
- # type: () -> Any
+ def get_node(self) -> Any:
# Get the root node of the next document.
if not self.parser.check_event(StreamEndEvent):
return self.compose_document()
- def get_single_node(self):
- # type: () -> Any
+ def get_single_node(self) -> Any:
# Drop the STREAM-START event.
self.parser.get_event()
# Compose a document if the stream is not empty.
- document = None # type: Any
+ document: Any = None
if not self.parser.check_event(StreamEndEvent):
document = self.compose_document()
@@ -90,8 +83,7 @@ class Composer:
return document
- def compose_document(self):
- # type: (Any) -> Any
+ def compose_document(self: Any) -> Any:
# Drop the DOCUMENT-START event.
self.parser.get_event()
@@ -104,36 +96,28 @@ class Composer:
self.anchors = {}
return node
- def return_alias(self, a):
- # type: (Any) -> Any
+ def return_alias(self, a: Any) -> Any:
return a
- def compose_node(self, parent, index):
- # type: (Any, Any) -> Any
+ def compose_node(self, parent: Any, index: Any) -> Any:
if self.parser.check_event(AliasEvent):
event = self.parser.get_event()
alias = event.anchor
if alias not in self.anchors:
raise ComposerError(
- None,
- None,
- _F('found undefined alias {alias!r}', alias=alias),
- event.start_mark,
+ None, None, f'found undefined alias {alias!r}', event.start_mark,
)
return self.return_alias(self.anchors[alias])
event = self.parser.peek_event()
anchor = event.anchor
if anchor is not None: # have an anchor
if anchor in self.anchors:
- # raise ComposerError(
- # "found duplicate anchor %r; first occurrence"
- # % (anchor), self.anchors[anchor].start_mark,
- # "second occurrence", event.start_mark)
ws = (
- '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
- '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark)
+ f'\nfound duplicate anchor {anchor!r}\n'
+ f'first occurrence {self.anchors[anchor].start_mark}\n'
+ f'second occurrence {event.start_mark}'
)
- warnings.warn(ws, ReusedAnchorWarning)
+ warnings.warn(ws, ReusedAnchorWarning, stacklevel=2)
self.resolver.descend_resolver(parent, index)
if self.parser.check_event(ScalarEvent):
node = self.compose_scalar_node(anchor)
@@ -144,8 +128,7 @@ class Composer:
self.resolver.ascend_resolver()
return node
- def compose_scalar_node(self, anchor):
- # type: (Any) -> Any
+ def compose_scalar_node(self, anchor: Any) -> Any:
event = self.parser.get_event()
tag = event.tag
if tag is None or tag == '!':
@@ -163,8 +146,7 @@ class Composer:
self.anchors[anchor] = node
return node
- def compose_sequence_node(self, anchor):
- # type: (Any) -> Any
+ def compose_sequence_node(self, anchor: Any) -> Any:
start_event = self.parser.get_event()
tag = start_event.tag
if tag is None or tag == '!':
@@ -187,17 +169,16 @@ class Composer:
end_event = self.parser.get_event()
if node.flow_style is True and end_event.comment is not None:
if node.comment is not None:
+ x = node.flow_style
nprint(
- 'Warning: unexpected end_event commment in sequence '
- 'node {}'.format(node.flow_style)
+ f'Warning: unexpected end_event commment in sequence node {x}'
)
node.comment = end_event.comment
node.end_mark = end_event.end_mark
self.check_end_doc_comment(end_event, node)
return node
- def compose_mapping_node(self, anchor):
- # type: (Any) -> Any
+ def compose_mapping_node(self, anchor: Any) -> Any:
start_event = self.parser.get_event()
tag = start_event.tag
if tag is None or tag == '!':
@@ -230,8 +211,7 @@ class Composer:
self.check_end_doc_comment(end_event, node)
return node
- def check_end_doc_comment(self, end_event, node):
- # type: (Any, Any) -> None
+ def check_end_doc_comment(self, end_event: Any, node: Any) -> None:
if end_event.comment and end_event.comment[1]:
# pre comments on an end_event, no following to move to
if node.comment is None: