Diffstat (limited to 'scanner.py')
-rw-r--r--    scanner.py    778
1 file changed, 411 insertions, 367 deletions
diff --git a/scanner.py b/scanner.py
index 51f6cb9..68b043c 100644
--- a/scanner.py
+++ b/scanner.py
@@ -29,6 +29,9 @@ from __future__ import print_function, absolute_import, division, unicode_literals
#
# Read comments in the Scanner code for more details.
#
+
+from typing import Any, Dict, Optional, List, Union, Text # NOQA
+
from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import * # NOQA
from ruamel.yaml.compat import utf8, unichr, PY3, check_anchorname_char
@@ -44,6 +47,7 @@ class SimpleKey(object):
# See below simple keys treatment.
def __init__(self, token_number, required, index, line, column, mark):
+ # type: (Any, Any, int, int, int, Any) -> None
self.token_number = token_number
self.required = required
self.index = index
@@ -54,7 +58,8 @@ class SimpleKey(object):
class Scanner(object):
- def __init__(self):
+ def __init__(self, loader=None):
+ # type: (Any) -> None
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader does the dirty work of checking for BOM and converting the
@@ -65,6 +70,10 @@ class Scanner(object):
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer
+ self.loader = loader
+ if self.loader is not None:
+ self.loader._scanner = self
+
# Had we reached the end of the stream?
self.done = False
@@ -73,7 +82,7 @@ class Scanner(object):
self.flow_level = 0
# List of processed tokens that are not yet emitted.
- self.tokens = []
+ self.tokens = [] # type: List[Any]
# Add the STREAM-START token.
self.fetch_stream_start()
@@ -85,7 +94,7 @@ class Scanner(object):
self.indent = -1
# Past indentation levels.
- self.indents = []
+ self.indents = [] # type: List[int]
# Variables related to simple keys treatment.
@@ -115,15 +124,21 @@ class Scanner(object):
# (token_number, required, index, line, column, mark)
# A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow),
# '[', or '{' tokens.
- self.possible_simple_keys = {}
+ self.possible_simple_keys = {} # type: Dict[Any, Any]
+
+ @property
+ def reader(self):
+ # type: () -> Any
+ return self.loader._reader
# Public methods.
def check_token(self, *choices):
+ # type: (Any) -> bool
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
- if self.tokens:
+ if bool(self.tokens):
if not choices:
return True
for choice in choices:
@@ -132,23 +147,26 @@ class Scanner(object):
return False
def peek_token(self):
+ # type: () -> Any
# Return the next token, but do not delete it from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
- if self.tokens:
+ if bool(self.tokens):
return self.tokens[0]
def get_token(self):
+ # type: () -> Any
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
- if self.tokens:
+ if bool(self.tokens):
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
+ # type: () -> bool
if self.done:
return False
if not self.tokens:
@@ -158,24 +176,27 @@ class Scanner(object):
self.stale_possible_simple_keys()
if self.next_possible_simple_key() == self.tokens_taken:
return True
+ return False
- def fetch_more_tokens(self):
+ def fetch_comment(self, comment):
+ # type: (Any) -> None
+ raise NotImplementedError
+ def fetch_more_tokens(self):
+ # type: () -> Any
# Eat whitespaces and comments until we reach the next token.
comment = self.scan_to_next_token()
-
if comment is not None: # never happens for base scanner
return self.fetch_comment(comment)
-
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Compare the current indentation and column. It may add some tokens
# and decrease the current indentation level.
- self.unwind_indent(self.column)
+ self.unwind_indent(self.reader.column)
# Peek the next character.
- ch = self.peek()
+ ch = self.reader.peek()
# Is it the end of stream?
if ch == u'\0':
@@ -266,11 +287,12 @@ class Scanner(object):
# No? It's an error. Let's produce a nice error message.
raise ScannerError("while scanning for the next token", None,
"found character %r that cannot start any token"
- % utf8(ch), self.get_mark())
+ % utf8(ch), self.reader.get_mark())
# Simple keys treatment.
def next_possible_simple_key(self):
+ # type: () -> Any
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
@@ -286,6 +308,7 @@ class Scanner(object):
return min_token_number
def stale_possible_simple_keys(self):
+ # type: () -> None
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
# - should be limited to a single line,
@@ -294,21 +317,22 @@ class Scanner(object):
# height (may cause problems if indentation is broken though).
for level in list(self.possible_simple_keys):
key = self.possible_simple_keys[level]
- if key.line != self.line \
- or self.index-key.index > 1024:
+ if key.line != self.reader.line \
+ or self.reader.index - key.index > 1024:
if key.required:
raise ScannerError(
"while scanning a simple key", key.mark,
- "could not find expected ':'", self.get_mark())
+ "could not find expected ':'", self.reader.get_mark())
del self.possible_simple_keys[level]
def save_possible_simple_key(self):
+ # type: () -> None
# The next token may start a simple key. We check if it's possible
# and save its position. This function is called for
# ALIAS, ANCHOR, TAG, SCALAR(flow), '[', and '{'.
# Check if a simple key is required at the current position.
- required = not self.flow_level and self.indent == self.column
+ required = not self.flow_level and self.indent == self.reader.column
# The next token might be a simple key. Let's save its number and
# position.
@@ -317,10 +341,12 @@ class Scanner(object):
token_number = self.tokens_taken+len(self.tokens)
key = SimpleKey(
token_number, required,
- self.index, self.line, self.column, self.get_mark())
+ self.reader.index, self.reader.line, self.reader.column,
+ self.reader.get_mark())
self.possible_simple_keys[self.flow_level] = key
def remove_possible_simple_key(self):
+ # type: () -> None
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
key = self.possible_simple_keys[self.flow_level]
@@ -328,14 +354,14 @@ class Scanner(object):
if key.required:
raise ScannerError(
"while scanning a simple key", key.mark,
- "could not find expected ':'", self.get_mark())
+ "could not find expected ':'", self.reader.get_mark())
del self.possible_simple_keys[self.flow_level]
# Indentation functions.
def unwind_indent(self, column):
-
+ # type: (Any) -> None
# In flow context, tokens should respect indentation.
# Actually the condition should be `self.indent >= column` according to
# the spec. But this condition will prohibit intuitively correct
@@ -346,20 +372,21 @@ class Scanner(object):
# if self.flow_level and self.indent > column:
# raise ScannerError(None, None,
# "invalid intendation or unclosed '[' or '{'",
- # self.get_mark())
+ # self.reader.get_mark())
# In the flow context, indentation is ignored. We make the scanner less
# restrictive than the specification requires.
- if self.flow_level:
+ if bool(self.flow_level):
return
# In block context, we may need to issue the BLOCK-END tokens.
while self.indent > column:
- mark = self.get_mark()
+ mark = self.reader.get_mark()
self.indent = self.indents.pop()
self.tokens.append(BlockEndToken(mark, mark))
def add_indent(self, column):
+ # type: (int) -> bool
# Check if we need to increase indentation.
if self.indent < column:
self.indents.append(self.indent)
@@ -370,37 +397,32 @@ class Scanner(object):
# Fetchers.
def fetch_stream_start(self):
+ # type: () -> None
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
-
# Read the token.
- mark = self.get_mark()
-
+ mark = self.reader.get_mark()
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark,
- encoding=self.encoding))
+ encoding=self.reader.encoding))
def fetch_stream_end(self):
-
+ # type: () -> None
# Set the current indentation to -1.
self.unwind_indent(-1)
-
# Reset simple keys.
self.remove_possible_simple_key()
self.allow_simple_key = False
- self.possible_simple_keys = {}
-
+ self.possible_simple_keys = {} # type: Dict[Any, Any]
# Read the token.
- mark = self.get_mark()
-
+ mark = self.reader.get_mark()
# Add STREAM-END.
self.tokens.append(StreamEndToken(mark, mark))
-
# The stream is finished.
self.done = True
def fetch_directive(self):
-
+ # type: () -> None
# Set the current indentation to -1.
self.unwind_indent(-1)
@@ -412,13 +434,15 @@ class Scanner(object):
self.tokens.append(self.scan_directive())
def fetch_document_start(self):
+ # type: () -> None
self.fetch_document_indicator(DocumentStartToken)
def fetch_document_end(self):
+ # type: () -> None
self.fetch_document_indicator(DocumentEndToken)
def fetch_document_indicator(self, TokenClass):
-
+ # type: (Any) -> None
# Set the current indentation to -1.
self.unwind_indent(-1)
@@ -428,106 +452,97 @@ class Scanner(object):
self.allow_simple_key = False
# Add DOCUMENT-START or DOCUMENT-END.
- start_mark = self.get_mark()
- self.forward(3)
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward(3)
+ end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_start(self):
+ # type: () -> None
self.fetch_flow_collection_start(FlowSequenceStartToken)
def fetch_flow_mapping_start(self):
+ # type: () -> None
self.fetch_flow_collection_start(FlowMappingStartToken)
def fetch_flow_collection_start(self, TokenClass):
-
+ # type: (Any) -> None
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
-
# Increase the flow level.
self.flow_level += 1
-
# Simple keys are allowed after '[' and '{'.
self.allow_simple_key = True
-
# Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_end(self):
+ # type: () -> None
self.fetch_flow_collection_end(FlowSequenceEndToken)
def fetch_flow_mapping_end(self):
+ # type: () -> None
self.fetch_flow_collection_end(FlowMappingEndToken)
def fetch_flow_collection_end(self, TokenClass):
-
+ # type: (Any) -> None
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
-
# Decrease the flow level.
self.flow_level -= 1
-
# No simple keys after ']' or '}'.
self.allow_simple_key = False
-
# Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_entry(self):
-
+ # type: () -> None
# Simple keys are allowed after ','.
self.allow_simple_key = True
-
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
-
# Add FLOW-ENTRY.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
def fetch_block_entry(self):
-
+ # type: () -> None
# Block context needs additional checks.
if not self.flow_level:
-
# Are we allowed to start a new entry?
if not self.allow_simple_key:
raise ScannerError(None, None,
"sequence entries are not allowed here",
- self.get_mark())
-
+ self.reader.get_mark())
# We may need to add BLOCK-SEQUENCE-START.
- if self.add_indent(self.column):
- mark = self.get_mark()
+ if self.add_indent(self.reader.column):
+ mark = self.reader.get_mark()
self.tokens.append(BlockSequenceStartToken(mark, mark))
-
# It's an error for the block entry to occur in the flow context,
# but we let the parser detect this.
else:
pass
-
# Simple keys are allowed after '-'.
self.allow_simple_key = True
-
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Add BLOCK-ENTRY.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(BlockEntryToken(start_mark, end_mark))
def fetch_key(self):
-
+ # type: () -> None
# Block context needs additional checks.
if not self.flow_level:
@@ -535,11 +550,11 @@ class Scanner(object):
if not self.allow_simple_key:
raise ScannerError(None, None,
"mapping keys are not allowed here",
- self.get_mark())
+ self.reader.get_mark())
# We may need to add BLOCK-MAPPING-START.
- if self.add_indent(self.column):
- mark = self.get_mark()
+ if self.add_indent(self.reader.column):
+ mark = self.reader.get_mark()
self.tokens.append(BlockMappingStartToken(mark, mark))
# Simple keys are allowed after '?' in the block context.
@@ -549,13 +564,13 @@ class Scanner(object):
self.remove_possible_simple_key()
# Add KEY.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(KeyToken(start_mark, end_mark))
def fetch_value(self):
-
+ # type: () -> None
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
@@ -588,14 +603,14 @@ class Scanner(object):
if not self.allow_simple_key:
raise ScannerError(None, None,
"mapping values are not allowed here",
- self.get_mark())
+ self.reader.get_mark())
# If this value starts a new block mapping, we need to add
# BLOCK-MAPPING-START. It will be detected as an error later by
# the parser.
if not self.flow_level:
- if self.add_indent(self.column):
- mark = self.get_mark()
+ if self.add_indent(self.reader.column):
+ mark = self.reader.get_mark()
self.tokens.append(BlockMappingStartToken(mark, mark))
# Simple keys are allowed after ':' in the block context.
@@ -605,142 +620,134 @@ class Scanner(object):
self.remove_possible_simple_key()
# Add VALUE.
- start_mark = self.get_mark()
- self.forward()
- end_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
def fetch_alias(self):
-
+ # type: () -> None
# ALIAS could be a simple key.
self.save_possible_simple_key()
-
# No simple keys after ALIAS.
self.allow_simple_key = False
-
# Scan and add ALIAS.
self.tokens.append(self.scan_anchor(AliasToken))
def fetch_anchor(self):
-
+ # type: () -> None
# ANCHOR could start a simple key.
self.save_possible_simple_key()
-
# No simple keys after ANCHOR.
self.allow_simple_key = False
-
# Scan and add ANCHOR.
self.tokens.append(self.scan_anchor(AnchorToken))
def fetch_tag(self):
-
+ # type: () -> None
# TAG could start a simple key.
self.save_possible_simple_key()
-
# No simple keys after TAG.
self.allow_simple_key = False
-
# Scan and add TAG.
self.tokens.append(self.scan_tag())
def fetch_literal(self):
+ # type: () -> None
self.fetch_block_scalar(style='|')
def fetch_folded(self):
+ # type: () -> None
self.fetch_block_scalar(style='>')
def fetch_block_scalar(self, style):
-
+ # type: (Any) -> None
# A simple key may follow a block scalar.
self.allow_simple_key = True
-
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
-
# Scan and add SCALAR.
self.tokens.append(self.scan_block_scalar(style))
def fetch_single(self):
+ # type: () -> None
self.fetch_flow_scalar(style='\'')
def fetch_double(self):
+ # type: () -> None
self.fetch_flow_scalar(style='"')
def fetch_flow_scalar(self, style):
-
+ # type: (Any) -> None
# A flow scalar could be a simple key.
self.save_possible_simple_key()
-
# No simple keys after flow scalars.
self.allow_simple_key = False
-
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar(style))
def fetch_plain(self):
-
+ # type: () -> None
# A plain scalar could be a simple key.
self.save_possible_simple_key()
-
# No simple keys after plain scalars. But note that `scan_plain` will
# change this flag if the scan is finished at the beginning of the
# line.
self.allow_simple_key = False
-
# Scan and add SCALAR. May change `allow_simple_key`.
self.tokens.append(self.scan_plain())
# Checkers.
def check_directive(self):
-
+ # type: () -> Any
# DIRECTIVE: ^ '%' ...
# The '%' indicator is already checked.
- if self.column == 0:
+ if self.reader.column == 0:
return True
+ return None
def check_document_start(self):
-
+ # type: () -> Any
# DOCUMENT-START: ^ '---' (' '|'\n')
- if self.column == 0:
- if self.prefix(3) == u'---' \
- and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
+ if self.reader.column == 0:
+ if self.reader.prefix(3) == u'---' \
+ and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
return True
+ return None
def check_document_end(self):
-
+ # type: () -> Any
# DOCUMENT-END: ^ '...' (' '|'\n')
- if self.column == 0:
- if self.prefix(3) == u'...' \
- and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
+ if self.reader.column == 0:
+ if self.reader.prefix(3) == u'...' \
+ and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
return True
+ return None
def check_block_entry(self):
-
+ # type: () -> Any
# BLOCK-ENTRY: '-' (' '|'\n')
- return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
+ return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
def check_key(self):
-
+ # type: () -> Any
# KEY(flow context): '?'
- if self.flow_level:
+ if bool(self.flow_level):
return True
-
# KEY(block context): '?' (' '|'\n')
- else:
- return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
+ return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
def check_value(self):
-
+ # type: () -> Any
# VALUE(flow context): ':'
- if self.flow_level:
+ if bool(self.flow_level):
return True
-
# VALUE(block context): ':' (' '|'\n')
- else:
- return self.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
+ return self.reader.peek(1) in u'\0 \t\r\n\x85\u2028\u2029'
def check_plain(self):
+ # type: () -> Any
# A plain scalar may start with any non-space character except:
# '-', '?', ':', ',', '[', ']', '{', '}',
# '#', '&', '*', '!', '|', '>', '\'', '\"',
@@ -753,14 +760,15 @@ class Scanner(object):
# Note that we limit the last rule to the block context (except the
# '-' character) because we want the flow context to be space
# independent.
- ch = self.peek()
+ ch = self.reader.peek()
return ch not in u'\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`' or \
- (self.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029' and
+ (self.reader.peek(1) not in u'\0 \t\r\n\x85\u2028\u2029' and
(ch == u'-' or (not self.flow_level and ch in u'?:')))
# Scanners.
def scan_to_next_token(self):
+ # type: () -> Any
# We ignore spaces, line breaks and comments.
# If we find a line break in the block context, we set the flag
# `allow_simple_key` on.
@@ -780,145 +788,155 @@ class Scanner(object):
# `unwind_indent` before issuing BLOCK-END.
# Scanners for block, flow, and plain scalars need to be modified.
- if self.index == 0 and self.peek() == u'\uFEFF':
- self.forward()
+ if self.reader.index == 0 and self.reader.peek() == u'\uFEFF':
+ self.reader.forward()
found = False
while not found:
- while self.peek() == u' ':
- self.forward()
- if self.peek() == u'#':
- while self.peek() not in u'\0\r\n\x85\u2028\u2029':
- self.forward()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
+ if self.reader.peek() == u'#':
+ while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029':
+ self.reader.forward()
if self.scan_line_break():
if not self.flow_level:
self.allow_simple_key = True
else:
found = True
+ return None
def scan_directive(self):
+ # type: () -> Any
# See the specification for details.
- start_mark = self.get_mark()
- self.forward()
+ start_mark = self.reader.get_mark()
+ self.reader.forward()
name = self.scan_directive_name(start_mark)
value = None
if name == u'YAML':
value = self.scan_yaml_directive_value(start_mark)
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
elif name == u'TAG':
value = self.scan_tag_directive_value(start_mark)
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
else:
- end_mark = self.get_mark()
- while self.peek() not in u'\0\r\n\x85\u2028\u2029':
- self.forward()
+ end_mark = self.reader.get_mark()
+ while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029':
+ self.reader.forward()
self.scan_directive_ignored_line(start_mark)
return DirectiveToken(name, value, start_mark, end_mark)
def scan_directive_name(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
length = 0
- ch = self.peek(length)
+ ch = self.reader.peek(length)
while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \
or ch in u'-_:.':
length += 1
- ch = self.peek(length)
+ ch = self.reader.peek(length)
if not length:
raise ScannerError(
"while scanning a directive", start_mark,
"expected alphabetic or numeric character, but found %r"
- % utf8(ch), self.get_mark())
- value = self.prefix(length)
- self.forward(length)
- ch = self.peek()
+ % utf8(ch), self.reader.get_mark())
+ value = self.reader.prefix(length)
+ self.reader.forward(length)
+ ch = self.reader.peek()
if ch not in u'\0 \r\n\x85\u2028\u2029':
raise ScannerError(
"while scanning a directive", start_mark,
"expected alphabetic or numeric character, but found %r"
- % utf8(ch), self.get_mark())
+ % utf8(ch), self.reader.get_mark())
return value
def scan_yaml_directive_value(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
- while self.peek() == u' ':
- self.forward()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
major = self.scan_yaml_directive_number(start_mark)
- if self.peek() != '.':
+ if self.reader.peek() != '.':
raise ScannerError(
"while scanning a directive", start_mark,
"expected a digit or '.', but found %r"
- % utf8(self.peek()),
- self.get_mark())
- self.forward()
+ % utf8(self.reader.peek()),
+ self.reader.get_mark())
+ self.reader.forward()
minor = self.scan_yaml_directive_number(start_mark)
- if self.peek() not in u'\0 \r\n\x85\u2028\u2029':
+ if self.reader.peek() not in u'\0 \r\n\x85\u2028\u2029':
raise ScannerError(
"while scanning a directive", start_mark,
"expected a digit or ' ', but found %r"
- % utf8(self.peek()),
- self.get_mark())
+ % utf8(self.reader.peek()),
+ self.reader.get_mark())
return (major, minor)
def scan_yaml_directive_number(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
- ch = self.peek()
+ ch = self.reader.peek()
if not (u'0' <= ch <= u'9'):
raise ScannerError(
"while scanning a directive", start_mark,
"expected a digit, but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
length = 0
- while u'0' <= self.peek(length) <= u'9':
+ while u'0' <= self.reader.peek(length) <= u'9':
length += 1
- value = int(self.prefix(length))
- self.forward(length)
+ value = int(self.reader.prefix(length))
+ self.reader.forward(length)
return value
def scan_tag_directive_value(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
- while self.peek() == u' ':
- self.forward()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
handle = self.scan_tag_directive_handle(start_mark)
- while self.peek() == u' ':
- self.forward()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
prefix = self.scan_tag_directive_prefix(start_mark)
return (handle, prefix)
def scan_tag_directive_handle(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
value = self.scan_tag_handle('directive', start_mark)
- ch = self.peek()
+ ch = self.reader.peek()
if ch != u' ':
raise ScannerError("while scanning a directive", start_mark,
"expected ' ', but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
return value
def scan_tag_directive_prefix(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
value = self.scan_tag_uri('directive', start_mark)
- ch = self.peek()
+ ch = self.reader.peek()
if ch not in u'\0 \r\n\x85\u2028\u2029':
raise ScannerError("while scanning a directive", start_mark,
"expected ' ', but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
return value
def scan_directive_ignored_line(self, start_mark):
+ # type: (Any) -> None
# See the specification for details.
- while self.peek() == u' ':
- self.forward()
- if self.peek() == u'#':
- while self.peek() not in u'\0\r\n\x85\u2028\u2029':
- self.forward()
- ch = self.peek()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
+ if self.reader.peek() == u'#':
+ while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029':
+ self.reader.forward()
+ ch = self.reader.peek()
if ch not in u'\0\r\n\x85\u2028\u2029':
raise ScannerError(
"while scanning a directive", start_mark,
"expected a comment or a line break, but found %r"
- % utf8(ch), self.get_mark())
+ % utf8(ch), self.reader.get_mark())
self.scan_line_break()
def scan_anchor(self, TokenClass):
+ # type: (Any) -> Any
# The specification does not restrict characters for anchors and
# aliases. This may lead to problems, for instance, the document:
# [ *alias, value ]
@@ -927,56 +945,57 @@ class Scanner(object):
# and
# [ *alias , "value" ]
# Therefore we restrict aliases to numbers and ASCII letters.
- start_mark = self.get_mark()
- indicator = self.peek()
+ start_mark = self.reader.get_mark()
+ indicator = self.reader.peek()
if indicator == u'*':
name = 'alias'
else:
name = 'anchor'
- self.forward()
+ self.reader.forward()
length = 0
- ch = self.peek(length)
+ ch = self.reader.peek(length)
# while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \
# or ch in u'-_':
while check_anchorname_char(ch):
length += 1
- ch = self.peek(length)
+ ch = self.reader.peek(length)
if not length:
raise ScannerError(
"while scanning an %s" % name, start_mark,
"expected alphabetic or numeric character, but found %r"
- % utf8(ch), self.get_mark())
- value = self.prefix(length)
- self.forward(length)
+ % utf8(ch), self.reader.get_mark())
+ value = self.reader.prefix(length)
+ self.reader.forward(length)
# ch1 = ch
- # ch = self.peek() # no need to peek, ch is already set
+ # ch = self.reader.peek() # no need to peek, ch is already set
# assert ch1 == ch
if ch not in u'\0 \t\r\n\x85\u2028\u2029?:,[]{}%@`':
raise ScannerError(
"while scanning an %s" % name, start_mark,
"expected alphabetic or numeric character, but found %r"
- % utf8(ch), self.get_mark())
- end_mark = self.get_mark()
+ % utf8(ch), self.reader.get_mark())
+ end_mark = self.reader.get_mark()
return TokenClass(value, start_mark, end_mark)
def scan_tag(self):
+ # type: () -> Any
# See the specification for details.
- start_mark = self.get_mark()
- ch = self.peek(1)
+ start_mark = self.reader.get_mark()
+ ch = self.reader.peek(1)
if ch == u'<':
handle = None
- self.forward(2)
+ self.reader.forward(2)
suffix = self.scan_tag_uri('tag', start_mark)
- if self.peek() != u'>':
+ if self.reader.peek() != u'>':
raise ScannerError(
"while parsing a tag", start_mark,
- "expected '>', but found %r" % utf8(self.peek()),
- self.get_mark())
- self.forward()
+ "expected '>', but found %r" % utf8(self.reader.peek()),
+ self.reader.get_mark())
+ self.reader.forward()
elif ch in u'\0 \t\r\n\x85\u2028\u2029':
handle = None
suffix = u'!'
- self.forward()
+ self.reader.forward()
else:
length = 1
use_handle = False
@@ -985,36 +1004,36 @@ class Scanner(object):
use_handle = True
break
length += 1
- ch = self.peek(length)
+ ch = self.reader.peek(length)
handle = u'!'
if use_handle:
handle = self.scan_tag_handle('tag', start_mark)
else:
handle = u'!'
- self.forward()
+ self.reader.forward()
suffix = self.scan_tag_uri('tag', start_mark)
- ch = self.peek()
+ ch = self.reader.peek()
if ch not in u'\0 \r\n\x85\u2028\u2029':
raise ScannerError("while scanning a tag", start_mark,
"expected ' ', but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
value = (handle, suffix)
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
return TagToken(value, start_mark, end_mark)
def scan_block_scalar(self, style):
+ # type: (Any) -> Any
# See the specification for details.
-
if style == '>':
folded = True
else:
folded = False
- chunks = []
- start_mark = self.get_mark()
+ chunks = [] # type: List[Any]
+ start_mark = self.reader.get_mark()
# Scan the header.
- self.forward()
+ self.reader.forward()
chomping, increment = self.scan_block_scalar_indicators(start_mark)
self.scan_block_scalar_ignored_line(start_mark)
@@ -1031,24 +1050,24 @@ class Scanner(object):
line_break = u''
# Scan the inner part of the block scalar.
- while self.column == indent and self.peek() != u'\0':
+ while self.reader.column == indent and self.reader.peek() != u'\0':
chunks.extend(breaks)
- leading_non_space = self.peek() not in u' \t'
+ leading_non_space = self.reader.peek() not in u' \t'
length = 0
- while self.peek(length) not in u'\0\r\n\x85\u2028\u2029':
+ while self.reader.peek(length) not in u'\0\r\n\x85\u2028\u2029':
length += 1
- chunks.append(self.prefix(length))
- self.forward(length)
+ chunks.append(self.reader.prefix(length))
+ self.reader.forward(length)
line_break = self.scan_line_break()
breaks, end_mark = self.scan_block_scalar_breaks(indent)
- if self.column == indent and self.peek() != u'\0':
+ if self.reader.column == indent and self.reader.peek() != u'\0':
# Unfortunately, folding rules are ambiguous.
#
# This is the folding according to the specification:
if folded and line_break == u'\n' \
- and leading_non_space and self.peek() not in u' \t':
+ and leading_non_space and self.reader.peek() not in u' \t':
if not breaks:
chunks.append(u' ')
else:
@@ -1059,7 +1078,7 @@ class Scanner(object):
#
# if folded and line_break == u'\n':
# if not breaks:
- # if self.peek() not in ' \t':
+ # if self.reader.peek() not in ' \t':
# chunks.append(u' ')
# else:
# chunks.append(line_break)
@@ -1070,7 +1089,7 @@ class Scanner(object):
# Process trailing line breaks. The 'chomping' setting determines
# whether they are included in the value.
- trailing = []
+ trailing = [] # type: List[Any]
if chomping in [None, True]:
chunks.append(line_break)
if chomping is True:
@@ -1090,32 +1109,33 @@ class Scanner(object):
# Keep track of the trailing whitespace and following comments
# as a comment token, if it isn't all included in the actual value.
- comment_end_mark = self.get_mark()
+ comment_end_mark = self.reader.get_mark()
comment = CommentToken(''.join(trailing), end_mark,
comment_end_mark)
token.add_post_comment(comment)
return token
def scan_block_scalar_indicators(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
chomping = None
increment = None
- ch = self.peek()
+ ch = self.reader.peek()
if ch in u'+-':
if ch == '+':
chomping = True
else:
chomping = False
- self.forward()
- ch = self.peek()
+ self.reader.forward()
+ ch = self.reader.peek()
if ch in u'0123456789':
increment = int(ch)
if increment == 0:
raise ScannerError(
"while scanning a block scalar", start_mark,
"expected indentation indicator in the range 1-9, "
- "but found 0", self.get_mark())
- self.forward()
+ "but found 0", self.reader.get_mark())
+ self.reader.forward()
elif ch in u'0123456789':
increment = int(ch)
if increment == 0:
@@ -1123,67 +1143,71 @@ class Scanner(object):
"while scanning a block scalar", start_mark,
"expected indentation indicator in the range 1-9, "
"but found 0",
- self.get_mark())
- self.forward()
- ch = self.peek()
+ self.reader.get_mark())
+ self.reader.forward()
+ ch = self.reader.peek()
if ch in u'+-':
if ch == '+':
chomping = True
else:
chomping = False
- self.forward()
- ch = self.peek()
+ self.reader.forward()
+ ch = self.reader.peek()
if ch not in u'\0 \r\n\x85\u2028\u2029':
raise ScannerError(
"while scanning a block scalar", start_mark,
"expected chomping or indentation indicators, but found %r"
- % utf8(ch), self.get_mark())
+ % utf8(ch), self.reader.get_mark())
return chomping, increment
def scan_block_scalar_ignored_line(self, start_mark):
+ # type: (Any) -> Any
# See the specification for details.
- while self.peek() == u' ':
- self.forward()
- if self.peek() == u'#':
- while self.peek() not in u'\0\r\n\x85\u2028\u2029':
- self.forward()
- ch = self.peek()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
+ if self.reader.peek() == u'#':
+ while self.reader.peek() not in u'\0\r\n\x85\u2028\u2029':
+ self.reader.forward()
+ ch = self.reader.peek()
if ch not in u'\0\r\n\x85\u2028\u2029':
raise ScannerError(
"while scanning a block scalar", start_mark,
"expected a comment or a line break, but found %r"
- % utf8(ch), self.get_mark())
+ % utf8(ch), self.reader.get_mark())
self.scan_line_break()
def scan_block_scalar_indentation(self):
+ # type: () -> Any
# See the specification for details.
chunks = []
max_indent = 0
- end_mark = self.get_mark()
- while self.peek() in u' \r\n\x85\u2028\u2029':
- if self.peek() != u' ':
+ end_mark = self.reader.get_mark()
+ while self.reader.peek() in u' \r\n\x85\u2028\u2029':
+ if self.reader.peek() != u' ':
chunks.append(self.scan_line_break())
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
else:
- self.forward()
- if self.column > max_indent:
- max_indent = self.column
+ self.reader.forward()
+ if self.reader.column > max_indent:
+ max_indent = self.reader.column
return chunks, max_indent, end_mark
def scan_block_scalar_breaks(self, indent):
+ # type: (int) -> Any
# See the specification for details.
chunks = []
- end_mark = self.get_mark()
- while self.column < indent and self.peek() == u' ':
- self.forward()
- while self.peek() in u'\r\n\x85\u2028\u2029':
+ end_mark = self.reader.get_mark()
+ while self.reader.column < indent and self.reader.peek() == u' ':
+ self.reader.forward()
+ while self.reader.peek() in u'\r\n\x85\u2028\u2029':
chunks.append(self.scan_line_break())
- end_mark = self.get_mark()
- while self.column < indent and self.peek() == u' ':
- self.forward()
+ end_mark = self.reader.get_mark()
+ while self.reader.column < indent and self.reader.peek() == u' ':
+ self.reader.forward()
return chunks, end_mark
def scan_flow_scalar(self, style):
+ # type: (Any) -> Any
# See the specification for details.
# Note that we loosen indentation rules for quoted scalars. Quoted
# scalars don't need to adhere to indentation because " and ' clearly
@@ -1194,16 +1218,16 @@ class Scanner(object):
double = True
else:
double = False
- chunks = []
- start_mark = self.get_mark()
- quote = self.peek()
- self.forward()
+ chunks = [] # type: List[Any]
+ start_mark = self.reader.get_mark()
+ quote = self.reader.peek()
+ self.reader.forward()
chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
- while self.peek() != quote:
+ while self.reader.peek() != quote:
chunks.extend(self.scan_flow_scalar_spaces(double, start_mark))
chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
- self.forward()
- end_mark = self.get_mark()
+ self.reader.forward()
+ end_mark = self.reader.get_mark()
return ScalarToken(u''.join(chunks), False, start_mark, end_mark,
style)
@@ -1235,42 +1259,43 @@ class Scanner(object):
}
def scan_flow_scalar_non_spaces(self, double, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
- chunks = []
+ chunks = [] # type: List[Any]
while True:
length = 0
- while self.peek(length) not in u'\'\"\\\0 \t\r\n\x85\u2028\u2029':
+ while self.reader.peek(length) not in u'\'\"\\\0 \t\r\n\x85\u2028\u2029':
length += 1
- if length:
- chunks.append(self.prefix(length))
- self.forward(length)
- ch = self.peek()
- if not double and ch == u'\'' and self.peek(1) == u'\'':
+ if length != 0:
+ chunks.append(self.reader.prefix(length))
+ self.reader.forward(length)
+ ch = self.reader.peek()
+ if not double and ch == u'\'' and self.reader.peek(1) == u'\'':
chunks.append(u'\'')
- self.forward(2)
+ self.reader.forward(2)
elif (double and ch == u'\'') or (not double and ch in u'\"\\'):
chunks.append(ch)
- self.forward()
+ self.reader.forward()
elif double and ch == u'\\':
- self.forward()
- ch = self.peek()
+ self.reader.forward()
+ ch = self.reader.peek()
if ch in self.ESCAPE_REPLACEMENTS:
chunks.append(self.ESCAPE_REPLACEMENTS[ch])
- self.forward()
+ self.reader.forward()
elif ch in self.ESCAPE_CODES:
length = self.ESCAPE_CODES[ch]
- self.forward()
+ self.reader.forward()
for k in range(length):
- if self.peek(k) not in u'0123456789ABCDEFabcdef':
+ if self.reader.peek(k) not in u'0123456789ABCDEFabcdef':
raise ScannerError(
"while scanning a double-quoted scalar",
start_mark,
"expected escape sequence of %d hexdecimal "
"numbers, but found %r" %
- (length, utf8(self.peek(k))), self.get_mark())
- code = int(self.prefix(length), 16)
+ (length, utf8(self.reader.peek(k))), self.reader.get_mark())
+ code = int(self.reader.prefix(length), 16)
chunks.append(unichr(code))
- self.forward(length)
+ self.reader.forward(length)
elif ch in u'\r\n\x85\u2028\u2029':
self.scan_line_break()
chunks.extend(self.scan_flow_scalar_breaks(
@@ -1279,23 +1304,24 @@ class Scanner(object):
raise ScannerError(
"while scanning a double-quoted scalar", start_mark,
"found unknown escape character %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
else:
return chunks
def scan_flow_scalar_spaces(self, double, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
chunks = []
length = 0
- while self.peek(length) in u' \t':
+ while self.reader.peek(length) in u' \t':
length += 1
- whitespaces = self.prefix(length)
- self.forward(length)
- ch = self.peek()
+ whitespaces = self.reader.prefix(length)
+ self.reader.forward(length)
+ ch = self.reader.peek()
if ch == u'\0':
raise ScannerError(
"while scanning a quoted scalar", start_mark,
- "found unexpected end of stream", self.get_mark())
+ "found unexpected end of stream", self.reader.get_mark())
elif ch in u'\r\n\x85\u2028\u2029':
line_break = self.scan_line_break()
breaks = self.scan_flow_scalar_breaks(double, start_mark)
@@ -1309,62 +1335,64 @@ class Scanner(object):
return chunks
def scan_flow_scalar_breaks(self, double, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
- chunks = []
+ chunks = [] # type: List[Any]
while True:
# Instead of checking indentation, we check for document
# separators.
- prefix = self.prefix(3)
+ prefix = self.reader.prefix(3)
if (prefix == u'---' or prefix == u'...') \
- and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
+ and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
raise ScannerError("while scanning a quoted scalar",
start_mark,
"found unexpected document separator",
- self.get_mark())
- while self.peek() in u' \t':
- self.forward()
- if self.peek() in u'\r\n\x85\u2028\u2029':
+ self.reader.get_mark())
+ while self.reader.peek() in u' \t':
+ self.reader.forward()
+ if self.reader.peek() in u'\r\n\x85\u2028\u2029':
chunks.append(self.scan_line_break())
else:
return chunks
def scan_plain(self):
+ # type: () -> Any
# See the specification for details.
# We add an additional restriction for the flow context:
# plain scalars in the flow context cannot contain ',', ': ' and '?'.
# We also keep track of the `allow_simple_key` flag here.
# Indentation rules are loosened for the flow context.
- chunks = []
- start_mark = self.get_mark()
+ chunks = [] # type: List[Any]
+ start_mark = self.reader.get_mark()
end_mark = start_mark
indent = self.indent+1
# We allow zero indentation for scalars, but then we need to check for
# document separators at the beginning of the line.
# if indent == 0:
# indent = 1
- spaces = []
+ spaces = [] # type: List[Any]
while True:
length = 0
- if self.peek() == u'#':
+ if self.reader.peek() == u'#':
break
while True:
- ch = self.peek(length)
+ ch = self.reader.peek(length)
if (ch == u':' and
- self.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029'):
+ self.reader.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029'):
pass
elif (ch in u'\0 \t\r\n\x85\u2028\u2029' or
(not self.flow_level and ch == u':' and
- self.peek(length+1) in u'\0 \t\r\n\x85\u2028\u2029') or
+ self.reader.peek(length+1) in u'\0 \t\r\n\x85\u2028\u2029') or
(self.flow_level and ch in u',:?[]{}')):
break
length += 1
# It's not clear what we should do with ':' in the flow context.
if (self.flow_level and ch == u':' and
- self.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029,[]{}'):
- self.forward(length)
+ self.reader.peek(length+1) not in u'\0 \t\r\n\x85\u2028\u2029,[]{}'):
+ self.reader.forward(length)
raise ScannerError(
"while scanning a plain scalar", start_mark,
- "found unexpected ':'", self.get_mark(),
+ "found unexpected ':'", self.reader.get_mark(),
"Please check "
"http://pyyaml.org/wiki/YAMLColonInFlowContext "
"for details.")
@@ -1372,12 +1400,12 @@ class Scanner(object):
break
self.allow_simple_key = False
chunks.extend(spaces)
- chunks.append(self.prefix(length))
- self.forward(length)
- end_mark = self.get_mark()
+ chunks.append(self.reader.prefix(length))
+ self.reader.forward(length)
+ end_mark = self.reader.get_mark()
spaces = self.scan_plain_spaces(indent, start_mark)
- if not spaces or self.peek() == u'#' \
- or (not self.flow_level and self.column < indent):
+ if not spaces or self.reader.peek() == u'#' \
+ or (not self.flow_level and self.reader.column < indent):
break
token = ScalarToken(u''.join(chunks), True, start_mark, end_mark)
@@ -1388,32 +1416,33 @@ class Scanner(object):
return token
def scan_plain_spaces(self, indent, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
# The specification is really confusing about tabs in plain scalars.
# We just forbid them completely. Do not use tabs in YAML!
chunks = []
length = 0
- while self.peek(length) in u' ':
+ while self.reader.peek(length) in u' ':
length += 1
- whitespaces = self.prefix(length)
- self.forward(length)
- ch = self.peek()
+ whitespaces = self.reader.prefix(length)
+ self.reader.forward(length)
+ ch = self.reader.peek()
if ch in u'\r\n\x85\u2028\u2029':
line_break = self.scan_line_break()
self.allow_simple_key = True
- prefix = self.prefix(3)
+ prefix = self.reader.prefix(3)
if (prefix == u'---' or prefix == u'...') \
- and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
+ and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
return
breaks = []
- while self.peek() in u' \r\n\x85\u2028\u2029':
- if self.peek() == ' ':
- self.forward()
+ while self.reader.peek() in u' \r\n\x85\u2028\u2029':
+ if self.reader.peek() == ' ':
+ self.reader.forward()
else:
breaks.append(self.scan_line_break())
- prefix = self.prefix(3)
+ prefix = self.reader.prefix(3)
if (prefix == u'---' or prefix == u'...') \
- and self.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
+ and self.reader.peek(3) in u'\0 \t\r\n\x85\u2028\u2029':
return
if line_break != u'\n':
chunks.append(line_break)
@@ -1425,87 +1454,91 @@ class Scanner(object):
return chunks
def scan_tag_handle(self, name, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
# For some strange reason, the specification does not allow '_' in
# tag handles. I have allowed it anyway.
- ch = self.peek()
+ ch = self.reader.peek()
if ch != u'!':
raise ScannerError("while scanning a %s" % name, start_mark,
"expected '!', but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
length = 1
- ch = self.peek(length)
+ ch = self.reader.peek(length)
if ch != u' ':
while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' \
or u'a' <= ch <= u'z' \
or ch in u'-_':
length += 1
- ch = self.peek(length)
+ ch = self.reader.peek(length)
if ch != u'!':
- self.forward(length)
+ self.reader.forward(length)
raise ScannerError("while scanning a %s" % name, start_mark,
"expected '!', but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
length += 1
- value = self.prefix(length)
- self.forward(length)
+ value = self.reader.prefix(length)
+ self.reader.forward(length)
return value
def scan_tag_uri(self, name, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
# Note: we do not check if URI is well-formed.
chunks = []
length = 0
- ch = self.peek(length)
+ ch = self.reader.peek(length)
while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \
or ch in u'-;/?:@&=+$,_.!~*\'()[]%':
if ch == u'%':
- chunks.append(self.prefix(length))
- self.forward(length)
+ chunks.append(self.reader.prefix(length))
+ self.reader.forward(length)
length = 0
chunks.append(self.scan_uri_escapes(name, start_mark))
else:
length += 1
- ch = self.peek(length)
- if length:
- chunks.append(self.prefix(length))
- self.forward(length)
+ ch = self.reader.peek(length)
+ if length != 0:
+ chunks.append(self.reader.prefix(length))
+ self.reader.forward(length)
length = 0
if not chunks:
raise ScannerError("while parsing a %s" % name, start_mark,
"expected URI, but found %r" % utf8(ch),
- self.get_mark())
+ self.reader.get_mark())
return u''.join(chunks)
def scan_uri_escapes(self, name, start_mark):
+ # type: (Any, Any) -> Any
# See the specification for details.
- code_bytes = []
- mark = self.get_mark()
- while self.peek() == u'%':
- self.forward()
+ code_bytes = [] # type: List[Any]
+ mark = self.reader.get_mark()
+ while self.reader.peek() == u'%':
+ self.reader.forward()
for k in range(2):
- if self.peek(k) not in u'0123456789ABCDEFabcdef':
+ if self.reader.peek(k) not in u'0123456789ABCDEFabcdef':
raise ScannerError(
"while scanning a %s" % name, start_mark,
"expected URI escape sequence of 2 hexdecimal numbers,"
" but found %r"
- % utf8(self.peek(k)), self.get_mark())
+ % utf8(self.reader.peek(k)), self.reader.get_mark())
if PY3:
- code_bytes.append(int(self.prefix(2), 16))
+ code_bytes.append(int(self.reader.prefix(2), 16))
else:
- code_bytes.append(chr(int(self.prefix(2), 16)))
- self.forward(2)
+ code_bytes.append(chr(int(self.reader.prefix(2), 16)))
+ self.reader.forward(2)
try:
if PY3:
value = bytes(code_bytes).decode('utf-8')
else:
- value = unicode(''.join(code_bytes), 'utf-8')
+ value = unicode(''.join(code_bytes), 'utf-8') # type: ignore
except UnicodeDecodeError as exc:
raise ScannerError("while scanning a %s" % name, start_mark,
str(exc), mark)
return value
def scan_line_break(self):
+ # type: () -> Any
# Transforms:
# '\r\n' : '\n'
# '\r' : '\n'
@@ -1514,26 +1547,27 @@ class Scanner(object):
# '\u2028' : '\u2028'
# '\u2029' : '\u2029'
# default : ''
- ch = self.peek()
+ ch = self.reader.peek()
if ch in u'\r\n\x85':
- if self.prefix(2) == u'\r\n':
- self.forward(2)
+ if self.reader.prefix(2) == u'\r\n':
+ self.reader.forward(2)
else:
- self.forward()
+ self.reader.forward()
return u'\n'
elif ch in u'\u2028\u2029':
- self.forward()
+ self.reader.forward()
return ch
return u''
class RoundTripScanner(Scanner):
def check_token(self, *choices):
+ # type: (Any) -> bool
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if self.tokens:
+ if bool(self.tokens):
if not choices:
return True
for choice in choices:
@@ -1542,16 +1576,19 @@ class RoundTripScanner(Scanner):
return False
def peek_token(self):
+ # type: () -> Any
# Return the next token, but do not delete it from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if self.tokens:
+ if bool(self.tokens):
return self.tokens[0]
+ return None
def _gather_comments(self):
+ # type: () -> Any
"""combine multiple comment lines"""
- comments = []
+ comments = [] # type: List[Any]
if not self.tokens:
return comments
if isinstance(self.tokens[0], CommentToken):
@@ -1578,11 +1615,12 @@ class RoundTripScanner(Scanner):
self.fetch_more_tokens()
def get_token(self):
+ # type: () -> Any
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
self._gather_comments()
- if self.tokens:
+ if bool(self.tokens):
# only add post comment to single line tokens:
# scalar, value token. FlowXEndToken, otherwise
# hidden streamtokens could get them (leave them and they will be
@@ -1600,8 +1638,10 @@ class RoundTripScanner(Scanner):
self.tokens[0].add_post_comment(self.tokens.pop(1))
self.tokens_taken += 1
return self.tokens.pop(0)
+ return None
def fetch_comment(self, comment):
+ # type: (Any) -> None
value, start_mark, end_mark = comment
while value and value[-1] == u' ':
# empty line within indented key context
@@ -1612,6 +1652,7 @@ class RoundTripScanner(Scanner):
# scanner
def scan_to_next_token(self):
+ # type: () -> Any
# We ignore spaces, line breaks and comments.
# If we find a line break in the block context, we set the flag
# `allow_simple_key` on.
@@ -1631,51 +1672,54 @@ class RoundTripScanner(Scanner):
# `unwind_indent` before issuing BLOCK-END.
# Scanners for block, flow, and plain scalars need to be modified.
- if self.index == 0 and self.peek() == u'\uFEFF':
- self.forward()
+ if self.reader.index == 0 and self.reader.peek() == u'\uFEFF':
+ self.reader.forward()
found = False
while not found:
- while self.peek() == u' ':
- self.forward()
- ch = self.peek()
+ while self.reader.peek() == u' ':
+ self.reader.forward()
+ ch = self.reader.peek()
if ch == u'#':
- start_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
comment = ch
- self.forward()
+ self.reader.forward()
while ch not in u'\0\r\n\x85\u2028\u2029':
- ch = self.peek()
+ ch = self.reader.peek()
if ch == u'\0': # don't gobble the end-of-stream character
break
comment += ch
- self.forward()
+ self.reader.forward()
# gather any blank lines following the comment too
ch = self.scan_line_break()
while len(ch) > 0:
comment += ch
ch = self.scan_line_break()
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
if not self.flow_level:
self.allow_simple_key = True
return comment, start_mark, end_mark
- if self.scan_line_break():
- start_mark = self.get_mark()
+ if bool(self.scan_line_break()):
+ start_mark = self.reader.get_mark()
if not self.flow_level:
self.allow_simple_key = True
- ch = self.peek()
+ ch = self.reader.peek()
if ch == '\n': # empty toplevel lines
- start_mark = self.get_mark()
+ start_mark = self.reader.get_mark()
comment = ''
while ch:
ch = self.scan_line_break(empty_line=True)
comment += ch
- if self.peek() == '#': # empty line followed by indented real comment
+ if self.reader.peek() == '#':
+ # empty line followed by indented real comment
comment = comment.rsplit('\n', 1)[0] + '\n'
- end_mark = self.get_mark()
+ end_mark = self.reader.get_mark()
return comment, start_mark, end_mark
else:
found = True
+ return None
def scan_line_break(self, empty_line=False):
+ # type: (bool) -> Text
# Transforms:
# '\r\n' : '\n'
# '\r' : '\n'
@@ -1684,18 +1728,18 @@ class RoundTripScanner(Scanner):
# '\u2028' : '\u2028'
# '\u2029' : '\u2029'
# default : ''
- ch = self.peek()
+ ch = self.reader.peek() # type: Text
if ch in u'\r\n\x85':
- if self.prefix(2) == u'\r\n':
- self.forward(2)
+ if self.reader.prefix(2) == u'\r\n':
+ self.reader.forward(2)
else:
- self.forward()
+ self.reader.forward()
return u'\n'
elif ch in u'\u2028\u2029':
- self.forward()
+ self.reader.forward()
return ch
elif empty_line and ch in '\t ':
- self.forward()
+ self.reader.forward()
return ch
return u''
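
The pattern running through this change is that the scanner no longer relies on a Reader mixin: every peek()/prefix()/forward()/get_mark() call goes through the new reader property, which resolves self.loader._reader, and the scanner registers itself on the loader as _scanner. A minimal sketch of that wiring, assuming a hypothetical MiniLoader composite (not the library's own loader class) together with the stock ruamel.yaml Reader and Scanner:

from ruamel.yaml.reader import Reader
from ruamel.yaml.scanner import Scanner


class MiniLoader(object):
    # Hypothetical composite used only for illustration: it exposes the
    # _reader attribute that Scanner.reader resolves via self.loader._reader.
    def __init__(self, stream):
        self._reader = Reader(stream)           # low-level peek/forward/get_mark
        self._scanner = Scanner(loader=self)    # also sets loader._scanner = scanner


loader = MiniLoader(u"- a\n- b\n")
while loader._scanner.check_token():
    print(loader._scanner.get_token())

This walks the token stream for the two-item sequence through the new delegation path instead of the old self.peek()/self.forward() mixin calls.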