summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxi <xi@18f92427-320e-0410-9341-c67f048884a3>2008-12-28 20:16:50 +0000
committerxi <xi@18f92427-320e-0410-9341-c67f048884a3>2008-12-28 20:16:50 +0000
commit4691639907112004c7242215370d8b42ffec3e5b (patch)
treecf3e23519728f94b6e04d2132a61577ef9deb0a0
parent398bababf57286f4366649052c75075083d2060e (diff)
downloadpyyaml-4691639907112004c7242215370d8b42ffec3e5b.tar.gz
Refactored the test suite; updated include and library paths in setup.cfg.
git-svn-id: http://svn.pyyaml.org/pyyaml/trunk@322 18f92427-320e-0410-9341-c67f048884a3
-rw-r--r--MANIFEST.in4
-rw-r--r--Makefile3
-rw-r--r--setup.cfg13
-rw-r--r--setup.py20
-rw-r--r--tests/canonical.py357
-rw-r--r--tests/data/construct-python-name-module.code2
-rw-r--r--tests/data/empty-document-bug.empty0
-rw-r--r--tests/data/invalid-character.stream-errorbin2209 -> 1121 bytes
-rw-r--r--tests/data/invalid-utf8-byte.stream-error34
-rw-r--r--tests/data/odd-utf16.stream-errorbin2463 -> 1311 bytes
-rw-r--r--tests/data/serializer-is-already-opened.dumper-error2
-rw-r--r--tests/data/serializer-is-closed-1.dumper-error2
-rw-r--r--tests/data/serializer-is-closed-2.dumper-error4
-rw-r--r--tests/data/serializer-is-not-opened-1.dumper-error2
-rw-r--r--tests/data/serializer-is-not-opened-2.dumper-error4
-rw-r--r--tests/data/unknown.dumper-error2
-rw-r--r--tests/test_all.py20
-rw-r--r--tests/test_appliance.py491
-rw-r--r--tests/test_build.py9
-rw-r--r--tests/test_build_ext.py9
-rw-r--r--tests/test_canonical.py48
-rw-r--r--tests/test_constructor.py554
-rw-r--r--tests/test_emitter.py187
-rw-r--r--tests/test_errors.py153
-rw-r--r--tests/test_mark.py56
-rw-r--r--tests/test_reader.py67
-rw-r--r--tests/test_recursive.py40
-rw-r--r--tests/test_representer.py90
-rw-r--r--tests/test_resolver.py165
-rw-r--r--tests/test_structure.py409
-rw-r--r--tests/test_syck.py30
-rw-r--r--tests/test_tokens.py143
-rw-r--r--tests/test_yaml.py9
-rw-r--r--tests/test_yaml_ext.py418
34 files changed, 1698 insertions, 1649 deletions
diff --git a/MANIFEST.in b/MANIFEST.in
index 1c6b8bf..3d3b4cd 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,4 @@
include README LICENSE setup.py
recursive-include examples *.py *.cfg *.yaml
-#recursive-include tests *.py
-#recursive-include tests/data *
+recursive-include tests *.py
+recursive-include tests/data *
diff --git a/Makefile b/Makefile
index 7991206..7aaccff 100644
--- a/Makefile
+++ b/Makefile
@@ -29,6 +29,9 @@ test: build
testext: buildext
${PYTHON} tests/test_build_ext.py ${TEST}
+testall:
+ ${PYTHON} setup.py test
+
dist:
${PYTHON} setup.py --with-libyaml sdist --formats=zip,gztar
diff --git a/setup.cfg b/setup.cfg
index b0a20a3..773c4d0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,18 +17,19 @@
# The following options are used to build PyYAML Windows installer
# for Python 2.3 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vc6/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vc6/output/release/lib
#define=YAML_DECLARE_STATIC
# The following options are used to build PyYAML Windows installer
# for Python 2.4 and Python 2.5 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vs2003/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2003/output/release/lib
#define=YAML_DECLARE_STATIC
# The following options are used to build PyYAML Windows installer
# for Python 2.6 on my PC:
-#include_dirs=../../libyaml/branches/stable/include
-#library_dirs=../../libyaml/branches/stable/win32/vs2008/output/release/lib
+#include_dirs=../../../libyaml/tags/0.1.2/include
+#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2008/output/release/lib
#define=YAML_DECLARE_STATIC
+
diff --git a/setup.py b/setup.py
index d963517..13fc574 100644
--- a/setup.py
+++ b/setup.py
@@ -269,6 +269,25 @@ class bdist_rpm(_bdist_rpm):
return spec_file
+class test(Command):
+
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ build_cmd = self.get_finalized_command('build')
+ build_cmd.run()
+ sys.path.insert(0, build_cmd.build_lib)
+ sys.path.insert(0, 'tests')
+ import test_all
+ test_all.main([])
+
+
if __name__ == '__main__':
setup(
@@ -296,6 +315,7 @@ if __name__ == '__main__':
cmdclass={
'build_ext': build_ext,
'bdist_rpm': bdist_rpm,
+ 'test': test,
},
)
diff --git a/tests/canonical.py b/tests/canonical.py
new file mode 100644
index 0000000..41d111a
--- /dev/null
+++ b/tests/canonical.py
@@ -0,0 +1,357 @@
+
+import yaml, yaml.composer, yaml.constructor, yaml.resolver
+
+class CanonicalError(yaml.YAMLError):
+ pass
+
+class CanonicalScanner:
+
+ def __init__(self, data):
+ try:
+ self.data = unicode(data, 'utf-8')+u'\0'
+ except UnicodeDecodeError:
+ raise CanonicalError("utf-8 stream is expected")
+ self.index = 0
+ self.tokens = []
+ self.scanned = False
+
+ def check_token(self, *choices):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.tokens[0], choice):
+ return True
+ return False
+
+ def peek_token(self):
+ if not self.scanned:
+ self.scan()
+ if self.tokens:
+ return self.tokens[0]
+
+ def get_token(self, choice=None):
+ if not self.scanned:
+ self.scan()
+ token = self.tokens.pop(0)
+ if choice and not isinstance(token, choice):
+ raise CanonicalError("unexpected token "+repr(token))
+ return token
+
+ def get_token_value(self):
+ token = self.get_token()
+ return token.value
+
+ def scan(self):
+ self.tokens.append(yaml.StreamStartToken(None, None))
+ while True:
+ self.find_token()
+ ch = self.data[self.index]
+ if ch == u'\0':
+ self.tokens.append(yaml.StreamEndToken(None, None))
+ break
+ elif ch == u'%':
+ self.tokens.append(self.scan_directive())
+ elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
+ self.index += 3
+ self.tokens.append(yaml.DocumentStartToken(None, None))
+ elif ch == u'[':
+ self.index += 1
+ self.tokens.append(yaml.FlowSequenceStartToken(None, None))
+ elif ch == u'{':
+ self.index += 1
+ self.tokens.append(yaml.FlowMappingStartToken(None, None))
+ elif ch == u']':
+ self.index += 1
+ self.tokens.append(yaml.FlowSequenceEndToken(None, None))
+ elif ch == u'}':
+ self.index += 1
+ self.tokens.append(yaml.FlowMappingEndToken(None, None))
+ elif ch == u'?':
+ self.index += 1
+ self.tokens.append(yaml.KeyToken(None, None))
+ elif ch == u':':
+ self.index += 1
+ self.tokens.append(yaml.ValueToken(None, None))
+ elif ch == u',':
+ self.index += 1
+ self.tokens.append(yaml.FlowEntryToken(None, None))
+ elif ch == u'*' or ch == u'&':
+ self.tokens.append(self.scan_alias())
+ elif ch == u'!':
+ self.tokens.append(self.scan_tag())
+ elif ch == u'"':
+ self.tokens.append(self.scan_scalar())
+ else:
+ raise CanonicalError("invalid token")
+ self.scanned = True
+
+ DIRECTIVE = u'%YAML 1.1'
+
+ def scan_directive(self):
+ if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
+ self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
+ self.index += len(self.DIRECTIVE)
+ return yaml.DirectiveToken('YAML', (1, 1), None, None)
+ else:
+ raise CanonicalError("invalid directive")
+
+ def scan_alias(self):
+ if self.data[self.index] == u'*':
+ TokenClass = yaml.AliasToken
+ else:
+ TokenClass = yaml.AnchorToken
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in u', \n\0':
+ self.index += 1
+ value = self.data[start:self.index]
+ return TokenClass(value, None, None)
+
+ def scan_tag(self):
+ self.index += 1
+ start = self.index
+ while self.data[self.index] not in u' \n\0':
+ self.index += 1
+ value = self.data[start:self.index]
+ if not value:
+ value = u'!'
+ elif value[0] == u'!':
+ value = 'tag:yaml.org,2002:'+value[1:]
+ elif value[0] == u'<' and value[-1] == u'>':
+ value = value[1:-1]
+ else:
+ value = u'!'+value
+ return yaml.TagToken(value, None, None)
+
+ QUOTE_CODES = {
+ 'x': 2,
+ 'u': 4,
+ 'U': 8,
+ }
+
+ QUOTE_REPLACES = {
+ u'\\': u'\\',
+ u'\"': u'\"',
+ u' ': u' ',
+ u'a': u'\x07',
+ u'b': u'\x08',
+ u'e': u'\x1B',
+ u'f': u'\x0C',
+ u'n': u'\x0A',
+ u'r': u'\x0D',
+ u't': u'\x09',
+ u'v': u'\x0B',
+ u'N': u'\u0085',
+ u'L': u'\u2028',
+ u'P': u'\u2029',
+ u'_': u'_',
+ u'0': u'\x00',
+
+ }
+
+ def scan_scalar(self):
+ self.index += 1
+ chunks = []
+ start = self.index
+ ignore_spaces = False
+ while self.data[self.index] != u'"':
+ if self.data[self.index] == u'\\':
+ ignore_spaces = False
+ chunks.append(self.data[start:self.index])
+ self.index += 1
+ ch = self.data[self.index]
+ self.index += 1
+ if ch == u'\n':
+ ignore_spaces = True
+ elif ch in self.QUOTE_CODES:
+ length = self.QUOTE_CODES[ch]
+ code = int(self.data[self.index:self.index+length], 16)
+ chunks.append(unichr(code))
+ self.index += length
+ else:
+ if ch not in self.QUOTE_REPLACES:
+ raise CanonicalError("invalid escape code")
+ chunks.append(self.QUOTE_REPLACES[ch])
+ start = self.index
+ elif self.data[self.index] == u'\n':
+ chunks.append(self.data[start:self.index])
+ chunks.append(u' ')
+ self.index += 1
+ start = self.index
+ ignore_spaces = True
+ elif ignore_spaces and self.data[self.index] == u' ':
+ self.index += 1
+ start = self.index
+ else:
+ ignore_spaces = False
+ self.index += 1
+ chunks.append(self.data[start:self.index])
+ self.index += 1
+ return yaml.ScalarToken(u''.join(chunks), False, None, None)
+
+ def find_token(self):
+ found = False
+ while not found:
+ while self.data[self.index] in u' \t':
+ self.index += 1
+ if self.data[self.index] == u'#':
+ while self.data[self.index] != u'\n':
+ self.index += 1
+ if self.data[self.index] == u'\n':
+ self.index += 1
+ else:
+ found = True
+
+class CanonicalParser:
+
+ def __init__(self):
+ self.events = []
+ self.parsed = False
+
+ # stream: STREAM-START document* STREAM-END
+ def parse_stream(self):
+ self.get_token(yaml.StreamStartToken)
+ self.events.append(yaml.StreamStartEvent(None, None))
+ while not self.check_token(yaml.StreamEndToken):
+ if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken):
+ self.parse_document()
+ else:
+ raise CanonicalError("document is expected, got "+repr(self.tokens[0]))
+ self.get_token(yaml.StreamEndToken)
+ self.events.append(yaml.StreamEndEvent(None, None))
+
+ # document: DIRECTIVE? DOCUMENT-START node
+ def parse_document(self):
+ node = None
+ if self.check_token(yaml.DirectiveToken):
+ self.get_token(yaml.DirectiveToken)
+ self.get_token(yaml.DocumentStartToken)
+ self.events.append(yaml.DocumentStartEvent(None, None))
+ self.parse_node()
+ self.events.append(yaml.DocumentEndEvent(None, None))
+
+ # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
+ def parse_node(self):
+ if self.check_token(yaml.AliasToken):
+ self.events.append(yaml.AliasEvent(self.get_token_value(), None, None))
+ else:
+ anchor = None
+ if self.check_token(yaml.AnchorToken):
+ anchor = self.get_token_value()
+ tag = None
+ if self.check_token(yaml.TagToken):
+ tag = self.get_token_value()
+ if self.check_token(yaml.ScalarToken):
+ self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
+ elif self.check_token(yaml.FlowSequenceStartToken):
+ self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None))
+ self.parse_sequence()
+ elif self.check_token(yaml.FlowMappingStartToken):
+ self.events.append(yaml.MappingStartEvent(anchor, tag, None, None))
+ self.parse_mapping()
+ else:
+ raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0]))
+
+ # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
+ def parse_sequence(self):
+ self.get_token(yaml.FlowSequenceStartToken)
+ if not self.check_token(yaml.FlowSequenceEndToken):
+ self.parse_node()
+ while not self.check_token(yaml.FlowSequenceEndToken):
+ self.get_token(yaml.FlowEntryToken)
+ if not self.check_token(yaml.FlowSequenceEndToken):
+ self.parse_node()
+ self.get_token(yaml.FlowSequenceEndToken)
+ self.events.append(yaml.SequenceEndEvent(None, None))
+
+ # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
+ def parse_mapping(self):
+ self.get_token(yaml.FlowMappingStartToken)
+ if not self.check_token(yaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ while not self.check_token(yaml.FlowMappingEndToken):
+ self.get_token(yaml.FlowEntryToken)
+ if not self.check_token(yaml.FlowMappingEndToken):
+ self.parse_map_entry()
+ self.get_token(yaml.FlowMappingEndToken)
+ self.events.append(yaml.MappingEndEvent(None, None))
+
+ # map_entry: KEY node VALUE node
+ def parse_map_entry(self):
+ self.get_token(yaml.KeyToken)
+ self.parse_node()
+ self.get_token(yaml.ValueToken)
+ self.parse_node()
+
+ def parse(self):
+ self.parse_stream()
+ self.parsed = True
+
+ def get_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events.pop(0)
+
+ def check_event(self, *choices):
+ if not self.parsed:
+ self.parse()
+ if self.events:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.events[0], choice):
+ return True
+ return False
+
+ def peek_event(self):
+ if not self.parsed:
+ self.parse()
+ return self.events[0]
+
+class CanonicalLoader(CanonicalScanner, CanonicalParser,
+ yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver):
+
+ def __init__(self, stream):
+ if hasattr(stream, 'read'):
+ stream = stream.read()
+ CanonicalScanner.__init__(self, stream)
+ CanonicalParser.__init__(self)
+ yaml.composer.Composer.__init__(self)
+ yaml.constructor.Constructor.__init__(self)
+ yaml.resolver.Resolver.__init__(self)
+
+yaml.CanonicalLoader = CanonicalLoader
+
+def canonical_scan(stream):
+ return yaml.scan(stream, Loader=CanonicalLoader)
+
+yaml.canonical_scan = canonical_scan
+
+def canonical_parse(stream):
+ return yaml.parse(stream, Loader=CanonicalLoader)
+
+yaml.canonical_parse = canonical_parse
+
+def canonical_compose(stream):
+ return yaml.compose(stream, Loader=CanonicalLoader)
+
+yaml.canonical_compose = canonical_compose
+
+def canonical_compose_all(stream):
+ return yaml.compose_all(stream, Loader=CanonicalLoader)
+
+yaml.canonical_compose_all = canonical_compose_all
+
+def canonical_load(stream):
+ return yaml.load(stream, Loader=CanonicalLoader)
+
+yaml.canonical_load = canonical_load
+
+def canonical_load_all(stream):
+ return yaml.load_all(stream, Loader=CanonicalLoader)
+
+yaml.canonical_load_all = canonical_load_all
+
diff --git a/tests/data/construct-python-name-module.code b/tests/data/construct-python-name-module.code
index 335c7ca..8f93503 100644
--- a/tests/data/construct-python-name-module.code
+++ b/tests/data/construct-python-name-module.code
@@ -1 +1 @@
-[file, Loader, dump, abs, yaml.tokens]
+[file, yaml.Loader, yaml.dump, abs, yaml.tokens]
diff --git a/tests/data/empty-document-bug.empty b/tests/data/empty-document-bug.empty
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/empty-document-bug.empty
diff --git a/tests/data/invalid-character.stream-error b/tests/data/invalid-character.stream-error
index 03687b0..24a894c 100644
--- a/tests/data/invalid-character.stream-error
+++ b/tests/data/invalid-character.stream-error
Binary files differ
diff --git a/tests/data/invalid-utf8-byte.stream-error b/tests/data/invalid-utf8-byte.stream-error
index 15111c3..02b1605 100644
--- a/tests/data/invalid-utf8-byte.stream-error
+++ b/tests/data/invalid-utf8-byte.stream-error
@@ -1,18 +1,18 @@
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
+***************************************************************
Invalid byte ('\xFF'): ÿ <--
--------------------------------------------------------------------------------------------------------------------------------
+***************************************************************
diff --git a/tests/data/odd-utf16.stream-error b/tests/data/odd-utf16.stream-error
index 37da060..92ee645 100644
--- a/tests/data/odd-utf16.stream-error
+++ b/tests/data/odd-utf16.stream-error
Binary files differ
diff --git a/tests/data/serializer-is-already-opened.dumper-error b/tests/data/serializer-is-already-opened.dumper-error
index 0f66a46..5ac2e4b 100644
--- a/tests/data/serializer-is-already-opened.dumper-error
+++ b/tests/data/serializer-is-already-opened.dumper-error
@@ -1,3 +1,3 @@
-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.open()
diff --git a/tests/data/serializer-is-closed-1.dumper-error b/tests/data/serializer-is-closed-1.dumper-error
index 5d9ecfc..b6d5c7a 100644
--- a/tests/data/serializer-is-closed-1.dumper-error
+++ b/tests/data/serializer-is-closed-1.dumper-error
@@ -1,4 +1,4 @@
-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.close()
dumper.open()
diff --git a/tests/data/serializer-is-closed-2.dumper-error b/tests/data/serializer-is-closed-2.dumper-error
index 0762145..ff57b4d 100644
--- a/tests/data/serializer-is-closed-2.dumper-error
+++ b/tests/data/serializer-is-closed-2.dumper-error
@@ -1,4 +1,4 @@
-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
dumper.open()
dumper.close()
-dumper.serialize(ScalarNode(tag='!foo', value='bar'))
+dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))
diff --git a/tests/data/serializer-is-not-opened-1.dumper-error b/tests/data/serializer-is-not-opened-1.dumper-error
index 1d21554..8dc6c1e 100644
--- a/tests/data/serializer-is-not-opened-1.dumper-error
+++ b/tests/data/serializer-is-not-opened-1.dumper-error
@@ -1,2 +1,2 @@
-dumper = Dumper(StringIO.StringIO())
+dumper = yaml.Dumper(StringIO.StringIO())
dumper.close()
diff --git a/tests/data/serializer-is-not-opened-2.dumper-error b/tests/data/serializer-is-not-opened-2.dumper-error
index d1d7f98..6922b22 100644
--- a/tests/data/serializer-is-not-opened-2.dumper-error
+++ b/tests/data/serializer-is-not-opened-2.dumper-error
@@ -1,2 +1,2 @@
-dumper = Dumper(StringIO.StringIO())
-dumper.serialize(ScalarNode(tag='!foo', value='bar'))
+dumper = yaml.Dumper(StringIO.StringIO())
+dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar'))
diff --git a/tests/data/unknown.dumper-error b/tests/data/unknown.dumper-error
index 4de4279..83204d2 100644
--- a/tests/data/unknown.dumper-error
+++ b/tests/data/unknown.dumper-error
@@ -1 +1 @@
-safe_dump(object)
+yaml.safe_dump(object)
diff --git a/tests/test_all.py b/tests/test_all.py
index 8ae1c76..fec4ae4 100644
--- a/tests/test_all.py
+++ b/tests/test_all.py
@@ -1,14 +1,14 @@
-import unittest
-
-def main():
- import yaml
- names = ['test_yaml']
- if yaml.__libyaml__:
- names.append('test_yaml_ext')
- suite = unittest.defaultTestLoader.loadTestsFromNames(names)
- runner = unittest.TextTestRunner()
- runner.run(suite)
+import sys, yaml, test_appliance
+
+def main(args=None):
+ collections = []
+ import test_yaml
+ collections.append(test_yaml)
+ if yaml.__with_libyaml__:
+ import test_yaml_ext
+ collections.append(test_yaml_ext)
+ test_appliance.run(collections, args)
if __name__ == '__main__':
main()
diff --git a/tests/test_appliance.py b/tests/test_appliance.py
index 61cc03f..09c46c9 100644
--- a/tests/test_appliance.py
+++ b/tests/test_appliance.py
@@ -1,358 +1,145 @@
-import unittest, os
-
-from yaml import *
-from yaml.composer import *
-from yaml.constructor import *
-from yaml.resolver import *
-
-class TestAppliance(unittest.TestCase):
-
- DATA = 'tests/data'
- SKIP_EXT = '.skip'
-
- all_tests = {}
- for filename in os.listdir(DATA):
- if os.path.isfile(os.path.join(DATA, filename)):
- root, ext = os.path.splitext(filename)
- all_tests.setdefault(root, []).append(ext)
-
- def add_tests(cls, method_name, *extensions):
- for test in cls.all_tests:
- available_extensions = cls.all_tests[test]
- if cls.SKIP_EXT in available_extensions:
- continue
- for ext in extensions:
- if ext not in available_extensions:
- break
- else:
- filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
- def test_method(self, test=test, filenames=filenames):
- getattr(self, '_'+method_name)(test, *filenames)
- test = test.replace('-', '_').replace('.', '_')
- try:
- test_method.__name__ = '%s_%s' % (method_name, test)
- except TypeError:
- import new
- test_method = new.function(test_method.func_code, test_method.func_globals,
- '%s_%s' % (method_name, test), test_method.func_defaults,
- test_method.func_closure)
- setattr(cls, test_method.__name__, test_method)
- add_tests = classmethod(add_tests)
-
-class Error(Exception):
- pass
-
-class CanonicalScanner:
-
- def __init__(self, data):
- self.data = unicode(data, 'utf-8')+u'\0'
- self.index = 0
- self.scan()
-
- def check_token(self, *choices):
- if self.tokens:
- if not choices:
- return True
- for choice in choices:
- if isinstance(self.tokens[0], choice):
- return True
- return False
-
- def peek_token(self):
- if self.tokens:
- return self.tokens[0]
-
- def get_token(self, choice=None):
- token = self.tokens.pop(0)
- if choice and not isinstance(token, choice):
- raise Error("unexpected token "+repr(token))
- return token
-
- def get_token_value(self):
- token = self.get_token()
- return token.value
-
- def scan(self):
- self.tokens = []
- self.tokens.append(StreamStartToken(None, None))
- while True:
- self.find_token()
- ch = self.data[self.index]
- if ch == u'\0':
- self.tokens.append(StreamEndToken(None, None))
- break
- elif ch == u'%':
- self.tokens.append(self.scan_directive())
- elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
- self.index += 3
- self.tokens.append(DocumentStartToken(None, None))
- elif ch == u'[':
- self.index += 1
- self.tokens.append(FlowSequenceStartToken(None, None))
- elif ch == u'{':
- self.index += 1
- self.tokens.append(FlowMappingStartToken(None, None))
- elif ch == u']':
- self.index += 1
- self.tokens.append(FlowSequenceEndToken(None, None))
- elif ch == u'}':
- self.index += 1
- self.tokens.append(FlowMappingEndToken(None, None))
- elif ch == u'?':
- self.index += 1
- self.tokens.append(KeyToken(None, None))
- elif ch == u':':
- self.index += 1
- self.tokens.append(ValueToken(None, None))
- elif ch == u',':
- self.index += 1
- self.tokens.append(FlowEntryToken(None, None))
- elif ch == u'*' or ch == u'&':
- self.tokens.append(self.scan_alias())
- elif ch == u'!':
- self.tokens.append(self.scan_tag())
- elif ch == u'"':
- self.tokens.append(self.scan_scalar())
- else:
- raise Error("invalid token")
-
- DIRECTIVE = u'%YAML 1.1'
-
- def scan_directive(self):
- if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
- self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
- self.index += len(self.DIRECTIVE)
- return DirectiveToken('YAML', (1, 1), None, None)
-
- def scan_alias(self):
- if self.data[self.index] == u'*':
- TokenClass = AliasToken
+import sys, os, os.path, types, traceback, pprint
+
+DATA = 'tests/data'
+
+def find_test_functions(collections):
+ if not isinstance(collections, list):
+ collections = [collections]
+ functions = []
+ for collection in collections:
+ if not isinstance(collection, dict):
+ collection = vars(collection)
+ keys = collection.keys()
+ keys.sort()
+ for key in keys:
+ value = collection[key]
+ if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+ functions.append(value)
+ return functions
+
+def find_test_filenames(directory):
+ filenames = {}
+ for filename in os.listdir(directory):
+ if os.path.isfile(os.path.join(directory, filename)):
+ base, ext = os.path.splitext(filename)
+ filenames.setdefault(base, []).append(ext)
+ filenames = filenames.items()
+ filenames.sort()
+ return filenames
+
+def parse_arguments(args):
+ if args is None:
+ args = sys.argv[1:]
+ verbose = False
+ if '-v' in args:
+ verbose = True
+ args.remove('-v')
+ if '--verbose' in args:
+ verbose = True
+ if 'YAML_TEST_VERBOSE' in os.environ:
+ verbose = True
+ include_functions = []
+ if args:
+ include_functions.append(args.pop(0))
+ if 'YAML_TEST_FUNCTIONS' in os.environ:
+ include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
+ include_filenames = []
+ include_filenames.extend(args)
+ if 'YAML_TEST_FILENAMES' in os.environ:
+ include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
+ return include_functions, include_filenames, verbose
+
+def execute(function, filenames, verbose):
+ if verbose:
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames)))
+ try:
+ function(verbose=verbose, *filenames)
+ except Exception, exc:
+ info = sys.exc_info()
+ if isinstance(exc, AssertionError):
+ kind = 'FAILURE'
else:
- TokenClass = AnchorToken
- self.index += 1
- start = self.index
- while self.data[self.index] not in u', \n\0':
- self.index += 1
- value = self.data[start:self.index]
- return TokenClass(value, None, None)
-
- def scan_tag(self):
- self.index += 1
- start = self.index
- while self.data[self.index] not in u' \n\0':
- self.index += 1
- value = self.data[start:self.index]
- if value[0] == u'!':
- value = 'tag:yaml.org,2002:'+value[1:]
- elif value[0] == u'<' and value[-1] == u'>':
- value = value[1:-1]
+ kind = 'ERROR'
+ if verbose:
+ traceback.print_exc(limit=1, file=sys.stdout)
else:
- value = u'!'+value
- return TagToken(value, None, None)
-
- QUOTE_CODES = {
- 'x': 2,
- 'u': 4,
- 'U': 8,
- }
-
- QUOTE_REPLACES = {
- u'\\': u'\\',
- u'\"': u'\"',
- u' ': u' ',
- u'a': u'\x07',
- u'b': u'\x08',
- u'e': u'\x1B',
- u'f': u'\x0C',
- u'n': u'\x0A',
- u'r': u'\x0D',
- u't': u'\x09',
- u'v': u'\x0B',
- u'N': u'\u0085',
- u'L': u'\u2028',
- u'P': u'\u2029',
- u'_': u'_',
- u'0': u'\x00',
-
- }
-
- def scan_scalar(self):
- self.index += 1
- chunks = []
- start = self.index
- ignore_spaces = False
- while self.data[self.index] != u'"':
- if self.data[self.index] == u'\\':
- ignore_spaces = False
- chunks.append(self.data[start:self.index])
- self.index += 1
- ch = self.data[self.index]
- self.index += 1
- if ch == u'\n':
- ignore_spaces = True
- elif ch in self.QUOTE_CODES:
- length = self.QUOTE_CODES[ch]
- code = int(self.data[self.index:self.index+length], 16)
- chunks.append(unichr(code))
- self.index += length
+ sys.stdout.write(kind[0])
+ sys.stdout.flush()
+ else:
+ kind = 'SUCCESS'
+ info = None
+ if not verbose:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ return (function, filenames, kind, info)
+
+def display(results, verbose):
+ if results and not verbose:
+ sys.stdout.write('\n')
+ total = len(results)
+ failures = 0
+ errors = 0
+ for function, filenames, kind, info in results:
+ if kind == 'SUCCESS':
+ continue
+ if kind == 'FAILURE':
+ failures += 1
+ if kind == 'ERROR':
+ errors += 1
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind))
+ if kind == 'ERROR':
+ traceback.print_exception(file=sys.stdout, *info)
+ else:
+ sys.stdout.write('Traceback (most recent call last):\n')
+ traceback.print_tb(info[2], file=sys.stdout)
+ sys.stdout.write('%s: see below\n' % info[0].__name__)
+ sys.stdout.write('~'*75+'\n')
+ for arg in info[1].args:
+ pprint.pprint(arg, stream=sys.stdout, indent=2)
+ for filename in filenames:
+ sys.stdout.write('-'*75+'\n')
+ sys.stdout.write('%s:\n' % filename)
+ data = open(filename, 'rb').read()
+ sys.stdout.write(data)
+ if data and data[-1] != '\n':
+ sys.stdout.write('\n')
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('TESTS: %s\n' % total)
+ if failures:
+ sys.stdout.write('FAILURES: %s\n' % failures)
+ if errors:
+ sys.stdout.write('ERRORS: %s\n' % errors)
+
+def run(collections, args=None):
+ test_functions = find_test_functions(collections)
+ test_filenames = find_test_filenames(DATA)
+ include_functions, include_filenames, verbose = parse_arguments(args)
+ results = []
+ for function in test_functions:
+ if include_functions and function.func_name not in include_functions:
+ continue
+ if function.unittest:
+ for base, exts in test_filenames:
+ if include_filenames and base not in include_filenames:
+ continue
+ filenames = []
+ for ext in function.unittest:
+ if ext not in exts:
+ break
+ filenames.append(os.path.join(DATA, base+ext))
else:
- chunks.append(self.QUOTE_REPLACES[ch])
- start = self.index
- elif self.data[self.index] == u'\n':
- chunks.append(self.data[start:self.index])
- chunks.append(u' ')
- self.index += 1
- start = self.index
- ignore_spaces = True
- elif ignore_spaces and self.data[self.index] == u' ':
- self.index += 1
- start = self.index
- else:
- ignore_spaces = False
- self.index += 1
- chunks.append(self.data[start:self.index])
- self.index += 1
- return ScalarToken(u''.join(chunks), False, None, None)
-
- def find_token(self):
- found = False
- while not found:
- while self.data[self.index] in u' \t':
- self.index += 1
- if self.data[self.index] == u'#':
- while self.data[self.index] != u'\n':
- self.index += 1
- if self.data[self.index] == u'\n':
- self.index += 1
- else:
- found = True
-
-class CanonicalParser:
-
- def __init__(self):
- self.events = []
- self.parse()
-
- # stream: STREAM-START document* STREAM-END
- def parse_stream(self):
- self.get_token(StreamStartToken)
- self.events.append(StreamStartEvent(None, None))
- while not self.check_token(StreamEndToken):
- if self.check_token(DirectiveToken, DocumentStartToken):
- self.parse_document()
- else:
- raise Error("document is expected, got "+repr(self.tokens[self.index]))
- self.get_token(StreamEndToken)
- self.events.append(StreamEndEvent(None, None))
-
- # document: DIRECTIVE? DOCUMENT-START node
- def parse_document(self):
- node = None
- if self.check_token(DirectiveToken):
- self.get_token(DirectiveToken)
- self.get_token(DocumentStartToken)
- self.events.append(DocumentStartEvent(None, None))
- self.parse_node()
- self.events.append(DocumentEndEvent(None, None))
-
- # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
- def parse_node(self):
- if self.check_token(AliasToken):
- self.events.append(AliasEvent(self.get_token_value(), None, None))
+ skip_exts = getattr(function, 'skip', [])
+ for skip_ext in skip_exts:
+ if skip_ext in exts:
+ break
+ else:
+ result = execute(function, filenames, verbose)
+ results.append(result)
else:
- anchor = None
- if self.check_token(AnchorToken):
- anchor = self.get_token_value()
- tag = None
- if self.check_token(TagToken):
- tag = self.get_token_value()
- if self.check_token(ScalarToken):
- self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
- elif self.check_token(FlowSequenceStartToken):
- self.events.append(SequenceStartEvent(anchor, tag, None, None))
- self.parse_sequence()
- elif self.check_token(FlowMappingStartToken):
- self.events.append(MappingStartEvent(anchor, tag, None, None))
- self.parse_mapping()
- else:
- raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
-
- # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
- def parse_sequence(self):
- self.get_token(FlowSequenceStartToken)
- if not self.check_token(FlowSequenceEndToken):
- self.parse_node()
- while not self.check_token(FlowSequenceEndToken):
- self.get_token(FlowEntryToken)
- if not self.check_token(FlowSequenceEndToken):
- self.parse_node()
- self.get_token(FlowSequenceEndToken)
- self.events.append(SequenceEndEvent(None, None))
-
- # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
- def parse_mapping(self):
- self.get_token(FlowMappingStartToken)
- if not self.check_token(FlowMappingEndToken):
- self.parse_map_entry()
- while not self.check_token(FlowMappingEndToken):
- self.get_token(FlowEntryToken)
- if not self.check_token(FlowMappingEndToken):
- self.parse_map_entry()
- self.get_token(FlowMappingEndToken)
- self.events.append(MappingEndEvent(None, None))
-
- # map_entry: KEY node VALUE node
- def parse_map_entry(self):
- self.get_token(KeyToken)
- self.parse_node()
- self.get_token(ValueToken)
- self.parse_node()
-
- def parse(self):
- self.parse_stream()
-
- def get_event(self):
- return self.events.pop(0)
-
- def check_event(self, *choices):
- if self.events:
- if not choices:
- return True
- for choice in choices:
- if isinstance(self.events[0], choice):
- return True
- return False
-
- def peek_event(self):
- return self.events[0]
-
-class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
-
- def __init__(self, stream):
- if hasattr(stream, 'read'):
- stream = stream.read()
- CanonicalScanner.__init__(self, stream)
- CanonicalParser.__init__(self)
- Composer.__init__(self)
- Constructor.__init__(self)
- Resolver.__init__(self)
-
-def canonical_scan(stream):
- return scan(stream, Loader=CanonicalLoader)
-
-def canonical_parse(stream):
- return parse(stream, Loader=CanonicalLoader)
-
-def canonical_compose(stream):
- return compose(stream, Loader=CanonicalLoader)
-
-def canonical_compose_all(stream):
- return compose_all(stream, Loader=CanonicalLoader)
-
-def canonical_load(stream):
- return load(stream, Loader=CanonicalLoader)
-
-def canonical_load_all(stream):
- return load_all(stream, Loader=CanonicalLoader)
+ result = execute(function, [], verbose)
+ results.append(result)
+ display(results, verbose=verbose)
diff --git a/tests/test_build.py b/tests/test_build.py
index 42afa78..901e8ed 100644
--- a/tests/test_build.py
+++ b/tests/test_build.py
@@ -1,13 +1,10 @@
-def main():
+if __name__ == '__main__':
import sys, os, distutils.util
build_lib = 'build/lib'
build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
sys.path.insert(0, build_lib)
sys.path.insert(0, build_lib_ext)
- import test_yaml
- test_yaml.main('test_yaml')
-
-if __name__ == '__main__':
- main()
+ import test_yaml, test_appliance
+ test_appliance.run(test_yaml)
diff --git a/tests/test_build_ext.py b/tests/test_build_ext.py
index a335c1b..ff195d5 100644
--- a/tests/test_build_ext.py
+++ b/tests/test_build_ext.py
@@ -1,14 +1,11 @@
-def main():
+if __name__ == '__main__':
import sys, os, distutils.util
build_lib = 'build/lib'
build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3]))
sys.path.insert(0, build_lib)
sys.path.insert(0, build_lib_ext)
- import test_yaml_ext
- test_yaml_ext.main('test_yaml_ext')
-
-if __name__ == '__main__':
- main()
+ import test_yaml_ext, test_appliance
+ test_appliance.run(test_yaml_ext)
diff --git a/tests/test_canonical.py b/tests/test_canonical.py
index 4416902..a851ef2 100644
--- a/tests/test_canonical.py
+++ b/tests/test_canonical.py
@@ -1,20 +1,40 @@
-import test_appliance
+import yaml, canonical
-class TestCanonicalAppliance(test_appliance.TestAppliance):
+def test_canonical_scanner(canonical_filename, verbose=False):
+ data = open(canonical_filename, 'rb').read()
+ tokens = list(yaml.canonical_scan(data))
+ assert tokens, tokens
+ if verbose:
+ for token in tokens:
+ print token
- def _testCanonicalScanner(self, test_name, canonical_filename):
- data = file(canonical_filename, 'rb').read()
- tokens = list(test_appliance.canonical_scan(data))
- #for token in tokens:
- # print token
+test_canonical_scanner.unittest = ['.canonical']
- def _testCanonicalParser(self, test_name, canonical_filename):
- data = file(canonical_filename, 'rb').read()
- event = list(test_appliance.canonical_parse(data))
- #for event in events:
- # print event
+def test_canonical_parser(canonical_filename, verbose=False):
+ data = open(canonical_filename, 'rb').read()
+ events = list(yaml.canonical_parse(data))
+ assert events, events
+ if verbose:
+ for event in events:
+ print event
-TestCanonicalAppliance.add_tests('testCanonicalScanner', '.canonical')
-TestCanonicalAppliance.add_tests('testCanonicalParser', '.canonical')
+test_canonical_parser.unittest = ['.canonical']
+
+def test_canonical_error(data_filename, canonical_filename, verbose=False):
+ data = open(data_filename, 'rb').read()
+ try:
+ output = list(yaml.canonical_load_all(data))
+ except yaml.YAMLError, exc:
+ if verbose:
+ print exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_canonical_error.unittest = ['.data', '.canonical']
+test_canonical_error.skip = ['.empty']
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_constructor.py b/tests/test_constructor.py
index 04f54ef..bb01692 100644
--- a/tests/test_constructor.py
+++ b/tests/test_constructor.py
@@ -1,313 +1,271 @@
-import test_appliance
+import yaml
+import pprint
import datetime
try:
set
except NameError:
from sets import Set as set
-
-from yaml import *
-
import yaml.tokens
-class MyLoader(Loader):
- pass
-class MyDumper(Dumper):
- pass
-
-class MyTestClass1:
-
- def __init__(self, x, y=0, z=0):
- self.x = x
- self.y = y
- self.z = z
-
- def __eq__(self, other):
- if isinstance(other, MyTestClass1):
- return self.__class__, self.__dict__ == other.__class__, other.__dict__
- else:
- return False
-
-def construct1(constructor, node):
- mapping = constructor.construct_mapping(node)
- return MyTestClass1(**mapping)
-def represent1(representer, native):
- return representer.represent_mapping("!tag1", native.__dict__)
-
-add_constructor("!tag1", construct1, Loader=MyLoader)
-add_representer(MyTestClass1, represent1, Dumper=MyDumper)
-
-class MyTestClass2(MyTestClass1, YAMLObject):
-
- yaml_loader = MyLoader
- yaml_dumper = MyDumper
- yaml_tag = "!tag2"
-
- def from_yaml(cls, constructor, node):
- x = constructor.construct_yaml_int(node)
- return cls(x=x)
- from_yaml = classmethod(from_yaml)
-
- def to_yaml(cls, representer, native):
- return representer.represent_scalar(cls.yaml_tag, str(native.x))
- to_yaml = classmethod(to_yaml)
-
-class MyTestClass3(MyTestClass2):
-
- yaml_tag = "!tag3"
-
- def from_yaml(cls, constructor, node):
- mapping = constructor.construct_mapping(node)
- if '=' in mapping:
- x = mapping['=']
- del mapping['=']
- mapping['x'] = x
- return cls(**mapping)
- from_yaml = classmethod(from_yaml)
-
- def to_yaml(cls, representer, native):
- return representer.represent_mapping(cls.yaml_tag, native.__dict__)
- to_yaml = classmethod(to_yaml)
-
-class YAMLObject1(YAMLObject):
-
- yaml_loader = MyLoader
- yaml_dumper = MyDumper
- yaml_tag = '!foo'
-
- def __init__(self, my_parameter=None, my_another_parameter=None):
- self.my_parameter = my_parameter
- self.my_another_parameter = my_another_parameter
-
- def __eq__(self, other):
- if isinstance(other, YAMLObject1):
- return self.__class__, self.__dict__ == other.__class__, other.__dict__
- else:
- return False
-
-class YAMLObject2(YAMLObject):
-
- yaml_loader = MyLoader
- yaml_dumper = MyDumper
- yaml_tag = '!bar'
-
- def __init__(self, foo=1, bar=2, baz=3):
- self.foo = foo
- self.bar = bar
- self.baz = baz
-
- def __getstate__(self):
- return {1: self.foo, 2: self.bar, 3: self.baz}
-
- def __setstate__(self, state):
- self.foo = state[1]
- self.bar = state[2]
- self.baz = state[3]
-
- def __eq__(self, other):
- if isinstance(other, YAMLObject2):
- return self.__class__, self.__dict__ == other.__class__, other.__dict__
- else:
- return False
-
-class AnObject(object):
-
- def __new__(cls, foo=None, bar=None, baz=None):
- self = object.__new__(cls)
- self.foo = foo
- self.bar = bar
- self.baz = baz
- return self
-
- def __cmp__(self, other):
- return cmp((type(self), self.foo, self.bar, self.baz),
- (type(other), other.foo, other.bar, other.baz))
-
- def __eq__(self, other):
- return type(self) is type(other) and \
- (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
-
-class AnInstance:
-
- def __init__(self, foo=None, bar=None, baz=None):
- self.foo = foo
- self.bar = bar
- self.baz = baz
-
- def __cmp__(self, other):
- return cmp((type(self), self.foo, self.bar, self.baz),
- (type(other), other.foo, other.bar, other.baz))
-
- def __eq__(self, other):
- return type(self) is type(other) and \
- (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
-
-class AState(AnInstance):
-
- def __getstate__(self):
- return {
- '_foo': self.foo,
- '_bar': self.bar,
- '_baz': self.baz,
- }
-
- def __setstate__(self, state):
- self.foo = state['_foo']
- self.bar = state['_bar']
- self.baz = state['_baz']
-
-class ACustomState(AnInstance):
-
- def __getstate__(self):
- return (self.foo, self.bar, self.baz)
-
- def __setstate__(self, state):
- self.foo, self.bar, self.baz = state
-
-class InitArgs(AnInstance):
-
- def __getinitargs__(self):
- return (self.foo, self.bar, self.baz)
-
- def __getstate__(self):
- return {}
-
-class InitArgsWithState(AnInstance):
-
- def __getinitargs__(self):
- return (self.foo, self.bar)
-
- def __getstate__(self):
- return self.baz
-
- def __setstate__(self, state):
- self.baz = state
-
-class NewArgs(AnObject):
-
- def __getnewargs__(self):
- return (self.foo, self.bar, self.baz)
-
- def __getstate__(self):
- return {}
-
-class NewArgsWithState(AnObject):
-
- def __getnewargs__(self):
- return (self.foo, self.bar)
-
- def __getstate__(self):
- return self.baz
-
- def __setstate__(self, state):
- self.baz = state
-
-class Reduce(AnObject):
-
- def __reduce__(self):
- return self.__class__, (self.foo, self.bar, self.baz)
-
-class ReduceWithState(AnObject):
-
- def __reduce__(self):
- return self.__class__, (self.foo, self.bar), self.baz
-
- def __setstate__(self, state):
- self.baz = state
-
-class MyInt(int):
-
- def __eq__(self, other):
- return type(self) is type(other) and int(self) == int(other)
-
-class MyList(list):
-
- def __init__(self, n=1):
- self.extend([None]*n)
-
- def __eq__(self, other):
- return type(self) is type(other) and list(self) == list(other)
-
-class MyDict(dict):
-
- def __init__(self, n=1):
- for k in range(n):
- self[k] = None
-
- def __eq__(self, other):
- return type(self) is type(other) and dict(self) == dict(other)
-
-class FixedOffset(datetime.tzinfo):
-
- def __init__(self, offset, name):
- self.__offset = datetime.timedelta(minutes=offset)
- self.__name = name
-
- def utcoffset(self, dt):
- return self.__offset
-
- def tzname(self, dt):
- return self.__name
-
- def dst(self, dt):
- return datetime.timedelta(0)
-
-
def execute(code):
exec code
return value
-class TestConstructorTypes(test_appliance.TestAppliance):
-
- def _testTypes(self, test_name, data_filename, code_filename):
- data1 = None
- data2 = None
+def _make_objects():
+ global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLObject1, YAMLObject2, \
+ AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, \
+ NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, \
+ FixedOffset, execute
+
+ class MyLoader(yaml.Loader):
+ pass
+ class MyDumper(yaml.Dumper):
+ pass
+
+ class MyTestClass1:
+ def __init__(self, x, y=0, z=0):
+ self.x = x
+ self.y = y
+ self.z = z
+ def __eq__(self, other):
+ if isinstance(other, MyTestClass1):
+ return self.__class__, self.__dict__ == other.__class__, other.__dict__
+ else:
+ return False
+
+ def construct1(constructor, node):
+ mapping = constructor.construct_mapping(node)
+ return MyTestClass1(**mapping)
+ def represent1(representer, native):
+ return representer.represent_mapping("!tag1", native.__dict__)
+
+ yaml.add_constructor("!tag1", construct1, Loader=MyLoader)
+ yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper)
+
+ class MyTestClass2(MyTestClass1, yaml.YAMLObject):
+ yaml_loader = MyLoader
+ yaml_dumper = MyDumper
+ yaml_tag = "!tag2"
+ def from_yaml(cls, constructor, node):
+ x = constructor.construct_yaml_int(node)
+ return cls(x=x)
+ from_yaml = classmethod(from_yaml)
+ def to_yaml(cls, representer, native):
+ return representer.represent_scalar(cls.yaml_tag, str(native.x))
+ to_yaml = classmethod(to_yaml)
+
+ class MyTestClass3(MyTestClass2):
+ yaml_tag = "!tag3"
+ def from_yaml(cls, constructor, node):
+ mapping = constructor.construct_mapping(node)
+ if '=' in mapping:
+ x = mapping['=']
+ del mapping['=']
+ mapping['x'] = x
+ return cls(**mapping)
+ from_yaml = classmethod(from_yaml)
+ def to_yaml(cls, representer, native):
+ return representer.represent_mapping(cls.yaml_tag, native.__dict__)
+ to_yaml = classmethod(to_yaml)
+
+ class YAMLObject1(yaml.YAMLObject):
+ yaml_loader = MyLoader
+ yaml_dumper = MyDumper
+ yaml_tag = '!foo'
+ def __init__(self, my_parameter=None, my_another_parameter=None):
+ self.my_parameter = my_parameter
+ self.my_another_parameter = my_another_parameter
+ def __eq__(self, other):
+ if isinstance(other, YAMLObject1):
+ return self.__class__, self.__dict__ == other.__class__, other.__dict__
+ else:
+ return False
+
+ class YAMLObject2(yaml.YAMLObject):
+ yaml_loader = MyLoader
+ yaml_dumper = MyDumper
+ yaml_tag = '!bar'
+ def __init__(self, foo=1, bar=2, baz=3):
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+ def __getstate__(self):
+ return {1: self.foo, 2: self.bar, 3: self.baz}
+ def __setstate__(self, state):
+ self.foo = state[1]
+ self.bar = state[2]
+ self.baz = state[3]
+ def __eq__(self, other):
+ if isinstance(other, YAMLObject2):
+ return self.__class__, self.__dict__ == other.__class__, other.__dict__
+ else:
+ return False
+
+ class AnObject(object):
+ def __new__(cls, foo=None, bar=None, baz=None):
+ self = object.__new__(cls)
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+ return self
+ def __cmp__(self, other):
+ return cmp((type(self), self.foo, self.bar, self.baz),
+ (type(other), other.foo, other.bar, other.baz))
+ def __eq__(self, other):
+ return type(self) is type(other) and \
+ (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
+
+ class AnInstance:
+ def __init__(self, foo=None, bar=None, baz=None):
+ self.foo = foo
+ self.bar = bar
+ self.baz = baz
+ def __cmp__(self, other):
+ return cmp((type(self), self.foo, self.bar, self.baz),
+ (type(other), other.foo, other.bar, other.baz))
+ def __eq__(self, other):
+ return type(self) is type(other) and \
+ (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz)
+
+ class AState(AnInstance):
+ def __getstate__(self):
+ return {
+ '_foo': self.foo,
+ '_bar': self.bar,
+ '_baz': self.baz,
+ }
+ def __setstate__(self, state):
+ self.foo = state['_foo']
+ self.bar = state['_bar']
+ self.baz = state['_baz']
+
+ class ACustomState(AnInstance):
+ def __getstate__(self):
+ return (self.foo, self.bar, self.baz)
+ def __setstate__(self, state):
+ self.foo, self.bar, self.baz = state
+
+ class InitArgs(AnInstance):
+ def __getinitargs__(self):
+ return (self.foo, self.bar, self.baz)
+ def __getstate__(self):
+ return {}
+
+ class InitArgsWithState(AnInstance):
+ def __getinitargs__(self):
+ return (self.foo, self.bar)
+ def __getstate__(self):
+ return self.baz
+ def __setstate__(self, state):
+ self.baz = state
+
+ class NewArgs(AnObject):
+ def __getnewargs__(self):
+ return (self.foo, self.bar, self.baz)
+ def __getstate__(self):
+ return {}
+
+ class NewArgsWithState(AnObject):
+ def __getnewargs__(self):
+ return (self.foo, self.bar)
+ def __getstate__(self):
+ return self.baz
+ def __setstate__(self, state):
+ self.baz = state
+
+ class Reduce(AnObject):
+ def __reduce__(self):
+ return self.__class__, (self.foo, self.bar, self.baz)
+
+ class ReduceWithState(AnObject):
+ def __reduce__(self):
+ return self.__class__, (self.foo, self.bar), self.baz
+ def __setstate__(self, state):
+ self.baz = state
+
+ class MyInt(int):
+ def __eq__(self, other):
+ return type(self) is type(other) and int(self) == int(other)
+
+ class MyList(list):
+ def __init__(self, n=1):
+ self.extend([None]*n)
+ def __eq__(self, other):
+ return type(self) is type(other) and list(self) == list(other)
+
+ class MyDict(dict):
+ def __init__(self, n=1):
+ for k in range(n):
+ self[k] = None
+ def __eq__(self, other):
+ return type(self) is type(other) and dict(self) == dict(other)
+
+ class FixedOffset(datetime.tzinfo):
+ def __init__(self, offset, name):
+ self.__offset = datetime.timedelta(minutes=offset)
+ self.__name = name
+ def utcoffset(self, dt):
+ return self.__offset
+ def tzname(self, dt):
+ return self.__name
+ def dst(self, dt):
+ return datetime.timedelta(0)
+
+def _load_code(expression):
+ return eval(expression)
+
+def _serialize_value(data):
+ if isinstance(data, list):
+ return '[%s]' % ', '.join(map(_serialize_value, data))
+ elif isinstance(data, dict):
+ items = []
+ for key, value in data.items():
+ key = _serialize_value(key)
+ value = _serialize_value(value)
+ items.append("%s: %s" % (key, value))
+ items.sort()
+ return '{%s}' % ', '.join(items)
+ elif isinstance(data, datetime.datetime):
+ return repr(data.utctimetuple())
+ elif isinstance(data, unicode):
+ return data.encode('utf-8')
+ else:
+ return str(data)
+
+def test_constructor_types(data_filename, code_filename, verbose=False):
+ _make_objects()
+ native1 = None
+ native2 = None
+ try:
+ native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
+ if len(native1) == 1:
+ native1 = native1[0]
+ native2 = _load_code(open(code_filename, 'rb').read())
try:
- data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))
- if len(data1) == 1:
- data1 = data1[0]
- data2 = eval(file(code_filename, 'rb').read())
- self.failUnlessEqual(type(data1), type(data2))
- try:
- self.failUnlessEqual(data1, data2)
- except (AssertionError, TypeError):
- if isinstance(data1, dict):
- data1 = [(repr(key), value) for key, value in data1.items()]
- data1.sort()
- data1 = repr(data1)
- data2 = [(repr(key), value) for key, value in data2.items()]
- data2.sort()
- data2 = repr(data2)
- if data1 != data2:
- raise
- elif isinstance(data1, list):
- self.failUnlessEqual(type(data1), type(data2))
- self.failUnlessEqual(len(data1), len(data2))
- for item1, item2 in zip(data1, data2):
- if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \
- (item2 != item2 or (item2 == 0.0 and item2 == 1.0)):
- continue
- if isinstance(item1, datetime.datetime) \
- and isinstance(item2, datetime.datetime):
- self.failUnlessEqual(item1.microsecond,
- item2.microsecond)
- if isinstance(item1, datetime.datetime):
- item1 = item1.utctimetuple()
- if isinstance(item2, datetime.datetime):
- item2 = item2.utctimetuple()
- self.failUnlessEqual(item1, item2)
- else:
- raise
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "CODE:"
- print file(code_filename, 'rb').read()
- print "NATIVES1:", data1
- print "NATIVES2:", data2
- raise
-
-TestConstructorTypes.add_tests('testTypes', '.data', '.code')
+ if native1 == native2:
+ return
+ except TypeError:
+ pass
+ if verbose:
+ print "SERIALIZED NATIVE1:"
+ print _serialize_value(native1)
+ print "SERIALIZED NATIVE2:"
+ print _serialize_value(native2)
+ assert _serialize_value(native1) == _serialize_value(native2), (native1, native2)
+ finally:
+ if verbose:
+ print "NATIVE1:"
+ pprint.pprint(native1)
+ print "NATIVE2:"
+ pprint.pprint(native2)
+
+test_constructor_types.unittest = ['.data', '.code']
+
+if __name__ == '__main__':
+ import sys, test_constructor
+ sys.modules['test_constructor'] = sys.modules['__main__']
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_emitter.py b/tests/test_emitter.py
index 77e3728..61fd941 100644
--- a/tests/test_emitter.py
+++ b/tests/test_emitter.py
@@ -1,91 +1,72 @@
-import test_appliance, sys, StringIO
-
-from yaml import *
import yaml
-class TestEmitter(test_appliance.TestAppliance):
-
- def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
- self._testEmitter(test_name, data_filename)
-
- def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
- self._testEmitter(test_name, canonical_filename, False)
-
- def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
- self._testEmitter(test_name, canonical_filename, True)
-
- def _testEmitter(self, test_name, filename, canonical=None):
- events = list(parse(file(filename, 'rb')))
- #self._dump(filename, events, canonical)
- stream = StringIO.StringIO()
- emit(events, stream, canonical=canonical)
- data = stream.getvalue()
- new_events = list(parse(data))
- for event, new_event in zip(events, new_events):
- self.failUnlessEqual(event.__class__, new_event.__class__)
- if isinstance(event, NodeEvent):
- self.failUnlessEqual(event.anchor, new_event.anchor)
- if isinstance(event, CollectionStartEvent):
- self.failUnlessEqual(event.tag, new_event.tag)
- if isinstance(event, ScalarEvent):
- #self.failUnlessEqual(event.implicit, new_event.implicit)
- if True not in event.implicit+new_event.implicit:
- self.failUnlessEqual(event.tag, new_event.tag)
- self.failUnlessEqual(event.value, new_event.value)
-
- def _testEmitterStyles(self, test_name, canonical_filename, data_filename):
- for filename in [canonical_filename, data_filename]:
- events = list(parse(file(filename, 'rb')))
- for flow_style in [False, True]:
- for style in ['|', '>', '"', '\'', '']:
- styled_events = []
- for event in events:
- if isinstance(event, ScalarEvent):
- event = ScalarEvent(event.anchor, event.tag,
- event.implicit, event.value, style=style)
- elif isinstance(event, SequenceStartEvent):
- event = SequenceStartEvent(event.anchor, event.tag,
- event.implicit, flow_style=flow_style)
- elif isinstance(event, MappingStartEvent):
- event = MappingStartEvent(event.anchor, event.tag,
- event.implicit, flow_style=flow_style)
- styled_events.append(event)
- stream = StringIO.StringIO()
- emit(styled_events, stream)
- data = stream.getvalue()
- #print data
- new_events = list(parse(data))
- for event, new_event in zip(events, new_events):
- self.failUnlessEqual(event.__class__, new_event.__class__)
- if isinstance(event, NodeEvent):
- self.failUnlessEqual(event.anchor, new_event.anchor)
- if isinstance(event, CollectionStartEvent):
- self.failUnlessEqual(event.tag, new_event.tag)
- if isinstance(event, ScalarEvent):
- #self.failUnlessEqual(event.implicit, new_event.implicit)
- if True not in event.implicit+new_event.implicit:
- self.failUnlessEqual(event.tag, new_event.tag)
- self.failUnlessEqual(event.value, new_event.value)
-
-
- def _dump(self, filename, events, canonical):
- print "="*30
- print "ORIGINAL DOCUMENT:"
- print file(filename, 'rb').read()
- print '-'*30
- print "EMITTED DOCUMENT:"
- emit(events, sys.stdout, canonical=canonical)
-
-TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
-TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
-TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
-TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data')
-
-class EventsLoader(Loader):
+def _compare_events(events1, events2):
+ assert len(events1) == len(events2), (events1, events2)
+ for event1, event2 in zip(events1, events2):
+ assert event1.__class__ == event2.__class__, (event1, event2)
+ if isinstance(event1, yaml.NodeEvent):
+ assert event1.anchor == event2.anchor, (event1, event2)
+ if isinstance(event1, yaml.CollectionStartEvent):
+ assert event1.tag == event2.tag, (event1, event2)
+ if isinstance(event1, yaml.ScalarEvent):
+ if True not in event1.implicit+event2.implicit:
+ assert event1.tag == event2.tag, (event1, event2)
+ assert event1.value == event2.value, (event1, event2)
+
+def test_emitter_on_data(data_filename, canonical_filename, verbose=False):
+ events = list(yaml.parse(open(data_filename, 'rb')))
+ output = yaml.emit(events)
+ if verbose:
+ print "OUTPUT:"
+ print output
+ new_events = list(yaml.parse(output))
+ _compare_events(events, new_events)
+
+test_emitter_on_data.unittest = ['.data', '.canonical']
+
+def test_emitter_on_canonical(canonical_filename, verbose=False):
+ events = list(yaml.parse(open(canonical_filename, 'rb')))
+ for canonical in [False, True]:
+ output = yaml.emit(events, canonical=canonical)
+ if verbose:
+ print "OUTPUT (canonical=%s):" % canonical
+ print output
+ new_events = list(yaml.parse(output))
+ _compare_events(events, new_events)
+
+test_emitter_on_canonical.unittest = ['.canonical']
+
+def test_emitter_styles(data_filename, canonical_filename, verbose=False):
+ for filename in [data_filename, canonical_filename]:
+ events = list(yaml.parse(open(filename, 'rb')))
+ for flow_style in [False, True]:
+ for style in ['|', '>', '"', '\'', '']:
+ styled_events = []
+ for event in events:
+ if isinstance(event, yaml.ScalarEvent):
+ event = yaml.ScalarEvent(event.anchor, event.tag,
+ event.implicit, event.value, style=style)
+ elif isinstance(event, yaml.SequenceStartEvent):
+ event = yaml.SequenceStartEvent(event.anchor, event.tag,
+ event.implicit, flow_style=flow_style)
+ elif isinstance(event, yaml.MappingStartEvent):
+ event = yaml.MappingStartEvent(event.anchor, event.tag,
+ event.implicit, flow_style=flow_style)
+ styled_events.append(event)
+ output = yaml.emit(styled_events)
+ if verbose:
+ print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style)
+ print output
+ new_events = list(yaml.parse(output))
+ _compare_events(events, new_events)
+
+test_emitter_styles.unittest = ['.data', '.canonical']
+
+class EventsLoader(yaml.Loader):
def construct_event(self, node):
- if isinstance(node, ScalarNode):
+ if isinstance(node, yaml.ScalarNode):
mapping = {}
else:
mapping = self.construct_mapping(node)
@@ -104,34 +85,16 @@ class EventsLoader(Loader):
EventsLoader.add_constructor(None, EventsLoader.construct_event)
-class TestEmitterEvents(test_appliance.TestAppliance):
-
- def _testEmitterEvents(self, test_name, events_filename):
- events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
- #self._dump(events_filename, events)
- stream = StringIO.StringIO()
- emit(events, stream)
- data = stream.getvalue()
- new_events = list(parse(data))
- self.failUnlessEqual(len(events), len(new_events))
- for event, new_event in zip(events, new_events):
- self.failUnlessEqual(event.__class__, new_event.__class__)
- if isinstance(event, NodeEvent):
- self.failUnlessEqual(event.anchor, new_event.anchor)
- if isinstance(event, CollectionStartEvent):
- self.failUnlessEqual(event.tag, new_event.tag)
- if isinstance(event, ScalarEvent):
- self.failUnless(event.implicit == new_event.implicit
- or event.tag == new_event.tag)
- self.failUnlessEqual(event.value, new_event.value)
-
- def _dump(self, events_filename, events):
- print "="*30
- print "EVENTS:"
- print file(events_filename, 'rb').read()
- print '-'*30
+def test_emitter_events(events_filename, verbose=False):
+ events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader))
+ output = yaml.emit(events)
+ if verbose:
print "OUTPUT:"
- emit(events, sys.stdout)
-
-TestEmitterEvents.add_tests('testEmitterEvents', '.events')
+ print output
+ new_events = list(yaml.parse(output))
+ _compare_events(events, new_events)
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_errors.py b/tests/test_errors.py
index 7d6e0d2..5fd7c46 100644
--- a/tests/test_errors.py
+++ b/tests/test_errors.py
@@ -1,91 +1,66 @@
-import test_appliance
-import test_emitter
-
-import StringIO
-
-from yaml import *
-
-class TestErrors(test_appliance.TestAppliance):
-
- def _testLoaderErrors(self, test_name, invalid_filename):
- #self._load(invalid_filename)
- self.failUnlessRaises(YAMLError, lambda: self._load(invalid_filename))
-
- def _testLoaderStringErrors(self, test_name, invalid_filename):
- #self._load_string(invalid_filename)
- self.failUnlessRaises(YAMLError, lambda: self._load_string(invalid_filename))
-
- def _testLoaderSingleErrors(self, test_name, invalid_filename):
- #self._load_single(invalid_filename)
- self.failUnlessRaises(YAMLError, lambda: self._load_single(invalid_filename))
-
- def _testEmitterErrors(self, test_name, invalid_filename):
- events = list(load(file(invalid_filename, 'rb').read(),
- Loader=test_emitter.EventsLoader))
- self.failUnlessRaises(YAMLError, lambda: self._emit(events))
-
- def _testDumperErrors(self, test_name, invalid_filename):
- code = file(invalid_filename, 'rb').read()
- self.failUnlessRaises(YAMLError, lambda: self._dump(code))
-
- def _dump(self, code):
- try:
- exec code
- except YAMLError, exc:
- #print '.'*70
- #print "%s:" % exc.__class__.__name__, exc
- raise
-
- def _emit(self, events):
- try:
- emit(events)
- except YAMLError, exc:
- #print '.'*70
- #print "%s:" % exc.__class__.__name__, exc
- raise
-
- def _load(self, filename):
- try:
- return list(load_all(file(filename, 'rb')))
- except YAMLError, exc:
- #except ScannerError, exc:
- #except ParserError, exc:
- #except ComposerError, exc:
- #except ConstructorError, exc:
- #print '.'*70
- #print "%s:" % exc.__class__.__name__, exc
- raise
-
- def _load_string(self, filename):
- try:
- return list(load_all(file(filename, 'rb').read()))
- except YAMLError, exc:
- #except ScannerError, exc:
- #except ParserError, exc:
- #except ComposerError, exc:
- #except ConstructorError, exc:
- #print '.'*70
- #print "%s:" % filename
- #print "%s:" % exc.__class__.__name__, exc
- raise
-
- def _load_single(self, filename):
- try:
- return load(file(filename, 'rb').read())
- except YAMLError, exc:
- #except ScannerError, exc:
- #except ParserError, exc:
- #except ComposerError, exc:
- #except ConstructorError, exc:
- #print '.'*70
- #print "%s:" % filename
- #print "%s:" % exc.__class__.__name__, exc
- raise
-
-TestErrors.add_tests('testLoaderErrors', '.loader-error')
-TestErrors.add_tests('testLoaderStringErrors', '.loader-error')
-TestErrors.add_tests('testLoaderSingleErrors', '.single-loader-error')
-TestErrors.add_tests('testEmitterErrors', '.emitter-error')
-TestErrors.add_tests('testDumperErrors', '.dumper-error')
+import yaml, test_emitter
+
+def test_loader_error(error_filename, verbose=False):
+ try:
+ list(yaml.load_all(open(error_filename, 'rb')))
+ except yaml.YAMLError, exc:
+ if verbose:
+ print "%s:" % exc.__class__.__name__, exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_loader_error.unittest = ['.loader-error']
+
+def test_loader_error_string(error_filename, verbose=False):
+ try:
+ list(yaml.load_all(open(error_filename, 'rb').read()))
+ except yaml.YAMLError, exc:
+ if verbose:
+ print "%s:" % exc.__class__.__name__, exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_loader_error_string.unittest = ['.loader-error']
+
+def test_loader_error_single(error_filename, verbose=False):
+ try:
+ yaml.load(open(error_filename, 'rb').read())
+ except yaml.YAMLError, exc:
+ if verbose:
+ print "%s:" % exc.__class__.__name__, exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_loader_error_single.unittest = ['.single-loader-error']
+
+def test_emitter_error(error_filename, verbose=False):
+ events = list(yaml.load(open(error_filename, 'rb'),
+ Loader=test_emitter.EventsLoader))
+ try:
+ yaml.emit(events)
+ except yaml.YAMLError, exc:
+ if verbose:
+ print "%s:" % exc.__class__.__name__, exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_emitter_error.unittest = ['.emitter-error']
+
+def test_dumper_error(error_filename, verbose=False):
+ code = open(error_filename, 'rb').read()
+ try:
+ import yaml, StringIO
+ exec code
+ except yaml.YAMLError, exc:
+ if verbose:
+ print "%s:" % exc.__class__.__name__, exc
+ else:
+ raise AssertionError("expected an exception")
+
+test_dumper_error.unittest = ['.dumper-error']
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_mark.py b/tests/test_mark.py
index 4fa665e..f30a121 100644
--- a/tests/test_mark.py
+++ b/tests/test_mark.py
@@ -1,34 +1,32 @@
-import test_appliance
+import yaml
-from yaml.reader import Mark
+def test_marks(marks_filename, verbose=False):
+ inputs = open(marks_filename, 'rb').read().split('---\n')[1:]
+ for input in inputs:
+ index = 0
+ line = 0
+ column = 0
+ while input[index] != '*':
+ if input[index] == '\n':
+ line += 1
+ column = 0
+ else:
+ column += 1
+ index += 1
+ mark = yaml.Mark(marks_filename, index, line, column, unicode(input), index)
+ snippet = mark.get_snippet(indent=2, max_length=79)
+ if verbose:
+ print snippet
+ assert isinstance(snippet, str), type(snippet)
+ assert snippet.count('\n') == 1, snippet.count('\n')
+ data, pointer = snippet.split('\n')
+ assert len(data) < 82, len(data)
+ assert data[len(pointer)-1] == '*', data[len(pointer)-1]
-class TestMark(test_appliance.TestAppliance):
+test_marks.unittest = ['.marks']
- def _testMarks(self, test_name, marks_filename):
- inputs = file(marks_filename, 'rb').read().split('---\n')[1:]
- for input in inputs:
- index = 0
- line = 0
- column = 0
- while input[index] != '*':
- if input[index] == '\n':
- line += 1
- column = 0
- else:
- column += 1
- index += 1
- mark = Mark(test_name, index, line, column, unicode(input), index)
- snippet = mark.get_snippet(indent=2, max_length=79)
- #print "INPUT:"
- #print input
- #print "SNIPPET:"
- #print snippet
- self.failUnless(isinstance(snippet, str))
- self.failUnlessEqual(snippet.count('\n'), 1)
- data, pointer = snippet.split('\n')
- self.failUnless(len(data) < 82)
- self.failUnlessEqual(data[len(pointer)-1], '*')
-
-TestMark.add_tests('testMarks', '.marks')
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_reader.py b/tests/test_reader.py
index 1bfae1a..3576ae6 100644
--- a/tests/test_reader.py
+++ b/tests/test_reader.py
@@ -1,44 +1,35 @@
-import test_appliance
-from yaml.reader import Reader, ReaderError
-
+import yaml.reader
import codecs
-class TestReaderErrors(test_appliance.TestAppliance):
-
- def _testReaderUnicodeErrors(self, test_name, stream_filename):
- for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
- try:
- data = unicode(file(stream_filename, 'rb').read(), encoding)
- break
- except:
- pass
- else:
- return
- #self._load(data)
- self.failUnlessRaises(ReaderError,
- lambda: self._load(data))
- #self._load(codecs.open(stream_filename, encoding=encoding))
- self.failUnlessRaises(ReaderError,
- lambda: self._load(codecs.open(stream_filename, encoding=encoding)))
-
- def _testReaderStringErrors(self, test_name, stream_filename):
- data = file(stream_filename, 'rb').read()
- #self._load(data)
- self.failUnlessRaises(ReaderError, lambda: self._load(data))
-
- def _testReaderFileErrors(self, test_name, stream_filename):
- data = file(stream_filename, 'rb')
- #self._load(data)
- self.failUnlessRaises(ReaderError, lambda: self._load(data))
-
- def _load(self, data):
- stream = Reader(data)
+def _run_reader(data, verbose):
+ try:
+ stream = yaml.reader.Reader(data)
while stream.peek() != u'\0':
stream.forward()
-
-TestReaderErrors.add_tests('testReaderUnicodeErrors', '.stream-error')
-TestReaderErrors.add_tests('testReaderStringErrors', '.stream-error')
-TestReaderErrors.add_tests('testReaderFileErrors', '.stream-error')
-
+ except yaml.reader.ReaderError, exc:
+ if verbose:
+ print exc
+ else:
+ raise AssertionError("expected an exception")
+
+def test_stream_error(error_filename, verbose=False):
+ _run_reader(open(error_filename, 'rb'), verbose)
+ _run_reader(open(error_filename, 'rb').read(), verbose)
+ for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']:
+ try:
+ data = unicode(open(error_filename, 'rb').read(), encoding)
+ break
+ except UnicodeDecodeError:
+ pass
+ else:
+ return
+ _run_reader(data, verbose)
+ _run_reader(codecs.open(error_filename, encoding=encoding), verbose)
+
+test_stream_error.unittest = ['.stream-error']
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_recursive.py b/tests/test_recursive.py
index 3c09264..b2dca3c 100644
--- a/tests/test_recursive.py
+++ b/tests/test_recursive.py
@@ -1,7 +1,5 @@
-import test_appliance
-
-from yaml import *
+import yaml
class AnInstance:
@@ -24,29 +22,29 @@ class AnInstanceWithState(AnInstance):
def __setstate__(self, state):
self.foo, self.bar = state['attributes']
-class TestRecursive(test_appliance.TestAppliance):
-
- def _testRecursive(self, test_name, recursive_filename):
- exec file(recursive_filename, 'r').read()
- value1 = value
- output1 = None
- value2 = None
- output2 = None
- try:
- output1 = dump(value1)
- #print "OUTPUT %s:" % test_name
- #print output1
- value2 = load(output1)
- output2 = dump(value2)
- self.failUnlessEqual(output1, output2)
- except:
+def test_recursive(recursive_filename, verbose=False):
+ exec open(recursive_filename, 'rb').read()
+ value1 = value
+ output1 = None
+ value2 = None
+ output2 = None
+ try:
+ output1 = yaml.dump(value1)
+ value2 = yaml.load(output1)
+ output2 = yaml.dump(value2)
+ assert output1 == output2, (output1, output2)
+ finally:
+ if verbose:
print "VALUE1:", value1
print "VALUE2:", value2
print "OUTPUT1:"
print output1
print "OUTPUT2:"
print output2
- raise
-TestRecursive.add_tests('testRecursive', '.recursive')
+test_recursive.unittest = ['.recursive']
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_representer.py b/tests/test_representer.py
index 6746146..f814705 100644
--- a/tests/test_representer.py
+++ b/tests/test_representer.py
@@ -1,60 +1,42 @@
-import test_appliance
-from test_constructor import *
+import yaml
+import test_constructor
+import pprint
-from yaml import *
-
-class TestRepresenterTypes(test_appliance.TestAppliance):
-
- def _testTypesUnicode(self, test_name, data_filename, code_filename):
- return self._testTypes(test_name, data_filename, code_filename, allow_unicode=True)
-
- def _testTypes(self, test_name, data_filename, code_filename, allow_unicode=False):
- data1 = eval(file(code_filename, 'rb').read())
- data2 = None
- output = None
+def test_representer_types(code_filename, verbose=False):
+ test_constructor._make_objects()
+ for allow_unicode in [False, True]:
+ native1 = test_constructor._load_code(open(code_filename, 'rb').read())
+ native2 = None
try:
- output = dump(data1, Dumper=MyDumper, allow_unicode=allow_unicode)
- data2 = load(output, Loader=MyLoader)
- self.failUnlessEqual(type(data1), type(data2))
+ output = yaml.dump(native1, Dumper=test_constructor.MyDumper,
+ allow_unicode=allow_unicode)
+ native2 = yaml.load(output, Loader=test_constructor.MyLoader)
try:
- self.failUnlessEqual(data1, data2)
- except (AssertionError, TypeError):
- if isinstance(data1, dict):
- data1 = [(repr(key), value) for key, value in data1.items()]
- data1.sort()
- data1 = repr(data1)
- data2 = [(repr(key), value) for key, value in data2.items()]
- data2.sort()
- data2 = repr(data2)
- if data1 != data2:
- raise
- elif isinstance(data1, list):
- self.failUnlessEqual(type(data1), type(data2))
- self.failUnlessEqual(len(data1), len(data2))
- for item1, item2 in zip(data1, data2):
- if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \
- (item2 != item2 or (item2 == 0.0 and item2 == 1.0)):
- continue
- if isinstance(item1, datetime.datetime) \
- and isinstance(item2, datetime.datetime):
- self.failUnlessEqual(item1.microsecond,
- item2.microsecond)
- if isinstance(item1, datetime.datetime):
- item1 = item1.utctimetuple()
- if isinstance(item2, datetime.datetime):
- item2 = item2.utctimetuple()
- self.failUnlessEqual(item1, item2)
- else:
- raise
- except:
- print
- print "OUTPUT:"
- print output
- print "NATIVES1:", data1
- print "NATIVES2:", data2
- raise
+ if native1 == native2:
+ continue
+ except TypeError:
+ pass
+ value1 = test_constructor._serialize_value(native1)
+ value2 = test_constructor._serialize_value(native2)
+ if verbose:
+ print "SERIALIZED NATIVE1:"
+ print value1
+ print "SERIALIZED NATIVE2:"
+ print value2
+ assert value1 == value2, (native1, native2)
+ finally:
+ if verbose:
+ print "NATIVE1:"
+ pprint.pprint(native1)
+ print "NATIVE2:"
+ pprint.pprint(native2)
+ print "OUTPUT:"
+ print output
+
+test_representer_types.unittest = ['.code']
-TestRepresenterTypes.add_tests('testTypes', '.data', '.code')
-TestRepresenterTypes.add_tests('testTypesUnicode', '.data', '.code')
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_resolver.py b/tests/test_resolver.py
index a1845d8..5566750 100644
--- a/tests/test_resolver.py
+++ b/tests/test_resolver.py
@@ -1,83 +1,92 @@
-import test_appliance
-
-from yaml import *
-
-class MyLoader(Loader):
- pass
-
-class MyDumper(Dumper):
- pass
-
-add_path_resolver(u'!root', [],
- Loader=MyLoader, Dumper=MyDumper)
-
-add_path_resolver(u'!root/scalar', [], str,
- Loader=MyLoader, Dumper=MyDumper)
-
-add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'],
- Loader=MyLoader, Dumper=MyDumper)
-
-add_path_resolver(u'!root/key21/1/*', ['key21', 1],
- Loader=MyLoader, Dumper=MyDumper)
-
-add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict,
- Loader=MyLoader, Dumper=MyDumper)
-
-class TestResolver(test_appliance.TestAppliance):
-
- def _testImplicitResolver(self, test_name, data_filename, detect_filename):
- node = None
- correct_tag = None
- try:
- correct_tag = file(detect_filename, 'rb').read().strip()
- node = compose(file(data_filename, 'rb'))
- self.failUnless(isinstance(node, SequenceNode))
- for scalar in node.value:
- self.failUnless(isinstance(scalar, ScalarNode))
- self.failUnlessEqual(scalar.tag, correct_tag)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "CORRECT_TAG:"
- print file(detect_filename, 'rb').read()
- print "ROOT NODE:", node
- print "SCALAR NODES:", node.value
- raise
-
- def _testPathResolverLoader(self, test_name, data_filename, path_filename):
- #print serialize_all(compose_all(file(data_filename, 'rb').read(), Loader=MyLoader))
- nodes1 = compose_all(file(data_filename, 'rb').read(), Loader=MyLoader)
- nodes2 = compose_all(file(path_filename, 'rb').read())
+import yaml
+import pprint
+
+def test_implicit_resolver(data_filename, detect_filename, verbose=False):
+ correct_tag = None
+ node = None
+ try:
+ correct_tag = open(detect_filename, 'rb').read().strip()
+ node = yaml.compose(open(data_filename, 'rb'))
+ assert isinstance(node, yaml.SequenceNode), node
+ for scalar in node.value:
+ assert isinstance(scalar, yaml.ScalarNode), scalar
+ assert scalar.tag == correct_tag, (scalar.tag, correct_tag)
+ finally:
+ if verbose:
+ print "CORRECT TAG:", correct_tag
+ if hasattr(node, 'value'):
+ print "CHILDREN:"
+ pprint.pprint(node.value)
+
+test_implicit_resolver.unittest = ['.data', '.detect']
+
+def _make_path_loader_and_dumper():
+ global MyLoader, MyDumper
+
+ class MyLoader(yaml.Loader):
+ pass
+ class MyDumper(yaml.Dumper):
+ pass
+
+ yaml.add_path_resolver(u'!root', [],
+ Loader=MyLoader, Dumper=MyDumper)
+ yaml.add_path_resolver(u'!root/scalar', [], str,
+ Loader=MyLoader, Dumper=MyDumper)
+ yaml.add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'],
+ Loader=MyLoader, Dumper=MyDumper)
+ yaml.add_path_resolver(u'!root/key21/1/*', ['key21', 1],
+ Loader=MyLoader, Dumper=MyDumper)
+ yaml.add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict,
+ Loader=MyLoader, Dumper=MyDumper)
+
+ return MyLoader, MyDumper
+
+def _convert_node(node):
+ if isinstance(node, yaml.ScalarNode):
+ return (node.tag, node.value)
+ elif isinstance(node, yaml.SequenceNode):
+ value = []
+ for item in node.value:
+ value.append(_convert_node(item))
+ return (node.tag, value)
+ elif isinstance(node, yaml.MappingNode):
+ value = []
+ for key, item in node.value:
+ value.append((_convert_node(key), _convert_node(item)))
+ return (node.tag, value)
+
+def test_path_resolver_loader(data_filename, path_filename, verbose=False):
+ _make_path_loader_and_dumper()
+ nodes1 = list(yaml.compose_all(open(data_filename, 'rb').read(), Loader=MyLoader))
+ nodes2 = list(yaml.compose_all(open(path_filename, 'rb').read()))
+ try:
for node1, node2 in zip(nodes1, nodes2):
- self.failUnlessEqual(self._convert(node1), self._convert(node2))
-
- def _testPathResolverDumper(self, test_name, data_filename, path_filename):
- for filename in [data_filename, path_filename]:
- output = serialize_all(compose_all(file(filename, 'rb').read()), Dumper=MyDumper)
- #print output
- nodes1 = compose_all(output)
- nodes2 = compose_all(file(data_filename, 'rb').read())
- for node1, node2 in zip(nodes1, nodes2):
- self.failUnlessEqual(self._convert(node1), self._convert(node2))
+ data1 = _convert_node(node1)
+ data2 = _convert_node(node2)
+ assert data1 == data2, (data1, data2)
+ finally:
+ if verbose:
+ print yaml.serialize_all(nodes1)
+
+test_path_resolver_loader.unittest = ['.data', '.path']
+
+def test_path_resolver_dumper(data_filename, path_filename, verbose=False):
+ _make_path_loader_and_dumper()
+ for filename in [data_filename, path_filename]:
+ output = yaml.serialize_all(yaml.compose_all(open(filename, 'rb')), Dumper=MyDumper)
+ if verbose:
+ print output
+ nodes1 = yaml.compose_all(output)
+ nodes2 = yaml.compose_all(open(data_filename, 'rb'))
+ for node1, node2 in zip(nodes1, nodes2):
+ data1 = _convert_node(node1)
+ data2 = _convert_node(node2)
+ assert data1 == data2, (data1, data2)
- def _convert(self, node):
- if isinstance(node, ScalarNode):
- return node.tag, node.value
- elif isinstance(node, SequenceNode):
- value = []
- for item in node.value:
- value.append(self._convert(item))
- return node.tag, value
- elif isinstance(node, MappingNode):
- value = []
- for key, item in node.value:
- value.append((self._convert(key), self._convert(item)))
- value.sort()
- return node.tag, value
+test_path_resolver_dumper.unittest = ['.data', '.path']
-TestResolver.add_tests('testImplicitResolver', '.data', '.detect')
-TestResolver.add_tests('testPathResolverLoader', '.data', '.path')
-TestResolver.add_tests('testPathResolverDumper', '.data', '.path')
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_structure.py b/tests/test_structure.py
index 0aef982..61bcb80 100644
--- a/tests/test_structure.py
+++ b/tests/test_structure.py
@@ -1,228 +1,187 @@
-import test_appliance
-
-from yaml import *
-
-class TestStructure(test_appliance.TestAppliance):
-
- def _testStructure(self, test_name, data_filename, structure_filename):
- node1 = None
- node2 = eval(file(structure_filename, 'rb').read())
- try:
- loader = Loader(file(data_filename, 'rb'))
- node1 = []
- while not loader.check_event(StreamEndEvent):
- if not loader.check_event(StreamStartEvent, DocumentStartEvent, DocumentEndEvent):
- node1.append(self._convert(loader))
- else:
- loader.get_event()
- loader.get_event()
- if len(node1) == 1:
- node1 = node1[0]
- self.failUnlessEqual(node1, node2)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "NODE1:", node1
- print "NODE2:", node2
- raise
-
- def _convert(self, loader):
- if loader.check_event(ScalarEvent):
- event = loader.get_event()
- if event.tag or event.anchor or event.value:
- return True
- else:
- return None
- elif loader.check_event(SequenceStartEvent):
- loader.get_event()
- sequence = []
- while not loader.check_event(SequenceEndEvent):
- sequence.append(self._convert(loader))
- loader.get_event()
- return sequence
- elif loader.check_event(MappingStartEvent):
- loader.get_event()
- mapping = []
- while not loader.check_event(MappingEndEvent):
- key = self._convert(loader)
- value = self._convert(loader)
- mapping.append((key, value))
- loader.get_event()
- return mapping
- elif loader.check_event(AliasEvent):
- loader.get_event()
- return '*'
+import yaml, canonical
+import pprint
+
+def _convert_structure(loader):
+ if loader.check_event(yaml.ScalarEvent):
+ event = loader.get_event()
+ if event.tag or event.anchor or event.value:
+ return True
else:
- loader.get_event()
- return '?'
-
-TestStructure.add_tests('testStructure', '.data', '.structure')
-
-class TestParser(test_appliance.TestAppliance):
-
- def _testParser(self, test_name, data_filename, canonical_filename):
- events1 = None
- events2 = None
- try:
- events1 = list(parse(file(data_filename, 'rb')))
- events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb')))
- self._compare(events1, events2)
- except:
- print
- print "DATA1:"
- print file(data_filename, 'rb').read()
- print "DATA2:"
- print file(canonical_filename, 'rb').read()
- print "EVENTS1:", events1
- print "EVENTS2:", events2
- raise
-
- def _compare(self, events1, events2):
- self.failUnlessEqual(len(events1), len(events2))
- for event1, event2 in zip(events1, events2):
- self.failUnlessEqual(event1.__class__, event2.__class__)
- if isinstance(event1, AliasEvent):
- #self.failUnlessEqual(event1.name, event2.name)
- pass
- elif isinstance(event1, ScalarEvent):
- #self.failUnlessEqual(event1.anchor, event2.anchor)
- #self.failUnlessEqual(event1.tag, event2.tag)
- self.failUnlessEqual(event1.value, event2.value)
- if isinstance(event1, CollectionStartEvent):
- #self.failUnlessEqual(event1.anchor, event2.anchor)
- #self.failUnlessEqual(event1.tag, event2.tag)
- pass
-
-TestParser.add_tests('testParser', '.data', '.canonical')
-
-class TestResolver(test_appliance.TestAppliance):
-
- def _testResolver(self, test_name, data_filename, canonical_filename):
- nodes1 = None
- nodes2 = None
- try:
- nodes1 = list(compose_all(file(data_filename, 'rb')))
- nodes2 = list(test_appliance.canonical_compose_all(file(canonical_filename, 'rb')))
- self.failUnlessEqual(len(nodes1), len(nodes2))
- for node1, node2 in zip(nodes1, nodes2):
- self._compare(node1, node2)
- except:
- print
- print "DATA1:"
- print file(data_filename, 'rb').read()
- print "DATA2:"
- print file(canonical_filename, 'rb').read()
- print "NODES1:", nodes1
- print "NODES2:", nodes2
- raise
-
- def _compare(self, node1, node2):
- self.failUnlessEqual(node1.__class__, node2.__class__)
- if isinstance(node1, ScalarNode):
- #self.failUnlessEqual(node1.tag, node2.tag)
- self.failUnlessEqual(node1.value, node2.value)
- elif isinstance(node1, SequenceNode):
- self.failUnlessEqual(len(node1.value), len(node2.value))
- for item1, item2 in zip(node1.value, node2.value):
- self._compare(item1, item2)
- elif isinstance(node1, MappingNode):
- self.failUnlessEqual(len(node1.value), len(node2.value))
- items1 = node1.value.items()
- items1.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value),
- (k2.tag,k2.value,v2.tag,v2.value)))
- items2 = node2.value.items()
- items2.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value),
- (k2.tag,k2.value,v2.tag,v2.value)))
- for (key1, value1), (key2, value2) in zip(items1, items2):
- self._compare(key1, key2)
- self._compare(value1, value2)
-
-TestResolver.add_tests('testResolver', '.data', '.canonical')
-
-class MyLoader(Loader):
- def construct_sequence(self, node):
- return tuple(Loader.construct_sequence(self, node))
-
- def construct_mapping(self, node):
- pairs = self.construct_pairs(node)
- pairs.sort()
- return pairs
-
- def construct_undefined(self, node):
- return self.construct_scalar(node)
-
-MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping)
-MyLoader.add_constructor(None, MyLoader.construct_undefined)
-
-class MyCanonicalLoader(test_appliance.CanonicalLoader):
-
- def construct_sequence(self, node):
- return tuple(test_appliance.CanonicalLoader.construct_sequence(self, node))
-
- def construct_mapping(self, node):
- pairs = self.construct_pairs(node)
- pairs.sort()
- return pairs
-
- def construct_undefined(self, node):
- return self.construct_scalar(node)
-
-MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping)
-MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
-
-class TestConstructor(test_appliance.TestAppliance):
-
- def _testConstructor(self, test_name, data_filename, canonical_filename):
- data1 = None
- data2 = None
- try:
- data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader))
- data2 = list(load_all(file(canonical_filename, 'rb'), Loader=MyCanonicalLoader))
- self.failUnlessEqual(data1, data2)
- except:
- print
- print "DATA1:"
- print file(data_filename, 'rb').read()
- print "DATA2:"
- print file(canonical_filename, 'rb').read()
- print "NATIVES1:", data1
- print "NATIVES2:", data2
- raise
-
-TestConstructor.add_tests('testConstructor', '.data', '.canonical')
-
-class TestParserOnCanonical(test_appliance.TestAppliance):
-
- def _testParserOnCanonical(self, test_name, canonical_filename):
- events1 = None
- events2 = None
- try:
- events1 = list(parse(file(canonical_filename, 'rb')))
- events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb')))
- self._compare(events1, events2)
- except:
- print
- print "DATA:"
- print file(canonical_filename, 'rb').read()
- print "EVENTS1:", events1
- print "EVENTS2:", events2
- raise
-
- def _compare(self, events1, events2):
- self.failUnlessEqual(len(events1), len(events2))
- for event1, event2 in zip(events1, events2):
- self.failUnlessEqual(event1.__class__, event2.__class__)
- if isinstance(event1, AliasEvent):
- self.failUnlessEqual(event1.anchor, event2.anchor)
- elif isinstance(event1, ScalarEvent):
- self.failUnlessEqual(event1.anchor, event2.anchor)
- self.failUnlessEqual(event1.tag, event2.tag)
- self.failUnlessEqual(event1.value, event2.value)
- if isinstance(event1, CollectionStartEvent):
- self.failUnlessEqual(event1.anchor, event2.anchor)
- self.failUnlessEqual(event1.tag, event2.tag)
-
-TestParserOnCanonical.add_tests('testParserOnCanonical', '.canonical')
+ return None
+ elif loader.check_event(yaml.SequenceStartEvent):
+ loader.get_event()
+ sequence = []
+ while not loader.check_event(yaml.SequenceEndEvent):
+ sequence.append(_convert_structure(loader))
+ loader.get_event()
+ return sequence
+ elif loader.check_event(yaml.MappingStartEvent):
+ loader.get_event()
+ mapping = []
+ while not loader.check_event(yaml.MappingEndEvent):
+ key = _convert_structure(loader)
+ value = _convert_structure(loader)
+ mapping.append((key, value))
+ loader.get_event()
+ return mapping
+ elif loader.check_event(yaml.AliasEvent):
+ loader.get_event()
+ return '*'
+ else:
+ loader.get_event()
+ return '?'
+
+def test_structure(data_filename, structure_filename, verbose=False):
+ nodes1 = []
+ nodes2 = eval(open(structure_filename, 'rb').read())
+ try:
+ loader = yaml.Loader(open(data_filename, 'rb'))
+ while loader.check_event():
+ if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent,
+ yaml.DocumentStartEvent, yaml.DocumentEndEvent):
+ loader.get_event()
+ continue
+ nodes1.append(_convert_structure(loader))
+ if len(nodes1) == 1:
+ nodes1 = nodes1[0]
+ assert nodes1 == nodes2, (nodes1, nodes2)
+ finally:
+ if verbose:
+ print "NODES1:"
+ pprint.pprint(nodes1)
+ print "NODES2:"
+ pprint.pprint(nodes2)
+
+test_structure.unittest = ['.data', '.structure']
+
+def _compare_events(events1, events2, full=False):
+ assert len(events1) == len(events2), (len(events1), len(events2))
+ for event1, event2 in zip(events1, events2):
+ assert event1.__class__ == event2.__class__, (event1, event2)
+ if isinstance(event1, yaml.AliasEvent) and full:
+ assert event1.anchor == event2.anchor, (event1, event2)
+ if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)):
+ if (event1.tag not in [None, u'!'] and event2.tag not in [None, u'!']) or full:
+ assert event1.tag == event2.tag, (event1, event2)
+ if isinstance(event1, yaml.ScalarEvent):
+ assert event1.value == event2.value, (event1, event2)
+
+def test_parser(data_filename, canonical_filename, verbose=False):
+ events1 = None
+ events2 = None
+ try:
+ events1 = list(yaml.parse(open(data_filename, 'rb')))
+ events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb')))
+ _compare_events(events1, events2)
+ finally:
+ if verbose:
+ print "EVENTS1:"
+ pprint.pprint(events1)
+ print "EVENTS2:"
+ pprint.pprint(events2)
+
+test_parser.unittest = ['.data', '.canonical']
+
+def test_parser_on_canonical(canonical_filename, verbose=False):
+ events1 = None
+ events2 = None
+ try:
+ events1 = list(yaml.parse(open(canonical_filename, 'rb')))
+ events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb')))
+ _compare_events(events1, events2, full=True)
+ finally:
+ if verbose:
+ print "EVENTS1:"
+ pprint.pprint(events1)
+ print "EVENTS2:"
+ pprint.pprint(events2)
+
+test_parser_on_canonical.unittest = ['.canonical']
+
+def _compare_nodes(node1, node2):
+ assert node1.__class__ == node2.__class__, (node1, node2)
+ assert node1.tag == node2.tag, (node1, node2)
+ if isinstance(node1, yaml.ScalarNode):
+ assert node1.value == node2.value, (node1, node2)
+ else:
+ assert len(node1.value) == len(node2.value), (node1, node2)
+ for item1, item2 in zip(node1.value, node2.value):
+ if not isinstance(item1, tuple):
+ item1 = (item1,)
+ item2 = (item2,)
+ for subnode1, subnode2 in zip(item1, item2):
+ _compare_nodes(subnode1, subnode2)
+
+def test_composer(data_filename, canonical_filename, verbose=False):
+ nodes1 = None
+ nodes2 = None
+ try:
+ nodes1 = list(yaml.compose_all(open(data_filename, 'rb')))
+ nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb')))
+ assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2))
+ for node1, node2 in zip(nodes1, nodes2):
+ _compare_nodes(node1, node2)
+ finally:
+ if verbose:
+ print "NODES1:"
+ pprint.pprint(nodes1)
+ print "NODES2:"
+ pprint.pprint(nodes2)
+
+test_composer.unittest = ['.data', '.canonical']
+
+def _make_loader():
+ global MyLoader
+
+ class MyLoader(yaml.Loader):
+ def construct_sequence(self, node):
+ return tuple(yaml.Loader.construct_sequence(self, node))
+ def construct_mapping(self, node):
+ pairs = self.construct_pairs(node)
+ pairs.sort()
+ return pairs
+ def construct_undefined(self, node):
+ return self.construct_scalar(node)
+
+ MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping)
+ MyLoader.add_constructor(None, MyLoader.construct_undefined)
+
+def _make_canonical_loader():
+ global MyCanonicalLoader
+
+ class MyCanonicalLoader(yaml.CanonicalLoader):
+ def construct_sequence(self, node):
+ return tuple(yaml.CanonicalLoader.construct_sequence(self, node))
+ def construct_mapping(self, node):
+ pairs = self.construct_pairs(node)
+ pairs.sort()
+ return pairs
+ def construct_undefined(self, node):
+ return self.construct_scalar(node)
+
+ MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping)
+ MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined)
+
+def test_constructor(data_filename, canonical_filename, verbose=False):
+ _make_loader()
+ _make_canonical_loader()
+ native1 = None
+ native2 = None
+ try:
+ native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader))
+ native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader))
+ assert native1 == native2, (native1, native2)
+ finally:
+ if verbose:
+ print "NATIVE1:"
+ pprint.pprint(native1)
+ print "NATIVE2:"
+ pprint.pprint(native2)
+
+test_constructor.unittest = ['.data', '.canonical']
+
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_syck.py b/tests/test_syck.py
deleted file mode 100644
index dd63056..0000000
--- a/tests/test_syck.py
+++ /dev/null
@@ -1,30 +0,0 @@
-
-import test_appliance
-
-class TestSyck(test_appliance.TestAppliance):
-
- def _testSyckOnTokenTests(self, test_name, data_filename, tokens_filename):
- try:
- syck.parse(file(data_filename, 'rb'))
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- raise
-
- def _testSyckOnCanonicalTests(self, test_name, data_filename, canonical_filename):
- try:
- syck.parse(file(data_filename, 'rb'))
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- raise
-
-try:
- import syck
- #TestSyck.add_tests('testSyckOnTokenTests', '.data', '.tokens')
- #TestSyck.add_tests('testSyckOnCanonicalTests', '.data', '.canonical')
-except ImportError:
- pass
-
diff --git a/tests/test_tokens.py b/tests/test_tokens.py
index 73d07b3..9613fa0 100644
--- a/tests/test_tokens.py
+++ b/tests/test_tokens.py
@@ -1,86 +1,77 @@
-import test_appliance
+import yaml
+import pprint
-from yaml import *
+# Tokens mnemonic:
+# directive: %
+# document_start: ---
+# document_end: ...
+# alias: *
+# anchor: &
+# tag: !
+# scalar _
+# block_sequence_start: [[
+# block_mapping_start: {{
+# block_end: ]}
+# flow_sequence_start: [
+# flow_sequence_end: ]
+# flow_mapping_start: {
+# flow_mapping_end: }
+# entry: ,
+# key: ?
+# value: :
-class TestTokens(test_appliance.TestAppliance):
+_replaces = {
+ yaml.DirectiveToken: '%',
+ yaml.DocumentStartToken: '---',
+ yaml.DocumentEndToken: '...',
+ yaml.AliasToken: '*',
+ yaml.AnchorToken: '&',
+ yaml.TagToken: '!',
+ yaml.ScalarToken: '_',
+ yaml.BlockSequenceStartToken: '[[',
+ yaml.BlockMappingStartToken: '{{',
+ yaml.BlockEndToken: ']}',
+ yaml.FlowSequenceStartToken: '[',
+ yaml.FlowSequenceEndToken: ']',
+ yaml.FlowMappingStartToken: '{',
+ yaml.FlowMappingEndToken: '}',
+ yaml.BlockEntryToken: ',',
+ yaml.FlowEntryToken: ',',
+ yaml.KeyToken: '?',
+ yaml.ValueToken: ':',
+}
- # Tokens mnemonic:
- # directive: %
- # document_start: ---
- # document_end: ...
- # alias: *
- # anchor: &
- # tag: !
- # scalar _
- # block_sequence_start: [[
- # block_mapping_start: {{
- # block_end: ]}
- # flow_sequence_start: [
- # flow_sequence_end: ]
- # flow_mapping_start: {
- # flow_mapping_end: }
- # entry: ,
- # key: ?
- # value: :
+def test_tokens(data_filename, tokens_filename, verbose=False):
+ tokens1 = []
+ tokens2 = open(tokens_filename, 'rb').read().split()
+ try:
+ for token in yaml.scan(open(data_filename, 'rb')):
+ if not isinstance(token, (yaml.StreamStartToken, yaml.StreamEndToken)):
+ tokens1.append(_replaces[token.__class__])
+ finally:
+ if verbose:
+ print "TOKENS1:", ' '.join(tokens1)
+ print "TOKENS2:", ' '.join(tokens2)
+ assert len(tokens1) == len(tokens2), (tokens1, tokens2)
+ for token1, token2 in zip(tokens1, tokens2):
+ assert token1 == token2, (token1, token2)
- replaces = {
- DirectiveToken: '%',
- DocumentStartToken: '---',
- DocumentEndToken: '...',
- AliasToken: '*',
- AnchorToken: '&',
- TagToken: '!',
- ScalarToken: '_',
- BlockSequenceStartToken: '[[',
- BlockMappingStartToken: '{{',
- BlockEndToken: ']}',
- FlowSequenceStartToken: '[',
- FlowSequenceEndToken: ']',
- FlowMappingStartToken: '{',
- FlowMappingEndToken: '}',
- BlockEntryToken: ',',
- FlowEntryToken: ',',
- KeyToken: '?',
- ValueToken: ':',
- }
+test_tokens.unittest = ['.data', '.tokens']
- def _testTokens(self, test_name, data_filename, tokens_filename):
- tokens1 = None
- tokens2 = file(tokens_filename, 'rb').read().split()
+def test_scanner(data_filename, canonical_filename, verbose=False):
+ for filename in [data_filename, canonical_filename]:
+ tokens = []
try:
- tokens1 = []
- for token in scan(file(data_filename, 'rb')):
- if not isinstance(token, (StreamStartToken, StreamEndToken)):
- tokens1.append(token)
- tokens1 = [self.replaces[t.__class__] for t in tokens1]
- self.failUnlessEqual(tokens1, tokens2)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "TOKENS1:", tokens1
- print "TOKENS2:", tokens2
- raise
+ for token in yaml.scan(open(filename, 'rb')):
+ tokens.append(token.__class__.__name__)
+ finally:
+ if verbose:
+ pprint.pprint(tokens)
-TestTokens.add_tests('testTokens', '.data', '.tokens')
+test_scanner.unittest = ['.data', '.canonical']
-class TestScanner(test_appliance.TestAppliance):
-
- def _testScanner(self, test_name, data_filename, canonical_filename):
- for filename in [canonical_filename, data_filename]:
- tokens = None
- try:
- tokens = []
- for token in scan(file(filename, 'rb')):
- if not isinstance(token, (StreamStartToken, StreamEndToken)):
- tokens.append(token.__class__.__name__)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "TOKENS:", tokens
- raise
-
-TestScanner.add_tests('testScanner', '.data', '.canonical')
+if __name__ == '__main__':
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_yaml.py b/tests/test_yaml.py
index d13321f..d195e1a 100644
--- a/tests/test_yaml.py
+++ b/tests/test_yaml.py
@@ -1,6 +1,4 @@
-import unittest
-
from test_mark import *
from test_reader import *
from test_canonical import *
@@ -12,11 +10,8 @@ from test_constructor import *
from test_emitter import *
from test_representer import *
from test_recursive import *
-from test_syck import *
-
-def main(module='__main__'):
- unittest.main(module)
if __name__ == '__main__':
- main()
+ import test_appliance
+ test_appliance.run(globals())
diff --git a/tests/test_yaml_ext.py b/tests/test_yaml_ext.py
index b9a2e58..8840ffd 100644
--- a/tests/test_yaml_ext.py
+++ b/tests/test_yaml_ext.py
@@ -1,195 +1,273 @@
-import unittest, test_appliance
-
import _yaml, yaml
+import types, pprint
-test_appliance.TestAppliance.SKIP_EXT = '.skip-ext'
-
-class TestCVersion(unittest.TestCase):
-
- def testCVersion(self):
- self.failUnlessEqual("%s.%s.%s" % _yaml.get_version(), _yaml.get_version_string())
+yaml.PyBaseLoader = yaml.BaseLoader
+yaml.PySafeLoader = yaml.SafeLoader
+yaml.PyLoader = yaml.Loader
+yaml.PyBaseDumper = yaml.BaseDumper
+yaml.PySafeDumper = yaml.SafeDumper
+yaml.PyDumper = yaml.Dumper
-class TestCLoader(test_appliance.TestAppliance):
-
- def _testCScannerFileInput(self, test_name, data_filename, canonical_filename):
- self._testCScanner(test_name, data_filename, canonical_filename, True)
-
- def _testCScanner(self, test_name, data_filename, canonical_filename, file_input=False, Loader=yaml.Loader):
- if file_input:
- data = file(data_filename, 'r')
- else:
- data = file(data_filename, 'r').read()
- tokens = list(yaml.scan(data, Loader=Loader))
- ext_tokens = []
- try:
- if file_input:
- data = file(data_filename, 'r')
- for token in yaml.scan(data, Loader=yaml.CLoader):
- ext_tokens.append(token)
- self.failUnlessEqual(len(tokens), len(ext_tokens))
- for token, ext_token in zip(tokens, ext_tokens):
- self.failUnlessEqual(token.__class__, ext_token.__class__)
- if not isinstance(token, yaml.StreamEndToken):
- self.failUnlessEqual((token.start_mark.index, token.start_mark.line, token.start_mark.column),
- (ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column))
- self.failUnlessEqual((token.end_mark.index, token.end_mark.line, token.end_mark.column),
- (ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column))
- if hasattr(token, 'value'):
- self.failUnlessEqual(token.value, ext_token.value)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "TOKENS:", tokens
- print "EXT_TOKENS:", ext_tokens
- raise
-
- def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
- data = file(data_filename, 'r').read()
- events = list(yaml.parse(data, Loader=Loader))
- ext_events = []
- try:
- for event in yaml.parse(data, Loader=yaml.CLoader):
- ext_events.append(event)
- #print "EVENT:", event
- self.failUnlessEqual(len(events), len(ext_events))
- for event, ext_event in zip(events, ext_events):
- self.failUnlessEqual(event.__class__, ext_event.__class__)
- if hasattr(event, 'anchor'):
- self.failUnlessEqual(event.anchor, ext_event.anchor)
- if hasattr(event, 'tag'):
- self.failUnlessEqual(event.tag, ext_event.tag)
- if hasattr(event, 'implicit'):
- self.failUnlessEqual(event.implicit, ext_event.implicit)
- if hasattr(event, 'value'):
- self.failUnlessEqual(event.value, ext_event.value)
- if hasattr(event, 'explicit'):
- self.failUnlessEqual(event.explicit, ext_event.explicit)
- if hasattr(event, 'version'):
- self.failUnlessEqual(event.version, ext_event.version)
- if hasattr(event, 'tags'):
- self.failUnlessEqual(event.tags, ext_event.tags)
- except:
- print
- print "DATA:"
- print file(data_filename, 'rb').read()
- print "EVENTS:", events
- print "EXT_EVENTS:", ext_events
- raise
-
-TestCLoader.add_tests('testCScanner', '.data', '.canonical')
-TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical')
-TestCLoader.add_tests('testCParser', '.data', '.canonical')
-
-class TestCEmitter(test_appliance.TestAppliance):
-
- def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader):
- data1 = file(data_filename, 'r').read()
- events = list(yaml.parse(data1, Loader=Loader))
- data2 = yaml.emit(events, Dumper=yaml.CDumper)
- ext_events = []
- try:
- for event in yaml.parse(data2):
- ext_events.append(event)
- self.failUnlessEqual(len(events), len(ext_events))
- for event, ext_event in zip(events, ext_events):
- self.failUnlessEqual(event.__class__, ext_event.__class__)
- if hasattr(event, 'anchor'):
- self.failUnlessEqual(event.anchor, ext_event.anchor)
- if hasattr(event, 'tag'):
- if not (event.tag in ['!', None] and ext_event.tag in ['!', None]):
- self.failUnlessEqual(event.tag, ext_event.tag)
- if hasattr(event, 'implicit'):
- self.failUnlessEqual(event.implicit, ext_event.implicit)
- if hasattr(event, 'value'):
- self.failUnlessEqual(event.value, ext_event.value)
- if hasattr(event, 'explicit') and event.explicit:
- self.failUnlessEqual(event.explicit, ext_event.explicit)
- if hasattr(event, 'version'):
- self.failUnlessEqual(event.version, ext_event.version)
- if hasattr(event, 'tags'):
- self.failUnlessEqual(event.tags, ext_event.tags)
- except:
- print
- print "DATA1:"
- print data1
- print "DATA2:"
- print data2
- print "EVENTS:", events
- print "EXT_EVENTS:", ext_events
- raise
-
-TestCEmitter.add_tests('testCEmitter', '.data', '.canonical')
-
-yaml.BaseLoader = yaml.CBaseLoader
-yaml.SafeLoader = yaml.CSafeLoader
-yaml.Loader = yaml.CLoader
-yaml.BaseDumper = yaml.CBaseDumper
-yaml.SafeDumper = yaml.CSafeDumper
-yaml.Dumper = yaml.CDumper
old_scan = yaml.scan
-def scan(stream, Loader=yaml.CLoader):
+def new_scan(stream, Loader=yaml.CLoader):
return old_scan(stream, Loader)
-yaml.scan = scan
+
old_parse = yaml.parse
-def parse(stream, Loader=yaml.CLoader):
+def new_parse(stream, Loader=yaml.CLoader):
return old_parse(stream, Loader)
-yaml.parse = parse
+
old_compose = yaml.compose
-def compose(stream, Loader=yaml.CLoader):
+def new_compose(stream, Loader=yaml.CLoader):
return old_compose(stream, Loader)
-yaml.compose = compose
+
old_compose_all = yaml.compose_all
-def compose_all(stream, Loader=yaml.CLoader):
+def new_compose_all(stream, Loader=yaml.CLoader):
return old_compose_all(stream, Loader)
-yaml.compose_all = compose_all
-old_load_all = yaml.load_all
-def load_all(stream, Loader=yaml.CLoader):
- return old_load_all(stream, Loader)
-yaml.load_all = load_all
+
old_load = yaml.load
-def load(stream, Loader=yaml.CLoader):
+def new_load(stream, Loader=yaml.CLoader):
return old_load(stream, Loader)
-yaml.load = load
-def safe_load_all(stream):
- return yaml.load_all(stream, yaml.CSafeLoader)
-yaml.safe_load_all = safe_load_all
-def safe_load(stream):
- return yaml.load(stream, yaml.CSafeLoader)
-yaml.safe_load = safe_load
+
+old_load_all = yaml.load_all
+def new_load_all(stream, Loader=yaml.CLoader):
+ return old_load_all(stream, Loader)
+
+old_safe_load = yaml.safe_load
+def new_safe_load(stream):
+ return old_load(stream, yaml.CSafeLoader)
+
+old_safe_load_all = yaml.safe_load_all
+def new_safe_load_all(stream):
+ return old_load_all(stream, yaml.CSafeLoader)
+
old_emit = yaml.emit
-def emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
+def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds):
return old_emit(events, stream, Dumper, **kwds)
-yaml.emit = emit
-old_serialize_all = yaml.serialize_all
-def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
- return old_serialize_all(nodes, stream, Dumper, **kwds)
-yaml.serialize_all = serialize_all
+
old_serialize = yaml.serialize
-def serialize(node, stream, Dumper=yaml.CDumper, **kwds):
+def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds):
return old_serialize(node, stream, Dumper, **kwds)
-yaml.serialize = serialize
-old_dump_all = yaml.dump_all
-def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
- return old_dump_all(documents, stream, Dumper, **kwds)
-yaml.dump_all = dump_all
+
+old_serialize_all = yaml.serialize_all
+def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds):
+ return old_serialize_all(nodes, stream, Dumper, **kwds)
+
old_dump = yaml.dump
-def dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
+def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds):
return old_dump(data, stream, Dumper, **kwds)
-yaml.dump = dump
-def safe_dump_all(documents, stream=None, **kwds):
- return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds)
-yaml.safe_dump_all = safe_dump_all
-def safe_dump(data, stream=None, **kwds):
- return yaml.dump(data, stream, yaml.CSafeDumper, **kwds)
-yaml.safe_dump = safe_dump
-from test_yaml import *
+old_dump_all = yaml.dump_all
+def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds):
+ return old_dump_all(documents, stream, Dumper, **kwds)
+
+old_safe_dump = yaml.safe_dump
+def new_safe_dump(data, stream=None, **kwds):
+ return old_dump(data, stream, yaml.CSafeDumper, **kwds)
+
+old_safe_dump_all = yaml.safe_dump_all
+def new_safe_dump_all(documents, stream=None, **kwds):
+ return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds)
+
+def _set_up():
+ yaml.BaseLoader = yaml.CBaseLoader
+ yaml.SafeLoader = yaml.CSafeLoader
+ yaml.Loader = yaml.CLoader
+ yaml.BaseDumper = yaml.CBaseDumper
+ yaml.SafeDumper = yaml.CSafeDumper
+ yaml.Dumper = yaml.CDumper
+ yaml.scan = new_scan
+ yaml.parse = new_parse
+ yaml.compose = new_compose
+ yaml.compose_all = new_compose_all
+ yaml.load = new_load
+ yaml.load_all = new_load_all
+ yaml.safe_load = new_safe_load
+ yaml.safe_load_all = new_safe_load_all
+ yaml.emit = new_emit
+ yaml.serialize = new_serialize
+ yaml.serialize_all = new_serialize_all
+ yaml.dump = new_dump
+ yaml.dump_all = new_dump_all
+ yaml.safe_dump = new_safe_dump
+ yaml.safe_dump_all = new_safe_dump_all
+
+def _tear_down():
+ yaml.BaseLoader = yaml.PyBaseLoader
+ yaml.SafeLoader = yaml.PySafeLoader
+ yaml.Loader = yaml.PyLoader
+ yaml.BaseDumper = yaml.PyBaseDumper
+ yaml.SafeDumper = yaml.PySafeDumper
+ yaml.Dumper = yaml.PyDumper
+ yaml.scan = old_scan
+ yaml.parse = old_parse
+ yaml.compose = old_compose
+ yaml.compose_all = old_compose_all
+ yaml.load = old_load
+ yaml.load_all = old_load_all
+ yaml.safe_load = old_safe_load
+ yaml.safe_load_all = old_safe_load_all
+ yaml.emit = old_emit
+ yaml.serialize = old_serialize
+ yaml.serialize_all = old_serialize_all
+ yaml.dump = old_dump
+ yaml.dump_all = old_dump_all
+ yaml.safe_dump = old_safe_dump
+ yaml.safe_dump_all = old_safe_dump_all
+
+def test_c_version(verbose=False):
+ if verbose:
+ print _yaml.get_version()
+ print _yaml.get_version_string()
+ assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \
+ (_yaml.get_version(), _yaml.get_version_string())
+
+def _compare_scanners(py_data, c_data, verbose):
+ py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader))
+ c_tokens = []
+ try:
+ for token in yaml.scan(c_data, Loader=yaml.CLoader):
+ c_tokens.append(token)
+ assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens))
+ for py_token, c_token in zip(py_tokens, c_tokens):
+ assert py_token.__class__ == c_token.__class__, (py_token, c_token)
+ if hasattr(py_token, 'value'):
+ assert py_token.value == c_token.value, (py_token, c_token)
+ if isinstance(py_token, yaml.StreamEndToken):
+ continue
+ py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column)
+ py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column)
+ c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column)
+ c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column)
+ assert py_start == c_start, (py_start, c_start)
+ assert py_end == c_end, (py_end, c_end)
+ finally:
+ if verbose:
+ print "PY_TOKENS:"
+ pprint.pprint(py_tokens)
+ print "C_TOKENS:"
+ pprint.pprint(c_tokens)
+
+def test_c_scanner(data_filename, canonical_filename, verbose=False):
+ _compare_scanners(open(data_filename, 'rb'),
+ open(data_filename, 'rb'), verbose)
+ _compare_scanners(open(data_filename, 'rb').read(),
+ open(data_filename, 'rb').read(), verbose)
+ _compare_scanners(open(canonical_filename, 'rb'),
+ open(canonical_filename, 'rb'), verbose)
+ _compare_scanners(open(canonical_filename, 'rb').read(),
+ open(canonical_filename, 'rb').read(), verbose)
+
+test_c_scanner.unittest = ['.data', '.canonical']
+test_c_scanner.skip = ['.skip-ext']
+
+def _compare_parsers(py_data, c_data, verbose):
+ py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader))
+ c_events = []
+ try:
+ for event in yaml.parse(c_data, Loader=yaml.CLoader):
+ c_events.append(event)
+ assert len(py_events) == len(c_events), (len(py_events), len(c_events))
+ for py_event, c_event in zip(py_events, c_events):
+ for attribute in ['__class__', 'anchor', 'tag', 'implicit',
+ 'value', 'explicit', 'version', 'tags']:
+ py_value = getattr(py_event, attribute, None)
+ c_value = getattr(c_event, attribute, None)
+ assert py_value == c_value, (py_event, c_event, attribute)
+ finally:
+ if verbose:
+ print "PY_EVENTS:"
+ pprint.pprint(py_events)
+ print "C_EVENTS:"
+ pprint.pprint(c_events)
+
+def test_c_parser(data_filename, canonical_filename, verbose=False):
+ _compare_parsers(open(data_filename, 'rb'),
+ open(data_filename, 'rb'), verbose)
+ _compare_parsers(open(data_filename, 'rb').read(),
+ open(data_filename, 'rb').read(), verbose)
+ _compare_parsers(open(canonical_filename, 'rb'),
+ open(canonical_filename, 'rb'), verbose)
+ _compare_parsers(open(canonical_filename, 'rb').read(),
+ open(canonical_filename, 'rb').read(), verbose)
+
+test_c_parser.unittest = ['.data', '.canonical']
+test_c_parser.skip = ['.skip-ext']
+
+def _compare_emitters(data, verbose):
+ events = list(yaml.parse(data, Loader=yaml.PyLoader))
+ c_data = yaml.emit(events, Dumper=yaml.CDumper)
+ if verbose:
+ print c_data
+ py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader))
+ c_events = list(yaml.parse(c_data, Loader=yaml.CLoader))
+ try:
+ assert len(events) == len(py_events), (len(events), len(py_events))
+ assert len(events) == len(c_events), (len(events), len(c_events))
+ for event, py_event, c_event in zip(events, py_events, c_events):
+ for attribute in ['__class__', 'anchor', 'tag', 'implicit',
+ 'value', 'explicit', 'version', 'tags']:
+ value = getattr(event, attribute, None)
+ py_value = getattr(py_event, attribute, None)
+ c_value = getattr(c_event, attribute, None)
+ if attribute == 'tag' and value in [None, u'!'] \
+ and py_value in [None, u'!'] and c_value in [None, u'!']:
+ continue
+ if attribute == 'explicit' and (py_value or c_value):
+ continue
+ assert value == py_value, (event, py_event, attribute)
+ assert value == c_value, (event, c_event, attribute)
+ finally:
+ if verbose:
+ print "EVENTS:"
+ pprint.pprint(events)
+ print "PY_EVENTS:"
+ pprint.pprint(py_events)
+ print "C_EVENTS:"
+ pprint.pprint(c_events)
+
+def test_c_emitter(data_filename, canonical_filename, verbose=False):
+ _compare_emitters(open(data_filename, 'rb').read(), verbose)
+ _compare_emitters(open(canonical_filename, 'rb').read(), verbose)
+
+test_c_emitter.unittest = ['.data', '.canonical']
+test_c_emitter.skip = ['.skip-ext']
+
+def wrap_ext_function(function):
+ def wrapper(*args, **kwds):
+ _set_up()
+ try:
+ function(*args, **kwds)
+ finally:
+ _tear_down()
+ wrapper.func_name = '%s_ext' % function.func_name
+ wrapper.unittest = function.unittest
+ wrapper.skip = getattr(function, 'skip', [])+['.skip-ext']
+ return wrapper
+
+def wrap_ext(collections):
+ functions = []
+ if not isinstance(collections, list):
+ collections = [collections]
+ for collection in collections:
+ if not isinstance(collection, dict):
+ collection = vars(collection)
+ keys = collection.keys()
+ keys.sort()
+ for key in keys:
+ value = collection[key]
+ if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+ functions.append(wrap_ext_function(value))
+ for function in functions:
+ assert function.func_name not in globals()
+ globals()[function.func_name] = function
-def main(module='__main__'):
- unittest.main(module)
+import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \
+ test_emitter, test_representer, test_recursive
+wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor,
+ test_emitter, test_representer, test_recursive])
if __name__ == '__main__':
- main()
+ import test_appliance
+ test_appliance.run(globals())