author     xi <xi@18f92427-320e-0410-9341-c67f048884a3>   2008-12-28 20:16:50 +0000
committer  xi <xi@18f92427-320e-0410-9341-c67f048884a3>   2008-12-28 20:16:50 +0000
commit     4691639907112004c7242215370d8b42ffec3e5b (patch)
tree       cf3e23519728f94b6e04d2132a61577ef9deb0a0 /tests/test_appliance.py
parent     398bababf57286f4366649052c75075083d2060e (diff)
download   pyyaml-4691639907112004c7242215370d8b42ffec3e5b.tar.gz
Refactored the test suite; updated include and library paths in setup.cfg.
git-svn-id: http://svn.pyyaml.org/pyyaml/trunk@322 18f92427-320e-0410-9341-c67f048884a3
Diffstat (limited to 'tests/test_appliance.py')
-rw-r--r--  tests/test_appliance.py  491
1 file changed, 139 insertions(+), 352 deletions(-)
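
For context, this commit replaces the unittest-based TestAppliance (which generated
test methods dynamically on a TestCase subclass) with a small functional driver:
plain functions tagged with a `unittest` attribute naming the data-file extensions
they consume are collected by find_test_functions() and executed by run() against
matching files under tests/data. A minimal sketch of how a test module is expected
to use the new driver; the module contents below are illustrative, not part of this
commit:

    # hypothetical test module, run from the project root next to tests/data/
    import test_appliance

    def test_marker(data_filename, verbose=False):
        # called once per base name that has a .data file; `verbose` is
        # forwarded by test_appliance.execute()
        data = open(data_filename, 'rb').read()
        assert data, "expected non-empty input"
    test_marker.unittest = ['.data']   # required extensions, in argument order
    test_marker.skip = ['.skip']       # optional: skip bases carrying these extensions

    if __name__ == '__main__':
        # command line (see parse_arguments in the diff below):
        #   python tests/test_mymodule.py [-v|--verbose] [function] [bases...]
        # environment overrides: YAML_TEST_VERBOSE, YAML_TEST_FUNCTIONS,
        # YAML_TEST_FILENAMES
        test_appliance.run(globals())
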
diff --git a/tests/test_appliance.py b/tests/test_appliance.py
index 61cc03f..09c46c9 100644
--- a/tests/test_appliance.py
+++ b/tests/test_appliance.py
@@ -1,358 +1,145 @@
-import unittest, os
-
-from yaml import *
-from yaml.composer import *
-from yaml.constructor import *
-from yaml.resolver import *
-
-class TestAppliance(unittest.TestCase):
-
- DATA = 'tests/data'
- SKIP_EXT = '.skip'
-
- all_tests = {}
- for filename in os.listdir(DATA):
- if os.path.isfile(os.path.join(DATA, filename)):
- root, ext = os.path.splitext(filename)
- all_tests.setdefault(root, []).append(ext)
-
- def add_tests(cls, method_name, *extensions):
- for test in cls.all_tests:
- available_extensions = cls.all_tests[test]
- if cls.SKIP_EXT in available_extensions:
- continue
- for ext in extensions:
- if ext not in available_extensions:
- break
- else:
- filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
- def test_method(self, test=test, filenames=filenames):
- getattr(self, '_'+method_name)(test, *filenames)
- test = test.replace('-', '_').replace('.', '_')
- try:
- test_method.__name__ = '%s_%s' % (method_name, test)
- except TypeError:
- import new
- test_method = new.function(test_method.func_code, test_method.func_globals,
- '%s_%s' % (method_name, test), test_method.func_defaults,
- test_method.func_closure)
- setattr(cls, test_method.__name__, test_method)
- add_tests = classmethod(add_tests)
-
-class Error(Exception):
- pass
-
-class CanonicalScanner:
-
- def __init__(self, data):
- self.data = unicode(data, 'utf-8')+u'\0'
- self.index = 0
- self.scan()
-
- def check_token(self, *choices):
- if self.tokens:
- if not choices:
- return True
- for choice in choices:
- if isinstance(self.tokens[0], choice):
- return True
- return False
-
- def peek_token(self):
- if self.tokens:
- return self.tokens[0]
-
- def get_token(self, choice=None):
- token = self.tokens.pop(0)
- if choice and not isinstance(token, choice):
- raise Error("unexpected token "+repr(token))
- return token
-
- def get_token_value(self):
- token = self.get_token()
- return token.value
-
- def scan(self):
- self.tokens = []
- self.tokens.append(StreamStartToken(None, None))
- while True:
- self.find_token()
- ch = self.data[self.index]
- if ch == u'\0':
- self.tokens.append(StreamEndToken(None, None))
- break
- elif ch == u'%':
- self.tokens.append(self.scan_directive())
- elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
- self.index += 3
- self.tokens.append(DocumentStartToken(None, None))
- elif ch == u'[':
- self.index += 1
- self.tokens.append(FlowSequenceStartToken(None, None))
- elif ch == u'{':
- self.index += 1
- self.tokens.append(FlowMappingStartToken(None, None))
- elif ch == u']':
- self.index += 1
- self.tokens.append(FlowSequenceEndToken(None, None))
- elif ch == u'}':
- self.index += 1
- self.tokens.append(FlowMappingEndToken(None, None))
- elif ch == u'?':
- self.index += 1
- self.tokens.append(KeyToken(None, None))
- elif ch == u':':
- self.index += 1
- self.tokens.append(ValueToken(None, None))
- elif ch == u',':
- self.index += 1
- self.tokens.append(FlowEntryToken(None, None))
- elif ch == u'*' or ch == u'&':
- self.tokens.append(self.scan_alias())
- elif ch == u'!':
- self.tokens.append(self.scan_tag())
- elif ch == u'"':
- self.tokens.append(self.scan_scalar())
- else:
- raise Error("invalid token")
-
- DIRECTIVE = u'%YAML 1.1'
-
- def scan_directive(self):
- if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
- self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
- self.index += len(self.DIRECTIVE)
- return DirectiveToken('YAML', (1, 1), None, None)
-
- def scan_alias(self):
- if self.data[self.index] == u'*':
- TokenClass = AliasToken
+import sys, os, os.path, types, traceback, pprint
+
+DATA = 'tests/data'
+
+def find_test_functions(collections):
+ if not isinstance(collections, list):
+ collections = [collections]
+ functions = []
+ for collection in collections:
+ if not isinstance(collection, dict):
+ collection = vars(collection)
+ keys = collection.keys()
+ keys.sort()
+ for key in keys:
+ value = collection[key]
+ if isinstance(value, types.FunctionType) and hasattr(value, 'unittest'):
+ functions.append(value)
+ return functions
+
+def find_test_filenames(directory):
+ filenames = {}
+ for filename in os.listdir(directory):
+ if os.path.isfile(os.path.join(directory, filename)):
+ base, ext = os.path.splitext(filename)
+ filenames.setdefault(base, []).append(ext)
+ filenames = filenames.items()
+ filenames.sort()
+ return filenames
+
+def parse_arguments(args):
+ if args is None:
+ args = sys.argv[1:]
+ verbose = False
+ if '-v' in args:
+ verbose = True
+ args.remove('-v')
+ if '--verbose' in args:
+ verbose = True
+ if 'YAML_TEST_VERBOSE' in os.environ:
+ verbose = True
+ include_functions = []
+ if args:
+ include_functions.append(args.pop(0))
+ if 'YAML_TEST_FUNCTIONS' in os.environ:
+ include_functions.extend(os.environ['YAML_TEST_FUNCTIONS'].split())
+ include_filenames = []
+ include_filenames.extend(args)
+ if 'YAML_TEST_FILENAMES' in os.environ:
+ include_filenames.extend(os.environ['YAML_TEST_FILENAMES'].split())
+ return include_functions, include_filenames, verbose
+
+def execute(function, filenames, verbose):
+ if verbose:
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('%s(%s)...\n' % (function.func_name, ', '.join(filenames)))
+ try:
+ function(verbose=verbose, *filenames)
+ except Exception, exc:
+ info = sys.exc_info()
+ if isinstance(exc, AssertionError):
+ kind = 'FAILURE'
else:
- TokenClass = AnchorToken
- self.index += 1
- start = self.index
- while self.data[self.index] not in u', \n\0':
- self.index += 1
- value = self.data[start:self.index]
- return TokenClass(value, None, None)
-
- def scan_tag(self):
- self.index += 1
- start = self.index
- while self.data[self.index] not in u' \n\0':
- self.index += 1
- value = self.data[start:self.index]
- if value[0] == u'!':
- value = 'tag:yaml.org,2002:'+value[1:]
- elif value[0] == u'<' and value[-1] == u'>':
- value = value[1:-1]
+ kind = 'ERROR'
+ if verbose:
+ traceback.print_exc(limit=1, file=sys.stdout)
else:
- value = u'!'+value
- return TagToken(value, None, None)
-
- QUOTE_CODES = {
- 'x': 2,
- 'u': 4,
- 'U': 8,
- }
-
- QUOTE_REPLACES = {
- u'\\': u'\\',
- u'\"': u'\"',
- u' ': u' ',
- u'a': u'\x07',
- u'b': u'\x08',
- u'e': u'\x1B',
- u'f': u'\x0C',
- u'n': u'\x0A',
- u'r': u'\x0D',
- u't': u'\x09',
- u'v': u'\x0B',
- u'N': u'\u0085',
- u'L': u'\u2028',
- u'P': u'\u2029',
- u'_': u'_',
- u'0': u'\x00',
-
- }
-
- def scan_scalar(self):
- self.index += 1
- chunks = []
- start = self.index
- ignore_spaces = False
- while self.data[self.index] != u'"':
- if self.data[self.index] == u'\\':
- ignore_spaces = False
- chunks.append(self.data[start:self.index])
- self.index += 1
- ch = self.data[self.index]
- self.index += 1
- if ch == u'\n':
- ignore_spaces = True
- elif ch in self.QUOTE_CODES:
- length = self.QUOTE_CODES[ch]
- code = int(self.data[self.index:self.index+length], 16)
- chunks.append(unichr(code))
- self.index += length
+ sys.stdout.write(kind[0])
+ sys.stdout.flush()
+ else:
+ kind = 'SUCCESS'
+ info = None
+ if not verbose:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ return (function, filenames, kind, info)
+
+def display(results, verbose):
+ if results and not verbose:
+ sys.stdout.write('\n')
+ total = len(results)
+ failures = 0
+ errors = 0
+ for function, filenames, kind, info in results:
+ if kind == 'SUCCESS':
+ continue
+ if kind == 'FAILURE':
+ failures += 1
+ if kind == 'ERROR':
+ errors += 1
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('%s(%s): %s\n' % (function.func_name, ', '.join(filenames), kind))
+ if kind == 'ERROR':
+ traceback.print_exception(file=sys.stdout, *info)
+ else:
+ sys.stdout.write('Traceback (most recent call last):\n')
+ traceback.print_tb(info[2], file=sys.stdout)
+ sys.stdout.write('%s: see below\n' % info[0].__name__)
+ sys.stdout.write('~'*75+'\n')
+ for arg in info[1].args:
+ pprint.pprint(arg, stream=sys.stdout, indent=2)
+ for filename in filenames:
+ sys.stdout.write('-'*75+'\n')
+ sys.stdout.write('%s:\n' % filename)
+ data = open(filename, 'rb').read()
+ sys.stdout.write(data)
+ if data and data[-1] != '\n':
+ sys.stdout.write('\n')
+ sys.stdout.write('='*75+'\n')
+ sys.stdout.write('TESTS: %s\n' % total)
+ if failures:
+ sys.stdout.write('FAILURES: %s\n' % failures)
+ if errors:
+ sys.stdout.write('ERRORS: %s\n' % errors)
+
+def run(collections, args=None):
+ test_functions = find_test_functions(collections)
+ test_filenames = find_test_filenames(DATA)
+ include_functions, include_filenames, verbose = parse_arguments(args)
+ results = []
+ for function in test_functions:
+ if include_functions and function.func_name not in include_functions:
+ continue
+ if function.unittest:
+ for base, exts in test_filenames:
+ if include_filenames and base not in include_filenames:
+ continue
+ filenames = []
+ for ext in function.unittest:
+ if ext not in exts:
+ break
+ filenames.append(os.path.join(DATA, base+ext))
else:
- chunks.append(self.QUOTE_REPLACES[ch])
- start = self.index
- elif self.data[self.index] == u'\n':
- chunks.append(self.data[start:self.index])
- chunks.append(u' ')
- self.index += 1
- start = self.index
- ignore_spaces = True
- elif ignore_spaces and self.data[self.index] == u' ':
- self.index += 1
- start = self.index
- else:
- ignore_spaces = False
- self.index += 1
- chunks.append(self.data[start:self.index])
- self.index += 1
- return ScalarToken(u''.join(chunks), False, None, None)
-
- def find_token(self):
- found = False
- while not found:
- while self.data[self.index] in u' \t':
- self.index += 1
- if self.data[self.index] == u'#':
- while self.data[self.index] != u'\n':
- self.index += 1
- if self.data[self.index] == u'\n':
- self.index += 1
- else:
- found = True
-
-class CanonicalParser:
-
- def __init__(self):
- self.events = []
- self.parse()
-
- # stream: STREAM-START document* STREAM-END
- def parse_stream(self):
- self.get_token(StreamStartToken)
- self.events.append(StreamStartEvent(None, None))
- while not self.check_token(StreamEndToken):
- if self.check_token(DirectiveToken, DocumentStartToken):
- self.parse_document()
- else:
- raise Error("document is expected, got "+repr(self.tokens[self.index]))
- self.get_token(StreamEndToken)
- self.events.append(StreamEndEvent(None, None))
-
- # document: DIRECTIVE? DOCUMENT-START node
- def parse_document(self):
- node = None
- if self.check_token(DirectiveToken):
- self.get_token(DirectiveToken)
- self.get_token(DocumentStartToken)
- self.events.append(DocumentStartEvent(None, None))
- self.parse_node()
- self.events.append(DocumentEndEvent(None, None))
-
- # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
- def parse_node(self):
- if self.check_token(AliasToken):
- self.events.append(AliasEvent(self.get_token_value(), None, None))
+ skip_exts = getattr(function, 'skip', [])
+ for skip_ext in skip_exts:
+ if skip_ext in exts:
+ break
+ else:
+ result = execute(function, filenames, verbose)
+ results.append(result)
else:
- anchor = None
- if self.check_token(AnchorToken):
- anchor = self.get_token_value()
- tag = None
- if self.check_token(TagToken):
- tag = self.get_token_value()
- if self.check_token(ScalarToken):
- self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
- elif self.check_token(FlowSequenceStartToken):
- self.events.append(SequenceStartEvent(anchor, tag, None, None))
- self.parse_sequence()
- elif self.check_token(FlowMappingStartToken):
- self.events.append(MappingStartEvent(anchor, tag, None, None))
- self.parse_mapping()
- else:
- raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))
-
- # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
- def parse_sequence(self):
- self.get_token(FlowSequenceStartToken)
- if not self.check_token(FlowSequenceEndToken):
- self.parse_node()
- while not self.check_token(FlowSequenceEndToken):
- self.get_token(FlowEntryToken)
- if not self.check_token(FlowSequenceEndToken):
- self.parse_node()
- self.get_token(FlowSequenceEndToken)
- self.events.append(SequenceEndEvent(None, None))
-
- # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
- def parse_mapping(self):
- self.get_token(FlowMappingStartToken)
- if not self.check_token(FlowMappingEndToken):
- self.parse_map_entry()
- while not self.check_token(FlowMappingEndToken):
- self.get_token(FlowEntryToken)
- if not self.check_token(FlowMappingEndToken):
- self.parse_map_entry()
- self.get_token(FlowMappingEndToken)
- self.events.append(MappingEndEvent(None, None))
-
- # map_entry: KEY node VALUE node
- def parse_map_entry(self):
- self.get_token(KeyToken)
- self.parse_node()
- self.get_token(ValueToken)
- self.parse_node()
-
- def parse(self):
- self.parse_stream()
-
- def get_event(self):
- return self.events.pop(0)
-
- def check_event(self, *choices):
- if self.events:
- if not choices:
- return True
- for choice in choices:
- if isinstance(self.events[0], choice):
- return True
- return False
-
- def peek_event(self):
- return self.events[0]
-
-class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
-
- def __init__(self, stream):
- if hasattr(stream, 'read'):
- stream = stream.read()
- CanonicalScanner.__init__(self, stream)
- CanonicalParser.__init__(self)
- Composer.__init__(self)
- Constructor.__init__(self)
- Resolver.__init__(self)
-
-def canonical_scan(stream):
- return scan(stream, Loader=CanonicalLoader)
-
-def canonical_parse(stream):
- return parse(stream, Loader=CanonicalLoader)
-
-def canonical_compose(stream):
- return compose(stream, Loader=CanonicalLoader)
-
-def canonical_compose_all(stream):
- return compose_all(stream, Loader=CanonicalLoader)
-
-def canonical_load(stream):
- return load(stream, Loader=CanonicalLoader)
-
-def canonical_load_all(stream):
- return load_all(stream, Loader=CanonicalLoader)
+ result = execute(function, [], verbose)
+ results.append(result)
+ display(results, verbose=verbose)
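
For reference, the output this driver produces: in non-verbose mode execute()
prints one character per test ('.', 'F', or 'E'), and display() then prints a
block for each failure or error followed by a summary. An illustrative run
(made-up values, assuming the hypothetical test_marker above) might look like:

    ....F.....
    ===========================================================================
    test_marker(tests/data/spec-05-01.data): FAILURE
    Traceback (most recent call last):
      ...
    AssertionError: see below
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    'expected non-empty input'
    ---------------------------------------------------------------------------
    tests/data/spec-05-01.data:
    <contents of the data file>
    ===========================================================================
    TESTS: 10
    FAILURES: 1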