# Source path: lib/yaml/emitter.py
# Blob: d2b372f9ea0fe1c534290e8ba3ee5dc6cf153b89

# Events should obey the following grammar:
# stream ::= STREAM-START document* STREAM-END
# document ::= DOCUMENT-START node DOCUMENT-END
# node ::= SCALAR | sequence | mapping
# sequence ::= SEQUENCE-START node* SEQUENCE-END
# mapping ::= MAPPING-START (node node)* MAPPING-END

__all__ = ['Emitter', 'EmitterError']

from error import YAMLError
from events import *

class EmitterError(YAMLError):
    """Raised by Emitter when it receives an event its current state does not accept."""
    pass

class Emitter:

    def __init__(self, writer):
        """Set up an emitter that sends its output to *writer*.

        writer -- output sink; the emitter calls its write() method with
                  text fragments and its flush() method when the stream ends.
        """
        # Output sink.
        self.writer = writer

        # Formatting state: nesting depth used for indentation, and whether
        # a separating space is owed before the next inline token.
        self.level = 0
        self.soft_space = False
        # NOTE(review): `levels` is initialised here but is not used by any
        # method visible in this class.
        self.levels = []

        # State machine: `state` handles the next event; `states` is the
        # stack of handlers to return to once the current node is finished.
        self.states = []
        self.state = self.expect_stream_start

    def emit(self, event):
        """Pass *event* to the handler stored in the current state."""
        handler = self.state
        handler(event)

    def expect_stream_start(self, event):
        """Initial state: accept only a StreamStartEvent.

        Nothing is written; the emitter simply starts waiting for the first
        document. Raises EmitterError for any other event.
        """
        if not isinstance(event, StreamStartEvent):
            raise EmitterError("expected StreamStartEvent, but got %s" % event.__class__.__name__)
        self.state = self.expect_document_start

    def expect_document_start(self, event):
        """Between documents: accept a new document or the end of the stream.

        Raises EmitterError for any event other than DocumentStartEvent or
        StreamEndEvent.
        """
        if isinstance(event, DocumentStartEvent):
            self.write_document_start()
            # Once the root node is complete, the document must be closed.
            self.states.append(self.expect_document_end)
            self.state = self.expect_root_node
            return

        if isinstance(event, StreamEndEvent):
            self.writer.flush()
            # Any further event after the stream end is an error.
            self.state = self.expect_nothing
            return

        raise EmitterError("expected DocumentStartEvent, but got %s" % event.__class__.__name__)

    def expect_document_end(self, event):
        """Handle the event that must close the current document.

        On a DocumentEndEvent the document end marker is written and the
        emitter goes back to waiting for the next document (or the end of
        the stream).

        Raises EmitterError if *event* is not a DocumentEndEvent.
        """
        if isinstance(event, DocumentEndEvent):
            self.write_document_end()
            self.state = self.expect_document_start
        else:
            # Was `raiseEmitterError(...)`: a call to an undefined name, so
            # the caller got a NameError instead of the intended EmitterError.
            raise EmitterError("expected DocumentEndEvent, but got %s" % event.__class__.__name__)

    def expect_root_node(self, event):
        """Handle the root node of a document by delegating to expect_node()."""
        node_handler = self.expect_node
        node_handler(event)

    def expect_node(self, event):
        """Emit the start of a node: an alias, a scalar, or a collection opener.

        Aliases and scalars are complete nodes, so the handler saved on top
        of the `states` stack becomes current again. Sequences and mappings
        write their opening bracket, deepen the nesting level and switch to
        the state that waits for the first item or key.

        Raises EmitterError if *event* is neither an AliasEvent nor a
        NodeEvent.
        """
        if isinstance(event, AliasEvent):
            self.write_anchor("*", event.anchor)
            self.state = self.states.pop()
            return

        if not isinstance(event, NodeEvent):
            raise EmitterError("Expected NodeEvent, but got %s" % event.__class__.__name__)

        # Node properties come before the node content.
        if event.anchor:
            self.write_anchor("&", event.anchor)
        if event.tag:
            self.write_tag(event.tag)

        if isinstance(event, ScalarEvent):
            self.write_scalar(event.value)
            self.state = self.states.pop()
        elif isinstance(event, SequenceEvent):
            self.write_collection_start("[")
            self.level += 1
            self.state = self.expect_first_sequence_item
        elif isinstance(event, MappingEvent):
            self.write_collection_start("{")
            self.level += 1
            self.state = self.expect_first_mapping_key
        # Any other NodeEvent leaves the state unchanged.

    def expect_first_sequence_item(self, event):
        """Handle the event that follows the opening "[" of a sequence.

        A CollectionEndEvent here means the sequence is empty: "]" is written
        directly after "[" and the nesting level raised by expect_node() is
        restored. Any other event is emitted as the first item on a new,
        indented line.
        """
        if isinstance(event, CollectionEndEvent):
            # expect_node() incremented the level when it opened the
            # sequence; undo that here as well, otherwise every empty
            # sequence leaves all following lines indented one step too deep.
            self.level -= 1
            self.write_collection_end("]")
            self.state = self.states.pop()
        else:
            self.write_indent()
            # After the first item, later events are further items or the end.
            self.states.append(self.expect_sequence_item)
            self.expect_node(event)

    def expect_sequence_item(self, event):
        """Handle the event after a sequence item: another item or the end.

        Another item is preceded by "," and a fresh indented line. A
        CollectionEndEvent drops the nesting level by one, writes "]" on its
        own line and resumes the handler saved on the `states` stack.
        """
        if not isinstance(event, CollectionEndEvent):
            self.write_indicator(",")
            self.write_indent()
            self.states.append(self.expect_sequence_item)
            self.expect_node(event)
            return

        # Closing bracket is aligned with the enclosing level.
        self.level -= 1
        self.write_indent()
        self.write_collection_end("]")
        self.state = self.states.pop()
        
    def expect_first_mapping_key(self, event):
        """Handle the event that follows the opening "{" of a mapping.

        A CollectionEndEvent here means the mapping is empty: "}" is written
        directly after "{" and the nesting level raised by expect_node() is
        restored. Any other event is emitted as the first key, introduced by
        "?" on a new, indented line.
        """
        if isinstance(event, CollectionEndEvent):
            # expect_node() incremented the level when it opened the mapping;
            # undo that here as well, otherwise every empty mapping leaves
            # all following lines indented one step too deep.
            self.level -= 1
            self.write_collection_end("}")
            self.state = self.states.pop()
        else:
            self.write_indent()
            self.write_indicator("?")
            # A key must be followed by its value.
            self.states.append(self.expect_mapping_value)
            self.expect_node(event)

    def expect_mapping_key(self, event):
        """Handle the event after a mapping value: another key or the end.

        Another key is preceded by "," and introduced by "?" on a fresh
        indented line. A CollectionEndEvent drops the nesting level by one,
        writes "}" on its own line and resumes the handler saved on the
        `states` stack.
        """
        if not isinstance(event, CollectionEndEvent):
            self.write_indicator(",")
            self.write_indent()
            self.write_indicator("?")
            self.states.append(self.expect_mapping_value)
            self.expect_node(event)
            return

        # Closing brace is aligned with the enclosing level.
        self.level -= 1
        self.write_indent()
        self.write_collection_end("}")
        self.state = self.states.pop()

    def expect_mapping_value(self, event):
        """Emit the value belonging to the key that was just written.

        The value is introduced by ":" on a new indented line; once it is
        complete, the emitter expects either another key or the end of the
        mapping.
        """
        self.write_indent()
        self.write_indicator(":")
        after_value = self.expect_mapping_key
        self.states.append(after_value)
        self.expect_node(event)

    def expect_nothing(self, event):
        """Terminal state: reject every event by raising EmitterError."""
        received = event.__class__.__name__
        raise EmitterError("expected nothing, but got %s" % received)

    def write_document_start(self):
        """Write the %YAML directive line followed by the "---" marker.

        A separating space is left pending so that the root node does not
        touch the marker.
        """
        out = self.writer
        out.write("%YAML 1.1\n")
        out.write("---")
        self.soft_space = True

    def write_document_end(self):
        """Write the "..." document end marker on a line of its own.

        The marker ends with a newline, so no separating space is pending
        afterwards.
        """
        end_marker = "\n...\n"
        self.writer.write(end_marker)
        self.soft_space = False

    def write_collection_start(self, indicator):
        """Write an opening bracket, preceded by a space if one is pending.

        indicator -- the opening character to write (callers in this class
                     pass "[" or "{").

        No separating space is pending afterwards.
        """
        out = self.writer
        if self.soft_space:
            out.write(" ")
        out.write(indicator)
        self.soft_space = False

    def write_collection_end(self, indicator):
        """Write a closing bracket and leave a separating space pending.

        indicator -- the closing character to write (callers in this class
                     pass "]" or "}").
        """
        out = self.writer
        out.write(indicator)
        self.soft_space = True

    def write_anchor(self, indicator, name):
        """Write an anchor or alias token such as "&name" or "*name".

        indicator -- prefix character (callers in this class pass "&" for an
                     anchor and "*" for an alias).
        name      -- the anchor name.

        A pending separating space is written first, and one is left pending
        afterwards.
        """
        out = self.writer
        if self.soft_space:
            out.write(" ")
        token = "%s%s" % (indicator, name)
        out.write(token)
        self.soft_space = True

    def write_tag(self, tag):
        """Write a node tag, using the "!!" shorthand where possible.

        Tags that start with "tag:yaml.org,2002:" are written as "!!suffix";
        any other tag is written verbatim as "!<tag>". A pending separating
        space is written first, and one is left pending afterwards.
        """
        # One definition of the prefix for both the test and the slice. The
        # slice length used to come from a second, mistyped literal
        # ("tag.yaml.org,2002:") that only worked because it happened to have
        # the same length as the real prefix.
        prefix = "tag:yaml.org,2002:"
        if self.soft_space:
            self.writer.write(" ")
        if tag.startswith(prefix):
            self.writer.write("!!%s" % tag[len(prefix):])
        else:
            self.writer.write("!<%s>" % tag)
        self.soft_space = True

    def write_scalar(self, value):
        """Write a scalar value wrapped in double quotes.

        The value is encoded as UTF-8 before being written. A pending
        separating space is written first, and one is left pending
        afterwards.
        """
        out = self.writer
        if self.soft_space:
            out.write(" ")
        # NOTE(review): the value is written as-is between the quotes; no
        # escaping of embedded quotes or backslashes happens here.
        encoded = value.encode('utf-8')
        out.write("\"%s\"" % encoded)
        self.soft_space = True

    def write_indicator(self, indicator):
        """Write a punctuation token and leave a separating space pending.

        indicator -- the text to write (callers in this class pass ",", "?"
                     or ":"). No leading space is written, even if one is
                     pending.
        """
        out = self.writer
        out.write(indicator)
        self.soft_space = True

    def write_indent(self):
        """Start a new line indented by two spaces per nesting level.

        The new line already ends in whitespace (or is empty), so no
        separating space is pending afterwards.
        """
        padding = " " * (self.level * 2)
        self.writer.write("\n" + padding)
        self.soft_space = False