summaryrefslogtreecommitdiff
path: root/scanner.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
committerAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
commit8b731994b1543d7886af85f926d9eea5a22d0732 (patch)
tree3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /scanner.py
parent45111ba0b67e8619265d89f3202635e62c13cde6 (diff)
downloadruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz
retrofitted 0.18 changes
Diffstat (limited to 'scanner.py')
-rw-r--r--scanner.py493
1 files changed, 178 insertions, 315 deletions
diff --git a/scanner.py b/scanner.py
index 61cae63..09fd2ad 100644
--- a/scanner.py
+++ b/scanner.py
@@ -31,11 +31,10 @@
import inspect
from ruamel.yaml.error import MarkedYAMLError, CommentMark # NOQA
from ruamel.yaml.tokens import * # NOQA
-from ruamel.yaml.compat import _F, check_anchorname_char, nprint, nprintf # NOQA
+from ruamel.yaml.compat import check_anchorname_char, nprint, nprintf # NOQA
-if False: # MYPY
- from typing import Any, Dict, Optional, List, Union, Text # NOQA
- from ruamel.yaml.compat import VersionType # NOQA
+from typing import Any, Dict, Optional, List, Union, Text # NOQA
+from ruamel.yaml.compat import VersionType # NOQA
__all__ = ['Scanner', 'RoundTripScanner', 'ScannerError']
@@ -45,8 +44,7 @@ _THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029'
_SPACE_TAB = ' \t'
-def xprintf(*args, **kw):
- # type: (Any, Any) -> Any
+def xprintf(*args: Any, **kw: Any) -> Any:
return nprintf(*args, **kw)
pass
@@ -58,8 +56,9 @@ class ScannerError(MarkedYAMLError):
class SimpleKey:
# See below simple keys treatment.
- def __init__(self, token_number, required, index, line, column, mark):
- # type: (Any, Any, int, int, int, Any) -> None
+ def __init__(
+ self, token_number: Any, required: Any, index: int, line: int, column: int, mark: Any
+ ) -> None:
self.token_number = token_number
self.required = required
self.index = index
@@ -69,8 +68,7 @@ class SimpleKey:
class Scanner:
- def __init__(self, loader=None):
- # type: (Any) -> None
+ def __init__(self, loader: Any = None) -> None:
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader do the dirty work of checking for BOM and converting the
@@ -86,24 +84,22 @@ class Scanner:
self.loader._scanner = self
self.reset_scanner()
self.first_time = False
- self.yaml_version = None # type: Any
+ self.yaml_version: Any = None
@property
- def flow_level(self):
- # type: () -> int
+ def flow_level(self) -> int:
return len(self.flow_context)
- def reset_scanner(self):
- # type: () -> None
+ def reset_scanner(self) -> None:
# Had we reached the end of the stream?
self.done = False
# flow_context is an expanding/shrinking list consisting of '{' and '['
# for each unclosed flow context. If empty list that means block context
- self.flow_context = [] # type: List[Text]
+ self.flow_context: List[Text] = []
# List of processed tokens that are not yet emitted.
- self.tokens = [] # type: List[Any]
+ self.tokens: List[Any] = []
# Add the STREAM-START token.
self.fetch_stream_start()
@@ -115,7 +111,7 @@ class Scanner:
self.indent = -1
# Past indentation levels.
- self.indents = [] # type: List[int]
+ self.indents: List[int] = []
# Variables related to simple keys treatment.
@@ -145,11 +141,10 @@ class Scanner:
# (token_number, required, index, line, column, mark)
# A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow),
# '[', or '{' tokens.
- self.possible_simple_keys = {} # type: Dict[Any, Any]
+ self.possible_simple_keys: Dict[Any, Any] = {}
@property
- def reader(self):
- # type: () -> Any
+ def reader(self) -> Any:
try:
return self._scanner_reader # type: ignore
except AttributeError:
@@ -160,16 +155,14 @@ class Scanner:
return self._scanner_reader
@property
- def scanner_processing_version(self): # prefix until un-composited
- # type: () -> Any
+ def scanner_processing_version(self) -> Any: # prefix until un-composited
if hasattr(self.loader, 'typ'):
return self.loader.resolver.processing_version
return self.loader.processing_version
# Public methods.
- def check_token(self, *choices):
- # type: (Any) -> bool
+ def check_token(self, *choices: Any) -> bool:
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -181,16 +174,14 @@ class Scanner:
return True
return False
- def peek_token(self):
- # type: () -> Any
+ def peek_token(self) -> Any:
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if len(self.tokens) > 0:
return self.tokens[0]
- def get_token(self):
- # type: () -> Any
+ def get_token(self) -> Any:
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -200,8 +191,7 @@ class Scanner:
# Private methods.
- def need_more_tokens(self):
- # type: () -> bool
+ def need_more_tokens(self) -> bool:
if self.done:
return False
if len(self.tokens) == 0:
@@ -213,12 +203,10 @@ class Scanner:
return True
return False
- def fetch_comment(self, comment):
- # type: (Any) -> None
+ def fetch_comment(self, comment: Any) -> None:
raise NotImplementedError
- def fetch_more_tokens(self):
- # type: () -> Any
+ def fetch_more_tokens(self) -> Any:
# Eat whitespaces and comments until we reach the next token.
comment = self.scan_to_next_token()
if comment is not None: # never happens for base scanner
@@ -323,14 +311,13 @@ class Scanner:
raise ScannerError(
'while scanning for the next token',
None,
- _F('found character {ch!r} that cannot start any token', ch=ch),
+ f'found character {ch!r} that cannot start any token',
self.reader.get_mark(),
)
# Simple keys treatment.
- def next_possible_simple_key(self):
- # type: () -> Any
+ def next_possible_simple_key(self) -> Any:
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
@@ -345,8 +332,7 @@ class Scanner:
min_token_number = key.token_number
return min_token_number
- def stale_possible_simple_keys(self):
- # type: () -> None
+ def stale_possible_simple_keys(self) -> None:
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
# - should be limited to a single line,
@@ -365,8 +351,7 @@ class Scanner:
)
del self.possible_simple_keys[level]
- def save_possible_simple_key(self):
- # type: () -> None
+ def save_possible_simple_key(self) -> None:
# The next token may start a simple key. We check if it's possible
# and save its position. This function is called for
# ALIAS, ANCHOR, TAG, SCALAR(flow), '[', and '{'.
@@ -389,8 +374,7 @@ class Scanner:
)
self.possible_simple_keys[self.flow_level] = key
- def remove_possible_simple_key(self):
- # type: () -> None
+ def remove_possible_simple_key(self) -> None:
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
key = self.possible_simple_keys[self.flow_level]
@@ -407,8 +391,7 @@ class Scanner:
# Indentation functions.
- def unwind_indent(self, column):
- # type: (Any) -> None
+ def unwind_indent(self, column: Any) -> None:
# In flow context, tokens should respect indentation.
# Actually the condition should be `self.indent >= column` according to
# the spec. But this condition will prohibit intuitively correct
@@ -432,8 +415,7 @@ class Scanner:
self.indent = self.indents.pop()
self.tokens.append(BlockEndToken(mark, mark))
- def add_indent(self, column):
- # type: (int) -> bool
+ def add_indent(self, column: int) -> bool:
# Check if we need to increase indentation.
if self.indent < column:
self.indents.append(self.indent)
@@ -443,8 +425,7 @@ class Scanner:
# Fetchers.
- def fetch_stream_start(self):
- # type: () -> None
+ def fetch_stream_start(self) -> None:
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
# Read the token.
@@ -452,8 +433,7 @@ class Scanner:
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark, encoding=self.reader.encoding))
- def fetch_stream_end(self):
- # type: () -> None
+ def fetch_stream_end(self) -> None:
# Set the current intendation to -1.
self.unwind_indent(-1)
# Reset simple keys.
@@ -467,8 +447,7 @@ class Scanner:
# The steam is finished.
self.done = True
- def fetch_directive(self):
- # type: () -> None
+ def fetch_directive(self) -> None:
# Set the current intendation to -1.
self.unwind_indent(-1)
@@ -479,16 +458,13 @@ class Scanner:
# Scan and add DIRECTIVE.
self.tokens.append(self.scan_directive())
- def fetch_document_start(self):
- # type: () -> None
+ def fetch_document_start(self) -> None:
self.fetch_document_indicator(DocumentStartToken)
- def fetch_document_end(self):
- # type: () -> None
+ def fetch_document_end(self) -> None:
self.fetch_document_indicator(DocumentEndToken)
- def fetch_document_indicator(self, TokenClass):
- # type: (Any) -> None
+ def fetch_document_indicator(self, TokenClass: Any) -> None:
# Set the current intendation to -1.
self.unwind_indent(-1)
@@ -503,16 +479,13 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
- def fetch_flow_sequence_start(self):
- # type: () -> None
+ def fetch_flow_sequence_start(self) -> None:
self.fetch_flow_collection_start(FlowSequenceStartToken, to_push='[')
- def fetch_flow_mapping_start(self):
- # type: () -> None
+ def fetch_flow_mapping_start(self) -> None:
self.fetch_flow_collection_start(FlowMappingStartToken, to_push='{')
- def fetch_flow_collection_start(self, TokenClass, to_push):
- # type: (Any, Text) -> None
+ def fetch_flow_collection_start(self, TokenClass: Any, to_push: Text) -> None:
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
# Increase the flow level.
@@ -525,16 +498,13 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
- def fetch_flow_sequence_end(self):
- # type: () -> None
+ def fetch_flow_sequence_end(self) -> None:
self.fetch_flow_collection_end(FlowSequenceEndToken)
- def fetch_flow_mapping_end(self):
- # type: () -> None
+ def fetch_flow_mapping_end(self) -> None:
self.fetch_flow_collection_end(FlowMappingEndToken)
- def fetch_flow_collection_end(self, TokenClass):
- # type: (Any) -> None
+ def fetch_flow_collection_end(self, TokenClass: Any) -> None:
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Decrease the flow level.
@@ -552,8 +522,7 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
- def fetch_flow_entry(self):
- # type: () -> None
+ def fetch_flow_entry(self) -> None:
# Simple keys are allowed after ','.
self.allow_simple_key = True
# Reset possible simple key on the current level.
@@ -564,8 +533,7 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
- def fetch_block_entry(self):
- # type: () -> None
+ def fetch_block_entry(self) -> None:
# Block context needs additional checks.
if not self.flow_level:
# Are we allowed to start a new entry?
@@ -592,8 +560,7 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(BlockEntryToken(start_mark, end_mark))
- def fetch_key(self):
- # type: () -> None
+ def fetch_key(self) -> None:
# Block context needs additional checks.
if not self.flow_level:
@@ -620,8 +587,7 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(KeyToken(start_mark, end_mark))
- def fetch_value(self):
- # type: () -> None
+ def fetch_value(self) -> None:
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
@@ -681,8 +647,7 @@ class Scanner:
end_mark = self.reader.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
- def fetch_alias(self):
- # type: () -> None
+ def fetch_alias(self) -> None:
# ALIAS could be a simple key.
self.save_possible_simple_key()
# No simple keys after ALIAS.
@@ -690,8 +655,7 @@ class Scanner:
# Scan and add ALIAS.
self.tokens.append(self.scan_anchor(AliasToken))
- def fetch_anchor(self):
- # type: () -> None
+ def fetch_anchor(self) -> None:
# ANCHOR could start a simple key.
self.save_possible_simple_key()
# No simple keys after ANCHOR.
@@ -699,8 +663,7 @@ class Scanner:
# Scan and add ANCHOR.
self.tokens.append(self.scan_anchor(AnchorToken))
- def fetch_tag(self):
- # type: () -> None
+ def fetch_tag(self) -> None:
# TAG could start a simple key.
self.save_possible_simple_key()
# No simple keys after TAG.
@@ -708,16 +671,13 @@ class Scanner:
# Scan and add TAG.
self.tokens.append(self.scan_tag())
- def fetch_literal(self):
- # type: () -> None
+ def fetch_literal(self) -> None:
self.fetch_block_scalar(style='|')
- def fetch_folded(self):
- # type: () -> None
+ def fetch_folded(self) -> None:
self.fetch_block_scalar(style='>')
- def fetch_block_scalar(self, style):
- # type: (Any) -> None
+ def fetch_block_scalar(self, style: Any) -> None:
# A simple key may follow a block scalar.
self.allow_simple_key = True
# Reset possible simple key on the current level.
@@ -725,16 +685,13 @@ class Scanner:
# Scan and add SCALAR.
self.tokens.append(self.scan_block_scalar(style))
- def fetch_single(self):
- # type: () -> None
+ def fetch_single(self) -> None:
self.fetch_flow_scalar(style="'")
- def fetch_double(self):
- # type: () -> None
+ def fetch_double(self) -> None:
self.fetch_flow_scalar(style='"')
- def fetch_flow_scalar(self, style):
- # type: (Any) -> None
+ def fetch_flow_scalar(self, style: Any) -> None:
# A flow scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after flow scalars.
@@ -742,8 +699,7 @@ class Scanner:
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar(style))
- def fetch_plain(self):
- # type: () -> None
+ def fetch_plain(self) -> None:
# A plain scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after plain scalars. But note that `scan_plain` will
@@ -755,45 +711,39 @@ class Scanner:
# Checkers.
- def check_directive(self):
- # type: () -> Any
+ def check_directive(self) -> Any:
# DIRECTIVE: ^ '%' ...
# The '%' indicator is already checked.
if self.reader.column == 0:
return True
return None
- def check_document_start(self):
- # type: () -> Any
+ def check_document_start(self) -> Any:
# DOCUMENT-START: ^ '---' (' '|'\n')
if self.reader.column == 0:
if self.reader.prefix(3) == '---' and self.reader.peek(3) in _THE_END_SPACE_TAB:
return True
return None
- def check_document_end(self):
- # type: () -> Any
+ def check_document_end(self) -> Any:
# DOCUMENT-END: ^ '...' (' '|'\n')
if self.reader.column == 0:
if self.reader.prefix(3) == '...' and self.reader.peek(3) in _THE_END_SPACE_TAB:
return True
return None
- def check_block_entry(self):
- # type: () -> Any
+ def check_block_entry(self) -> Any:
# BLOCK-ENTRY: '-' (' '|'\n')
return self.reader.peek(1) in _THE_END_SPACE_TAB
- def check_key(self):
- # type: () -> Any
+ def check_key(self) -> Any:
# KEY(flow context): '?'
if bool(self.flow_level):
return True
# KEY(block context): '?' (' '|'\n')
return self.reader.peek(1) in _THE_END_SPACE_TAB
- def check_value(self):
- # type: () -> Any
+ def check_value(self) -> Any:
# VALUE(flow context): ':'
if self.scanner_processing_version == (1, 1):
if bool(self.flow_level):
@@ -811,8 +761,7 @@ class Scanner:
# VALUE(block context): ':' (' '|'\n')
return self.reader.peek(1) in _THE_END_SPACE_TAB
- def check_plain(self):
- # type: () -> Any
+ def check_plain(self) -> Any:
# A plain scalar may start with any non-space character except:
# '-', '?', ':', ',', '[', ']', '{', '}',
# '#', '&', '*', '!', '|', '>', '\'', '\"',
@@ -848,8 +797,7 @@ class Scanner:
# Scanners.
- def scan_to_next_token(self):
- # type: () -> Any
+ def scan_to_next_token(self) -> Any:
# We ignore spaces, line breaks and comments.
# If we find a line break in the block context, we set the flag
# `allow_simple_key` on.
@@ -887,8 +835,7 @@ class Scanner:
found = True
return None
- def scan_directive(self):
- # type: () -> Any
+ def scan_directive(self) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -909,8 +856,7 @@ class Scanner:
self.scan_directive_ignored_line(start_mark)
return DirectiveToken(name, value, start_mark, end_mark)
- def scan_directive_name(self, start_mark):
- # type: (Any) -> Any
+ def scan_directive_name(self, start_mark: Any) -> Any:
# See the specification for details.
length = 0
srp = self.reader.peek
@@ -922,7 +868,7 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch),
+ f'expected alphabetic or numeric character, but found {ch!r}',
self.reader.get_mark(),
)
value = self.reader.prefix(length)
@@ -932,13 +878,12 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch),
+ f'expected alphabetic or numeric character, but found {ch!r}',
self.reader.get_mark(),
)
return value
- def scan_yaml_directive_value(self, start_mark):
- # type: (Any) -> Any
+ def scan_yaml_directive_value(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -949,7 +894,7 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F("expected a digit or '.', but found {srp_call!r}", srp_call=srp()),
+ f"expected a digit or '.', but found {srp()!r}",
self.reader.get_mark(),
)
srf()
@@ -958,14 +903,13 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F("expected a digit or '.', but found {srp_call!r}", srp_call=srp()),
+ f"expected a digit or '.', but found {srp()!r}",
self.reader.get_mark(),
)
self.yaml_version = (major, minor)
return self.yaml_version
- def scan_yaml_directive_number(self, start_mark):
- # type: (Any) -> Any
+ def scan_yaml_directive_number(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -974,7 +918,7 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F('expected a digit, but found {ch!r}', ch=ch),
+ f'expected a digit, but found {ch!r}',
self.reader.get_mark(),
)
length = 0
@@ -984,8 +928,7 @@ class Scanner:
srf(length)
return value
- def scan_tag_directive_value(self, start_mark):
- # type: (Any) -> Any
+ def scan_tag_directive_value(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -997,8 +940,7 @@ class Scanner:
prefix = self.scan_tag_directive_prefix(start_mark)
return (handle, prefix)
- def scan_tag_directive_handle(self, start_mark):
- # type: (Any) -> Any
+ def scan_tag_directive_handle(self, start_mark: Any) -> Any:
# See the specification for details.
value = self.scan_tag_handle('directive', start_mark)
ch = self.reader.peek()
@@ -1006,13 +948,12 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F("expected ' ', but found {ch!r}", ch=ch),
+ f"expected ' ', but found {ch!r}",
self.reader.get_mark(),
)
return value
- def scan_tag_directive_prefix(self, start_mark):
- # type: (Any) -> Any
+ def scan_tag_directive_prefix(self, start_mark: Any) -> Any:
# See the specification for details.
value = self.scan_tag_uri('directive', start_mark)
ch = self.reader.peek()
@@ -1020,13 +961,12 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F("expected ' ', but found {ch!r}", ch=ch),
+ f"expected ' ', but found {ch!r}",
self.reader.get_mark(),
)
return value
- def scan_directive_ignored_line(self, start_mark):
- # type: (Any) -> None
+ def scan_directive_ignored_line(self, start_mark: Any) -> None:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -1040,13 +980,12 @@ class Scanner:
raise ScannerError(
'while scanning a directive',
start_mark,
- _F('expected a comment or a line break, but found {ch!r}', ch=ch),
+ f'expected a comment or a line break, but found {ch!r}',
self.reader.get_mark(),
)
self.scan_line_break()
- def scan_anchor(self, TokenClass):
- # type: (Any) -> Any
+ def scan_anchor(self, TokenClass: Any) -> Any:
# The specification does not restrict characters for anchors and
# aliases. This may lead to problems, for instance, the document:
# [ *alias, value ]
@@ -1072,9 +1011,9 @@ class Scanner:
ch = srp(length)
if not length:
raise ScannerError(
- _F('while scanning an {name!s}', name=name),
+ f'while scanning an {name!s}',
start_mark,
- _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch),
+ f'expected alphabetic or numeric character, but found {ch!r}',
self.reader.get_mark(),
)
value = self.reader.prefix(length)
@@ -1084,16 +1023,15 @@ class Scanner:
# assert ch1 == ch
if ch not in '\0 \t\r\n\x85\u2028\u2029?:,[]{}%@`':
raise ScannerError(
- _F('while scanning an {name!s}', name=name),
+ f'while scanning an {name!s}',
start_mark,
- _F('expected alphabetic or numeric character, but found {ch!r}', ch=ch),
+ f'expected alphabetic or numeric character, but found {ch!r}',
self.reader.get_mark(),
)
end_mark = self.reader.get_mark()
return TokenClass(value, start_mark, end_mark)
- def scan_tag(self):
- # type: () -> Any
+ def scan_tag(self) -> Any:
# See the specification for details.
srp = self.reader.peek
start_mark = self.reader.get_mark()
@@ -1106,7 +1044,7 @@ class Scanner:
raise ScannerError(
'while parsing a tag',
start_mark,
- _F("expected '>', but found {srp_call!r}", srp_call=srp()),
+ f"expected '>' but found {srp()!r}",
self.reader.get_mark(),
)
self.reader.forward()
@@ -1135,15 +1073,14 @@ class Scanner:
raise ScannerError(
'while scanning a tag',
start_mark,
- _F("expected ' ', but found {ch!r}", ch=ch),
+ f"expected ' ', but found {ch!r}",
self.reader.get_mark(),
)
value = (handle, suffix)
end_mark = self.reader.get_mark()
return TagToken(value, start_mark, end_mark)
- def scan_block_scalar(self, style, rt=False):
- # type: (Any, Optional[bool]) -> Any
+ def scan_block_scalar(self, style: Any, rt: Optional[bool] = False) -> Any:
# See the specification for details.
srp = self.reader.peek
if style == '>':
@@ -1151,7 +1088,7 @@ class Scanner:
else:
folded = False
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
start_mark = self.reader.get_mark()
# Scan the header.
@@ -1227,7 +1164,7 @@ class Scanner:
# Process trailing line breaks. The 'chomping' setting determines
# whether they are included in the value.
- trailing = [] # type: List[Any]
+ trailing: List[Any] = []
if chomping in [None, True]:
chunks.append(line_break)
if chomping is True:
@@ -1266,8 +1203,7 @@ class Scanner:
token.add_post_comment(comment)
return token
- def scan_block_scalar_indicators(self, start_mark):
- # type: (Any) -> Any
+ def scan_block_scalar_indicators(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
chomping = None
@@ -1312,13 +1248,12 @@ class Scanner:
raise ScannerError(
'while scanning a block scalar',
start_mark,
- _F('expected chomping or indentation indicators, but found {ch!r}', ch=ch),
+ f'expected chomping or indentation indicators, but found {ch!r}',
self.reader.get_mark(),
)
return chomping, increment
- def scan_block_scalar_ignored_line(self, start_mark):
- # type: (Any) -> Any
+ def scan_block_scalar_ignored_line(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -1337,14 +1272,13 @@ class Scanner:
raise ScannerError(
'while scanning a block scalar',
start_mark,
- _F('expected a comment or a line break, but found {ch!r}', ch=ch),
+ f'expected a comment or a line break, but found {ch!r}',
self.reader.get_mark(),
)
self.scan_line_break()
return comment
- def scan_block_scalar_indentation(self):
- # type: () -> Any
+ def scan_block_scalar_indentation(self) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -1361,8 +1295,7 @@ class Scanner:
max_indent = self.reader.column
return chunks, max_indent, end_mark
- def scan_block_scalar_breaks(self, indent):
- # type: (int) -> Any
+ def scan_block_scalar_breaks(self, indent: int) -> Any:
# See the specification for details.
chunks = []
srp = self.reader.peek
@@ -1377,8 +1310,7 @@ class Scanner:
srf()
return chunks, end_mark
- def scan_flow_scalar(self, style):
- # type: (Any) -> Any
+ def scan_flow_scalar(self, style: Any) -> Any:
# See the specification for details.
# Note that we loose indentation rules for quoted scalars. Quoted
# scalars don't need to adhere indentation because " and ' clearly
@@ -1390,7 +1322,7 @@ class Scanner:
else:
double = False
srp = self.reader.peek
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
start_mark = self.reader.get_mark()
quote = srp()
self.reader.forward()
@@ -1425,10 +1357,9 @@ class Scanner:
ESCAPE_CODES = {'x': 2, 'u': 4, 'U': 8}
- def scan_flow_scalar_non_spaces(self, double, start_mark):
- # type: (Any, Any) -> Any
+ def scan_flow_scalar_non_spaces(self, double: Any, start_mark: Any) -> Any:
# See the specification for details.
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
srp = self.reader.peek
srf = self.reader.forward
while True:
@@ -1459,12 +1390,8 @@ class Scanner:
raise ScannerError(
'while scanning a double-quoted scalar',
start_mark,
- _F(
- 'expected escape sequence of {length:d} hexdecimal '
- 'numbers, but found {srp_call!r}',
- length=length,
- srp_call=srp(k),
- ),
+ f'expected escape sequence of {length:d} '
+ f'hexdecimal numbers, but found {srp(k)!r}',
self.reader.get_mark(),
)
code = int(self.reader.prefix(length), 16)
@@ -1477,14 +1404,13 @@ class Scanner:
raise ScannerError(
'while scanning a double-quoted scalar',
start_mark,
- _F('found unknown escape character {ch!r}', ch=ch),
+ f'found unknown escape character {ch!r}',
self.reader.get_mark(),
)
else:
return chunks
- def scan_flow_scalar_spaces(self, double, start_mark):
- # type: (Any, Any) -> Any
+ def scan_flow_scalar_spaces(self, double: Any, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
chunks = []
@@ -1513,10 +1439,9 @@ class Scanner:
chunks.append(whitespaces)
return chunks
- def scan_flow_scalar_breaks(self, double, start_mark):
- # type: (Any, Any) -> Any
+ def scan_flow_scalar_breaks(self, double: Any, start_mark: Any) -> Any:
# See the specification for details.
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
srp = self.reader.peek
srf = self.reader.forward
while True:
@@ -1537,8 +1462,7 @@ class Scanner:
else:
return chunks
- def scan_plain(self):
- # type: () -> Any
+ def scan_plain(self) -> Any:
# See the specification for details.
# We add an additional restriction for the flow context:
# plain scalars in the flow context cannot contain ',', ': ' and '?'.
@@ -1546,7 +1470,7 @@ class Scanner:
# Indentation rules are loosed for the flow context.
srp = self.reader.peek
srf = self.reader.forward
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
start_mark = self.reader.get_mark()
end_mark = start_mark
indent = self.indent + 1
@@ -1554,7 +1478,7 @@ class Scanner:
# document separators at the beginning of the line.
# if indent == 0:
# indent = 1
- spaces = [] # type: List[Any]
+ spaces: List[Any] = []
while True:
length = 0
if srp() == '#':
@@ -1626,8 +1550,7 @@ class Scanner:
return token
- def scan_plain_spaces(self, indent, start_mark):
- # type: (Any, Any) -> Any
+ def scan_plain_spaces(self, indent: Any, start_mark: Any) -> Any:
# See the specification for details.
# The specification is really confusing about tabs in plain scalars.
# We just forbid them completely. Do not use tabs in YAML!
@@ -1664,8 +1587,7 @@ class Scanner:
chunks.append(whitespaces)
return chunks
- def scan_tag_handle(self, name, start_mark):
- # type: (Any, Any) -> Any
+ def scan_tag_handle(self, name: Any, start_mark: Any) -> Any:
# See the specification for details.
# For some strange reasons, the specification does not allow '_' in
# tag handles. I have allowed it anyway.
@@ -1673,9 +1595,9 @@ class Scanner:
ch = srp()
if ch != '!':
raise ScannerError(
- _F('while scanning an {name!s}', name=name),
+ f'while scanning an {name!s}',
start_mark,
- _F("expected '!', but found {ch!r}", ch=ch),
+ f"expected '!', but found {ch!r}",
self.reader.get_mark(),
)
length = 1
@@ -1687,9 +1609,9 @@ class Scanner:
if ch != '!':
self.reader.forward(length)
raise ScannerError(
- _F('while scanning an {name!s}', name=name),
+ f'while scanning an {name!s}',
start_mark,
- _F("expected '!', but found {ch!r}", ch=ch),
+ f"expected '!' but found {ch!r}",
self.reader.get_mark(),
)
length += 1
@@ -1697,8 +1619,7 @@ class Scanner:
self.reader.forward(length)
return value
- def scan_tag_uri(self, name, start_mark):
- # type: (Any, Any) -> Any
+ def scan_tag_uri(self, name: Any, start_mark: Any) -> Any:
# See the specification for details.
# Note: we do not check if URI is well-formed.
srp = self.reader.peek
@@ -1726,32 +1647,28 @@ class Scanner:
length = 0
if not chunks:
raise ScannerError(
- _F('while parsing an {name!s}', name=name),
+ f'while parsing an {name!s}',
start_mark,
- _F('expected URI, but found {ch!r}', ch=ch),
+ f'expected URI, but found {ch!r}',
self.reader.get_mark(),
)
return "".join(chunks)
- def scan_uri_escapes(self, name, start_mark):
- # type: (Any, Any) -> Any
+ def scan_uri_escapes(self, name: Any, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
- code_bytes = [] # type: List[Any]
+ code_bytes: List[Any] = []
mark = self.reader.get_mark()
while srp() == '%':
srf()
for k in range(2):
if srp(k) not in '0123456789ABCDEFabcdef':
raise ScannerError(
- _F('while scanning an {name!s}', name=name),
+ f'while scanning an {name!s}',
start_mark,
- _F(
- 'expected URI escape sequence of 2 hexdecimal numbers,'
- ' but found {srp_call!r}',
- srp_call=srp(k),
- ),
+ f'expected URI escape sequence of 2 hexdecimal numbers, '
+ f'but found {srp(k)!r}',
self.reader.get_mark(),
)
code_bytes.append(int(self.reader.prefix(2), 16))
@@ -1759,13 +1676,10 @@ class Scanner:
try:
value = bytes(code_bytes).decode('utf-8')
except UnicodeDecodeError as exc:
- raise ScannerError(
- _F('while scanning an {name!s}', name=name), start_mark, str(exc), mark
- )
+ raise ScannerError(f'while scanning an {name!s}', start_mark, str(exc), mark)
return value
- def scan_line_break(self):
- # type: () -> Any
+ def scan_line_break(self) -> Any:
# Transforms:
# '\r\n' : '\n'
# '\r' : '\n'
@@ -1788,8 +1702,7 @@ class Scanner:
class RoundTripScanner(Scanner):
- def check_token(self, *choices):
- # type: (Any) -> bool
+ def check_token(self, *choices: Any) -> bool:
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -1802,8 +1715,7 @@ class RoundTripScanner(Scanner):
return True
return False
- def peek_token(self):
- # type: () -> Any
+ def peek_token(self) -> Any:
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -1812,10 +1724,9 @@ class RoundTripScanner(Scanner):
return self.tokens[0]
return None
- def _gather_comments(self):
- # type: () -> Any
+ def _gather_comments(self) -> Any:
"""combine multiple comment lines and assign to next non-comment-token"""
- comments = [] # type: List[Any]
+ comments: List[Any] = []
if not self.tokens:
return comments
if isinstance(self.tokens[0], CommentToken):
@@ -1837,8 +1748,7 @@ class RoundTripScanner(Scanner):
if not self.done and len(self.tokens) < 2:
self.fetch_more_tokens()
- def get_token(self):
- # type: () -> Any
+ def get_token(self) -> Any:
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -1891,8 +1801,7 @@ class RoundTripScanner(Scanner):
return self.tokens.pop(0)
return None
- def fetch_comment(self, comment):
- # type: (Any) -> None
+ def fetch_comment(self, comment: Any) -> None:
value, start_mark, end_mark = comment
while value and value[-1] == ' ':
# empty line within indented key context
@@ -1902,8 +1811,7 @@ class RoundTripScanner(Scanner):
# scanner
- def scan_to_next_token(self):
- # type: () -> Any
+ def scan_to_next_token(self) -> Any:
# We ignore spaces, line breaks and comments.
# If we find a line break in the block context, we set the flag
# `allow_simple_key` on.
@@ -1946,7 +1854,7 @@ class RoundTripScanner(Scanner):
break
comment += ch
srf()
- # gather any blank lines following the comment too
+ # gather any blank lines following the comment
ch = self.scan_line_break()
while len(ch) > 0:
comment += ch
@@ -1975,8 +1883,7 @@ class RoundTripScanner(Scanner):
found = True
return None
- def scan_line_break(self, empty_line=False):
- # type: (bool) -> Text
+ def scan_line_break(self, empty_line: bool = False) -> Text:
# Transforms:
# '\r\n' : '\n'
# '\r' : '\n'
@@ -1985,7 +1892,7 @@ class RoundTripScanner(Scanner):
# '\u2028' : '\u2028'
# '\u2029 : '\u2029'
# default : ''
- ch = self.reader.peek() # type: Text
+ ch: Text = self.reader.peek()
if ch in '\r\n\x85':
if self.reader.prefix(2) == '\r\n':
self.reader.forward(2)
@@ -2000,8 +1907,7 @@ class RoundTripScanner(Scanner):
return ch
return ""
- def scan_block_scalar(self, style, rt=True):
- # type: (Any, Optional[bool]) -> Any
+ def scan_block_scalar(self, style: Any, rt: Optional[bool] = True) -> Any:
return Scanner.scan_block_scalar(self, style, rt=rt)
@@ -2016,8 +1922,7 @@ KEYCMNT = 0 # 1
class CommentBase:
__slots__ = ('value', 'line', 'column', 'used', 'function', 'fline', 'ufun', 'uline')
- def __init__(self, value, line, column):
- # type: (Any, Any, Any) -> None
+ def __init__(self, value: Any, line: Any, column: Any) -> None:
self.value = value
self.line = line
self.column = column
@@ -2028,73 +1933,57 @@ class CommentBase:
self.ufun = None
self.uline = None
- def set_used(self, v='+'):
- # type: (Any) -> None
+ def set_used(self, v: Any = '+') -> None:
self.used = v
info = inspect.getframeinfo(inspect.stack()[1][0])
self.ufun = info.function # type: ignore
self.uline = info.lineno # type: ignore
- def set_assigned(self):
- # type: () -> None
+ def set_assigned(self) -> None:
self.used = '|'
- def __str__(self):
- # type: () -> str
- return _F('{value}', value=self.value) # type: ignore
-
- def __repr__(self):
- # type: () -> str
- return _F('{value!r}', value=self.value) # type: ignore
-
- def info(self):
- # type: () -> str
- return _F( # type: ignore
- '{name}{used} {line:2}:{column:<2} "{value:40s} {function}:{fline} {ufun}:{uline}',
- name=self.name, # type: ignore
- line=self.line,
- column=self.column,
- value=self.value + '"',
- used=self.used,
- function=self.function,
- fline=self.fline,
- ufun=self.ufun,
- uline=self.uline,
+ def __str__(self) -> str:
+ return f'{self.value}'
+
+ def __repr__(self) -> str:
+ return f'{self.value!r}'
+
+ def info(self) -> str:
+ xv = self.value + '"'
+ name = self.name # type: ignore
+ return (
+ f'{name}{self.used} {self.line:2}:{self.column:<2} "{xv:40s} '
+ f'{self.function}:{self.fline} {self.ufun}:{self.uline}'
)
class EOLComment(CommentBase):
name = 'EOLC'
- def __init__(self, value, line, column):
- # type: (Any, Any, Any) -> None
+ def __init__(self, value: Any, line: Any, column: Any) -> None:
super().__init__(value, line, column)
class FullLineComment(CommentBase):
name = 'FULL'
- def __init__(self, value, line, column):
- # type: (Any, Any, Any) -> None
+ def __init__(self, value: Any, line: Any, column: Any) -> None:
super().__init__(value, line, column)
class BlankLineComment(CommentBase):
name = 'BLNK'
- def __init__(self, value, line, column):
- # type: (Any, Any, Any) -> None
+ def __init__(self, value: Any, line: Any, column: Any) -> None:
super().__init__(value, line, column)
class ScannedComments:
- def __init__(self):
- # type: (Any) -> None
+ def __init__(self: Any) -> None:
self.comments = {} # type: ignore
self.unused = [] # type: ignore
- def add_eol_comment(self, comment, column, line):
- # type: (Any, Any, Any) -> Any
+ def add_eol_comment(self, comment: Any, column: Any, line: Any) -> Any:
# info = inspect.getframeinfo(inspect.stack()[1][0])
if comment.count('\n') == 1:
assert comment[-1] == '\n'
@@ -2104,8 +1993,7 @@ class ScannedComments:
self.unused.append(line)
return retval
- def add_blank_line(self, comment, column, line):
- # type: (Any, Any, Any) -> Any
+ def add_blank_line(self, comment: Any, column: Any, line: Any) -> Any:
# info = inspect.getframeinfo(inspect.stack()[1][0])
assert comment.count('\n') == 1 and comment[-1] == '\n'
assert line not in self.comments
@@ -2113,8 +2001,7 @@ class ScannedComments:
self.unused.append(line)
return retval
- def add_full_line_comment(self, comment, column, line):
- # type: (Any, Any, Any) -> Any
+ def add_full_line_comment(self, comment: Any, column: Any, line: Any) -> Any:
# info = inspect.getframeinfo(inspect.stack()[1][0])
assert comment.count('\n') == 1 and comment[-1] == '\n'
# if comment.startswith('# C12'):
@@ -2124,30 +2011,21 @@ class ScannedComments:
self.unused.append(line)
return retval
- def __getitem__(self, idx):
- # type: (Any) -> Any
+ def __getitem__(self, idx: Any) -> Any:
return self.comments[idx]
- def __str__(self):
- # type: () -> Any
+ def __str__(self) -> Any:
return (
'ParsedComments:\n '
- + '\n '.join(
- (
- _F('{lineno:2} {x}', lineno=lineno, x=x.info())
- for lineno, x in self.comments.items()
- )
- )
+ + '\n '.join((f'{lineno:2} {x.info()}' for lineno, x in self.comments.items()))
+ '\n'
)
- def last(self):
- # type: () -> str
+ def last(self) -> str:
lineno, x = list(self.comments.items())[-1]
- return _F('{lineno:2} {x}\n', lineno=lineno, x=x.info()) # type: ignore
+ return f'{lineno:2} {x.info()}\n'
- def any_unprocessed(self):
- # type: () -> bool
+ def any_unprocessed(self) -> bool:
# ToDo: might want to differentiate based on lineno
return len(self.unused) > 0
# for lno, comment in reversed(self.comments.items()):
@@ -2155,8 +2033,7 @@ class ScannedComments:
# return True
# return False
- def unprocessed(self, use=False):
- # type: (Any) -> Any
+ def unprocessed(self, use: Any = False) -> Any:
while len(self.unused) > 0:
first = self.unused.pop(0) if use else self.unused[0]
info = inspect.getframeinfo(inspect.stack()[1][0])
@@ -2165,8 +2042,7 @@ class ScannedComments:
if use:
self.comments[first].set_used()
- def assign_pre(self, token):
- # type: (Any) -> Any
+ def assign_pre(self, token: Any) -> Any:
token_line = token.start_mark.line
info = inspect.getframeinfo(inspect.stack()[1][0])
xprintf('assign_pre', token_line, self.unused, info.function, info.lineno)
@@ -2179,8 +2055,7 @@ class ScannedComments:
token.add_comment_pre(first)
return gobbled
- def assign_eol(self, tokens):
- # type: (Any) -> Any
+ def assign_eol(self, tokens: Any) -> Any:
try:
comment_line = self.unused[0]
except IndexError:
@@ -2235,8 +2110,7 @@ class ScannedComments:
sys.exit(0)
- def assign_post(self, token):
- # type: (Any) -> Any
+ def assign_post(self, token: Any) -> Any:
token_line = token.start_mark.line
info = inspect.getframeinfo(inspect.stack()[1][0])
xprintf('assign_post', token_line, self.unused, info.function, info.lineno)
@@ -2249,28 +2123,21 @@ class ScannedComments:
token.add_comment_post(first)
return gobbled
- def str_unprocessed(self):
- # type: () -> Any
+ def str_unprocessed(self) -> Any:
return ''.join(
- (
- _F(' {ind:2} {x}\n', ind=ind, x=x.info())
- for ind, x in self.comments.items()
- if x.used == ' '
- )
+ (f' {ind:2} {x.info()}\n' for ind, x in self.comments.items() if x.used == ' ')
)
class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
- def __init__(self, *arg, **kw):
- # type: (Any, Any) -> None
+ def __init__(self, *arg: Any, **kw: Any) -> None:
super().__init__(*arg, **kw)
assert self.loader is not None
# comments is initialised on .need_more_tokens and persist on
# self.loader.parsed_comments
self.comments = None
- def get_token(self):
- # type: () -> Any
+ def get_token(self) -> Any:
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
@@ -2282,8 +2149,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
self.tokens_taken += 1
return self.tokens.pop(0)
- def need_more_tokens(self):
- # type: () -> bool
+ def need_more_tokens(self) -> bool:
if self.comments is None:
self.loader.parsed_comments = self.comments = ScannedComments() # type: ignore
if self.done:
@@ -2309,8 +2175,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
self.comments.assign_eol(self.tokens) # type: ignore
return False
- def scan_to_next_token(self):
- # type: () -> None
+ def scan_to_next_token(self) -> None:
srp = self.reader.peek
srf = self.reader.forward
if self.reader.index == 0 and srp() == '\uFEFF':
@@ -2373,8 +2238,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
found = True
return None
- def scan_empty_or_full_line_comments(self):
- # type: () -> None
+ def scan_empty_or_full_line_comments(self) -> None:
blmark = self.reader.get_mark()
assert blmark.column == 0
blanks = ""
@@ -2413,8 +2277,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
self.reader.forward()
ch = self.reader.peek()
- def scan_block_scalar_ignored_line(self, start_mark):
- # type: (Any) -> Any
+ def scan_block_scalar_ignored_line(self, start_mark: Any) -> Any:
# See the specification for details.
srp = self.reader.peek
srf = self.reader.forward
@@ -2435,7 +2298,7 @@ class RoundTripScannerSC(Scanner): # RoundTripScanner Split Comments
raise ScannerError(
'while scanning a block scalar',
start_mark,
- _F('expected a comment or a line break, but found {ch!r}', ch=ch),
+ f'expected a comment or a line break, but found {ch!r}',
self.reader.get_mark(),
)
if comment is not None: