Diffstat (limited to 'parser.py')
 parser.py | 315 ++++++++++++++++++++++++------------------
 1 file changed, 180 insertions(+), 135 deletions(-)
diff --git a/parser.py b/parser.py
index 10deaa8..c8b5fcf 100644
--- a/parser.py
+++ b/parser.py
@@ -1,7 +1,5 @@
# coding: utf-8
-from __future__ import absolute_import
-
# The following YAML grammar is LL(1) and is parsed by a recursive descent
# parser.
#
@@ -46,7 +44,7 @@ from __future__ import absolute_import
#
# FIRST sets:
#
-# stream: { STREAM-START }
+# stream: { STREAM-START <}
# explicit_document: { DIRECTIVE DOCUMENT-START }
# implicit_document: FIRST(block_node)
# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START
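
For orientation: the productions above drive the event stream, which can be
observed directly. A minimal sketch, assuming a current ruamel.yaml where
YAML.parse yields parser events:

    import io
    import ruamel.yaml

    yaml = ruamel.yaml.YAML(typ='rt')
    # stream -> implicit document -> block mapping, per the grammar above
    for event in yaml.parse(io.StringIO('a: 1\n')):
        print(event)
    # StreamStartEvent, DocumentStartEvent, MappingStartEvent,
    # ScalarEvent(a), ScalarEvent(1), MappingEndEvent,
    # DocumentEndEvent, StreamEndEvent
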
@@ -80,60 +78,60 @@ from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import * # NOQA
from ruamel.yaml.events import * # NOQA
from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError # NOQA
-from ruamel.yaml.compat import utf8, nprint, nprintf # NOQA
+from ruamel.yaml.scanner import BlankLineComment
+from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK
+from ruamel.yaml.compat import nprint, nprintf # NOQA
-if False: # MYPY
- from typing import Any, Dict, Optional, List # NOQA
+from typing import Any, Dict, Optional, List # NOQA
__all__ = ['Parser', 'RoundTripParser', 'ParserError']
+def xprintf(*args: Any, **kw: Any) -> Any:
+ return nprintf(*args, **kw)
+
+
class ParserError(MarkedYAMLError):
pass
-class Parser(object):
+class Parser:
# Since writing a recursive descent parser is a straightforward task, we
# do not give many comments here.
- DEFAULT_TAGS = {u'!': u'!', u'!!': u'tag:yaml.org,2002:'}
+ DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'}
- def __init__(self, loader):
- # type: (Any) -> None
+ def __init__(self, loader: Any) -> None:
self.loader = loader
if self.loader is not None and getattr(self.loader, '_parser', None) is None:
self.loader._parser = self
self.reset_parser()
- def reset_parser(self):
- # type: () -> None
+ def reset_parser(self) -> None:
# Reset the state attributes (to clear self-references)
- self.current_event = None
- self.tag_handles = {} # type: Dict[Any, Any]
- self.states = [] # type: List[Any]
- self.marks = [] # type: List[Any]
- self.state = self.parse_stream_start # type: Any
+ self.current_event = self.last_event = None
+ self.tag_handles: Dict[Any, Any] = {}
+ self.states: List[Any] = []
+ self.marks: List[Any] = []
+ self.state: Any = self.parse_stream_start
- def dispose(self):
- # type: () -> None
+ def dispose(self) -> None:
self.reset_parser()
@property
- def scanner(self):
- # type: () -> Any
+ def scanner(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.scanner
return self.loader._scanner
@property
- def resolver(self):
- # type: () -> Any
+ def resolver(self) -> Any:
if hasattr(self.loader, 'typ'):
return self.loader.resolver
return self.loader._resolver
- def check_event(self, *choices):
- # type: (Any) -> bool
+ def check_event(self, *choices: Any) -> bool:
# Check the type of the next event.
if self.current_event is None:
if self.state:
@@ -146,21 +144,22 @@ class Parser(object):
return True
return False
- def peek_event(self):
- # type: () -> Any
+ def peek_event(self) -> Any:
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
- def get_event(self):
- # type: () -> Any
+ def get_event(self) -> Any:
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
self.current_event = self.state()
- value = self.current_event
+ # assert self.current_event is not None
+ # if self.current_event.end_mark.line != self.peek_event().start_mark.line:
+ xprintf('get_event', repr(self.current_event), self.peek_event().start_mark.line)
+ self.last_event = value = self.current_event
self.current_event = None
return value
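
check_event(), peek_event() and get_event() above are the parser's entire
consumer-facing surface. A hedged sketch of a driver loop, where parser is
any loader's wired-up Parser instance as set up in __init__ above:

    from ruamel.yaml.events import StreamEndEvent

    def drain(parser):
        # check_event() only peeks; get_event() consumes and advances state
        events = []
        while not parser.check_event(StreamEndEvent):
            events.append(parser.get_event())
        events.append(parser.get_event())  # consume the StreamEndEvent too
        return events
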
@@ -169,11 +168,10 @@ class Parser(object):
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
- def parse_stream_start(self):
- # type: () -> Any
+ def parse_stream_start(self) -> Any:
# Parse the stream start.
token = self.scanner.get_token()
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding)
# Prepare the next state.
@@ -181,8 +179,7 @@ class Parser(object):
return event
- def parse_implicit_document_start(self):
- # type: () -> Any
+ def parse_implicit_document_start(self) -> Any:
# Parse an implicit document.
if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken):
self.tag_handles = self.DEFAULT_TAGS
@@ -199,31 +196,35 @@ class Parser(object):
else:
return self.parse_document_start()
- def parse_document_start(self):
- # type: () -> Any
+ def parse_document_start(self) -> Any:
# Parse any extra document end indicators.
while self.scanner.check_token(DocumentEndToken):
self.scanner.get_token()
# Parse an explicit document.
if not self.scanner.check_token(StreamEndToken):
- token = self.scanner.peek_token()
- start_mark = token.start_mark
version, tags = self.process_directives()
if not self.scanner.check_token(DocumentStartToken):
raise ParserError(
None,
None,
- "expected '<document start>', but found %r" % self.scanner.peek_token().id,
+ "expected '<document start>', "
+ f'but found {self.scanner.peek_token().id!r}',
self.scanner.peek_token().start_mark,
)
token = self.scanner.get_token()
+ start_mark = token.start_mark
end_mark = token.end_mark
# if self.loader is not None and \
# end_mark.line != self.scanner.peek_token().start_mark.line:
# self.loader.scalar_after_indicator = False
- event = DocumentStartEvent(
- start_mark, end_mark, explicit=True, version=version, tags=tags
- ) # type: Any
+ event: Any = DocumentStartEvent(
+ start_mark,
+ end_mark,
+ explicit=True,
+ version=version,
+ tags=tags,
+ comment=token.comment,
+ )
self.states.append(self.parse_document_end)
self.state = self.parse_document_content
else:
@@ -235,8 +236,7 @@ class Parser(object):
self.state = None
return event
- def parse_document_end(self):
- # type: () -> Any
+ def parse_document_end(self) -> Any:
# Parse the document end.
token = self.scanner.peek_token()
start_mark = end_mark = token.start_mark
@@ -255,8 +255,7 @@ class Parser(object):
return event
- def parse_document_content(self):
- # type: () -> Any
+ def parse_document_content(self) -> Any:
if self.scanner.check_token(
DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken
):
@@ -266,13 +265,12 @@ class Parser(object):
else:
return self.parse_block_node()
- def process_directives(self):
- # type: () -> Any
+ def process_directives(self) -> Any:
yaml_version = None
self.tag_handles = {}
while self.scanner.check_token(DirectiveToken):
token = self.scanner.get_token()
- if token.name == u'YAML':
+ if token.name == 'YAML':
if yaml_version is not None:
raise ParserError(
None, None, 'found duplicate YAML directive', token.start_mark
@@ -282,19 +280,19 @@ class Parser(object):
raise ParserError(
None,
None,
- 'found incompatible YAML document (version 1.* is ' 'required)',
+ 'found incompatible YAML document (version 1.* is required)',
token.start_mark,
)
yaml_version = token.value
- elif token.name == u'TAG':
+ elif token.name == 'TAG':
handle, prefix = token.value
if handle in self.tag_handles:
raise ParserError(
- None, None, 'duplicate tag handle %r' % utf8(handle), token.start_mark
+ None, None, f'duplicate tag handle {handle!r}', token.start_mark,
)
self.tag_handles[handle] = prefix
if bool(self.tag_handles):
- value = yaml_version, self.tag_handles.copy() # type: Any
+ value: Any = (yaml_version, self.tag_handles.copy())
else:
value = yaml_version, None
if self.loader is not None and hasattr(self.loader, 'tags'):
@@ -324,27 +322,22 @@ class Parser(object):
# block_collection ::= block_sequence | block_mapping
# flow_collection ::= flow_sequence | flow_mapping
- def parse_block_node(self):
- # type: () -> Any
+ def parse_block_node(self) -> Any:
return self.parse_node(block=True)
- def parse_flow_node(self):
- # type: () -> Any
+ def parse_flow_node(self) -> Any:
return self.parse_node()
- def parse_block_node_or_indentless_sequence(self):
- # type: () -> Any
+ def parse_block_node_or_indentless_sequence(self) -> Any:
return self.parse_node(block=True, indentless_sequence=True)
- def transform_tag(self, handle, suffix):
- # type: (Any, Any) -> Any
+ def transform_tag(self, handle: Any, suffix: Any) -> Any:
return self.tag_handles[handle] + suffix
- def parse_node(self, block=False, indentless_sequence=False):
- # type: (bool, bool) -> Any
+ def parse_node(self, block: bool = False, indentless_sequence: bool = False) -> Any:
if self.scanner.check_token(AliasToken):
token = self.scanner.get_token()
- event = AliasEvent(token.value, token.start_mark, token.end_mark) # type: Any
+ event: Any = AliasEvent(token.value, token.start_mark, token.end_mark)
self.state = self.states.pop()
return event
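
A self-contained sketch of the handle expansion transform_tag() above
performs with the handles process_directives() collects; the '!py!' handle
and its prefix are invented for illustration:

    DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'}

    tag_handles = dict(DEFAULT_TAGS)
    tag_handles['!py!'] = 'tag:example.com,2024:'  # as a %TAG directive would

    def transform_tag(handle: str, suffix: str) -> str:
        return tag_handles[handle] + suffix

    assert transform_tag('!!', 'str') == 'tag:yaml.org,2002:str'
    assert transform_tag('!py!', 'thing') == 'tag:example.com,2024:thing'
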
@@ -353,6 +346,7 @@ class Parser(object):
start_mark = end_mark = tag_mark = None
if self.scanner.check_token(AnchorToken):
token = self.scanner.get_token()
+ self.move_token_comment(token)
start_mark = token.start_mark
end_mark = token.end_mark
anchor = token.value
@@ -378,13 +372,13 @@ class Parser(object):
raise ParserError(
'while parsing a node',
start_mark,
- 'found undefined tag handle %r' % utf8(handle),
+ f'found undefined tag handle {handle!r}',
tag_mark,
)
tag = self.transform_tag(handle, suffix)
else:
tag = suffix
- # if tag == u'!':
+ # if tag == '!':
# raise ParserError("while parsing a node", start_mark,
# "found non-specific tag '!'", tag_mark,
# "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag'
@@ -392,13 +386,17 @@ class Parser(object):
if start_mark is None:
start_mark = end_mark = self.scanner.peek_token().start_mark
event = None
- implicit = tag is None or tag == u'!'
+ implicit = tag is None or tag == '!'
if indentless_sequence and self.scanner.check_token(BlockEntryToken):
comment = None
pt = self.scanner.peek_token()
- if pt.comment and pt.comment[0]:
- comment = [pt.comment[0], []]
- pt.comment[0] = None
+ if self.loader and self.loader.comment_handling is None:
+ if pt.comment and pt.comment[0]:
+ comment = [pt.comment[0], []]
+ pt.comment[0] = None
+ elif self.loader:
+ if pt.comment:
+ comment = pt.comment
end_mark = self.scanner.peek_token().end_mark
event = SequenceStartEvent(
anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
@@ -410,7 +408,7 @@ class Parser(object):
token = self.scanner.get_token()
# self.scanner.peek_token_same_line_comment(token)
end_mark = token.end_mark
- if (token.plain and tag is None) or tag == u'!':
+ if (token.plain and tag is None) or tag == '!':
implicit = (True, False)
elif tag is None:
implicit = (False, True)
@@ -462,7 +460,7 @@ class Parser(object):
comment = pt.comment
# nprint('pt0', type(pt))
if comment is None or comment[1] is None:
- comment = pt.split_comment()
+ comment = pt.split_old_comment()
# nprint('pt1', comment)
event = SequenceStartEvent(
anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
@@ -487,9 +485,9 @@ class Parser(object):
node = 'flow'
token = self.scanner.peek_token()
raise ParserError(
- 'while parsing a %s node' % node,
+ f'while parsing a {node!s} node',
start_mark,
- 'expected the node content, but found %r' % token.id,
+ f'expected the node content, but found {token.id!r}',
token.start_mark,
)
return event
@@ -497,19 +495,17 @@ class Parser(object):
# block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
# BLOCK-END
- def parse_block_sequence_first_entry(self):
- # type: () -> Any
+ def parse_block_sequence_first_entry(self) -> Any:
token = self.scanner.get_token()
# move any comment from start token
- # token.move_comment(self.scanner.peek_token())
+ # self.move_token_comment(token)
self.marks.append(token.start_mark)
return self.parse_block_sequence_entry()
- def parse_block_sequence_entry(self):
- # type: () -> Any
+ def parse_block_sequence_entry(self) -> Any:
if self.scanner.check_token(BlockEntryToken):
token = self.scanner.get_token()
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
if not self.scanner.check_token(BlockEntryToken, BlockEndToken):
self.states.append(self.parse_block_sequence_entry)
return self.parse_block_node()
@@ -521,7 +517,7 @@ class Parser(object):
raise ParserError(
'while parsing a block collection',
self.marks[-1],
- 'expected <block end>, but found %r' % token.id,
+ f'expected <block end>, but found {token.id!r}',
token.start_mark,
)
token = self.scanner.get_token() # BlockEndToken
@@ -537,11 +533,10 @@ class Parser(object):
# - entry
# - nested
- def parse_indentless_sequence_entry(self):
- # type: () -> Any
+ def parse_indentless_sequence_entry(self) -> Any:
if self.scanner.check_token(BlockEntryToken):
token = self.scanner.get_token()
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
if not self.scanner.check_token(
BlockEntryToken, KeyToken, ValueToken, BlockEndToken
):
@@ -551,7 +546,14 @@ class Parser(object):
self.state = self.parse_indentless_sequence_entry
return self.process_empty_scalar(token.end_mark)
token = self.scanner.peek_token()
- event = SequenceEndEvent(token.start_mark, token.start_mark, comment=token.comment)
+ c = None
+ if self.loader and self.loader.comment_handling is None:
+ c = token.comment
+ start_mark = token.start_mark
+ else:
+ start_mark = self.last_event.end_mark # type: ignore
+ c = self.distribute_comment(token.comment, start_mark.line) # type: ignore
+ event = SequenceEndEvent(start_mark, start_mark, comment=c)
self.state = self.states.pop()
return event
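
The "indentless" case handled above is a block sequence that starts in the
same column as its mapping key, so the scanner never emits
BLOCK-SEQUENCE-START. A hedged demo, under the same YAML.parse assumption
as earlier:

    import io
    import ruamel.yaml

    yaml = ruamel.yaml.YAML(typ='rt')
    for event in yaml.parse(io.StringIO('key:\n- entry\n- nested\n')):
        print(type(event).__name__)
    # a SequenceStartEvent is still emitted for 'key', even though the
    # scanner only produced BlockEntryTokens; hence the peek for them above
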
@@ -560,17 +562,15 @@ class Parser(object):
# (VALUE block_node_or_indentless_sequence?)?)*
# BLOCK-END
- def parse_block_mapping_first_key(self):
- # type: () -> Any
+ def parse_block_mapping_first_key(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_block_mapping_key()
- def parse_block_mapping_key(self):
- # type: () -> Any
+ def parse_block_mapping_key(self) -> Any:
if self.scanner.check_token(KeyToken):
token = self.scanner.get_token()
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
self.states.append(self.parse_block_mapping_value)
return self.parse_block_node_or_indentless_sequence()
@@ -585,26 +585,25 @@ class Parser(object):
raise ParserError(
'while parsing a block mapping',
self.marks[-1],
- 'expected <block end>, but found %r' % token.id,
+ f'expected <block end>, but found {token.id!r}',
token.start_mark,
)
token = self.scanner.get_token()
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
self.state = self.states.pop()
self.marks.pop()
return event
- def parse_block_mapping_value(self):
- # type: () -> Any
+ def parse_block_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
# value token might have a post comment; move it to e.g. the block
if self.scanner.check_token(ValueToken):
- token.move_comment(self.scanner.peek_token())
+ self.move_token_comment(token)
else:
if not self.scanner.check_token(KeyToken):
- token.move_comment(self.scanner.peek_token(), empty=True)
+ self.move_token_comment(token, empty=True)
# else: empty value for this key cannot move token.comment
if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
self.states.append(self.parse_block_mapping_key)
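
The empty=True branch above covers a key whose value is missing entirely;
the value event is then fabricated by process_empty_scalar() instead of
being read from a token. A hedged demo, same assumption as earlier:

    import io
    import ruamel.yaml

    yaml = ruamel.yaml.YAML(typ='rt')
    for event in yaml.parse(io.StringIO('a:\nb: 1\n')):
        print(event)
    # 'a:' is followed by a zero-length plain ScalarEvent fabricated at the
    # end mark of the ':' value token
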
@@ -635,14 +634,12 @@ class Parser(object):
# For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
# generate an inline mapping (set syntax).
- def parse_flow_sequence_first_entry(self):
- # type: () -> Any
+ def parse_flow_sequence_first_entry(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
- def parse_flow_sequence_entry(self, first=False):
- # type: (bool) -> Any
+ def parse_flow_sequence_entry(self, first: bool = False) -> Any:
if not self.scanner.check_token(FlowSequenceEndToken):
if not first:
if self.scanner.check_token(FlowEntryToken):
@@ -652,15 +649,15 @@ class Parser(object):
raise ParserError(
'while parsing a flow sequence',
self.marks[-1],
- "expected ',' or ']', but got %r" % token.id,
+ f"expected ',' or ']', but got {token.id!r}",
token.start_mark,
)
if self.scanner.check_token(KeyToken):
token = self.scanner.peek_token()
- event = MappingStartEvent(
+ event: Any = MappingStartEvent(
None, None, True, token.start_mark, token.end_mark, flow_style=True
- ) # type: Any
+ )
self.state = self.parse_flow_sequence_entry_mapping_key
return event
elif not self.scanner.check_token(FlowSequenceEndToken):
@@ -672,8 +669,7 @@ class Parser(object):
self.marks.pop()
return event
- def parse_flow_sequence_entry_mapping_key(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_key(self) -> Any:
token = self.scanner.get_token()
if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry_mapping_value)
@@ -682,8 +678,7 @@ class Parser(object):
self.state = self.parse_flow_sequence_entry_mapping_value
return self.process_empty_scalar(token.end_mark)
- def parse_flow_sequence_entry_mapping_value(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
@@ -697,8 +692,7 @@ class Parser(object):
token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
- def parse_flow_sequence_entry_mapping_end(self):
- # type: () -> Any
+ def parse_flow_sequence_entry_mapping_end(self) -> Any:
self.state = self.parse_flow_sequence_entry
token = self.scanner.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
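
This MappingStart/MappingEnd pair is the inline single-pair mapping ("set
syntax") described in the comment introducing flow collections above. A
hedged demo, same assumption as earlier:

    import io
    import ruamel.yaml

    yaml = ruamel.yaml.YAML(typ='rt')
    for event in yaml.parse(io.StringIO('[a: 1, b]\n')):
        print(type(event).__name__)
    # 'a: 1' inside the flow sequence is wrapped in MappingStartEvent /
    # MappingEndEvent although the input contains no FLOW-MAPPING-START token
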
@@ -709,14 +703,12 @@ class Parser(object):
# FLOW-MAPPING-END
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
- def parse_flow_mapping_first_key(self):
- # type: () -> Any
+ def parse_flow_mapping_first_key(self) -> Any:
token = self.scanner.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
- def parse_flow_mapping_key(self, first=False):
- # type: (Any) -> Any
+ def parse_flow_mapping_key(self, first: bool = False) -> Any:
if not self.scanner.check_token(FlowMappingEndToken):
if not first:
if self.scanner.check_token(FlowEntryToken):
@@ -726,7 +718,7 @@ class Parser(object):
raise ParserError(
'while parsing a flow mapping',
self.marks[-1],
- "expected ',' or '}', but got %r" % token.id,
+ f"expected ',' or '}}', but got {token.id!r}",
token.start_mark,
)
if self.scanner.check_token(KeyToken):
@@ -753,8 +745,7 @@ class Parser(object):
self.marks.pop()
return event
- def parse_flow_mapping_value(self):
- # type: () -> Any
+ def parse_flow_mapping_value(self) -> Any:
if self.scanner.check_token(ValueToken):
token = self.scanner.get_token()
if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
@@ -768,35 +759,89 @@ class Parser(object):
token = self.scanner.peek_token()
return self.process_empty_scalar(token.start_mark)
- def parse_flow_mapping_empty_value(self):
- # type: () -> Any
+ def parse_flow_mapping_empty_value(self) -> Any:
self.state = self.parse_flow_mapping_key
return self.process_empty_scalar(self.scanner.peek_token().start_mark)
- def process_empty_scalar(self, mark, comment=None):
- # type: (Any, Any) -> Any
+ def process_empty_scalar(self, mark: Any, comment: Any = None) -> Any:
return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment)
+ def move_token_comment(
+ self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False
+ ) -> Any:
+ pass
+
class RoundTripParser(Parser):
"""roundtrip is a safe loader, that wants to see the unmangled tag"""
- def transform_tag(self, handle, suffix):
- # type: (Any, Any) -> Any
+ def transform_tag(self, handle: Any, suffix: Any) -> Any:
# return self.tag_handles[handle]+suffix
if handle == '!!' and suffix in (
- u'null',
- u'bool',
- u'int',
- u'float',
- u'binary',
- u'timestamp',
- u'omap',
- u'pairs',
- u'set',
- u'str',
- u'seq',
- u'map',
+ 'null',
+ 'bool',
+ 'int',
+ 'float',
+ 'binary',
+ 'timestamp',
+ 'omap',
+ 'pairs',
+ 'set',
+ 'str',
+ 'seq',
+ 'map',
):
return Parser.transform_tag(self, handle, suffix)
return handle + suffix
+
+ def move_token_comment(
+ self, token: Any, nt: Optional[Any] = None, empty: Optional[bool] = False
+ ) -> Any:
+ token.move_old_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)
+
+
+class RoundTripParserSC(RoundTripParser):
+ """roundtrip is a safe loader, that wants to see the unmangled tag"""
+
+ # some of the differences are based on the superclass testing
+ # whether self.loader.comment_handling is not None
+
+ def move_token_comment(
+ self: Any, token: Any, nt: Any = None, empty: Optional[bool] = False
+ ) -> None:
+ token.move_new_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)
+
+ def distribute_comment(self, comment: Any, line: Any) -> Any:
+ # ToDo, look at indentation of the comment to determine attachment
+ if comment is None:
+ return None
+ if not comment[0]:
+ return None
+ if comment[0][0] != line + 1:
+ nprintf('>>>dcxxx', comment, line)
+ assert comment[0][0] == line + 1
+ # if comment[0] - line > 1:
+ # return
+ typ = self.loader.comment_handling & 0b11
+ # nprintf('>>>dca', comment, line, typ)
+ if typ == C_POST:
+ return None
+ if typ == C_PRE:
+ c = [None, None, comment[0]]
+ comment[0] = None
+ return c
+ # nprintf('>>>dcb', comment[0])
+ for _idx, cmntidx in enumerate(comment[0]):
+ # nprintf('>>>dcb', cmntidx)
+ if isinstance(self.scanner.comments[cmntidx], BlankLineComment):
+ break
+ else:
+ return None # no blank line found
+ if _idx == 0:
+ return None # first line was blank
+ # nprintf('>>>dcc', idx)
+ if typ == C_SPLIT_ON_FIRST_BLANK:
+ c = [None, None, comment[0][:_idx]]
+ comment[0] = comment[0][_idx:]
+ return c
+ raise NotImplementedError # reserved
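
To make the bit tests in distribute_comment() concrete, a self-contained
sketch of the same splitting policy over plain strings; an empty string
stands in for a BlankLineComment, and the C_* constants are the ones this
commit imports from ruamel.yaml.comments:

    from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK

    def split_comments(lines, comment_handling):
        typ = comment_handling & 0b11    # same mask as distribute_comment
        if typ == C_POST:
            return None, lines           # nothing moves before the node
        if typ == C_PRE:
            return lines, None           # everything moves before the node
        for idx, line in enumerate(lines):
            if line == '':               # stand-in for BlankLineComment
                break
        else:
            return None, lines           # no blank line found
        if idx == 0:
            return None, lines           # first line was blank
        if typ == C_SPLIT_ON_FIRST_BLANK:
            return lines[:idx], lines[idx:]
        raise NotImplementedError        # fourth bit pattern is reserved

    print(split_comments(['# a', '', '# b'], C_SPLIT_ON_FIRST_BLANK))
    # -> (['# a'], ['', '# b'])
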