Diffstat (limited to 'tests/test_appliance.py')
-rw-r--r--  tests/test_appliance.py  198
1 file changed, 58 insertions(+), 140 deletions(-)
diff --git a/tests/test_appliance.py b/tests/test_appliance.py
index c471398..6879036 100644
--- a/tests/test_appliance.py
+++ b/tests/test_appliance.py
@@ -1,6 +1,9 @@
import unittest, os
+from yaml.tokens import *
+from yaml.events import *
+
class TestAppliance(unittest.TestCase):
DATA = 'tests/data'
@@ -32,96 +35,12 @@ class TestAppliance(unittest.TestCase):
setattr(cls, test_method.__name__, test_method)
add_tests = classmethod(add_tests)
-class Node:
- def __repr__(self):
- args = []
- for attribute in ['anchor', 'tag', 'value']:
- if hasattr(self, attribute):
- args.append(repr(getattr(self, attribute)))
- return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
-
-class AliasNode(Node):
- def __init__(self, anchor):
- self.anchor = anchor
-
-class ScalarNode(Node):
- def __init__(self, anchor, tag, value):
- self.anchor = anchor
- self.tag = tag
- self.value = value
-
-class SequenceNode(Node):
- def __init__(self, anchor, tag, value):
- self.anchor = anchor
- self.tag = tag
- self.value = value
-
-class MappingNode(Node):
- def __init__(self, anchor, tag, value):
- self.anchor = anchor
- self.tag = tag
- self.value = value
-
-class Token:
- def __repr__(self):
- args = []
- if hasattr(self, 'value'):
- args.append(repr(self.value))
- return "%s(%s)" % (self.__class__.__name__, ''.join(args))
-
-class StreamEndToken(Token):
- pass
-
-class DirectiveToken(Token):
- pass
-
-class DocumentStartToken(Token):
- pass
-
-class SequenceStartToken(Token):
- pass
-
-class MappingStartToken(Token):
- pass
-
-class SequenceEndToken(Token):
- pass
-
-class MappingEndToken(Token):
- pass
-
-class KeyToken(Token):
- pass
-
-class ValueToken(Token):
- pass
-
-class EntryToken(Token):
- pass
-
-class AliasToken(Token):
- def __init__(self, value):
- self.value = value
-
-class AnchorToken(Token):
- def __init__(self, value):
- self.value = value
-
-class TagToken(Token):
- def __init__(self, value):
- self.value = value
-
-class ScalarToken(Token):
- def __init__(self, value):
- self.value = value
-
class Error(Exception):
pass
class CanonicalScanner:
- def __init__(self, source, data):
- self.source = source
+ def __init__(self, data):
self.data = unicode(data, 'utf-8')+u'\0'
self.index = 0
@@ -132,34 +51,34 @@ class CanonicalScanner:
self.find_token()
ch = self.data[self.index]
if ch == u'\0':
- tokens.append(StreamEndToken())
+ tokens.append(StreamEndToken(None, None))
break
elif ch == u'%':
tokens.append(self.scan_directive())
elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
self.index += 3
- tokens.append(DocumentStartToken())
+ tokens.append(DocumentStartToken(None, None))
elif ch == u'[':
self.index += 1
- tokens.append(SequenceStartToken())
+ tokens.append(FlowSequenceStartToken(None, None))
elif ch == u'{':
self.index += 1
- tokens.append(MappingStartToken())
+ tokens.append(FlowMappingStartToken(None, None))
elif ch == u']':
self.index += 1
- tokens.append(SequenceEndToken())
+ tokens.append(FlowSequenceEndToken(None, None))
elif ch == u'}':
self.index += 1
- tokens.append(MappingEndToken())
+ tokens.append(FlowMappingEndToken(None, None))
elif ch == u'?':
self.index += 1
- tokens.append(KeyToken())
+ tokens.append(KeyToken(None, None))
elif ch == u':':
self.index += 1
- tokens.append(ValueToken())
+ tokens.append(ValueToken(None, None))
elif ch == u',':
self.index += 1
- tokens.append(EntryToken())
+ tokens.append(FlowEntryToken(None, None))
elif ch == u'*' or ch == u'&':
tokens.append(self.scan_alias())
elif ch == u'!':
@@ -176,7 +95,7 @@ class CanonicalScanner:
if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
self.index += len(self.DIRECTIVE)
- return DirectiveToken()
+ return DirectiveToken('YAML', (1, 1), None, None)
def scan_alias(self):
if self.data[self.index] == u'*':
@@ -188,7 +107,7 @@ class CanonicalScanner:
while self.data[self.index] not in u', \n\0':
self.index += 1
value = self.data[start:self.index]
- return TokenClass(value)
+ return TokenClass(value, None, None)
def scan_tag(self):
self.index += 1
@@ -198,9 +117,11 @@ class CanonicalScanner:
value = self.data[start:self.index]
if value[0] == u'!':
value = 'tag:yaml.org,2002:'+value[1:]
- else:
+ elif value[0] == u'<' and value[-1] == u'>':
value = value[1:-1]
- return TagToken(value)
+ else:
+ value = u'!'+value
+ return TagToken(value, None, None)
QUOTE_CODES = {
'x': 2,
@@ -264,7 +185,7 @@ class CanonicalScanner:
self.index += 1
chunks.append(self.data[start:self.index])
self.index += 1
- return ScalarToken(u''.join(chunks))
+ return ScalarToken(u''.join(chunks), False, None, None)
def find_token(self):
found = False
@@ -281,83 +202,79 @@ class CanonicalScanner:
class CanonicalParser:
- def __init__(self, source, data):
- self.scanner = CanonicalScanner(source, data)
+ def __init__(self, data):
+ self.scanner = CanonicalScanner(data)
+ self.events = []
# stream: document* END
def parse_stream(self):
- documents = []
while not self.test_token(StreamEndToken):
if self.test_token(DirectiveToken, DocumentStartToken):
- documents.append(self.parse_document())
+ self.parse_document()
else:
raise Error("document is expected, got "+repr(self.tokens[self.index]))
- return documents
+ self.events.append(StreamEndEvent(None, None))
- # document: DIRECTIVE? DOCUMENT-START node?
+ # document: DIRECTIVE? DOCUMENT-START node
def parse_document(self):
node = None
if self.test_token(DirectiveToken):
self.consume_token(DirectiveToken)
self.consume_token(DocumentStartToken)
- if self.test_token(TagToken, AliasToken, AnchorToken, TagToken,
- SequenceStartToken, MappingStartToken, ScalarToken):
- node = self.parse_node()
- return node
+ self.parse_node()
# node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
def parse_node(self):
if self.test_token(AliasToken):
- return AliasNode(self.get_value())
+ self.events.append(AliasEvent(self.get_value(), None, None))
else:
anchor = None
if self.test_token(AnchorToken):
anchor = self.get_value()
- tag = None
+ tag = u'!'
if self.test_token(TagToken):
tag = self.get_value()
if self.test_token(ScalarToken):
- return ScalarNode(anchor, tag, self.get_value())
- elif self.test_token(SequenceStartToken):
- return SequenceNode(anchor, tag, self.parse_sequence())
- elif self.test_token(MappingStartToken):
- return MappingNode(anchor, tag, self.parse_mapping())
+ self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
+ elif self.test_token(FlowSequenceStartToken):
+ self.events.append(SequenceEvent(anchor, tag, None, None))
+ self.parse_sequence()
+ elif self.test_token(FlowMappingStartToken):
+ self.events.append(MappingEvent(anchor, tag, None, None))
+ self.parse_mapping()
else:
raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
# sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
def parse_sequence(self):
- values = []
- self.consume_token(SequenceStartToken)
- if not self.test_token(SequenceEndToken):
- values.append(self.parse_node())
- while not self.test_token(SequenceEndToken):
- self.consume_token(EntryToken)
- if not self.test_token(SequenceEndToken):
- values.append(self.parse_node())
- self.consume_token(SequenceEndToken)
- return values
+ self.consume_token(FlowSequenceStartToken)
+ if not self.test_token(FlowSequenceEndToken):
+ self.parse_node()
+ while not self.test_token(FlowSequenceEndToken):
+ self.consume_token(FlowEntryToken)
+ if not self.test_token(FlowSequenceEndToken):
+ self.parse_node()
+ self.consume_token(FlowSequenceEndToken)
+ self.events.append(CollectionEndEvent(None, None))
# mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
def parse_mapping(self):
- values = []
- self.consume_token(MappingStartToken)
- if not self.test_token(MappingEndToken):
- values.append(self.parse_map_entry())
- while not self.test_token(MappingEndToken):
- self.consume_token(EntryToken)
- if not self.test_token(MappingEndToken):
- values.append(self.parse_map_entry())
- self.consume_token(MappingEndToken)
- return values
+ self.consume_token(FlowMappingStartToken)
+ if not self.test_token(FlowMappingEndToken):
+ self.parse_map_entry()
+ while not self.test_token(FlowMappingEndToken):
+ self.consume_token(FlowEntryToken)
+ if not self.test_token(FlowMappingEndToken):
+ self.parse_map_entry()
+ self.consume_token(FlowMappingEndToken)
+ self.events.append(CollectionEndEvent(None, None))
# map_entry: KEY node VALUE node
def parse_map_entry(self):
self.consume_token(KeyToken)
- key = self.parse_node()
+ self.parse_node()
self.consume_token(ValueToken)
- value = self.parse_node()
- return (key, value)
+ self.parse_node()
def test_token(self, *choices):
for choice in choices:
@@ -378,5 +295,6 @@ class CanonicalParser:
def parse(self):
self.tokens = self.scanner.scan()
self.index = 0
- return self.parse_stream()
+ self.parse_stream()
+ return self.events
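
The net effect of the commit: the test appliance's private Node and Token class hierarchies are dropped in favour of the real token and event classes imported from yaml.tokens and yaml.events, and CanonicalParser.parse() now returns a flat list of events rather than a tree of nodes, presumably so its output can be compared directly against what the library's own scanner and parser emit. A minimal usage sketch of the new interface follows; the sample document and the import path are illustrative assumptions, not part of the commit (Python 2, matching the code above):

    # Sketch only: assumes tests/ is on sys.path so the module imports.
    from test_appliance import CanonicalParser

    # A hypothetical document in YAML's canonical form.
    data = '%YAML 1.1\n--- !!str\n"a canonical scalar"\n'

    parser = CanonicalParser(data)   # note: no 'source' argument any more
    for event in parser.parse():     # DirectiveToken is consumed; events end
        print event                  # with StreamEndEvent (Python 2 print)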