diff options
Diffstat (limited to 'parser.py')
-rw-r--r-- | parser.py | 315 |
1 files changed, 180 insertions, 135 deletions
@@ -1,7 +1,5 @@ # coding: utf-8 -from __future__ import absolute_import - # The following YAML grammar is LL(1) and is parsed by a recursive descent # parser. # @@ -46,7 +44,7 @@ from __future__ import absolute_import # # FIRST sets: # -# stream: { STREAM-START } +# stream: { STREAM-START <} # explicit_document: { DIRECTIVE DOCUMENT-START } # implicit_document: FIRST(block_node) # block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START @@ -80,60 +78,60 @@ from ruamel.yaml.error import MarkedYAMLError from ruamel.yaml.tokens import * # NOQA from ruamel.yaml.events import * # NOQA from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA -from ruamel.yaml.compat import utf8, nprint, nprintf # NOQA +from ruamel.yaml.scanner import BlankLineComment +from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK +from ruamel.yaml.compat import nprint, nprintf # NOQA -if False: # MYPY - from typing import Any, Dict, Optional, List # NOQA +from typing import Any, Dict, Optional, List, Optional # NOQA __all__ = ['Parser', 'RoundTripParser', 'ParserError'] +def xprintf(*args: Any, **kw: Any) -> Any: + return nprintf(*args, **kw) + pass + + class ParserError(MarkedYAMLError): pass -class Parser(object): +class Parser: # Since writing a recursive-descendant parser is a straightforward task, we # do not give many comments here. - DEFAULT_TAGS = {u'!': u'!', u'!!': u'tag:yaml.org,2002:'} + DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'} - def __init__(self, loader): - # type: (Any) -> None + def __init__(self, loader: Any) -> None: self.loader = loader if self.loader is not None and getattr(self.loader, '_parser', None) is None: self.loader._parser = self self.reset_parser() - def reset_parser(self): - # type: () -> None + def reset_parser(self) -> None: # Reset the state attributes (to clear self-references) - self.current_event = None - self.tag_handles = {} # type: Dict[Any, Any] - self.states = [] # type: List[Any] - self.marks = [] # type: List[Any] - self.state = self.parse_stream_start # type: Any + self.current_event = self.last_event = None + self.tag_handles: Dict[Any, Any] = {} + self.states: List[Any] = [] + self.marks: List[Any] = [] + self.state: Any = self.parse_stream_start - def dispose(self): - # type: () -> None + def dispose(self) -> None: self.reset_parser() @property - def scanner(self): - # type: () -> Any + def scanner(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.scanner return self.loader._scanner @property - def resolver(self): - # type: () -> Any + def resolver(self) -> Any: if hasattr(self.loader, 'typ'): return self.loader.resolver return self.loader._resolver - def check_event(self, *choices): - # type: (Any) -> bool + def check_event(self, *choices: Any) -> bool: # Check the type of the next event. if self.current_event is None: if self.state: @@ -146,21 +144,22 @@ class Parser(object): return True return False - def peek_event(self): - # type: () -> Any + def peek_event(self) -> Any: # Get the next event. if self.current_event is None: if self.state: self.current_event = self.state() return self.current_event - def get_event(self): - # type: () -> Any + def get_event(self) -> Any: # Get the next event and proceed further. if self.current_event is None: if self.state: self.current_event = self.state() - value = self.current_event + # assert self.current_event is not None + # if self.current_event.end_mark.line != self.peek_event().start_mark.line: + xprintf('get_event', repr(self.current_event), self.peek_event().start_mark.line) + self.last_event = value = self.current_event self.current_event = None return value @@ -169,11 +168,10 @@ class Parser(object): # implicit_document ::= block_node DOCUMENT-END* # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END* - def parse_stream_start(self): - # type: () -> Any + def parse_stream_start(self) -> Any: # Parse the stream start. token = self.scanner.get_token() - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding) # Prepare the next state. @@ -181,8 +179,7 @@ class Parser(object): return event - def parse_implicit_document_start(self): - # type: () -> Any + def parse_implicit_document_start(self) -> Any: # Parse an implicit document. if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken): self.tag_handles = self.DEFAULT_TAGS @@ -199,31 +196,35 @@ class Parser(object): else: return self.parse_document_start() - def parse_document_start(self): - # type: () -> Any + def parse_document_start(self) -> Any: # Parse any extra document end indicators. while self.scanner.check_token(DocumentEndToken): self.scanner.get_token() # Parse an explicit document. if not self.scanner.check_token(StreamEndToken): - token = self.scanner.peek_token() - start_mark = token.start_mark version, tags = self.process_directives() if not self.scanner.check_token(DocumentStartToken): raise ParserError( None, None, - "expected '<document start>', but found %r" % self.scanner.peek_token().id, + "expected '<document start>', " + f'but found {self.scanner.peek_token().id,!r}', self.scanner.peek_token().start_mark, ) token = self.scanner.get_token() + start_mark = token.start_mark end_mark = token.end_mark # if self.loader is not None and \ # end_mark.line != self.scanner.peek_token().start_mark.line: # self.loader.scalar_after_indicator = False - event = DocumentStartEvent( - start_mark, end_mark, explicit=True, version=version, tags=tags - ) # type: Any + event: Any = DocumentStartEvent( + start_mark, + end_mark, + explicit=True, + version=version, + tags=tags, + comment=token.comment, + ) self.states.append(self.parse_document_end) self.state = self.parse_document_content else: @@ -235,8 +236,7 @@ class Parser(object): self.state = None return event - def parse_document_end(self): - # type: () -> Any + def parse_document_end(self) -> Any: # Parse the document end. token = self.scanner.peek_token() start_mark = end_mark = token.start_mark @@ -255,8 +255,7 @@ class Parser(object): return event - def parse_document_content(self): - # type: () -> Any + def parse_document_content(self) -> Any: if self.scanner.check_token( DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken ): @@ -266,13 +265,12 @@ class Parser(object): else: return self.parse_block_node() - def process_directives(self): - # type: () -> Any + def process_directives(self) -> Any: yaml_version = None self.tag_handles = {} while self.scanner.check_token(DirectiveToken): token = self.scanner.get_token() - if token.name == u'YAML': + if token.name == 'YAML': if yaml_version is not None: raise ParserError( None, None, 'found duplicate YAML directive', token.start_mark @@ -282,19 +280,19 @@ class Parser(object): raise ParserError( None, None, - 'found incompatible YAML document (version 1.* is ' 'required)', + 'found incompatible YAML document (version 1.* is required)', token.start_mark, ) yaml_version = token.value - elif token.name == u'TAG': + elif token.name == 'TAG': handle, prefix = token.value if handle in self.tag_handles: raise ParserError( - None, None, 'duplicate tag handle %r' % utf8(handle), token.start_mark + None, None, f'duplicate tag handle {handle!r}', token.start_mark, ) self.tag_handles[handle] = prefix if bool(self.tag_handles): - value = yaml_version, self.tag_handles.copy() # type: Any + value: Any = (yaml_version, self.tag_handles.copy()) else: value = yaml_version, None if self.loader is not None and hasattr(self.loader, 'tags'): @@ -324,27 +322,22 @@ class Parser(object): # block_collection ::= block_sequence | block_mapping # flow_collection ::= flow_sequence | flow_mapping - def parse_block_node(self): - # type: () -> Any + def parse_block_node(self) -> Any: return self.parse_node(block=True) - def parse_flow_node(self): - # type: () -> Any + def parse_flow_node(self) -> Any: return self.parse_node() - def parse_block_node_or_indentless_sequence(self): - # type: () -> Any + def parse_block_node_or_indentless_sequence(self) -> Any: return self.parse_node(block=True, indentless_sequence=True) - def transform_tag(self, handle, suffix): - # type: (Any, Any) -> Any + def transform_tag(self, handle: Any, suffix: Any) -> Any: return self.tag_handles[handle] + suffix - def parse_node(self, block=False, indentless_sequence=False): - # type: (bool, bool) -> Any + def parse_node(self, block: bool = False, indentless_sequence: bool = False) -> Any: if self.scanner.check_token(AliasToken): token = self.scanner.get_token() - event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any + event: Any = AliasEvent(token.value, token.start_mark, token.end_mark) self.state = self.states.pop() return event @@ -353,6 +346,7 @@ class Parser(object): start_mark = end_mark = tag_mark = None if self.scanner.check_token(AnchorToken): token = self.scanner.get_token() + self.move_token_comment(token) start_mark = token.start_mark end_mark = token.end_mark anchor = token.value @@ -378,13 +372,13 @@ class Parser(object): raise ParserError( 'while parsing a node', start_mark, - 'found undefined tag handle %r' % utf8(handle), + f'found undefined tag handle {handle!r}', tag_mark, ) tag = self.transform_tag(handle, suffix) else: tag = suffix - # if tag == u'!': + # if tag == '!': # raise ParserError("while parsing a node", start_mark, # "found non-specific tag '!'", tag_mark, # "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag' @@ -392,13 +386,17 @@ class Parser(object): if start_mark is None: start_mark = end_mark = self.scanner.peek_token().start_mark event = None - implicit = tag is None or tag == u'!' + implicit = tag is None or tag == '!' if indentless_sequence and self.scanner.check_token(BlockEntryToken): comment = None pt = self.scanner.peek_token() - if pt.comment and pt.comment[0]: - comment = [pt.comment[0], []] - pt.comment[0] = None + if self.loader and self.loader.comment_handling is None: + if pt.comment and pt.comment[0]: + comment = [pt.comment[0], []] + pt.comment[0] = None + elif self.loader: + if pt.comment: + comment = pt.comment end_mark = self.scanner.peek_token().end_mark event = SequenceStartEvent( anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment @@ -410,7 +408,7 @@ class Parser(object): token = self.scanner.get_token() # self.scanner.peek_token_same_line_comment(token) end_mark = token.end_mark - if (token.plain and tag is None) or tag == u'!': + if (token.plain and tag is None) or tag == '!': implicit = (True, False) elif tag is None: implicit = (False, True) @@ -462,7 +460,7 @@ class Parser(object): comment = pt.comment # nprint('pt0', type(pt)) if comment is None or comment[1] is None: - comment = pt.split_comment() + comment = pt.split_old_comment() # nprint('pt1', comment) event = SequenceStartEvent( anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment @@ -487,9 +485,9 @@ class Parser(object): node = 'flow' token = self.scanner.peek_token() raise ParserError( - 'while parsing a %s node' % node, + f'while parsing a {node!s} node', start_mark, - 'expected the node content, but found %r' % token.id, + f'expected the node content, but found {token.id!r}', token.start_mark, ) return event @@ -497,19 +495,17 @@ class Parser(object): # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* # BLOCK-END - def parse_block_sequence_first_entry(self): - # type: () -> Any + def parse_block_sequence_first_entry(self) -> Any: token = self.scanner.get_token() # move any comment from start token - # token.move_comment(self.scanner.peek_token()) + # self.move_token_comment(token) self.marks.append(token.start_mark) return self.parse_block_sequence_entry() - def parse_block_sequence_entry(self): - # type: () -> Any + def parse_block_sequence_entry(self) -> Any: if self.scanner.check_token(BlockEntryToken): token = self.scanner.get_token() - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) if not self.scanner.check_token(BlockEntryToken, BlockEndToken): self.states.append(self.parse_block_sequence_entry) return self.parse_block_node() @@ -521,7 +517,7 @@ class Parser(object): raise ParserError( 'while parsing a block collection', self.marks[-1], - 'expected <block end>, but found %r' % token.id, + f'expected <block end>, but found {token.id!r}', token.start_mark, ) token = self.scanner.get_token() # BlockEndToken @@ -537,11 +533,10 @@ class Parser(object): # - entry # - nested - def parse_indentless_sequence_entry(self): - # type: () -> Any + def parse_indentless_sequence_entry(self) -> Any: if self.scanner.check_token(BlockEntryToken): token = self.scanner.get_token() - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) if not self.scanner.check_token( BlockEntryToken, KeyToken, ValueToken, BlockEndToken ): @@ -551,7 +546,14 @@ class Parser(object): self.state = self.parse_indentless_sequence_entry return self.process_empty_scalar(token.end_mark) token = self.scanner.peek_token() - event = SequenceEndEvent(token.start_mark, token.start_mark, comment=token.comment) + c = None + if self.loader and self.loader.comment_handling is None: + c = token.comment + start_mark = token.start_mark + else: + start_mark = self.last_event.end_mark # type: ignore + c = self.distribute_comment(token.comment, start_mark.line) # type: ignore + event = SequenceEndEvent(start_mark, start_mark, comment=c) self.state = self.states.pop() return event @@ -560,17 +562,15 @@ class Parser(object): # (VALUE block_node_or_indentless_sequence?)?)* # BLOCK-END - def parse_block_mapping_first_key(self): - # type: () -> Any + def parse_block_mapping_first_key(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_block_mapping_key() - def parse_block_mapping_key(self): - # type: () -> Any + def parse_block_mapping_key(self) -> Any: if self.scanner.check_token(KeyToken): token = self.scanner.get_token() - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): self.states.append(self.parse_block_mapping_value) return self.parse_block_node_or_indentless_sequence() @@ -585,26 +585,25 @@ class Parser(object): raise ParserError( 'while parsing a block mapping', self.marks[-1], - 'expected <block end>, but found %r' % token.id, + f'expected <block end>, but found {token.id!r}', token.start_mark, ) token = self.scanner.get_token() - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() self.marks.pop() return event - def parse_block_mapping_value(self): - # type: () -> Any + def parse_block_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() # value token might have post comment move it to e.g. block if self.scanner.check_token(ValueToken): - token.move_comment(self.scanner.peek_token()) + self.move_token_comment(token) else: if not self.scanner.check_token(KeyToken): - token.move_comment(self.scanner.peek_token(), empty=True) + self.move_token_comment(token, empty=True) # else: empty value for this key cannot move token.comment if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): self.states.append(self.parse_block_mapping_key) @@ -635,14 +634,12 @@ class Parser(object): # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?` # generate an inline mapping (set syntax). - def parse_flow_sequence_first_entry(self): - # type: () -> Any + def parse_flow_sequence_first_entry(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_sequence_entry(first=True) - def parse_flow_sequence_entry(self, first=False): - # type: (bool) -> Any + def parse_flow_sequence_entry(self, first: bool = False) -> Any: if not self.scanner.check_token(FlowSequenceEndToken): if not first: if self.scanner.check_token(FlowEntryToken): @@ -652,15 +649,15 @@ class Parser(object): raise ParserError( 'while parsing a flow sequence', self.marks[-1], - "expected ',' or ']', but got %r" % token.id, + f"expected ',' or ']', but got {token.id!r}", token.start_mark, ) if self.scanner.check_token(KeyToken): token = self.scanner.peek_token() - event = MappingStartEvent( + event: Any = MappingStartEvent( None, None, True, token.start_mark, token.end_mark, flow_style=True - ) # type: Any + ) self.state = self.parse_flow_sequence_entry_mapping_key return event elif not self.scanner.check_token(FlowSequenceEndToken): @@ -672,8 +669,7 @@ class Parser(object): self.marks.pop() return event - def parse_flow_sequence_entry_mapping_key(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_key(self) -> Any: token = self.scanner.get_token() if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry_mapping_value) @@ -682,8 +678,7 @@ class Parser(object): self.state = self.parse_flow_sequence_entry_mapping_value return self.process_empty_scalar(token.end_mark) - def parse_flow_sequence_entry_mapping_value(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken): @@ -697,8 +692,7 @@ class Parser(object): token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) - def parse_flow_sequence_entry_mapping_end(self): - # type: () -> Any + def parse_flow_sequence_entry_mapping_end(self) -> Any: self.state = self.parse_flow_sequence_entry token = self.scanner.peek_token() return MappingEndEvent(token.start_mark, token.start_mark) @@ -709,14 +703,12 @@ class Parser(object): # FLOW-MAPPING-END # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? - def parse_flow_mapping_first_key(self): - # type: () -> Any + def parse_flow_mapping_first_key(self) -> Any: token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_mapping_key(first=True) - def parse_flow_mapping_key(self, first=False): - # type: (Any) -> Any + def parse_flow_mapping_key(self, first: Any = False) -> Any: if not self.scanner.check_token(FlowMappingEndToken): if not first: if self.scanner.check_token(FlowEntryToken): @@ -726,7 +718,7 @@ class Parser(object): raise ParserError( 'while parsing a flow mapping', self.marks[-1], - "expected ',' or '}', but got %r" % token.id, + f"expected ',' or '}}', but got {token.id!r}", token.start_mark, ) if self.scanner.check_token(KeyToken): @@ -753,8 +745,7 @@ class Parser(object): self.marks.pop() return event - def parse_flow_mapping_value(self): - # type: () -> Any + def parse_flow_mapping_value(self) -> Any: if self.scanner.check_token(ValueToken): token = self.scanner.get_token() if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken): @@ -768,35 +759,89 @@ class Parser(object): token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) - def parse_flow_mapping_empty_value(self): - # type: () -> Any + def parse_flow_mapping_empty_value(self) -> Any: self.state = self.parse_flow_mapping_key return self.process_empty_scalar(self.scanner.peek_token().start_mark) - def process_empty_scalar(self, mark, comment=None): - # type: (Any, Any) -> Any + def process_empty_scalar(self, mark: Any, comment: Any = None) -> Any: return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment) + def move_token_comment( + self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False + ) -> Any: + pass + class RoundTripParser(Parser): """roundtrip is a safe loader, that wants to see the unmangled tag""" - def transform_tag(self, handle, suffix): - # type: (Any, Any) -> Any + def transform_tag(self, handle: Any, suffix: Any) -> Any: # return self.tag_handles[handle]+suffix if handle == '!!' and suffix in ( - u'null', - u'bool', - u'int', - u'float', - u'binary', - u'timestamp', - u'omap', - u'pairs', - u'set', - u'str', - u'seq', - u'map', + 'null', + 'bool', + 'int', + 'float', + 'binary', + 'timestamp', + 'omap', + 'pairs', + 'set', + 'str', + 'seq', + 'map', ): return Parser.transform_tag(self, handle, suffix) return handle + suffix + + def move_token_comment( + self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False + ) -> Any: + token.move_old_comment(self.scanner.peek_token() if nt is None else nt, empty=empty) + + +class RoundTripParserSC(RoundTripParser): + """roundtrip is a safe loader, that wants to see the unmangled tag""" + + # some of the differences are based on the superclass testing + # if self.loader.comment_handling is not None + + def move_token_comment( + self: Any, token: Any, nt: Any = None, empty: Optional[bool] = False + ) -> None: + token.move_new_comment(self.scanner.peek_token() if nt is None else nt, empty=empty) + + def distribute_comment(self, comment: Any, line: Any) -> Any: + # ToDo, look at indentation of the comment to determine attachment + if comment is None: + return None + if not comment[0]: + return None + if comment[0][0] != line + 1: + nprintf('>>>dcxxx', comment, line) + assert comment[0][0] == line + 1 + # if comment[0] - line > 1: + # return + typ = self.loader.comment_handling & 0b11 + # nprintf('>>>dca', comment, line, typ) + if typ == C_POST: + return None + if typ == C_PRE: + c = [None, None, comment[0]] + comment[0] = None + return c + # nprintf('>>>dcb', comment[0]) + for _idx, cmntidx in enumerate(comment[0]): + # nprintf('>>>dcb', cmntidx) + if isinstance(self.scanner.comments[cmntidx], BlankLineComment): + break + else: + return None # no space found + if _idx == 0: + return None # first line was blank + # nprintf('>>>dcc', idx) + if typ == C_SPLIT_ON_FIRST_BLANK: + c = [None, None, comment[0][:_idx]] + comment[0] = comment[0][_idx:] + return c + raise NotImplementedError # reserved |