diff options
Diffstat (limited to 'tests/test_appliance.py')
-rw-r--r-- | tests/test_appliance.py | 198 |
1 files changed, 58 insertions, 140 deletions
diff --git a/tests/test_appliance.py b/tests/test_appliance.py index c471398..6879036 100644 --- a/tests/test_appliance.py +++ b/tests/test_appliance.py @@ -1,6 +1,9 @@ import unittest, os +from yaml.tokens import * +from yaml.events import * + class TestAppliance(unittest.TestCase): DATA = 'tests/data' @@ -32,96 +35,12 @@ class TestAppliance(unittest.TestCase): setattr(cls, test_method.__name__, test_method) add_tests = classmethod(add_tests) -class Node: - def __repr__(self): - args = [] - for attribute in ['anchor', 'tag', 'value']: - if hasattr(self, attribute): - args.append(repr(getattr(self, attribute))) - return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) - -class AliasNode(Node): - def __init__(self, anchor): - self.anchor = anchor - -class ScalarNode(Node): - def __init__(self, anchor, tag, value): - self.anchor = anchor - self.tag = tag - self.value = value - -class SequenceNode(Node): - def __init__(self, anchor, tag, value): - self.anchor = anchor - self.tag = tag - self.value = value - -class MappingNode(Node): - def __init__(self, anchor, tag, value): - self.anchor = anchor - self.tag = tag - self.value = value - -class Token: - def __repr__(self): - args = [] - if hasattr(self, 'value'): - args.append(repr(self.value)) - return "%s(%s)" % (self.__class__.__name__, ''.join(args)) - -class StreamEndToken(Token): - pass - -class DirectiveToken(Token): - pass - -class DocumentStartToken(Token): - pass - -class SequenceStartToken(Token): - pass - -class MappingStartToken(Token): - pass - -class SequenceEndToken(Token): - pass - -class MappingEndToken(Token): - pass - -class KeyToken(Token): - pass - -class ValueToken(Token): - pass - -class EntryToken(Token): - pass - -class AliasToken(Token): - def __init__(self, value): - self.value = value - -class AnchorToken(Token): - def __init__(self, value): - self.value = value - -class TagToken(Token): - def __init__(self, value): - self.value = value - -class ScalarToken(Token): - def __init__(self, value): - self.value = value - class Error(Exception): pass class CanonicalScanner: - def __init__(self, source, data): - self.source = source + def __init__(self, data): self.data = unicode(data, 'utf-8')+u'\0' self.index = 0 @@ -132,34 +51,34 @@ class CanonicalScanner: self.find_token() ch = self.data[self.index] if ch == u'\0': - tokens.append(StreamEndToken()) + tokens.append(StreamEndToken(None, None)) break elif ch == u'%': tokens.append(self.scan_directive()) elif ch == u'-' and self.data[self.index:self.index+3] == u'---': self.index += 3 - tokens.append(DocumentStartToken()) + tokens.append(DocumentStartToken(None, None)) elif ch == u'[': self.index += 1 - tokens.append(SequenceStartToken()) + tokens.append(FlowSequenceStartToken(None, None)) elif ch == u'{': self.index += 1 - tokens.append(MappingStartToken()) + tokens.append(FlowMappingStartToken(None, None)) elif ch == u']': self.index += 1 - tokens.append(SequenceEndToken()) + tokens.append(FlowSequenceEndToken(None, None)) elif ch == u'}': self.index += 1 - tokens.append(MappingEndToken()) + tokens.append(FlowMappingEndToken(None, None)) elif ch == u'?': self.index += 1 - tokens.append(KeyToken()) + tokens.append(KeyToken(None, None)) elif ch == u':': self.index += 1 - tokens.append(ValueToken()) + tokens.append(ValueToken(None, None)) elif ch == u',': self.index += 1 - tokens.append(EntryToken()) + tokens.append(FlowEntryToken(None, None)) elif ch == u'*' or ch == u'&': tokens.append(self.scan_alias()) elif ch == u'!': @@ -176,7 +95,7 @@ class CanonicalScanner: if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \ self.data[self.index+len(self.DIRECTIVE)] in u' \n\0': self.index += len(self.DIRECTIVE) - return DirectiveToken() + return DirectiveToken('YAML', (1, 1), None, None) def scan_alias(self): if self.data[self.index] == u'*': @@ -188,7 +107,7 @@ class CanonicalScanner: while self.data[self.index] not in u', \n\0': self.index += 1 value = self.data[start:self.index] - return TokenClass(value) + return TokenClass(value, None, None) def scan_tag(self): self.index += 1 @@ -198,9 +117,11 @@ class CanonicalScanner: value = self.data[start:self.index] if value[0] == u'!': value = 'tag:yaml.org,2002:'+value[1:] - else: + elif value[0] == u'<' and value[-1] == u'>': value = value[1:-1] - return TagToken(value) + else: + value = u'!'+value + return TagToken(value, None, None) QUOTE_CODES = { 'x': 2, @@ -264,7 +185,7 @@ class CanonicalScanner: self.index += 1 chunks.append(self.data[start:self.index]) self.index += 1 - return ScalarToken(u''.join(chunks)) + return ScalarToken(u''.join(chunks), False, None, None) def find_token(self): found = False @@ -281,83 +202,79 @@ class CanonicalScanner: class CanonicalParser: - def __init__(self, source, data): - self.scanner = CanonicalScanner(source, data) + def __init__(self, data): + self.scanner = CanonicalScanner(data) + self.events = [] # stream: document* END def parse_stream(self): - documents = [] while not self.test_token(StreamEndToken): if self.test_token(DirectiveToken, DocumentStartToken): - documents.append(self.parse_document()) + self.parse_document() else: raise Error("document is expected, got "+repr(self.tokens[self.index])) - return documents + self.events.append(StreamEndEvent(None, None)) - # document: DIRECTIVE? DOCUMENT-START node? + # document: DIRECTIVE? DOCUMENT-START node def parse_document(self): node = None if self.test_token(DirectiveToken): self.consume_token(DirectiveToken) self.consume_token(DocumentStartToken) - if self.test_token(TagToken, AliasToken, AnchorToken, TagToken, - SequenceStartToken, MappingStartToken, ScalarToken): - node = self.parse_node() - return node + self.parse_node() # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) def parse_node(self): if self.test_token(AliasToken): - return AliasNode(self.get_value()) + self.events.append(AliasEvent(self.get_value(), None, None)) else: anchor = None if self.test_token(AnchorToken): anchor = self.get_value() - tag = None + tag = u'!' if self.test_token(TagToken): tag = self.get_value() if self.test_token(ScalarToken): - return ScalarNode(anchor, tag, self.get_value()) - elif self.test_token(SequenceStartToken): - return SequenceNode(anchor, tag, self.parse_sequence()) - elif self.test_token(MappingStartToken): - return MappingNode(anchor, tag, self.parse_mapping()) + self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None)) + elif self.test_token(FlowSequenceStartToken): + self.events.append(SequenceEvent(anchor, tag, None, None)) + self.parse_sequence() + elif self.test_token(FlowMappingStartToken): + self.events.append(MappingEvent(anchor, tag, None, None)) + self.parse_mapping() else: raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index])) # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END def parse_sequence(self): - values = [] - self.consume_token(SequenceStartToken) - if not self.test_token(SequenceEndToken): - values.append(self.parse_node()) - while not self.test_token(SequenceEndToken): - self.consume_token(EntryToken) - if not self.test_token(SequenceEndToken): - values.append(self.parse_node()) - self.consume_token(SequenceEndToken) - return values + self.consume_token(FlowSequenceStartToken) + if not self.test_token(FlowSequenceEndToken): + self.parse_node() + while not self.test_token(FlowSequenceEndToken): + self.consume_token(FlowEntryToken) + if not self.test_token(FlowSequenceEndToken): + self.parse_node() + self.consume_token(FlowSequenceEndToken) + self.events.append(CollectionEndEvent(None, None)) # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END def parse_mapping(self): - values = [] - self.consume_token(MappingStartToken) - if not self.test_token(MappingEndToken): - values.append(self.parse_map_entry()) - while not self.test_token(MappingEndToken): - self.consume_token(EntryToken) - if not self.test_token(MappingEndToken): - values.append(self.parse_map_entry()) - self.consume_token(MappingEndToken) - return values + self.consume_token(FlowMappingStartToken) + if not self.test_token(FlowMappingEndToken): + self.parse_map_entry() + while not self.test_token(FlowMappingEndToken): + self.consume_token(FlowEntryToken) + if not self.test_token(FlowMappingEndToken): + self.parse_map_entry() + self.consume_token(FlowMappingEndToken) + self.events.append(CollectionEndEvent(None, None)) # map_entry: KEY node VALUE node def parse_map_entry(self): self.consume_token(KeyToken) - key = self.parse_node() + self.parse_node() self.consume_token(ValueToken) - value = self.parse_node() - return (key, value) + self.parse_node() def test_token(self, *choices): for choice in choices: @@ -378,5 +295,6 @@ class CanonicalParser: def parse(self): self.tokens = self.scanner.scan() self.index = 0 - return self.parse_stream() + self.parse_stream() + return self.events |