summaryrefslogtreecommitdiff
path: root/parser.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
committerAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
commit8b731994b1543d7886af85f926d9eea5a22d0732 (patch)
tree3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /parser.py
parent45111ba0b67e8619265d89f3202635e62c13cde6 (diff)
downloadruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz
retrofitted 0.18 changes
Diffstat (limited to 'parser.py')
-rw-r--r--parser.py187
1 files changed, 75 insertions, 112 deletions
diff --git a/parser.py b/parser.py
index a2fab43..c8b5fcf 100644
--- a/parser.py
+++ b/parser.py
@@ -80,16 +80,14 @@ from ruamel.yaml.events import * # NOQA
from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA
from ruamel.yaml.scanner import BlankLineComment
from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK
-from ruamel.yaml.compat import _F, nprint, nprintf # NOQA
+from ruamel.yaml.compat import nprint, nprintf # NOQA
-if False: # MYPY
- from typing import Any, Dict, Optional, List, Optional # NOQA
+from typing import Any, Dict, Optional, List # NOQA
__all__ = ['Parser', 'RoundTripParser', 'ParserError']
-def xprintf(*args, **kw):
- # type: (Any, Any) -> Any
+def xprintf(*args: Any, **kw: Any) -> Any:
return nprintf(*args, **kw)
pass
@@ -104,42 +102,36 @@ class Parser:
DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'}
- def __init__(self, loader):
- # type: (Any) -> None
+ def __init__(self, loader: Any) -> None:
self.loader = loader
if self.loader is not None and getattr(self.loader, '_parser', None) is None:
self.loader._parser = self
self.reset_parser()
- def reset_parser(self):
- # type: () -> None
+ def reset_parser(self) -> None:
# Reset the state attributes (to clear self-references)
self.current_event = self.last_event = None
- self.tag_handles = {} # type: Dict[Any, Any]
- self.states = [] # type: List[Any]
- self.marks = [] # type: List[Any]
- self.state = self.parse_stream_start # type: Any
+ self.tag_handles: Dict[Any, Any] = {}
+ self.states: List[Any] = []
+ self.marks: List[Any] = []
+ self.state: Any = self.parse_stream_start
- def dispose(self):
- # type: () -> None
+ def dispose(self) -> None:
self.reset_parser()
@property
- def scanner(self):
- # type: () -> Any
+ def scanner(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.scanner
return self.loader._scanner
@property
- def resolver(self):
- # type: () -> Any
+ def resolver(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.resolver
return self.loader._resolver
- def check_event(self, *choices):
- # type: (Any) -> bool
+ def check_event(self, *choices: Any) -> bool:
# Check the type of the next event.
if self.current_event is None:
if self.state:
@@ -152,16 +144,14 @@ class Parser:
return True
return False
- def peek_event(self):
- # type: () -> Any
+ def peek_event(self) -> Any:
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
- def get_event(self):
- # type: () -> Any
+ def get_event(self) -> Any:
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
@@ -178,8 +168,7 @@ class Parser:
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
- def parse_stream_start(self):
- # type: () -> Any
+ def parse_stream_start(self) -> Any:
# Parse the stream start.
token = self.scanner.get_token()
self.move_token_comment(token)
@@ -190,8 +179,7 @@ class Parser:
return event
- def parse_implicit_document_start(self):
- # type: () -> Any
+ def parse_implicit_document_start(self) -> Any:
# Parse an implicit document.
if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken):
self.tag_handles = self.DEFAULT_TAGS
@@ -208,8 +196,7 @@ class Parser:
else:
return self.parse_document_start()
- def parse_document_start(self):
- # type: () -> Any
+ def parse_document_start(self) -> Any:
# Parse any extra document end indicators.
while self.scanner.check_token(DocumentEndToken):
self.scanner.get_token()
@@ -220,10 +207,8 @@ class Parser:
raise ParserError(
None,
None,
- _F(
- "expected '<document start>', but found {pt!r}",
- pt=self.scanner.peek_token().id,
- ),
+ "expected '<document start>', "
+ f'but found {self.scanner.peek_token().id!r}',
self.scanner.peek_token().start_mark,
)
token = self.scanner.get_token()
@@ -232,10 +217,14 @@ class Parser:
# if self.loader is not None and \
# end_mark.line != self.scanner.peek_token().start_mark.line:
# self.loader.scalar_after_indicator = False
- event = DocumentStartEvent(
- start_mark, end_mark, explicit=True, version=version, tags=tags,
- comment=token.comment
- ) # type: Any
+ event: Any = DocumentStartEvent(
+ start_mark,
+ end_mark,
+ explicit=True,
+ version=version,
+ tags=tags,
+ comment=token.comment,
+ )
self.states.append(self.parse_document_end)
self.state = self.parse_document_content
else:
@@ -247,8 +236,7 @@ class Parser:
self.state = None
return event
- def parse_document_end(self):
- # type: () -> Any
+ def parse_document_end(self) -> Any:
# Parse the document end.
token = self.scanner.peek_token()
start_mark = end_mark = token.start_mark
@@ -267,8 +255,7 @@ class Parser:
return event
- def parse_document_content(self):
- # type: () -> Any
+ def parse_document_content(self) -> Any:
if self.scanner.check_token(
DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken
):
@@ -278,8 +265,7 @@ class Parser:
else:
return self.parse_block_node()
- def process_directives(self):
- # type: () -> Any
+ def process_directives(self) -> Any:
yaml_version = None
self.tag_handles = {}
while self.scanner.check_token(DirectiveToken):
@@ -302,14 +288,11 @@ class Parser:
handle, prefix = token.value
if handle in self.tag_handles:
raise ParserError(
- None,
- None,
- _F('duplicate tag handle {handle!r}', handle=handle),
- token.start_mark,
+ None, None, f'duplicate tag handle {handle!r}', token.start_mark,
)
self.tag_handles[handle] = prefix
if bool(self.tag_handles):
- value = yaml_version, self.tag_handles.copy() # type: Any
+ value: Any = (yaml_version, self.tag_handles.copy())
else:
value = yaml_version, None
if self.loader is not None and hasattr(self.loader, 'tags'):
@@ -339,27 +322,22 @@ class Parser:
# block_collection ::= block_sequence | block_mapping
# flow_collection ::= flow_sequence | flow_mapping
- def parse_block_node(self):
- # type: () -> Any
+ def parse_block_node(self) -> Any:
return self.parse_node(block=True)
- def parse_flow_node(self):
- # type: () -> Any
+ def parse_flow_node(self) -> Any:
return self.parse_node()
- def parse_block_node_or_indentless_sequence(self):
- # type: () -> Any
+ def parse_block_node_or_indentless_sequence(self) -> Any:
return self.parse_node(block=True, indentless_sequence=True)
- def transform_tag(self, handle, suffix):
- # type: (Any, Any) -> Any
+ def transform_tag(self, handle: Any, suffix: Any) -> Any:
return self.tag_handles[handle] + suffix
- def parse_node(self, block=False, indentless_sequence=False):
- # type: (bool, bool) -> Any
+ def parse_node(self, block: bool = False, indentless_sequence: bool = False) -> Any:
if self.scanner.check_token(AliasToken):
token = self.scanner.get_token()
- event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any
+ event: Any = AliasEvent(token.value, token.start_mark, token.end_mark)
self.state = self.states.pop()
return event
@@ -394,7 +372,7 @@ class Parser:
raise ParserError(
'while parsing a node',
start_mark,
- _F('found undefined tag handle {handle!r}', handle=handle),
+ f'found undefined tag handle {handle!r}',
tag_mark,
)
tag = self.transform_tag(handle, suffix)
@@ -507,9 +485,9 @@ class Parser:
node = 'flow'
token = self.scanner.peek_token()
raise ParserError(
- _F('while parsing a {node!s} node', node=node),
+ f'while parsing a {node!s} node',
start_mark,
- _F('expected the node content, but found {token_id!r}', token_id=token.id),
+ f'expected the node content, but found {token.id!r}',
token.start_mark,
)
return event
@@ -517,16 +495,14 @@ class Parser:
# block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
# BLOCK-END
- def parse_block_sequence_first_entry(self):
- # type: () -> Any
+ def parse_block_sequence_first_entry(self) -> Any:
token = self.scanner.get_token()
# move any comment from start token
# self.move_token_comment(token)
self.marks.append(token.start_mark)
return self.parse_block_sequence_entry()
- def parse_block_sequence_entry(self):
- # type: () -> Any
+ def parse_block_sequence_entry(self) -> Any:
if self.scanner.check_token(BlockEntryToken):
token = self.scanner.get_token()
self.move_token_comment(token)
@@ -541,7 +517,7 @@ class Parser:
raise ParserError(
'while parsing a block collection',
self.marks[-1],
- _F('expected <block end>, but found {token_id!r}', token_id=token.id),
+ f'expected <block end>, but found {token.id!r}',
token.start_mark,
)
token = self.scanner.get_token() # BlockEndToken
@@ -557,8 +533,7 @@ class Parser:
# - entry
# - nested
- def parse_indentless_sequence_entry(self):
- # type: () -> Any
+ def parse_indentless_sequence_entry(self) -> Any:
if self.scanner.check_token(BlockEntryToken):
token = self.scanner.get_token()
self.move_token_comment(token)
@@ -587,14 +562,12 @@ class Parser:
# (VALUE block_node_or_indentless_sequence?)?)*
# BLOCK-END
- def parse_block_mapping_first_key(self):
- # type: () -> Any
+ def parse_block_mapping_first_key(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_block_mapping_key()
- def parse_block_mapping_key(self):
- # type: () -> Any
+ def parse_block_mapping_key(self) -> Any:
if self.scanner.check_token(KeyToken):
token = self.scanner.get_token()
self.move_token_comment(token)
@@ -612,7 +585,7 @@ class Parser:
raise ParserError(
'while parsing a block mapping',
self.marks[-1],
- _F('expected <block end>, but found {token_id!r}', token_id=token.id),
+ f'expected <block end>, but found {token.id!r}',
token.start_mark,
)
token = self.scanner.get_token()
@@ -622,8 +595,7 @@ class Parser:
self.marks.pop()
return event
- def parse_block_mapping_value(self):
- # type: () -> Any
+ def parse_block_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
# value token might have post comment move it to e.g. block
@@ -662,14 +634,12 @@ class Parser:
# For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
# generate an inline mapping (set syntax).
- def parse_flow_sequence_first_entry(self):
- # type: () -> Any
+ def parse_flow_sequence_first_entry(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
- def parse_flow_sequence_entry(self, first=False):
- # type: (bool) -> Any
+ def parse_flow_sequence_entry(self, first: bool = False) -> Any:
if not self.scanner.check_token(FlowSequenceEndToken):
if not first:
if self.scanner.check_token(FlowEntryToken):
@@ -679,15 +649,15 @@ class Parser:
raise ParserError(
'while parsing a flow sequence',
self.marks[-1],
- _F("expected ',' or ']', but got {token_id!r}", token_id=token.id),
+ f"expected ',' or ']', but got {token.id!r}",
token.start_mark,
)
if self.scanner.check_token(KeyToken):
token = self.scanner.peek_token()
- event = MappingStartEvent(
+ event: Any = MappingStartEvent(
None, None, True, token.start_mark, token.end_mark, flow_style=True
- ) # type: Any
+ )
self.state = self.parse_flow_sequence_entry_mapping_key
return event
elif not self.scanner.check_token(FlowSequenceEndToken):
@@ -699,8 +669,7 @@ class Parser:
self.marks.pop()
return event
- def parse_flow_sequence_entry_mapping_key(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_key(self) -> Any:
token = self.scanner.get_token()
if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry_mapping_value)
@@ -709,8 +678,7 @@ class Parser:
self.state = self.parse_flow_sequence_entry_mapping_value
return self.process_empty_scalar(token.end_mark)
- def parse_flow_sequence_entry_mapping_value(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
@@ -724,8 +692,7 @@ class Parser:
token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
- def parse_flow_sequence_entry_mapping_end(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_end(self) -> Any:
self.state = self.parse_flow_sequence_entry
token = self.scanner.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
@@ -736,14 +703,12 @@ class Parser:
# FLOW-MAPPING-END
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
- def parse_flow_mapping_first_key(self):
- # type: () -> Any
+ def parse_flow_mapping_first_key(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
- def parse_flow_mapping_key(self, first=False):
- # type: (Any) -> Any
+ def parse_flow_mapping_key(self, first: Any = False) -> Any:
if not self.scanner.check_token(FlowMappingEndToken):
if not first:
if self.scanner.check_token(FlowEntryToken):
@@ -753,7 +718,7 @@ class Parser:
raise ParserError(
'while parsing a flow mapping',
self.marks[-1],
- _F("expected ',' or '}}', but got {token_id!r}", token_id=token.id),
+ f"expected ',' or '}}', but got {token.id!r}",
token.start_mark,
)
if self.scanner.check_token(KeyToken):
@@ -780,8 +745,7 @@ class Parser:
self.marks.pop()
return event
- def parse_flow_mapping_value(self):
- # type: () -> Any
+ def parse_flow_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
@@ -795,25 +759,23 @@ class Parser:
token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
- def parse_flow_mapping_empty_value(self):
- # type: () -> Any
+ def parse_flow_mapping_empty_value(self) -> Any:
self.state = self.parse_flow_mapping_key
return self.process_empty_scalar(self.scanner.peek_token().start_mark)
- def process_empty_scalar(self, mark, comment=None):
- # type: (Any, Any) -> Any
+ def process_empty_scalar(self, mark: Any, comment: Any = None) -> Any:
return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment)
- def move_token_comment(self, token, nt=None, empty=False):
- # type: (Any, Optional[Any], Optional[bool]) -> Any
+ def move_token_comment(
+ self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False
+ ) -> Any:
pass
class RoundTripParser(Parser):
"""roundtrip is a safe loader, that wants to see the unmangled tag"""
- def transform_tag(self, handle, suffix):
- # type: (Any, Any) -> Any
+ def transform_tag(self, handle: Any, suffix: Any) -> Any:
# return self.tag_handles[handle]+suffix
if handle == '!!' and suffix in (
'null',
@@ -832,8 +794,9 @@ class RoundTripParser(Parser):
return Parser.transform_tag(self, handle, suffix)
return handle + suffix
- def move_token_comment(self, token, nt=None, empty=False):
- # type: (Any, Optional[Any], Optional[bool]) -> Any
+ def move_token_comment(
+ self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False
+ ) -> Any:
token.move_old_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)
@@ -843,12 +806,12 @@ class RoundTripParserSC(RoundTripParser):
# some of the differences are based on the superclass testing
# if self.loader.comment_handling is not None
- def move_token_comment(self, token, nt=None, empty=False):
- # type: (Any, Any, Any, Optional[bool]) -> None
+ def move_token_comment(
+ self: Any, token: Any, nt: Any = None, empty: Optional[bool] = False
+ ) -> None:
token.move_new_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)
- def distribute_comment(self, comment, line):
- # type: (Any, Any) -> Any
+ def distribute_comment(self, comment: Any, line: Any) -> Any:
# ToDo, look at indentation of the comment to determine attachment
if comment is None:
return None