diff options
author | Anthon van der Neut <anthon@mnt.org> | 2023-05-01 19:13:50 +0200 |
---|---|---|
committer | Anthon van der Neut <anthon@mnt.org> | 2023-05-01 19:13:50 +0200 |
commit | 8b731994b1543d7886af85f926d9eea5a22d0732 (patch) | |
tree | 3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /parser.py | |
parent | 45111ba0b67e8619265d89f3202635e62c13cde6 (diff) | |
download | ruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz |
retrofitted 0.18 changes
Diffstat (limited to 'parser.py')
-rw-r--r-- | parser.py | 187 |
1 files changed, 75 insertions, 112 deletions
@@ -80,16 +80,14 @@ from ruamel.yaml.events import * # NOQA from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA from ruamel.yaml.scanner import BlankLineComment from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK -from ruamel.yaml.compat import _F, nprint, nprintf # NOQA +from ruamel.yaml.compat import nprint, nprintf # NOQA -if False: # MYPY - from typing import Any, Dict, Optional, List, Optional # NOQA +from typing import Any, Dict, Optional, List, Optional # NOQA __all__ = ['Parser', 'RoundTripParser', 'ParserError'] -def xprintf(*args, **kw): - # type: (Any, Any) -> Any +def xprintf(*args: Any, **kw: Any) -> Any: return nprintf(*args, **kw) pass @@ -104,42 +102,36 @@ class Parser: DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'} - def __init__(self, loader): - # type: (Any) -> None + def __init__(self, loader: Any) -> None: self.loader = loader if self.loader is not None and getattr(self.loader, '_parser', None) is None: self.loader._parser = self self.reset_parser() - def reset_parser(self): - # type: () -> None + def reset_parser(self) -> None: # Reset the state attributes (to clear self-references) self.current_event = self.last_event = None - self.tag_handles = {} # type: Dict[Any, Any] - self.states = [] # type: List[Any] - self.marks = [] # type: List[Any] - self.state = self.parse_stream_start # type: Any + self.tag_handles: Dict[Any, Any] = {} + self.states: List[Any] = [] + self.marks: List[Any] = [] + self.state: Any = self.parse_stream_start - def dispose(self): - # type: () -> None + def dispose(self) -> None: self.reset_parser() @property - def scanner(self): - # type: () -> Any + def scanner(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.scanner return self.loader._scanner @property - def resolver(self): - # type: () -> Any + def resolver(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.resolver return self.loader._resolver - def check_event(self, *choices): - # type: (Any) -> bool + def check_event(self, *choices: Any) -> bool: # Check the type of the next event. if self.current_event is None: if self.state: @@ -152,16 +144,14 @@ class Parser: return True return False - def peek_event(self): - # type: () -> Any + def peek_event(self) -> Any: # Get the next event. if self.current_event is None: if self.state: self.current_event = self.state() return self.current_event - def get_event(self): - # type: () -> Any + def get_event(self) -> Any: # Get the next event and proceed further. if self.current_event is None: if self.state: @@ -178,8 +168,7 @@ class Parser: # implicit_document ::= block_node DOCUMENT-END* # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END* - def parse_stream_start(self): - # type: () -> Any + def parse_stream_start(self) -> Any: # Parse the stream start. token = self.scanner.get_token() self.move_token_comment(token) @@ -190,8 +179,7 @@ class Parser: return event - def parse_implicit_document_start(self): - # type: () -> Any + def parse_implicit_document_start(self) -> Any: # Parse an implicit document. if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken): self.tag_handles = self.DEFAULT_TAGS @@ -208,8 +196,7 @@ class Parser: else: return self.parse_document_start() - def parse_document_start(self): - # type: () -> Any + def parse_document_start(self) -> Any: # Parse any extra document end indicators. while self.scanner.check_token(DocumentEndToken): self.scanner.get_token() @@ -220,10 +207,8 @@ class Parser: raise ParserError( None, None, - _F( - "expected '<document start>', but found {pt!r}", - pt=self.scanner.peek_token().id, - ), + "expected '<document start>', " + f'but found {self.scanner.peek_token().id,!r}', self.scanner.peek_token().start_mark, ) token = self.scanner.get_token() @@ -232,10 +217,14 @@ class Parser: # if self.loader is not None and \ # end_mark.line != self.scanner.peek_token().start_mark.line: # self.loader.scalar_after_indicator = False - event = DocumentStartEvent( - start_mark, end_mark, explicit=True, version=version, tags=tags, - comment=token.comment - ) # type: Any + event: Any = DocumentStartEvent( + start_mark, + end_mark, + explicit=True, + version=version, + tags=tags, + comment=token.comment, + ) self.states.append(self.parse_document_end) self.state = self.parse_document_content else: @@ -247,8 +236,7 @@ class Parser: self.state = None return event - def parse_document_end(self): - # type: () -> Any + def parse_document_end(self) -> Any: # Parse the document end. token = self.scanner.peek_token() start_mark = end_mark = token.start_mark @@ -267,8 +255,7 @@ class Parser: return event - def parse_document_content(self): - # type: () -> Any + def parse_document_content(self) -> Any: if self.scanner.check_token( DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken ): @@ -278,8 +265,7 @@ class Parser: else: return self.parse_block_node() - def process_directives(self): - # type: () -> Any + def process_directives(self) -> Any: yaml_version = None self.tag_handles = {} while self.scanner.check_token(DirectiveToken): @@ -302,14 +288,11 @@ class Parser: handle, prefix = token.value if handle in self.tag_handles: raise ParserError( - None, - None, - _F('duplicate tag handle {handle!r}', handle=handle), - token.start_mark, + None, None, f'duplicate tag handle {handle!r}', token.start_mark, ) self.tag_handles[handle] = prefix if bool(self.tag_handles): - value = yaml_version, self.tag_handles.copy() # type: Any + value: Any = (yaml_version, self.tag_handles.copy()) else: value = yaml_version, None if self.loader is not None and hasattr(self.loader, 'tags'): @@ -339,27 +322,22 @@ class Parser: # block_collection ::= block_sequence | block_mapping # flow_collection ::= flow_sequence | flow_mapping - def parse_block_node(self): - # type: () -> Any + def parse_block_node(self) -> Any: return self.parse_node(block=True) - def parse_flow_node(self): - # type: () -> Any + def parse_flow_node(self) -> Any: return self.parse_node() - def parse_block_node_or_indentless_sequence(self): - # type: () -> Any + def parse_block_node_or_indentless_sequence(self) -> Any: return self.parse_node(block=True, indentless_sequence=True) - def transform_tag(self, handle, suffix): - # type: (Any, Any) -> Any + def transform_tag(self, handle: Any, suffix: Any) -> Any: return self.tag_handles[handle] + suffix - def parse_node(self, block=False, indentless_sequence=False): - # type: (bool, bool) -> Any + def parse_node(self, block: bool = False, indentless_sequence: bool = False) -> Any: if self.scanner.check_token(AliasToken): token = self.scanner.get_token() - event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any + event: Any = AliasEvent(token.value, token.start_mark, token.end_mark) self.state = self.states.pop() return event @@ -394,7 +372,7 @@ class Parser: raise ParserError( 'while parsing a node', start_mark, - _F('found undefined tag handle {handle!r}', handle=handle), + f'found undefined tag handle {handle!r}', tag_mark, ) tag = self.transform_tag(handle, suffix) @@ -507,9 +485,9 @@ class Parser: node = 'flow' token = self.scanner.peek_token() raise ParserError( - _F('while parsing a {node!s} node', node=node), + f'while parsing a {node!s} node', start_mark, - _F('expected the node content, but found {token_id!r}', token_id=token.id), + f'expected the node content, but found {token.id!r}', token.start_mark, ) return event @@ -517,16 +495,14 @@ class Parser: # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* # BLOCK-END - def parse_block_sequence_first_entry(self): - # type: () -> Any + def parse_block_sequence_first_entry(self) -> Any: token = self.scanner.get_token() # move any comment from start token # self.move_token_comment(token) self.marks.append(token.start_mark) return self.parse_block_sequence_entry() - def parse_block_sequence_entry(self): - # type: () -> Any + def parse_block_sequence_entry(self) -> Any: if self.scanner.check_token(BlockEntryToken): token = self.scanner.get_token() self.move_token_comment(token) @@ -541,7 +517,7 @@ class Parser: raise ParserError( 'while parsing a block collection', self.marks[-1], - _F('expected <block end>, but found {token_id!r}', token_id=token.id), + f'expected <block end>, but found {token.id!r}', token.start_mark, ) token = self.scanner.get_token() # BlockEndToken @@ -557,8 +533,7 @@ class Parser: # - entry # - nested - def parse_indentless_sequence_entry(self): - # type: () -> Any + def parse_indentless_sequence_entry(self) -> Any: if self.scanner.check_token(BlockEntryToken): token = self.scanner.get_token() self.move_token_comment(token) @@ -587,14 +562,12 @@ class Parser: # (VALUE block_node_or_indentless_sequence?)?)* # BLOCK-END - def parse_block_mapping_first_key(self): - # type: () -> Any + def parse_block_mapping_first_key(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_block_mapping_key() - def parse_block_mapping_key(self): - # type: () -> Any + def parse_block_mapping_key(self) -> Any: if self.scanner.check_token(KeyToken): token = self.scanner.get_token() self.move_token_comment(token) @@ -612,7 +585,7 @@ class Parser: raise ParserError( 'while parsing a block mapping', self.marks[-1], - _F('expected <block end>, but found {token_id!r}', token_id=token.id), + f'expected <block end>, but found {token.id!r}', token.start_mark, ) token = self.scanner.get_token() @@ -622,8 +595,7 @@ class Parser: self.marks.pop() return event - def parse_block_mapping_value(self): - # type: () -> Any + def parse_block_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() # value token might have post comment move it to e.g. block @@ -662,14 +634,12 @@ class Parser: # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?` # generate an inline mapping (set syntax). - def parse_flow_sequence_first_entry(self): - # type: () -> Any + def parse_flow_sequence_first_entry(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_sequence_entry(first=True) - def parse_flow_sequence_entry(self, first=False): - # type: (bool) -> Any + def parse_flow_sequence_entry(self, first: bool = False) -> Any: if not self.scanner.check_token(FlowSequenceEndToken): if not first: if self.scanner.check_token(FlowEntryToken): @@ -679,15 +649,15 @@ class Parser: raise ParserError( 'while parsing a flow sequence', self.marks[-1], - _F("expected ',' or ']', but got {token_id!r}", token_id=token.id), + f"expected ',' or ']', but got {token.id!r}", token.start_mark, ) if self.scanner.check_token(KeyToken): token = self.scanner.peek_token() - event = MappingStartEvent( + event: Any = MappingStartEvent( None, None, True, token.start_mark, token.end_mark, flow_style=True - ) # type: Any + ) self.state = self.parse_flow_sequence_entry_mapping_key return event elif not self.scanner.check_token(FlowSequenceEndToken): @@ -699,8 +669,7 @@ class Parser: self.marks.pop() return event - def parse_flow_sequence_entry_mapping_key(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_key(self) -> Any: token = self.scanner.get_token() if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry_mapping_value) @@ -709,8 +678,7 @@ class Parser: self.state = self.parse_flow_sequence_entry_mapping_value return self.process_empty_scalar(token.end_mark) - def parse_flow_sequence_entry_mapping_value(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken): @@ -724,8 +692,7 @@ class Parser: token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) - def parse_flow_sequence_entry_mapping_end(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_end(self) -> Any: self.state = self.parse_flow_sequence_entry token = self.scanner.peek_token() return MappingEndEvent(token.start_mark, token.start_mark) @@ -736,14 +703,12 @@ class Parser: # FLOW-MAPPING-END # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? - def parse_flow_mapping_first_key(self): - # type: () -> Any + def parse_flow_mapping_first_key(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_mapping_key(first=True) - def parse_flow_mapping_key(self, first=False): - # type: (Any) -> Any + def parse_flow_mapping_key(self, first: Any = False) -> Any: if not self.scanner.check_token(FlowMappingEndToken): if not first: if self.scanner.check_token(FlowEntryToken): @@ -753,7 +718,7 @@ class Parser: raise ParserError( 'while parsing a flow mapping', self.marks[-1], - _F("expected ',' or '}}', but got {token_id!r}", token_id=token.id), + f"expected ',' or '}}', but got {token.id!r}", token.start_mark, ) if self.scanner.check_token(KeyToken): @@ -780,8 +745,7 @@ class Parser: self.marks.pop() return event - def parse_flow_mapping_value(self): - # type: () -> Any + def parse_flow_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken): @@ -795,25 +759,23 @@ class Parser: token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) - def parse_flow_mapping_empty_value(self): - # type: () -> Any + def parse_flow_mapping_empty_value(self) -> Any: self.state = self.parse_flow_mapping_key return self.process_empty_scalar(self.scanner.peek_token().start_mark) - def process_empty_scalar(self, mark, comment=None): - # type: (Any, Any) -> Any + def process_empty_scalar(self, mark: Any, comment: Any = None) -> Any: return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment) - def move_token_comment(self, token, nt=None, empty=False): - # type: (Any, Optional[Any], Optional[bool]) -> Any + def move_token_comment( + self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False + ) -> Any: pass class RoundTripParser(Parser): """roundtrip is a safe loader, that wants to see the unmangled tag""" - def transform_tag(self, handle, suffix): - # type: (Any, Any) -> Any + def transform_tag(self, handle: Any, suffix: Any) -> Any: # return self.tag_handles[handle]+suffix if handle == '!!' and suffix in ( 'null', @@ -832,8 +794,9 @@ class RoundTripParser(Parser): return Parser.transform_tag(self, handle, suffix) return handle + suffix - def move_token_comment(self, token, nt=None, empty=False): - # type: (Any, Optional[Any], Optional[bool]) -> Any + def move_token_comment( + self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False + ) -> Any: token.move_old_comment(self.scanner.peek_token() if nt is None else nt, empty=empty) @@ -843,12 +806,12 @@ class RoundTripParserSC(RoundTripParser): # some of the differences are based on the superclass testing # if self.loader.comment_handling is not None - def move_token_comment(self, token, nt=None, empty=False): - # type: (Any, Any, Any, Optional[bool]) -> None + def move_token_comment( + self: Any, token: Any, nt: Any = None, empty: Optional[bool] = False + ) -> None: token.move_new_comment(self.scanner.peek_token() if nt is None else nt, empty=empty) - def distribute_comment(self, comment, line): - # type: (Any, Any) -> Any + def distribute_comment(self, comment: Any, line: Any) -> Any: # ToDo, look at indentation of the comment to determine attachment if comment is None: return None |