diff options
author | Anthon van der Neut <anthon@mnt.org> | 2017-03-21 17:18:18 +0100 |
---|---|---|
committer | Anthon van der Neut <anthon@mnt.org> | 2017-03-21 17:18:18 +0100 |
commit | 9ac44a0873d51d63150b0f1dc1d009b206577a29 (patch) | |
tree | 44fc2ecbdba2a6a63544097d7b9f63d8f87d5aae /scanner.py | |
parent | c8568f99215aaa910953287f63a25459e3800dfc (diff) | |
download | ruamel.yaml-9ac44a0873d51d63150b0f1dc1d009b206577a29.tar.gz |
update for mypy --strict, prepare de-inheritance (Loader/Dumper)0.14.0
Diffstat (limited to 'scanner.py')
-rw-r--r-- | scanner.py | 778 |
1 files changed, 411 insertions, 367 deletions
@@ -29,6 +29,9 @@ from __future__ import print_function, absolute_import, division, unicode_litera # # Read comments in the Scanner code for more details. # + +from typing import Any, Dict, Optional, List, Union, Text # NOQA + from ruamel.yaml.error import MarkedYAMLError from ruamel.yaml.tokens import * # NOQA from ruamel.yaml.compat import utf8, unichr, PY3, check_anchorname_char @@ -44,6 +47,7 @@ class SimpleKey(object): # See below simple keys treatment. def __init__(self, token_number, required, index, line, column, mark): + # type: (Any, Any, int, int, int, Any) -> None self.token_number = token_number self.required = required self.index = index @@ -54,7 +58,8 @@ class SimpleKey(object): class Scanner(object): - def __init__(self): + def __init__(self, loader=None): + # type: (Any) -> None """Initialize the scanner.""" # It is assumed that Scanner and Reader will have a common descendant. # Reader do the dirty work of checking for BOM and converting the @@ -65,6 +70,10 @@ class Scanner(object): # self.prefix(l=1) # peek the next l characters # self.forward(l=1) # read the next l characters and move the pointer + self.loader = loader + if self.loader is not None: + self.loader._scanner = self + # Had we reached the end of the stream? self.done = False @@ -73,7 +82,7 @@ class Scanner(object): self.flow_level = 0 # List of processed tokens that are not yet emitted. - self.tokens = [] + self.tokens = [] # type: List[Any] # Add the STREAM-START token. self.fetch_stream_start() @@ -85,7 +94,7 @@ class Scanner(object): self.indent = -1 # Past indentation levels. - self.indents = [] + self.indents = [] # type: List[int] # Variables related to simple keys treatment. @@ -115,15 +124,21 @@ class Scanner(object): # (token_number, required, index, line, column, mark) # A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow), # '[', or '{' tokens. - self.possible_simple_keys = {} + self.possible_simple_keys = {} # type: Dict[Any, Any] + + @property + def reader(self): + # type: () -> Any + return self.loader._reader # Public methods. def check_token(self, *choices): + # type: (Any) -> bool # Check if the next token is one of the given types. while self.need_more_tokens(): self.fetch_more_tokens() - if self.tokens: + if bool(self.tokens): if not choices: return True for choice in choices: @@ -132,23 +147,26 @@ class Scanner(object): return False def peek_token(self): + # type: () -> Any # Return the next token, but do not delete if from the queue. while self.need_more_tokens(): self.fetch_more_tokens() - if self.tokens: + if bool(self.tokens): return self.tokens[0] def get_token(self): + # type: () -> Any # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() - if self.tokens: + if bool(self.tokens): self.tokens_taken += 1 return self.tokens.pop(0) # Private methods. def need_more_tokens(self): + # type: () -> bool if self.done: return False if not self.tokens: @@ -158,24 +176,27 @@ class Scanner(object): self.stale_possible_simple_keys() if self.next_possible_simple_key() == self.tokens_taken: return True + return False - def fetch_more_tokens(self): + def fetch_comment(self, comment): + # type: (Any) -> None + raise NotImplementedError + def fetch_more_tokens(self): + # type: () -> Any # Eat whitespaces and comments until we reach the next token. comment = self.scan_to_next_token() - if comment is not None: # never happens for base scanner return self.fetch_comment(comment) - # Remove obsolete possible simple keys. self.stale_possible_simple_keys() # Compare the current indentation and column. It may add some tokens # and decrease the current indentation level. - self.unwind_indent(self.column) + self.unwind_indent(self.reader.column) # Peek the next character. - ch = self.peek() + ch = self.reader.peek() # Is it the end of stream? if ch == u'\0': @@ -266,11 +287,12 @@ class Scanner(object): # No? It's an error. Let's produce a nice error message. raise ScannerError("while scanning for the next token", None, "found character %r that cannot start any token" - % utf8(ch), self.get_mark()) + % utf8(ch), self.reader.get_mark()) # Simple keys treatment. def next_possible_simple_key(self): + # type: () -> Any # Return the number of the nearest possible simple key. Actually we # don't need to loop through the whole dictionary. We may replace it # with the following code: @@ -286,6 +308,7 @@ class Scanner(object): return min_token_number def stale_possible_simple_keys(self): + # type: () -> None # Remove entries that are no longer possible simple keys. According to # the YAML specification, simple keys # - should be limited to a single line, @@ -294,21 +317,22 @@ class Scanner(object): # height (may cause problems if indentation is broken though). for level in list(self.possible_simple_keys): key = self.possible_simple_keys[level] - if key.line != self.line \ - or self.index-key.index > 1024: + if key.line != self.reader.line \ + or self.reader.index - key.index > 1024: if key.required: raise ScannerError( "while scanning a simple key", key.mark, - "could not find expected ':'", self.get_mark()) + "could not find expected ':'", self.reader.get_mark()) del self.possible_simple_keys[level] def save_possible_simple_key(self): + # type: () -> None # The next token may start a simple key. We check if it's possible # and save its position. This function is called for # ALIAS, ANCHOR, TAG, SCALAR(flow), '[', and '{'. # Check if a simple key is required at the current position. - required = not self.flow_level and self.indent == self.column + required = not self.flow_level and self.indent == self.reader.column # The next token might be a simple key. Let's save it's number and # position. @@ -317,10 +341,12 @@ class Scanner(object): token_number = self.tokens_taken+len(self.tokens) key = SimpleKey( token_number, required, - self.index, self.line, self.column, self.get_mark()) + self.reader.index, self.reader.line, self.reader.column, + self.reader.get_mark()) self.possible_simple_keys[self.flow_level] = key def remove_possible_simple_key(self): + # type: () -> None # Remove the saved possible key position at the current flow level. if self.flow_level in self.possible_simple_keys: key = self.possible_simple_keys[self.flow_level] @@ -328,14 +354,14 @@ class Scanner(object): if key.required: raise ScannerError( "while scanning a simple key", key.mark, - "could not find expected ':'", self.get_mark()) + "could not find expected ':'", self.reader.get_mark()) del self.possible_simple_keys[self.flow_level] # Indentation functions. def unwind_indent(self, column): - + # type: (Any) -> None # In flow context, tokens should respect indentation. # Actually the condition should be `self.indent >= column` according to # the spec. But this condition will prohibit intuitively correct @@ -346,20 +372,21 @@ class Scanner(object): # if self.flow_level and self.indent > column: # raise ScannerError(None, None, # "invalid intendation or unclosed '[' or '{'", - # self.get_mark()) + # self.reader.get_mark()) # In the flow context, indentation is ignored. We make the scanner less # restrictive then specification requires. - if self.flow_level: + if bool(self.flow_level): return # In block context, we may need to issue the BLOCK-END tokens. while self.indent > column: - mark = self.get_mark() + mark = self.reader.get_mark() self.indent = self.indents.pop() self.tokens.append(BlockEndToken(mark, mark)) def add_indent(self, column): + # type: (int) -> bool # Check if we need to increase indentation. if self.indent < column: self.indents.append(self.indent) @@ -370,37 +397,32 @@ class Scanner(object): # Fetchers. def fetch_stream_start(self): + # type: () -> None # We always add STREAM-START as the first token and STREAM-END as the # last token. - # Read the token. - mark = self.get_mark() - + mark = self.reader.get_mark() # Add STREAM-START. self.tokens.append(StreamStartToken(mark, mark, - encoding=self.encoding)) + encoding=self.reader.encoding)) def fetch_stream_end(self): - + # type: () -> None # Set the current intendation to -1. self.unwind_indent(-1) - # Reset simple keys. self.remove_possible_simple_key() self.allow_simple_key = False - self.possible_simple_keys = {} - + self.possible_simple_keys = {} # type: Dict[Any, Any] # Read the token. - mark = self.get_mark() - + mark = self.reader.get_mark() # Add STREAM-END. self.tokens.append(StreamEndToken(mark, mark)) - # The steam is finished. self.done = True def fetch_directive(self): - + # type: () -> None # Set the current intendation to -1. self.unwind_indent(-1) @@ -412,13 +434,15 @@ class Scanner(object): self.tokens.append(self.scan_directive()) def fetch_document_start(self): + # type: () -> None self.fetch_document_indicator(DocumentStartToken) def fetch_document_end(self): + # type: () -> None self.fetch_document_indicator(DocumentEndToken) def fetch_document_indicator(self, TokenClass): - + # type: (Any) -> None # Set the current intendation to -1. self.unwind_indent(-1) @@ -428,106 +452,97 @@ class Scanner(object): self.allow_simple_key = False # Add DOCUMENT-START or DOCUMENT-END. - start_mark = self.get_mark() - self.forward(3) - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward(3) + end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) def fetch_flow_sequence_start(self): + # type: () -> None self.fetch_flow_collection_start(FlowSequenceStartToken) def fetch_flow_mapping_start(self): + # type: () -> None self.fetch_flow_collection_start(FlowMappingStartToken) def fetch_flow_collection_start(self, TokenClass): - + # type: (Any) -> None # '[' and '{' may start a simple key. self.save_possible_simple_key() - # Increase the flow level. self.flow_level += 1 - # Simple keys are allowed after '[' and '{'. self.allow_simple_key = True - # Add FLOW-SEQUENCE-START or FLOW-MAPPING-START. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) def fetch_flow_sequence_end(self): + # type: () -> None self.fetch_flow_collection_end(FlowSequenceEndToken) def fetch_flow_mapping_end(self): + # type: () -> None self.fetch_flow_collection_end(FlowMappingEndToken) def fetch_flow_collection_end(self, TokenClass): - + # type: (Any) -> None # Reset possible simple key on the current level. self.remove_possible_simple_key() - # Decrease the flow level. self.flow_level -= 1 - # No simple keys after ']' or '}'. self.allow_simple_key = False - # Add FLOW-SEQUENCE-END or FLOW-MAPPING-END. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(TokenClass(start_mark, end_mark)) def fetch_flow_entry(self): - + # type: () -> None # Simple keys are allowed after ','. self.allow_simple_key = True - # Reset possible simple key on the current level. self.remove_possible_simple_key() - # Add FLOW-ENTRY. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(FlowEntryToken(start_mark, end_mark)) def fetch_block_entry(self): - + # type: () -> None # Block context needs additional checks. if not self.flow_level: - # Are we allowed to start a new entry? if not self.allow_simple_key: raise ScannerError(None, None, "sequence entries are not allowed here", - self.get_mark()) - + self.reader.get_mark()) # We may need to add BLOCK-SEQUENCE-START. - if self.add_indent(self.column): - mark = self.get_mark() + if self.add_indent(self.reader.column): + mark = self.reader.get_mark() self.tokens.append(BlockSequenceStartToken(mark, mark)) - # It's an error for the block entry to occur in the flow context, # but we let the parser detect this. else: pass - # Simple keys are allowed after '-'. self.allow_simple_key = True - # Reset possible simple key on the current level. self.remove_possible_simple_key() # Add BLOCK-ENTRY. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(BlockEntryToken(start_mark, end_mark)) def fetch_key(self): - + # type: () -> None # Block context needs additional checks. if not self.flow_level: @@ -535,11 +550,11 @@ class Scanner(object): if not self.allow_simple_key: raise ScannerError(None, None, "mapping keys are not allowed here", - self.get_mark()) + self.reader.get_mark()) # We may need to add BLOCK-MAPPING-START. - if self.add_indent(self.column): - mark = self.get_mark() + if self.add_indent(self.reader.column): + mark = self.reader.get_mark() self.tokens.append(BlockMappingStartToken(mark, mark)) # Simple keys are allowed after '?' in the block context. @@ -549,13 +564,13 @@ class Scanner(object): self.remove_possible_simple_key() # Add KEY. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(KeyToken(start_mark, end_mark)) def fetch_value(self): - + # type: () -> None # Do we determine a simple key? if self.flow_level in self.possible_simple_keys: # Add KEY. @@ -588,14 +603,14 @@ class Scanner(object): if not self.allow_simple_key: raise ScannerError(None, None, "mapping values are not allowed here", - self.get_mark()) + self.reader.get_mark()) # If this value starts a new block mapping, we need to add # BLOCK-MAPPING-START. It will be detected as an error later by # the parser. if not self.flow_level: - if self.add_indent(self.column): - mark = self.get_mark() + if self.add_indent(self.reader.column): + mark = self.reader.get_mark() self.tokens.append(BlockMappingStartToken(mark, mark)) # Simple keys are allowed after ':' in the block context. @@ -605,142 +620,134 @@ class Scanner(object): self.remove_possible_simple_key() # Add VALUE. - start_mark = self.get_mark() - self.forward() - end_mark = self.get_mark() + start_mark = self.reader.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() self.tokens.append(ValueToken(start_mark, end_mark)) def fetch_alias(self): - + # type: () -> None # ALIAS could be a simple key. self.save_possible_simple_key() - # No simple keys after ALIAS. self.allow_simple_key = False - # Scan and add ALIAS. self.tokens.append(self.scan_anchor(AliasToken)) def fetch_anchor(self): - + # type: () -> None # ANCHOR could start a simple key. self.save_possible_simple_key() - # No simple keys after ANCHOR. self.allow_simple_key = False - # Scan and add ANCHOR. self.tokens.append(self.scan_anchor(AnchorToken)) def fetch_tag(self): - + # type: () -> None # TAG could start a simple key. self.save_possible_simple_key() - # No simple keys after TAG. self.allow_simple_key = False - # Scan and add TAG. self.tokens.append(self.scan_tag()) def fetch_literal(self): + # type: () -> None self.fetch_block_scalar(style='|') def fetch_folded(self): + # type: () -> None self.fetch_block_scalar(style='>') def fetch_block_scalar(self, style): - + # type: (Any) -> None # A simple key may follow a block scalar. self.allow_simple_key = True - # Reset possible simple key on the current level. self.remove_possible_simple_key() - # Scan and add SCALAR. self.tokens.append(self.scan_block_scalar(style)) def fetch_single(self): + # type: () -> None self.fetch_flow_scalar(style='\'') def fetch_double(self): + # type: () -> None self.fetch_flow_scalar(style='"') def fetch_flow_scalar(self, style): - + # type: (Any) -> None # A flow scalar could be a simple key. self.save_possible_simple_key() - # No simple keys after flow scalars. self.allow_simple_key = False - # Scan and add SCALAR. self.tokens.append(self.scan_flow_scalar(style)) def fetch_plain(self): - + # type: () -> None # A plain scalar could be a simple key. self.save_possible_simple_key() - # No simple keys after plain scalars. But note that `scan_plain` will # change this flag if the scan is finished at the beginning of the # line. self.allow_simple_key = False - # Scan and add SCALAR. May change `allow_simple_key`. self.tokens.append(self.scan_plain()) # Checkers. def check_directive(self): - + # type: () -> Any # DIRECTIVE: ^ '%' ... # The '%' indicator is already checked. - if self.column == 0: + if self.reader.column == 0: return True + return None def check_document_start(self): - + # type: () -> Any # DOCUMENT-START: ^ '---' (' '|'\n') - if self.column == 0: - if self.prefix(3) == u'---' \ - and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': + if self.reader.column == 0: + if self.reader.prefix(3) == u'---' \ + and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': return True + return None def check_document_end(self): - + # type: () -> Any # DOCUMENT-END: ^ '...' (' '|'\n') - if self.column == 0: - if self.prefix(3) == u'...' \ - and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': + if self.reader.column == 0: + if self.reader.prefix(3) == u'...' \ + and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': return True + return None def check_block_entry(self): - + # type: () -> Any # BLOCK-ENTRY: '-' (' '|'\n') - return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' + return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' def check_key(self): - + # type: () -> Any # KEY(flow context): '?' - if self.flow_level: + if bool(self.flow_level): return True - # KEY(block context): '?' (' '|'\n') - else: - return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' + return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' def check_value(self): - + # type: () -> Any # VALUE(flow context): ':' - if self.flow_level: + if bool(self.flow_level): return True - # VALUE(block context): ':' (' '|'\n') - else: - return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' + return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029' def check_plain(self): + # type: () -> Any # A plain scalar may start with any non-space character except: # '-', '?', ':', ',', '[', ']', '{', '}', # '#', '&', '*', '!', '|', '>', '\'', '\"', @@ -753,14 +760,15 @@ class Scanner(object): # Note that we limit the last rule to the block context (except the # '-' character) because we want the flow context to be space # independent. - ch = self.peek() + ch = self.reader.peek() return ch not in u'\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`' or \ - (self.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029' and + (self.reader.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029' and (ch == u'-' or (not self.flow_level and ch in u'?:'))) # Scanners. def scan_to_next_token(self): + # type: () -> Any # We ignore spaces, line breaks and comments. # If we find a line break in the block context, we set the flag # `allow_simple_key` on. @@ -780,145 +788,155 @@ class Scanner(object): # `unwind_indent` before issuing BLOCK-END. # Scanners for block, flow, and plain scalars need to be modified. - if self.index == 0 and self.peek() == u'\uFEFF': - self.forward() + if self.reader.index == 0 and self.reader.peek() == u'\uFEFF': + self.reader.forward() found = False while not found: - while self.peek() == u' ': - self.forward() - if self.peek() == u'#': - while self.peek() not in u'\0\r\n\x85\u2028\u2029': - self.forward() + while self.reader.peek() == u' ': + self.reader.forward() + if self.reader.peek() == u'#': + while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029': + self.reader.forward() if self.scan_line_break(): if not self.flow_level: self.allow_simple_key = True else: found = True + return None def scan_directive(self): + # type: () -> Any # See the specification for details. - start_mark = self.get_mark() - self.forward() + start_mark = self.reader.get_mark() + self.reader.forward() name = self.scan_directive_name(start_mark) value = None if name == u'YAML': value = self.scan_yaml_directive_value(start_mark) - end_mark = self.get_mark() + end_mark = self.reader.get_mark() elif name == u'TAG': value = self.scan_tag_directive_value(start_mark) - end_mark = self.get_mark() + end_mark = self.reader.get_mark() else: - end_mark = self.get_mark() - while self.peek() not in u'\0\r\n\x85\u2028\u2029': - self.forward() + end_mark = self.reader.get_mark() + while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029': + self.reader.forward() self.scan_directive_ignored_line(start_mark) return DirectiveToken(name, value, start_mark, end_mark) def scan_directive_name(self, start_mark): + # type: (Any) -> Any # See the specification for details. length = 0 - ch = self.peek(length) + ch = self.reader.peek(length) while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \ or ch in u'-_:.': length += 1 - ch = self.peek(length) + ch = self.reader.peek(length) if not length: raise ScannerError( "while scanning a directive", start_mark, "expected alphabetic or numeric character, but found %r" - % utf8(ch), self.get_mark()) - value = self.prefix(length) - self.forward(length) - ch = self.peek() + % utf8(ch), self.reader.get_mark()) + value = self.reader.prefix(length) + self.reader.forward(length) + ch = self.reader.peek() if ch not in u'\0 \r\n\x85\u2028\u2029': raise ScannerError( "while scanning a directive", start_mark, "expected alphabetic or numeric character, but found %r" - % utf8(ch), self.get_mark()) + % utf8(ch), self.reader.get_mark()) return value def scan_yaml_directive_value(self, start_mark): + # type: (Any) -> Any # See the specification for details. - while self.peek() == u' ': - self.forward() + while self.reader.peek() == u' ': + self.reader.forward() major = self.scan_yaml_directive_number(start_mark) - if self.peek() != '.': + if self.reader.peek() != '.': raise ScannerError( "while scanning a directive", start_mark, "expected a digit or '.', but found %r" - % utf8(self.peek()), - self.get_mark()) - self.forward() + % utf8(self.reader.peek()), + self.reader.get_mark()) + self.reader.forward() minor = self.scan_yaml_directive_number(start_mark) - if self.peek() not in u'\0 \r\n\x85\u2028\u2029': + if self.reader.peek() not in u'\0 \r\n\x85\u2028\u2029': raise ScannerError( "while scanning a directive", start_mark, "expected a digit or ' ', but found %r" - % utf8(self.peek()), - self.get_mark()) + % utf8(self.reader.peek()), + self.reader.get_mark()) return (major, minor) def scan_yaml_directive_number(self, start_mark): + # type: (Any) -> Any # See the specification for details. - ch = self.peek() + ch = self.reader.peek() if not (u'0' <= ch <= u'9'): raise ScannerError( "while scanning a directive", start_mark, "expected a digit, but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) length = 0 - while u'0' <= self.peek(length) <= u'9': + while u'0' <= self.reader.peek(length) <= u'9': length += 1 - value = int(self.prefix(length)) - self.forward(length) + value = int(self.reader.prefix(length)) + self.reader.forward(length) return value def scan_tag_directive_value(self, start_mark): + # type: (Any) -> Any # See the specification for details. - while self.peek() == u' ': - self.forward() + while self.reader.peek() == u' ': + self.reader.forward() handle = self.scan_tag_directive_handle(start_mark) - while self.peek() == u' ': - self.forward() + while self.reader.peek() == u' ': + self.reader.forward() prefix = self.scan_tag_directive_prefix(start_mark) return (handle, prefix) def scan_tag_directive_handle(self, start_mark): + # type: (Any) -> Any # See the specification for details. value = self.scan_tag_handle('directive', start_mark) - ch = self.peek() + ch = self.reader.peek() if ch != u' ': raise ScannerError("while scanning a directive", start_mark, "expected ' ', but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) return value def scan_tag_directive_prefix(self, start_mark): + # type: (Any) -> Any # See the specification for details. value = self.scan_tag_uri('directive', start_mark) - ch = self.peek() + ch = self.reader.peek() if ch not in u'\0 \r\n\x85\u2028\u2029': raise ScannerError("while scanning a directive", start_mark, "expected ' ', but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) return value def scan_directive_ignored_line(self, start_mark): + # type: (Any) -> None # See the specification for details. - while self.peek() == u' ': - self.forward() - if self.peek() == u'#': - while self.peek() not in u'\0\r\n\x85\u2028\u2029': - self.forward() - ch = self.peek() + while self.reader.peek() == u' ': + self.reader.forward() + if self.reader.peek() == u'#': + while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029': + self.reader.forward() + ch = self.reader.peek() if ch not in u'\0\r\n\x85\u2028\u2029': raise ScannerError( "while scanning a directive", start_mark, "expected a comment or a line break, but found %r" - % utf8(ch), self.get_mark()) + % utf8(ch), self.reader.get_mark()) self.scan_line_break() def scan_anchor(self, TokenClass): + # type: (Any) -> Any # The specification does not restrict characters for anchors and # aliases. This may lead to problems, for instance, the document: # [ *alias, value ] @@ -927,56 +945,57 @@ class Scanner(object): # and # [ *alias , "value" ] # Therefore we restrict aliases to numbers and ASCII letters. - start_mark = self.get_mark() - indicator = self.peek() + start_mark = self.reader.get_mark() + indicator = self.reader.peek() if indicator == u'*': name = 'alias' else: name = 'anchor' - self.forward() + self.reader.forward() length = 0 - ch = self.peek(length) + ch = self.reader.peek(length) # while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \ # or ch in u'-_': while check_anchorname_char(ch): length += 1 - ch = self.peek(length) + ch = self.reader.peek(length) if not length: raise ScannerError( "while scanning an %s" % name, start_mark, "expected alphabetic or numeric character, but found %r" - % utf8(ch), self.get_mark()) - value = self.prefix(length) - self.forward(length) + % utf8(ch), self.reader.get_mark()) + value = self.reader.prefix(length) + self.reader.forward(length) # ch1 = ch - # ch = self.peek() # no need to peek, ch is already set + # ch = self.reader.peek() # no need to peek, ch is already set # assert ch1 == ch if ch not in u'\0 \t\r\n\x85\u2028\u2029?:,[]{}%@`': raise ScannerError( "while scanning an %s" % name, start_mark, "expected alphabetic or numeric character, but found %r" - % utf8(ch), self.get_mark()) - end_mark = self.get_mark() + % utf8(ch), self.reader.get_mark()) + end_mark = self.reader.get_mark() return TokenClass(value, start_mark, end_mark) def scan_tag(self): + # type: () -> Any # See the specification for details. - start_mark = self.get_mark() - ch = self.peek(1) + start_mark = self.reader.get_mark() + ch = self.reader.peek(1) if ch == u'<': handle = None - self.forward(2) + self.reader.forward(2) suffix = self.scan_tag_uri('tag', start_mark) - if self.peek() != u'>': + if self.reader.peek() != u'>': raise ScannerError( "while parsing a tag", start_mark, - "expected '>', but found %r" % utf8(self.peek()), - self.get_mark()) - self.forward() + "expected '>', but found %r" % utf8(self.reader.peek()), + self.reader.get_mark()) + self.reader.forward() elif ch in u'\0 \t\r\n\x85\u2028\u2029': handle = None suffix = u'!' - self.forward() + self.reader.forward() else: length = 1 use_handle = False @@ -985,36 +1004,36 @@ class Scanner(object): use_handle = True break length += 1 - ch = self.peek(length) + ch = self.reader.peek(length) handle = u'!' if use_handle: handle = self.scan_tag_handle('tag', start_mark) else: handle = u'!' - self.forward() + self.reader.forward() suffix = self.scan_tag_uri('tag', start_mark) - ch = self.peek() + ch = self.reader.peek() if ch not in u'\0 \r\n\x85\u2028\u2029': raise ScannerError("while scanning a tag", start_mark, "expected ' ', but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) value = (handle, suffix) - end_mark = self.get_mark() + end_mark = self.reader.get_mark() return TagToken(value, start_mark, end_mark) def scan_block_scalar(self, style): + # type: (Any) -> Any # See the specification for details. - if style == '>': folded = True else: folded = False - chunks = [] - start_mark = self.get_mark() + chunks = [] # type: List[Any] + start_mark = self.reader.get_mark() # Scan the header. - self.forward() + self.reader.forward() chomping, increment = self.scan_block_scalar_indicators(start_mark) self.scan_block_scalar_ignored_line(start_mark) @@ -1031,24 +1050,24 @@ class Scanner(object): line_break = u'' # Scan the inner part of the block scalar. - while self.column == indent and self.peek() != u'\0': + while self.reader.column == indent and self.reader.peek() != u'\0': chunks.extend(breaks) - leading_non_space = self.peek() not in u' \t' + leading_non_space = self.reader.peek() not in u' \t' length = 0 - while self.peek(length) not in u'\0\r\n\x85\u2028\u2029': + while self.reader.peek(length) not in u'\0\r\n\x85\u2028\u2029': length += 1 - chunks.append(self.prefix(length)) - self.forward(length) + chunks.append(self.reader.prefix(length)) + self.reader.forward(length) line_break = self.scan_line_break() breaks, end_mark = self.scan_block_scalar_breaks(indent) - if self.column == indent and self.peek() != u'\0': + if self.reader.column == indent and self.reader.peek() != u'\0': # Unfortunately, folding rules are ambiguous. # # This is the folding according to the specification: if folded and line_break == u'\n' \ - and leading_non_space and self.peek() not in u' \t': + and leading_non_space and self.reader.peek() not in u' \t': if not breaks: chunks.append(u' ') else: @@ -1059,7 +1078,7 @@ class Scanner(object): # # if folded and line_break == u'\n': # if not breaks: - # if self.peek() not in ' \t': + # if self.reader.peek() not in ' \t': # chunks.append(u' ') # else: # chunks.append(line_break) @@ -1070,7 +1089,7 @@ class Scanner(object): # Process trailing line breaks. The 'chomping' setting determines # whether they are included in the value. - trailing = [] + trailing = [] # type: List[Any] if chomping in [None, True]: chunks.append(line_break) if chomping is True: @@ -1090,32 +1109,33 @@ class Scanner(object): # Keep track of the trailing whitespace and following comments # as a comment token, if isn't all included in the actual value. - comment_end_mark = self.get_mark() + comment_end_mark = self.reader.get_mark() comment = CommentToken(''.join(trailing), end_mark, comment_end_mark) token.add_post_comment(comment) return token def scan_block_scalar_indicators(self, start_mark): + # type: (Any) -> Any # See the specification for details. chomping = None increment = None - ch = self.peek() + ch = self.reader.peek() if ch in u'+-': if ch == '+': chomping = True else: chomping = False - self.forward() - ch = self.peek() + self.reader.forward() + ch = self.reader.peek() if ch in u'0123456789': increment = int(ch) if increment == 0: raise ScannerError( "while scanning a block scalar", start_mark, "expected indentation indicator in the range 1-9, " - "but found 0", self.get_mark()) - self.forward() + "but found 0", self.reader.get_mark()) + self.reader.forward() elif ch in u'0123456789': increment = int(ch) if increment == 0: @@ -1123,67 +1143,71 @@ class Scanner(object): "while scanning a block scalar", start_mark, "expected indentation indicator in the range 1-9, " "but found 0", - self.get_mark()) - self.forward() - ch = self.peek() + self.reader.get_mark()) + self.reader.forward() + ch = self.reader.peek() if ch in u'+-': if ch == '+': chomping = True else: chomping = False - self.forward() - ch = self.peek() + self.reader.forward() + ch = self.reader.peek() if ch not in u'\0 \r\n\x85\u2028\u2029': raise ScannerError( "while scanning a block scalar", start_mark, "expected chomping or indentation indicators, but found %r" - % utf8(ch), self.get_mark()) + % utf8(ch), self.reader.get_mark()) return chomping, increment def scan_block_scalar_ignored_line(self, start_mark): + # type: (Any) -> Any # See the specification for details. - while self.peek() == u' ': - self.forward() - if self.peek() == u'#': - while self.peek() not in u'\0\r\n\x85\u2028\u2029': - self.forward() - ch = self.peek() + while self.reader.peek() == u' ': + self.reader.forward() + if self.reader.peek() == u'#': + while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029': + self.reader.forward() + ch = self.reader.peek() if ch not in u'\0\r\n\x85\u2028\u2029': raise ScannerError( "while scanning a block scalar", start_mark, "expected a comment or a line break, but found %r" - % utf8(ch), self.get_mark()) + % utf8(ch), self.reader.get_mark()) self.scan_line_break() def scan_block_scalar_indentation(self): + # type: () -> Any # See the specification for details. chunks = [] max_indent = 0 - end_mark = self.get_mark() - while self.peek() in u' \r\n\x85\u2028\u2029': - if self.peek() != u' ': + end_mark = self.reader.get_mark() + while self.reader.peek() in u' \r\n\x85\u2028\u2029': + if self.reader.peek() != u' ': chunks.append(self.scan_line_break()) - end_mark = self.get_mark() + end_mark = self.reader.get_mark() else: - self.forward() - if self.column > max_indent: - max_indent = self.column + self.reader.forward() + if self.reader.column > max_indent: + max_indent = self.reader.column return chunks, max_indent, end_mark def scan_block_scalar_breaks(self, indent): + # type: (int) -> Any # See the specification for details. chunks = [] - end_mark = self.get_mark() - while self.column < indent and self.peek() == u' ': - self.forward() - while self.peek() in u'\r\n\x85\u2028\u2029': + end_mark = self.reader.get_mark() + while self.reader.column < indent and self.reader.peek() == u' ': + self.reader.forward() + while self.reader.peek() in u'\r\n\x85\u2028\u2029': chunks.append(self.scan_line_break()) - end_mark = self.get_mark() - while self.column < indent and self.peek() == u' ': - self.forward() + end_mark = self.reader.get_mark() + while self.reader.column < indent and self.reader.peek() == u' ': + self.reader.forward() return chunks, end_mark def scan_flow_scalar(self, style): + # type: (Any) -> Any # See the specification for details. # Note that we loose indentation rules for quoted scalars. Quoted # scalars don't need to adhere indentation because " and ' clearly @@ -1194,16 +1218,16 @@ class Scanner(object): double = True else: double = False - chunks = [] - start_mark = self.get_mark() - quote = self.peek() - self.forward() + chunks = [] # type: List[Any] + start_mark = self.reader.get_mark() + quote = self.reader.peek() + self.reader.forward() chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark)) - while self.peek() != quote: + while self.reader.peek() != quote: chunks.extend(self.scan_flow_scalar_spaces(double, start_mark)) chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark)) - self.forward() - end_mark = self.get_mark() + self.reader.forward() + end_mark = self.reader.get_mark() return ScalarToken(u''.join(chunks), False, start_mark, end_mark, style) @@ -1235,42 +1259,43 @@ class Scanner(object): } def scan_flow_scalar_non_spaces(self, double, start_mark): + # type: (Any, Any) -> Any # See the specification for details. - chunks = [] + chunks = [] # type: List[Any] while True: length = 0 - while self.peek(length) not in u'\'\"\\\0 \t\r\n\x85\u2028\u2029': + while self.reader.peek(length) not in u'\'\"\\\0 \t\r\n\x85\u2028\u2029': length += 1 - if length: - chunks.append(self.prefix(length)) - self.forward(length) - ch = self.peek() - if not double and ch == u'\'' and self.peek(1) == u'\'': + if length != 0: + chunks.append(self.reader.prefix(length)) + self.reader.forward(length) + ch = self.reader.peek() + if not double and ch == u'\'' and self.reader.peek(1) == u'\'': chunks.append(u'\'') - self.forward(2) + self.reader.forward(2) elif (double and ch == u'\'') or (not double and ch in u'\"\\'): chunks.append(ch) - self.forward() + self.reader.forward() elif double and ch == u'\\': - self.forward() - ch = self.peek() + self.reader.forward() + ch = self.reader.peek() if ch in self.ESCAPE_REPLACEMENTS: chunks.append(self.ESCAPE_REPLACEMENTS[ch]) - self.forward() + self.reader.forward() elif ch in self.ESCAPE_CODES: length = self.ESCAPE_CODES[ch] - self.forward() + self.reader.forward() for k in range(length): - if self.peek(k) not in u'0123456789ABCDEFabcdef': + if self.reader.peek(k) not in u'0123456789ABCDEFabcdef': raise ScannerError( "while scanning a double-quoted scalar", start_mark, "expected escape sequence of %d hexdecimal " "numbers, but found %r" % - (length, utf8(self.peek(k))), self.get_mark()) - code = int(self.prefix(length), 16) + (length, utf8(self.reader.peek(k))), self.reader.get_mark()) + code = int(self.reader.prefix(length), 16) chunks.append(unichr(code)) - self.forward(length) + self.reader.forward(length) elif ch in u'\r\n\x85\u2028\u2029': self.scan_line_break() chunks.extend(self.scan_flow_scalar_breaks( @@ -1279,23 +1304,24 @@ class Scanner(object): raise ScannerError( "while scanning a double-quoted scalar", start_mark, "found unknown escape character %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) else: return chunks def scan_flow_scalar_spaces(self, double, start_mark): + # type: (Any, Any) -> Any # See the specification for details. chunks = [] length = 0 - while self.peek(length) in u' \t': + while self.reader.peek(length) in u' \t': length += 1 - whitespaces = self.prefix(length) - self.forward(length) - ch = self.peek() + whitespaces = self.reader.prefix(length) + self.reader.forward(length) + ch = self.reader.peek() if ch == u'\0': raise ScannerError( "while scanning a quoted scalar", start_mark, - "found unexpected end of stream", self.get_mark()) + "found unexpected end of stream", self.reader.get_mark()) elif ch in u'\r\n\x85\u2028\u2029': line_break = self.scan_line_break() breaks = self.scan_flow_scalar_breaks(double, start_mark) @@ -1309,62 +1335,64 @@ class Scanner(object): return chunks def scan_flow_scalar_breaks(self, double, start_mark): + # type: (Any, Any) -> Any # See the specification for details. - chunks = [] + chunks = [] # type: List[Any] while True: # Instead of checking indentation, we check for document # separators. - prefix = self.prefix(3) + prefix = self.reader.prefix(3) if (prefix == u'---' or prefix == u'...') \ - and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': + and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': raise ScannerError("while scanning a quoted scalar", start_mark, "found unexpected document separator", - self.get_mark()) - while self.peek() in u' \t': - self.forward() - if self.peek() in u'\r\n\x85\u2028\u2029': + self.reader.get_mark()) + while self.reader.peek() in u' \t': + self.reader.forward() + if self.reader.peek() in u'\r\n\x85\u2028\u2029': chunks.append(self.scan_line_break()) else: return chunks def scan_plain(self): + # type: () -> Any # See the specification for details. # We add an additional restriction for the flow context: # plain scalars in the flow context cannot contain ',', ': ' and '?'. # We also keep track of the `allow_simple_key` flag here. # Indentation rules are loosed for the flow context. - chunks = [] - start_mark = self.get_mark() + chunks = [] # type: List[Any] + start_mark = self.reader.get_mark() end_mark = start_mark indent = self.indent+1 # We allow zero indentation for scalars, but then we need to check for # document separators at the beginning of the line. # if indent == 0: # indent = 1 - spaces = [] + spaces = [] # type: List[Any] while True: length = 0 - if self.peek() == u'#': + if self.reader.peek() == u'#': break while True: - ch = self.peek(length) + ch = self.reader.peek(length) if (ch == u':' and - self.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029'): + self.reader.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029'): pass elif (ch in u'\0 \t\r\n\x85\u2028\u2029' or (not self.flow_level and ch == u':' and - self.peek(length+1) in u'\0 \t\r\n\x85\u2028\u2029') or + self.reader.peek(length+1) in u'\0 \t\r\n\x85\u2028\u2029') or (self.flow_level and ch in u',:?[]{}')): break length += 1 # It's not clear what we should do with ':' in the flow context. if (self.flow_level and ch == u':' and - self.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029,[]{}'): - self.forward(length) + self.reader.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029,[]{}'): + self.reader.forward(length) raise ScannerError( "while scanning a plain scalar", start_mark, - "found unexpected ':'", self.get_mark(), + "found unexpected ':'", self.reader.get_mark(), "Please check " "http://pyyaml.org/wiki/YAMLColonInFlowContext " "for details.") @@ -1372,12 +1400,12 @@ class Scanner(object): break self.allow_simple_key = False chunks.extend(spaces) - chunks.append(self.prefix(length)) - self.forward(length) - end_mark = self.get_mark() + chunks.append(self.reader.prefix(length)) + self.reader.forward(length) + end_mark = self.reader.get_mark() spaces = self.scan_plain_spaces(indent, start_mark) - if not spaces or self.peek() == u'#' \ - or (not self.flow_level and self.column < indent): + if not spaces or self.reader.peek() == u'#' \ + or (not self.flow_level and self.reader.column < indent): break token = ScalarToken(u''.join(chunks), True, start_mark, end_mark) @@ -1388,32 +1416,33 @@ class Scanner(object): return token def scan_plain_spaces(self, indent, start_mark): + # type: (Any, Any) -> Any # See the specification for details. # The specification is really confusing about tabs in plain scalars. # We just forbid them completely. Do not use tabs in YAML! chunks = [] length = 0 - while self.peek(length) in u' ': + while self.reader.peek(length) in u' ': length += 1 - whitespaces = self.prefix(length) - self.forward(length) - ch = self.peek() + whitespaces = self.reader.prefix(length) + self.reader.forward(length) + ch = self.reader.peek() if ch in u'\r\n\x85\u2028\u2029': line_break = self.scan_line_break() self.allow_simple_key = True - prefix = self.prefix(3) + prefix = self.reader.prefix(3) if (prefix == u'---' or prefix == u'...') \ - and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': + and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': return breaks = [] - while self.peek() in u' \r\n\x85\u2028\u2029': - if self.peek() == ' ': - self.forward() + while self.reader.peek() in u' \r\n\x85\u2028\u2029': + if self.reader.peek() == ' ': + self.reader.forward() else: breaks.append(self.scan_line_break()) - prefix = self.prefix(3) + prefix = self.reader.prefix(3) if (prefix == u'---' or prefix == u'...') \ - and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': + and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029': return if line_break != u'\n': chunks.append(line_break) @@ -1425,87 +1454,91 @@ class Scanner(object): return chunks def scan_tag_handle(self, name, start_mark): + # type: (Any, Any) -> Any # See the specification for details. # For some strange reasons, the specification does not allow '_' in # tag handles. I have allowed it anyway. - ch = self.peek() + ch = self.reader.peek() if ch != u'!': raise ScannerError("while scanning a %s" % name, start_mark, "expected '!', but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) length = 1 - ch = self.peek(length) + ch = self.reader.peek(length) if ch != u' ': while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' \ or u'a' <= ch <= u'z' \ or ch in u'-_': length += 1 - ch = self.peek(length) + ch = self.reader.peek(length) if ch != u'!': - self.forward(length) + self.reader.forward(length) raise ScannerError("while scanning a %s" % name, start_mark, "expected '!', but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) length += 1 - value = self.prefix(length) - self.forward(length) + value = self.reader.prefix(length) + self.reader.forward(length) return value def scan_tag_uri(self, name, start_mark): + # type: (Any, Any) -> Any # See the specification for details. # Note: we do not check if URI is well-formed. chunks = [] length = 0 - ch = self.peek(length) + ch = self.reader.peek(length) while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \ or ch in u'-;/?:@&=+$,_.!~*\'()[]%': if ch == u'%': - chunks.append(self.prefix(length)) - self.forward(length) + chunks.append(self.reader.prefix(length)) + self.reader.forward(length) length = 0 chunks.append(self.scan_uri_escapes(name, start_mark)) else: length += 1 - ch = self.peek(length) - if length: - chunks.append(self.prefix(length)) - self.forward(length) + ch = self.reader.peek(length) + if length != 0: + chunks.append(self.reader.prefix(length)) + self.reader.forward(length) length = 0 if not chunks: raise ScannerError("while parsing a %s" % name, start_mark, "expected URI, but found %r" % utf8(ch), - self.get_mark()) + self.reader.get_mark()) return u''.join(chunks) def scan_uri_escapes(self, name, start_mark): + # type: (Any, Any) -> Any # See the specification for details. - code_bytes = [] - mark = self.get_mark() - while self.peek() == u'%': - self.forward() + code_bytes = [] # type: List[Any] + mark = self.reader.get_mark() + while self.reader.peek() == u'%': + self.reader.forward() for k in range(2): - if self.peek(k) not in u'0123456789ABCDEFabcdef': + if self.reader.peek(k) not in u'0123456789ABCDEFabcdef': raise ScannerError( "while scanning a %s" % name, start_mark, "expected URI escape sequence of 2 hexdecimal numbers," " but found %r" - % utf8(self.peek(k)), self.get_mark()) + % utf8(self.reader.peek(k)), self.reader.get_mark()) if PY3: - code_bytes.append(int(self.prefix(2), 16)) + code_bytes.append(int(self.reader.prefix(2), 16)) else: - code_bytes.append(chr(int(self.prefix(2), 16))) - self.forward(2) + code_bytes.append(chr(int(self.reader.prefix(2), 16))) + self.reader.forward(2) try: if PY3: value = bytes(code_bytes).decode('utf-8') else: - value = unicode(''.join(code_bytes), 'utf-8') + value = unicode(''.join(code_bytes), 'utf-8') # type: ignore except UnicodeDecodeError as exc: raise ScannerError("while scanning a %s" % name, start_mark, str(exc), mark) return value def scan_line_break(self): + # type: () -> Any # Transforms: # '\r\n' : '\n' # '\r' : '\n' @@ -1514,26 +1547,27 @@ class Scanner(object): # '\u2028' : '\u2028' # '\u2029 : '\u2029' # default : '' - ch = self.peek() + ch = self.reader.peek() if ch in u'\r\n\x85': - if self.prefix(2) == u'\r\n': - self.forward(2) + if self.reader.prefix(2) == u'\r\n': + self.reader.forward(2) else: - self.forward() + self.reader.forward() return u'\n' elif ch in u'\u2028\u2029': - self.forward() + self.reader.forward() return ch return u'' class RoundTripScanner(Scanner): def check_token(self, *choices): + # type: (Any) -> bool # Check if the next token is one of the given types. while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if self.tokens: + if bool(self.tokens): if not choices: return True for choice in choices: @@ -1542,16 +1576,19 @@ class RoundTripScanner(Scanner): return False def peek_token(self): + # type: () -> Any # Return the next token, but do not delete if from the queue. while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if self.tokens: + if bool(self.tokens): return self.tokens[0] + return None def _gather_comments(self): + # type: () -> Any """combine multiple comment lines""" - comments = [] + comments = [] # type: List[Any] if not self.tokens: return comments if isinstance(self.tokens[0], CommentToken): @@ -1578,11 +1615,12 @@ class RoundTripScanner(Scanner): self.fetch_more_tokens() def get_token(self): + # type: () -> Any # Return the next token. while self.need_more_tokens(): self.fetch_more_tokens() self._gather_comments() - if self.tokens: + if bool(self.tokens): # only add post comment to single line tokens: # scalar, value token. FlowXEndToken, otherwise # hidden streamtokens could get them (leave them and they will be @@ -1600,8 +1638,10 @@ class RoundTripScanner(Scanner): self.tokens[0].add_post_comment(self.tokens.pop(1)) self.tokens_taken += 1 return self.tokens.pop(0) + return None def fetch_comment(self, comment): + # type: (Any) -> None value, start_mark, end_mark = comment while value and value[-1] == u' ': # empty line within indented key context @@ -1612,6 +1652,7 @@ class RoundTripScanner(Scanner): # scanner def scan_to_next_token(self): + # type: () -> Any # We ignore spaces, line breaks and comments. # If we find a line break in the block context, we set the flag # `allow_simple_key` on. @@ -1631,51 +1672,54 @@ class RoundTripScanner(Scanner): # `unwind_indent` before issuing BLOCK-END. # Scanners for block, flow, and plain scalars need to be modified. - if self.index == 0 and self.peek() == u'\uFEFF': - self.forward() + if self.reader.index == 0 and self.reader.peek() == u'\uFEFF': + self.reader.forward() found = False while not found: - while self.peek() == u' ': - self.forward() - ch = self.peek() + while self.reader.peek() == u' ': + self.reader.forward() + ch = self.reader.peek() if ch == u'#': - start_mark = self.get_mark() + start_mark = self.reader.get_mark() comment = ch - self.forward() + self.reader.forward() while ch not in u'\0\r\n\x85\u2028\u2029': - ch = self.peek() + ch = self.reader.peek() if ch == u'\0': # don't gobble the end-of-stream character break comment += ch - self.forward() + self.reader.forward() # gather any blank lines following the comment too ch = self.scan_line_break() while len(ch) > 0: comment += ch ch = self.scan_line_break() - end_mark = self.get_mark() + end_mark = self.reader.get_mark() if not self.flow_level: self.allow_simple_key = True return comment, start_mark, end_mark - if self.scan_line_break(): - start_mark = self.get_mark() + if bool(self.scan_line_break()): + start_mark = self.reader.get_mark() if not self.flow_level: self.allow_simple_key = True - ch = self.peek() + ch = self.reader.peek() if ch == '\n': # empty toplevel lines - start_mark = self.get_mark() + start_mark = self.reader.get_mark() comment = '' while ch: ch = self.scan_line_break(empty_line=True) comment += ch - if self.peek() == '#': # empty line followed by indented real comment + if self.reader.peek() == '#': + # empty line followed by indented real comment comment = comment.rsplit('\n', 1)[0] + '\n' - end_mark = self.get_mark() + end_mark = self.reader.get_mark() return comment, start_mark, end_mark else: found = True + return None def scan_line_break(self, empty_line=False): + # type: (bool) -> Text # Transforms: # '\r\n' : '\n' # '\r' : '\n' @@ -1684,18 +1728,18 @@ class RoundTripScanner(Scanner): # '\u2028' : '\u2028' # '\u2029 : '\u2029' # default : '' - ch = self.peek() + ch = self.reader.peek() # type: Text if ch in u'\r\n\x85': - if self.prefix(2) == u'\r\n': - self.forward(2) + if self.reader.prefix(2) == u'\r\n': + self.reader.forward(2) else: - self.forward() + self.reader.forward() return u'\n' elif ch in u'\u2028\u2029': - self.forward() + self.reader.forward() return ch elif empty_line and ch in '\t ': - self.forward() + self.reader.forward() return ch return u'' |