summaryrefslogtreecommitdiff
path: root/reader.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
committerAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
commit8b731994b1543d7886af85f926d9eea5a22d0732 (patch)
tree3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /reader.py
parent45111ba0b67e8619265d89f3202635e62c13cde6 (diff)
downloadruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz
retrofitted 0.18 changes
Diffstat (limited to 'reader.py')
-rw-r--r--reader.py93
1 files changed, 33 insertions, 60 deletions
diff --git a/reader.py b/reader.py
index 4aac40a..dec6e9f 100644
--- a/reader.py
+++ b/reader.py
@@ -22,46 +22,35 @@
import codecs
from ruamel.yaml.error import YAMLError, FileMark, StringMark, YAMLStreamError
-from ruamel.yaml.compat import _F # NOQA
from ruamel.yaml.util import RegExp
-if False: # MYPY
- from typing import Any, Dict, Optional, List, Union, Text, Tuple, Optional # NOQA
-# from ruamel.yaml.compat import StreamTextType # NOQA
+from typing import Any, Dict, Optional, List, Union, Text, Tuple, Optional # NOQA
+# from ruamel.yaml.compat import StreamTextType # NOQA
__all__ = ['Reader', 'ReaderError']
class ReaderError(YAMLError):
- def __init__(self, name, position, character, encoding, reason):
- # type: (Any, Any, Any, Any, Any) -> None
+ def __init__(
+ self, name: Any, position: Any, character: Any, encoding: Any, reason: Any
+ ) -> None:
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
- def __str__(self):
- # type: () -> Any
+ def __str__(self) -> Any:
if isinstance(self.character, bytes):
- return _F(
- "'{self_encoding!s}' codec can't decode byte #x{ord_self_character:02x}: "
- '{self_reason!s}\n'
- ' in "{self_name!s}", position {self_position:d}',
- self_encoding=self.encoding,
- ord_self_character=ord(self.character),
- self_reason=self.reason,
- self_name=self.name,
- self_position=self.position,
+ return (
+ f"'{self.encoding!s}' codec can't decode byte #x{ord(self.character):02x}: "
+ f'{self.reason!s}\n'
+ f' in "{self.name!s}", position {self.position:d}'
)
else:
- return _F(
- 'unacceptable character #x{self_character:04x}: {self_reason!s}\n'
- ' in "{self_name!s}", position {self_position:d}',
- self_character=self.character,
- self_reason=self.reason,
- self_name=self.name,
- self_position=self.position,
+ return (
+ f'unacceptable character #x{self.character:04x}: {self.reason!s}\n'
+ f' in "{self.name!s}", position {self.position:d}'
)
@@ -79,39 +68,35 @@ class Reader:
# Yeah, it's ugly and slow.
- def __init__(self, stream, loader=None):
- # type: (Any, Any) -> None
+ def __init__(self, stream: Any, loader: Any = None) -> None:
self.loader = loader
if self.loader is not None and getattr(self.loader, '_reader', None) is None:
self.loader._reader = self
self.reset_reader()
- self.stream = stream # type: Any # as .read is called
+ self.stream: Any = stream # as .read is called
- def reset_reader(self):
- # type: () -> None
- self.name = None # type: Any
+ def reset_reader(self) -> None:
+ self.name: Any = None
self.stream_pointer = 0
self.eof = True
self.buffer = ""
self.pointer = 0
- self.raw_buffer = None # type: Any
+ self.raw_buffer: Any = None
self.raw_decode = None
- self.encoding = None # type: Optional[Text]
+ self.encoding: Optional[Text] = None
self.index = 0
self.line = 0
self.column = 0
@property
- def stream(self):
- # type: () -> Any
+ def stream(self) -> Any:
try:
return self._stream
except AttributeError:
raise YAMLStreamError('input stream needs to specified')
@stream.setter
- def stream(self, val):
- # type: (Any) -> None
+ def stream(self, val: Any) -> None:
if val is None:
return
self._stream = None
@@ -132,22 +117,19 @@ class Reader:
self.raw_buffer = None
self.determine_encoding()
- def peek(self, index=0):
- # type: (int) -> Text
+ def peek(self, index: int = 0) -> Text:
try:
return self.buffer[self.pointer + index]
except IndexError:
self.update(index + 1)
return self.buffer[self.pointer + index]
- def prefix(self, length=1):
- # type: (int) -> Any
+ def prefix(self, length: int = 1) -> Any:
if self.pointer + length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer : self.pointer + length]
- def forward_1_1(self, length=1):
- # type: (int) -> None
+ def forward_1_1(self, length: int = 1) -> None:
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
while length != 0:
@@ -163,8 +145,7 @@ class Reader:
self.column += 1
length -= 1
- def forward(self, length=1):
- # type: (int) -> None
+ def forward(self, length: int = 1) -> None:
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
while length != 0:
@@ -178,8 +159,7 @@ class Reader:
self.column += 1
length -= 1
- def get_mark(self):
- # type: () -> Any
+ def get_mark(self) -> Any:
if self.stream is None:
return StringMark(
self.name, self.index, self.line, self.column, self.buffer, self.pointer
@@ -187,8 +167,7 @@ class Reader:
else:
return FileMark(self.name, self.index, self.line, self.column)
- def determine_encoding(self):
- # type: () -> None
+ def determine_encoding(self) -> None:
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
if isinstance(self.raw_buffer, bytes):
@@ -210,8 +189,7 @@ class Reader:
_printable_ascii = ('\x09\x0A\x0D' + "".join(map(chr, range(0x20, 0x7F)))).encode('ascii')
@classmethod
- def _get_non_printable_ascii(cls, data): # type: ignore
- # type: (Text, bytes) -> Optional[Tuple[int, Text]]
+ def _get_non_printable_ascii(cls: Text, data: bytes) -> Optional[Tuple[int, Text]]: # type: ignore # NOQA
ascii_bytes = data.encode('ascii') # type: ignore
non_printables = ascii_bytes.translate(None, cls._printable_ascii) # type: ignore
if not non_printables:
@@ -220,23 +198,20 @@ class Reader:
return ascii_bytes.index(non_printable), non_printable.decode('ascii')
@classmethod
- def _get_non_printable_regex(cls, data):
- # type: (Text) -> Optional[Tuple[int, Text]]
+ def _get_non_printable_regex(cls, data: Text) -> Optional[Tuple[int, Text]]:
match = cls.NON_PRINTABLE.search(data)
if not bool(match):
return None
return match.start(), match.group()
@classmethod
- def _get_non_printable(cls, data):
- # type: (Text) -> Optional[Tuple[int, Text]]
+ def _get_non_printable(cls, data: Text) -> Optional[Tuple[int, Text]]:
try:
return cls._get_non_printable_ascii(data) # type: ignore
except UnicodeEncodeError:
return cls._get_non_printable_regex(data)
- def check_printable(self, data):
- # type: (Any) -> None
+ def check_printable(self, data: Any) -> None:
non_printable_match = self._get_non_printable(data)
if non_printable_match is not None:
start, character = non_printable_match
@@ -249,8 +224,7 @@ class Reader:
'special characters are not allowed',
)
- def update(self, length):
- # type: (int) -> None
+ def update(self, length: int) -> None:
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer :]
@@ -281,8 +255,7 @@ class Reader:
self.raw_buffer = None
break
- def update_raw(self, size=None):
- # type: (Optional[int]) -> None
+ def update_raw(self, size: Optional[int] = None) -> None:
if size is None:
size = 4096
data = self.stream.read(size)