diff options
Diffstat (limited to 'scanner.py')
-rw-r--r-- | scanner.py | 445 |
1 files changed, 423 insertions, 22 deletions
@@ -44,6 +44,10 @@ _THE_END = '\n\0\r\x85\u2028\u2029' _THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029' _SPACE_TAB = ' \t' +def xprintf(*args, **kw): + return nprintf(*args, **kw) + pass + class ScannerError(MarkedYAMLError): pass @@ -167,7 +171,7 @@ class Scanner: # Check if the next token is one of the given types. while self.need_more_tokens(): self.fetch_more_tokens() - if bool(self.tokens): + if len(self.tokens) > 0: if not choices: return True for choice in choices: @@ -180,7 +184,7 @@ class Scanner: # Return the next token, but do not delete if from the queue. while self.need_more_tokens(): self.fetch_more_tokens() - if bool(self.tokens): + if len(self.tokens) > 0: return self.tokens[0] def get_token(self): @@ -188,7 +192,7 @@ class Scanner: # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() - if bool(self.tokens): + if len(self.tokens) > 0: self.tokens_taken += 1 return self.tokens.pop(0) @@ -198,7 +202,7 @@ class Scanner: # type: () -> bool if self.done: return False - if not self.tokens: + if len(self.tokens) == 0: return True # The current token may be a potential simple key, so we # need to look further. @@ -1231,21 +1235,33 @@ class Scanner: # We are done. token = ScalarToken("".join(chunks), False, start_mark, end_mark, style) - if block_scalar_comment is not None: - token.add_pre_comments([block_scalar_comment]) + if self.loader is not None: + comment_handler = getattr(self.loader, 'comment_handling', False) + if comment_handler is None: + if block_scalar_comment is not None: + token.add_pre_comments([block_scalar_comment]) if len(trailing) > 0: - # nprint('trailing 1', trailing) # XXXXX # Eat whitespaces and comments until we reach the next token. + if self.loader is not None: + comment_handler = getattr(self.loader, 'comment_handling', None) + if comment_handler is not None: + line = end_mark.line - len(trailing) + for x in trailing: + assert x[-1] == '\n' + self.comments.add_blank_line(x, 0, line) + line += 1 comment = self.scan_to_next_token() while comment: trailing.append(' ' * comment[1].column + comment[0]) comment = self.scan_to_next_token() - - # Keep track of the trailing whitespace and following comments - # as a comment token, if isn't all included in the actual value. - comment_end_mark = self.reader.get_mark() - comment = CommentToken("".join(trailing), end_mark, comment_end_mark) - token.add_post_comment(comment) + if self.loader is not None: + comment_handler = getattr(self.loader, 'comment_handling', False) + if comment_handler is None: + # Keep track of the trailing whitespace and following comments + # as a comment token, if isn't all included in the actual value. + comment_end_mark = self.reader.get_mark() + comment = CommentToken("".join(trailing), end_mark, comment_end_mark) + token.add_post_comment(comment) return token def scan_block_scalar_indicators(self, start_mark): @@ -1590,10 +1606,21 @@ class Scanner: break token = ScalarToken("".join(chunks), True, start_mark, end_mark) - if spaces and spaces[0] == '\n': - # Create a comment token to preserve the trailing line breaks. - comment = CommentToken("".join(spaces) + '\n', start_mark, end_mark) - token.add_post_comment(comment) + # getattr provides True so C type loader, which cannot handle comment, will not make CommentToken + if self.loader is not None: + comment_handler = getattr(self.loader, 'comment_handling', False) + if comment_handler is None: + if spaces and spaces[0] == '\n': + # Create a comment token to preserve the trailing line breaks. + comment = CommentToken("".join(spaces) + '\n', start_mark, end_mark) + token.add_post_comment(comment) + elif comment_handler is not False: + line = start_mark.line + 1 + for ch in spaces: + if ch == '\n': + self.comments.add_blank_line('\n', 0, line) + line += 1 + return token def scan_plain_spaces(self, indent, start_mark): @@ -1764,7 +1791,7 @@ class RoundTripScanner(Scanner): while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if bool(self.tokens): + if len(self.tokens) > 0: if not choices: return True for choice in choices: @@ -1778,13 +1805,13 @@ class RoundTripScanner(Scanner): while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if bool(self.tokens): + if len(self.tokens) > 0: return self.tokens[0] return None def _gather_comments(self): # type: () -> Any - """combine multiple comment lines""" + """combine multiple comment lines and assign to next non-comment-token""" comments = [] # type: List[Any] if not self.tokens: return comments @@ -1813,7 +1840,7 @@ class RoundTripScanner(Scanner): while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if bool(self.tokens): + if len(self.tokens) > 0: # nprint('tk', self.tokens) # only add post comment to single line tokens: # scalar, value token. FlowXEndToken, otherwise @@ -1925,7 +1952,7 @@ class RoundTripScanner(Scanner): if not self.flow_level: self.allow_simple_key = True return comment, start_mark, end_mark - if bool(self.scan_line_break()): + if self.scan_line_break() != '': start_mark = self.reader.get_mark() if not self.flow_level: self.allow_simple_key = True @@ -1973,3 +2000,377 @@ class RoundTripScanner(Scanner): def scan_block_scalar(self, style, rt=True): # type: (Any, Optional[bool]) -> Any return Scanner.scan_block_scalar(self, style, rt=rt) + + +# commenthandling 2021, differentiatiation not needed + +VALUECMNT = 0 +KEYCMNT = 0 # 1 +#TAGCMNT = 2 +#ANCHORCMNT = 3 + + +class CommentBase: + __slots__ = ('value', 'line', 'column', 'used', 'function', 'fline', 'ufun', 'uline') + def __init__(self, value, line, column): + self.value = value + self.line = line + self.column = column + self.used = ' ' + info = inspect.getframeinfo(inspect.stack()[3][0]) + self.function = info.function + self.fline = info.lineno + self.ufun = None + self.uline = None + + def set_used(self, v='+'): + self.used = v + info = inspect.getframeinfo(inspect.stack()[1][0]) + self.ufun = info.function + self.uline = info.lineno + + def set_assigned(self): + self.used = '|' + + def __str__(self): + return _F('{value}', value=self.value) + + def __repr__(self): + return _F('{value!r}', value=self.value) + + def info(self): + return _F('{name}{used} {line:2}:{column:<2} "{value:40s} {function}:{fline} {ufun}:{uline}', + name=self.name, line=self.line, column=self.column, value=self.value + '"', used=self.used, + function=self.function, fline=self.fline, ufun=self.ufun, uline=self.uline) + + +class EOLComment(CommentBase): + name = 'EOLC' + + def __init__(self, value, line, column): + super().__init__(value, line, column) + + +class FullLineComment(CommentBase): + name = 'FULL' + + def __init__(self, value, line, column): + super().__init__(value, line, column) + + +class BlankLineComment(CommentBase): + name = 'BLNK' + + def __init__(self, value, line, column): + super().__init__(value, line, column) + + +class ScannedComments: + def __init__(self): + self.comments = {} + self.unused = [] + + def add_eol_comment(self, comment, column, line): + info = inspect.getframeinfo(inspect.stack()[1][0]) + if comment.count('\n') == 1: + assert comment[-1] == '\n' + else: + assert '\n' not in comment + self.comments[line] = retval = EOLComment(comment[:-1], line, column) + self.unused.append(line) + return retval + + def add_blank_line(self, comment, column, line): + info = inspect.getframeinfo(inspect.stack()[1][0]) + assert comment.count('\n') == 1 and comment[-1] == '\n' + assert line not in self.comments + self.comments[line] = retval = BlankLineComment(comment[:-1], line, column) + self.unused.append(line) + return retval + + def add_full_line_comment(self, comment, column, line): + info = inspect.getframeinfo(inspect.stack()[1][0]) + assert comment.count('\n') == 1 and comment[-1] == '\n' + #if comment.startswith('# C12'): + # raise + # this raises in line 2127 fro 330 + self.comments[line] = retval = FullLineComment(comment[:-1], line, column) + self.unused.append(line) + return retval + + def __getitem__(self, idx): + return self.comments[idx] + + def __str__(self): + return 'ParsedComments:\n ' + \ + '\n '.join((_F('{lineno:2} {x}', lineno=lineno, x=x.info()) for lineno, x in self.comments.items())) + '\n' + + def last(self): + lineno, x = list(self.comments.items())[-1] + return _F('{lineno:2} {x}\n', lineno=lineno, x=x.info()) + + def any_unprocessed(self): + # ToDo: might want to differentiate based on lineno + return len(self.unused) > 0 + #for lno, comment in reversed(self.comments.items()): + # if comment.used == ' ': + # return True + #return False + + def unprocessed(self, use=False): + while len(self.unused) > 0: + first = self.unused.pop(0) if use else self.unused[0] + info = inspect.getframeinfo(inspect.stack()[1][0]) + xprintf('using', first, self.comments[first].value, info.function, info.lineno) + yield first, self.comments[first] + if use: + self.comments[first].set_used() + + def assign_pre(self, token): + token_line = token.start_mark.line + info = inspect.getframeinfo(inspect.stack()[1][0]) + xprintf('assign_pre', token_line, self.unused, info.function, info.lineno) + gobbled = False + while self.unused and self.unused[0] < token_line: + gobled = True + first = self.unused.pop(0) + xprintf('assign_pre < ', first) + self.comments[first].set_used() + token.add_comment_pre(first) + return gobbled + + def assign_eol(self, tokens): + try: + comment_line = self.unused[0] + except IndexError: + return + if not isinstance(self.comments[comment_line], EOLComment): + return + idx = 1 + while tokens[-idx].start_mark.line > comment_line or isinstance(tokens[-idx], ValueToken): + idx += 1 + xprintf('idx1', idx) + if len(tokens) > idx and isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], ScalarToken): + return + try: + if isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], KeyToken): + try: + eol_idx = self.unused.pop(0) + self.comments[eol_idx].set_used() + xprintf('>>>>>a', idx, eol_idx, KEYCMNT) + tokens[-idx].add_comment_eol(eol_idx, KEYCMNT) + except IndexError: + raise NotImplementedError + return + except IndexError: + xprintf('IndexError1') + pass + try: + if isinstance(tokens[-idx], ScalarToken) and isinstance(tokens[-(idx+1)], (ValueToken, BlockEntryToken)): + try: + eol_idx = self.unused.pop(0) + self.comments[eol_idx].set_used() + tokens[-idx].add_comment_eol(eol_idx, VALUECMNT) + except IndexError: + raise NotImplementedError + return + except IndexError: + xprintf('IndexError2') + pass + for t in tokens: + xprintf('tt-', t) + xprintf('not implemented EOL', type(tokens[-idx])) + import sys; sys.exit(0) + + def assign_post(self, token): + token_line = token.start_mark.line + info = inspect.getframeinfo(inspect.stack()[1][0]) + xprintf('assign_post', token_line, self.unused, info.function, info.lineno) + gobbled = False + while self.unused and self.unused[0] < token_line: + gobled = True + first = self.unused.pop(0) + xprintf('assign_post < ', first) + self.comments[first].set_used() + token.add_comment_post(first) + return gobbled + + def str_unprocessed(self): + return ''.join((_F(' {ind:2} {x}\n', ind=ind, x=x.info()) for ind, x in self.comments.items() if x.used == ' ')) + + +class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments + def __init__(self, *arg, **kw): + super().__init__(*arg, **kw) + assert self.loader is not None + # comments isinitialised on .need_more_tokens and persist on self.loader.parsed_comments + # + self.comments = None + + def get_token(self): + # type: () -> Any + # Return the next token. + while self.need_more_tokens(): + self.fetch_more_tokens() + if len(self.tokens) > 0: + if isinstance(self.tokens[0], BlockEndToken): + self.comments.assign_post(self.tokens[0]) + else: + self.comments.assign_pre(self.tokens[0]) + self.tokens_taken += 1 + return self.tokens.pop(0) + + def need_more_tokens(self): + if self.comments is None: + self.loader.parsed_comments = self.comments = ScannedComments() + if self.done: + return False + if len(self.tokens) == 0: + return True + # The current token may be a potential simple key, so we + # need to look further. + self.stale_possible_simple_keys() + if self.next_possible_simple_key() == self.tokens_taken: + return True + if len(self.tokens) < 2: + return True + if self.tokens[0].start_mark.line == self.tokens[-1].start_mark.line: + return True + if True: + xprintf('-x--', len(self.tokens)) + for t in self.tokens: + xprintf(t) + #xprintf(self.comments.last()) + xprintf(self.comments.str_unprocessed()) + self.comments.assign_pre(self.tokens[0]) + self.comments.assign_eol(self.tokens) + return False + + def scan_to_next_token(self): + srp = self.reader.peek + srf = self.reader.forward + if self.reader.index == 0 and srp() == '\uFEFF': + srf() + start_mark = self.reader.get_mark() + # xprintf('current_mark', start_mark.line, start_mark.column) + found = False + idx = 0 + while not found: + while srp() == ' ': + srf() + ch = srp() + if ch == '#': + comment_start_mark = self.reader.get_mark() + comment = ch + srf() # skipt the '#' + while ch not in _THE_END: + ch = srp() + if ch == '\0': # don't gobble the end-of-stream character + # but add an explicit newline as "YAML processors should terminate + # the stream with an explicit line break + # https://yaml.org/spec/1.2/spec.html#id2780069 + comment += '\n' + break + comment += ch + srf() + # we have a comment + if start_mark.column == 0: + self.comments.add_full_line_comment(comment, comment_start_mark.column, comment_start_mark.line) + else: + self.comments.add_eol_comment(comment, comment_start_mark.column, comment_start_mark.line) + comment = "" + # gather any blank lines or full line comments following the comment as well + self.scan_empty_or_full_line_comments() + if not self.flow_level: + self.allow_simple_key = True + return + if bool(self.scan_line_break()): + # start_mark = self.reader.get_mark() + if not self.flow_level: + self.allow_simple_key = True + self.scan_empty_or_full_line_comments() + return None + ch = srp() + if ch == '\n': # empty toplevel lines + start_mark = self.reader.get_mark() + comment = "" + while ch: + ch = self.scan_line_break(empty_line=True) + comment += ch + if srp() == '#': + # empty line followed by indented real comment + comment = comment.rsplit('\n', 1)[0] + '\n' + end_mark = self.reader.get_mark() + return None + else: + found = True + return None + + def scan_empty_or_full_line_comments(self): + blmark = self.reader.get_mark() + assert blmark.column == 0 + blanks = "" + comment = None + mark = None + ch = self.reader.peek() + while True: + # nprint('ch', repr(ch), self.reader.get_mark().column) + if ch in '\r\n\x85\u2028\u2029': + if self.reader.prefix(2) == '\r\n': + self.reader.forward(2) + else: + self.reader.forward() + if comment is not None: + comment += '\n' + self.comments.add_full_line_comment(comment, mark.column, mark.line) + comment = None + else: + blanks += '\n' + self.comments.add_blank_line(blanks, blmark.column, blmark.line) + blanks = "" + blmark = self.reader.get_mark() + ch = self.reader.peek() + continue + if comment is None: + if ch in ' \t': + blanks += ch + elif ch == '#': + mark = self.reader.get_mark() + comment = '#' + else: + # print('breaking on', repr(ch)) + break + else: + comment += ch + self.reader.forward() + ch = self.reader.peek() + + def scan_block_scalar_ignored_line(self, start_mark): + # type: (Any) -> Any + # See the specification for details. + srp = self.reader.peek + srf = self.reader.forward + prefix = '' + comment = None + while srp() == ' ': + prefix += srp() + srf() + if srp() == '#': + comment = '' + mark = self.reader.get_mark() + while srp() not in _THE_END: + comment += srp() + srf() + comment += '\n' + ch = srp() + if ch not in _THE_END: + raise ScannerError( + 'while scanning a block scalar', + start_mark, + _F('expected a comment or a line break, but found {ch!r}', ch=ch), + self.reader.get_mark(), + ) + if comment is not None: + self.comments.add_eol_comment(comment, mark.column, mark.line) + self.scan_line_break() + return None |