diff options
Diffstat (limited to 'parser.py')
-rw-r--r-- | parser.py | 205 |
1 files changed, 115 insertions, 90 deletions
@@ -77,10 +77,10 @@ from __future__ import absolute_import from ruamel.yaml.error import MarkedYAMLError -from ruamel.yaml.tokens import * # NOQA -from ruamel.yaml.events import * # NOQA -from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA -from ruamel.yaml.compat import utf8, nprint # NOQA +from ruamel.yaml.tokens import * # NOQA +from ruamel.yaml.events import * # NOQA +from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA +from ruamel.yaml.compat import utf8, nprint # NOQA if False: # MYPY from typing import Any, Dict, Optional, List # NOQA @@ -96,10 +96,7 @@ class Parser(object): # Since writing a recursive-descendant parser is a straightforward task, we # do not give many comments here. - DEFAULT_TAGS = { - u'!': u'!', - u'!!': u'tag:yaml.org,2002:', - } + DEFAULT_TAGS = {u'!': u'!', u'!!': u'tag:yaml.org,2002:'} def __init__(self, loader): # type: (Any) -> None @@ -115,7 +112,7 @@ class Parser(object): self.yaml_version = None self.tag_handles = {} # type: Dict[Any, Any] self.states = [] # type: List[Any] - self.marks = [] # type: List[Any] + self.marks = [] # type: List[Any] self.state = self.parse_stream_start # type: Any def dispose(self): @@ -178,8 +175,7 @@ class Parser(object): # Parse the stream start. token = self.scanner.get_token() token.move_comment(self.scanner.peek_token()) - event = StreamStartEvent(token.start_mark, token.end_mark, - encoding=token.encoding) + event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding) # Prepare the next state. self.state = self.parse_implicit_document_start @@ -189,13 +185,11 @@ class Parser(object): def parse_implicit_document_start(self): # type: () -> Any # Parse an implicit document. - if not self.scanner.check_token(DirectiveToken, DocumentStartToken, - StreamEndToken): + if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken): self.tag_handles = self.DEFAULT_TAGS token = self.scanner.peek_token() start_mark = end_mark = token.start_mark - event = DocumentStartEvent(start_mark, end_mark, - explicit=False) + event = DocumentStartEvent(start_mark, end_mark, explicit=False) # Prepare the next state. self.states.append(self.parse_document_end) @@ -217,22 +211,23 @@ class Parser(object): start_mark = token.start_mark version, tags = self.process_directives() if not self.scanner.check_token(DocumentStartToken): - raise ParserError(None, None, - "expected '<document start>', but found %r" - % self.scanner.peek_token().id, - self.scanner.peek_token().start_mark) + raise ParserError( + None, + None, + "expected '<document start>', but found %r" % self.scanner.peek_token().id, + self.scanner.peek_token().start_mark, + ) token = self.scanner.get_token() end_mark = token.end_mark event = DocumentStartEvent( - start_mark, end_mark, - explicit=True, version=version, tags=tags) # type: Any + start_mark, end_mark, explicit=True, version=version, tags=tags + ) # type: Any self.states.append(self.parse_document_end) self.state = self.parse_document_content else: # Parse the end of the stream. token = self.scanner.get_token() - event = StreamEndEvent(token.start_mark, token.end_mark, - comment=token.comment) + event = StreamEndEvent(token.start_mark, token.end_mark, comment=token.comment) assert not self.states assert not self.marks self.state = None @@ -261,8 +256,8 @@ class Parser(object): def parse_document_content(self): # type: () -> Any if self.scanner.check_token( - DirectiveToken, - DocumentStartToken, DocumentEndToken, StreamEndToken): + DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken + ): event = self.process_empty_scalar(self.scanner.peek_token().start_mark) self.state = self.states.pop() return event @@ -278,22 +273,23 @@ class Parser(object): if token.name == u'YAML': if self.yaml_version is not None: raise ParserError( - None, None, - "found duplicate YAML directive", token.start_mark) + None, None, 'found duplicate YAML directive', token.start_mark + ) major, minor = token.value if major != 1: raise ParserError( - None, None, - "found incompatible YAML document (version 1.* is " - "required)", - token.start_mark) + None, + None, + 'found incompatible YAML document (version 1.* is ' 'required)', + token.start_mark, + ) self.yaml_version = token.value elif token.name == u'TAG': handle, prefix = token.value if handle in self.tag_handles: - raise ParserError(None, None, - "duplicate tag handle %r" % utf8(handle), - token.start_mark) + raise ParserError( + None, None, 'duplicate tag handle %r' % utf8(handle), token.start_mark + ) self.tag_handles[handle] = prefix if bool(self.tag_handles): value = self.yaml_version, self.tag_handles.copy() # type: Any @@ -372,9 +368,11 @@ class Parser(object): if handle is not None: if handle not in self.tag_handles: raise ParserError( - "while parsing a node", start_mark, - "found undefined tag handle %r" % utf8(handle), - tag_mark) + 'while parsing a node', + start_mark, + 'found undefined tag handle %r' % utf8(handle), + tag_mark, + ) tag = self.transform_tag(handle, suffix) else: tag = suffix @@ -386,7 +384,7 @@ class Parser(object): if start_mark is None: start_mark = end_mark = self.scanner.peek_token().start_mark event = None - implicit = (tag is None or tag == u'!') + implicit = tag is None or tag == u'!' if indentless_sequence and self.scanner.check_token(BlockEntryToken): comment = None pt = self.scanner.peek_token() @@ -394,8 +392,9 @@ class Parser(object): comment = [pt.comment[0], []] pt.comment[0] = None end_mark = self.scanner.peek_token().end_mark - event = SequenceStartEvent(anchor, tag, implicit, start_mark, end_mark, - flow_style=False, comment=comment) + event = SequenceStartEvent( + anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment + ) self.state = self.parse_indentless_sequence_entry return event @@ -411,23 +410,34 @@ class Parser(object): implicit = (False, False) # nprint('se', token.value, token.comment) event = ScalarEvent( - anchor, tag, implicit, token.value, - start_mark, end_mark, style=token.style, - comment=token.comment + anchor, + tag, + implicit, + token.value, + start_mark, + end_mark, + style=token.style, + comment=token.comment, ) self.state = self.states.pop() elif self.scanner.check_token(FlowSequenceStartToken): pt = self.scanner.peek_token() end_mark = pt.end_mark event = SequenceStartEvent( - anchor, tag, implicit, - start_mark, end_mark, flow_style=True, comment=pt.comment) + anchor, + tag, + implicit, + start_mark, + end_mark, + flow_style=True, + comment=pt.comment, + ) self.state = self.parse_flow_sequence_first_entry elif self.scanner.check_token(FlowMappingStartToken): end_mark = self.scanner.peek_token().end_mark event = MappingStartEvent( - anchor, tag, implicit, - start_mark, end_mark, flow_style=True) + anchor, tag, implicit, start_mark, end_mark, flow_style=True + ) self.state = self.parse_flow_mapping_first_key elif block and self.scanner.check_token(BlockSequenceStartToken): end_mark = self.scanner.peek_token().start_mark @@ -440,23 +450,20 @@ class Parser(object): comment = pt.split_comment() # nprint('pt1', comment) event = SequenceStartEvent( - anchor, tag, implicit, start_mark, end_mark, - flow_style=False, - comment=comment, + anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment ) self.state = self.parse_block_sequence_first_entry elif block and self.scanner.check_token(BlockMappingStartToken): end_mark = self.scanner.peek_token().start_mark comment = self.scanner.peek_token().comment event = MappingStartEvent( - anchor, tag, implicit, start_mark, end_mark, - flow_style=False, comment=comment) + anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment + ) self.state = self.parse_block_mapping_first_key elif anchor is not None or tag is not None: # Empty scalars are allowed even if a tag or an anchor is # specified. - event = ScalarEvent(anchor, tag, (implicit, False), u'', - start_mark, end_mark) + event = ScalarEvent(anchor, tag, (implicit, False), "", start_mark, end_mark) self.state = self.states.pop() else: if block: @@ -465,9 +472,11 @@ class Parser(object): node = 'flow' token = self.scanner.peek_token() raise ParserError( - "while parsing a %s node" % node, start_mark, - "expected the node content, but found %r" % token.id, - token.start_mark) + 'while parsing a %s node' % node, + start_mark, + 'expected the node content, but found %r' % token.id, + token.start_mark, + ) return event # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* @@ -495,12 +504,13 @@ class Parser(object): if not self.scanner.check_token(BlockEndToken): token = self.scanner.peek_token() raise ParserError( - "while parsing a block collection", self.marks[-1], - "expected <block end>, but found %r" % - token.id, token.start_mark) + 'while parsing a block collection', + self.marks[-1], + 'expected <block end>, but found %r' % token.id, + token.start_mark, + ) token = self.scanner.get_token() # BlockEndToken - event = SequenceEndEvent(token.start_mark, token.end_mark, - comment=token.comment) + event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() self.marks.pop() return event @@ -517,16 +527,16 @@ class Parser(object): if self.scanner.check_token(BlockEntryToken): token = self.scanner.get_token() token.move_comment(self.scanner.peek_token()) - if not self.scanner.check_token(BlockEntryToken, - KeyToken, ValueToken, BlockEndToken): + if not self.scanner.check_token( + BlockEntryToken, KeyToken, ValueToken, BlockEndToken + ): self.states.append(self.parse_indentless_sequence_entry) return self.parse_block_node() else: self.state = self.parse_indentless_sequence_entry return self.process_empty_scalar(token.end_mark) token = self.scanner.peek_token() - event = SequenceEndEvent(token.start_mark, token.start_mark, - comment=token.comment) + event = SequenceEndEvent(token.start_mark, token.start_mark, comment=token.comment) self.state = self.states.pop() return event @@ -556,13 +566,14 @@ class Parser(object): if not self.scanner.check_token(BlockEndToken): token = self.scanner.peek_token() raise ParserError( - "while parsing a block mapping", self.marks[-1], - "expected <block end>, but found %r" % token.id, - token.start_mark) + 'while parsing a block mapping', + self.marks[-1], + 'expected <block end>, but found %r' % token.id, + token.start_mark, + ) token = self.scanner.get_token() token.move_comment(self.scanner.peek_token()) - event = MappingEndEvent(token.start_mark, token.end_mark, - comment=token.comment) + event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() self.marks.pop() return event @@ -620,23 +631,24 @@ class Parser(object): else: token = self.scanner.peek_token() raise ParserError( - "while parsing a flow sequence", self.marks[-1], + 'while parsing a flow sequence', + self.marks[-1], "expected ',' or ']', but got %r" % token.id, - token.start_mark) + token.start_mark, + ) if self.scanner.check_token(KeyToken): token = self.scanner.peek_token() - event = MappingStartEvent(None, None, True, - token.start_mark, token.end_mark, - flow_style=True) # type: Any + event = MappingStartEvent( + None, None, True, token.start_mark, token.end_mark, flow_style=True + ) # type: Any self.state = self.parse_flow_sequence_entry_mapping_key return event elif not self.scanner.check_token(FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry) return self.parse_flow_node() token = self.scanner.get_token() - event = SequenceEndEvent(token.start_mark, token.end_mark, - comment=token.comment) + event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() self.marks.pop() return event @@ -644,8 +656,7 @@ class Parser(object): def parse_flow_sequence_entry_mapping_key(self): # type: () -> Any token = self.scanner.get_token() - if not self.scanner.check_token(ValueToken, - FlowEntryToken, FlowSequenceEndToken): + if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry_mapping_value) return self.parse_flow_node() else: @@ -694,13 +705,16 @@ class Parser(object): else: token = self.scanner.peek_token() raise ParserError( - "while parsing a flow mapping", self.marks[-1], + 'while parsing a flow mapping', + self.marks[-1], "expected ',' or '}', but got %r" % token.id, - token.start_mark) + token.start_mark, + ) if self.scanner.check_token(KeyToken): token = self.scanner.get_token() - if not self.scanner.check_token(ValueToken, - FlowEntryToken, FlowMappingEndToken): + if not self.scanner.check_token( + ValueToken, FlowEntryToken, FlowMappingEndToken + ): self.states.append(self.parse_flow_mapping_value) return self.parse_flow_node() else: @@ -710,8 +724,7 @@ class Parser(object): self.states.append(self.parse_flow_mapping_empty_value) return self.parse_flow_node() token = self.scanner.get_token() - event = MappingEndEvent(token.start_mark, token.end_mark, - comment=token.comment) + event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() self.marks.pop() return event @@ -738,16 +751,28 @@ class Parser(object): def process_empty_scalar(self, mark, comment=None): # type: (Any, Any) -> Any - return ScalarEvent(None, None, (True, False), u'', mark, mark, comment=comment) + return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment) class RoundTripParser(Parser): """roundtrip is a safe loader, that wants to see the unmangled tag""" + def transform_tag(self, handle, suffix): # type: (Any, Any) -> Any # return self.tag_handles[handle]+suffix - if handle == '!!' and suffix in (u'null', u'bool', u'int', u'float', u'binary', - u'timestamp', u'omap', u'pairs', u'set', u'str', - u'seq', u'map'): + if handle == '!!' and suffix in ( + u'null', + u'bool', + u'int', + u'float', + u'binary', + u'timestamp', + u'omap', + u'pairs', + u'set', + u'str', + u'seq', + u'map', + ): return Parser.transform_tag(self, handle, suffix) return handle + suffix |