path: root/scanner.py
author     Anthon van der Neut <anthon@mnt.org>    2021-05-06 08:36:49 +0200
committer  Anthon van der Neut <anthon@mnt.org>    2021-05-06 08:36:49 +0200
commit     17b35c376fd0fc9a94ba0adfdbf5bf63a6177dc9 (patch)
tree       49a76a7328cbc20efde9603d5373ecf003adbbc6 /scanner.py
parent     3d77f16e00124b74e150625396617b41e41da014 (diff)
download   ruamel.yaml-17b35c376fd0fc9a94ba0adfdbf5bf63a6177dc9.tar.gz
* extend EOL token handling
* extend comment handling
Diffstat (limited to 'scanner.py')
-rw-r--r--   scanner.py   445
1 file changed, 423 insertions(+), 22 deletions(-)
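The change below gates the scanner's comment handling on a comment_handling attribute looked up on the loader: when the attribute is None, the old behaviour is kept and CommentToken objects are attached directly to the scalar tokens; when it is set to anything other than None or False, comments are instead recorded line by line in the new ScannedComments store used by RoundTripScannerSC; when the attribute is absent, getattr falls back to False and no comment tokens are created (the case of a C-type loader). A minimal, self-contained sketch of that dispatch follows; the three loader classes are made up for illustration, and only the getattr/None/False pattern mirrors the diff.

# Illustrative only: the loader classes below are assumptions for this sketch;
# the getattr() / None / False dispatch is the pattern the diff introduces.
class OldRoundTripLoader:
    comment_handling = None   # keep the pre-existing CommentToken behaviour

class SplitCommentLoader:
    comment_handling = 1      # any non-None, non-False value selects the new path

class CLoader:
    pass                      # attribute absent: getattr() falls back to False

def comment_strategy(loader):
    comment_handler = getattr(loader, 'comment_handling', False)
    if comment_handler is None:
        return 'attach CommentToken objects to the tokens'
    if comment_handler is not False:
        return 'collect comments in ScannedComments (RoundTripScannerSC)'
    return 'discard comments entirely'

for loader in (OldRoundTripLoader(), SplitCommentLoader(), CLoader()):
    print(type(loader).__name__, '->', comment_strategy(loader))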
diff --git a/scanner.py b/scanner.py
index f98da00..f9e6052 100644
--- a/scanner.py
+++ b/scanner.py
@@ -44,6 +44,10 @@ _THE_END = '\n\0\r\x85\u2028\u2029'
_THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029'
_SPACE_TAB = ' \t'
+def xprintf(*args, **kw):
+ return nprintf(*args, **kw)
+ pass
+
class ScannerError(MarkedYAMLError):
pass
@@ -167,7 +171,7 @@ class Scanner:
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
if not choices:
return True
for choice in choices:
@@ -180,7 +184,7 @@ class Scanner:
# Return the next token, but do not delete it from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
return self.tokens[0]
def get_token(self):
@@ -188,7 +192,7 @@ class Scanner:
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
self.tokens_taken += 1
return self.tokens.pop(0)
@@ -198,7 +202,7 @@ class Scanner:
# type: () -> bool
if self.done:
return False
- if not self.tokens:
+ if len(self.tokens) == 0:
return True
# The current token may be a potential simple key, so we
# need to look further.
@@ -1231,21 +1235,33 @@ class Scanner:
# We are done.
token = ScalarToken("".join(chunks), False, start_mark, end_mark, style)
- if block_scalar_comment is not None:
- token.add_pre_comments([block_scalar_comment])
+ if self.loader is not None:
+ comment_handler = getattr(self.loader, 'comment_handling', False)
+ if comment_handler is None:
+ if block_scalar_comment is not None:
+ token.add_pre_comments([block_scalar_comment])
if len(trailing) > 0:
- # nprint('trailing 1', trailing) # XXXXX
# Eat whitespaces and comments until we reach the next token.
+ if self.loader is not None:
+ comment_handler = getattr(self.loader, 'comment_handling', None)
+ if comment_handler is not None:
+ line = end_mark.line - len(trailing)
+ for x in trailing:
+ assert x[-1] == '\n'
+ self.comments.add_blank_line(x, 0, line)
+ line += 1
comment = self.scan_to_next_token()
while comment:
trailing.append(' ' * comment[1].column + comment[0])
comment = self.scan_to_next_token()
-
- # Keep track of the trailing whitespace and following comments
- # as a comment token, if isn't all included in the actual value.
- comment_end_mark = self.reader.get_mark()
- comment = CommentToken("".join(trailing), end_mark, comment_end_mark)
- token.add_post_comment(comment)
+ if self.loader is not None:
+ comment_handler = getattr(self.loader, 'comment_handling', False)
+ if comment_handler is None:
+ # Keep track of the trailing whitespace and following comments
+ # as a comment token, if it isn't all included in the actual value.
+ comment_end_mark = self.reader.get_mark()
+ comment = CommentToken("".join(trailing), end_mark, comment_end_mark)
+ token.add_post_comment(comment)
return token
def scan_block_scalar_indicators(self, start_mark):
@@ -1590,10 +1606,21 @@ class Scanner:
break
token = ScalarToken("".join(chunks), True, start_mark, end_mark)
- if spaces and spaces[0] == '\n':
- # Create a comment token to preserve the trailing line breaks.
- comment = CommentToken("".join(spaces) + '\n', start_mark, end_mark)
- token.add_post_comment(comment)
+ # getattr falls back to False, so a C-type loader, which cannot handle comments, will not create a CommentToken
+ if self.loader is not None:
+ comment_handler = getattr(self.loader, 'comment_handling', False)
+ if comment_handler is None:
+ if spaces and spaces[0] == '\n':
+ # Create a comment token to preserve the trailing line breaks.
+ comment = CommentToken("".join(spaces) + '\n', start_mark, end_mark)
+ token.add_post_comment(comment)
+ elif comment_handler is not False:
+ line = start_mark.line + 1
+ for ch in spaces:
+ if ch == '\n':
+ self.comments.add_blank_line('\n', 0, line)
+ line += 1
+
return token
def scan_plain_spaces(self, indent, start_mark):
@@ -1764,7 +1791,7 @@ class RoundTripScanner(Scanner):
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
if not choices:
return True
for choice in choices:
@@ -1778,13 +1805,13 @@ class RoundTripScanner(Scanner):
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
return self.tokens[0]
return None
def _gather_comments(self):
# type: () -> Any
- """combine multiple comment lines"""
+ """combine multiple comment lines and assign to next non-comment-token"""
comments = [] # type: List[Any]
if not self.tokens:
return comments
@@ -1813,7 +1840,7 @@ class RoundTripScanner(Scanner):
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if bool(self.tokens):
+ if len(self.tokens) > 0:
# nprint('tk', self.tokens)
# only add post comment to single line tokens:
# scalar, value token. FlowXEndToken, otherwise
@@ -1925,7 +1952,7 @@ class RoundTripScanner(Scanner):
if not self.flow_level:
self.allow_simple_key = True
return comment, start_mark, end_mark
- if bool(self.scan_line_break()):
+ if self.scan_line_break() != '':
start_mark = self.reader.get_mark()
if not self.flow_level:
self.allow_simple_key = True
@@ -1973,3 +2000,377 @@ class RoundTripScanner(Scanner):
def scan_block_scalar(self, style, rt=True):
# type: (Any, Optional[bool]) -> Any
return Scanner.scan_block_scalar(self, style, rt=rt)
+
+
+# comment handling 2021, differentiation not needed
+
+VALUECMNT = 0
+KEYCMNT = 0 # 1
+#TAGCMNT = 2
+#ANCHORCMNT = 3
+
+
+class CommentBase:
+ __slots__ = ('value', 'line', 'column', 'used', 'function', 'fline', 'ufun', 'uline')
+ def __init__(self, value, line, column):
+ self.value = value
+ self.line = line
+ self.column = column
+ self.used = ' '
+ info = inspect.getframeinfo(inspect.stack()[3][0])
+ self.function = info.function
+ self.fline = info.lineno
+ self.ufun = None
+ self.uline = None
+
+ def set_used(self, v='+'):
+ self.used = v
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ self.ufun = info.function
+ self.uline = info.lineno
+
+ def set_assigned(self):
+ self.used = '|'
+
+ def __str__(self):
+ return _F('{value}', value=self.value)
+
+ def __repr__(self):
+ return _F('{value!r}', value=self.value)
+
+ def info(self):
+ return _F('{name}{used} {line:2}:{column:<2} "{value:40s} {function}:{fline} {ufun}:{uline}',
+ name=self.name, line=self.line, column=self.column, value=self.value + '"', used=self.used,
+ function=self.function, fline=self.fline, ufun=self.ufun, uline=self.uline)
+
+
+class EOLComment(CommentBase):
+ name = 'EOLC'
+
+ def __init__(self, value, line, column):
+ super().__init__(value, line, column)
+
+
+class FullLineComment(CommentBase):
+ name = 'FULL'
+
+ def __init__(self, value, line, column):
+ super().__init__(value, line, column)
+
+
+class BlankLineComment(CommentBase):
+ name = 'BLNK'
+
+ def __init__(self, value, line, column):
+ super().__init__(value, line, column)
+
+
+class ScannedComments:
+ def __init__(self):
+ self.comments = {}
+ self.unused = []
+
+ def add_eol_comment(self, comment, column, line):
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ if comment.count('\n') == 1:
+ assert comment[-1] == '\n'
+ else:
+ assert '\n' not in comment
+ self.comments[line] = retval = EOLComment(comment[:-1], line, column)
+ self.unused.append(line)
+ return retval
+
+ def add_blank_line(self, comment, column, line):
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ assert comment.count('\n') == 1 and comment[-1] == '\n'
+ assert line not in self.comments
+ self.comments[line] = retval = BlankLineComment(comment[:-1], line, column)
+ self.unused.append(line)
+ return retval
+
+ def add_full_line_comment(self, comment, column, line):
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ assert comment.count('\n') == 1 and comment[-1] == '\n'
+ #if comment.startswith('# C12'):
+ # raise
+ # this raises in line 2127 for 330
+ self.comments[line] = retval = FullLineComment(comment[:-1], line, column)
+ self.unused.append(line)
+ return retval
+
+ def __getitem__(self, idx):
+ return self.comments[idx]
+
+ def __str__(self):
+ return 'ParsedComments:\n ' + \
+ '\n '.join((_F('{lineno:2} {x}', lineno=lineno, x=x.info()) for lineno, x in self.comments.items())) + '\n'
+
+ def last(self):
+ lineno, x = list(self.comments.items())[-1]
+ return _F('{lineno:2} {x}\n', lineno=lineno, x=x.info())
+
+ def any_unprocessed(self):
+ # ToDo: might want to differentiate based on lineno
+ return len(self.unused) > 0
+ #for lno, comment in reversed(self.comments.items()):
+ # if comment.used == ' ':
+ # return True
+ #return False
+
+ def unprocessed(self, use=False):
+ while len(self.unused) > 0:
+ first = self.unused.pop(0) if use else self.unused[0]
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ xprintf('using', first, self.comments[first].value, info.function, info.lineno)
+ yield first, self.comments[first]
+ if use:
+ self.comments[first].set_used()
+
+ def assign_pre(self, token):
+ token_line = token.start_mark.line
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ xprintf('assign_pre', token_line, self.unused, info.function, info.lineno)
+ gobbled = False
+ while self.unused and self.unused[0] < token_line:
+ gobbled = True
+ first = self.unused.pop(0)
+ xprintf('assign_pre < ', first)
+ self.comments[first].set_used()
+ token.add_comment_pre(first)
+ return gobbled
+
+ def assign_eol(self, tokens):
+ try:
+ comment_line = self.unused[0]
+ except IndexError:
+ return
+ if not isinstance(self.comments[comment_line], EOLComment):
+ return
+ idx = 1
+ while tokens[-idx].start_mark.line > comment_line or isinstance(tokens[-idx], ValueToken):
+ idx += 1
+ xprintf('idx1', idx)
+ if len(tokens) > idx and isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], ScalarToken):
+ return
+ try:
+ if isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], KeyToken):
+ try:
+ eol_idx = self.unused.pop(0)
+ self.comments[eol_idx].set_used()
+ xprintf('>>>>>a', idx, eol_idx, KEYCMNT)
+ tokens[-idx].add_comment_eol(eol_idx, KEYCMNT)
+ except IndexError:
+ raise NotImplementedError
+ return
+ except IndexError:
+ xprintf('IndexError1')
+ pass
+ try:
+ if isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], (ValueToken, BlockEntryToken)):
+ try:
+ eol_idx = self.unused.pop(0)
+ self.comments[eol_idx].set_used()
+ tokens[-idx].add_comment_eol(eol_idx, VALUECMNT)
+ except IndexError:
+ raise NotImplementedError
+ return
+ except IndexError:
+ xprintf('IndexError2')
+ pass
+ for t in tokens:
+ xprintf('tt-', t)
+ xprintf('not implemented EOL', type(tokens[-idx]))
+ import sys; sys.exit(0)
+
+ def assign_post(self, token):
+ token_line = token.start_mark.line
+ info = inspect.getframeinfo(inspect.stack()[1][0])
+ xprintf('assign_post', token_line, self.unused, info.function, info.lineno)
+ gobbled = False
+ while self.unused and self.unused[0] < token_line:
+ gobbled = True
+ first = self.unused.pop(0)
+ xprintf('assign_post < ', first)
+ self.comments[first].set_used()
+ token.add_comment_post(first)
+ return gobbled
+
+ def str_unprocessed(self):
+ return ''.join((_F(' {ind:2} {x}\n', ind=ind, x=x.info()) for ind, x in self.comments.items() if x.used == ' '))
+
+
+class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
+ def __init__(self, *arg, **kw):
+ super().__init__(*arg, **kw)
+ assert self.loader is not None
+ # comments is initialised in .need_more_tokens and persists on self.loader.parsed_comments
+ #
+ self.comments = None
+
+ def get_token(self):
+ # type: () -> Any
+ # Return the next token.
+ while self.need_more_tokens():
+ self.fetch_more_tokens()
+ if len(self.tokens) > 0:
+ if isinstance(self.tokens[0], BlockEndToken):
+ self.comments.assign_post(self.tokens[0])
+ else:
+ self.comments.assign_pre(self.tokens[0])
+ self.tokens_taken += 1
+ return self.tokens.pop(0)
+
+ def need_more_tokens(self):
+ if self.comments is None:
+ self.loader.parsed_comments = self.comments = ScannedComments()
+ if self.done:
+ return False
+ if len(self.tokens) == 0:
+ return True
+ # The current token may be a potential simple key, so we
+ # need to look further.
+ self.stale_possible_simple_keys()
+ if self.next_possible_simple_key() == self.tokens_taken:
+ return True
+ if len(self.tokens) < 2:
+ return True
+ if self.tokens[0].start_mark.line == self.tokens[-1].start_mark.line:
+ return True
+ if True:
+ xprintf('-x--', len(self.tokens))
+ for t in self.tokens:
+ xprintf(t)
+ #xprintf(self.comments.last())
+ xprintf(self.comments.str_unprocessed())
+ self.comments.assign_pre(self.tokens[0])
+ self.comments.assign_eol(self.tokens)
+ return False
+
+ def scan_to_next_token(self):
+ srp = self.reader.peek
+ srf = self.reader.forward
+ if self.reader.index == 0 and srp() == '\uFEFF':
+ srf()
+ start_mark = self.reader.get_mark()
+ # xprintf('current_mark', start_mark.line, start_mark.column)
+ found = False
+ idx = 0
+ while not found:
+ while srp() == ' ':
+ srf()
+ ch = srp()
+ if ch == '#':
+ comment_start_mark = self.reader.get_mark()
+ comment = ch
+ srf() # skip the '#'
+ while ch not in _THE_END:
+ ch = srp()
+ if ch == '\0': # don't gobble the end-of-stream character
+ # but add an explicit newline as "YAML processors should terminate
+ # the stream with an explicit line break
+ # https://yaml.org/spec/1.2/spec.html#id2780069
+ comment += '\n'
+ break
+ comment += ch
+ srf()
+ # we have a comment
+ if start_mark.column == 0:
+ self.comments.add_full_line_comment(comment, comment_start_mark.column, comment_start_mark.line)
+ else:
+ self.comments.add_eol_comment(comment, comment_start_mark.column, comment_start_mark.line)
+ comment = ""
+ # gather any blank lines or full line comments following the comment as well
+ self.scan_empty_or_full_line_comments()
+ if not self.flow_level:
+ self.allow_simple_key = True
+ return
+ if bool(self.scan_line_break()):
+ # start_mark = self.reader.get_mark()
+ if not self.flow_level:
+ self.allow_simple_key = True
+ self.scan_empty_or_full_line_comments()
+ return None
+ ch = srp()
+ if ch == '\n': # empty toplevel lines
+ start_mark = self.reader.get_mark()
+ comment = ""
+ while ch:
+ ch = self.scan_line_break(empty_line=True)
+ comment += ch
+ if srp() == '#':
+ # empty line followed by indented real comment
+ comment = comment.rsplit('\n', 1)[0] + '\n'
+ end_mark = self.reader.get_mark()
+ return None
+ else:
+ found = True
+ return None
+
+ def scan_empty_or_full_line_comments(self):
+ blmark = self.reader.get_mark()
+ assert blmark.column == 0
+ blanks = ""
+ comment = None
+ mark = None
+ ch = self.reader.peek()
+ while True:
+ # nprint('ch', repr(ch), self.reader.get_mark().column)
+ if ch in '\r\n\x85\u2028\u2029':
+ if self.reader.prefix(2) == '\r\n':
+ self.reader.forward(2)
+ else:
+ self.reader.forward()
+ if comment is not None:
+ comment += '\n'
+ self.comments.add_full_line_comment(comment, mark.column, mark.line)
+ comment = None
+ else:
+ blanks += '\n'
+ self.comments.add_blank_line(blanks, blmark.column, blmark.line)
+ blanks = ""
+ blmark = self.reader.get_mark()
+ ch = self.reader.peek()
+ continue
+ if comment is None:
+ if ch in ' \t':
+ blanks += ch
+ elif ch == '#':
+ mark = self.reader.get_mark()
+ comment = '#'
+ else:
+ # print('breaking on', repr(ch))
+ break
+ else:
+ comment += ch
+ self.reader.forward()
+ ch = self.reader.peek()
+
+ def scan_block_scalar_ignored_line(self, start_mark):
+ # type: (Any) -> Any
+ # See the specification for details.
+ srp = self.reader.peek
+ srf = self.reader.forward
+ prefix = ''
+ comment = None
+ while srp() == ' ':
+ prefix += srp()
+ srf()
+ if srp() == '#':
+ comment = ''
+ mark = self.reader.get_mark()
+ while srp() not in _THE_END:
+ comment += srp()
+ srf()
+ comment += '\n'
+ ch = srp()
+ if ch not in _THE_END:
+ raise ScannerError(
+ 'while scanning a block scalar',
+ start_mark,
+ _F('expected a comment or a line break, but found {ch!r}', ch=ch),
+ self.reader.get_mark(),
+ )
+ if comment is not None:
+ self.comments.add_eol_comment(comment, mark.column, mark.line)
+ self.scan_line_break()
+ return None
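
The ScannedComments container added above keeps comments keyed by source line, together with a queue of line numbers that have not yet been attached to a token; assign_pre, assign_eol and assign_post drain that queue as tokens are handed out. A much-reduced, self-contained sketch of that bookkeeping, with simplified signatures that only borrow the method names from the diff:

# Simplified illustration only; the real class also tracks columns, comment
# kinds (EOL / full-line / blank-line) and the frame info of the caller.
class MiniScannedComments:
    def __init__(self):
        self.comments = {}   # line number -> comment text (without newline)
        self.unused = []     # line numbers not yet attached to a token

    def add_eol_comment(self, comment, column, line):
        assert comment.endswith('\n')
        self.comments[line] = comment[:-1]
        self.unused.append(line)

    def assign_pre(self, token_line):
        # hand every still-unused comment above token_line to the caller
        taken = []
        while self.unused and self.unused[0] < token_line:
            taken.append(self.comments[self.unused.pop(0)])
        return taken

sc = MiniScannedComments()
sc.add_eol_comment('# first\n', 0, 1)
sc.add_eol_comment('# second\n', 0, 2)
print(sc.assign_pre(token_line=3))   # ['# first', '# second']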