author    Anthon van der Neut <anthon@mnt.org>  2017-03-21 17:18:18 +0100
committer Anthon van der Neut <anthon@mnt.org>  2017-03-21 17:18:18 +0100
commit    9ac44a0873d51d63150b0f1dc1d009b206577a29 (patch)
tree      44fc2ecbdba2a6a63544097d7b9f63d8f87d5aae /parser.py
parent    c8568f99215aaa910953287f63a25459e3800dfc (diff)
download  ruamel.yaml-9ac44a0873d51d63150b0f1dc1d009b206577a29.tar.gz

update for mypy --strict, prepare de-inheritance (Loader/Dumper)  (tag: 0.14.0)
Diffstat (limited to 'parser.py')
-rw-r--r--  parser.py  295
1 file changed, 167 insertions(+), 128 deletions(-)
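Both halves of the commit message show up in the diff below. For the mypy --strict half, the file keeps Python 2 compatible syntax and carries its annotations in "# type:" comments: mypy reads a comment on the first line of a function body as that function's signature, and a trailing comment on an assignment as the variable's type. A minimal sketch of the pattern (the class and attribute names here are illustrative only, not part of ruamel.yaml):

from typing import Any, Dict, List, Optional  # NOQA


class AnnotatedExample(object):
    # Comment-style annotations keep the source valid under Python 2
    # while letting "mypy --strict" type-check it like Python 3 code.
    def __init__(self, loader):
        # type: (Any) -> None
        self.loader = loader
        self.tag_handles = {}  # type: Dict[Any, Any]
        self.states = []  # type: List[Any]
        self.yaml_version = None  # type: Optional[Any]

    def check(self, first):
        # type: (Any) -> bool
        # --strict implies --disallow-untyped-defs, so every def needs
        # a signature comment like the one above.
        return first is not None

This is why almost every function in the diff gains a "# type: () -> Any" line: the return types are deliberately loose (Any) at this stage, so the change is about making the file pass --strict, not about tightening the API.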
diff --git a/parser.py b/parser.py
index dc5d57f..653eb68 100644
--- a/parser.py
+++ b/parser.py
@@ -74,7 +74,10 @@ from __future__ import absolute_import
# need to have full path, as pkg_resources tries to load parser.py in __init__.py
# only to not do anything with the package afterwards
# and for Jython too
-from ruamel.yaml.error import MarkedYAMLError # type: ignore
+
+from typing import Any, Dict, Optional, List # NOQA
+
+from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import * # NOQA
from ruamel.yaml.events import * # NOQA
from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA
@@ -96,20 +99,31 @@ class Parser(object):
u'!!': u'tag:yaml.org,2002:',
}
- def __init__(self):
+ def __init__(self, loader):
+ # type: (Any) -> None
+ self.loader = loader
+ if self.loader is not None:
+ self.loader._parser = self
self.current_event = None
self.yaml_version = None
- self.tag_handles = {}
- self.states = []
- self.marks = []
- self.state = self.parse_stream_start
+ self.tag_handles = {} # type: Dict[Any, Any]
+ self.states = [] # type: List[Any]
+ self.marks = [] # type: List[Any]
+ self.state = self.parse_stream_start # type: Any
+
+ @property
+ def scanner(self):
+ # type: () -> Any
+ return self.loader._scanner
def dispose(self):
+ # type: () -> None
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
def check_event(self, *choices):
+ # type: (Any) -> bool
# Check the type of the next event.
if self.current_event is None:
if self.state:
@@ -123,6 +137,7 @@ class Parser(object):
return False
def peek_event(self):
+ # type: () -> Any
# Get the next event.
if self.current_event is None:
if self.state:
@@ -130,6 +145,7 @@ class Parser(object):
return self.current_event
def get_event(self):
+ # type: () -> Any
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
@@ -144,10 +160,10 @@ class Parser(object):
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
def parse_stream_start(self):
-
+ # type: () -> Any
# Parse the stream start.
- token = self.get_token()
- token.move_comment(self.peek_token())
+ token = self.scanner.get_token()
+ token.move_comment(self.scanner.peek_token())
event = StreamStartEvent(token.start_mark, token.end_mark,
encoding=token.encoding)
@@ -157,12 +173,12 @@ class Parser(object):
return event
def parse_implicit_document_start(self):
-
+ # type: () -> Any
# Parse an implicit document.
- if not self.check_token(DirectiveToken, DocumentStartToken,
- StreamEndToken):
+ if not self.scanner.check_token(DirectiveToken, DocumentStartToken,
+ StreamEndToken):
self.tag_handles = self.DEFAULT_TAGS
- token = self.peek_token()
+ token = self.scanner.peek_token()
start_mark = end_mark = token.start_mark
event = DocumentStartEvent(start_mark, end_mark,
explicit=False)
@@ -177,31 +193,30 @@ class Parser(object):
return self.parse_document_start()
def parse_document_start(self):
-
+ # type: () -> Any
# Parse any extra document end indicators.
- while self.check_token(DocumentEndToken):
- self.get_token()
-
+ while self.scanner.check_token(DocumentEndToken):
+ self.scanner.get_token()
# Parse an explicit document.
- if not self.check_token(StreamEndToken):
- token = self.peek_token()
+ if not self.scanner.check_token(StreamEndToken):
+ token = self.scanner.peek_token()
start_mark = token.start_mark
version, tags = self.process_directives()
- if not self.check_token(DocumentStartToken):
+ if not self.scanner.check_token(DocumentStartToken):
raise ParserError(None, None,
"expected '<document start>', but found %r"
- % self.peek_token().id,
- self.peek_token().start_mark)
- token = self.get_token()
+ % self.scanner.peek_token().id,
+ self.scanner.peek_token().start_mark)
+ token = self.scanner.get_token()
end_mark = token.end_mark
event = DocumentStartEvent(
start_mark, end_mark,
- explicit=True, version=version, tags=tags)
+ explicit=True, version=version, tags=tags) # type: Any
self.states.append(self.parse_document_end)
self.state = self.parse_document_content
else:
# Parse the end of the stream.
- token = self.get_token()
+ token = self.scanner.get_token()
event = StreamEndEvent(token.start_mark, token.end_mark,
comment=token.comment)
assert not self.states
@@ -210,13 +225,13 @@ class Parser(object):
return event
def parse_document_end(self):
-
+ # type: () -> Any
# Parse the document end.
- token = self.peek_token()
+ token = self.scanner.peek_token()
start_mark = end_mark = token.start_mark
explicit = False
- if self.check_token(DocumentEndToken):
- token = self.get_token()
+ if self.scanner.check_token(DocumentEndToken):
+ token = self.scanner.get_token()
end_mark = token.end_mark
explicit = True
event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
@@ -227,20 +242,22 @@ class Parser(object):
return event
def parse_document_content(self):
- if self.check_token(
+ # type: () -> Any
+ if self.scanner.check_token(
DirectiveToken,
DocumentStartToken, DocumentEndToken, StreamEndToken):
- event = self.process_empty_scalar(self.peek_token().start_mark)
+ event = self.process_empty_scalar(self.scanner.peek_token().start_mark)
self.state = self.states.pop()
return event
else:
return self.parse_block_node()
def process_directives(self):
+ # type: () -> Any
self.yaml_version = None
self.tag_handles = {}
- while self.check_token(DirectiveToken):
- token = self.get_token()
+ while self.scanner.check_token(DirectiveToken):
+ token = self.scanner.get_token()
if token.name == u'YAML':
if self.yaml_version is not None:
raise ParserError(
@@ -261,8 +278,8 @@ class Parser(object):
"duplicate tag handle %r" % utf8(handle),
token.start_mark)
self.tag_handles[handle] = prefix
- if self.tag_handles:
- value = self.yaml_version, self.tag_handles.copy()
+ if bool(self.tag_handles):
+ value = self.yaml_version, self.tag_handles.copy() # type: Any
else:
value = self.yaml_version, None
for key in self.DEFAULT_TAGS:
@@ -287,43 +304,48 @@ class Parser(object):
# flow_collection ::= flow_sequence | flow_mapping
def parse_block_node(self):
+ # type: () -> Any
return self.parse_node(block=True)
def parse_flow_node(self):
+ # type: () -> Any
return self.parse_node()
def parse_block_node_or_indentless_sequence(self):
+ # type: () -> Any
return self.parse_node(block=True, indentless_sequence=True)
def transform_tag(self, handle, suffix):
+ # type: (Any, Any) -> Any
return self.tag_handles[handle] + suffix
def parse_node(self, block=False, indentless_sequence=False):
- if self.check_token(AliasToken):
- token = self.get_token()
- event = AliasEvent(token.value, token.start_mark, token.end_mark)
+ # type: (bool, bool) -> Any
+ if self.scanner.check_token(AliasToken):
+ token = self.scanner.get_token()
+ event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any
self.state = self.states.pop()
else:
anchor = None
tag = None
start_mark = end_mark = tag_mark = None
- if self.check_token(AnchorToken):
- token = self.get_token()
+ if self.scanner.check_token(AnchorToken):
+ token = self.scanner.get_token()
start_mark = token.start_mark
end_mark = token.end_mark
anchor = token.value
- if self.check_token(TagToken):
- token = self.get_token()
+ if self.scanner.check_token(TagToken):
+ token = self.scanner.get_token()
tag_mark = token.start_mark
end_mark = token.end_mark
tag = token.value
- elif self.check_token(TagToken):
- token = self.get_token()
+ elif self.scanner.check_token(TagToken):
+ token = self.scanner.get_token()
start_mark = tag_mark = token.start_mark
end_mark = token.end_mark
tag = token.value
- if self.check_token(AnchorToken):
- token = self.get_token()
+ if self.scanner.check_token(AnchorToken):
+ token = self.scanner.get_token()
end_mark = token.end_mark
anchor = token.value
if tag is not None:
@@ -343,17 +365,17 @@ class Parser(object):
# "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag'
# and share your opinion.")
if start_mark is None:
- start_mark = end_mark = self.peek_token().start_mark
+ start_mark = end_mark = self.scanner.peek_token().start_mark
event = None
implicit = (tag is None or tag == u'!')
- if indentless_sequence and self.check_token(BlockEntryToken):
- end_mark = self.peek_token().end_mark
+ if indentless_sequence and self.scanner.check_token(BlockEntryToken):
+ end_mark = self.scanner.peek_token().end_mark
event = SequenceStartEvent(anchor, tag, implicit,
start_mark, end_mark)
self.state = self.parse_indentless_sequence_entry
else:
- if self.check_token(ScalarToken):
- token = self.get_token()
+ if self.scanner.check_token(ScalarToken):
+ token = self.scanner.get_token()
end_mark = token.end_mark
if (token.plain and tag is None) or tag == u'!':
implicit = (True, False)
@@ -367,23 +389,23 @@ class Parser(object):
comment=token.comment
)
self.state = self.states.pop()
- elif self.check_token(FlowSequenceStartToken):
- end_mark = self.peek_token().end_mark
+ elif self.scanner.check_token(FlowSequenceStartToken):
+ end_mark = self.scanner.peek_token().end_mark
event = SequenceStartEvent(
anchor, tag, implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_sequence_first_entry
- elif self.check_token(FlowMappingStartToken):
- end_mark = self.peek_token().end_mark
+ elif self.scanner.check_token(FlowMappingStartToken):
+ end_mark = self.scanner.peek_token().end_mark
event = MappingStartEvent(
anchor, tag, implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_mapping_first_key
- elif block and self.check_token(BlockSequenceStartToken):
- end_mark = self.peek_token().start_mark
+ elif block and self.scanner.check_token(BlockSequenceStartToken):
+ end_mark = self.scanner.peek_token().start_mark
# should inserting the comment be dependent on the
# indentation?
- pt = self.peek_token()
+ pt = self.scanner.peek_token()
comment = pt.comment
# print('pt0', type(pt))
if comment is None or comment[1] is None:
@@ -395,9 +417,9 @@ class Parser(object):
comment=comment,
)
self.state = self.parse_block_sequence_first_entry
- elif block and self.check_token(BlockMappingStartToken):
- end_mark = self.peek_token().start_mark
- comment = self.peek_token().comment
+ elif block and self.scanner.check_token(BlockMappingStartToken):
+ end_mark = self.scanner.peek_token().start_mark
+ comment = self.scanner.peek_token().comment
event = MappingStartEvent(
anchor, tag, implicit, start_mark, end_mark,
flow_style=False, comment=comment)
@@ -413,7 +435,7 @@ class Parser(object):
node = 'block'
else:
node = 'flow'
- token = self.peek_token()
+ token = self.scanner.peek_token()
raise ParserError(
"while parsing a %s node" % node, start_mark,
"expected the node content, but found %r" % token.id,
@@ -424,29 +446,31 @@ class Parser(object):
# BLOCK-END
def parse_block_sequence_first_entry(self):
- token = self.get_token()
+ # type: () -> Any
+ token = self.scanner.get_token()
# move any comment from start token
- # token.move_comment(self.peek_token())
+ # token.move_comment(self.scanner.peek_token())
self.marks.append(token.start_mark)
return self.parse_block_sequence_entry()
def parse_block_sequence_entry(self):
- if self.check_token(BlockEntryToken):
- token = self.get_token()
- token.move_comment(self.peek_token())
- if not self.check_token(BlockEntryToken, BlockEndToken):
+ # type: () -> Any
+ if self.scanner.check_token(BlockEntryToken):
+ token = self.scanner.get_token()
+ token.move_comment(self.scanner.peek_token())
+ if not self.scanner.check_token(BlockEntryToken, BlockEndToken):
self.states.append(self.parse_block_sequence_entry)
return self.parse_block_node()
else:
self.state = self.parse_block_sequence_entry
return self.process_empty_scalar(token.end_mark)
- if not self.check_token(BlockEndToken):
- token = self.peek_token()
+ if not self.scanner.check_token(BlockEndToken):
+ token = self.scanner.peek_token()
raise ParserError(
"while parsing a block collection", self.marks[-1],
"expected <block end>, but found %r" %
token.id, token.start_mark)
- token = self.get_token() # BlockEndToken
+ token = self.scanner.get_token() # BlockEndToken
event = SequenceEndEvent(token.start_mark, token.end_mark,
comment=token.comment)
self.state = self.states.pop()
@@ -461,17 +485,18 @@ class Parser(object):
# - nested
def parse_indentless_sequence_entry(self):
- if self.check_token(BlockEntryToken):
- token = self.get_token()
- token.move_comment(self.peek_token())
- if not self.check_token(BlockEntryToken,
- KeyToken, ValueToken, BlockEndToken):
+ # type: () -> Any
+ if self.scanner.check_token(BlockEntryToken):
+ token = self.scanner.get_token()
+ token.move_comment(self.scanner.peek_token())
+ if not self.scanner.check_token(BlockEntryToken,
+ KeyToken, ValueToken, BlockEndToken):
self.states.append(self.parse_indentless_sequence_entry)
return self.parse_block_node()
else:
self.state = self.parse_indentless_sequence_entry
return self.process_empty_scalar(token.end_mark)
- token = self.peek_token()
+ token = self.scanner.peek_token()
event = SequenceEndEvent(token.start_mark, token.start_mark,
comment=token.comment)
self.state = self.states.pop()
@@ -483,28 +508,30 @@ class Parser(object):
# BLOCK-END
def parse_block_mapping_first_key(self):
- token = self.get_token()
+ # type: () -> Any
+ token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_block_mapping_key()
def parse_block_mapping_key(self):
- if self.check_token(KeyToken):
- token = self.get_token()
- token.move_comment(self.peek_token())
- if not self.check_token(KeyToken, ValueToken, BlockEndToken):
+ # type: () -> Any
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.get_token()
+ token.move_comment(self.scanner.peek_token())
+ if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
self.states.append(self.parse_block_mapping_value)
return self.parse_block_node_or_indentless_sequence()
else:
self.state = self.parse_block_mapping_value
return self.process_empty_scalar(token.end_mark)
- if not self.check_token(BlockEndToken):
- token = self.peek_token()
+ if not self.scanner.check_token(BlockEndToken):
+ token = self.scanner.peek_token()
raise ParserError(
"while parsing a block mapping", self.marks[-1],
"expected <block end>, but found %r" % token.id,
token.start_mark)
- token = self.get_token()
- token.move_comment(self.peek_token())
+ token = self.scanner.get_token()
+ token.move_comment(self.scanner.peek_token())
event = MappingEndEvent(token.start_mark, token.end_mark,
comment=token.comment)
self.state = self.states.pop()
@@ -512,23 +539,24 @@ class Parser(object):
return event
def parse_block_mapping_value(self):
- if self.check_token(ValueToken):
- token = self.get_token()
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
# value token might have post comment move it to e.g. block
- if self.check_token(ValueToken):
- token.move_comment(self.peek_token())
+ if self.scanner.check_token(ValueToken):
+ token.move_comment(self.scanner.peek_token())
else:
- token.move_comment(self.peek_token(), empty=True)
- if not self.check_token(KeyToken, ValueToken, BlockEndToken):
+ token.move_comment(self.scanner.peek_token(), empty=True)
+ if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
self.states.append(self.parse_block_mapping_key)
return self.parse_block_node_or_indentless_sequence()
else:
self.state = self.parse_block_mapping_key
return self.process_empty_scalar(token.end_mark,
- comment=self.peek_token().comment)
+ comment=self.scanner.peek_token().comment)
else:
self.state = self.parse_block_mapping_key
- token = self.peek_token()
+ token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
# flow_sequence ::= FLOW-SEQUENCE-START
@@ -543,33 +571,35 @@ class Parser(object):
# generate an inline mapping (set syntax).
def parse_flow_sequence_first_entry(self):
- token = self.get_token()
+ # type: () -> Any
+ token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
def parse_flow_sequence_entry(self, first=False):
- if not self.check_token(FlowSequenceEndToken):
+ # type: (bool) -> Any
+ if not self.scanner.check_token(FlowSequenceEndToken):
if not first:
- if self.check_token(FlowEntryToken):
- self.get_token()
+ if self.scanner.check_token(FlowEntryToken):
+ self.scanner.get_token()
else:
- token = self.peek_token()
+ token = self.scanner.peek_token()
raise ParserError(
"while parsing a flow sequence", self.marks[-1],
"expected ',' or ']', but got %r" % token.id,
token.start_mark)
- if self.check_token(KeyToken):
- token = self.peek_token()
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.peek_token()
event = MappingStartEvent(None, None, True,
token.start_mark, token.end_mark,
- flow_style=True)
+ flow_style=True) # type: Any
self.state = self.parse_flow_sequence_entry_mapping_key
return event
- elif not self.check_token(FlowSequenceEndToken):
+ elif not self.scanner.check_token(FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry)
return self.parse_flow_node()
- token = self.get_token()
+ token = self.scanner.get_token()
event = SequenceEndEvent(token.start_mark, token.end_mark,
comment=token.comment)
self.state = self.states.pop()
@@ -577,9 +607,10 @@ class Parser(object):
return event
def parse_flow_sequence_entry_mapping_key(self):
- token = self.get_token()
- if not self.check_token(ValueToken,
- FlowEntryToken, FlowSequenceEndToken):
+ # type: () -> Any
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(ValueToken,
+ FlowEntryToken, FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry_mapping_value)
return self.parse_flow_node()
else:
@@ -587,9 +618,10 @@ class Parser(object):
return self.process_empty_scalar(token.end_mark)
def parse_flow_sequence_entry_mapping_value(self):
- if self.check_token(ValueToken):
- token = self.get_token()
- if not self.check_token(FlowEntryToken, FlowSequenceEndToken):
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry_mapping_end)
return self.parse_flow_node()
else:
@@ -597,12 +629,13 @@ class Parser(object):
return self.process_empty_scalar(token.end_mark)
else:
self.state = self.parse_flow_sequence_entry_mapping_end
- token = self.peek_token()
+ token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
def parse_flow_sequence_entry_mapping_end(self):
+ # type: () -> Any
self.state = self.parse_flow_sequence_entry
- token = self.peek_token()
+ token = self.scanner.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
# flow_mapping ::= FLOW-MAPPING-START
@@ -612,34 +645,36 @@ class Parser(object):
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
def parse_flow_mapping_first_key(self):
- token = self.get_token()
+ # type: () -> Any
+ token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
def parse_flow_mapping_key(self, first=False):
- if not self.check_token(FlowMappingEndToken):
+ # type: (Any) -> Any
+ if not self.scanner.check_token(FlowMappingEndToken):
if not first:
- if self.check_token(FlowEntryToken):
- self.get_token()
+ if self.scanner.check_token(FlowEntryToken):
+ self.scanner.get_token()
else:
- token = self.peek_token()
+ token = self.scanner.peek_token()
raise ParserError(
"while parsing a flow mapping", self.marks[-1],
"expected ',' or '}', but got %r" % token.id,
token.start_mark)
- if self.check_token(KeyToken):
- token = self.get_token()
- if not self.check_token(ValueToken,
- FlowEntryToken, FlowMappingEndToken):
+ if self.scanner.check_token(KeyToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(ValueToken,
+ FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_value)
return self.parse_flow_node()
else:
self.state = self.parse_flow_mapping_value
return self.process_empty_scalar(token.end_mark)
- elif not self.check_token(FlowMappingEndToken):
+ elif not self.scanner.check_token(FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_empty_value)
return self.parse_flow_node()
- token = self.get_token()
+ token = self.scanner.get_token()
event = MappingEndEvent(token.start_mark, token.end_mark,
comment=token.comment)
self.state = self.states.pop()
@@ -647,9 +682,10 @@ class Parser(object):
return event
def parse_flow_mapping_value(self):
- if self.check_token(ValueToken):
- token = self.get_token()
- if not self.check_token(FlowEntryToken, FlowMappingEndToken):
+ # type: () -> Any
+ if self.scanner.check_token(ValueToken):
+ token = self.scanner.get_token()
+ if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_key)
return self.parse_flow_node()
else:
@@ -657,20 +693,23 @@ class Parser(object):
return self.process_empty_scalar(token.end_mark)
else:
self.state = self.parse_flow_mapping_key
- token = self.peek_token()
+ token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
def parse_flow_mapping_empty_value(self):
+ # type: () -> Any
self.state = self.parse_flow_mapping_key
- return self.process_empty_scalar(self.peek_token().start_mark)
+ return self.process_empty_scalar(self.scanner.peek_token().start_mark)
def process_empty_scalar(self, mark, comment=None):
+ # type: (Any, Any) -> Any
return ScalarEvent(None, None, (True, False), u'', mark, mark, comment=comment)
class RoundTripParser(Parser):
"""roundtrip is a safe loader, that wants to see the unmangled tag"""
def transform_tag(self, handle, suffix):
+ # type: (Any, Any) -> Any
# return self.tag_handles[handle]+suffix
if handle == '!!' and suffix in (u'null', u'bool', u'int', u'float', u'binary',
u'timestamp', u'omap', u'pairs', u'set', u'str',
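The de-inheritance half of the change is the mechanical rewrite of every self.get_token() / self.peek_token() / self.check_token() call to self.scanner.get_token() and so on: rather than Parser inheriting those methods from Scanner, parser and scanner become siblings registered on a shared loader, and the new scanner property resolves the delegate at call time. A minimal sketch of that wiring, with hypothetical MiniLoader/MiniScanner/MiniParser names standing in for the real ruamel.yaml classes:

from typing import Any, List  # NOQA


class MiniScanner(object):
    def __init__(self, loader):
        # type: (Any) -> None
        loader._scanner = self
        self.tokens = ['tok-a', 'tok-b']  # type: List[Any]  # stand-in token stream

    def peek_token(self):
        # type: () -> Any
        return self.tokens[0]

    def get_token(self):
        # type: () -> Any
        return self.tokens.pop(0)


class MiniParser(object):
    def __init__(self, loader):
        # type: (Any) -> None
        # Register on the loader instead of inheriting from the scanner.
        self.loader = loader
        if self.loader is not None:
            self.loader._parser = self

    @property
    def scanner(self):
        # type: () -> Any
        # Look the scanner up through the loader at call time; Parser no
        # longer needs Scanner anywhere in its inheritance chain.
        return self.loader._scanner

    def first_token(self):
        # type: () -> Any
        return self.scanner.peek_token()


class MiniLoader(object):
    def __init__(self):
        # type: () -> None
        MiniScanner(self)
        MiniParser(self)


loader = MiniLoader()
assert loader._parser.first_token() == 'tok-a'

Composition like this keeps the components swappable (e.g. RoundTripScanner versus Scanner) without relying on multiple inheritance, which is what "prepare de-inheritance (Loader/Dumper)" in the subject line refers to.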