diff options
Diffstat (limited to 'tests/lib3/test_structure.py')
-rw-r--r-- | tests/lib3/test_structure.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/tests/lib3/test_structure.py b/tests/lib3/test_structure.py new file mode 100644 index 0000000..4d9c4ea --- /dev/null +++ b/tests/lib3/test_structure.py @@ -0,0 +1,187 @@ + +import yaml, canonical +import pprint + +def _convert_structure(loader): + if loader.check_event(yaml.ScalarEvent): + event = loader.get_event() + if event.tag or event.anchor or event.value: + return True + else: + return None + elif loader.check_event(yaml.SequenceStartEvent): + loader.get_event() + sequence = [] + while not loader.check_event(yaml.SequenceEndEvent): + sequence.append(_convert_structure(loader)) + loader.get_event() + return sequence + elif loader.check_event(yaml.MappingStartEvent): + loader.get_event() + mapping = [] + while not loader.check_event(yaml.MappingEndEvent): + key = _convert_structure(loader) + value = _convert_structure(loader) + mapping.append((key, value)) + loader.get_event() + return mapping + elif loader.check_event(yaml.AliasEvent): + loader.get_event() + return '*' + else: + loader.get_event() + return '?' + +def test_structure(data_filename, structure_filename, verbose=False): + nodes1 = [] + nodes2 = eval(open(structure_filename, 'r').read()) + try: + loader = yaml.Loader(open(data_filename, 'rb')) + while loader.check_event(): + if loader.check_event(yaml.StreamStartEvent, yaml.StreamEndEvent, + yaml.DocumentStartEvent, yaml.DocumentEndEvent): + loader.get_event() + continue + nodes1.append(_convert_structure(loader)) + if len(nodes1) == 1: + nodes1 = nodes1[0] + assert nodes1 == nodes2, (nodes1, nodes2) + finally: + if verbose: + print("NODES1:") + pprint.pprint(nodes1) + print("NODES2:") + pprint.pprint(nodes2) + +test_structure.unittest = ['.data', '.structure'] + +def _compare_events(events1, events2, full=False): + assert len(events1) == len(events2), (len(events1), len(events2)) + for event1, event2 in zip(events1, events2): + assert event1.__class__ == event2.__class__, (event1, event2) + if isinstance(event1, yaml.AliasEvent) and full: + assert event1.anchor == event2.anchor, (event1, event2) + if isinstance(event1, (yaml.ScalarEvent, yaml.CollectionStartEvent)): + if (event1.tag not in [None, '!'] and event2.tag not in [None, '!']) or full: + assert event1.tag == event2.tag, (event1, event2) + if isinstance(event1, yaml.ScalarEvent): + assert event1.value == event2.value, (event1, event2) + +def test_parser(data_filename, canonical_filename, verbose=False): + events1 = None + events2 = None + try: + events1 = list(yaml.parse(open(data_filename, 'rb'))) + events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) + _compare_events(events1, events2) + finally: + if verbose: + print("EVENTS1:") + pprint.pprint(events1) + print("EVENTS2:") + pprint.pprint(events2) + +test_parser.unittest = ['.data', '.canonical'] + +def test_parser_on_canonical(canonical_filename, verbose=False): + events1 = None + events2 = None + try: + events1 = list(yaml.parse(open(canonical_filename, 'rb'))) + events2 = list(yaml.canonical_parse(open(canonical_filename, 'rb'))) + _compare_events(events1, events2, full=True) + finally: + if verbose: + print("EVENTS1:") + pprint.pprint(events1) + print("EVENTS2:") + pprint.pprint(events2) + +test_parser_on_canonical.unittest = ['.canonical'] + +def _compare_nodes(node1, node2): + assert node1.__class__ == node2.__class__, (node1, node2) + assert node1.tag == node2.tag, (node1, node2) + if isinstance(node1, yaml.ScalarNode): + assert node1.value == node2.value, (node1, node2) + else: + assert len(node1.value) == len(node2.value), (node1, node2) + for item1, item2 in zip(node1.value, node2.value): + if not isinstance(item1, tuple): + item1 = (item1,) + item2 = (item2,) + for subnode1, subnode2 in zip(item1, item2): + _compare_nodes(subnode1, subnode2) + +def test_composer(data_filename, canonical_filename, verbose=False): + nodes1 = None + nodes2 = None + try: + nodes1 = list(yaml.compose_all(open(data_filename, 'rb'))) + nodes2 = list(yaml.canonical_compose_all(open(canonical_filename, 'rb'))) + assert len(nodes1) == len(nodes2), (len(nodes1), len(nodes2)) + for node1, node2 in zip(nodes1, nodes2): + _compare_nodes(node1, node2) + finally: + if verbose: + print("NODES1:") + pprint.pprint(nodes1) + print("NODES2:") + pprint.pprint(nodes2) + +test_composer.unittest = ['.data', '.canonical'] + +def _make_loader(): + global MyLoader + + class MyLoader(yaml.Loader): + def construct_sequence(self, node): + return tuple(yaml.Loader.construct_sequence(self, node)) + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort() + return pairs + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyLoader.add_constructor('tag:yaml.org,2002:map', MyLoader.construct_mapping) + MyLoader.add_constructor(None, MyLoader.construct_undefined) + +def _make_canonical_loader(): + global MyCanonicalLoader + + class MyCanonicalLoader(yaml.CanonicalLoader): + def construct_sequence(self, node): + return tuple(yaml.CanonicalLoader.construct_sequence(self, node)) + def construct_mapping(self, node): + pairs = self.construct_pairs(node) + pairs.sort() + return pairs + def construct_undefined(self, node): + return self.construct_scalar(node) + + MyCanonicalLoader.add_constructor('tag:yaml.org,2002:map', MyCanonicalLoader.construct_mapping) + MyCanonicalLoader.add_constructor(None, MyCanonicalLoader.construct_undefined) + +def test_constructor(data_filename, canonical_filename, verbose=False): + _make_loader() + _make_canonical_loader() + native1 = None + native2 = None + try: + native1 = list(yaml.load_all(open(data_filename, 'rb'), Loader=MyLoader)) + native2 = list(yaml.load_all(open(canonical_filename, 'rb'), Loader=MyCanonicalLoader)) + assert native1 == native2, (native1, native2) + finally: + if verbose: + print("NATIVE1:") + pprint.pprint(native1) + print("NATIVE2:") + pprint.pprint(native2) + +test_constructor.unittest = ['.data', '.canonical'] + +if __name__ == '__main__': + import test_appliance + test_appliance.run(globals()) + |