diff options
author | xi <xi@18f92427-320e-0410-9341-c67f048884a3> | 2008-12-28 20:16:50 +0000 |
---|---|---|
committer | xi <xi@18f92427-320e-0410-9341-c67f048884a3> | 2008-12-28 20:16:50 +0000 |
commit | 4691639907112004c7242215370d8b42ffec3e5b (patch) | |
tree | cf3e23519728f94b6e04d2132a61577ef9deb0a0 | |
parent | 398bababf57286f4366649052c75075083d2060e (diff) | |
download | pyyaml-4691639907112004c7242215370d8b42ffec3e5b.tar.gz |
Refactored the test suite; updated include and library paths in setup.cfg.
git-svn-id: http://svn.pyyaml.org/pyyaml/trunk@322 18f92427-320e-0410-9341-c67f048884a3
34 files changed, 1698 insertions, 1649 deletions
diff --git a/MANIFEST.in b/MANIFEST.in index 1c6b8bf..3d3b4cd 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include README LICENSE setup.py recursive-include examples *.py *.cfg *.yaml -#recursive-include tests *.py -#recursive-include tests/data * +recursive-include tests *.py +recursive-include tests/data * @@ -29,6 +29,9 @@ test: build testext: buildext ${PYTHON} tests/test_build_ext.py ${TEST} +testall: + ${PYTHON} setup.py test + dist: ${PYTHON} setup.py --with-libyaml sdist --formats=zip,gztar @@ -17,18 +17,19 @@ # The following options are used to build PyYAML Windows installer # for Python 2.3 on my PC: -#include_dirs=../../libyaml/branches/stable/include -#library_dirs=../../libyaml/branches/stable/win32/vc6/output/release/lib +#include_dirs=../../../libyaml/tags/0.1.2/include +#library_dirs=../../../libyaml/tags/0.1.2/win32/vc6/output/release/lib #define=YAML_DECLARE_STATIC # The following options are used to build PyYAML Windows installer # for Python 2.4 and Python 2.5 on my PC: -#include_dirs=../../libyaml/branches/stable/include -#library_dirs=../../libyaml/branches/stable/win32/vs2003/output/release/lib +#include_dirs=../../../libyaml/tags/0.1.2/include +#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2003/output/release/lib #define=YAML_DECLARE_STATIC # The following options are used to build PyYAML Windows installer # for Python 2.6 on my PC: -#include_dirs=../../libyaml/branches/stable/include -#library_dirs=../../libyaml/branches/stable/win32/vs2008/output/release/lib +#include_dirs=../../../libyaml/tags/0.1.2/include +#library_dirs=../../../libyaml/tags/0.1.2/win32/vs2008/output/release/lib #define=YAML_DECLARE_STATIC + @@ -269,6 +269,25 @@ class bdist_rpm(_bdist_rpm): return spec_file +class test(Command): + + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + build_cmd = self.get_finalized_command('build') + build_cmd.run() + sys.path.insert(0, build_cmd.build_lib) + sys.path.insert(0, 'tests') + import test_all + test_all.main([]) + + if __name__ == '__main__': setup( @@ -296,6 +315,7 @@ if __name__ == '__main__': cmdclass={ 'build_ext': build_ext, 'bdist_rpm': bdist_rpm, + 'test': test, }, ) diff --git a/tests/canonical.py b/tests/canonical.py new file mode 100644 index 0000000..41d111a --- /dev/null +++ b/tests/canonical.py @@ -0,0 +1,357 @@ + +import yaml, yaml.composer, yaml.constructor, yaml.resolver + +class CanonicalError(yaml.YAMLError): + pass + +class CanonicalScanner: + + def __init__(self, data): + try: + self.data = unicode(data, 'utf-8')+u'\0' + except UnicodeDecodeError: + raise CanonicalError("utf-8 stream is expected") + self.index = 0 + self.tokens = [] + self.scanned = False + + def check_token(self, *choices): + if not self.scanned: + self.scan() + if self.tokens: + if not choices: + return True + for choice in choices: + if isinstance(self.tokens[0], choice): + return True + return False + + def peek_token(self): + if not self.scanned: + self.scan() + if self.tokens: + return self.tokens[0] + + def get_token(self, choice=None): + if not self.scanned: + self.scan() + token = self.tokens.pop(0) + if choice and not isinstance(token, choice): + raise CanonicalError("unexpected token "+repr(token)) + return token + + def get_token_value(self): + token = self.get_token() + return token.value + + def scan(self): + self.tokens.append(yaml.StreamStartToken(None, None)) + while True: + self.find_token() + ch = self.data[self.index] + if ch == u'\0': + self.tokens.append(yaml.StreamEndToken(None, None)) + break + elif ch == u'%': + self.tokens.append(self.scan_directive()) + elif ch == u'-' and self.data[self.index:self.index+3] == u'---': + self.index += 3 + self.tokens.append(yaml.DocumentStartToken(None, None)) + elif ch == u'[': + self.index += 1 + self.tokens.append(yaml.FlowSequenceStartToken(None, None)) + elif ch == u'{': + self.index += 1 + self.tokens.append(yaml.FlowMappingStartToken(None, None)) + elif ch == u']': + self.index += 1 + self.tokens.append(yaml.FlowSequenceEndToken(None, None)) + elif ch == u'}': + self.index += 1 + self.tokens.append(yaml.FlowMappingEndToken(None, None)) + elif ch == u'?': + self.index += 1 + self.tokens.append(yaml.KeyToken(None, None)) + elif ch == u':': + self.index += 1 + self.tokens.append(yaml.ValueToken(None, None)) + elif ch == u',': + self.index += 1 + self.tokens.append(yaml.FlowEntryToken(None, None)) + elif ch == u'*' or ch == u'&': + self.tokens.append(self.scan_alias()) + elif ch == u'!': + self.tokens.append(self.scan_tag()) + elif ch == u'"': + self.tokens.append(self.scan_scalar()) + else: + raise CanonicalError("invalid token") + self.scanned = True + + DIRECTIVE = u'%YAML 1.1' + + def scan_directive(self): + if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \ + self.data[self.index+len(self.DIRECTIVE)] in u' \n\0': + self.index += len(self.DIRECTIVE) + return yaml.DirectiveToken('YAML', (1, 1), None, None) + else: + raise CanonicalError("invalid directive") + + def scan_alias(self): + if self.data[self.index] == u'*': + TokenClass = yaml.AliasToken + else: + TokenClass = yaml.AnchorToken + self.index += 1 + start = self.index + while self.data[self.index] not in u', \n\0': + self.index += 1 + value = self.data[start:self.index] + return TokenClass(value, None, None) + + def scan_tag(self): + self.index += 1 + start = self.index + while self.data[self.index] not in u' \n\0': + self.index += 1 + value = self.data[start:self.index] + if not value: + value = u'!' + elif value[0] == u'!': + value = 'tag:yaml.org,2002:'+value[1:] + elif value[0] == u'<' and value[-1] == u'>': + value = value[1:-1] + else: + value = u'!'+value + return yaml.TagToken(value, None, None) + + QUOTE_CODES = { + 'x': 2, + 'u': 4, + 'U': 8, + } + + QUOTE_REPLACES = { + u'\\': u'\\', + u'\"': u'\"', + u' ': u' ', + u'a': u'\x07', + u'b': u'\x08', + u'e': u'\x1B', + u'f': u'\x0C', + u'n': u'\x0A', + u'r': u'\x0D', + u't': u'\x09', + u'v': u'\x0B', + u'N': u'\u0085', + u'L': u'\u2028', + u'P': u'\u2029', + u'_': u'_', + u'0': u'\x00', + + } + + def scan_scalar(self): + self.index += 1 + chunks = [] + start = self.index + ignore_spaces = False + while self.data[self.index] != u'"': + if self.data[self.index] == u'\\': + ignore_spaces = False + chunks.append(self.data[start:self.index]) + self.index += 1 + ch = self.data[self.index] + self.index += 1 + if ch == u'\n': + ignore_spaces = True + elif ch in self.QUOTE_CODES: + length = self.QUOTE_CODES[ch] + code = int(self.data[self.index:self.index+length], 16) + chunks.append(unichr(code)) + self.index += length + else: + if ch not in self.QUOTE_REPLACES: + raise CanonicalError("invalid escape code") + chunks.append(self.QUOTE_REPLACES[ch]) + start = self.index + elif self.data[self.index] == u'\n': + chunks.append(self.data[start:self.index]) + chunks.append(u' ') + self.index += 1 + start = self.index + ignore_spaces = True + elif ignore_spaces and self.data[self.index] == u' ': + self.index += 1 + start = self.index + else: + ignore_spaces = False + self.index += 1 + chunks.append(self.data[start:self.index]) + self.index += 1 + return yaml.ScalarToken(u''.join(chunks), False, None, None) + + def find_token(self): + found = False + while not found: + while self.data[self.index] in u' \t': + self.index += 1 + if self.data[self.index] == u'#': + while self.data[self.index] != u'\n': + self.index += 1 + if self.data[self.index] == u'\n': + self.index += 1 + else: + found = True + +class CanonicalParser: + + def __init__(self): + self.events = [] + self.parsed = False + + # stream: STREAM-START document* STREAM-END + def parse_stream(self): + self.get_token(yaml.StreamStartToken) + self.events.append(yaml.StreamStartEvent(None, None)) + while not self.check_token(yaml.StreamEndToken): + if self.check_token(yaml.DirectiveToken, yaml.DocumentStartToken): + self.parse_document() + else: + raise CanonicalError("document is expected, got "+repr(self.tokens[0])) + self.get_token(yaml.StreamEndToken) + self.events.append(yaml.StreamEndEvent(None, None)) + + # document: DIRECTIVE? DOCUMENT-START node + def parse_document(self): + node = None + if self.check_token(yaml.DirectiveToken): + self.get_token(yaml.DirectiveToken) + self.get_token(yaml.DocumentStartToken) + self.events.append(yaml.DocumentStartEvent(None, None)) + self.parse_node() + self.events.append(yaml.DocumentEndEvent(None, None)) + + # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) + def parse_node(self): + if self.check_token(yaml.AliasToken): + self.events.append(yaml.AliasEvent(self.get_token_value(), None, None)) + else: + anchor = None + if self.check_token(yaml.AnchorToken): + anchor = self.get_token_value() + tag = None + if self.check_token(yaml.TagToken): + tag = self.get_token_value() + if self.check_token(yaml.ScalarToken): + self.events.append(yaml.ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None)) + elif self.check_token(yaml.FlowSequenceStartToken): + self.events.append(yaml.SequenceStartEvent(anchor, tag, None, None)) + self.parse_sequence() + elif self.check_token(yaml.FlowMappingStartToken): + self.events.append(yaml.MappingStartEvent(anchor, tag, None, None)) + self.parse_mapping() + else: + raise CanonicalError("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[0])) + + # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END + def parse_sequence(self): + self.get_token(yaml.FlowSequenceStartToken) + if not self.check_token(yaml.FlowSequenceEndToken): + self.parse_node() + while not self.check_token(yaml.FlowSequenceEndToken): + self.get_token(yaml.FlowEntryToken) + if not self.check_token(yaml.FlowSequenceEndToken): + self.parse_node() + self.get_token(yaml.FlowSequenceEndToken) + self.events.append(yaml.SequenceEndEvent(None, None)) + + # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END + def parse_mapping(self): + self.get_token(yaml.FlowMappingStartToken) + if not self.check_token(yaml.FlowMappingEndToken): + self.parse_map_entry() + while not self.check_token(yaml.FlowMappingEndToken): + self.get_token(yaml.FlowEntryToken) + if not self.check_token(yaml.FlowMappingEndToken): + self.parse_map_entry() + self.get_token(yaml.FlowMappingEndToken) + self.events.append(yaml.MappingEndEvent(None, None)) + + # map_entry: KEY node VALUE node + def parse_map_entry(self): + self.get_token(yaml.KeyToken) + self.parse_node() + self.get_token(yaml.ValueToken) + self.parse_node() + + def parse(self): + self.parse_stream() + self.parsed = True + + def get_event(self): + if not self.parsed: + self.parse() + return self.events.pop(0) + + def check_event(self, *choices): + if not self.parsed: + self.parse() + if self.events: + if not choices: + return True + for choice in choices: + if isinstance(self.events[0], choice): + return True + return False + + def peek_event(self): + if not self.parsed: + self.parse() + return self.events[0] + +class CanonicalLoader(CanonicalScanner, CanonicalParser, + yaml.composer.Composer, yaml.constructor.Constructor, yaml.resolver.Resolver): + + def __init__(self, stream): + if hasattr(stream, 'read'): + stream = stream.read() + CanonicalScanner.__init__(self, stream) + CanonicalParser.__init__(self) + yaml.composer.Composer.__init__(self) + yaml.constructor.Constructor.__init__(self) + yaml.resolver.Resolver.__init__(self) + +yaml.CanonicalLoader = CanonicalLoader + +def canonical_scan(stream): + return yaml.scan(stream, Loader=CanonicalLoader) + +yaml.canonical_scan = canonical_scan + +def canonical_parse(stream): + return yaml.parse(stream, Loader=CanonicalLoader) + +yaml.canonical_parse = canonical_parse + +def canonical_compose(stream): + return yaml.compose(stream, Loader=CanonicalLoader) + +yaml.canonical_compose = canonical_compose + +def canonical_compose_all(stream): + return yaml.compose_all(stream, Loader=CanonicalLoader) + +yaml.canonical_compose_all = canonical_compose_all + +def canonical_load(stream): + return yaml.load(stream, Loader=CanonicalLoader) + +yaml.canonical_load = canonical_load + +def canonical_load_all(stream): + return yaml.load_all(stream, Loader=CanonicalLoader) + +yaml.canonical_load_all = canonical_load_all + diff --git a/tests/data/construct-python-name-module.code b/tests/data/construct-python-name-module.code index 335c7ca..8f93503 100644 --- a/tests/data/construct-python-name-module.code +++ b/tests/data/construct-python-name-module.code @@ -1 +1 @@ -[file, Loader, dump, abs, yaml.tokens] +[file, yaml.Loader, yaml.dump, abs, yaml.tokens] diff --git a/tests/data/empty-document-bug.empty b/tests/data/empty-document-bug.empty new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/data/empty-document-bug.empty diff --git a/tests/data/invalid-character.stream-error b/tests/data/invalid-character.stream-error Binary files differindex 03687b0..24a894c 100644 --- a/tests/data/invalid-character.stream-error +++ b/tests/data/invalid-character.stream-error diff --git a/tests/data/invalid-utf8-byte.stream-error b/tests/data/invalid-utf8-byte.stream-error index 15111c3..02b1605 100644 --- a/tests/data/invalid-utf8-byte.stream-error +++ b/tests/data/invalid-utf8-byte.stream-error @@ -1,18 +1,18 @@ -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** +*************************************************************** Invalid byte ('\xFF'): ÿ <-- -------------------------------------------------------------------------------------------------------------------------------- +*************************************************************** diff --git a/tests/data/odd-utf16.stream-error b/tests/data/odd-utf16.stream-error Binary files differindex 37da060..92ee645 100644 --- a/tests/data/odd-utf16.stream-error +++ b/tests/data/odd-utf16.stream-error diff --git a/tests/data/serializer-is-already-opened.dumper-error b/tests/data/serializer-is-already-opened.dumper-error index 0f66a46..5ac2e4b 100644 --- a/tests/data/serializer-is-already-opened.dumper-error +++ b/tests/data/serializer-is-already-opened.dumper-error @@ -1,3 +1,3 @@ -dumper = Dumper(StringIO.StringIO()) +dumper = yaml.Dumper(StringIO.StringIO()) dumper.open() dumper.open() diff --git a/tests/data/serializer-is-closed-1.dumper-error b/tests/data/serializer-is-closed-1.dumper-error index 5d9ecfc..b6d5c7a 100644 --- a/tests/data/serializer-is-closed-1.dumper-error +++ b/tests/data/serializer-is-closed-1.dumper-error @@ -1,4 +1,4 @@ -dumper = Dumper(StringIO.StringIO()) +dumper = yaml.Dumper(StringIO.StringIO()) dumper.open() dumper.close() dumper.open() diff --git a/tests/data/serializer-is-closed-2.dumper-error b/tests/data/serializer-is-closed-2.dumper-error index 0762145..ff57b4d 100644 --- a/tests/data/serializer-is-closed-2.dumper-error +++ b/tests/data/serializer-is-closed-2.dumper-error @@ -1,4 +1,4 @@ -dumper = Dumper(StringIO.StringIO()) +dumper = yaml.Dumper(StringIO.StringIO()) dumper.open() dumper.close() -dumper.serialize(ScalarNode(tag='!foo', value='bar')) +dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar')) diff --git a/tests/data/serializer-is-not-opened-1.dumper-error b/tests/data/serializer-is-not-opened-1.dumper-error index 1d21554..8dc6c1e 100644 --- a/tests/data/serializer-is-not-opened-1.dumper-error +++ b/tests/data/serializer-is-not-opened-1.dumper-error @@ -1,2 +1,2 @@ -dumper = Dumper(StringIO.StringIO()) +dumper = yaml.Dumper(StringIO.StringIO()) dumper.close() diff --git a/tests/data/serializer-is-not-opened-2.dumper-error b/tests/data/serializer-is-not-opened-2.dumper-error index d1d7f98..6922b22 100644 --- a/tests/data/serializer-is-not-opened-2.dumper-error +++ b/tests/data/serializer-is-not-opened-2.dumper-error @@ -1,2 +1,2 @@ -dumper = Dumper(StringIO.StringIO()) -dumper.serialize(ScalarNode(tag='!foo', value='bar')) +dumper = yaml.Dumper(StringIO.StringIO()) +dumper.serialize(yaml.ScalarNode(tag='!foo', value='bar')) diff --git a/tests/data/unknown.dumper-error b/tests/data/unknown.dumper-error index 4de4279..83204d2 100644 --- a/tests/data/unknown.dumper-error +++ b/tests/data/unknown.dumper-error @@ -1 +1 @@ -safe_dump(object) +yaml.safe_dump(object) diff --git a/tests/test_all.py b/tests/test_all.py index 8ae1c76..fec4ae4 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -1,14 +1,14 @@ -import unittest - -def main(): - import yaml - names = ['test_yaml'] - if yaml.__libyaml__: - names.append('test_yaml_ext') - suite = unittest.defaultTestLoader.loadTestsFromNames(names) - runner = unittest.TextTestRunner() - runner.run(suite) +import sys, yaml, test_appliance + +def main(args=None): + collections = [] + import test_yaml + collections.append(test_yaml) + if yaml.__with_libyaml__: + import test_yaml_ext + collections.append(test_yaml_ext) + test_appliance.run(collections, args) if __name__ == '__main__': main() diff --git a/tests/test_appliance.py b/tests/test_appliance.py index 61cc03f..09c46c9 100644 --- a/tests/test_appliance.py +++ b/tests/test_appliance.py @@ -1,358 +1,145 @@ -import unittest, os - -from yaml import * -from yaml.composer import * -from yaml.constructor import * -from yaml.resolver import * - -class TestAppliance(unittest.TestCase): - - DATA = 'tests/data' - SKIP_EXT = '.skip' - - all_tests = {} - for filename in os.listdir(DATA): - if os.path.isfile(os.path.join(DATA, filename)): - root, ext = os.path.splitext(filename) - all_tests.setdefault(root, []).append(ext) - - def add_tests(cls, method_name, *extensions): - for test in cls.all_tests: - available_extensions = cls.all_tests[test] - if cls.SKIP_EXT in available_extensions: - continue - for ext in extensions: - if ext not in available_extensions: - break - else: - filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions] - def test_method(self, test=test, filenames=filenames): - getattr(self, '_'+method_name)(test, *filenames) - test = test.replace('-', '_').replace('.', '_') - try: - test_method.__name__ = '%s_%s' % (method_name, test) - except TypeError: - import new - test_method = new.function(test_method.func_code, test_method.func_globals, - '%s_%s' % (method_name, test), test_method.func_defaults, - test_method.func_closure) - setattr(cls, test_method.__name__, test_method) - add_tests = classmethod(add_tests) - -class Error(Exception): - pass - -class CanonicalScanner: - - def __init__(self, data): - self.data = unicode(data, 'utf-8')+u'\0' - self.index = 0 - self.scan() - - def check_token(self, *choices): - if self.tokens: - if not choices: - return True - for choice in choices: - if isinstance(self.tokens[0], choice): - return True - return False - - def peek_token(self): - if self.tokens: - return self.tokens[0] - - def get_token(self, choice=None): - token = self.tokens.pop(0) - if choice and not isinstance(token, choice): - raise Error("unexpected token "+repr(token)) - return token - - def get_token_value(self): - token = self.get_token() - return token.value - - def scan(self): - self.tokens = [] - self.tokens.append(StreamStartToken(None, None)) - while True: - self.find_token() - ch = self.data[self.index] - if ch == u'\0': - self.tokens.append(StreamEndToken(None, None)) - break - elif ch == u'%': - self.tokens.append(self.scan_directive()) - elif ch == u'-' and self.data[self.index:self.index+3] == u'---': - self.index += 3 - self.tokens.append(DocumentStartToken(None, None)) - elif ch == u'[': - self.index += 1 - self.tokens.append(FlowSequenceStartToken(None, None)) - elif ch == u'{': - self.index += 1 - self.tokens.append(FlowMappingStartToken(None, None)) - elif ch == u']': - self.index += 1 - self.tokens.append(FlowSequenceEndToken(None, None)) - elif ch == u'}': - self.index += 1 - self.tokens.append(FlowMappingEndToken(None, None)) - elif ch == u'?': - self.index += 1 - self.tokens.append(KeyToken(None, None)) - elif ch == u':': - self.index += 1 - self.tokens.append(ValueToken(None, None)) - elif ch == u',': - self.index += 1 - self.tokens.append(FlowEntryToken(None, None)) - elif ch == u'*' or ch == u'&': - self.tokens.append(self.scan_alias()) - elif ch == u'!': - self.tokens.append(self.scan_tag()) - elif ch == u'"': - self.tokens.append(self.scan_scalar()) - else: - raise Error("invalid token") - - DIRECTIVE = u'%YAML 1.1' - - def scan_directive(self): - if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \ - self.data[self.index+len(self.DIRECTIVE)] in u' \n\0': - self.index += len(self.DIRECTIVE) - return DirectiveToken('YAML', (1, 1), None, None) - - def scan_alias(self): - if self.data[self.index] == u'*': - TokenClass = AliasToken +import sys, os, os.path, types, traceback, pprint + +DATA = 'tests/data' + +def find_test_functions(collections): + if not isinstance(collections, list): + collections = [collections] + functions = [] + for collection in collections: + if not isinstance(collection, dict): + collection = vars(collection) + keys = collection.keys() + keys.sort() + for key in keys: + value = collection[key] + if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): + functions.append(value) + return functions + +def find_test_filenames(directory): + filenames = {} + for filename in os.listdir(directory): + if os.path.isfile(os.path.join(directory, filename)): + base, ext = os.path.splitext(filename) + filenames.setdefault(base, []).append(ext) + filenames = filenames.items() + filenames.sort() + return filenames + +def parse_arguments(args): + if args is None: + args = sys.argv[1:] + verbose = False + if '-v' in args: + verbose = True + args.remove('-v') + if '--verbose' in args: + verbose = True + if 'YAML_TEST_VERBOSE' in os.environ: + verbose = True + include_functions = [] + if args: + include_functions.append(args.pop(0)) + if 'YAML_TEST_FUNCTIONS' in os.environ: + include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split()) + include_filenames = [] + include_filenames.extend(args) + if 'YAML_TEST_FILENAMES' in os.environ: + include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split()) + return include_functions, include_filenames, verbose + +def execute(function, filenames, verbose): + if verbose: + sys.stdout.write('='*75+'\n') + sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames))) + try: + function(verbose=verbose, *filenames) + except Exception, exc: + info = sys.exc_info() + if isinstance(exc, AssertionError): + kind = 'FAILURE' else: - TokenClass = AnchorToken - self.index += 1 - start = self.index - while self.data[self.index] not in u', \n\0': - self.index += 1 - value = self.data[start:self.index] - return TokenClass(value, None, None) - - def scan_tag(self): - self.index += 1 - start = self.index - while self.data[self.index] not in u' \n\0': - self.index += 1 - value = self.data[start:self.index] - if value[0] == u'!': - value = 'tag:yaml.org,2002:'+value[1:] - elif value[0] == u'<' and value[-1] == u'>': - value = value[1:-1] + kind = 'ERROR' + if verbose: + traceback.print_exc(limit=1, file=sys.stdout) else: - value = u'!'+value - return TagToken(value, None, None) - - QUOTE_CODES = { - 'x': 2, - 'u': 4, - 'U': 8, - } - - QUOTE_REPLACES = { - u'\\': u'\\', - u'\"': u'\"', - u' ': u' ', - u'a': u'\x07', - u'b': u'\x08', - u'e': u'\x1B', - u'f': u'\x0C', - u'n': u'\x0A', - u'r': u'\x0D', - u't': u'\x09', - u'v': u'\x0B', - u'N': u'\u0085', - u'L': u'\u2028', - u'P': u'\u2029', - u'_': u'_', - u'0': u'\x00', - - } - - def scan_scalar(self): - self.index += 1 - chunks = [] - start = self.index - ignore_spaces = False - while self.data[self.index] != u'"': - if self.data[self.index] == u'\\': - ignore_spaces = False - chunks.append(self.data[start:self.index]) - self.index += 1 - ch = self.data[self.index] - self.index += 1 - if ch == u'\n': - ignore_spaces = True - elif ch in self.QUOTE_CODES: - length = self.QUOTE_CODES[ch] - code = int(self.data[self.index:self.index+length], 16) - chunks.append(unichr(code)) - self.index += length + sys.stdout.write(kind[0]) + sys.stdout.flush() + else: + kind = 'SUCCESS' + info = None + if not verbose: + sys.stdout.write('.') + sys.stdout.flush() + return (function, filenames, kind, info) + +def display(results, verbose): + if results and not verbose: + sys.stdout.write('\n') + total = len(results) + failures = 0 + errors = 0 + for function, filenames, kind, info in results: + if kind == 'SUCCESS': + continue + if kind == 'FAILURE': + failures += 1 + if kind == 'ERROR': + errors += 1 + sys.stdout.write('='*75+'\n') + sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind)) + if kind == 'ERROR': + traceback.print_exception(file=sys.stdout, *info) + else: + sys.stdout.write('Traceback (most recent call last):\n') + traceback.print_tb(info[2], file=sys.stdout) + sys.stdout.write('%s: see below\n' % info[0].__name__) + sys.stdout.write('~'*75+'\n') + for arg in info[1].args: + pprint.pprint(arg, stream=sys.stdout, indent=2) + for filename in filenames: + sys.stdout.write('-'*75+'\n') + sys.stdout.write('%s:\n' % filename) + data = open(filename, 'rb').read() + sys.stdout.write(data) + if data and data[-1] != '\n': + sys.stdout.write('\n') + sys.stdout.write('='*75+'\n') + sys.stdout.write('TESTS: %s\n' % total) + if failures: + sys.stdout.write('FAILURES: %s\n' % failures) + if errors: + sys.stdout.write('ERRORS: %s\n' % errors) + +def run(collections, args=None): + test_functions = find_test_functions(collections) + test_filenames = find_test_filenames(DATA) + include_functions, include_filenames, verbose = parse_arguments(args) + results = [] + for function in test_functions: + if include_functions and function.func_name not in include_functions: + continue + if function.unittest: + for base, exts in test_filenames: + if include_filenames and base not in include_filenames: + continue + filenames = [] + for ext in function.unittest: + if ext not in exts: + break + filenames.append(os.path.join(DATA, base+ext)) else: - chunks.append(self.QUOTE_REPLACES[ch]) - start = self.index - elif self.data[self.index] == u'\n': - chunks.append(self.data[start:self.index]) - chunks.append(u' ') - self.index += 1 - start = self.index - ignore_spaces = True - elif ignore_spaces and self.data[self.index] == u' ': - self.index += 1 - start = self.index - else: - ignore_spaces = False - self.index += 1 - chunks.append(self.data[start:self.index]) - self.index += 1 - return ScalarToken(u''.join(chunks), False, None, None) - - def find_token(self): - found = False - while not found: - while self.data[self.index] in u' \t': - self.index += 1 - if self.data[self.index] == u'#': - while self.data[self.index] != u'\n': - self.index += 1 - if self.data[self.index] == u'\n': - self.index += 1 - else: - found = True - -class CanonicalParser: - - def __init__(self): - self.events = [] - self.parse() - - # stream: STREAM-START document* STREAM-END - def parse_stream(self): - self.get_token(StreamStartToken) - self.events.append(StreamStartEvent(None, None)) - while not self.check_token(StreamEndToken): - if self.check_token(DirectiveToken, DocumentStartToken): - self.parse_document() - else: - raise Error("document is expected, got "+repr(self.tokens[self.index])) - self.get_token(StreamEndToken) - self.events.append(StreamEndEvent(None, None)) - - # document: DIRECTIVE? DOCUMENT-START node - def parse_document(self): - node = None - if self.check_token(DirectiveToken): - self.get_token(DirectiveToken) - self.get_token(DocumentStartToken) - self.events.append(DocumentStartEvent(None, None)) - self.parse_node() - self.events.append(DocumentEndEvent(None, None)) - - # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping) - def parse_node(self): - if self.check_token(AliasToken): - self.events.append(AliasEvent(self.get_token_value(), None, None)) + skip_exts = getattr(function, 'skip', []) + for skip_ext in skip_exts: + if skip_ext in exts: + break + else: + result = execute(function, filenames, verbose) + results.append(result) else: - anchor = None - if self.check_token(AnchorToken): - anchor = self.get_token_value() - tag = None - if self.check_token(TagToken): - tag = self.get_token_value() - if self.check_token(ScalarToken): - self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None)) - elif self.check_token(FlowSequenceStartToken): - self.events.append(SequenceStartEvent(anchor, tag, None, None)) - self.parse_sequence() - elif self.check_token(FlowMappingStartToken): - self.events.append(MappingStartEvent(anchor, tag, None, None)) - self.parse_mapping() - else: - raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index])) - - # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END - def parse_sequence(self): - self.get_token(FlowSequenceStartToken) - if not self.check_token(FlowSequenceEndToken): - self.parse_node() - while not self.check_token(FlowSequenceEndToken): - self.get_token(FlowEntryToken) - if not self.check_token(FlowSequenceEndToken): - self.parse_node() - self.get_token(FlowSequenceEndToken) - self.events.append(SequenceEndEvent(None, None)) - - # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END - def parse_mapping(self): - self.get_token(FlowMappingStartToken) - if not self.check_token(FlowMappingEndToken): - self.parse_map_entry() - while not self.check_token(FlowMappingEndToken): - self.get_token(FlowEntryToken) - if not self.check_token(FlowMappingEndToken): - self.parse_map_entry() - self.get_token(FlowMappingEndToken) - self.events.append(MappingEndEvent(None, None)) - - # map_entry: KEY node VALUE node - def parse_map_entry(self): - self.get_token(KeyToken) - self.parse_node() - self.get_token(ValueToken) - self.parse_node() - - def parse(self): - self.parse_stream() - - def get_event(self): - return self.events.pop(0) - - def check_event(self, *choices): - if self.events: - if not choices: - return True - for choice in choices: - if isinstance(self.events[0], choice): - return True - return False - - def peek_event(self): - return self.events[0] - -class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver): - - def __init__(self, stream): - if hasattr(stream, 'read'): - stream = stream.read() - CanonicalScanner.__init__(self, stream) - CanonicalParser.__init__(self) - Composer.__init__(self) - Constructor.__init__(self) - Resolver.__init__(self) - -def canonical_scan(stream): - return scan(stream, Loader=CanonicalLoader) - -def canonical_parse(stream): - return parse(stream, Loader=CanonicalLoader) - -def canonical_compose(stream): - return compose(stream, Loader=CanonicalLoader) - -def canonical_compose_all(stream): - return compose_all(stream, Loader=CanonicalLoader) - -def canonical_load(stream): - return load(stream, Loader=CanonicalLoader) - -def canonical_load_all(stream): - return load_all(stream, Loader=CanonicalLoader) + result = execute(function, [], verbose) + results.append(result) + display(results, verbose=verbose) diff --git a/tests/test_build.py b/tests/test_build.py index 42afa78..901e8ed 100644 --- a/tests/test_build.py +++ b/tests/test_build.py @@ -1,13 +1,10 @@ -def main(): +if __name__ == '__main__': import sys, os, distutils.util build_lib = 'build/lib' build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3])) sys.path.insert(0, build_lib) sys.path.insert(0, build_lib_ext) - import test_yaml - test_yaml.main('test_yaml') - -if __name__ == '__main__': - main() + import test_yaml, test_appliance + test_appliance.run(test_yaml) diff --git a/tests/test_build_ext.py b/tests/test_build_ext.py index a335c1b..ff195d5 100644 --- a/tests/test_build_ext.py +++ b/tests/test_build_ext.py @@ -1,14 +1,11 @@ -def main(): +if __name__ == '__main__': import sys, os, distutils.util build_lib = 'build/lib' build_lib_ext = os.path.join('build', 'lib.%s-%s' % (distutils.util.get_platform(), sys.version[0:3])) sys.path.insert(0, build_lib) sys.path.insert(0, build_lib_ext) - import test_yaml_ext - test_yaml_ext.main('test_yaml_ext') - -if __name__ == '__main__': - main() + import test_yaml_ext, test_appliance + test_appliance.run(test_yaml_ext) diff --git a/tests/test_canonical.py b/tests/test_canonical.py index 4416902..a851ef2 100644 --- a/tests/test_canonical.py +++ b/tests/test_canonical.py @@ -1,20 +1,40 @@ -import test_appliance +import yaml, canonical -class TestCanonicalAppliance(test_appliance.TestAppliance): +def test_canonical_scanner(canonical_filename, verbose=False): + data = open(canonical_filename, 'rb').read() + tokens = list(yaml.canonical_scan(data)) + assert tokens, tokens + if verbose: + for token in tokens: + print token - def _testCanonicalScanner(self, test_name, canonical_filename): - data = file(canonical_filename, 'rb').read() - tokens = list(test_appliance.canonical_scan(data)) - #for token in tokens: - # print token +test_canonical_scanner.unittest = ['.canonical'] - def _testCanonicalParser(self, test_name, canonical_filename): - data = file(canonical_filename, 'rb').read() - event = list(test_appliance.canonical_parse(data)) - #for event in events: - # print event +def test_canonical_parser(canonical_filename, verbose=False): + data = open(canonical_filename, 'rb').read() + events = list(yaml.canonical_parse(data)) + assert events, events + if verbose: + for event in events: + print event -TestCanonicalAppliance.add_tests('testCanonicalScanner', '.canonical') -TestCanonicalAppliance.add_tests('testCanonicalParser', '.canonical') +test_canonical_parser.unittest = ['.canonical'] + +def test_canonical_error(data_filename, canonical_filename, verbose=False): + data = open(data_filename, 'rb').read() + try: + output = list(yaml.canonical_load_all(data)) + except yaml.YAMLError, exc: + if verbose: + print exc + else: + raise AssertionError("expected an exception") + +test_canonical_error.unittest = ['.data', '.canonical'] +test_canonical_error.skip = ['.empty'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_constructor.py b/tests/test_constructor.py index 04f54ef..bb01692 100644 --- a/tests/test_constructor.py +++ b/tests/test_constructor.py @@ -1,313 +1,271 @@ -import test_appliance +import yaml +import pprint import datetime try: set except NameError: from sets import Set as set - -from yaml import * - import yaml.tokens -class MyLoader(Loader): - pass -class MyDumper(Dumper): - pass - -class MyTestClass1: - - def __init__(self, x, y=0, z=0): - self.x = x - self.y = y - self.z = z - - def __eq__(self, other): - if isinstance(other, MyTestClass1): - return self.__class__, self.__dict__ == other.__class__, other.__dict__ - else: - return False - -def construct1(constructor, node): - mapping = constructor.construct_mapping(node) - return MyTestClass1(**mapping) -def represent1(representer, native): - return representer.represent_mapping("!tag1", native.__dict__) - -add_constructor("!tag1", construct1, Loader=MyLoader) -add_representer(MyTestClass1, represent1, Dumper=MyDumper) - -class MyTestClass2(MyTestClass1, YAMLObject): - - yaml_loader = MyLoader - yaml_dumper = MyDumper - yaml_tag = "!tag2" - - def from_yaml(cls, constructor, node): - x = constructor.construct_yaml_int(node) - return cls(x=x) - from_yaml = classmethod(from_yaml) - - def to_yaml(cls, representer, native): - return representer.represent_scalar(cls.yaml_tag, str(native.x)) - to_yaml = classmethod(to_yaml) - -class MyTestClass3(MyTestClass2): - - yaml_tag = "!tag3" - - def from_yaml(cls, constructor, node): - mapping = constructor.construct_mapping(node) - if '=' in mapping: - x = mapping['='] - del mapping['='] - mapping['x'] = x - return cls(**mapping) - from_yaml = classmethod(from_yaml) - - def to_yaml(cls, representer, native): - return representer.represent_mapping(cls.yaml_tag, native.__dict__) - to_yaml = classmethod(to_yaml) - -class YAMLObject1(YAMLObject): - - yaml_loader = MyLoader - yaml_dumper = MyDumper - yaml_tag = '!foo' - - def __init__(self, my_parameter=None, my_another_parameter=None): - self.my_parameter = my_parameter - self.my_another_parameter = my_another_parameter - - def __eq__(self, other): - if isinstance(other, YAMLObject1): - return self.__class__, self.__dict__ == other.__class__, other.__dict__ - else: - return False - -class YAMLObject2(YAMLObject): - - yaml_loader = MyLoader - yaml_dumper = MyDumper - yaml_tag = '!bar' - - def __init__(self, foo=1, bar=2, baz=3): - self.foo = foo - self.bar = bar - self.baz = baz - - def __getstate__(self): - return {1: self.foo, 2: self.bar, 3: self.baz} - - def __setstate__(self, state): - self.foo = state[1] - self.bar = state[2] - self.baz = state[3] - - def __eq__(self, other): - if isinstance(other, YAMLObject2): - return self.__class__, self.__dict__ == other.__class__, other.__dict__ - else: - return False - -class AnObject(object): - - def __new__(cls, foo=None, bar=None, baz=None): - self = object.__new__(cls) - self.foo = foo - self.bar = bar - self.baz = baz - return self - - def __cmp__(self, other): - return cmp((type(self), self.foo, self.bar, self.baz), - (type(other), other.foo, other.bar, other.baz)) - - def __eq__(self, other): - return type(self) is type(other) and \ - (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) - -class AnInstance: - - def __init__(self, foo=None, bar=None, baz=None): - self.foo = foo - self.bar = bar - self.baz = baz - - def __cmp__(self, other): - return cmp((type(self), self.foo, self.bar, self.baz), - (type(other), other.foo, other.bar, other.baz)) - - def __eq__(self, other): - return type(self) is type(other) and \ - (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) - -class AState(AnInstance): - - def __getstate__(self): - return { - '_foo': self.foo, - '_bar': self.bar, - '_baz': self.baz, - } - - def __setstate__(self, state): - self.foo = state['_foo'] - self.bar = state['_bar'] - self.baz = state['_baz'] - -class ACustomState(AnInstance): - - def __getstate__(self): - return (self.foo, self.bar, self.baz) - - def __setstate__(self, state): - self.foo, self.bar, self.baz = state - -class InitArgs(AnInstance): - - def __getinitargs__(self): - return (self.foo, self.bar, self.baz) - - def __getstate__(self): - return {} - -class InitArgsWithState(AnInstance): - - def __getinitargs__(self): - return (self.foo, self.bar) - - def __getstate__(self): - return self.baz - - def __setstate__(self, state): - self.baz = state - -class NewArgs(AnObject): - - def __getnewargs__(self): - return (self.foo, self.bar, self.baz) - - def __getstate__(self): - return {} - -class NewArgsWithState(AnObject): - - def __getnewargs__(self): - return (self.foo, self.bar) - - def __getstate__(self): - return self.baz - - def __setstate__(self, state): - self.baz = state - -class Reduce(AnObject): - - def __reduce__(self): - return self.__class__, (self.foo, self.bar, self.baz) - -class ReduceWithState(AnObject): - - def __reduce__(self): - return self.__class__, (self.foo, self.bar), self.baz - - def __setstate__(self, state): - self.baz = state - -class MyInt(int): - - def __eq__(self, other): - return type(self) is type(other) and int(self) == int(other) - -class MyList(list): - - def __init__(self, n=1): - self.extend([None]*n) - - def __eq__(self, other): - return type(self) is type(other) and list(self) == list(other) - -class MyDict(dict): - - def __init__(self, n=1): - for k in range(n): - self[k] = None - - def __eq__(self, other): - return type(self) is type(other) and dict(self) == dict(other) - -class FixedOffset(datetime.tzinfo): - - def __init__(self, offset, name): - self.__offset = datetime.timedelta(minutes=offset) - self.__name = name - - def utcoffset(self, dt): - return self.__offset - - def tzname(self, dt): - return self.__name - - def dst(self, dt): - return datetime.timedelta(0) - - def execute(code): exec code return value -class TestConstructorTypes(test_appliance.TestAppliance): - - def _testTypes(self, test_name, data_filename, code_filename): - data1 = None - data2 = None +def _make_objects(): + global MyLoader, MyDumper, MyTestClass1, MyTestClass2, MyTestClass3, YAMLObject1, YAMLObject2, \ + AnObject, AnInstance, AState, ACustomState, InitArgs, InitArgsWithState, \ + NewArgs, NewArgsWithState, Reduce, ReduceWithState, MyInt, MyList, MyDict, \ + FixedOffset, execute + + class MyLoader(yaml.Loader): + pass + class MyDumper(yaml.Dumper): + pass + + class MyTestClass1: + def __init__(self, x, y=0, z=0): + self.x = x + self.y = y + self.z = z + def __eq__(self, other): + if isinstance(other, MyTestClass1): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + def construct1(constructor, node): + mapping = constructor.construct_mapping(node) + return MyTestClass1(**mapping) + def represent1(representer, native): + return representer.represent_mapping("!tag1", native.__dict__) + + yaml.add_constructor("!tag1", construct1, Loader=MyLoader) + yaml.add_representer(MyTestClass1, represent1, Dumper=MyDumper) + + class MyTestClass2(MyTestClass1, yaml.YAMLObject): + yaml_loader = MyLoader + yaml_dumper = MyDumper + yaml_tag = "!tag2" + def from_yaml(cls, constructor, node): + x = constructor.construct_yaml_int(node) + return cls(x=x) + from_yaml = classmethod(from_yaml) + def to_yaml(cls, representer, native): + return representer.represent_scalar(cls.yaml_tag, str(native.x)) + to_yaml = classmethod(to_yaml) + + class MyTestClass3(MyTestClass2): + yaml_tag = "!tag3" + def from_yaml(cls, constructor, node): + mapping = constructor.construct_mapping(node) + if '=' in mapping: + x = mapping['='] + del mapping['='] + mapping['x'] = x + return cls(**mapping) + from_yaml = classmethod(from_yaml) + def to_yaml(cls, representer, native): + return representer.represent_mapping(cls.yaml_tag, native.__dict__) + to_yaml = classmethod(to_yaml) + + class YAMLObject1(yaml.YAMLObject): + yaml_loader = MyLoader + yaml_dumper = MyDumper + yaml_tag = '!foo' + def __init__(self, my_parameter=None, my_another_parameter=None): + self.my_parameter = my_parameter + self.my_another_parameter = my_another_parameter + def __eq__(self, other): + if isinstance(other, YAMLObject1): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + class YAMLObject2(yaml.YAMLObject): + yaml_loader = MyLoader + yaml_dumper = MyDumper + yaml_tag = '!bar' + def __init__(self, foo=1, bar=2, baz=3): + self.foo = foo + self.bar = bar + self.baz = baz + def __getstate__(self): + return {1: self.foo, 2: self.bar, 3: self.baz} + def __setstate__(self, state): + self.foo = state[1] + self.bar = state[2] + self.baz = state[3] + def __eq__(self, other): + if isinstance(other, YAMLObject2): + return self.__class__, self.__dict__ == other.__class__, other.__dict__ + else: + return False + + class AnObject(object): + def __new__(cls, foo=None, bar=None, baz=None): + self = object.__new__(cls) + self.foo = foo + self.bar = bar + self.baz = baz + return self + def __cmp__(self, other): + return cmp((type(self), self.foo, self.bar, self.baz), + (type(other), other.foo, other.bar, other.baz)) + def __eq__(self, other): + return type(self) is type(other) and \ + (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) + + class AnInstance: + def __init__(self, foo=None, bar=None, baz=None): + self.foo = foo + self.bar = bar + self.baz = baz + def __cmp__(self, other): + return cmp((type(self), self.foo, self.bar, self.baz), + (type(other), other.foo, other.bar, other.baz)) + def __eq__(self, other): + return type(self) is type(other) and \ + (self.foo, self.bar, self.baz) == (other.foo, other.bar, other.baz) + + class AState(AnInstance): + def __getstate__(self): + return { + '_foo': self.foo, + '_bar': self.bar, + '_baz': self.baz, + } + def __setstate__(self, state): + self.foo = state['_foo'] + self.bar = state['_bar'] + self.baz = state['_baz'] + + class ACustomState(AnInstance): + def __getstate__(self): + return (self.foo, self.bar, self.baz) + def __setstate__(self, state): + self.foo, self.bar, self.baz = state + + class InitArgs(AnInstance): + def __getinitargs__(self): + return (self.foo, self.bar, self.baz) + def __getstate__(self): + return {} + + class InitArgsWithState(AnInstance): + def __getinitargs__(self): + return (self.foo, self.bar) + def __getstate__(self): + return self.baz + def __setstate__(self, state): + self.baz = state + + class NewArgs(AnObject): + def __getnewargs__(self): + return (self.foo, self.bar, self.baz) + def __getstate__(self): + return {} + + class NewArgsWithState(AnObject): + def __getnewargs__(self): + return (self.foo, self.bar) + def __getstate__(self): + return self.baz + def __setstate__(self, state): + self.baz = state + + class Reduce(AnObject): + def __reduce__(self): + return self.__class__, (self.foo, self.bar, self.baz) + + class ReduceWithState(AnObject): + def __reduce__(self): + return self.__class__, (self.foo, self.bar), self.baz + def __setstate__(self, state): + self.baz = state + + class MyInt(int): + def __eq__(self, other): + return type(self) is type(other) and int(self) == int(other) + + class MyList(list): + def __init__(self, n=1): + self.extend([None]*n) + def __eq__(self, other): + return type(self) is type(other) and list(self) == list(other) + + class MyDict(dict): + def __init__(self, n=1): + for k in range(n): + self[k] = None + def __eq__(self, other): + return type(self) is type(other) and dict(self) == dict(other) + + class FixedOffset(datetime.tzinfo): + def __init__(self, offset, name): + self.__offset = datetime.timedelta(minutes=offset) + self.__name = name + def utcoffset(self, dt): + return self.__offset + def tzname(self, dt): + return self.__name + def dst(self, dt): + return datetime.timedelta(0) + +def _load_code(expression): + return eval(expression) + +def _serialize_value(data): + if isinstance(data, list): + return '[%s]' % ', '.join(map(_serialize_value, data)) + elif isinstance(data, dict): + items = [] + for key, value in data.items(): + key = _serialize_value(key) + value = _serialize_value(value) + items.append("%s: %s" % (key, value)) + items.sort() + return '{%s}' % ', '.join(items) + elif isinstance(data, datetime.datetime): + return repr(data.utctimetuple()) + elif isinstance(data, unicode): + return data.encode('utf-8') + else: + return str(data) + +def test_constructor_types(data_filename, code_filename, verbose=False): + _make_objects() + native1 = None + native2 = None + try: + native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) + if len(native1) == 1: + native1 = native1[0] + native2 = _load_code(open(code_filename, 'rb').read()) try: - data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader)) - if len(data1) == 1: - data1 = data1[0] - data2 = eval(file(code_filename, 'rb').read()) - self.failUnlessEqual(type(data1), type(data2)) - try: - self.failUnlessEqual(data1, data2) - except (AssertionError, TypeError): - if isinstance(data1, dict): - data1 = [(repr(key), value) for key, value in data1.items()] - data1.sort() - data1 = repr(data1) - data2 = [(repr(key), value) for key, value in data2.items()] - data2.sort() - data2 = repr(data2) - if data1 != data2: - raise - elif isinstance(data1, list): - self.failUnlessEqual(type(data1), type(data2)) - self.failUnlessEqual(len(data1), len(data2)) - for item1, item2 in zip(data1, data2): - if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \ - (item2 != item2 or (item2 == 0.0 and item2 == 1.0)): - continue - if isinstance(item1, datetime.datetime) \ - and isinstance(item2, datetime.datetime): - self.failUnlessEqual(item1.microsecond, - item2.microsecond) - if isinstance(item1, datetime.datetime): - item1 = item1.utctimetuple() - if isinstance(item2, datetime.datetime): - item2 = item2.utctimetuple() - self.failUnlessEqual(item1, item2) - else: - raise - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "CODE:" - print file(code_filename, 'rb').read() - print "NATIVES1:", data1 - print "NATIVES2:", data2 - raise - -TestConstructorTypes.add_tests('testTypes', '.data', '.code') + if native1 == native2: + return + except TypeError: + pass + if verbose: + print "SERIALIZED NATIVE1:" + print _serialize_value(native1) + print "SERIALIZED NATIVE2:" + print _serialize_value(native2) + assert _serialize_value(native1) == _serialize_value(native2), (native1, native2) + finally: + if verbose: + print "NATIVE1:" + pprint.pprint(native1) + print "NATIVE2:" + pprint.pprint(native2) + +test_constructor_types.unittest = ['.data', '.code'] + +if __name__ == '__main__': + import sys, test_constructor + sys.modules['test_constructor'] = sys.modules['__main__'] + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_emitter.py b/tests/test_emitter.py index 77e3728..61fd941 100644 --- a/tests/test_emitter.py +++ b/tests/test_emitter.py @@ -1,91 +1,72 @@ -import test_appliance, sys, StringIO - -from yaml import * import yaml -class TestEmitter(test_appliance.TestAppliance): - - def _testEmitterOnData(self, test_name, canonical_filename, data_filename): - self._testEmitter(test_name, data_filename) - - def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename): - self._testEmitter(test_name, canonical_filename, False) - - def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename): - self._testEmitter(test_name, canonical_filename, True) - - def _testEmitter(self, test_name, filename, canonical=None): - events = list(parse(file(filename, 'rb'))) - #self._dump(filename, events, canonical) - stream = StringIO.StringIO() - emit(events, stream, canonical=canonical) - data = stream.getvalue() - new_events = list(parse(data)) - for event, new_event in zip(events, new_events): - self.failUnlessEqual(event.__class__, new_event.__class__) - if isinstance(event, NodeEvent): - self.failUnlessEqual(event.anchor, new_event.anchor) - if isinstance(event, CollectionStartEvent): - self.failUnlessEqual(event.tag, new_event.tag) - if isinstance(event, ScalarEvent): - #self.failUnlessEqual(event.implicit, new_event.implicit) - if True not in event.implicit+new_event.implicit: - self.failUnlessEqual(event.tag, new_event.tag) - self.failUnlessEqual(event.value, new_event.value) - - def _testEmitterStyles(self, test_name, canonical_filename, data_filename): - for filename in [canonical_filename, data_filename]: - events = list(parse(file(filename, 'rb'))) - for flow_style in [False, True]: - for style in ['|', '>', '"', '\'', '']: - styled_events = [] - for event in events: - if isinstance(event, ScalarEvent): - event = ScalarEvent(event.anchor, event.tag, - event.implicit, event.value, style=style) - elif isinstance(event, SequenceStartEvent): - event = SequenceStartEvent(event.anchor, event.tag, - event.implicit, flow_style=flow_style) - elif isinstance(event, MappingStartEvent): - event = MappingStartEvent(event.anchor, event.tag, - event.implicit, flow_style=flow_style) - styled_events.append(event) - stream = StringIO.StringIO() - emit(styled_events, stream) - data = stream.getvalue() - #print data - new_events = list(parse(data)) - for event, new_event in zip(events, new_events): - self.failUnlessEqual(event.__class__, new_event.__class__) - if isinstance(event, NodeEvent): - self.failUnlessEqual(event.anchor, new_event.anchor) - if isinstance(event, CollectionStartEvent): - self.failUnlessEqual(event.tag, new_event.tag) - if isinstance(event, ScalarEvent): - #self.failUnlessEqual(event.implicit, new_event.implicit) - if True not in event.implicit+new_event.implicit: - self.failUnlessEqual(event.tag, new_event.tag) - self.failUnlessEqual(event.value, new_event.value) - - - def _dump(self, filename, events, canonical): - print "="*30 - print "ORIGINAL DOCUMENT:" - print file(filename, 'rb').read() - print '-'*30 - print "EMITTED DOCUMENT:" - emit(events, sys.stdout, canonical=canonical) - -TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data') -TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical') -TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical') -TestEmitter.add_tests('testEmitterStyles', '.canonical', '.data') - -class EventsLoader(Loader): +def _compare_events(events1, events2): + assert len(events1) == len(events2), (events1, events2) + for event1, event2 in zip(events1, events2): + assert event1.__class__ == event2.__class__, (event1, event2) + if isinstance(event1, yaml.NodeEvent): + assert event1.anchor == event2.anchor, (event1, event2) + if isinstance(event1, yaml.CollectionStartEvent): + assert event1.tag == event2.tag, (event1, event2) + if isinstance(event1, yaml.ScalarEvent): + if True not in event1.implicit+event2.implicit: + assert event1.tag == event2.tag, (event1, event2) + assert event1.value == event2.value, (event1, event2) + +def test_emitter_on_data(data_filename, canonical_filename, verbose=False): + events = list(yaml.parse(open(data_filename, 'rb'))) + output = yaml.emit(events) + if verbose: + print "OUTPUT:" + print output + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + +test_emitter_on_data.unittest = ['.data', '.canonical'] + +def test_emitter_on_canonical(canonical_filename, verbose=False): + events = list(yaml.parse(open(canonical_filename, 'rb'))) + for canonical in [False, True]: + output = yaml.emit(events, canonical=canonical) + if verbose: + print "OUTPUT (canonical=%s):" % canonical + print output + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + +test_emitter_on_canonical.unittest = ['.canonical'] + +def test_emitter_styles(data_filename, canonical_filename, verbose=False): + for filename in [data_filename, canonical_filename]: + events = list(yaml.parse(open(filename, 'rb'))) + for flow_style in [False, True]: + for style in ['|', '>', '"', '\'', '']: + styled_events = [] + for event in events: + if isinstance(event, yaml.ScalarEvent): + event = yaml.ScalarEvent(event.anchor, event.tag, + event.implicit, event.value, style=style) + elif isinstance(event, yaml.SequenceStartEvent): + event = yaml.SequenceStartEvent(event.anchor, event.tag, + event.implicit, flow_style=flow_style) + elif isinstance(event, yaml.MappingStartEvent): + event = yaml.MappingStartEvent(event.anchor, event.tag, + event.implicit, flow_style=flow_style) + styled_events.append(event) + output = yaml.emit(styled_events) + if verbose: + print "OUTPUT (filename=%r, flow_style=%r, style=%r)" % (filename, flow_style, style) + print output + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + +test_emitter_styles.unittest = ['.data', '.canonical'] + +class EventsLoader(yaml.Loader): def construct_event(self, node): - if isinstance(node, ScalarNode): + if isinstance(node, yaml.ScalarNode): mapping = {} else: mapping = self.construct_mapping(node) @@ -104,34 +85,16 @@ class EventsLoader(Loader): EventsLoader.add_constructor(None, EventsLoader.construct_event) -class TestEmitterEvents(test_appliance.TestAppliance): - - def _testEmitterEvents(self, test_name, events_filename): - events = list(load(file(events_filename, 'rb'), Loader=EventsLoader)) - #self._dump(events_filename, events) - stream = StringIO.StringIO() - emit(events, stream) - data = stream.getvalue() - new_events = list(parse(data)) - self.failUnlessEqual(len(events), len(new_events)) - for event, new_event in zip(events, new_events): - self.failUnlessEqual(event.__class__, new_event.__class__) - if isinstance(event, NodeEvent): - self.failUnlessEqual(event.anchor, new_event.anchor) - if isinstance(event, CollectionStartEvent): - self.failUnlessEqual(event.tag, new_event.tag) - if isinstance(event, ScalarEvent): - self.failUnless(event.implicit == new_event.implicit - or event.tag == new_event.tag) - self.failUnlessEqual(event.value, new_event.value) - - def _dump(self, events_filename, events): - print "="*30 - print "EVENTS:" - print file(events_filename, 'rb').read() - print '-'*30 +def test_emitter_events(events_filename, verbose=False): + events = list(yaml.load(open(events_filename, 'rb'), Loader=EventsLoader)) + output = yaml.emit(events) + if verbose: print "OUTPUT:" - emit(events, sys.stdout) - -TestEmitterEvents.add_tests('testEmitterEvents', '.events') + print output + new_events = list(yaml.parse(output)) + _compare_events(events, new_events) + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_errors.py b/tests/test_errors.py index 7d6e0d2..5fd7c46 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -1,91 +1,66 @@ -import test_appliance -import test_emitter - -import StringIO - -from yaml import * - -class TestErrors(test_appliance.TestAppliance): - - def _testLoaderErrors(self, test_name, invalid_filename): - #self._load(invalid_filename) - self.failUnlessRaises(YAMLError, lambda: self._load(invalid_filename)) - - def _testLoaderStringErrors(self, test_name, invalid_filename): - #self._load_string(invalid_filename) - self.failUnlessRaises(YAMLError, lambda: self._load_string(invalid_filename)) - - def _testLoaderSingleErrors(self, test_name, invalid_filename): - #self._load_single(invalid_filename) - self.failUnlessRaises(YAMLError, lambda: self._load_single(invalid_filename)) - - def _testEmitterErrors(self, test_name, invalid_filename): - events = list(load(file(invalid_filename, 'rb').read(), - Loader=test_emitter.EventsLoader)) - self.failUnlessRaises(YAMLError, lambda: self._emit(events)) - - def _testDumperErrors(self, test_name, invalid_filename): - code = file(invalid_filename, 'rb').read() - self.failUnlessRaises(YAMLError, lambda: self._dump(code)) - - def _dump(self, code): - try: - exec code - except YAMLError, exc: - #print '.'*70 - #print "%s:" % exc.__class__.__name__, exc - raise - - def _emit(self, events): - try: - emit(events) - except YAMLError, exc: - #print '.'*70 - #print "%s:" % exc.__class__.__name__, exc - raise - - def _load(self, filename): - try: - return list(load_all(file(filename, 'rb'))) - except YAMLError, exc: - #except ScannerError, exc: - #except ParserError, exc: - #except ComposerError, exc: - #except ConstructorError, exc: - #print '.'*70 - #print "%s:" % exc.__class__.__name__, exc - raise - - def _load_string(self, filename): - try: - return list(load_all(file(filename, 'rb').read())) - except YAMLError, exc: - #except ScannerError, exc: - #except ParserError, exc: - #except ComposerError, exc: - #except ConstructorError, exc: - #print '.'*70 - #print "%s:" % filename - #print "%s:" % exc.__class__.__name__, exc - raise - - def _load_single(self, filename): - try: - return load(file(filename, 'rb').read()) - except YAMLError, exc: - #except ScannerError, exc: - #except ParserError, exc: - #except ComposerError, exc: - #except ConstructorError, exc: - #print '.'*70 - #print "%s:" % filename - #print "%s:" % exc.__class__.__name__, exc - raise - -TestErrors.add_tests('testLoaderErrors', '.loader-error') -TestErrors.add_tests('testLoaderStringErrors', '.loader-error') -TestErrors.add_tests('testLoaderSingleErrors', '.single-loader-error') -TestErrors.add_tests('testEmitterErrors', '.emitter-error') -TestErrors.add_tests('testDumperErrors', '.dumper-error') +import yaml, test_emitter + +def test_loader_error(error_filename, verbose=False): + try: + list(yaml.load_all(open(error_filename, 'rb'))) + except yaml.YAMLError, exc: + if verbose: + print "%s:" % exc.__class__.__name__, exc + else: + raise AssertionError("expected an exception") + +test_loader_error.unittest = ['.loader-error'] + +def test_loader_error_string(error_filename, verbose=False): + try: + list(yaml.load_all(open(error_filename, 'rb').read())) + except yaml.YAMLError, exc: + if verbose: + print "%s:" % exc.__class__.__name__, exc + else: + raise AssertionError("expected an exception") + +test_loader_error_string.unittest = ['.loader-error'] + +def test_loader_error_single(error_filename, verbose=False): + try: + yaml.load(open(error_filename, 'rb').read()) + except yaml.YAMLError, exc: + if verbose: + print "%s:" % exc.__class__.__name__, exc + else: + raise AssertionError("expected an exception") + +test_loader_error_single.unittest = ['.single-loader-error'] + +def test_emitter_error(error_filename, verbose=False): + events = list(yaml.load(open(error_filename, 'rb'), + Loader=test_emitter.EventsLoader)) + try: + yaml.emit(events) + except yaml.YAMLError, exc: + if verbose: + print "%s:" % exc.__class__.__name__, exc + else: + raise AssertionError("expected an exception") + +test_emitter_error.unittest = ['.emitter-error'] + +def test_dumper_error(error_filename, verbose=False): + code = open(error_filename, 'rb').read() + try: + import yaml, StringIO + exec code + except yaml.YAMLError, exc: + if verbose: + print "%s:" % exc.__class__.__name__, exc + else: + raise AssertionError("expected an exception") + +test_dumper_error.unittest = ['.dumper-error'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_mark.py b/tests/test_mark.py index 4fa665e..f30a121 100644 --- a/tests/test_mark.py +++ b/tests/test_mark.py @@ -1,34 +1,32 @@ -import test_appliance +import yaml -from yaml.reader import Mark +def test_marks(marks_filename, verbose=False): + inputs = open(marks_filename, 'rb').read().split('---\n')[1:] + for input in inputs: + index = 0 + line = 0 + column = 0 + while input[index] != '*': + if input[index] == '\n': + line += 1 + column = 0 + else: + column += 1 + index += 1 + mark = yaml.Mark(marks_filename, index, line, column, unicode(input), index) + snippet = mark.get_snippet(indent=2, max_length=79) + if verbose: + print snippet + assert isinstance(snippet, str), type(snippet) + assert snippet.count('\n') == 1, snippet.count('\n') + data, pointer = snippet.split('\n') + assert len(data) < 82, len(data) + assert data[len(pointer)-1] == '*', data[len(pointer)-1] -class TestMark(test_appliance.TestAppliance): +test_marks.unittest = ['.marks'] - def _testMarks(self, test_name, marks_filename): - inputs = file(marks_filename, 'rb').read().split('---\n')[1:] - for input in inputs: - index = 0 - line = 0 - column = 0 - while input[index] != '*': - if input[index] == '\n': - line += 1 - column = 0 - else: - column += 1 - index += 1 - mark = Mark(test_name, index, line, column, unicode(input), index) - snippet = mark.get_snippet(indent=2, max_length=79) - #print "INPUT:" - #print input - #print "SNIPPET:" - #print snippet - self.failUnless(isinstance(snippet, str)) - self.failUnlessEqual(snippet.count('\n'), 1) - data, pointer = snippet.split('\n') - self.failUnless(len(data) < 82) - self.failUnlessEqual(data[len(pointer)-1], '*') - -TestMark.add_tests('testMarks', '.marks') +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_reader.py b/tests/test_reader.py index 1bfae1a..3576ae6 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -1,44 +1,35 @@ -import test_appliance -from yaml.reader import Reader, ReaderError - +import yaml.reader import codecs -class TestReaderErrors(test_appliance.TestAppliance): - - def _testReaderUnicodeErrors(self, test_name, stream_filename): - for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']: - try: - data = unicode(file(stream_filename, 'rb').read(), encoding) - break - except: - pass - else: - return - #self._load(data) - self.failUnlessRaises(ReaderError, - lambda: self._load(data)) - #self._load(codecs.open(stream_filename, encoding=encoding)) - self.failUnlessRaises(ReaderError, - lambda: self._load(codecs.open(stream_filename, encoding=encoding))) - - def _testReaderStringErrors(self, test_name, stream_filename): - data = file(stream_filename, 'rb').read() - #self._load(data) - self.failUnlessRaises(ReaderError, lambda: self._load(data)) - - def _testReaderFileErrors(self, test_name, stream_filename): - data = file(stream_filename, 'rb') - #self._load(data) - self.failUnlessRaises(ReaderError, lambda: self._load(data)) - - def _load(self, data): - stream = Reader(data) +def _run_reader(data, verbose): + try: + stream = yaml.reader.Reader(data) while stream.peek() != u'\0': stream.forward() - -TestReaderErrors.add_tests('testReaderUnicodeErrors', '.stream-error') -TestReaderErrors.add_tests('testReaderStringErrors', '.stream-error') -TestReaderErrors.add_tests('testReaderFileErrors', '.stream-error') - + except yaml.reader.ReaderError, exc: + if verbose: + print exc + else: + raise AssertionError("expected an exception") + +def test_stream_error(error_filename, verbose=False): + _run_reader(open(error_filename, 'rb'), verbose) + _run_reader(open(error_filename, 'rb').read(), verbose) + for encoding in ['utf-8', 'utf-16-le', 'utf-16-be']: + try: + data = unicode(open(error_filename, 'rb').read(), encoding) + break + except UnicodeDecodeError: + pass + else: + return + _run_reader(data, verbose) + _run_reader(codecs.open(error_filename, encoding=encoding), verbose) + +test_stream_error.unittest = ['.stream-error'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_recursive.py b/tests/test_recursive.py index 3c09264..b2dca3c 100644 --- a/tests/test_recursive.py +++ b/tests/test_recursive.py @@ -1,7 +1,5 @@ -import test_appliance - -from yaml import * +import yaml class AnInstance: @@ -24,29 +22,29 @@ class AnInstanceWithState(AnInstance): def __setstate__(self, state): self.foo, self.bar = state['attributes'] -class TestRecursive(test_appliance.TestAppliance): - - def _testRecursive(self, test_name, recursive_filename): - exec file(recursive_filename, 'r').read() - value1 = value - output1 = None - value2 = None - output2 = None - try: - output1 = dump(value1) - #print "OUTPUT %s:" % test_name - #print output1 - value2 = load(output1) - output2 = dump(value2) - self.failUnlessEqual(output1, output2) - except: +def test_recursive(recursive_filename, verbose=False): + exec open(recursive_filename, 'rb').read() + value1 = value + output1 = None + value2 = None + output2 = None + try: + output1 = yaml.dump(value1) + value2 = yaml.load(output1) + output2 = yaml.dump(value2) + assert output1 == output2, (output1, output2) + finally: + if verbose: print "VALUE1:", value1 print "VALUE2:", value2 print "OUTPUT1:" print output1 print "OUTPUT2:" print output2 - raise -TestRecursive.add_tests('testRecursive', '.recursive') +test_recursive.unittest = ['.recursive'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_representer.py b/tests/test_representer.py index 6746146..f814705 100644 --- a/tests/test_representer.py +++ b/tests/test_representer.py @@ -1,60 +1,42 @@ -import test_appliance -from test_constructor import * +import yaml +import test_constructor +import pprint -from yaml import * - -class TestRepresenterTypes(test_appliance.TestAppliance): - - def _testTypesUnicode(self, test_name, data_filename, code_filename): - return self._testTypes(test_name, data_filename, code_filename, allow_unicode=True) - - def _testTypes(self, test_name, data_filename, code_filename, allow_unicode=False): - data1 = eval(file(code_filename, 'rb').read()) - data2 = None - output = None +def test_representer_types(code_filename, verbose=False): + test_constructor._make_objects() + for allow_unicode in [False, True]: + native1 = test_constructor._load_code(open(code_filename, 'rb').read()) + native2 = None try: - output = dump(data1, Dumper=MyDumper, allow_unicode=allow_unicode) - data2 = load(output, Loader=MyLoader) - self.failUnlessEqual(type(data1), type(data2)) + output = yaml.dump(native1, Dumper=test_constructor.MyDumper, + allow_unicode=allow_unicode) + native2 = yaml.load(output, Loader=test_constructor.MyLoader) try: - self.failUnlessEqual(data1, data2) - except (AssertionError, TypeError): - if isinstance(data1, dict): - data1 = [(repr(key), value) for key, value in data1.items()] - data1.sort() - data1 = repr(data1) - data2 = [(repr(key), value) for key, value in data2.items()] - data2.sort() - data2 = repr(data2) - if data1 != data2: - raise - elif isinstance(data1, list): - self.failUnlessEqual(type(data1), type(data2)) - self.failUnlessEqual(len(data1), len(data2)) - for item1, item2 in zip(data1, data2): - if (item1 != item1 or (item1 == 0.0 and item1 == 1.0)) and \ - (item2 != item2 or (item2 == 0.0 and item2 == 1.0)): - continue - if isinstance(item1, datetime.datetime) \ - and isinstance(item2, datetime.datetime): - self.failUnlessEqual(item1.microsecond, - item2.microsecond) - if isinstance(item1, datetime.datetime): - item1 = item1.utctimetuple() - if isinstance(item2, datetime.datetime): - item2 = item2.utctimetuple() - self.failUnlessEqual(item1, item2) - else: - raise - except: - print - print "OUTPUT:" - print output - print "NATIVES1:", data1 - print "NATIVES2:", data2 - raise + if native1 == native2: + continue + except TypeError: + pass + value1 = test_constructor._serialize_value(native1) + value2 = test_constructor._serialize_value(native2) + if verbose: + print "SERIALIZED NATIVE1:" + print value1 + print "SERIALIZED NATIVE2:" + print value2 + assert value1 == value2, (native1, native2) + finally: + if verbose: + print "NATIVE1:" + pprint.pprint(native1) + print "NATIVE2:" + pprint.pprint(native2) + print "OUTPUT:" + print output + +test_representer_types.unittest = ['.code'] -TestRepresenterTypes.add_tests('testTypes', '.data', '.code') -TestRepresenterTypes.add_tests('testTypesUnicode', '.data', '.code') +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_resolver.py b/tests/test_resolver.py index a1845d8..5566750 100644 --- a/tests/test_resolver.py +++ b/tests/test_resolver.py @@ -1,83 +1,92 @@ -import test_appliance - -from yaml import * - -class MyLoader(Loader): - pass - -class MyDumper(Dumper): - pass - -add_path_resolver(u'!root', [], - Loader=MyLoader, Dumper=MyDumper) - -add_path_resolver(u'!root/scalar', [], str, - Loader=MyLoader, Dumper=MyDumper) - -add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'], - Loader=MyLoader, Dumper=MyDumper) - -add_path_resolver(u'!root/key21/1/*', ['key21', 1], - Loader=MyLoader, Dumper=MyDumper) - -add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict, - Loader=MyLoader, Dumper=MyDumper) - -class TestResolver(test_appliance.TestAppliance): - - def _testImplicitResolver(self, test_name, data_filename, detect_filename): - node = None - correct_tag = None - try: - correct_tag = file(detect_filename, 'rb').read().strip() - node = compose(file(data_filename, 'rb')) - self.failUnless(isinstance(node, SequenceNode)) - for scalar in node.value: - self.failUnless(isinstance(scalar, ScalarNode)) - self.failUnlessEqual(scalar.tag, correct_tag) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "CORRECT_TAG:" - print file(detect_filename, 'rb').read() - print "ROOT NODE:", node - print "SCALAR NODES:", node.value - raise - - def _testPathResolverLoader(self, test_name, data_filename, path_filename): - #print serialize_all(compose_all(file(data_filename, 'rb').read(), Loader=MyLoader)) - nodes1 = compose_all(file(data_filename, 'rb').read(), Loader=MyLoader) - nodes2 = compose_all(file(path_filename, 'rb').read()) +import yaml +import pprint + +def test_implicit_resolver(data_filename, detect_filename, verbose=False): + correct_tag = None + node = None + try: + correct_tag = open(detect_filename, 'rb').read().strip() + node = yaml.compose(open(data_filename, 'rb')) + assert isinstance(node, yaml.SequenceNode), node + for scalar in node.value: + assert isinstance(scalar, yaml.ScalarNode), scalar + assert scalar.tag == correct_tag, (scalar.tag, correct_tag) + finally: + if verbose: + print "CORRECT TAG:", correct_tag + if hasattr(node, 'value'): + print "CHILDREN:" + pprint.pprint(node.value) + +test_implicit_resolver.unittest = ['.data', '.detect'] + +def _make_path_loader_and_dumper(): + global MyLoader, MyDumper + + class MyLoader(yaml.Loader): + pass + class MyDumper(yaml.Dumper): + pass + + yaml.add_path_resolver(u'!root', [], + Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver(u'!root/scalar', [], str, + Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver(u'!root/key11/key12/*', ['key11', 'key12'], + Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver(u'!root/key21/1/*', ['key21', 1], + Loader=MyLoader, Dumper=MyDumper) + yaml.add_path_resolver(u'!root/key31/*/*/key14/map', ['key31', None, None, 'key14'], dict, + Loader=MyLoader, Dumper=MyDumper) + + return MyLoader, MyDumper + +def _convert_node(node): + if isinstance(node, yaml.ScalarNode): + return (node.tag, node.value) + elif isinstance(node, yaml.SequenceNode): + value = [] + for item in node.value: + value.append(_convert_node(item)) + return (node.tag, value) + elif isinstance(node, yaml.MappingNode): + value = [] + for key, item in node.value: + value.append((_convert_node(key), _convert_node(item))) + return (node.tag, value) + +def test_path_resolver_loader(data_filename, path_filename, verbose=False): + _make_path_loader_and_dumper() + nodes1 = list(yaml.compose_all(open(data_filename, 'rb').read(), Loader=MyLoader)) + nodes2 = list(yaml.compose_all(open(path_filename, 'rb').read())) + try: for node1, node2 in zip(nodes1, nodes2): - self.failUnlessEqual(self._convert(node1), self._convert(node2)) - - def _testPathResolverDumper(self, test_name, data_filename, path_filename): - for filename in [data_filename, path_filename]: - output = serialize_all(compose_all(file(filename, 'rb').read()), Dumper=MyDumper) - #print output - nodes1 = compose_all(output) - nodes2 = compose_all(file(data_filename, 'rb').read()) - for node1, node2 in zip(nodes1, nodes2): - self.failUnlessEqual(self._convert(node1), self._convert(node2)) + data1 = _convert_node(node1) + data2 = _convert_node(node2) + assert data1 == data2, (data1, data2) + finally: + if verbose: + print yaml.serialize_all(nodes1) + +test_path_resolver_loader.unittest = ['.data', '.path'] + +def test_path_resolver_dumper(data_filename, path_filename, verbose=False): + _make_path_loader_and_dumper() + for filename in [data_filename, path_filename]: + output = yaml.serialize_all(yaml.compose_all(open(filename, 'rb')), Dumper=MyDumper) + if verbose: + print output + nodes1 = yaml.compose_all(output) + nodes2 = yaml.compose_all(open(data_filename, 'rb')) + for node1, node2 in zip(nodes1, nodes2): + data1 = _convert_node(node1) + data2 = _convert_node(node2) + assert data1 == data2, (data1, data2) - def _convert(self, node): - if isinstance(node, ScalarNode): - return node.tag, node.value - elif isinstance(node, SequenceNode): - value = [] - for item in node.value: - value.append(self._convert(item)) - return node.tag, value - elif isinstance(node, MappingNode): - value = [] - for key, item in node.value: - value.append((self._convert(key), self._convert(item))) - value.sort() - return node.tag, value +test_path_resolver_dumper.unittest = ['.data', '.path'] -TestResolver.add_tests('testImplicitResolver', '.data', '.detect') -TestResolver.add_tests('testPathResolverLoader', '.data', '.path') -TestResolver.add_tests('testPathResolverDumper', '.data', '.path') +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_structure.py b/tests/test_structure.py index 0aef982..61bcb80 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -1,228 +1,187 @@ -import test_appliance - -from yaml import * - -class TestStructure(test_appliance.TestAppliance): - - def _testStructure(self, test_name, data_filename, structure_filename): - node1 = None - node2 = eval(file(structure_filename, 'rb').read()) - try: - loader = Loader(file(data_filename, 'rb')) - node1 = [] - while not loader.check_event(StreamEndEvent): - if not loader.check_event(StreamStartEvent, DocumentStartEvent, DocumentEndEvent): - node1.append(self._convert(loader)) - else: - loader.get_event() - loader.get_event() - if len(node1) == 1: - node1 = node1[0] - self.failUnlessEqual(node1, node2) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "NODE1:", node1 - print "NODE2:", node2 - raise - - def _convert(self, loader): - if loader.check_event(ScalarEvent): - event = loader.get_event() - if event.tag or event.anchor or event.value: - return True - else: - return None - elif loader.check_event(SequenceStartEvent): - loader.get_event() - sequence = [] - while not loader.check_event(SequenceEndEvent): - sequence.append(self._convert(loader)) - loader.get_event() - return sequence - elif loader.check_event(MappingStartEvent): - loader.get_event() - mapping = [] - while not loader.check_event(MappingEndEvent): - key = self._convert(loader) - value = self._convert(loader) - mapping.append((key, value)) - loader.get_event() - return mapping - elif loader.check_event(AliasEvent): - loader.get_event() - return '*' +import yaml, canonical +import pprint + +def _convert_structure(loader): + if loader.check_event(yaml.ScalarEvent): + event = loader.get_event() + if event.tag or event.anchor or event.value: + return True else: - loader.get_event() - return '?' - -TestStructure.add_tests('testStructure', '.data', '.structure') - -class TestParser(test_appliance.TestAppliance): - - def _testParser(self, test_name, data_filename, canonical_filename): - events1 = None - events2 = None - try: - events1 = list(parse(file(data_filename, 'rb'))) - events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb'))) - self._compare(events1, events2) - except: - print - print "DATA1:" - print file(data_filename, 'rb').read() - print "DATA2:" - print file(canonical_filename, 'rb').read() - print "EVENTS1:", events1 - print "EVENTS2:", events2 - raise - - def _compare(self, events1, events2): - self.failUnlessEqual(len(events1), len(events2)) - for event1, event2 in zip(events1, events2): - self.failUnlessEqual(event1.__class__, event2.__class__) - if isinstance(event1, AliasEvent): - #self.failUnlessEqual(event1.name, event2.name) - pass - elif isinstance(event1, ScalarEvent): - #self.failUnlessEqual(event1.anchor, event2.anchor) - #self.failUnlessEqual(event1.tag, event2.tag) - self.failUnlessEqual(event1.value, event2.value) - if isinstance(event1, CollectionStartEvent): - #self.failUnlessEqual(event1.anchor, event2.anchor) - #self.failUnlessEqual(event1.tag, event2.tag) - pass - -TestParser.add_tests('testParser', '.data', '.canonical') - -class TestResolver(test_appliance.TestAppliance): - - def _testResolver(self, test_name, data_filename, canonical_filename): - nodes1 = None - nodes2 = None - try: - nodes1 = list(compose_all(file(data_filename, 'rb'))) - nodes2 = list(test_appliance.canonical_compose_all(file(canonical_filename, 'rb'))) - self.failUnlessEqual(len(nodes1), len(nodes2)) - for node1, node2 in zip(nodes1, nodes2): - self._compare(node1, node2) - except: - print - print "DATA1:" - print file(data_filename, 'rb').read() - print "DATA2:" - print file(canonical_filename, 'rb').read() - print "NODES1:", nodes1 - print "NODES2:", nodes2 - raise - - def _compare(self, node1, node2): - self.failUnlessEqual(node1.__class__, node2.__class__) - if isinstance(node1, ScalarNode): - #self.failUnlessEqual(node1.tag, node2.tag) - self.failUnlessEqual(node1.value, node2.value) - elif isinstance(node1, SequenceNode): - self.failUnlessEqual(len(node1.value), len(node2.value)) - for item1, item2 in zip(node1.value, node2.value): - self._compare(item1, item2) - elif isinstance(node1, MappingNode): - self.failUnlessEqual(len(node1.value), len(node2.value)) - items1 = node1.value.items() - items1.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), - (k2.tag,k2.value,v2.tag,v2.value))) - items2 = node2.value.items() - items2.sort(lambda (k1,v1), (k2,v2): cmp((k1.tag,k1.value,v1.tag,v1.value), - (k2.tag,k2.value,v2.tag,v2.value))) - for (key1, value1), (key2, value2) in zip(items1, items2): - self._compare(key1, key2) - self._compare(value1, value2) - -TestResolver.add_tests('testResolver', '.data', '.canonical') - -class MyLoader(Loader): - def construct_sequence(self, node): - return tuple(Loader.construct_sequence(self, node)) - - def construct_mapping(self, node): - pairs = self.construct_pairs(node) - pairs.sort() - return pairs - - def construct_undefined(self, node): - return self.construct_scalar(node) - -MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping) -MyLoader.add_constructor(None, MyLoader.construct_undefined) - -class MyCanonicalLoader(test_appliance.CanonicalLoader): - - def construct_sequence(self, node): - return tuple(test_appliance.CanonicalLoader.construct_sequence(self, node)) - - def construct_mapping(self, node): - pairs = self.construct_pairs(node) - pairs.sort() - return pairs - - def construct_undefined(self, node): - return self.construct_scalar(node) - -MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) -MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) - -class TestConstructor(test_appliance.TestAppliance): - - def _testConstructor(self, test_name, data_filename, canonical_filename): - data1 = None - data2 = None - try: - data1 = list(load_all(file(data_filename, 'rb'), Loader=MyLoader)) - data2 = list(load_all(file(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) - self.failUnlessEqual(data1, data2) - except: - print - print "DATA1:" - print file(data_filename, 'rb').read() - print "DATA2:" - print file(canonical_filename, 'rb').read() - print "NATIVES1:", data1 - print "NATIVES2:", data2 - raise - -TestConstructor.add_tests('testConstructor', '.data', '.canonical') - -class TestParserOnCanonical(test_appliance.TestAppliance): - - def _testParserOnCanonical(self, test_name, canonical_filename): - events1 = None - events2 = None - try: - events1 = list(parse(file(canonical_filename, 'rb'))) - events2 = list(test_appliance.canonical_parse(file(canonical_filename, 'rb'))) - self._compare(events1, events2) - except: - print - print "DATA:" - print file(canonical_filename, 'rb').read() - print "EVENTS1:", events1 - print "EVENTS2:", events2 - raise - - def _compare(self, events1, events2): - self.failUnlessEqual(len(events1), len(events2)) - for event1, event2 in zip(events1, events2): - self.failUnlessEqual(event1.__class__, event2.__class__) - if isinstance(event1, AliasEvent): - self.failUnlessEqual(event1.anchor, event2.anchor) - elif isinstance(event1, ScalarEvent): - self.failUnlessEqual(event1.anchor, event2.anchor) - self.failUnlessEqual(event1.tag, event2.tag) - self.failUnlessEqual(event1.value, event2.value) - if isinstance(event1, CollectionStartEvent): - self.failUnlessEqual(event1.anchor, event2.anchor) - self.failUnlessEqual(event1.tag, event2.tag) - -TestParserOnCanonical.add_tests('testParserOnCanonical', '.canonical') + return None + elif loader.check_event(yaml.SequenceStartEvent): + loader.get_event() + sequence = [] + while not loader.check_event(yaml.SequenceEndEvent): + sequence.append(_convert_structure(loader)) + loader.get_event() + return sequence + elif loader.check_event(yaml.MappingStartEvent): + loader.get_event() + mapping = [] + while not loader.check_event(yaml.MappingEndEvent): + key = _convert_structure(loader) + value = _convert_structure(loader) + mapping.append((key, value)) + loader.get_event() + return mapping + elif loader.check_event(yaml.AliasEvent): + loader.get_event() + return '*' + else: + loader.get_event() + return '?' + +def test_structure(data_filename, structure_filename, verbose=False): + nodes1 = [] + nodes2 = eval(open(structure_filename, 'rb').read()) + try: + loader = yaml.Loader(open(data_filename, 'rb')) + while loader.check_event(): + if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, + yaml.DocumentStartEvent, yaml.DocumentEndEvent): + loader.get_event() + continue + nodes1.append(_convert_structure(loader)) + if len(nodes1) == 1: + nodes1 = nodes1[0] + assert nodes1 == nodes2, (nodes1, nodes2) + finally: + if verbose: + print "NODES1:" + pprint.pprint(nodes1) + print "NODES2:" + pprint.pprint(nodes2) + +test_structure.unittest = ['.data', '.structure'] + +def _compare_events(events1, events2, full=False): + assert len(events1) == len(events2), (len(events1), len(events2)) + for event1, event2 in zip(events1, events2): + assert event1.__class__ == event2.__class__, (event1, event2) + if isinstance(event1, yaml.AliasEvent) and full: + assert event1.anchor == event2.anchor, (event1, event2) + if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): + if (event1.tag not in [None, u'!'] and event2.tag not in [None, u'!']) or full: + assert event1.tag == event2.tag, (event1, event2) + if isinstance(event1, yaml.ScalarEvent): + assert event1.value == event2.value, (event1, event2) + +def test_parser(data_filename, canonical_filename, verbose=False): + events1 = None + events2 = None + try: + events1 = list(yaml.parse(open(data_filename, 'rb'))) + events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) + _compare_events(events1, events2) + finally: + if verbose: + print "EVENTS1:" + pprint.pprint(events1) + print "EVENTS2:" + pprint.pprint(events2) + +test_parser.unittest = ['.data', '.canonical'] + +def test_parser_on_canonical(canonical_filename, verbose=False): + events1 = None + events2 = None + try: + events1 = list(yaml.parse(open(canonical_filename, 'rb'))) + events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) + _compare_events(events1, events2, full=True) + finally: + if verbose: + print "EVENTS1:" + pprint.pprint(events1) + print "EVENTS2:" + pprint.pprint(events2) + +test_parser_on_canonical.unittest = ['.canonical'] + +def _compare_nodes(node1, node2): + assert node1.__class__ == node2.__class__, (node1, node2) + assert node1.tag == node2.tag, (node1, node2) + if isinstance(node1, yaml.ScalarNode): + assert node1.value == node2.value, (node1, node2) + else: + assert len(node1.value) == len(node2.value), (node1, node2) + for item1, item2 in zip(node1.value, node2.value): + if not isinstance(item1, tuple): + item1 = (item1,) + item2 = (item2,) + for subnode1, subnode2 in zip(item1, item2): + _compare_nodes(subnode1, subnode2) + +def test_composer(data_filename, canonical_filename, verbose=False): + nodes1 = None + nodes2 = None + try: + nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) + nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) + assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) + for node1, node2 in zip(nodes1, nodes2): + _compare_nodes(node1, node2) + finally: + if verbose: + print "NODES1:" + pprint.pprint(nodes1) + print "NODES2:" + pprint.pprint(nodes2) + +test_composer.unittest = ['.data', '.canonical'] + +def _make_loader(): + global MyLoader + + class MyLoader(yaml.Loader): + def construct_sequence(self, node): + return tuple(yaml.Loader.construct_sequence(self, node)) + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort() + return pairs + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyLoader.add_constructor(u'tag:yaml.org,2002:map', MyLoader.construct_mapping) + MyLoader.add_constructor(None, MyLoader.construct_undefined) + +def _make_canonical_loader(): + global MyCanonicalLoader + + class MyCanonicalLoader(yaml.CanonicalLoader): + def construct_sequence(self, node): + return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort() + return pairs + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyCanonicalLoader.add_constructor(u'tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) + MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) + +def test_constructor(data_filename, canonical_filename, verbose=False): + _make_loader() + _make_canonical_loader() + native1 = None + native2 = None + try: + native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) + native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) + assert native1 == native2, (native1, native2) + finally: + if verbose: + print "NATIVE1:" + pprint.pprint(native1) + print "NATIVE2:" + pprint.pprint(native2) + +test_constructor.unittest = ['.data', '.canonical'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_syck.py b/tests/test_syck.py deleted file mode 100644 index dd63056..0000000 --- a/tests/test_syck.py +++ /dev/null @@ -1,30 +0,0 @@ - -import test_appliance - -class TestSyck(test_appliance.TestAppliance): - - def _testSyckOnTokenTests(self, test_name, data_filename, tokens_filename): - try: - syck.parse(file(data_filename, 'rb')) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - raise - - def _testSyckOnCanonicalTests(self, test_name, data_filename, canonical_filename): - try: - syck.parse(file(data_filename, 'rb')) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - raise - -try: - import syck - #TestSyck.add_tests('testSyckOnTokenTests', '.data', '.tokens') - #TestSyck.add_tests('testSyckOnCanonicalTests', '.data', '.canonical') -except ImportError: - pass - diff --git a/tests/test_tokens.py b/tests/test_tokens.py index 73d07b3..9613fa0 100644 --- a/tests/test_tokens.py +++ b/tests/test_tokens.py @@ -1,86 +1,77 @@ -import test_appliance +import yaml +import pprint -from yaml import * +# Tokens mnemonic: +# directive: % +# document_start: --- +# document_end: ... +# alias: * +# anchor: & +# tag: ! +# scalar _ +# block_sequence_start: [[ +# block_mapping_start: {{ +# block_end: ]} +# flow_sequence_start: [ +# flow_sequence_end: ] +# flow_mapping_start: { +# flow_mapping_end: } +# entry: , +# key: ? +# value: : -class TestTokens(test_appliance.TestAppliance): +_replaces = { + yaml.DirectiveToken: '%', + yaml.DocumentStartToken: '---', + yaml.DocumentEndToken: '...', + yaml.AliasToken: '*', + yaml.AnchorToken: '&', + yaml.TagToken: '!', + yaml.ScalarToken: '_', + yaml.BlockSequenceStartToken: '[[', + yaml.BlockMappingStartToken: '{{', + yaml.BlockEndToken: ']}', + yaml.FlowSequenceStartToken: '[', + yaml.FlowSequenceEndToken: ']', + yaml.FlowMappingStartToken: '{', + yaml.FlowMappingEndToken: '}', + yaml.BlockEntryToken: ',', + yaml.FlowEntryToken: ',', + yaml.KeyToken: '?', + yaml.ValueToken: ':', +} - # Tokens mnemonic: - # directive: % - # document_start: --- - # document_end: ... - # alias: * - # anchor: & - # tag: ! - # scalar _ - # block_sequence_start: [[ - # block_mapping_start: {{ - # block_end: ]} - # flow_sequence_start: [ - # flow_sequence_end: ] - # flow_mapping_start: { - # flow_mapping_end: } - # entry: , - # key: ? - # value: : +def test_tokens(data_filename, tokens_filename, verbose=False): + tokens1 = [] + tokens2 = open(tokens_filename, 'rb').read().split() + try: + for token in yaml.scan(open(data_filename, 'rb')): + if not isinstance(token, (yaml.StreamStartToken, yaml.StreamEndToken)): + tokens1.append(_replaces[token.__class__]) + finally: + if verbose: + print "TOKENS1:", ' '.join(tokens1) + print "TOKENS2:", ' '.join(tokens2) + assert len(tokens1) == len(tokens2), (tokens1, tokens2) + for token1, token2 in zip(tokens1, tokens2): + assert token1 == token2, (token1, token2) - replaces = { - DirectiveToken: '%', - DocumentStartToken: '---', - DocumentEndToken: '...', - AliasToken: '*', - AnchorToken: '&', - TagToken: '!', - ScalarToken: '_', - BlockSequenceStartToken: '[[', - BlockMappingStartToken: '{{', - BlockEndToken: ']}', - FlowSequenceStartToken: '[', - FlowSequenceEndToken: ']', - FlowMappingStartToken: '{', - FlowMappingEndToken: '}', - BlockEntryToken: ',', - FlowEntryToken: ',', - KeyToken: '?', - ValueToken: ':', - } +test_tokens.unittest = ['.data', '.tokens'] - def _testTokens(self, test_name, data_filename, tokens_filename): - tokens1 = None - tokens2 = file(tokens_filename, 'rb').read().split() +def test_scanner(data_filename, canonical_filename, verbose=False): + for filename in [data_filename, canonical_filename]: + tokens = [] try: - tokens1 = [] - for token in scan(file(data_filename, 'rb')): - if not isinstance(token, (StreamStartToken, StreamEndToken)): - tokens1.append(token) - tokens1 = [self.replaces[t.__class__] for t in tokens1] - self.failUnlessEqual(tokens1, tokens2) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "TOKENS1:", tokens1 - print "TOKENS2:", tokens2 - raise + for token in yaml.scan(open(filename, 'rb')): + tokens.append(token.__class__.__name__) + finally: + if verbose: + pprint.pprint(tokens) -TestTokens.add_tests('testTokens', '.data', '.tokens') +test_scanner.unittest = ['.data', '.canonical'] -class TestScanner(test_appliance.TestAppliance): - - def _testScanner(self, test_name, data_filename, canonical_filename): - for filename in [canonical_filename, data_filename]: - tokens = None - try: - tokens = [] - for token in scan(file(filename, 'rb')): - if not isinstance(token, (StreamStartToken, StreamEndToken)): - tokens.append(token.__class__.__name__) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "TOKENS:", tokens - raise - -TestScanner.add_tests('testScanner', '.data', '.canonical') +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_yaml.py b/tests/test_yaml.py index d13321f..d195e1a 100644 --- a/tests/test_yaml.py +++ b/tests/test_yaml.py @@ -1,6 +1,4 @@ -import unittest - from test_mark import * from test_reader import * from test_canonical import * @@ -12,11 +10,8 @@ from test_constructor import * from test_emitter import * from test_representer import * from test_recursive import * -from test_syck import * - -def main(module='__main__'): - unittest.main(module) if __name__ == '__main__': - main() + import test_appliance + test_appliance.run(globals()) diff --git a/tests/test_yaml_ext.py b/tests/test_yaml_ext.py index b9a2e58..8840ffd 100644 --- a/tests/test_yaml_ext.py +++ b/tests/test_yaml_ext.py @@ -1,195 +1,273 @@ -import unittest, test_appliance - import _yaml, yaml +import types, pprint -test_appliance.TestAppliance.SKIP_EXT = '.skip-ext' - -class TestCVersion(unittest.TestCase): - - def testCVersion(self): - self.failUnlessEqual("%s.%s.%s" % _yaml.get_version(), _yaml.get_version_string()) +yaml.PyBaseLoader = yaml.BaseLoader +yaml.PySafeLoader = yaml.SafeLoader +yaml.PyLoader = yaml.Loader +yaml.PyBaseDumper = yaml.BaseDumper +yaml.PySafeDumper = yaml.SafeDumper +yaml.PyDumper = yaml.Dumper -class TestCLoader(test_appliance.TestAppliance): - - def _testCScannerFileInput(self, test_name, data_filename, canonical_filename): - self._testCScanner(test_name, data_filename, canonical_filename, True) - - def _testCScanner(self, test_name, data_filename, canonical_filename, file_input=False, Loader=yaml.Loader): - if file_input: - data = file(data_filename, 'r') - else: - data = file(data_filename, 'r').read() - tokens = list(yaml.scan(data, Loader=Loader)) - ext_tokens = [] - try: - if file_input: - data = file(data_filename, 'r') - for token in yaml.scan(data, Loader=yaml.CLoader): - ext_tokens.append(token) - self.failUnlessEqual(len(tokens), len(ext_tokens)) - for token, ext_token in zip(tokens, ext_tokens): - self.failUnlessEqual(token.__class__, ext_token.__class__) - if not isinstance(token, yaml.StreamEndToken): - self.failUnlessEqual((token.start_mark.index, token.start_mark.line, token.start_mark.column), - (ext_token.start_mark.index, ext_token.start_mark.line, ext_token.start_mark.column)) - self.failUnlessEqual((token.end_mark.index, token.end_mark.line, token.end_mark.column), - (ext_token.end_mark.index, ext_token.end_mark.line, ext_token.end_mark.column)) - if hasattr(token, 'value'): - self.failUnlessEqual(token.value, ext_token.value) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "TOKENS:", tokens - print "EXT_TOKENS:", ext_tokens - raise - - def _testCParser(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): - data = file(data_filename, 'r').read() - events = list(yaml.parse(data, Loader=Loader)) - ext_events = [] - try: - for event in yaml.parse(data, Loader=yaml.CLoader): - ext_events.append(event) - #print "EVENT:", event - self.failUnlessEqual(len(events), len(ext_events)) - for event, ext_event in zip(events, ext_events): - self.failUnlessEqual(event.__class__, ext_event.__class__) - if hasattr(event, 'anchor'): - self.failUnlessEqual(event.anchor, ext_event.anchor) - if hasattr(event, 'tag'): - self.failUnlessEqual(event.tag, ext_event.tag) - if hasattr(event, 'implicit'): - self.failUnlessEqual(event.implicit, ext_event.implicit) - if hasattr(event, 'value'): - self.failUnlessEqual(event.value, ext_event.value) - if hasattr(event, 'explicit'): - self.failUnlessEqual(event.explicit, ext_event.explicit) - if hasattr(event, 'version'): - self.failUnlessEqual(event.version, ext_event.version) - if hasattr(event, 'tags'): - self.failUnlessEqual(event.tags, ext_event.tags) - except: - print - print "DATA:" - print file(data_filename, 'rb').read() - print "EVENTS:", events - print "EXT_EVENTS:", ext_events - raise - -TestCLoader.add_tests('testCScanner', '.data', '.canonical') -TestCLoader.add_tests('testCScannerFileInput', '.data', '.canonical') -TestCLoader.add_tests('testCParser', '.data', '.canonical') - -class TestCEmitter(test_appliance.TestAppliance): - - def _testCEmitter(self, test_name, data_filename, canonical_filename, Loader=yaml.Loader): - data1 = file(data_filename, 'r').read() - events = list(yaml.parse(data1, Loader=Loader)) - data2 = yaml.emit(events, Dumper=yaml.CDumper) - ext_events = [] - try: - for event in yaml.parse(data2): - ext_events.append(event) - self.failUnlessEqual(len(events), len(ext_events)) - for event, ext_event in zip(events, ext_events): - self.failUnlessEqual(event.__class__, ext_event.__class__) - if hasattr(event, 'anchor'): - self.failUnlessEqual(event.anchor, ext_event.anchor) - if hasattr(event, 'tag'): - if not (event.tag in ['!', None] and ext_event.tag in ['!', None]): - self.failUnlessEqual(event.tag, ext_event.tag) - if hasattr(event, 'implicit'): - self.failUnlessEqual(event.implicit, ext_event.implicit) - if hasattr(event, 'value'): - self.failUnlessEqual(event.value, ext_event.value) - if hasattr(event, 'explicit') and event.explicit: - self.failUnlessEqual(event.explicit, ext_event.explicit) - if hasattr(event, 'version'): - self.failUnlessEqual(event.version, ext_event.version) - if hasattr(event, 'tags'): - self.failUnlessEqual(event.tags, ext_event.tags) - except: - print - print "DATA1:" - print data1 - print "DATA2:" - print data2 - print "EVENTS:", events - print "EXT_EVENTS:", ext_events - raise - -TestCEmitter.add_tests('testCEmitter', '.data', '.canonical') - -yaml.BaseLoader = yaml.CBaseLoader -yaml.SafeLoader = yaml.CSafeLoader -yaml.Loader = yaml.CLoader -yaml.BaseDumper = yaml.CBaseDumper -yaml.SafeDumper = yaml.CSafeDumper -yaml.Dumper = yaml.CDumper old_scan = yaml.scan -def scan(stream, Loader=yaml.CLoader): +def new_scan(stream, Loader=yaml.CLoader): return old_scan(stream, Loader) -yaml.scan = scan + old_parse = yaml.parse -def parse(stream, Loader=yaml.CLoader): +def new_parse(stream, Loader=yaml.CLoader): return old_parse(stream, Loader) -yaml.parse = parse + old_compose = yaml.compose -def compose(stream, Loader=yaml.CLoader): +def new_compose(stream, Loader=yaml.CLoader): return old_compose(stream, Loader) -yaml.compose = compose + old_compose_all = yaml.compose_all -def compose_all(stream, Loader=yaml.CLoader): +def new_compose_all(stream, Loader=yaml.CLoader): return old_compose_all(stream, Loader) -yaml.compose_all = compose_all -old_load_all = yaml.load_all -def load_all(stream, Loader=yaml.CLoader): - return old_load_all(stream, Loader) -yaml.load_all = load_all + old_load = yaml.load -def load(stream, Loader=yaml.CLoader): +def new_load(stream, Loader=yaml.CLoader): return old_load(stream, Loader) -yaml.load = load -def safe_load_all(stream): - return yaml.load_all(stream, yaml.CSafeLoader) -yaml.safe_load_all = safe_load_all -def safe_load(stream): - return yaml.load(stream, yaml.CSafeLoader) -yaml.safe_load = safe_load + +old_load_all = yaml.load_all +def new_load_all(stream, Loader=yaml.CLoader): + return old_load_all(stream, Loader) + +old_safe_load = yaml.safe_load +def new_safe_load(stream): + return old_load(stream, yaml.CSafeLoader) + +old_safe_load_all = yaml.safe_load_all +def new_safe_load_all(stream): + return old_load_all(stream, yaml.CSafeLoader) + old_emit = yaml.emit -def emit(events, stream=None, Dumper=yaml.CDumper, **kwds): +def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds): return old_emit(events, stream, Dumper, **kwds) -yaml.emit = emit -old_serialize_all = yaml.serialize_all -def serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): - return old_serialize_all(nodes, stream, Dumper, **kwds) -yaml.serialize_all = serialize_all + old_serialize = yaml.serialize -def serialize(node, stream, Dumper=yaml.CDumper, **kwds): +def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds): return old_serialize(node, stream, Dumper, **kwds) -yaml.serialize = serialize -old_dump_all = yaml.dump_all -def dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): - return old_dump_all(documents, stream, Dumper, **kwds) -yaml.dump_all = dump_all + +old_serialize_all = yaml.serialize_all +def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): + return old_serialize_all(nodes, stream, Dumper, **kwds) + old_dump = yaml.dump -def dump(data, stream=None, Dumper=yaml.CDumper, **kwds): +def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds): return old_dump(data, stream, Dumper, **kwds) -yaml.dump = dump -def safe_dump_all(documents, stream=None, **kwds): - return yaml.dump_all(documents, stream, yaml.CSafeDumper, **kwds) -yaml.safe_dump_all = safe_dump_all -def safe_dump(data, stream=None, **kwds): - return yaml.dump(data, stream, yaml.CSafeDumper, **kwds) -yaml.safe_dump = safe_dump -from test_yaml import * +old_dump_all = yaml.dump_all +def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): + return old_dump_all(documents, stream, Dumper, **kwds) + +old_safe_dump = yaml.safe_dump +def new_safe_dump(data, stream=None, **kwds): + return old_dump(data, stream, yaml.CSafeDumper, **kwds) + +old_safe_dump_all = yaml.safe_dump_all +def new_safe_dump_all(documents, stream=None, **kwds): + return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds) + +def _set_up(): + yaml.BaseLoader = yaml.CBaseLoader + yaml.SafeLoader = yaml.CSafeLoader + yaml.Loader = yaml.CLoader + yaml.BaseDumper = yaml.CBaseDumper + yaml.SafeDumper = yaml.CSafeDumper + yaml.Dumper = yaml.CDumper + yaml.scan = new_scan + yaml.parse = new_parse + yaml.compose = new_compose + yaml.compose_all = new_compose_all + yaml.load = new_load + yaml.load_all = new_load_all + yaml.safe_load = new_safe_load + yaml.safe_load_all = new_safe_load_all + yaml.emit = new_emit + yaml.serialize = new_serialize + yaml.serialize_all = new_serialize_all + yaml.dump = new_dump + yaml.dump_all = new_dump_all + yaml.safe_dump = new_safe_dump + yaml.safe_dump_all = new_safe_dump_all + +def _tear_down(): + yaml.BaseLoader = yaml.PyBaseLoader + yaml.SafeLoader = yaml.PySafeLoader + yaml.Loader = yaml.PyLoader + yaml.BaseDumper = yaml.PyBaseDumper + yaml.SafeDumper = yaml.PySafeDumper + yaml.Dumper = yaml.PyDumper + yaml.scan = old_scan + yaml.parse = old_parse + yaml.compose = old_compose + yaml.compose_all = old_compose_all + yaml.load = old_load + yaml.load_all = old_load_all + yaml.safe_load = old_safe_load + yaml.safe_load_all = old_safe_load_all + yaml.emit = old_emit + yaml.serialize = old_serialize + yaml.serialize_all = old_serialize_all + yaml.dump = old_dump + yaml.dump_all = old_dump_all + yaml.safe_dump = old_safe_dump + yaml.safe_dump_all = old_safe_dump_all + +def test_c_version(verbose=False): + if verbose: + print _yaml.get_version() + print _yaml.get_version_string() + assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \ + (_yaml.get_version(), _yaml.get_version_string()) + +def _compare_scanners(py_data, c_data, verbose): + py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader)) + c_tokens = [] + try: + for token in yaml.scan(c_data, Loader=yaml.CLoader): + c_tokens.append(token) + assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) + for py_token, c_token in zip(py_tokens, c_tokens): + assert py_token.__class__ == c_token.__class__, (py_token, c_token) + if hasattr(py_token, 'value'): + assert py_token.value == c_token.value, (py_token, c_token) + if isinstance(py_token, yaml.StreamEndToken): + continue + py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column) + py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column) + c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column) + c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) + assert py_start == c_start, (py_start, c_start) + assert py_end == c_end, (py_end, c_end) + finally: + if verbose: + print "PY_TOKENS:" + pprint.pprint(py_tokens) + print "C_TOKENS:" + pprint.pprint(c_tokens) + +def test_c_scanner(data_filename, canonical_filename, verbose=False): + _compare_scanners(open(data_filename, 'rb'), + open(data_filename, 'rb'), verbose) + _compare_scanners(open(data_filename, 'rb').read(), + open(data_filename, 'rb').read(), verbose) + _compare_scanners(open(canonical_filename, 'rb'), + open(canonical_filename, 'rb'), verbose) + _compare_scanners(open(canonical_filename, 'rb').read(), + open(canonical_filename, 'rb').read(), verbose) + +test_c_scanner.unittest = ['.data', '.canonical'] +test_c_scanner.skip = ['.skip-ext'] + +def _compare_parsers(py_data, c_data, verbose): + py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader)) + c_events = [] + try: + for event in yaml.parse(c_data, Loader=yaml.CLoader): + c_events.append(event) + assert len(py_events) == len(c_events), (len(py_events), len(c_events)) + for py_event, c_event in zip(py_events, c_events): + for attribute in ['__class__', 'anchor', 'tag', 'implicit', + 'value', 'explicit', 'version', 'tags']: + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + assert py_value == c_value, (py_event, c_event, attribute) + finally: + if verbose: + print "PY_EVENTS:" + pprint.pprint(py_events) + print "C_EVENTS:" + pprint.pprint(c_events) + +def test_c_parser(data_filename, canonical_filename, verbose=False): + _compare_parsers(open(data_filename, 'rb'), + open(data_filename, 'rb'), verbose) + _compare_parsers(open(data_filename, 'rb').read(), + open(data_filename, 'rb').read(), verbose) + _compare_parsers(open(canonical_filename, 'rb'), + open(canonical_filename, 'rb'), verbose) + _compare_parsers(open(canonical_filename, 'rb').read(), + open(canonical_filename, 'rb').read(), verbose) + +test_c_parser.unittest = ['.data', '.canonical'] +test_c_parser.skip = ['.skip-ext'] + +def _compare_emitters(data, verbose): + events = list(yaml.parse(data, Loader=yaml.PyLoader)) + c_data = yaml.emit(events, Dumper=yaml.CDumper) + if verbose: + print c_data + py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader)) + c_events = list(yaml.parse(c_data, Loader=yaml.CLoader)) + try: + assert len(events) == len(py_events), (len(events), len(py_events)) + assert len(events) == len(c_events), (len(events), len(c_events)) + for event, py_event, c_event in zip(events, py_events, c_events): + for attribute in ['__class__', 'anchor', 'tag', 'implicit', + 'value', 'explicit', 'version', 'tags']: + value = getattr(event, attribute, None) + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + if attribute == 'tag' and value in [None, u'!'] \ + and py_value in [None, u'!'] and c_value in [None, u'!']: + continue + if attribute == 'explicit' and (py_value or c_value): + continue + assert value == py_value, (event, py_event, attribute) + assert value == c_value, (event, c_event, attribute) + finally: + if verbose: + print "EVENTS:" + pprint.pprint(events) + print "PY_EVENTS:" + pprint.pprint(py_events) + print "C_EVENTS:" + pprint.pprint(c_events) + +def test_c_emitter(data_filename, canonical_filename, verbose=False): + _compare_emitters(open(data_filename, 'rb').read(), verbose) + _compare_emitters(open(canonical_filename, 'rb').read(), verbose) + +test_c_emitter.unittest = ['.data', '.canonical'] +test_c_emitter.skip = ['.skip-ext'] + +def wrap_ext_function(function): + def wrapper(*args, **kwds): + _set_up() + try: + function(*args, **kwds) + finally: + _tear_down() + wrapper.func_name = '%s_ext' % function.func_name + wrapper.unittest = function.unittest + wrapper.skip = getattr(function, 'skip', [])+['.skip-ext'] + return wrapper + +def wrap_ext(collections): + functions = [] + if not isinstance(collections, list): + collections = [collections] + for collection in collections: + if not isinstance(collection, dict): + collection = vars(collection) + keys = collection.keys() + keys.sort() + for key in keys: + value = collection[key] + if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): + functions.append(wrap_ext_function(value)) + for function in functions: + assert function.func_name not in globals() + globals()[function.func_name] = function -def main(module='__main__'): - unittest.main(module) +import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \ + test_emitter, test_representer, test_recursive +wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor, + test_emitter, test_representer, test_recursive]) if __name__ == '__main__': - main() + import test_appliance + test_appliance.run(globals()) |