diff options
Diffstat (limited to 'lib/yaml')
-rw-r--r-- | lib/yaml/emitter.py | 328 |
1 files changed, 266 insertions, 62 deletions
diff --git a/lib/yaml/emitter.py b/lib/yaml/emitter.py index d2b372f..b4298b8 100644 --- a/lib/yaml/emitter.py +++ b/lib/yaml/emitter.py @@ -20,13 +20,26 @@ class Emitter: self.writer = writer self.states = [] self.state = self.expect_stream_start - self.levels = [] - self.level = 0 - self.soft_space = False + self.indents = [] + self.indent = None + self.flow_level = 0 + self.key_context = False + self.space = True + self.line = True + self.allow_inline_collection = False + self.allow_indentless_sequence = False + self.simple_key = False + self.event_queue = [] def emit(self, event): + if self.event_queue: + self.event_queue.append(event) + event = self.event_queue.pop(0) self.state(event) + def push_back(self, event): + self.event_queue.insert(0, event) + def expect_stream_start(self, event): if isinstance(event, StreamStartEvent): self.state = self.expect_document_start @@ -38,6 +51,8 @@ class Emitter: self.write_document_start() self.states.append(self.expect_document_end) self.state = self.expect_root_node + self.allow_inline_collection = False + self.allow_indentless_sequence = False elif isinstance(event, StreamEndEvent): self.writer.flush() self.state = self.expect_nothing @@ -48,6 +63,8 @@ class Emitter: if isinstance(event, DocumentEndEvent): self.write_document_end() self.state = self.expect_document_start + self.allow_inline_collection = False + self.allow_indentless_sequence = False else: raiseEmitterError("expected DocumentEndEvent, but got %s" % event.__class__.__name__) @@ -55,126 +72,313 @@ class Emitter: self.expect_node(event) def expect_node(self, event): + empty = None + if isinstance(event, (SequenceEvent, MappingEvent)): + if not self.event_queue: + return self.push_back(event) + empty = isinstance(self.event_queue[0], CollectionEndEvent) if isinstance(event, AliasEvent): - self.write_anchor("*", event.anchor) - self.state = self.states.pop() - elif isinstance(event, NodeEvent): + self.expect_alias(event) + elif isinstance(event, (ScalarEvent, SequenceEvent, MappingEvent)): if event.anchor: self.write_anchor("&", event.anchor) - if event.tag: + self.allow_inline_collection = False + if event.tag not in [None, u'!']: self.write_tag(event.tag) + self.allow_inline_collection = False if isinstance(event, ScalarEvent): - self.write_scalar(event.value) - self.state = self.states.pop() + self.expect_scalar(event) elif isinstance(event, SequenceEvent): - self.write_collection_start("[") - self.level += 1 - self.state = self.expect_first_sequence_item + self.expect_sequence(event, empty) elif isinstance(event, MappingEvent): - self.write_collection_start("{") - self.level += 1 - self.state = self.expect_first_mapping_key + self.expect_mapping(event, empty) else: raise EmitterError("Expected NodeEvent, but got %s" % event.__class__.__name__) - def expect_first_sequence_item(self, event): + def expect_alias(self, event): + self.write_anchor("*", event.anchor) + self.state = self.states.pop() + + def expect_scalar(self, event): + self.indents.append(self.indent) + if self.indent is None: + self.indent = 2 + else: + self.indent += 2 + self.write_scalar(event.value, event.implicit, event.style) + self.indent = self.indents.pop() + self.state = self.states.pop() + self.allow_inline_collection = False + self.allow_indentless_sequence = False + + def expect_sequence(self, event, empty): + if self.flow_level or event.flow or empty: + self.write_indicator("[", need_space=True, provide_space=True) + self.flow_level += 1 + self.indents.append(self.indent) + if self.indent is None: + self.indent = 2 + else: + self.indent += 2 + self.state = self.expect_first_flow_sequence_item + else: + self.indents.append(self.indent) + if self.indent is None: + self.indent = 0 + else: + self.indent += 2 + self.state = self.expect_first_block_sequence_item + + def expect_mapping(self, event, empty): + if self.flow_level or event.flow or empty: + self.write_indicator("{", need_space=True, provide_space=True) + self.flow_level += 1 + self.indents.append(self.indent) + if self.indent is None: + self.indent = 2 + else: + self.indent += 2 + self.state = self.expect_first_flow_mapping_key + else: + self.indents.append(self.indent) + if self.indent is None: + self.indent = 0 + else: + self.indent += 2 + self.state = self.expect_first_block_mapping_key + + def expect_first_flow_sequence_item(self, event): if isinstance(event, CollectionEndEvent): - self.write_collection_end("]") + self.indent = self.indents.pop() + self.write_indicator("]", provide_space=True) + self.flow_level -= 1 self.state = self.states.pop() else: self.write_indent() - self.states.append(self.expect_sequence_item) + self.states.append(self.expect_flow_sequence_item) + self.state = self.expect_node self.expect_node(event) - def expect_sequence_item(self, event): + def expect_flow_sequence_item(self, event): if isinstance(event, CollectionEndEvent): - self.level -= 1 + self.write_indicator(",") + self.indent = self.indents.pop() self.write_indent() - self.write_collection_end("]") + self.write_indicator("]") + self.flow_level -= 1 self.state = self.states.pop() else: self.write_indicator(",") self.write_indent() - self.states.append(self.expect_sequence_item) + self.states.append(self.expect_flow_sequence_item) + self.state = self.expect_node self.expect_node(event) - - def expect_first_mapping_key(self, event): + + def expect_first_block_sequence_item(self, event): + assert not isinstance(event, CollectionEndEvent) + if not self.allow_inline_collection: + if self.allow_indentless_sequence: + self.indent = self.indents.pop() + self.indents.append(self.indent) + self.write_indent() + self.write_indicator("-", need_space=True) + self.allow_indentless_sequence = False + self.allow_inline_collection = True + self.states.append(self.expect_block_sequence_item) + self.state = self.expect_node + self.expect_node(event) + + def expect_block_sequence_item(self, event): if isinstance(event, CollectionEndEvent): - self.write_collection_end("}") + self.indent = self.indents.pop() self.state = self.states.pop() else: self.write_indent() - self.write_indicator("?") - self.states.append(self.expect_mapping_value) + self.write_indicator("-", need_space=True) + self.allow_indentless_sequence = False + self.allow_inline_collection = True + self.states.append(self.expect_block_sequence_item) + self.state = self.expect_node self.expect_node(event) - def expect_mapping_key(self, event): + def expect_first_flow_mapping_key(self, event): if isinstance(event, CollectionEndEvent): - self.level -= 1 + self.indent = self.indents.pop() + self.write_indicator("}") + self.flow_level -= 1 + self.state = self.states.pop() + else: self.write_indent() - self.write_collection_end("}") + if self.is_simple(event): + self.simple_key = True + else: + self.write_indicator("?", need_space=True) + self.states.append(self.expect_flow_mapping_value) + self.state = self.expect_node + self.expect_node(event) + + def expect_flow_mapping_key(self, event): + if isinstance(event, CollectionEndEvent): + self.indent = self.indents.pop() + self.write_indent() + self.write_indicator("}") + self.flow_level -= 1 self.state = self.states.pop() else: self.write_indicator(",") self.write_indent() - self.write_indicator("?") - self.states.append(self.expect_mapping_value) + if self.is_simple(event): + self.simple_key = True + else: + self.write_indicator("?", need_space=True) + self.states.append(self.expect_flow_mapping_value) + self.state = self.expect_node self.expect_node(event) - def expect_mapping_value(self, event): - self.write_indent() - self.write_indicator(":") - self.states.append(self.expect_mapping_key) + def expect_flow_mapping_value(self, event): + if self.simple_key: + self.write_indicator(":", need_space=False) + self.simple_key = False + else: + self.write_indent() + self.write_indicator(":", need_space=True) + self.states.append(self.expect_flow_mapping_key) + self.state = self.expect_node + self.expect_node(event) + + def expect_first_block_mapping_key(self, event): + assert not isinstance(event, CollectionEndEvent) + simple = self.is_simple(event) + if simple is None: + return self.push_back(event) + if not self.allow_inline_collection: + self.write_indent() + if self.is_simple(event): + self.allow_indentless_sequence = True + self.allow_inline_collection = False + self.simple_key = True + else: + self.write_indicator("?", need_space=True) + self.allow_indentless_sequence = True + self.allow_inline_collection = True + self.states.append(self.expect_block_mapping_value) + self.state = self.expect_node + self.expect_node(event) + + def expect_block_mapping_key(self, event): + if isinstance(event, CollectionEndEvent): + self.indent = self.indents.pop() + self.state = self.states.pop() + else: + self.write_indent() + if self.is_simple(event): + self.allow_indentless_sequence = True + self.allow_inline_collection = False + self.simple_key = True + else: + self.write_indicator("?", need_space=True) + self.allow_indentless_sequence = True + self.allow_inline_collection = True + self.states.append(self.expect_block_mapping_value) + self.state = self.expect_node + self.expect_node(event) + + def expect_block_mapping_value(self, event): + if self.simple_key: + self.write_indicator(":", need_space=False) + self.allow_indentless_sequence = True + self.allow_inline_collection = False + self.simple_key = False + else: + self.write_indent() + self.write_indicator(":", need_space=True) + self.allow_indentless_sequence = True + self.allow_inline_collection = True + self.states.append(self.expect_block_mapping_key) + self.state = self.expect_node self.expect_node(event) def expect_nothing(self, event): raise EmitterError("expected nothing, but got %s" % event.__class__.__name__) def write_document_start(self): - self.writer.write("%YAML 1.1\n") + self.writer.write("%YAML 1.1") + self.write_line_break() self.writer.write("---") - self.soft_space = True + self.space = False + self.line = False def write_document_end(self): - self.writer.write("\n...\n") - self.soft_space = False - - def write_collection_start(self, indicator): - if self.soft_space: - self.writer.write(" ") - self.writer.write(indicator) - self.soft_space = False + if not self.line: + self.write_line_break() + self.writer.write("...") + self.write_line_break() - def write_collection_end(self, indicator): - self.writer.write(indicator) - self.soft_space = True + def write_line_break(self): + self.writer.write('\n') + self.space = True + self.line = True def write_anchor(self, indicator, name): - if self.soft_space: + if not self.space: self.writer.write(" ") self.writer.write("%s%s" % (indicator, name)) - self.soft_space = True + self.space = False + self.line = False def write_tag(self, tag): - if self.soft_space: + if not self.space: self.writer.write(" ") if tag.startswith("tag:yaml.org,2002:"): self.writer.write("!!%s" % tag[len("tag.yaml.org,2002:"):]) else: self.writer.write("!<%s>" % tag) - self.soft_space = True + self.space = False + self.line = False - def write_scalar(self, value): - if self.soft_space: - self.writer.write(" ") - self.writer.write("\"%s\"" % value.encode('utf-8')) - self.soft_space = True + def is_simple(self, event): + if not isinstance(event, ScalarEvent): + return False + if '\n' in event.value or len(event.value) > 128: + return False + if event.style and event.style in '|>': + return False + return True - def write_indicator(self, indicator): + def write_scalar(self, value, implicit, style): + if implicit: + if not self.space: + self.writer.write(" ") + self.writer.write(value.encode('utf-8')) + self.space = False + self.line = False + elif style in ['>', '|'] and not self.flow_level and not self.simple_key: + if not self.space: + self.writer.write(" ") + self.writer.write("|-") + self.write_line_break() + self.write_indent() + self.writer.write(value.encode('utf-8')) + self.write_line_break() + else: + if not self.space: + self.writer.write(" ") + self.writer.write("\"%s\"" % value.encode('utf-8')) + self.space = False + self.line = False + + def write_indicator(self, indicator, need_space=False, provide_space=False): + if need_space and not self.space: + self.writer.write(" ") self.writer.write(indicator) - self.soft_space = True + self.space = provide_space + self.line = False def write_indent(self): - self.writer.write("\n"+" "*(self.level*2)) - self.soft_space = False + if not self.line: + self.write_line_break() + if self.indent: + self.writer.write(" "*self.indent) + self.line = False + self.space = True |