diff options
Diffstat (limited to 'tests3/test_yaml_ext.py')
-rw-r--r-- | tests3/test_yaml_ext.py | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/tests3/test_yaml_ext.py b/tests3/test_yaml_ext.py new file mode 100644 index 0000000..57c02c6 --- /dev/null +++ b/tests3/test_yaml_ext.py @@ -0,0 +1,273 @@ + +import _yaml, yaml +import types, pprint + +yaml.PyBaseLoader = yaml.BaseLoader +yaml.PySafeLoader = yaml.SafeLoader +yaml.PyLoader = yaml.Loader +yaml.PyBaseDumper = yaml.BaseDumper +yaml.PySafeDumper = yaml.SafeDumper +yaml.PyDumper = yaml.Dumper + +old_scan = yaml.scan +def new_scan(stream, Loader=yaml.CLoader): + return old_scan(stream, Loader) + +old_parse = yaml.parse +def new_parse(stream, Loader=yaml.CLoader): + return old_parse(stream, Loader) + +old_compose = yaml.compose +def new_compose(stream, Loader=yaml.CLoader): + return old_compose(stream, Loader) + +old_compose_all = yaml.compose_all +def new_compose_all(stream, Loader=yaml.CLoader): + return old_compose_all(stream, Loader) + +old_load = yaml.load +def new_load(stream, Loader=yaml.CLoader): + return old_load(stream, Loader) + +old_load_all = yaml.load_all +def new_load_all(stream, Loader=yaml.CLoader): + return old_load_all(stream, Loader) + +old_safe_load = yaml.safe_load +def new_safe_load(stream): + return old_load(stream, yaml.CSafeLoader) + +old_safe_load_all = yaml.safe_load_all +def new_safe_load_all(stream): + return old_load_all(stream, yaml.CSafeLoader) + +old_emit = yaml.emit +def new_emit(events, stream=None, Dumper=yaml.CDumper, **kwds): + return old_emit(events, stream, Dumper, **kwds) + +old_serialize = yaml.serialize +def new_serialize(node, stream, Dumper=yaml.CDumper, **kwds): + return old_serialize(node, stream, Dumper, **kwds) + +old_serialize_all = yaml.serialize_all +def new_serialize_all(nodes, stream=None, Dumper=yaml.CDumper, **kwds): + return old_serialize_all(nodes, stream, Dumper, **kwds) + +old_dump = yaml.dump +def new_dump(data, stream=None, Dumper=yaml.CDumper, **kwds): + return old_dump(data, stream, Dumper, **kwds) + +old_dump_all = yaml.dump_all +def new_dump_all(documents, stream=None, Dumper=yaml.CDumper, **kwds): + return old_dump_all(documents, stream, Dumper, **kwds) + +old_safe_dump = yaml.safe_dump +def new_safe_dump(data, stream=None, **kwds): + return old_dump(data, stream, yaml.CSafeDumper, **kwds) + +old_safe_dump_all = yaml.safe_dump_all +def new_safe_dump_all(documents, stream=None, **kwds): + return old_dump_all(documents, stream, yaml.CSafeDumper, **kwds) + +def _set_up(): + yaml.BaseLoader = yaml.CBaseLoader + yaml.SafeLoader = yaml.CSafeLoader + yaml.Loader = yaml.CLoader + yaml.BaseDumper = yaml.CBaseDumper + yaml.SafeDumper = yaml.CSafeDumper + yaml.Dumper = yaml.CDumper + yaml.scan = new_scan + yaml.parse = new_parse + yaml.compose = new_compose + yaml.compose_all = new_compose_all + yaml.load = new_load + yaml.load_all = new_load_all + yaml.safe_load = new_safe_load + yaml.safe_load_all = new_safe_load_all + yaml.emit = new_emit + yaml.serialize = new_serialize + yaml.serialize_all = new_serialize_all + yaml.dump = new_dump + yaml.dump_all = new_dump_all + yaml.safe_dump = new_safe_dump + yaml.safe_dump_all = new_safe_dump_all + +def _tear_down(): + yaml.BaseLoader = yaml.PyBaseLoader + yaml.SafeLoader = yaml.PySafeLoader + yaml.Loader = yaml.PyLoader + yaml.BaseDumper = yaml.PyBaseDumper + yaml.SafeDumper = yaml.PySafeDumper + yaml.Dumper = yaml.PyDumper + yaml.scan = old_scan + yaml.parse = old_parse + yaml.compose = old_compose + yaml.compose_all = old_compose_all + yaml.load = old_load + yaml.load_all = old_load_all + yaml.safe_load = old_safe_load + yaml.safe_load_all = old_safe_load_all + yaml.emit = old_emit + yaml.serialize = old_serialize + yaml.serialize_all = old_serialize_all + yaml.dump = old_dump + yaml.dump_all = old_dump_all + yaml.safe_dump = old_safe_dump + yaml.safe_dump_all = old_safe_dump_all + +def test_c_version(verbose=False): + if verbose: + print(_yaml.get_version()) + print(_yaml.get_version_string()) + assert ("%s.%s.%s" % _yaml.get_version()) == _yaml.get_version_string(), \ + (_yaml.get_version(), _yaml.get_version_string()) + +def _compare_scanners(py_data, c_data, verbose): + py_tokens = list(yaml.scan(py_data, Loader=yaml.PyLoader)) + c_tokens = [] + try: + for token in yaml.scan(c_data, Loader=yaml.CLoader): + c_tokens.append(token) + assert len(py_tokens) == len(c_tokens), (len(py_tokens), len(c_tokens)) + for py_token, c_token in zip(py_tokens, c_tokens): + assert py_token.__class__ == c_token.__class__, (py_token, c_token) + if hasattr(py_token, 'value'): + assert py_token.value == c_token.value, (py_token, c_token) + if isinstance(py_token, yaml.StreamEndToken): + continue + py_start = (py_token.start_mark.index, py_token.start_mark.line, py_token.start_mark.column) + py_end = (py_token.end_mark.index, py_token.end_mark.line, py_token.end_mark.column) + c_start = (c_token.start_mark.index, c_token.start_mark.line, c_token.start_mark.column) + c_end = (c_token.end_mark.index, c_token.end_mark.line, c_token.end_mark.column) + assert py_start == c_start, (py_start, c_start) + assert py_end == c_end, (py_end, c_end) + finally: + if verbose: + print("PY_TOKENS:") + pprint.pprint(py_tokens) + print("C_TOKENS:") + pprint.pprint(c_tokens) + +def test_c_scanner(data_filename, canonical_filename, verbose=False): + _compare_scanners(open(data_filename, 'rb'), + open(data_filename, 'rb'), verbose) + _compare_scanners(open(data_filename, 'rb').read(), + open(data_filename, 'rb').read(), verbose) + _compare_scanners(open(canonical_filename, 'rb'), + open(canonical_filename, 'rb'), verbose) + _compare_scanners(open(canonical_filename, 'rb').read(), + open(canonical_filename, 'rb').read(), verbose) + +test_c_scanner.unittest = ['.data', '.canonical'] +test_c_scanner.skip = ['.skip-ext'] + +def _compare_parsers(py_data, c_data, verbose): + py_events = list(yaml.parse(py_data, Loader=yaml.PyLoader)) + c_events = [] + try: + for event in yaml.parse(c_data, Loader=yaml.CLoader): + c_events.append(event) + assert len(py_events) == len(c_events), (len(py_events), len(c_events)) + for py_event, c_event in zip(py_events, c_events): + for attribute in ['__class__', 'anchor', 'tag', 'implicit', + 'value', 'explicit', 'version', 'tags']: + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + assert py_value == c_value, (py_event, c_event, attribute) + finally: + if verbose: + print("PY_EVENTS:") + pprint.pprint(py_events) + print("C_EVENTS:") + pprint.pprint(c_events) + +def test_c_parser(data_filename, canonical_filename, verbose=False): + _compare_parsers(open(data_filename, 'rb'), + open(data_filename, 'rb'), verbose) + _compare_parsers(open(data_filename, 'rb').read(), + open(data_filename, 'rb').read(), verbose) + _compare_parsers(open(canonical_filename, 'rb'), + open(canonical_filename, 'rb'), verbose) + _compare_parsers(open(canonical_filename, 'rb').read(), + open(canonical_filename, 'rb').read(), verbose) + +test_c_parser.unittest = ['.data', '.canonical'] +test_c_parser.skip = ['.skip-ext'] + +def _compare_emitters(data, verbose): + events = list(yaml.parse(data, Loader=yaml.PyLoader)) + c_data = yaml.emit(events, Dumper=yaml.CDumper) + if verbose: + print(c_data) + py_events = list(yaml.parse(c_data, Loader=yaml.PyLoader)) + c_events = list(yaml.parse(c_data, Loader=yaml.CLoader)) + try: + assert len(events) == len(py_events), (len(events), len(py_events)) + assert len(events) == len(c_events), (len(events), len(c_events)) + for event, py_event, c_event in zip(events, py_events, c_events): + for attribute in ['__class__', 'anchor', 'tag', 'implicit', + 'value', 'explicit', 'version', 'tags']: + value = getattr(event, attribute, None) + py_value = getattr(py_event, attribute, None) + c_value = getattr(c_event, attribute, None) + if attribute == 'tag' and value in [None, '!'] \ + and py_value in [None, '!'] and c_value in [None, '!']: + continue + if attribute == 'explicit' and (py_value or c_value): + continue + assert value == py_value, (event, py_event, attribute) + assert value == c_value, (event, c_event, attribute) + finally: + if verbose: + print("EVENTS:") + pprint.pprint(events) + print("PY_EVENTS:") + pprint.pprint(py_events) + print("C_EVENTS:") + pprint.pprint(c_events) + +def test_c_emitter(data_filename, canonical_filename, verbose=False): + _compare_emitters(open(data_filename, 'rb').read(), verbose) + _compare_emitters(open(canonical_filename, 'rb').read(), verbose) + +test_c_emitter.unittest = ['.data', '.canonical'] +test_c_emitter.skip = ['.skip-ext'] + +def wrap_ext_function(function): + def wrapper(*args, **kwds): + _set_up() + try: + function(*args, **kwds) + finally: + _tear_down() + wrapper.__name__ = '%s_ext' % function.__name__ + wrapper.unittest = function.unittest + wrapper.skip = getattr(function, 'skip', [])+['.skip-ext'] + return wrapper + +def wrap_ext(collections): + functions = [] + if not isinstance(collections, list): + collections = [collections] + for collection in collections: + if not isinstance(collection, dict): + collection = vars(collection) + keys = collection.keys() + keys.sort() + for key in keys: + value = collection[key] + if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'): + functions.append(wrap_ext_function(value)) + for function in functions: + assert function.unittest_name not in globals() + globals()[function.unittest_name] = function + +import test_tokens, test_structure, test_errors, test_resolver, test_constructor, \ + test_emitter, test_representer, test_recursive +wrap_ext([test_tokens, test_structure, test_errors, test_resolver, test_constructor, + test_emitter, test_representer, test_recursive]) + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) + |