diff options
author | Anthon van der Neut <anthon@mnt.org> | 2017-03-21 17:18:18 +0100 |
---|---|---|
committer | Anthon van der Neut <anthon@mnt.org> | 2017-03-21 17:18:18 +0100 |
commit | 9ac44a0873d51d63150b0f1dc1d009b206577a29 (patch) | |
tree | 44fc2ecbdba2a6a63544097d7b9f63d8f87d5aae /parser.py | |
parent | c8568f99215aaa910953287f63a25459e3800dfc (diff) | |
download | ruamel.yaml-9ac44a0873d51d63150b0f1dc1d009b206577a29.tar.gz |
update for mypy --strict, prepare de-inheritance (Loader/Dumper)0.14.0
Diffstat (limited to 'parser.py')
-rw-r--r-- | parser.py | 295 |
1 files changed, 167 insertions, 128 deletions
@@ -74,7 +74,10 @@ from __future__ import absolute_import # need to have full path, as pkg_resources tries to load parser.py in __init__.py # only to not do anything with the package afterwards # and for Jython too -from ruamel.yaml.error import MarkedYAMLError # type: ignore + +from typing import Any, Dict, Optional, List # NOQA + +from ruamel.yaml.error import MarkedYAMLError from ruamel.yaml.tokens import * # NOQA from ruamel.yaml.events import * # NOQA from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA @@ -96,20 +99,31 @@ class Parser(object): u'!!': u'tag:yaml.org,2002:', } - def __init__(self): + def __init__(self, loader): + # type: (Any) -> None + self.loader = loader + if self.loader is not None: + self.loader._parser = self self.current_event = None self.yaml_version = None - self.tag_handles = {} - self.states = [] - self.marks = [] - self.state = self.parse_stream_start + self.tag_handles = {} # type: Dict[Any, Any] + self.states = [] # type: List[Any] + self.marks = [] # type: List[Any] + self.state = self.parse_stream_start # type: Any + + @property + def scanner(self): + # type: () -> Any + return self.loader._scanner def dispose(self): + # type: () -> None # Reset the state attributes (to clear self-references) self.states = [] self.state = None def check_event(self, *choices): + # type: (Any) -> bool # Check the type of the next event. if self.current_event is None: if self.state: @@ -123,6 +137,7 @@ class Parser(object): return False def peek_event(self): + # type: () -> Any # Get the next event. if self.current_event is None: if self.state: @@ -130,6 +145,7 @@ class Parser(object): return self.current_event def get_event(self): + # type: () -> Any # Get the next event and proceed further. if self.current_event is None: if self.state: @@ -144,10 +160,10 @@ class Parser(object): # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END* def parse_stream_start(self): - + # type: () -> Any # Parse the stream start. - token = self.get_token() - token.move_comment(self.peek_token()) + token = self.scanner.get_token() + token.move_comment(self.scanner.peek_token()) event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding) @@ -157,12 +173,12 @@ class Parser(object): return event def parse_implicit_document_start(self): - + # type: () -> Any # Parse an implicit document. - if not self.check_token(DirectiveToken, DocumentStartToken, - StreamEndToken): + if not self.scanner.check_token(DirectiveToken, DocumentStartToken, + StreamEndToken): self.tag_handles = self.DEFAULT_TAGS - token = self.peek_token() + token = self.scanner.peek_token() start_mark = end_mark = token.start_mark event = DocumentStartEvent(start_mark, end_mark, explicit=False) @@ -177,31 +193,30 @@ class Parser(object): return self.parse_document_start() def parse_document_start(self): - + # type: () -> Any # Parse any extra document end indicators. - while self.check_token(DocumentEndToken): - self.get_token() - + while self.scanner.check_token(DocumentEndToken): + self.scanner.get_token() # Parse an explicit document. - if not self.check_token(StreamEndToken): - token = self.peek_token() + if not self.scanner.check_token(StreamEndToken): + token = self.scanner.peek_token() start_mark = token.start_mark version, tags = self.process_directives() - if not self.check_token(DocumentStartToken): + if not self.scanner.check_token(DocumentStartToken): raise ParserError(None, None, "expected '<document start>', but found %r" - % self.peek_token().id, - self.peek_token().start_mark) - token = self.get_token() + % self.scanner.peek_token().id, + self.scanner.peek_token().start_mark) + token = self.scanner.get_token() end_mark = token.end_mark event = DocumentStartEvent( start_mark, end_mark, - explicit=True, version=version, tags=tags) + explicit=True, version=version, tags=tags) # type: Any self.states.append(self.parse_document_end) self.state = self.parse_document_content else: # Parse the end of the stream. - token = self.get_token() + token = self.scanner.get_token() event = StreamEndEvent(token.start_mark, token.end_mark, comment=token.comment) assert not self.states @@ -210,13 +225,13 @@ class Parser(object): return event def parse_document_end(self): - + # type: () -> Any # Parse the document end. - token = self.peek_token() + token = self.scanner.peek_token() start_mark = end_mark = token.start_mark explicit = False - if self.check_token(DocumentEndToken): - token = self.get_token() + if self.scanner.check_token(DocumentEndToken): + token = self.scanner.get_token() end_mark = token.end_mark explicit = True event = DocumentEndEvent(start_mark, end_mark, explicit=explicit) @@ -227,20 +242,22 @@ class Parser(object): return event def parse_document_content(self): - if self.check_token( + # type: () -> Any + if self.scanner.check_token( DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken): - event = self.process_empty_scalar(self.peek_token().start_mark) + event = self.process_empty_scalar(self.scanner.peek_token().start_mark) self.state = self.states.pop() return event else: return self.parse_block_node() def process_directives(self): + # type: () -> Any self.yaml_version = None self.tag_handles = {} - while self.check_token(DirectiveToken): - token = self.get_token() + while self.scanner.check_token(DirectiveToken): + token = self.scanner.get_token() if token.name == u'YAML': if self.yaml_version is not None: raise ParserError( @@ -261,8 +278,8 @@ class Parser(object): "duplicate tag handle %r" % utf8(handle), token.start_mark) self.tag_handles[handle] = prefix - if self.tag_handles: - value = self.yaml_version, self.tag_handles.copy() + if bool(self.tag_handles): + value = self.yaml_version, self.tag_handles.copy() # type: Any else: value = self.yaml_version, None for key in self.DEFAULT_TAGS: @@ -287,43 +304,48 @@ class Parser(object): # flow_collection ::= flow_sequence | flow_mapping def parse_block_node(self): + # type: () -> Any return self.parse_node(block=True) def parse_flow_node(self): + # type: () -> Any return self.parse_node() def parse_block_node_or_indentless_sequence(self): + # type: () -> Any return self.parse_node(block=True, indentless_sequence=True) def transform_tag(self, handle, suffix): + # type: (Any, Any) -> Any return self.tag_handles[handle] + suffix def parse_node(self, block=False, indentless_sequence=False): - if self.check_token(AliasToken): - token = self.get_token() - event = AliasEvent(token.value, token.start_mark, token.end_mark) + # type: (bool, bool) -> Any + if self.scanner.check_token(AliasToken): + token = self.scanner.get_token() + event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any self.state = self.states.pop() else: anchor = None tag = None start_mark = end_mark = tag_mark = None - if self.check_token(AnchorToken): - token = self.get_token() + if self.scanner.check_token(AnchorToken): + token = self.scanner.get_token() start_mark = token.start_mark end_mark = token.end_mark anchor = token.value - if self.check_token(TagToken): - token = self.get_token() + if self.scanner.check_token(TagToken): + token = self.scanner.get_token() tag_mark = token.start_mark end_mark = token.end_mark tag = token.value - elif self.check_token(TagToken): - token = self.get_token() + elif self.scanner.check_token(TagToken): + token = self.scanner.get_token() start_mark = tag_mark = token.start_mark end_mark = token.end_mark tag = token.value - if self.check_token(AnchorToken): - token = self.get_token() + if self.scanner.check_token(AnchorToken): + token = self.scanner.get_token() end_mark = token.end_mark anchor = token.value if tag is not None: @@ -343,17 +365,17 @@ class Parser(object): # "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag' # and share your opinion.") if start_mark is None: - start_mark = end_mark = self.peek_token().start_mark + start_mark = end_mark = self.scanner.peek_token().start_mark event = None implicit = (tag is None or tag == u'!') - if indentless_sequence and self.check_token(BlockEntryToken): - end_mark = self.peek_token().end_mark + if indentless_sequence and self.scanner.check_token(BlockEntryToken): + end_mark = self.scanner.peek_token().end_mark event = SequenceStartEvent(anchor, tag, implicit, start_mark, end_mark) self.state = self.parse_indentless_sequence_entry else: - if self.check_token(ScalarToken): - token = self.get_token() + if self.scanner.check_token(ScalarToken): + token = self.scanner.get_token() end_mark = token.end_mark if (token.plain and tag is None) or tag == u'!': implicit = (True, False) @@ -367,23 +389,23 @@ class Parser(object): comment=token.comment ) self.state = self.states.pop() - elif self.check_token(FlowSequenceStartToken): - end_mark = self.peek_token().end_mark + elif self.scanner.check_token(FlowSequenceStartToken): + end_mark = self.scanner.peek_token().end_mark event = SequenceStartEvent( anchor, tag, implicit, start_mark, end_mark, flow_style=True) self.state = self.parse_flow_sequence_first_entry - elif self.check_token(FlowMappingStartToken): - end_mark = self.peek_token().end_mark + elif self.scanner.check_token(FlowMappingStartToken): + end_mark = self.scanner.peek_token().end_mark event = MappingStartEvent( anchor, tag, implicit, start_mark, end_mark, flow_style=True) self.state = self.parse_flow_mapping_first_key - elif block and self.check_token(BlockSequenceStartToken): - end_mark = self.peek_token().start_mark + elif block and self.scanner.check_token(BlockSequenceStartToken): + end_mark = self.scanner.peek_token().start_mark # should inserting the comment be dependent on the # indentation? - pt = self.peek_token() + pt = self.scanner.peek_token() comment = pt.comment # print('pt0', type(pt)) if comment is None or comment[1] is None: @@ -395,9 +417,9 @@ class Parser(object): comment=comment, ) self.state = self.parse_block_sequence_first_entry - elif block and self.check_token(BlockMappingStartToken): - end_mark = self.peek_token().start_mark - comment = self.peek_token().comment + elif block and self.scanner.check_token(BlockMappingStartToken): + end_mark = self.scanner.peek_token().start_mark + comment = self.scanner.peek_token().comment event = MappingStartEvent( anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment) @@ -413,7 +435,7 @@ class Parser(object): node = 'block' else: node = 'flow' - token = self.peek_token() + token = self.scanner.peek_token() raise ParserError( "while parsing a %s node" % node, start_mark, "expected the node content, but found %r" % token.id, @@ -424,29 +446,31 @@ class Parser(object): # BLOCK-END def parse_block_sequence_first_entry(self): - token = self.get_token() + # type: () -> Any + token = self.scanner.get_token() # move any comment from start token - # token.move_comment(self.peek_token()) + # token.move_comment(self.scanner.peek_token()) self.marks.append(token.start_mark) return self.parse_block_sequence_entry() def parse_block_sequence_entry(self): - if self.check_token(BlockEntryToken): - token = self.get_token() - token.move_comment(self.peek_token()) - if not self.check_token(BlockEntryToken, BlockEndToken): + # type: () -> Any + if self.scanner.check_token(BlockEntryToken): + token = self.scanner.get_token() + token.move_comment(self.scanner.peek_token()) + if not self.scanner.check_token(BlockEntryToken, BlockEndToken): self.states.append(self.parse_block_sequence_entry) return self.parse_block_node() else: self.state = self.parse_block_sequence_entry return self.process_empty_scalar(token.end_mark) - if not self.check_token(BlockEndToken): - token = self.peek_token() + if not self.scanner.check_token(BlockEndToken): + token = self.scanner.peek_token() raise ParserError( "while parsing a block collection", self.marks[-1], "expected <block end>, but found %r" % token.id, token.start_mark) - token = self.get_token() # BlockEndToken + token = self.scanner.get_token() # BlockEndToken event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() @@ -461,17 +485,18 @@ class Parser(object): # - nested def parse_indentless_sequence_entry(self): - if self.check_token(BlockEntryToken): - token = self.get_token() - token.move_comment(self.peek_token()) - if not self.check_token(BlockEntryToken, - KeyToken, ValueToken, BlockEndToken): + # type: () -> Any + if self.scanner.check_token(BlockEntryToken): + token = self.scanner.get_token() + token.move_comment(self.scanner.peek_token()) + if not self.scanner.check_token(BlockEntryToken, + KeyToken, ValueToken, BlockEndToken): self.states.append(self.parse_indentless_sequence_entry) return self.parse_block_node() else: self.state = self.parse_indentless_sequence_entry return self.process_empty_scalar(token.end_mark) - token = self.peek_token() + token = self.scanner.peek_token() event = SequenceEndEvent(token.start_mark, token.start_mark, comment=token.comment) self.state = self.states.pop() @@ -483,28 +508,30 @@ class Parser(object): # BLOCK-END def parse_block_mapping_first_key(self): - token = self.get_token() + # type: () -> Any + token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_block_mapping_key() def parse_block_mapping_key(self): - if self.check_token(KeyToken): - token = self.get_token() - token.move_comment(self.peek_token()) - if not self.check_token(KeyToken, ValueToken, BlockEndToken): + # type: () -> Any + if self.scanner.check_token(KeyToken): + token = self.scanner.get_token() + token.move_comment(self.scanner.peek_token()) + if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): self.states.append(self.parse_block_mapping_value) return self.parse_block_node_or_indentless_sequence() else: self.state = self.parse_block_mapping_value return self.process_empty_scalar(token.end_mark) - if not self.check_token(BlockEndToken): - token = self.peek_token() + if not self.scanner.check_token(BlockEndToken): + token = self.scanner.peek_token() raise ParserError( "while parsing a block mapping", self.marks[-1], "expected <block end>, but found %r" % token.id, token.start_mark) - token = self.get_token() - token.move_comment(self.peek_token()) + token = self.scanner.get_token() + token.move_comment(self.scanner.peek_token()) event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() @@ -512,23 +539,24 @@ class Parser(object): return event def parse_block_mapping_value(self): - if self.check_token(ValueToken): - token = self.get_token() + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() # value token might have post comment move it to e.g. block - if self.check_token(ValueToken): - token.move_comment(self.peek_token()) + if self.scanner.check_token(ValueToken): + token.move_comment(self.scanner.peek_token()) else: - token.move_comment(self.peek_token(), empty=True) - if not self.check_token(KeyToken, ValueToken, BlockEndToken): + token.move_comment(self.scanner.peek_token(), empty=True) + if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken): self.states.append(self.parse_block_mapping_key) return self.parse_block_node_or_indentless_sequence() else: self.state = self.parse_block_mapping_key return self.process_empty_scalar(token.end_mark, - comment=self.peek_token().comment) + comment=self.scanner.peek_token().comment) else: self.state = self.parse_block_mapping_key - token = self.peek_token() + token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) # flow_sequence ::= FLOW-SEQUENCE-START @@ -543,33 +571,35 @@ class Parser(object): # generate an inline mapping (set syntax). def parse_flow_sequence_first_entry(self): - token = self.get_token() + # type: () -> Any + token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_sequence_entry(first=True) def parse_flow_sequence_entry(self, first=False): - if not self.check_token(FlowSequenceEndToken): + # type: (bool) -> Any + if not self.scanner.check_token(FlowSequenceEndToken): if not first: - if self.check_token(FlowEntryToken): - self.get_token() + if self.scanner.check_token(FlowEntryToken): + self.scanner.get_token() else: - token = self.peek_token() + token = self.scanner.peek_token() raise ParserError( "while parsing a flow sequence", self.marks[-1], "expected ',' or ']', but got %r" % token.id, token.start_mark) - if self.check_token(KeyToken): - token = self.peek_token() + if self.scanner.check_token(KeyToken): + token = self.scanner.peek_token() event = MappingStartEvent(None, None, True, token.start_mark, token.end_mark, - flow_style=True) + flow_style=True) # type: Any self.state = self.parse_flow_sequence_entry_mapping_key return event - elif not self.check_token(FlowSequenceEndToken): + elif not self.scanner.check_token(FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry) return self.parse_flow_node() - token = self.get_token() + token = self.scanner.get_token() event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() @@ -577,9 +607,10 @@ class Parser(object): return event def parse_flow_sequence_entry_mapping_key(self): - token = self.get_token() - if not self.check_token(ValueToken, - FlowEntryToken, FlowSequenceEndToken): + # type: () -> Any + token = self.scanner.get_token() + if not self.scanner.check_token(ValueToken, + FlowEntryToken, FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry_mapping_value) return self.parse_flow_node() else: @@ -587,9 +618,10 @@ class Parser(object): return self.process_empty_scalar(token.end_mark) def parse_flow_sequence_entry_mapping_value(self): - if self.check_token(ValueToken): - token = self.get_token() - if not self.check_token(FlowEntryToken, FlowSequenceEndToken): + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() + if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken): self.states.append(self.parse_flow_sequence_entry_mapping_end) return self.parse_flow_node() else: @@ -597,12 +629,13 @@ class Parser(object): return self.process_empty_scalar(token.end_mark) else: self.state = self.parse_flow_sequence_entry_mapping_end - token = self.peek_token() + token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) def parse_flow_sequence_entry_mapping_end(self): + # type: () -> Any self.state = self.parse_flow_sequence_entry - token = self.peek_token() + token = self.scanner.peek_token() return MappingEndEvent(token.start_mark, token.start_mark) # flow_mapping ::= FLOW-MAPPING-START @@ -612,34 +645,36 @@ class Parser(object): # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)? def parse_flow_mapping_first_key(self): - token = self.get_token() + # type: () -> Any + token = self.scanner.get_token() self.marks.append(token.start_mark) return self.parse_flow_mapping_key(first=True) def parse_flow_mapping_key(self, first=False): - if not self.check_token(FlowMappingEndToken): + # type: (Any) -> Any + if not self.scanner.check_token(FlowMappingEndToken): if not first: - if self.check_token(FlowEntryToken): - self.get_token() + if self.scanner.check_token(FlowEntryToken): + self.scanner.get_token() else: - token = self.peek_token() + token = self.scanner.peek_token() raise ParserError( "while parsing a flow mapping", self.marks[-1], "expected ',' or '}', but got %r" % token.id, token.start_mark) - if self.check_token(KeyToken): - token = self.get_token() - if not self.check_token(ValueToken, - FlowEntryToken, FlowMappingEndToken): + if self.scanner.check_token(KeyToken): + token = self.scanner.get_token() + if not self.scanner.check_token(ValueToken, + FlowEntryToken, FlowMappingEndToken): self.states.append(self.parse_flow_mapping_value) return self.parse_flow_node() else: self.state = self.parse_flow_mapping_value return self.process_empty_scalar(token.end_mark) - elif not self.check_token(FlowMappingEndToken): + elif not self.scanner.check_token(FlowMappingEndToken): self.states.append(self.parse_flow_mapping_empty_value) return self.parse_flow_node() - token = self.get_token() + token = self.scanner.get_token() event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment) self.state = self.states.pop() @@ -647,9 +682,10 @@ class Parser(object): return event def parse_flow_mapping_value(self): - if self.check_token(ValueToken): - token = self.get_token() - if not self.check_token(FlowEntryToken, FlowMappingEndToken): + # type: () -> Any + if self.scanner.check_token(ValueToken): + token = self.scanner.get_token() + if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken): self.states.append(self.parse_flow_mapping_key) return self.parse_flow_node() else: @@ -657,20 +693,23 @@ class Parser(object): return self.process_empty_scalar(token.end_mark) else: self.state = self.parse_flow_mapping_key - token = self.peek_token() + token = self.scanner.peek_token() return self.process_empty_scalar(token.start_mark) def parse_flow_mapping_empty_value(self): + # type: () -> Any self.state = self.parse_flow_mapping_key - return self.process_empty_scalar(self.peek_token().start_mark) + return self.process_empty_scalar(self.scanner.peek_token().start_mark) def process_empty_scalar(self, mark, comment=None): + # type: (Any, Any) -> Any return ScalarEvent(None, None, (True, False), u'', mark, mark, comment=comment) class RoundTripParser(Parser): """roundtrip is a safe loader, that wants to see the unmangled tag""" def transform_tag(self, handle, suffix): + # type: (Any, Any) -> Any # return self.tag_handles[handle]+suffix if handle == '!!' and suffix in (u'null', u'bool', u'int', u'float', u'binary', u'timestamp', u'omap', u'pairs', u'set', u'str', |