diff options
Diffstat (limited to 'tempest_lib/cmd')
-rw-r--r-- | tempest_lib/cmd/__init__.py | 0 | ||||
-rwxr-xr-x | tempest_lib/cmd/check_uuid.py | 358 | ||||
-rwxr-xr-x | tempest_lib/cmd/skip_tracker.py | 162 |
3 files changed, 0 insertions, 520 deletions
diff --git a/tempest_lib/cmd/__init__.py b/tempest_lib/cmd/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tempest_lib/cmd/__init__.py +++ /dev/null diff --git a/tempest_lib/cmd/check_uuid.py b/tempest_lib/cmd/check_uuid.py deleted file mode 100755 index 3adeecd..0000000 --- a/tempest_lib/cmd/check_uuid.py +++ /dev/null @@ -1,358 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2014 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import argparse -import ast -import importlib -import inspect -import os -import sys -import unittest -import uuid - -import six.moves.urllib.parse as urlparse - -DECORATOR_MODULE = 'test' -DECORATOR_NAME = 'idempotent_id' -DECORATOR_IMPORT = 'tempest.%s' % DECORATOR_MODULE -IMPORT_LINE = 'from tempest import %s' % DECORATOR_MODULE -DECORATOR_TEMPLATE = "@%s.%s('%%s')" % (DECORATOR_MODULE, - DECORATOR_NAME) -UNIT_TESTS_EXCLUDE = 'tempest.tests' - - -class SourcePatcher(object): - - """"Lazy patcher for python source files""" - - def __init__(self): - self.source_files = None - self.patches = None - self.clear() - - def clear(self): - """Clear inner state""" - self.source_files = {} - self.patches = {} - - @staticmethod - def _quote(s): - return urlparse.quote(s) - - @staticmethod - def _unquote(s): - return urlparse.unquote(s) - - def add_patch(self, filename, patch, line_no): - """Add lazy patch""" - if filename not in self.source_files: - with open(filename) as f: - self.source_files[filename] = self._quote(f.read()) - patch_id = str(uuid.uuid4()) - if not patch.endswith('\n'): - patch += '\n' - self.patches[patch_id] = self._quote(patch) - lines = self.source_files[filename].split(self._quote('\n')) - lines[line_no - 1] = ''.join(('{%s:s}' % patch_id, lines[line_no - 1])) - self.source_files[filename] = self._quote('\n').join(lines) - - def _save_changes(self, filename, source): - print('%s fixed' % filename) - with open(filename, 'w') as f: - f.write(source) - - def apply_patches(self): - """Apply all patches""" - for filename in self.source_files: - patched_source = self._unquote( - self.source_files[filename].format(**self.patches) - ) - self._save_changes(filename, patched_source) - self.clear() - - -class TestChecker(object): - - def __init__(self, package): - self.package = package - self.base_path = os.path.abspath(os.path.dirname(package.__file__)) - - def _path_to_package(self, path): - relative_path = path[len(self.base_path) + 1:] - if relative_path: - return '.'.join((self.package.__name__,) + - tuple(relative_path.split('/'))) - else: - return self.package.__name__ - - def _modules_search(self): - """Recursive search for python modules in base package""" - modules = [] - for root, dirs, files in os.walk(self.base_path): - if not os.path.exists(os.path.join(root, '__init__.py')): - continue - root_package = self._path_to_package(root) - for item in files: - if item.endswith('.py'): - module_name = '.'.join((root_package, - os.path.splitext(item)[0])) - if not module_name.startswith(UNIT_TESTS_EXCLUDE): - modules.append(module_name) - return modules - - @staticmethod - def _get_idempotent_id(test_node): - """Return key-value dict with all metadata from @test.idempotent_id""" - idempotent_id = None - for decorator in test_node.decorator_list: - if (hasattr(decorator, 'func') and - hasattr(decorator.func, 'attr') and - decorator.func.attr == DECORATOR_NAME and - hasattr(decorator.func, 'value') and - decorator.func.value.id == DECORATOR_MODULE): - for arg in decorator.args: - idempotent_id = ast.literal_eval(arg) - return idempotent_id - - @staticmethod - def _is_decorator(line): - return line.strip().startswith('@') - - @staticmethod - def _is_def(line): - return line.strip().startswith('def ') - - def _add_uuid_to_test(self, patcher, test_node, source_path): - with open(source_path) as src: - src_lines = src.read().split('\n') - lineno = test_node.lineno - insert_position = lineno - while True: - if (self._is_def(src_lines[lineno - 1]) or - (self._is_decorator(src_lines[lineno - 1]) and - (DECORATOR_TEMPLATE.split('(')[0] <= - src_lines[lineno - 1].strip().split('(')[0]))): - insert_position = lineno - break - lineno += 1 - patcher.add_patch( - source_path, - ' ' * test_node.col_offset + DECORATOR_TEMPLATE % uuid.uuid4(), - insert_position - ) - - @staticmethod - def _is_test_case(module, node): - if (node.__class__ is ast.ClassDef and - hasattr(module, node.name) and - inspect.isclass(getattr(module, node.name))): - return issubclass(getattr(module, node.name), unittest.TestCase) - - @staticmethod - def _is_test_method(node): - return (node.__class__ is ast.FunctionDef - and node.name.startswith('test_')) - - @staticmethod - def _next_node(body, node): - if body.index(node) < len(body): - return body[body.index(node) + 1] - - @staticmethod - def _import_name(node): - if type(node) == ast.Import: - return node.names[0].name - elif type(node) == ast.ImportFrom: - return '%s.%s' % (node.module, node.names[0].name) - - def _add_import_for_test_uuid(self, patcher, src_parsed, source_path): - with open(source_path) as f: - src_lines = f.read().split('\n') - line_no = 0 - tempest_imports = [node for node in src_parsed.body - if self._import_name(node) and - 'tempest.' in self._import_name(node)] - if not tempest_imports: - import_snippet = '\n'.join(('', IMPORT_LINE, '')) - else: - for node in tempest_imports: - if self._import_name(node) < DECORATOR_IMPORT: - continue - else: - line_no = node.lineno - import_snippet = IMPORT_LINE - break - else: - line_no = tempest_imports[-1].lineno - while True: - if (not src_lines[line_no - 1] or - getattr(self._next_node(src_parsed.body, - tempest_imports[-1]), - 'lineno') == line_no or - line_no == len(src_lines)): - break - line_no += 1 - import_snippet = '\n'.join((IMPORT_LINE, '')) - patcher.add_patch(source_path, import_snippet, line_no) - - def get_tests(self): - """Get test methods with sources from base package with metadata""" - tests = {} - for module_name in self._modules_search(): - tests[module_name] = {} - module = importlib.import_module(module_name) - source_path = '.'.join( - (os.path.splitext(module.__file__)[0], 'py') - ) - with open(source_path, 'r') as f: - source = f.read() - tests[module_name]['source_path'] = source_path - tests[module_name]['tests'] = {} - source_parsed = ast.parse(source) - tests[module_name]['ast'] = source_parsed - tests[module_name]['import_valid'] = ( - hasattr(module, DECORATOR_MODULE) and - inspect.ismodule(getattr(module, DECORATOR_MODULE)) - ) - test_cases = (node for node in source_parsed.body - if self._is_test_case(module, node)) - for node in test_cases: - for subnode in filter(self._is_test_method, node.body): - test_name = '%s.%s' % (node.name, subnode.name) - tests[module_name]['tests'][test_name] = subnode - return tests - - @staticmethod - def _filter_tests(function, tests): - """Filter tests with condition 'function(test_node) == True'""" - result = {} - for module_name in tests: - for test_name in tests[module_name]['tests']: - if function(module_name, test_name, tests): - if module_name not in result: - result[module_name] = { - 'ast': tests[module_name]['ast'], - 'source_path': tests[module_name]['source_path'], - 'import_valid': tests[module_name]['import_valid'], - 'tests': {} - } - result[module_name]['tests'][test_name] = \ - tests[module_name]['tests'][test_name] - return result - - def find_untagged(self, tests): - """Filter all tests without uuid in metadata""" - def check_uuid_in_meta(module_name, test_name, tests): - idempotent_id = self._get_idempotent_id( - tests[module_name]['tests'][test_name]) - return not idempotent_id - return self._filter_tests(check_uuid_in_meta, tests) - - def report_collisions(self, tests): - """Reports collisions if there are any - - Returns true if collisions exist. - """ - uuids = {} - - def report(module_name, test_name, tests): - test_uuid = self._get_idempotent_id( - tests[module_name]['tests'][test_name]) - if not test_uuid: - return - if test_uuid in uuids: - error_str = "%s:%s\n uuid %s collision: %s<->%s\n%s:%s" % ( - tests[module_name]['source_path'], - tests[module_name]['tests'][test_name].lineno, - test_uuid, - test_name, - uuids[test_uuid]['test_name'], - uuids[test_uuid]['source_path'], - uuids[test_uuid]['test_node'].lineno, - ) - print(error_str) - print("cannot automatically resolve the collision, please " - "manually remove the duplicate value on the new test.") - return True - else: - uuids[test_uuid] = { - 'module': module_name, - 'test_name': test_name, - 'test_node': tests[module_name]['tests'][test_name], - 'source_path': tests[module_name]['source_path'] - } - return bool(self._filter_tests(report, tests)) - - def report_untagged(self, tests): - """Reports untagged tests if there are any - - Returns true if untagged tests exist. - """ - def report(module_name, test_name, tests): - error_str = "%s:%s\nmissing @test.idempotent_id('...')\n%s\n" % ( - tests[module_name]['source_path'], - tests[module_name]['tests'][test_name].lineno, - test_name - ) - print(error_str) - return True - return bool(self._filter_tests(report, tests)) - - def fix_tests(self, tests): - """Add uuids to all specified in tests and fix it in source files""" - patcher = SourcePatcher() - for module_name in tests: - add_import_once = True - for test_name in tests[module_name]['tests']: - if not tests[module_name]['import_valid'] and add_import_once: - self._add_import_for_test_uuid( - patcher, - tests[module_name]['ast'], - tests[module_name]['source_path'] - ) - add_import_once = False - self._add_uuid_to_test( - patcher, tests[module_name]['tests'][test_name], - tests[module_name]['source_path']) - patcher.apply_patches() - - -def run(): - parser = argparse.ArgumentParser() - parser.add_argument('--package', action='store', dest='package', - default='tempest', type=str, - help='Package with tests') - parser.add_argument('--fix', action='store_true', dest='fix_tests', - help='Attempt to fix tests without UUIDs') - args = parser.parse_args() - sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - pkg = importlib.import_module(args.package) - checker = TestChecker(pkg) - errors = False - tests = checker.get_tests() - untagged = checker.find_untagged(tests) - errors = checker.report_collisions(tests) or errors - if args.fix_tests and untagged: - checker.fix_tests(untagged) - else: - errors = checker.report_untagged(untagged) or errors - if errors: - sys.exit("@test.idempotent_id existence and uniqueness checks failed\n" - "Run 'tox -v -euuidgen' to automatically fix tests with\n" - "missing @test.idempotent_id decorators.") - -if __name__ == '__main__': - run() diff --git a/tempest_lib/cmd/skip_tracker.py b/tempest_lib/cmd/skip_tracker.py deleted file mode 100755 index b5c9b95..0000000 --- a/tempest_lib/cmd/skip_tracker.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python2 - -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Track test skips via launchpadlib API and raise alerts if a bug -is fixed but a skip is still in the Tempest test code -""" - -import argparse -import logging -import os -import re - -try: - from launchpadlib import launchpad -except ImportError: - launchpad = None - -LPCACHEDIR = os.path.expanduser('~/.launchpadlib/cache') - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument('test_path', help='Path of test dir') - return parser.parse_args() - - -def info(msg, *args, **kwargs): - logging.info(msg, *args, **kwargs) - - -def debug(msg, *args, **kwargs): - logging.debug(msg, *args, **kwargs) - - -def find_skips(start): - """Find the entire list of skiped tests. - - Returns a list of tuples (method, bug) that represent - test methods that have been decorated to skip because of - a particular bug. - """ - results = {} - debug("Searching in %s", start) - for root, _dirs, files in os.walk(start): - for name in files: - if name.startswith('test_') and name.endswith('py'): - path = os.path.join(root, name) - debug("Searching in %s", path) - temp_result = find_skips_in_file(path) - for method_name, bug_no in temp_result: - if results.get(bug_no): - result_dict = results.get(bug_no) - if result_dict.get(name): - result_dict[name].append(method_name) - else: - result_dict[name] = [method_name] - results[bug_no] = result_dict - else: - results[bug_no] = {name: [method_name]} - return results - - -def find_skips_in_file(path): - """Return the skip tuples in a test file.""" - BUG_RE = re.compile(r'\s*@.*skip_because\(bug=[\'"](\d+)[\'"]') - DEF_RE = re.compile(r'\s*def (\w+)\(') - bug_found = False - results = [] - lines = open(path, 'rb').readlines() - for x, line in enumerate(lines): - if not bug_found: - res = BUG_RE.match(line) - if res: - bug_no = int(res.group(1)) - debug("Found bug skip %s on line %d", bug_no, x + 1) - bug_found = True - else: - res = DEF_RE.match(line) - if res: - method = res.group(1) - debug("Found test method %s skips for bug %d", method, bug_no) - results.append((method, bug_no)) - bug_found = False - return results - - -def get_results(result_dict): - results = [] - for bug_no in result_dict.keys(): - for method in result_dict[bug_no]: - results.append((method, bug_no)) - return results - - -def main(): - logging.basicConfig(format='%(levelname)s: %(message)s', - level=logging.INFO) - parser = parse_args() - results = find_skips(parser.test_path) - unique_bugs = sorted(set([bug for (method, bug) in get_results(results)])) - unskips = [] - duplicates = [] - info("Total bug skips found: %d", len(results)) - info("Total unique bugs causing skips: %d", len(unique_bugs)) - if launchpad is not None: - lp = launchpad.Launchpad.login_anonymously('grabbing bugs', - 'production', - LPCACHEDIR) - else: - print("To check the bug status launchpadlib should be installed") - exit(1) - - for bug_no in unique_bugs: - bug = lp.bugs[bug_no] - duplicate = bug.duplicate_of_link - if duplicate is not None: - dup_id = duplicate.split('/')[-1] - duplicates.append((bug_no, dup_id)) - for task in bug.bug_tasks: - info("Bug #%7s (%12s - %12s)", bug_no, - task.importance, task.status) - if task.status in ('Fix Released', 'Fix Committed'): - unskips.append(bug_no) - - for bug_id, dup_id in duplicates: - if bug_id not in unskips: - dup_bug = lp.bugs[dup_id] - for task in dup_bug.bug_tasks: - info("Bug #%7s is a duplicate of Bug#%7s (%12s - %12s)", - bug_id, dup_id, task.importance, task.status) - if task.status in ('Fix Released', 'Fix Committed'): - unskips.append(bug_id) - - unskips = sorted(set(unskips)) - if unskips: - print("The following bugs have been fixed and the corresponding skips") - print("should be removed from the test cases:") - print() - for bug in unskips: - message = " %7s in " % bug - locations = ["%s" % x for x in results[bug].keys()] - message += " and ".join(locations) - print(message) - - -if __name__ == '__main__': - main() |