summaryrefslogtreecommitdiff
path: root/lib/yaml/composer.py
blob: fcd93ab7cbcc09c4b977596d91b701e495b1ade9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

from error import MarkedYAMLError
from events import *
from nodes import *

class ComposerError(MarkedYAMLError):
    """Error raised by Composer when parser events cannot be composed into nodes."""

class Composer:
    """Compose node trees out of the events produced by a parser.

    The parser given to the constructor is used through three methods:
    check(event_class), peek() and get().  Each document is composed into
    a single root node (ScalarNode, SequenceNode or MappingNode); aliases
    are resolved to the node that was composed for the matching anchor.
    """

    def __init__(self, parser):
        """Store the parser and start with empty per-document anchor tables."""
        self.parser = parser
        # anchor -> the event that declared it (registered as soon as the
        # anchored node starts, i.e. before the node is fully composed).
        self.all_anchors = {}
        # anchor -> the finished node (registered once the node is composed).
        self.complete_anchors = {}

    def check(self):
        """Return True if there are more documents available."""
        return not self.parser.check(StreamEndEvent)

    def get(self):
        """Return the root node of the next document.

        Returns None when the parser is already at the end of the stream.
        """
        if not self.parser.check(StreamEndEvent):
            return self.compose_document()
        return None

    def __iter__(self):
        """Iterate over the root nodes of all remaining documents."""
        while not self.parser.check(StreamEndEvent):
            yield self.compose_document()

    def compose_document(self):
        """Compose one document and return its root node.

        The anchor tables are per-document state.  They are reset in a
        ``finally`` clause so that anchors of a document that failed to
        compose cannot leak into whatever is composed next.
        """
        try:
            return self.compose_node()
        finally:
            self.all_anchors = {}
            self.complete_anchors = {}

    def compose_node(self):
        """Compose the node that starts at the next parser event.

        An alias event is resolved to the already composed node of its
        anchor.  Any other event must start a scalar, a sequence or a
        mapping; if it carries an anchor, the resulting node is recorded
        under that anchor.

        Raises ComposerError when an alias is undefined, when an alias
        refers to a collection that is still being composed (recursive
        anchor), when an anchor is declared twice within a document, or
        when the next event does not start a node.
        """
        if self.parser.check(AliasEvent):
            event = self.parser.get()
            anchor = event.anchor
            if anchor not in self.all_anchors:
                raise ComposerError(None, None, "found undefined alias %r"
                        % anchor.encode('utf-8'), event.start_marker)
            if anchor not in self.complete_anchors:
                # The anchor is known but its node is not finished yet:
                # the alias sits inside the very collection it refers to.
                collection_event = self.all_anchors[anchor]
                raise ComposerError("while composing a collection",
                        collection_event.start_marker,
                        "found recursive anchor %r" % anchor.encode('utf-8'),
                        event.start_marker)
            return self.complete_anchors[anchor]
        event = self.parser.peek()
        # Pick the composer before touching any anchor state, so that an
        # unexpected event is reported as a ComposerError instead of an
        # AttributeError/UnboundLocalError further down.
        if self.parser.check(ScalarEvent):
            compose = self.compose_scalar_node
        elif self.parser.check(SequenceEvent):
            compose = self.compose_sequence_node
        elif self.parser.check(MappingEvent):
            compose = self.compose_mapping_node
        else:
            raise ComposerError(None, None,
                    "expected a scalar, sequence or mapping event, but found %s"
                    % event.__class__.__name__,
                    getattr(event, 'start_marker', None))
        anchor = event.anchor
        if anchor is not None:
            if anchor in self.all_anchors:
                raise ComposerError("found duplicate anchor %r; first occurence"
                        % anchor.encode('utf-8'), self.all_anchors[anchor].start_marker,
                        "second occurence", event.start_marker)
            # Register the anchor before composing so that aliases nested
            # inside this node are detected as recursive.
            self.all_anchors[anchor] = event
        node = compose()
        if anchor is not None:
            self.complete_anchors[anchor] = node
        return node

    def compose_scalar_node(self):
        """Consume one scalar event and return the matching ScalarNode."""
        event = self.parser.get()
        return ScalarNode(event.tag, event.value,
                event.start_marker, event.end_marker)

    def compose_sequence_node(self):
        """Consume a whole sequence and return it as a SequenceNode.

        The node value is the list of composed item nodes; the node spans
        from the start of the opening event to the end of the closing one.
        """
        start_event = self.parser.get()
        value = []
        while not self.parser.check(CollectionEndEvent):
            value.append(self.compose_node())
        end_event = self.parser.get()
        return SequenceNode(start_event.tag, value,
                start_event.start_marker, end_event.end_marker)

    def compose_mapping_node(self):
        """Consume a whole mapping and return it as a MappingNode.

        The node value is a list of (key node, value node) pairs in the
        order they appear in the event stream.
        """
        start_event = self.parser.get()
        value = []
        while not self.parser.check(CollectionEndEvent):
            item_key = self.compose_node()
            item_value = self.compose_node()
            value.append((item_key, item_value))
        end_event = self.parser.get()
        return MappingNode(start_event.tag, value,
                start_event.start_marker, end_event.end_marker)