summaryrefslogtreecommitdiff
path: root/emitter.py
diff options
context:
space:
mode:
authorAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
committerAnthon van der Neut <anthon@mnt.org>2023-05-01 19:13:50 +0200
commit8b731994b1543d7886af85f926d9eea5a22d0732 (patch)
tree3553d4cbc80b541484d7a3f39e00cdcfd8f9d030 /emitter.py
parent45111ba0b67e8619265d89f3202635e62c13cde6 (diff)
downloadruamel.yaml-8b731994b1543d7886af85f926d9eea5a22d0732.tar.gz
retrofitted 0.18 changes
Diffstat (limited to 'emitter.py')
-rw-r--r--emitter.py426
1 files changed, 176 insertions, 250 deletions
diff --git a/emitter.py b/emitter.py
index f9611ee..72ea661 100644
--- a/emitter.py
+++ b/emitter.py
@@ -12,13 +12,13 @@ from ruamel.yaml.error import YAMLError, YAMLStreamError
from ruamel.yaml.events import * # NOQA
# fmt: off
-from ruamel.yaml.compat import _F, nprint, dbg, DBG_EVENT, \
+from ruamel.yaml.compat import nprint, dbg, DBG_EVENT, \
check_anchorname_char, nprintf # NOQA
# fmt: on
-if False: # MYPY
- from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
- from ruamel.yaml.compat import StreamType # NOQA
+
+from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
+from ruamel.yaml.compat import StreamType # NOQA
__all__ = ['Emitter', 'EmitterError']
@@ -30,16 +30,15 @@ class EmitterError(YAMLError):
class ScalarAnalysis:
def __init__(
self,
- scalar,
- empty,
- multiline,
- allow_flow_plain,
- allow_block_plain,
- allow_single_quoted,
- allow_double_quoted,
- allow_block,
- ):
- # type: (Any, Any, Any, bool, bool, bool, bool, bool) -> None
+ scalar: Any,
+ empty: Any,
+ multiline: Any,
+ allow_flow_plain: bool,
+ allow_block_plain: bool,
+ allow_single_quoted: bool,
+ allow_double_quoted: bool,
+ allow_block: bool,
+ ) -> None:
self.scalar = scalar
self.empty = empty
self.multiline = multiline
@@ -52,20 +51,16 @@ class ScalarAnalysis:
class Indents:
# replacement for the list based stack of None/int
- def __init__(self):
- # type: () -> None
- self.values = [] # type: List[Tuple[Any, bool]]
+ def __init__(self) -> None:
+ self.values: List[Tuple[Any, bool]] = []
- def append(self, val, seq):
- # type: (Any, Any) -> None
+ def append(self, val: Any, seq: Any) -> None:
self.values.append((val, seq))
- def pop(self):
- # type: () -> Any
+ def pop(self) -> Any:
return self.values.pop()[0]
- def last_seq(self):
- # type: () -> bool
+ def last_seq(self) -> bool:
# return the seq(uence) value for the element added before the last one
# in increase_indent()
try:
@@ -73,8 +68,9 @@ class Indents:
except IndexError:
return False
- def seq_flow_align(self, seq_indent, column, pre_comment=False):
- # type: (int, int, Optional[bool]) -> int
+ def seq_flow_align(
+ self, seq_indent: int, column: int, pre_comment: Optional[bool] = False
+ ) -> int:
# extra spaces because of dash
# nprint('seq_flow_align', self.values, pre_comment)
if len(self.values) < 2 or not self.values[-1][1]:
@@ -87,8 +83,7 @@ class Indents:
# -1 for the dash
return base + seq_indent - column - 1 # type: ignore
- def __len__(self):
- # type: () -> int
+ def __len__(self) -> int:
return len(self.values)
@@ -104,44 +99,44 @@ class Emitter:
def __init__(
self,
- stream,
- canonical=None,
- indent=None,
- width=None,
- allow_unicode=None,
- line_break=None,
- block_seq_indent=None,
- top_level_colon_align=None,
- prefix_colon=None,
- brace_single_entry_mapping_in_flow_sequence=None,
- dumper=None,
- ):
- # type: (StreamType, Any, Optional[int], Optional[int], Optional[bool], Any, Optional[int], Optional[bool], Any, Optional[bool], Any) -> None # NOQA
+ stream: StreamType,
+ canonical: Any = None,
+ indent: Optional[int] = None,
+ width: Optional[int] = None,
+ allow_unicode: Optional[bool] = None,
+ line_break: Any = None,
+ block_seq_indent: Optional[int] = None,
+ top_level_colon_align: Optional[bool] = None,
+ prefix_colon: Any = None,
+ brace_single_entry_mapping_in_flow_sequence: Optional[bool] = None,
+ dumper: Any = None,
+ ) -> None:
+ # NOQA
self.dumper = dumper
if self.dumper is not None and getattr(self.dumper, '_emitter', None) is None:
self.dumper._emitter = self
self.stream = stream
# Encoding can be overriden by STREAM-START.
- self.encoding = None # type: Optional[Text]
+ self.encoding: Optional[Text] = None
self.allow_space_break = None
# Emitter is a state machine with a stack of states to handle nested
# structures.
- self.states = [] # type: List[Any]
- self.state = self.expect_stream_start # type: Any
+ self.states: List[Any] = []
+ self.state: Any = self.expect_stream_start
# Current event and the event queue.
- self.events = [] # type: List[Any]
- self.event = None # type: Any
+ self.events: List[Any] = []
+ self.event: Any = None
# The current indentation level and the stack of previous indents.
self.indents = Indents()
- self.indent = None # type: Optional[int]
+ self.indent: Optional[int] = None
# flow_context is an expanding/shrinking list consisting of '{' and '['
# for each unclosed flow context. If empty list that means block context
- self.flow_context = [] # type: List[Text]
+ self.flow_context: List[Text] = []
# Contexts.
self.root_context = False
@@ -161,7 +156,7 @@ class Emitter:
self.compact_seq_seq = True # dash after dash
self.compact_seq_map = True # key after dash
# self.compact_ms = False # dash after key, only when excplicit key with ?
- self.no_newline = None # type: Optional[bool] # set if directly after `- `
+ self.no_newline: Optional[bool] = None # set if directly after `- `
# Whether the document requires an explicit document end indicator
self.open_ended = False
@@ -191,36 +186,34 @@ class Emitter:
self.best_width = 80
if width and width > self.best_sequence_indent * 2:
self.best_width = width
- self.best_line_break = '\n' # type: Any
+ self.best_line_break: Any = '\n'
if line_break in ['\r', '\n', '\r\n']:
self.best_line_break = line_break
# Tag prefixes.
- self.tag_prefixes = None # type: Any
+ self.tag_prefixes: Any = None
# Prepared anchor and tag.
- self.prepared_anchor = None # type: Any
- self.prepared_tag = None # type: Any
+ self.prepared_anchor: Any = None
+ self.prepared_tag: Any = None
# Scalar analysis and style.
- self.analysis = None # type: Any
- self.style = None # type: Any
+ self.analysis: Any = None
+ self.style: Any = None
self.scalar_after_indicator = True # write a scalar on the same line as `---`
self.alt_null = 'null'
@property
- def stream(self):
- # type: () -> Any
+ def stream(self) -> Any:
try:
return self._stream
except AttributeError:
raise YAMLStreamError('output stream needs to specified')
@stream.setter
- def stream(self, val):
- # type: (Any) -> None
+ def stream(self, val: Any) -> None:
if val is None:
return
if not hasattr(val, 'write'):
@@ -228,8 +221,7 @@ class Emitter:
self._stream = val
@property
- def serializer(self):
- # type: () -> Any
+ def serializer(self) -> Any:
try:
if hasattr(self.dumper, 'typ'):
return self.dumper.serializer
@@ -238,18 +230,15 @@ class Emitter:
return self # cyaml
@property
- def flow_level(self):
- # type: () -> int
+ def flow_level(self) -> int:
return len(self.flow_context)
- def dispose(self):
- # type: () -> None
+ def dispose(self) -> None:
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
- def emit(self, event):
- # type: (Any) -> None
+ def emit(self, event: Any) -> None:
if dbg(DBG_EVENT):
nprint(event)
self.events.append(event)
@@ -260,8 +249,7 @@ class Emitter:
# In some cases, we wait for a few next events before emitting.
- def need_more_events(self):
- # type: () -> bool
+ def need_more_events(self) -> bool:
if not self.events:
return True
event = self.events[0]
@@ -274,8 +262,7 @@ class Emitter:
else:
return False
- def need_events(self, count):
- # type: (int) -> bool
+ def need_events(self, count: int) -> bool:
level = 0
for event in self.events[1:]:
if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
@@ -288,8 +275,9 @@ class Emitter:
return False
return len(self.events) < count + 1
- def increase_indent(self, flow=False, sequence=None, indentless=False):
- # type: (bool, Optional[bool], bool) -> None
+ def increase_indent(
+ self, flow: bool = False, sequence: Optional[bool] = None, indentless: bool = False
+ ) -> None:
self.indents.append(self.indent, sequence)
if self.indent is None: # top level
if flow:
@@ -315,32 +303,24 @@ class Emitter:
# Stream handlers.
- def expect_stream_start(self):
- # type: () -> None
+ def expect_stream_start(self) -> None:
if isinstance(self.event, StreamStartEvent):
if self.event.encoding and not hasattr(self.stream, 'encoding'):
self.encoding = self.event.encoding
self.write_stream_start()
self.state = self.expect_first_document_start
else:
- raise EmitterError(
- _F('expected StreamStartEvent, but got {self_event!s}', self_event=self.event)
- )
+ raise EmitterError(f'expected StreamStartEvent, but got {self.event!s}')
- def expect_nothing(self):
- # type: () -> None
- raise EmitterError(
- _F('expected nothing, but got {self_event!s}', self_event=self.event)
- )
+ def expect_nothing(self) -> None:
+ raise EmitterError(f'expected nothing, but got {self.event!s}')
# Document handlers.
- def expect_first_document_start(self):
- # type: () -> Any
+ def expect_first_document_start(self) -> Any:
return self.expect_document_start(first=True)
- def expect_document_start(self, first=False):
- # type: (bool) -> None
+ def expect_document_start(self, first: bool = False) -> None:
if isinstance(self.event, DocumentStartEvent):
if (self.event.version or self.event.tags) and self.open_ended:
self.write_indicator('...', True)
@@ -378,15 +358,9 @@ class Emitter:
self.write_stream_end()
self.state = self.expect_nothing
else:
- raise EmitterError(
- _F(
- 'expected DocumentStartEvent, but got {self_event!s}',
- self_event=self.event,
- )
- )
+ raise EmitterError(f'expected DocumentStartEvent, but got {self.event!s}')
- def expect_document_end(self):
- # type: () -> None
+ def expect_document_end(self) -> None:
if isinstance(self.event, DocumentEndEvent):
self.write_indent()
if self.event.explicit:
@@ -395,19 +369,21 @@ class Emitter:
self.flush_stream()
self.state = self.expect_document_start
else:
- raise EmitterError(
- _F('expected DocumentEndEvent, but got {self_event!s}', self_event=self.event)
- )
+ raise EmitterError(f'expected DocumentEndEvent, but got {self.event!s}')
- def expect_document_root(self):
- # type: () -> None
+ def expect_document_root(self) -> None:
self.states.append(self.expect_document_end)
self.expect_node(root=True)
# Node handlers.
- def expect_node(self, root=False, sequence=False, mapping=False, simple_key=False):
- # type: (bool, bool, bool, bool) -> None
+ def expect_node(
+ self,
+ root: bool = False,
+ sequence: bool = False,
+ mapping: bool = False,
+ simple_key: bool = False,
+ ) -> None:
self.root_context = root
self.sequence_context = sequence # not used in PyYAML
force_flow_indent = False
@@ -472,24 +448,21 @@ class Emitter:
or self.event.flow_style
or self.check_empty_mapping()
):
- self.expect_flow_mapping(single=self.event.nr_items == 1,
- force_flow_indent=force_flow_indent)
+ self.expect_flow_mapping(
+ single=self.event.nr_items == 1, force_flow_indent=force_flow_indent
+ )
else:
self.expect_block_mapping()
else:
- raise EmitterError(
- _F('expected NodeEvent, but got {self_event!s}', self_event=self.event)
- )
+ raise EmitterError('expected NodeEvent, but got {self.event!s}')
- def expect_alias(self):
- # type: () -> None
+ def expect_alias(self) -> None:
if self.event.anchor is None:
raise EmitterError('anchor is not specified for alias')
self.process_anchor('*')
self.state = self.states.pop()
- def expect_scalar(self):
- # type: () -> None
+ def expect_scalar(self) -> None:
self.increase_indent(flow=True)
self.process_scalar()
self.indent = self.indents.pop()
@@ -497,20 +470,19 @@ class Emitter:
# Flow sequence handlers.
- def expect_flow_sequence(self, force_flow_indent=False):
- # type: (Optional[bool]) -> None
+ def expect_flow_sequence(self, force_flow_indent: Optional[bool] = False) -> None:
if force_flow_indent:
self.increase_indent(flow=True, sequence=True)
- ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column,
- force_flow_indent)
+ ind = self.indents.seq_flow_align(
+ self.best_sequence_indent, self.column, force_flow_indent
+ )
self.write_indicator(' ' * ind + '[', True, whitespace=True)
if not force_flow_indent:
self.increase_indent(flow=True, sequence=True)
self.flow_context.append('[')
self.state = self.expect_first_flow_sequence_item
- def expect_first_flow_sequence_item(self):
- # type: () -> None
+ def expect_first_flow_sequence_item(self) -> None:
if isinstance(self.event, SequenceEndEvent):
self.indent = self.indents.pop()
popped = self.flow_context.pop()
@@ -528,8 +500,7 @@ class Emitter:
self.states.append(self.expect_flow_sequence_item)
self.expect_node(sequence=True)
- def expect_flow_sequence_item(self):
- # type: () -> None
+ def expect_flow_sequence_item(self) -> None:
if isinstance(self.event, SequenceEndEvent):
self.indent = self.indents.pop()
popped = self.flow_context.pop()
@@ -553,12 +524,14 @@ class Emitter:
# Flow mapping handlers.
- def expect_flow_mapping(self, single=False, force_flow_indent=False):
- # type: (Optional[bool], Optional[bool]) -> None
+ def expect_flow_mapping(
+ self, single: Optional[bool] = False, force_flow_indent: Optional[bool] = False
+ ) -> None:
if force_flow_indent:
self.increase_indent(flow=True, sequence=False)
- ind = self.indents.seq_flow_align(self.best_sequence_indent, self.column,
- force_flow_indent)
+ ind = self.indents.seq_flow_align(
+ self.best_sequence_indent, self.column, force_flow_indent
+ )
map_init = '{'
if (
single
@@ -575,8 +548,7 @@ class Emitter:
self.increase_indent(flow=True, sequence=False)
self.state = self.expect_first_flow_mapping_key
- def expect_first_flow_mapping_key(self):
- # type: () -> None
+ def expect_first_flow_mapping_key(self) -> None:
if isinstance(self.event, MappingEndEvent):
self.indent = self.indents.pop()
popped = self.flow_context.pop()
@@ -599,8 +571,7 @@ class Emitter:
self.states.append(self.expect_flow_mapping_value)
self.expect_node(mapping=True)
- def expect_flow_mapping_key(self):
- # type: () -> None
+ def expect_flow_mapping_key(self) -> None:
if isinstance(self.event, MappingEndEvent):
# if self.event.comment and self.event.comment[1]:
# self.write_pre_comment(self.event)
@@ -630,14 +601,12 @@ class Emitter:
self.states.append(self.expect_flow_mapping_value)
self.expect_node(mapping=True)
- def expect_flow_mapping_simple_value(self):
- # type: () -> None
+ def expect_flow_mapping_simple_value(self) -> None:
self.write_indicator(self.prefixed_colon, False)
self.states.append(self.expect_flow_mapping_key)
self.expect_node(mapping=True)
- def expect_flow_mapping_value(self):
- # type: () -> None
+ def expect_flow_mapping_value(self) -> None:
if self.canonical or self.column > self.best_width:
self.write_indent()
self.write_indicator(self.prefixed_colon, True)
@@ -646,8 +615,7 @@ class Emitter:
# Block sequence handlers.
- def expect_block_sequence(self):
- # type: () -> None
+ def expect_block_sequence(self) -> None:
if self.mapping_context:
indentless = not self.indention
else:
@@ -657,12 +625,10 @@ class Emitter:
self.increase_indent(flow=False, sequence=True, indentless=indentless)
self.state = self.expect_first_block_sequence_item
- def expect_first_block_sequence_item(self):
- # type: () -> Any
+ def expect_first_block_sequence_item(self) -> Any:
return self.expect_block_sequence_item(first=True)
- def expect_block_sequence_item(self, first=False):
- # type: (bool) -> None
+ def expect_block_sequence_item(self, first: bool = False) -> None:
if not first and isinstance(self.event, SequenceEndEvent):
if self.event.comment and self.event.comment[1]:
# final comments on a block list e.g. empty line
@@ -684,19 +650,16 @@ class Emitter:
# Block mapping handlers.
- def expect_block_mapping(self):
- # type: () -> None
+ def expect_block_mapping(self) -> None:
if not self.mapping_context and not (self.compact_seq_map or self.column == 0):
self.write_line_break()
self.increase_indent(flow=False, sequence=False)
self.state = self.expect_first_block_mapping_key
- def expect_first_block_mapping_key(self):
- # type: () -> None
+ def expect_first_block_mapping_key(self) -> None:
return self.expect_block_mapping_key(first=True)
- def expect_block_mapping_key(self, first=False):
- # type: (Any) -> None
+ def expect_block_mapping_key(self, first: Any = False) -> None:
if not first and isinstance(self.event, MappingEndEvent):
if self.event.comment and self.event.comment[1]:
# final comments from a doc
@@ -727,8 +690,7 @@ class Emitter:
self.states.append(self.expect_block_mapping_value)
self.expect_node(mapping=True)
- def expect_block_mapping_simple_value(self):
- # type: () -> None
+ def expect_block_mapping_simple_value(self) -> None:
if getattr(self.event, 'style', None) != '?':
# prefix = ''
if self.indent == 0 and self.top_level_colon_align is not None:
@@ -740,8 +702,7 @@ class Emitter:
self.states.append(self.expect_block_mapping_key)
self.expect_node(mapping=True)
- def expect_block_mapping_value(self):
- # type: () -> None
+ def expect_block_mapping_value(self) -> None:
self.write_indent()
self.write_indicator(self.prefixed_colon, True, indention=True)
self.states.append(self.expect_block_mapping_key)
@@ -749,24 +710,21 @@ class Emitter:
# Checkers.
- def check_empty_sequence(self):
- # type: () -> bool
+ def check_empty_sequence(self) -> bool:
return (
isinstance(self.event, SequenceStartEvent)
and bool(self.events)
and isinstance(self.events[0], SequenceEndEvent)
)
- def check_empty_mapping(self):
- # type: () -> bool
+ def check_empty_mapping(self) -> bool:
return (
isinstance(self.event, MappingStartEvent)
and bool(self.events)
and isinstance(self.events[0], MappingEndEvent)
)
- def check_empty_document(self):
- # type: () -> bool
+ def check_empty_document(self) -> bool:
if not isinstance(self.event, DocumentStartEvent) or not self.events:
return False
event = self.events[0]
@@ -778,8 +736,7 @@ class Emitter:
and event.value == ""
)
- def check_simple_key(self):
- # type: () -> bool
+ def check_simple_key(self) -> bool:
length = 0
if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
if self.prepared_anchor is None:
@@ -812,8 +769,7 @@ class Emitter:
# Anchor, Tag, and Scalar processors.
- def process_anchor(self, indicator):
- # type: (Any) -> bool
+ def process_anchor(self, indicator: Any) -> bool:
if self.event.anchor is None:
self.prepared_anchor = None
return False
@@ -826,8 +782,7 @@ class Emitter:
self.prepared_anchor = None
return True
- def process_tag(self):
- # type: () -> None
+ def process_tag(self) -> None:
tag = self.event.tag
if isinstance(self.event, ScalarEvent):
if self.style is None:
@@ -868,8 +823,7 @@ class Emitter:
self.no_newline = True
self.prepared_tag = None
- def choose_scalar_style(self):
- # type: () -> Any
+ def choose_scalar_style(self) -> Any:
if self.analysis is None:
self.analysis = self.analyze_scalar(self.event.value)
if self.event.style == '"' or self.canonical:
@@ -903,8 +857,7 @@ class Emitter:
return "'"
return '"'
- def process_scalar(self):
- # type: () -> None
+ def process_scalar(self) -> None:
if self.analysis is None:
self.analysis = self.analyze_scalar(self.event.value)
if self.style is None:
@@ -921,7 +874,11 @@ class Emitter:
elif self.style == "'":
self.write_single_quoted(self.analysis.scalar, split)
elif self.style == '>':
- self.write_folded(self.analysis.scalar)
+ try:
+ cmx = self.event.comment[1][0]
+ except (IndexError, TypeError):
+ cmx = ""
+ self.write_folded(self.analysis.scalar, cmx)
if (
self.event.comment
and self.event.comment[0]
@@ -952,39 +909,26 @@ class Emitter:
# Analyzers.
- def prepare_version(self, version):
- # type: (Any) -> Any
+ def prepare_version(self, version: Any) -> Any:
major, minor = version
if major != 1:
- raise EmitterError(
- _F('unsupported YAML version: {major:d}.{minor:d}', major=major, minor=minor)
- )
- return _F('{major:d}.{minor:d}', major=major, minor=minor)
+ raise EmitterError(f'unsupported YAML version: {major:d}.{minor:d}')
+ return f'{major:d}.{minor:d}'
- def prepare_tag_handle(self, handle):
- # type: (Any) -> Any
+ def prepare_tag_handle(self, handle: Any) -> Any:
if not handle:
raise EmitterError('tag handle must not be empty')
if handle[0] != '!' or handle[-1] != '!':
- raise EmitterError(
- _F("tag handle must start and end with '!': {handle!r}", handle=handle)
- )
+ raise EmitterError(f"tag handle must start and end with '!': {handle!r}")
for ch in handle[1:-1]:
if not ('0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_'):
- raise EmitterError(
- _F(
- 'invalid character {ch!r} in the tag handle: {handle!r}',
- ch=ch,
- handle=handle,
- )
- )
+ raise EmitterError(f'invalid character {ch!r} in the tag handle: {handle!r}')
return handle
- def prepare_tag_prefix(self, prefix):
- # type: (Any) -> Any
+ def prepare_tag_prefix(self, prefix: Any) -> Any:
if not prefix:
raise EmitterError('tag prefix must not be empty')
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
start = end = 0
if prefix[0] == '!':
end = 1
@@ -1003,13 +947,12 @@ class Emitter:
start = end = end + 1
data = ch
for ch in data:
- chunks.append(_F('%{ord_ch:02X}', ord_ch=ord(ch)))
+ chunks.append(f'%{ord(ch):02X}')
if start < end:
chunks.append(prefix[start:end])
return "".join(chunks)
- def prepare_tag(self, tag):
- # type: (Any) -> Any
+ def prepare_tag(self, tag: Any) -> Any:
if not tag:
raise EmitterError('tag must not be empty')
if tag == '!':
@@ -1021,7 +964,7 @@ class Emitter:
if tag.startswith(prefix) and (prefix == '!' or len(prefix) < len(tag)):
handle = self.tag_prefixes[prefix]
suffix = tag[len(prefix) :]
- chunks = [] # type: List[Any]
+ chunks: List[Any] = []
start = end = 0
ch_set = "-;/?:@&=+$,_.~*'()[]"
if self.dumper:
@@ -1044,32 +987,24 @@ class Emitter:
start = end = end + 1
data = ch
for ch in data:
- chunks.append(_F('%{ord_ch:02X}', ord_ch=ord(ch)))
+ chunks.append(f'%{ord(ch):02X}')
if start < end:
chunks.append(suffix[start:end])
suffix_text = "".join(chunks)
if handle:
- return _F('{handle!s}{suffix_text!s}', handle=handle, suffix_text=suffix_text)
+ return f'{handle!s}{suffix_text!s}'
else:
- return _F('!<{suffix_text!s}>', suffix_text=suffix_text)
+ return f'!<{suffix_text!s}>'
- def prepare_anchor(self, anchor):
- # type: (Any) -> Any
+ def prepare_anchor(self, anchor: Any) -> Any:
if not anchor:
raise EmitterError('anchor must not be empty')
for ch in anchor:
if not check_anchorname_char(ch):
- raise EmitterError(
- _F(
- 'invalid character {ch!r} in the anchor: {anchor!r}',
- ch=ch,
- anchor=anchor,
- )
- )
+ raise EmitterError(f'invalid character {ch!r} in the anchor: {anchor!r}')
return anchor
- def analyze_scalar(self, scalar):
- # type: (Any) -> Any
+ def analyze_scalar(self, scalar: Any) -> Any:
# Empty scalar is a special case.
if not scalar:
return ScalarAnalysis(
@@ -1249,23 +1184,25 @@ class Emitter:
# Writers.
- def flush_stream(self):
- # type: () -> None
+ def flush_stream(self) -> None:
if hasattr(self.stream, 'flush'):
self.stream.flush()
- def write_stream_start(self):
- # type: () -> None
+ def write_stream_start(self) -> None:
# Write BOM if needed.
if self.encoding and self.encoding.startswith('utf-16'):
self.stream.write('\uFEFF'.encode(self.encoding))
- def write_stream_end(self):
- # type: () -> None
+ def write_stream_end(self) -> None:
self.flush_stream()
- def write_indicator(self, indicator, need_whitespace, whitespace=False, indention=False):
- # type: (Any, Any, bool, bool) -> None
+ def write_indicator(
+ self,
+ indicator: Any,
+ need_whitespace: Any,
+ whitespace: bool = False,
+ indention: bool = False,
+ ) -> None:
if self.whitespace or not need_whitespace:
data = indicator
else:
@@ -1278,8 +1215,7 @@ class Emitter:
data = data.encode(self.encoding)
self.stream.write(data)
- def write_indent(self):
- # type: () -> None
+ def write_indent(self) -> None:
indent = self.indent or 0
if (
not self.indention
@@ -1298,8 +1234,7 @@ class Emitter:
data = data.encode(self.encoding) # type: ignore
self.stream.write(data)
- def write_line_break(self, data=None):
- # type: (Any) -> None
+ def write_line_break(self, data: Any = None) -> None:
if data is None:
data = self.best_line_break
self.whitespace = True
@@ -1310,21 +1245,15 @@ class Emitter:
data = data.encode(self.encoding)
self.stream.write(data)
- def write_version_directive(self, version_text):
- # type: (Any) -> None
- data = _F('%YAML {version_text!s}', version_text=version_text)
+ def write_version_directive(self, version_text: Any) -> None:
+ data: Any = f'%YAML {version_text!s}'
if self.encoding:
data = data.encode(self.encoding)
self.stream.write(data)
self.write_line_break()
- def write_tag_directive(self, handle_text, prefix_text):
- # type: (Any, Any) -> None
- data = _F(
- '%TAG {handle_text!s} {prefix_text!s}',
- handle_text=handle_text,
- prefix_text=prefix_text,
- )
+ def write_tag_directive(self, handle_text: Any, prefix_text: Any) -> None:
+ data: Any = f'%TAG {handle_text!s} {prefix_text!s}'
if self.encoding:
data = data.encode(self.encoding)
self.stream.write(data)
@@ -1332,8 +1261,7 @@ class Emitter:
# Scalar streams.
- def write_single_quoted(self, text, split=True):
- # type: (Any, Any) -> None
+ def write_single_quoted(self, text: Any, split: Any = True) -> None:
if self.root_context:
if self.requested_indent is not None:
self.write_line_break()
@@ -1415,8 +1343,7 @@ class Emitter:
'\u2029': 'P',
}
- def write_double_quoted(self, text, split=True):
- # type: (Any, Any) -> None
+ def write_double_quoted(self, text: Any, split: Any = True) -> None:
if self.root_context:
if self.requested_indent is not None:
self.write_line_break()
@@ -1450,11 +1377,11 @@ class Emitter:
if ch in self.ESCAPE_REPLACEMENTS:
data = '\\' + self.ESCAPE_REPLACEMENTS[ch]
elif ch <= '\xFF':
- data = _F('\\x{ord_ch:02X}', ord_ch=ord(ch))
+ data = f'\\x{ord(ch):02X}'
elif ch <= '\uFFFF':
- data = _F('\\u{ord_ch:04X}', ord_ch=ord(ch))
+ data = f'\\u{ord(ch):04X}'
else:
- data = _F('\\U{ord_ch:08X}', ord_ch=ord(ch))
+ data = f'\\U{ord(ch):08X}'
self.column += len(data)
if bool(self.encoding):
data = data.encode(self.encoding)
@@ -1485,14 +1412,13 @@ class Emitter:
end += 1
self.write_indicator('"', False)
- def determine_block_hints(self, text):
- # type: (Any) -> Any
+ def determine_block_hints(self, text: Any) -> Any:
indent = 0
indicator = ''
hints = ''
if text:
if text[0] in ' \n\x85\u2028\u2029':
- indent = self.best_sequence_indent
+ indent = 2
hints += str(indent)
elif self.root_context:
for end in ['\n---', '\n...']:
@@ -1510,7 +1436,7 @@ class Emitter:
if pos > -1:
break
if pos > 0:
- indent = self.best_sequence_indent
+ indent = 2
if text[-1] not in '\n\x85\u2028\u2029':
indicator = '-'
elif len(text) == 1 or text[-2] in '\n\x85\u2028\u2029':
@@ -1518,10 +1444,11 @@ class Emitter:
hints += indicator
return hints, indent, indicator
- def write_folded(self, text):
- # type: (Any) -> None
+ def write_folded(self, text: Any, comment: Any) -> None:
hints, _indent, _indicator = self.determine_block_hints(text)
- self.write_indicator('>' + hints, True)
+ if not isinstance(comment, str):
+ comment = ''
+ self.write_indicator('>' + hints + comment, True)
if _indicator == '+':
self.open_ended = True
self.write_line_break()
@@ -1584,8 +1511,7 @@ class Emitter:
spaces = ch == ' '
end += 1
- def write_literal(self, text, comment=None):
- # type: (Any, Any) -> None
+ def write_literal(self, text: Any, comment: Any = None) -> None:
hints, _indent, _indicator = self.determine_block_hints(text)
# if comment is not None:
# try:
@@ -1638,8 +1564,7 @@ class Emitter:
breaks = ch in '\n\x85\u2028\u2029'
end += 1
- def write_plain(self, text, split=True):
- # type: (Any, Any) -> None
+ def write_plain(self, text: Any, split: Any = True) -> None:
if self.root_context:
if self.requested_indent is not None:
self.write_line_break()
@@ -1693,6 +1618,10 @@ class Emitter:
else:
if ch is None or ch in ' \n\x85\u2028\u2029':
data = text[start:end]
+ if len(data) > self.best_width and \
+ self.column > self.indent: # type: ignore
+ # words longer than line length get a line of their own
+ self.write_indent()
self.column += len(data)
if self.encoding:
data = data.encode(self.encoding) # type: ignore
@@ -1707,10 +1636,9 @@ class Emitter:
breaks = ch in '\n\x85\u2028\u2029'
end += 1
- def write_comment(self, comment, pre=False):
- # type: (Any, bool) -> None
+ def write_comment(self, comment: Any, pre: bool = False) -> None:
value = comment.value
- # nprintf('{:02d} {:02d} {!r}'.format(self.column, comment.start_mark.column, value))
+ # nprintf(f'{self.column:02d} {comment.start_mark.column:02d} {value!r}')
if not pre and value[-1] == '\n':
value = value[:-1]
try:
@@ -1743,8 +1671,7 @@ class Emitter:
if not pre:
self.write_line_break()
- def write_pre_comment(self, event):
- # type: (Any) -> bool
+ def write_pre_comment(self, event: Any) -> bool:
comments = event.comment[1]
if comments is None:
return False
@@ -1759,12 +1686,11 @@ class Emitter:
if isinstance(event, start_events):
comment.pre_done = True
except TypeError:
- sys.stdout.write('eventtt {} {}'.format(type(event), event))
+ sys.stdout.write(f'eventtt {type(event)} {event}')
raise
return True
- def write_post_comment(self, event):
- # type: (Any) -> bool
+ def write_post_comment(self, event: Any) -> bool:
if self.event.comment[0] is None:
return False
comment = event.comment[0]