diff options
author | Anthon van der Neut <anthon@mnt.org> | 2023-05-01 19:13:50 +0200 |
---|---|---|
committer | Anthon van der Neut <anthon@mnt.org> | 2023-05-01 19:13:50 +0200 |
commit | 8b731994b1543d7886af85f926d9eea5a22d0732 (patch) | |
tree | 3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /scanner.py | |
parent | 45111ba0b67e8619265d89f3202635e62c13cde6 (diff) | |
download | ruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz |
retrofitted 0.18 changes
Diffstat (limited to 'scanner.py')
-rw-r--r-- | scanner.py | 493 |
1 files changed, 178 insertions, 315 deletions
@@ -31,11 +31,10 @@ import inspect from ruamel.yaml.error import MarkedYAMLError, CommentMark # NOQA from ruamel.yaml.tokens import * # NOQA -from ruamel.yaml.compat import _F, check_anchorname_char, nprint, nprintf # NOQA +from ruamel.yaml.compat import check_anchorname_char, nprint, nprintf # NOQA -if False: # MYPY - from typing import Any, Dict, Optional, List, Union, Text # NOQA - from ruamel.yaml.compat import VersionType # NOQA +from typing import Any, Dict, Optional, List, Union, Text # NOQA +from ruamel.yaml.compat import VersionType # NOQA __all__ = ['Scanner', 'RoundTripScanner', 'ScannerError'] @@ -45,8 +44,7 @@ _THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029' _SPACE_TAB = ' \t' -def xprintf(*args, **kw): - # type: (Any, Any) -> Any +def xprintf(*args: Any, **kw: Any) -> Any: return nprintf(*args, **kw) pass @@ -58,8 +56,9 @@ class ScannerError(MarkedYAMLError): class SimpleKey: # See below simple keys treatment. - def __init__(self, token_number, required, index, line, column, mark): - # type: (Any, Any, int, int, int, Any) -> None + def __init__( + self, token_number: Any, required: Any, index: int, line: int, column: int, mark: Any + ) -> None: self.token_number = token_number self.required = required self.index = index @@ -69,8 +68,7 @@ class SimpleKey: class Scanner: - def __init__(self, loader=None): - # type: (Any) -> None + def __init__(self, loader: Any = None) -> None: """Initialize the scanner.""" # It is assumed that Scanner and Reader will have a common descendant. # Reader do the dirty work of checking for BOM and converting the @@ -86,24 +84,22 @@ class Scanner: self.loader._scanner = self self.reset_scanner() self.first_time = False - self.yaml_version = None # type: Any + self.yaml_version: Any = None @property - def flow_level(self): - # type: () -> int + def flow_level(self) -> int: return len(self.flow_context) - def reset_scanner(self): - # type: () -> None + def reset_scanner(self) -> None: # Had we reached the end of the stream? self.done = False # flow_context is an expanding/shrinking list consisting of '{' and '[' # for each unclosed flow context. If empty list that means block context - self.flow_context = [] # type: List[Text] + self.flow_context: List[Text] = [] # List of processed tokens that are not yet emitted. - self.tokens = [] # type: List[Any] + self.tokens: List[Any] = [] # Add the STREAM-START token. self.fetch_stream_start() @@ -115,7 +111,7 @@ class Scanner: self.indent = -1 # Past indentation levels. - self.indents = [] # type: List[int] + self.indents: List[int] = [] # Variables related to simple keys treatment. @@ -145,11 +141,10 @@ class Scanner: # (token_number, required, index, line, column, mark) # A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow), # '[', or '{' tokens. - self.possible_simple_keys = {} # type: Dict[Any, Any] + self.possible_simple_keys: Dict[Any, Any] = {} @property - def reader(self): - # type: () -> Any + def reader(self) -> Any: try: return self._scanner_reader # type: ignore except AttributeError: @@ -160,16 +155,14 @@ class Scanner: return self._scanner_reader @property - def scanner_processing_version(self): # prefix until un-composited - # type: () -> Any + def scanner_processing_version(self) -> Any: # prefix until un-composited if hasattr(self.loader, 'typ'): return self.loader.resolver.processing_version return self.loader.processing_version # Public methods. - def check_token(self, *choices): - # type: (Any) -> bool + def check_token(self, *choices: Any) -> bool: # Check if the next token is one of the given types. while self.need_more_tokens(): self.fetch_more_tokens() @@ -181,16 +174,14 @@ class Scanner: return True return False - def peek_token(self): - # type: () -> Any + def peek_token(self) -> Any: # Return the next token, but do not delete if from the queue. while self.need_more_tokens(): self.fetch_more_tokens() if len(self.tokens) > 0: return self.tokens[0] - def get_token(self): - # type: () -> Any + def get_token(self) -> Any: # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() @@ -200,8 +191,7 @@ class Scanner: # Private methods. - def need_more_tokens(self): - # type: () -> bool + def need_more_tokens(self) -> bool: if self.done: return False if len(self.tokens) == 0: @@ -213,12 +203,10 @@ class Scanner: return True return False - def fetch_comment(self, comment): - # type: (Any) -> None + def fetch_comment(self, comment: Any) -> None: raise NotImplementedError - def fetch_more_tokens(self): - # type: () -> Any + def fetch_more_tokens(self) -> Any: # Eat whitespaces and comments until we reach the next token. comment = self.scan_to_next_token() if comment is not None: # never happens for base scanner @@ -323,14 +311,13 @@ class Scanner: raise ScannerError( 'while scanning for the next token', None, - _F('found character {ch!r} that cannot start any token', ch=ch), + f'found character {ch!r} that cannot start any token', self.reader.get_mark(), ) # Simple keys treatment. - def next_possible_simple_key(self): - # type: () -> Any + def next_possible_simple_key(self) -> Any: # Return the number of the nearest possible simple key. Actually we # don't need to loop through the whole dictionary. We may replace it # with the following code: @@ -345,8 +332,7 @@ class Scanner: min_token_number = key.token_number return min_token_number - def stale_possible_simple_keys(self): - # type: () -> None + def stale_possible_simple_keys(self) -> None: # Remove entries that are no longer possible simple keys. According to # the YAML specification, simple keys # - should be limited to a single line, @@ -365,8 +351,7 @@ class Scanner: ) del self.possible_simple_keys[level] - def save_possible_simple_key(self): - # type: () -> None + def save_possible_simple_key(self) -> None: # The next token may start a simple key. We check if it's possible # and save its position. This function is called for # ALIAS, ANCHOR, TAG, SCALAR(flow), '[', and '{'. @@ -389,8 +374,7 @@ class Scanner: ) self.possible_simple_keys[self.flow_level] = key - def remove_possible_simple_key(self): - # type: () -> None + def remove_possible_simple_key(self) -> None: # Remove the saved possible key position at the current flow level. if self.flow_level in self.possible_simple_keys: key = self.possible_simple_keys[self.flow_level] @@ -407,8 +391,7 @@ class Scanner: # Indentation functions. - def unwind_indent(self, column): - # type: (Any) -> None + def unwind_indent(self, column: Any) -> None: # In flow context, tokens should respect indentation. # Actually the condition should be `self.indent >= column` according to # the spec. But this condition will prohibit intuitively correct @@ -432,8 +415,7 @@ class Scanner: self.indent = self.indents.pop() self.tokens.append(BlockEndToken(mark, mark)) - def add_indent(self, column): - # type: (int) -> bool + def add_indent(self, column: int) -> bool: # Check if we need to increase indentation. if self.indent < column: self.indents.append(self.indent) @@ -443,8 +425,7 @@ class Scanner: # Fetchers. - def fetch_stream_start(self): - # type: () -> None + def fetch_stream_start(self) -> None: # We always add STREAM-START as the first token and STREAM-END as the # last token. # Read the token. @@ -452,8 +433,7 @@ class Scanner: # Add STREAM-START. self.tokens.append(StreamStartToken(mark, mark, encoding=self.reader.encoding)) - def fetch_stream_end(self): - # type: () -> None + def fetch_stream_end(self) -> None: # Set the current intendation to -1. self.unwind_indent(-1) # Reset simple keys. @@ -467,8 +447,7 @@ class Scanner: # The steam is finished. self.done = True - def fetch_directive(self): - # type: () -> None + def fetch_directive(self) -> None: # Set the current intendation to -1. self.unwind_indent(-1) @@ -479,16 +458,13 @@ class Scanner: # Scan and add DIRECTIVE. self.tokens.append(self.scan_directive()) - def fetch_document_start(self): - # type: () -> None + def fetch_document_start(self) -> None: self.fetch_document_indicator(DocumentStartToken) - def fetch_document_end(self): - # type: () -> None + def fetch_document_end(self) -> None: self.fetch_document_indicator(DocumentEndToken) - def fetch_document_indicator(self, TokenClass): - # type: (Any) -> None + def fetch_document_indicator(self, TokenClass: Any) -> None: # Set the current intendation to -1. self.unwind_indent(-1) @@ -503,16 +479,13 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) - def fetch_flow_sequence_start(self): - # type: () -> None + def fetch_flow_sequence_start(self) -> None: self.fetch_flow_collection_start(FlowSequenceStartToken, to_push='[') - def fetch_flow_mapping_start(self): - # type: () -> None + def fetch_flow_mapping_start(self) -> None: self.fetch_flow_collection_start(FlowMappingStartToken, to_push='{') - def fetch_flow_collection_start(self, TokenClass, to_push): - # type: (Any, Text) -> None + def fetch_flow_collection_start(self, TokenClass: Any, to_push: Text) -> None: # '[' and '{' may start a simple key. self.save_possible_simple_key() # Increase the flow level. @@ -525,16 +498,13 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) - def fetch_flow_sequence_end(self): - # type: () -> None + def fetch_flow_sequence_end(self) -> None: self.fetch_flow_collection_end(FlowSequenceEndToken) - def fetch_flow_mapping_end(self): - # type: () -> None + def fetch_flow_mapping_end(self) -> None: self.fetch_flow_collection_end(FlowMappingEndToken) - def fetch_flow_collection_end(self, TokenClass): - # type: (Any) -> None + def fetch_flow_collection_end(self, TokenClass: Any) -> None: # Reset possible simple key on the current level. self.remove_possible_simple_key() # Decrease the flow level. @@ -552,8 +522,7 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) - def fetch_flow_entry(self): - # type: () -> None + def fetch_flow_entry(self) -> None: # Simple keys are allowed after ','. self.allow_simple_key = True # Reset possible simple key on the current level. @@ -564,8 +533,7 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(FlowEntryToken(start_mark, end_mark)) - def fetch_block_entry(self): - # type: () -> None + def fetch_block_entry(self) -> None: # Block context needs additional checks. if not self.flow_level: # Are we allowed to start a new entry? @@ -592,8 +560,7 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(BlockEntryToken(start_mark, end_mark)) - def fetch_key(self): - # type: () -> None + def fetch_key(self) -> None: # Block context needs additional checks. if not self.flow_level: @@ -620,8 +587,7 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(KeyToken(start_mark, end_mark)) - def fetch_value(self): - # type: () -> None + def fetch_value(self) -> None: # Do we determine a simple key? if self.flow_level in self.possible_simple_keys: # Add KEY. @@ -681,8 +647,7 @@ class Scanner: end_mark = self.reader.get_mark() self.tokens.append(ValueToken(start_mark, end_mark)) - def fetch_alias(self): - # type: () -> None + def fetch_alias(self) -> None: # ALIAS could be a simple key. self.save_possible_simple_key() # No simple keys after ALIAS. @@ -690,8 +655,7 @@ class Scanner: # Scan and add ALIAS. self.tokens.append(self.scan_anchor(AliasToken)) - def fetch_anchor(self): - # type: () -> None + def fetch_anchor(self) -> None: # ANCHOR could start a simple key. self.save_possible_simple_key() # No simple keys after ANCHOR. @@ -699,8 +663,7 @@ class Scanner: # Scan and add ANCHOR. self.tokens.append(self.scan_anchor(AnchorToken)) - def fetch_tag(self): - # type: () -> None + def fetch_tag(self) -> None: # TAG could start a simple key. self.save_possible_simple_key() # No simple keys after TAG. @@ -708,16 +671,13 @@ class Scanner: # Scan and add TAG. self.tokens.append(self.scan_tag()) - def fetch_literal(self): - # type: () -> None + def fetch_literal(self) -> None: self.fetch_block_scalar(style='|') - def fetch_folded(self): - # type: () -> None + def fetch_folded(self) -> None: self.fetch_block_scalar(style='>') - def fetch_block_scalar(self, style): - # type: (Any) -> None + def fetch_block_scalar(self, style: Any) -> None: # A simple key may follow a block scalar. self.allow_simple_key = True # Reset possible simple key on the current level. @@ -725,16 +685,13 @@ class Scanner: # Scan and add SCALAR. self.tokens.append(self.scan_block_scalar(style)) - def fetch_single(self): - # type: () -> None + def fetch_single(self) -> None: self.fetch_flow_scalar(style="'") - def fetch_double(self): - # type: () -> None + def fetch_double(self) -> None: self.fetch_flow_scalar(style='"') - def fetch_flow_scalar(self, style): - # type: (Any) -> None + def fetch_flow_scalar(self, style: Any) -> None: # A flow scalar could be a simple key. self.save_possible_simple_key() # No simple keys after flow scalars. @@ -742,8 +699,7 @@ class Scanner: # Scan and add SCALAR. self.tokens.append(self.scan_flow_scalar(style)) - def fetch_plain(self): - # type: () -> None + def fetch_plain(self) -> None: # A plain scalar could be a simple key. self.save_possible_simple_key() # No simple keys after plain scalars. But note that `scan_plain` will @@ -755,45 +711,39 @@ class Scanner: # Checkers. - def check_directive(self): - # type: () -> Any + def check_directive(self) -> Any: # DIRECTIVE: ^ '%' ... # The '%' indicator is already checked. if self.reader.column == 0: return True return None - def check_document_start(self): - # type: () -> Any + def check_document_start(self) -> Any: # DOCUMENT-START: ^ '---' (' '|'\n') if self.reader.column == 0: if self.reader.prefix(3) == '---' and self.reader.peek(3) in _THE_END_SPACE_TAB: return True return None - def check_document_end(self): - # type: () -> Any + def check_document_end(self) -> Any: # DOCUMENT-END: ^ '...' (' '|'\n') if self.reader.column == 0: if self.reader.prefix(3) == '...' and self.reader.peek(3) in _THE_END_SPACE_TAB: return True return None - def check_block_entry(self): - # type: () -> Any + def check_block_entry(self) -> Any: # BLOCK-ENTRY: '-' (' '|'\n') return self.reader.peek(1) in _THE_END_SPACE_TAB - def check_key(self): - # type: () -> Any + def check_key(self) -> Any: # KEY(flow context): '?' if bool(self.flow_level): return True # KEY(block context): '?' (' '|'\n') return self.reader.peek(1) in _THE_END_SPACE_TAB - def check_value(self): - # type: () -> Any + def check_value(self) -> Any: # VALUE(flow context): ':' if self.scanner_processing_version == (1, 1): if bool(self.flow_level): @@ -811,8 +761,7 @@ class Scanner: # VALUE(block context): ':' (' '|'\n') return self.reader.peek(1) in _THE_END_SPACE_TAB - def check_plain(self): - # type: () -> Any + def check_plain(self) -> Any: # A plain scalar may start with any non-space character except: # '-', '?', ':', ',', '[', ']', '{', '}', # '#', '&', '*', '!', '|', '>', '\'', '\"', @@ -848,8 +797,7 @@ class Scanner: # Scanners. - def scan_to_next_token(self): - # type: () -> Any + def scan_to_next_token(self) -> Any: # We ignore spaces, line breaks and comments. # If we find a line break in the block context, we set the flag # `allow_simple_key` on. @@ -887,8 +835,7 @@ class Scanner: found = True return None - def scan_directive(self): - # type: () -> Any + def scan_directive(self) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -909,8 +856,7 @@ class Scanner: self.scan_directive_ignored_line(start_mark) return DirectiveToken(name, value, start_mark, end_mark) - def scan_directive_name(self, start_mark): - # type: (Any) -> Any + def scan_directive_name(self, start_mark: Any) -> Any: # See the specification for details. length = 0 srp = self.reader.peek @@ -922,7 +868,7 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch), + f'expected alphabetic or numeric character, but found {ch!r}', self.reader.get_mark(), ) value = self.reader.prefix(length) @@ -932,13 +878,12 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch), + f'expected alphabetic or numeric character, but found {ch!r}', self.reader.get_mark(), ) return value - def scan_yaml_directive_value(self, start_mark): - # type: (Any) -> Any + def scan_yaml_directive_value(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -949,7 +894,7 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F("expected a digit or '.', but found {srp_call!r}", srp_call=srp()), + f"expected a digit or '.', but found {srp()!r}", self.reader.get_mark(), ) srf() @@ -958,14 +903,13 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F("expected a digit or '.', but found {srp_call!r}", srp_call=srp()), + f"expected a digit or '.', but found {srp()!r}", self.reader.get_mark(), ) self.yaml_version = (major, minor) return self.yaml_version - def scan_yaml_directive_number(self, start_mark): - # type: (Any) -> Any + def scan_yaml_directive_number(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -974,7 +918,7 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F('expected a digit, but found {ch!r}', ch=ch), + f'expected a digit, but found {ch!r}', self.reader.get_mark(), ) length = 0 @@ -984,8 +928,7 @@ class Scanner: srf(length) return value - def scan_tag_directive_value(self, start_mark): - # type: (Any) -> Any + def scan_tag_directive_value(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -997,8 +940,7 @@ class Scanner: prefix = self.scan_tag_directive_prefix(start_mark) return (handle, prefix) - def scan_tag_directive_handle(self, start_mark): - # type: (Any) -> Any + def scan_tag_directive_handle(self, start_mark: Any) -> Any: # See the specification for details. value = self.scan_tag_handle('directive', start_mark) ch = self.reader.peek() @@ -1006,13 +948,12 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F("expected ' ', but found {ch!r}", ch=ch), + f"expected ' ', but found {ch!r}", self.reader.get_mark(), ) return value - def scan_tag_directive_prefix(self, start_mark): - # type: (Any) -> Any + def scan_tag_directive_prefix(self, start_mark: Any) -> Any: # See the specification for details. value = self.scan_tag_uri('directive', start_mark) ch = self.reader.peek() @@ -1020,13 +961,12 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F("expected ' ', but found {ch!r}", ch=ch), + f"expected ' ', but found {ch!r}", self.reader.get_mark(), ) return value - def scan_directive_ignored_line(self, start_mark): - # type: (Any) -> None + def scan_directive_ignored_line(self, start_mark: Any) -> None: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -1040,13 +980,12 @@ class Scanner: raise ScannerError( 'while scanning a directive', start_mark, - _F('expected a comment or a line break, but found {ch!r}', ch=ch), + f'expected a comment or a line break, but found {ch!r}', self.reader.get_mark(), ) self.scan_line_break() - def scan_anchor(self, TokenClass): - # type: (Any) -> Any + def scan_anchor(self, TokenClass: Any) -> Any: # The specification does not restrict characters for anchors and # aliases. This may lead to problems, for instance, the document: # [ *alias, value ] @@ -1072,9 +1011,9 @@ class Scanner: ch = srp(length) if not length: raise ScannerError( - _F('while scanning an {name!s}', name=name), + f'while scanning an {name!s}', start_mark, - _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch), + f'expected alphabetic or numeric character, but found {ch!r}', self.reader.get_mark(), ) value = self.reader.prefix(length) @@ -1084,16 +1023,15 @@ class Scanner: # assert ch1 == ch if ch not in '\0 \t\r\n\x85\u2028\u2029?:,[]{}%@`': raise ScannerError( - _F('while scanning an {name!s}', name=name), + f'while scanning an {name!s}', start_mark, - _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch), + f'expected alphabetic or numeric character, but found {ch!r}', self.reader.get_mark(), ) end_mark = self.reader.get_mark() return TokenClass(value, start_mark, end_mark) - def scan_tag(self): - # type: () -> Any + def scan_tag(self) -> Any: # See the specification for details. srp = self.reader.peek start_mark = self.reader.get_mark() @@ -1106,7 +1044,7 @@ class Scanner: raise ScannerError( 'while parsing a tag', start_mark, - _F("expected '>', but found {srp_call!r}", srp_call=srp()), + f"expected '>' but found {srp()!r}", self.reader.get_mark(), ) self.reader.forward() @@ -1135,15 +1073,14 @@ class Scanner: raise ScannerError( 'while scanning a tag', start_mark, - _F("expected ' ', but found {ch!r}", ch=ch), + f"expected ' ', but found {ch!r}", self.reader.get_mark(), ) value = (handle, suffix) end_mark = self.reader.get_mark() return TagToken(value, start_mark, end_mark) - def scan_block_scalar(self, style, rt=False): - # type: (Any, Optional[bool]) -> Any + def scan_block_scalar(self, style: Any, rt: Optional[bool] = False) -> Any: # See the specification for details. srp = self.reader.peek if style == '>': @@ -1151,7 +1088,7 @@ class Scanner: else: folded = False - chunks = [] # type: List[Any] + chunks: List[Any] = [] start_mark = self.reader.get_mark() # Scan the header. @@ -1227,7 +1164,7 @@ class Scanner: # Process trailing line breaks. The 'chomping' setting determines # whether they are included in the value. - trailing = [] # type: List[Any] + trailing: List[Any] = [] if chomping in [None, True]: chunks.append(line_break) if chomping is True: @@ -1266,8 +1203,7 @@ class Scanner: token.add_post_comment(comment) return token - def scan_block_scalar_indicators(self, start_mark): - # type: (Any) -> Any + def scan_block_scalar_indicators(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek chomping = None @@ -1312,13 +1248,12 @@ class Scanner: raise ScannerError( 'while scanning a block scalar', start_mark, - _F('expected chomping or indentation indicators, but found {ch!r}', ch=ch), + f'expected chomping or indentation indicators, but found {ch!r}', self.reader.get_mark(), ) return chomping, increment - def scan_block_scalar_ignored_line(self, start_mark): - # type: (Any) -> Any + def scan_block_scalar_ignored_line(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -1337,14 +1272,13 @@ class Scanner: raise ScannerError( 'while scanning a block scalar', start_mark, - _F('expected a comment or a line break, but found {ch!r}', ch=ch), + f'expected a comment or a line break, but found {ch!r}', self.reader.get_mark(), ) self.scan_line_break() return comment - def scan_block_scalar_indentation(self): - # type: () -> Any + def scan_block_scalar_indentation(self) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -1361,8 +1295,7 @@ class Scanner: max_indent = self.reader.column return chunks, max_indent, end_mark - def scan_block_scalar_breaks(self, indent): - # type: (int) -> Any + def scan_block_scalar_breaks(self, indent: int) -> Any: # See the specification for details. chunks = [] srp = self.reader.peek @@ -1377,8 +1310,7 @@ class Scanner: srf() return chunks, end_mark - def scan_flow_scalar(self, style): - # type: (Any) -> Any + def scan_flow_scalar(self, style: Any) -> Any: # See the specification for details. # Note that we loose indentation rules for quoted scalars. Quoted # scalars don't need to adhere indentation because " and ' clearly @@ -1390,7 +1322,7 @@ class Scanner: else: double = False srp = self.reader.peek - chunks = [] # type: List[Any] + chunks: List[Any] = [] start_mark = self.reader.get_mark() quote = srp() self.reader.forward() @@ -1425,10 +1357,9 @@ class Scanner: ESCAPE_CODES = {'x': 2, 'u': 4, 'U': 8} - def scan_flow_scalar_non_spaces(self, double, start_mark): - # type: (Any, Any) -> Any + def scan_flow_scalar_non_spaces(self, double: Any, start_mark: Any) -> Any: # See the specification for details. - chunks = [] # type: List[Any] + chunks: List[Any] = [] srp = self.reader.peek srf = self.reader.forward while True: @@ -1459,12 +1390,8 @@ class Scanner: raise ScannerError( 'while scanning a double-quoted scalar', start_mark, - _F( - 'expected escape sequence of {length:d} hexdecimal ' - 'numbers, but found {srp_call!r}', - length=length, - srp_call=srp(k), - ), + f'expected escape sequence of {length:d} ' + f'hexdecimal numbers, but found {srp(k)!r}', self.reader.get_mark(), ) code = int(self.reader.prefix(length), 16) @@ -1477,14 +1404,13 @@ class Scanner: raise ScannerError( 'while scanning a double-quoted scalar', start_mark, - _F('found unknown escape character {ch!r}', ch=ch), + f'found unknown escape character {ch!r}', self.reader.get_mark(), ) else: return chunks - def scan_flow_scalar_spaces(self, double, start_mark): - # type: (Any, Any) -> Any + def scan_flow_scalar_spaces(self, double: Any, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek chunks = [] @@ -1513,10 +1439,9 @@ class Scanner: chunks.append(whitespaces) return chunks - def scan_flow_scalar_breaks(self, double, start_mark): - # type: (Any, Any) -> Any + def scan_flow_scalar_breaks(self, double: Any, start_mark: Any) -> Any: # See the specification for details. - chunks = [] # type: List[Any] + chunks: List[Any] = [] srp = self.reader.peek srf = self.reader.forward while True: @@ -1537,8 +1462,7 @@ class Scanner: else: return chunks - def scan_plain(self): - # type: () -> Any + def scan_plain(self) -> Any: # See the specification for details. # We add an additional restriction for the flow context: # plain scalars in the flow context cannot contain ',', ': ' and '?'. @@ -1546,7 +1470,7 @@ class Scanner: # Indentation rules are loosed for the flow context. srp = self.reader.peek srf = self.reader.forward - chunks = [] # type: List[Any] + chunks: List[Any] = [] start_mark = self.reader.get_mark() end_mark = start_mark indent = self.indent + 1 @@ -1554,7 +1478,7 @@ class Scanner: # document separators at the beginning of the line. # if indent == 0: # indent = 1 - spaces = [] # type: List[Any] + spaces: List[Any] = [] while True: length = 0 if srp() == '#': @@ -1626,8 +1550,7 @@ class Scanner: return token - def scan_plain_spaces(self, indent, start_mark): - # type: (Any, Any) -> Any + def scan_plain_spaces(self, indent: Any, start_mark: Any) -> Any: # See the specification for details. # The specification is really confusing about tabs in plain scalars. # We just forbid them completely. Do not use tabs in YAML! @@ -1664,8 +1587,7 @@ class Scanner: chunks.append(whitespaces) return chunks - def scan_tag_handle(self, name, start_mark): - # type: (Any, Any) -> Any + def scan_tag_handle(self, name: Any, start_mark: Any) -> Any: # See the specification for details. # For some strange reasons, the specification does not allow '_' in # tag handles. I have allowed it anyway. @@ -1673,9 +1595,9 @@ class Scanner: ch = srp() if ch != '!': raise ScannerError( - _F('while scanning an {name!s}', name=name), + f'while scanning an {name!s}', start_mark, - _F("expected '!', but found {ch!r}", ch=ch), + f"expected '!', but found {ch!r}", self.reader.get_mark(), ) length = 1 @@ -1687,9 +1609,9 @@ class Scanner: if ch != '!': self.reader.forward(length) raise ScannerError( - _F('while scanning an {name!s}', name=name), + f'while scanning an {name!s}', start_mark, - _F("expected '!', but found {ch!r}", ch=ch), + f"expected '!' but found {ch!r}", self.reader.get_mark(), ) length += 1 @@ -1697,8 +1619,7 @@ class Scanner: self.reader.forward(length) return value - def scan_tag_uri(self, name, start_mark): - # type: (Any, Any) -> Any + def scan_tag_uri(self, name: Any, start_mark: Any) -> Any: # See the specification for details. # Note: we do not check if URI is well-formed. srp = self.reader.peek @@ -1726,32 +1647,28 @@ class Scanner: length = 0 if not chunks: raise ScannerError( - _F('while parsing an {name!s}', name=name), + f'while parsing an {name!s}', start_mark, - _F('expected URI, but found {ch!r}', ch=ch), + f'expected URI, but found {ch!r}', self.reader.get_mark(), ) return "".join(chunks) - def scan_uri_escapes(self, name, start_mark): - # type: (Any, Any) -> Any + def scan_uri_escapes(self, name: Any, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward - code_bytes = [] # type: List[Any] + code_bytes: List[Any] = [] mark = self.reader.get_mark() while srp() == '%': srf() for k in range(2): if srp(k) not in '0123456789ABCDEFabcdef': raise ScannerError( - _F('while scanning an {name!s}', name=name), + f'while scanning an {name!s}', start_mark, - _F( - 'expected URI escape sequence of 2 hexdecimal numbers,' - ' but found {srp_call!r}', - srp_call=srp(k), - ), + f'expected URI escape sequence of 2 hexdecimal numbers, ' + f'but found {srp(k)!r}', self.reader.get_mark(), ) code_bytes.append(int(self.reader.prefix(2), 16)) @@ -1759,13 +1676,10 @@ class Scanner: try: value = bytes(code_bytes).decode('utf-8') except UnicodeDecodeError as exc: - raise ScannerError( - _F('while scanning an {name!s}', name=name), start_mark, str(exc), mark - ) + raise ScannerError(f'while scanning an {name!s}', start_mark, str(exc), mark) return value - def scan_line_break(self): - # type: () -> Any + def scan_line_break(self) -> Any: # Transforms: # '\r\n' : '\n' # '\r' : '\n' @@ -1788,8 +1702,7 @@ class Scanner: class RoundTripScanner(Scanner): - def check_token(self, *choices): - # type: (Any) -> bool + def check_token(self, *choices: Any) -> bool: # Check if the next token is one of the given types. while self.need_more_tokens(): self.fetch_more_tokens() @@ -1802,8 +1715,7 @@ class RoundTripScanner(Scanner): return True return False - def peek_token(self): - # type: () -> Any + def peek_token(self) -> Any: # Return the next token, but do not delete if from the queue. while self.need_more_tokens(): self.fetch_more_tokens() @@ -1812,10 +1724,9 @@ class RoundTripScanner(Scanner): return self.tokens[0] return None - def _gather_comments(self): - # type: () -> Any + def _gather_comments(self) -> Any: """combine multiple comment lines and assign to next non-comment-token""" - comments = [] # type: List[Any] + comments: List[Any] = [] if not self.tokens: return comments if isinstance(self.tokens[0], CommentToken): @@ -1837,8 +1748,7 @@ class RoundTripScanner(Scanner): if not self.done and len(self.tokens) < 2: self.fetch_more_tokens() - def get_token(self): - # type: () -> Any + def get_token(self) -> Any: # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() @@ -1891,8 +1801,7 @@ class RoundTripScanner(Scanner): return self.tokens.pop(0) return None - def fetch_comment(self, comment): - # type: (Any) -> None + def fetch_comment(self, comment: Any) -> None: value, start_mark, end_mark = comment while value and value[-1] == ' ': # empty line within indented key context @@ -1902,8 +1811,7 @@ class RoundTripScanner(Scanner): # scanner - def scan_to_next_token(self): - # type: () -> Any + def scan_to_next_token(self) -> Any: # We ignore spaces, line breaks and comments. # If we find a line break in the block context, we set the flag # `allow_simple_key` on. @@ -1946,7 +1854,7 @@ class RoundTripScanner(Scanner): break comment += ch srf() - # gather any blank lines following the comment too + # gather any blank lines following the comment ch = self.scan_line_break() while len(ch) > 0: comment += ch @@ -1975,8 +1883,7 @@ class RoundTripScanner(Scanner): found = True return None - def scan_line_break(self, empty_line=False): - # type: (bool) -> Text + def scan_line_break(self, empty_line: bool = False) -> Text: # Transforms: # '\r\n' : '\n' # '\r' : '\n' @@ -1985,7 +1892,7 @@ class RoundTripScanner(Scanner): # '\u2028' : '\u2028' # '\u2029 : '\u2029' # default : '' - ch = self.reader.peek() # type: Text + ch: Text = self.reader.peek() if ch in '\r\n\x85': if self.reader.prefix(2) == '\r\n': self.reader.forward(2) @@ -2000,8 +1907,7 @@ class RoundTripScanner(Scanner): return ch return "" - def scan_block_scalar(self, style, rt=True): - # type: (Any, Optional[bool]) -> Any + def scan_block_scalar(self, style: Any, rt: Optional[bool] = True) -> Any: return Scanner.scan_block_scalar(self, style, rt=rt) @@ -2016,8 +1922,7 @@ KEYCMNT = 0 # 1 class CommentBase: __slots__ = ('value', 'line', 'column', 'used', 'function', 'fline', 'ufun', 'uline') - def __init__(self, value, line, column): - # type: (Any, Any, Any) -> None + def __init__(self, value: Any, line: Any, column: Any) -> None: self.value = value self.line = line self.column = column @@ -2028,73 +1933,57 @@ class CommentBase: self.ufun = None self.uline = None - def set_used(self, v='+'): - # type: (Any) -> None + def set_used(self, v: Any = '+') -> None: self.used = v info = inspect.getframeinfo(inspect.stack()[1][0]) self.ufun = info.function # type: ignore self.uline = info.lineno # type: ignore - def set_assigned(self): - # type: () -> None + def set_assigned(self) -> None: self.used = '|' - def __str__(self): - # type: () -> str - return _F('{value}', value=self.value) # type: ignore - - def __repr__(self): - # type: () -> str - return _F('{value!r}', value=self.value) # type: ignore - - def info(self): - # type: () -> str - return _F( # type: ignore - '{name}{used} {line:2}:{column:<2} "{value:40s} {function}:{fline} {ufun}:{uline}', - name=self.name, # type: ignore - line=self.line, - column=self.column, - value=self.value + '"', - used=self.used, - function=self.function, - fline=self.fline, - ufun=self.ufun, - uline=self.uline, + def __str__(self) -> str: + return f'{self.value}' + + def __repr__(self) -> str: + return f'{self.value!r}' + + def info(self) -> str: + xv = self.value + '"' + name = self.name # type: ignore + return ( + f'{name}{self.used} {self.line:2}:{self.column:<2} "{xv:40s} ' + f'{self.function}:{self.fline} {self.ufun}:{self.uline}' ) class EOLComment(CommentBase): name = 'EOLC' - def __init__(self, value, line, column): - # type: (Any, Any, Any) -> None + def __init__(self, value: Any, line: Any, column: Any) -> None: super().__init__(value, line, column) class FullLineComment(CommentBase): name = 'FULL' - def __init__(self, value, line, column): - # type: (Any, Any, Any) -> None + def __init__(self, value: Any, line: Any, column: Any) -> None: super().__init__(value, line, column) class BlankLineComment(CommentBase): name = 'BLNK' - def __init__(self, value, line, column): - # type: (Any, Any, Any) -> None + def __init__(self, value: Any, line: Any, column: Any) -> None: super().__init__(value, line, column) class ScannedComments: - def __init__(self): - # type: (Any) -> None + def __init__(self: Any) -> None: self.comments = {} # type: ignore self.unused = [] # type: ignore - def add_eol_comment(self, comment, column, line): - # type: (Any, Any, Any) -> Any + def add_eol_comment(self, comment: Any, column: Any, line: Any) -> Any: # info = inspect.getframeinfo(inspect.stack()[1][0]) if comment.count('\n') == 1: assert comment[-1] == '\n' @@ -2104,8 +1993,7 @@ class ScannedComments: self.unused.append(line) return retval - def add_blank_line(self, comment, column, line): - # type: (Any, Any, Any) -> Any + def add_blank_line(self, comment: Any, column: Any, line: Any) -> Any: # info = inspect.getframeinfo(inspect.stack()[1][0]) assert comment.count('\n') == 1 and comment[-1] == '\n' assert line not in self.comments @@ -2113,8 +2001,7 @@ class ScannedComments: self.unused.append(line) return retval - def add_full_line_comment(self, comment, column, line): - # type: (Any, Any, Any) -> Any + def add_full_line_comment(self, comment: Any, column: Any, line: Any) -> Any: # info = inspect.getframeinfo(inspect.stack()[1][0]) assert comment.count('\n') == 1 and comment[-1] == '\n' # if comment.startswith('# C12'): @@ -2124,30 +2011,21 @@ class ScannedComments: self.unused.append(line) return retval - def __getitem__(self, idx): - # type: (Any) -> Any + def __getitem__(self, idx: Any) -> Any: return self.comments[idx] - def __str__(self): - # type: () -> Any + def __str__(self) -> Any: return ( 'ParsedComments:\n ' - + '\n '.join( - ( - _F('{lineno:2} {x}', lineno=lineno, x=x.info()) - for lineno, x in self.comments.items() - ) - ) + + '\n '.join((f'{lineno:2} {x.info()}' for lineno, x in self.comments.items())) + '\n' ) - def last(self): - # type: () -> str + def last(self) -> str: lineno, x = list(self.comments.items())[-1] - return _F('{lineno:2} {x}\n', lineno=lineno, x=x.info()) # type: ignore + return f'{lineno:2} {x.info()}\n' - def any_unprocessed(self): - # type: () -> bool + def any_unprocessed(self) -> bool: # ToDo: might want to differentiate based on lineno return len(self.unused) > 0 # for lno, comment in reversed(self.comments.items()): @@ -2155,8 +2033,7 @@ class ScannedComments: # return True # return False - def unprocessed(self, use=False): - # type: (Any) -> Any + def unprocessed(self, use: Any = False) -> Any: while len(self.unused) > 0: first = self.unused.pop(0) if use else self.unused[0] info = inspect.getframeinfo(inspect.stack()[1][0]) @@ -2165,8 +2042,7 @@ class ScannedComments: if use: self.comments[first].set_used() - def assign_pre(self, token): - # type: (Any) -> Any + def assign_pre(self, token: Any) -> Any: token_line = token.start_mark.line info = inspect.getframeinfo(inspect.stack()[1][0]) xprintf('assign_pre', token_line, self.unused, info.function, info.lineno) @@ -2179,8 +2055,7 @@ class ScannedComments: token.add_comment_pre(first) return gobbled - def assign_eol(self, tokens): - # type: (Any) -> Any + def assign_eol(self, tokens: Any) -> Any: try: comment_line = self.unused[0] except IndexError: @@ -2235,8 +2110,7 @@ class ScannedComments: sys.exit(0) - def assign_post(self, token): - # type: (Any) -> Any + def assign_post(self, token: Any) -> Any: token_line = token.start_mark.line info = inspect.getframeinfo(inspect.stack()[1][0]) xprintf('assign_post', token_line, self.unused, info.function, info.lineno) @@ -2249,28 +2123,21 @@ class ScannedComments: token.add_comment_post(first) return gobbled - def str_unprocessed(self): - # type: () -> Any + def str_unprocessed(self) -> Any: return ''.join( - ( - _F(' {ind:2} {x}\n', ind=ind, x=x.info()) - for ind, x in self.comments.items() - if x.used == ' ' - ) + (f' {ind:2} {x.info()}\n' for ind, x in self.comments.items() if x.used == ' ') ) class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments - def __init__(self, *arg, **kw): - # type: (Any, Any) -> None + def __init__(self, *arg: Any, **kw: Any) -> None: super().__init__(*arg, **kw) assert self.loader is not None # comments isinitialised on .need_more_tokens and persist on # self.loader.parsed_comments self.comments = None - def get_token(self): - # type: () -> Any + def get_token(self) -> Any: # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() @@ -2282,8 +2149,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments self.tokens_taken += 1 return self.tokens.pop(0) - def need_more_tokens(self): - # type: () -> bool + def need_more_tokens(self) -> bool: if self.comments is None: self.loader.parsed_comments = self.comments = ScannedComments() # type: ignore if self.done: @@ -2309,8 +2175,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments self.comments.assign_eol(self.tokens) # type: ignore return False - def scan_to_next_token(self): - # type: () -> None + def scan_to_next_token(self) -> None: srp = self.reader.peek srf = self.reader.forward if self.reader.index == 0 and srp() == '\uFEFF': @@ -2373,8 +2238,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments found = True return None - def scan_empty_or_full_line_comments(self): - # type: () -> None + def scan_empty_or_full_line_comments(self) -> None: blmark = self.reader.get_mark() assert blmark.column == 0 blanks = "" @@ -2413,8 +2277,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments self.reader.forward() ch = self.reader.peek() - def scan_block_scalar_ignored_line(self, start_mark): - # type: (Any) -> Any + def scan_block_scalar_ignored_line(self, start_mark: Any) -> Any: # See the specification for details. srp = self.reader.peek srf = self.reader.forward @@ -2435,7 +2298,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments raise ScannerError( 'while scanning a block scalar', start_mark, - _F('expected a comment or a line break, but found {ch!r}', ch=ch), + f'expected a comment or a line break, but found {ch!r}', self.reader.get_mark(), ) if comment is not None: |