summaryrefslogtreecommitdiff
path: root/reader.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2018-08-03 22:14:57 +0200
committerAnthon van der Neut <anthon@mnt.org>2018-08-03 22:14:57 +0200
commitdce10fcff1de54121fb8b440b883ef5d3fe2f96a (patch)
tree072b4bd247e6f1cd95c08c7b67fea0fc96f0578e /reader.py
parent2966a4f215861fa05e0dc7e0cd53350766e794c6 (diff)
downloadruamel.yaml-dce10fcff1de54121fb8b440b883ef5d3fe2f96a.tar.gz
Apply oitnb and mypy 0.620, then make everything work again0.15.48
Diffstat (limited to 'reader.py')
-rw-r--r--reader.py91
1 files changed, 48 insertions, 43 deletions
diff --git a/reader.py b/reader.py
index 96bb96b..4d0ddb4 100644
--- a/reader.py
+++ b/reader.py
@@ -1,6 +1,7 @@
# coding: utf-8
from __future__ import absolute_import
+
# This module contains abstractions for the input stream. You don't have to
# looks further, there are no pretty code.
#
@@ -34,7 +35,6 @@ __all__ = ['Reader', 'ReaderError']
class ReaderError(YAMLError):
-
def __init__(self, name, position, character, encoding, reason):
# type: (Any, Any, Any, Any, Any) -> None
self.name = name
@@ -46,15 +46,20 @@ class ReaderError(YAMLError):
def __str__(self):
# type: () -> str
if isinstance(self.character, binary_type):
- return "'%s' codec can't decode byte #x%02x: %s\n" \
- " in \"%s\", position %d" \
- % (self.encoding, ord(self.character), self.reason,
- self.name, self.position)
+ return "'%s' codec can't decode byte #x%02x: %s\n" ' in "%s", position %d' % (
+ self.encoding,
+ ord(self.character),
+ self.reason,
+ self.name,
+ self.position,
+ )
else:
- return "unacceptable character #x%04x: %s\n" \
- " in \"%s\", position %d" \
- % (self.character, self.reason,
- self.name, self.position)
+ return 'unacceptable character #x%04x: %s\n' ' in "%s", position %d' % (
+ self.character,
+ self.reason,
+ self.name,
+ self.position,
+ )
class Reader(object):
@@ -81,10 +86,10 @@ class Reader(object):
def reset_reader(self):
# type: () -> None
- self.name = None # type: Any
+ self.name = None # type: Any
self.stream_pointer = 0
self.eof = True
- self.buffer = u''
+ self.buffer = ""
self.pointer = 0
self.raw_buffer = None # type: Any
self.raw_decode = None
@@ -108,18 +113,18 @@ class Reader(object):
return
self._stream = None
if isinstance(val, text_type):
- self.name = "<unicode string>"
+ self.name = '<unicode string>'
self.check_printable(val)
- self.buffer = val + u'\0'
+ self.buffer = val + u'\0' # type: ignore
elif isinstance(val, binary_type):
- self.name = "<byte string>"
+ self.name = '<byte string>'
self.raw_buffer = val
self.determine_encoding()
else:
if not hasattr(val, 'read'):
raise YAMLStreamError('stream argument needs to have a read() method')
self._stream = val
- self.name = getattr(self.stream, 'name', "<file>")
+ self.name = getattr(self.stream, 'name', '<file>')
self.eof = False
self.raw_buffer = None
self.determine_encoding()
@@ -136,7 +141,7 @@ class Reader(object):
# type: (int) -> Any
if self.pointer + length >= len(self.buffer):
self.update(length)
- return self.buffer[self.pointer:self.pointer + length]
+ return self.buffer[self.pointer : self.pointer + length]
def forward(self, length=1):
# type: (int) -> None
@@ -146,8 +151,9 @@ class Reader(object):
ch = self.buffer[self.pointer]
self.pointer += 1
self.index += 1
- if ch in u'\n\x85\u2028\u2029' \
- or (ch == u'\r' and self.buffer[self.pointer] != u'\n'):
+ if ch in u'\n\x85\u2028\u2029' or (
+ ch == u'\r' and self.buffer[self.pointer] != u'\n'
+ ):
self.line += 1
self.column = 0
elif ch != u'\uFEFF':
@@ -157,15 +163,15 @@ class Reader(object):
def get_mark(self):
# type: () -> Any
if self.stream is None:
- return StringMark(self.name, self.index, self.line, self.column,
- self.buffer, self.pointer)
+ return StringMark(
+ self.name, self.index, self.line, self.column, self.buffer, self.pointer
+ )
else:
return FileMark(self.name, self.index, self.line, self.column)
def determine_encoding(self):
# type: () -> None
- while not self.eof and (self.raw_buffer is None or
- len(self.raw_buffer) < 2):
+ while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
if isinstance(self.raw_buffer, binary_type):
if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
@@ -181,10 +187,7 @@ class Reader(object):
if UNICODE_SIZE == 2:
NON_PRINTABLE = RegExp(
- u'[^\x09\x0A\x0D\x20-\x7E\x85'
- u'\xA0-\uD7FF'
- u'\uE000-\uFFFD'
- u']'
+ u'[^\x09\x0A\x0D\x20-\x7E\x85' u'\xA0-\uD7FF' u'\uE000-\uFFFD' u']'
)
else:
NON_PRINTABLE = RegExp(
@@ -195,13 +198,13 @@ class Reader(object):
u']'
)
- _printable_ascii = ('\x09\x0A\x0D' + ''.join(map(chr, range(0x20, 0x7F)))).encode('ascii')
+ _printable_ascii = ('\x09\x0A\x0D' + "".join(map(chr, range(0x20, 0x7F)))).encode('ascii')
@classmethod
- def _get_non_printable_ascii(cls, data):
+ def _get_non_printable_ascii(cls, data): # type: ignore
# type: (Text, bytes) -> Union[None, Tuple[int, Text]]
ascii_bytes = data.encode('ascii')
- non_printables = ascii_bytes.translate(None, cls._printable_ascii)
+ non_printables = ascii_bytes.translate(None, cls._printable_ascii) # type: ignore
if not non_printables:
return None
non_printable = non_printables[:1]
@@ -219,7 +222,7 @@ class Reader(object):
def _get_non_printable(cls, data):
# type: (Text) -> Union[None, Tuple[int, Text]]
try:
- return cls._get_non_printable_ascii(data)
+ return cls._get_non_printable_ascii(data) # type: ignore
except UnicodeEncodeError:
return cls._get_non_printable_regex(data)
@@ -229,37 +232,38 @@ class Reader(object):
if non_printable_match is not None:
start, character = non_printable_match
position = self.index + (len(self.buffer) - self.pointer) + start
- raise ReaderError(self.name, position, ord(character),
- 'unicode', "special characters are not allowed")
+ raise ReaderError(
+ self.name,
+ position,
+ ord(character),
+ 'unicode',
+ 'special characters are not allowed',
+ )
def update(self, length):
# type: (int) -> None
if self.raw_buffer is None:
return
- self.buffer = self.buffer[self.pointer:]
+ self.buffer = self.buffer[self.pointer :]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
if self.raw_decode is not None:
try:
- data, converted = self.raw_decode(self.raw_buffer,
- 'strict', self.eof)
+ data, converted = self.raw_decode(self.raw_buffer, 'strict', self.eof)
except UnicodeDecodeError as exc:
if PY3:
character = self.raw_buffer[exc.start]
else:
character = exc.object[exc.start]
if self.stream is not None:
- position = self.stream_pointer - \
- len(self.raw_buffer) + exc.start
+ position = self.stream_pointer - len(self.raw_buffer) + exc.start
elif self.stream is not None:
- position = self.stream_pointer - \
- len(self.raw_buffer) + exc.start
+ position = self.stream_pointer - len(self.raw_buffer) + exc.start
else:
position = exc.start
- raise ReaderError(self.name, position, character,
- exc.encoding, exc.reason)
+ raise ReaderError(self.name, position, character, exc.encoding, exc.reason)
else:
data = self.raw_buffer
converted = len(data)
@@ -267,12 +271,12 @@ class Reader(object):
self.buffer += data
self.raw_buffer = self.raw_buffer[converted:]
if self.eof:
- self.buffer += u'\0'
+ self.buffer += '\0'
self.raw_buffer = None
break
def update_raw(self, size=None):
- # type: (int) -> None
+ # type: (Union[None, int]) -> None
if size is None:
size = 4096 if PY3 else 1024
data = self.stream.read(size)
@@ -284,6 +288,7 @@ class Reader(object):
if not data:
self.eof = True
+
# try:
# import psyco
# psyco.bind(Reader)