summaryrefslogtreecommitdiff
path: root/test/runner/lib/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/runner/lib/target.py')
-rw-r--r--test/runner/lib/target.py530
1 files changed, 530 insertions, 0 deletions
diff --git a/test/runner/lib/target.py b/test/runner/lib/target.py
new file mode 100644
index 0000000000..1ea925e5cc
--- /dev/null
+++ b/test/runner/lib/target.py
@@ -0,0 +1,530 @@
+"""Test target identification, iteration and inclusion/exclusion."""
+
+from __future__ import absolute_import, print_function
+
+import os
+import re
+import errno
+import itertools
+import abc
+
+from lib.util import ApplicationError
+
+MODULE_EXTENSIONS = '.py', '.ps1'
+
+
+def find_target_completion(target_func, prefix):
+ """
+ :type target_func: () -> collections.Iterable[CompletionTarget]
+ :type prefix: unicode
+ :rtype: list[str]
+ """
+ try:
+ targets = target_func()
+ prefix = prefix.encode()
+ short = os.environ.get('COMP_TYPE') == '63' # double tab completion from bash
+ matches = walk_completion_targets(targets, prefix, short)
+ return matches
+ except Exception as ex: # pylint: disable=locally-disabled, broad-except
+ return [str(ex)]
+
+
+def walk_completion_targets(targets, prefix, short=False):
+ """
+ :type targets: collections.Iterable[CompletionTarget]
+ :type prefix: str
+ :type short: bool
+ :rtype: tuple[str]
+ """
+ aliases = set(alias for target in targets for alias in target.aliases)
+
+ if prefix.endswith('/') and prefix in aliases:
+ aliases.remove(prefix)
+
+ matches = [alias for alias in aliases if alias.startswith(prefix) and '/' not in alias[len(prefix):-1]]
+
+ if short:
+ offset = len(os.path.dirname(prefix))
+ if offset:
+ offset += 1
+ relative_matches = [match[offset:] for match in matches if len(match) > offset]
+ if len(relative_matches) > 1:
+ matches = relative_matches
+
+ return tuple(sorted(matches))
+
+
+def walk_internal_targets(targets, includes=None, excludes=None, requires=None):
+ """
+ :type targets: collections.Iterable[T <= CompletionTarget]
+ :type includes: list[str]
+ :type excludes: list[str]
+ :type requires: list[str]
+ :rtype: tuple[T <= CompletionTarget]
+ """
+ targets = tuple(targets)
+
+ include_targets = sorted(filter_targets(targets, includes, errors=True, directories=False), key=lambda t: t.name)
+
+ if requires:
+ require_targets = set(filter_targets(targets, requires, errors=True, directories=False))
+ include_targets = [target for target in include_targets if target in require_targets]
+
+ if excludes:
+ list(filter_targets(targets, excludes, errors=True, include=False, directories=False))
+
+ internal_targets = set(filter_targets(include_targets, excludes, errors=False, include=False, directories=False))
+ return tuple(sorted(internal_targets, key=lambda t: t.name))
+
+
+def walk_external_targets(targets, includes=None, excludes=None, requires=None):
+ """
+ :type targets: collections.Iterable[CompletionTarget]
+ :type includes: list[str]
+ :type excludes: list[str]
+ :type requires: list[str]
+ :rtype: tuple[CompletionTarget], tuple[CompletionTarget]
+ """
+ targets = tuple(targets)
+
+ if requires:
+ include_targets = list(filter_targets(targets, includes, errors=True, directories=False))
+ require_targets = set(filter_targets(targets, requires, errors=True, directories=False))
+ includes = [target.name for target in include_targets if target in require_targets]
+
+ if includes:
+ include_targets = sorted(filter_targets(targets, includes, errors=True), key=lambda t: t.name)
+ else:
+ include_targets = []
+ else:
+ include_targets = sorted(filter_targets(targets, includes, errors=True), key=lambda t: t.name)
+
+ if excludes:
+ exclude_targets = sorted(filter_targets(targets, excludes, errors=True), key=lambda t: t.name)
+ else:
+ exclude_targets = []
+
+ previous = None
+ include = []
+ for target in include_targets:
+ if isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
+ and previous.name == target.name:
+ previous.modules = tuple(set(previous.modules) | set(target.modules))
+ else:
+ include.append(target)
+ previous = target
+
+ previous = None
+ exclude = []
+ for target in exclude_targets:
+ if isinstance(previous, DirectoryTarget) and isinstance(target, DirectoryTarget) \
+ and previous.name == target.name:
+ previous.modules = tuple(set(previous.modules) | set(target.modules))
+ else:
+ exclude.append(target)
+ previous = target
+
+ return tuple(include), tuple(exclude)
+
+
+def filter_targets(targets, patterns, include=True, directories=True, errors=True):
+ """
+ :type targets: collections.Iterable[CompletionTarget]
+ :type patterns: list[str]
+ :type include: bool
+ :type directories: bool
+ :type errors: bool
+ :rtype: collections.Iterable[CompletionTarget]
+ """
+ unmatched = set(patterns or ())
+
+ for target in targets:
+ matched_directories = set()
+ match = False
+
+ if patterns:
+ for alias in target.aliases:
+ for pattern in patterns:
+ if re.match('^%s$' % pattern, alias):
+ match = True
+
+ try:
+ unmatched.remove(pattern)
+ except KeyError:
+ pass
+
+ if alias.endswith('/'):
+ if target.base_path and len(target.base_path) > len(alias):
+ matched_directories.add(target.base_path)
+ else:
+ matched_directories.add(alias)
+ elif include:
+ match = True
+ if not target.base_path:
+ matched_directories.add('.')
+ for alias in target.aliases:
+ if alias.endswith('/'):
+ if target.base_path and len(target.base_path) > len(alias):
+ matched_directories.add(target.base_path)
+ else:
+ matched_directories.add(alias)
+
+ if match != include:
+ continue
+
+ if directories and matched_directories:
+ yield DirectoryTarget(sorted(matched_directories, key=len)[0], target.modules)
+ else:
+ yield target
+
+ if errors:
+ if unmatched:
+ raise TargetPatternsNotMatched(unmatched)
+
+
+def walk_module_targets():
+ """
+ :rtype: collections.Iterable[TestTarget]
+ """
+ path = 'lib/ansible/modules'
+
+ for target in walk_test_targets(path, path + '/', extensions=MODULE_EXTENSIONS):
+ if not target.module:
+ continue
+
+ yield target
+
+
+def walk_units_targets():
+ """
+ :rtype: collections.Iterable[TestTarget]
+ """
+ return walk_test_targets(path='test/units', module_path='test/units/modules/', extensions=('.py',), prefix='test_')
+
+
+def walk_compile_targets():
+ """
+ :rtype: collections.Iterable[TestTarget]
+ """
+ return walk_test_targets(module_path='lib/ansible/modules/', extensions=('.py',))
+
+
+def walk_sanity_targets():
+ """
+ :rtype: collections.Iterable[TestTarget]
+ """
+ return walk_test_targets(module_path='lib/ansible/modules/')
+
+
+def walk_posix_integration_targets():
+ """
+ :rtype: collections.Iterable[IntegrationTarget]
+ """
+ for target in walk_integration_targets():
+ if 'posix/' in target.aliases:
+ yield target
+
+
+def walk_network_integration_targets():
+ """
+ :rtype: collections.Iterable[IntegrationTarget]
+ """
+ for target in walk_integration_targets():
+ if 'network/' in target.aliases:
+ yield target
+
+
+def walk_windows_integration_targets():
+ """
+ :rtype: collections.Iterable[IntegrationTarget]
+ """
+ for target in walk_integration_targets():
+ if 'windows/' in target.aliases:
+ yield target
+
+
+def walk_integration_targets():
+ """
+ :rtype: collections.Iterable[IntegrationTarget]
+ """
+ path = 'test/integration/targets'
+ modules = frozenset(t.module for t in walk_module_targets())
+ paths = sorted(os.path.join(path, p) for p in os.listdir(path))
+ prefixes = load_integration_prefixes()
+
+ for path in paths:
+ yield IntegrationTarget(path, modules, prefixes)
+
+
+def load_integration_prefixes():
+ """
+ :rtype: dict[str, str]
+ """
+ path = 'test/integration'
+ names = sorted(f for f in os.listdir(path) if os.path.splitext(f)[0] == 'target-prefixes')
+ prefixes = {}
+
+ for name in names:
+ prefix = os.path.splitext(name)[1][1:]
+ with open(os.path.join(path, name), 'r') as prefix_fd:
+ prefixes.update(dict((k, prefix) for k in prefix_fd.read().splitlines()))
+
+ return prefixes
+
+
+def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None):
+ """
+ :type path: str | None
+ :type module_path: str | None
+ :type extensions: tuple[str] | None
+ :type prefix: str | None
+ :rtype: collections.Iterable[TestTarget]
+ """
+ for root, _, file_names in os.walk(path or '.', topdown=False):
+ if root.endswith('/__pycache__'):
+ continue
+
+ if path is None:
+ root = root[2:]
+
+ if root.startswith('.'):
+ continue
+
+ for file_name in file_names:
+ name, ext = os.path.splitext(os.path.basename(file_name))
+
+ if name.startswith('.'):
+ continue
+
+ if extensions and ext not in extensions:
+ continue
+
+ if prefix and not name.startswith(prefix):
+ continue
+
+ yield TestTarget(os.path.join(root, file_name), module_path, prefix, path)
+
+
+class CompletionTarget(object):
+ """Command-line argument completion target base class."""
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self):
+ self.name = None
+ self.path = None
+ self.base_path = None
+ self.modules = tuple()
+ self.aliases = tuple()
+
+ def __eq__(self, other):
+ if isinstance(other, CompletionTarget):
+ return self.__repr__() == other.__repr__()
+ else:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __lt__(self, other):
+ return self.name.__lt__(other.name)
+
+ def __gt__(self, other):
+ return self.name.__gt__(other.name)
+
+ def __hash__(self):
+ return hash(self.__repr__())
+
+ def __repr__(self):
+ if self.modules:
+ return '%s (%s)' % (self.name, ', '.join(self.modules))
+
+ return self.name
+
+
+class DirectoryTarget(CompletionTarget):
+ """Directory target."""
+ def __init__(self, path, modules):
+ """
+ :type path: str
+ :type modules: tuple[str]
+ """
+ super(DirectoryTarget, self).__init__()
+
+ self.name = path
+ self.path = path
+ self.modules = modules
+
+
+class TestTarget(CompletionTarget):
+ """Generic test target."""
+ def __init__(self, path, module_path, module_prefix, base_path):
+ """
+ :type path: str
+ :type module_path: str | None
+ :type module_prefix: str | None
+ :type base_path: str
+ """
+ super(TestTarget, self).__init__()
+
+ self.name = path
+ self.path = path
+ self.base_path = base_path + '/' if base_path else None
+
+ name, ext = os.path.splitext(os.path.basename(self.path))
+
+ if module_path and path.startswith(module_path) and name != '__init__' and ext in MODULE_EXTENSIONS:
+ self.module = name[len(module_prefix or ''):].lstrip('_')
+ self.modules = self.module,
+ else:
+ self.module = None
+ self.modules = tuple()
+
+ aliases = [self.path, self.module]
+ parts = self.path.split('/')
+
+ for i in range(1, len(parts)):
+ alias = '%s/' % '/'.join(parts[:i])
+ aliases.append(alias)
+
+ aliases = [a for a in aliases if a]
+
+ self.aliases = tuple(sorted(aliases))
+
+
+class IntegrationTarget(CompletionTarget):
+ """Integration test target."""
+ non_posix = frozenset((
+ 'network',
+ 'windows',
+ ))
+
+ categories = frozenset(non_posix | frozenset((
+ 'posix',
+ 'module',
+ 'needs',
+ 'skip',
+ )))
+
+ def __init__(self, path, modules, prefixes):
+ """
+ :type path: str
+ :type modules: frozenset[str]
+ :type prefixes: dict[str, str]
+ """
+ super(IntegrationTarget, self).__init__()
+
+ self.name = os.path.basename(path)
+ self.path = path
+
+ # script_path and type
+
+ contents = sorted(os.listdir(path))
+
+ runme_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'runme')
+ test_files = tuple(c for c in contents if os.path.splitext(c)[0] == 'test')
+
+ self.script_path = None
+
+ if runme_files:
+ self.type = 'script'
+ self.script_path = os.path.join(path, runme_files[0])
+ elif test_files:
+ self.type = 'special'
+ elif os.path.isdir(os.path.join(path, 'tasks')):
+ self.type = 'role'
+ else:
+ self.type = 'unknown'
+
+ # static_aliases
+
+ try:
+ with open(os.path.join(path, 'aliases'), 'r') as aliases_file:
+ static_aliases = tuple(aliases_file.read().splitlines())
+ except IOError as ex:
+ if ex.errno != errno.ENOENT:
+ raise
+ static_aliases = tuple()
+
+ # modules
+
+ if self.name in modules:
+ module = self.name
+ elif self.name.startswith('win_') and self.name[4:] in modules:
+ module = self.name[4:]
+ else:
+ module = None
+
+ self.modules = tuple(sorted(a for a in static_aliases + tuple([module]) if a in modules))
+
+ # groups
+
+ groups = [self.type]
+ groups += [a for a in static_aliases if a not in modules]
+ groups += ['module/%s' % m for m in self.modules]
+
+ if not self.modules:
+ groups.append('non_module')
+
+ if 'destructive' not in groups:
+ groups.append('non_destructive')
+
+ if '_' in self.name:
+ prefix = self.name[:self.name.find('_')]
+ else:
+ prefix = None
+
+ if prefix in prefixes:
+ group = prefixes[prefix]
+
+ if group != prefix:
+ group = '%s/%s' % (group, prefix)
+
+ groups.append(group)
+
+ if self.name.startswith('win_'):
+ groups.append('windows')
+
+ if self.name.startswith('connection_'):
+ groups.append('connection')
+
+ if self.name.startswith('setup_') or self.name.startswith('prepare_'):
+ groups.append('hidden')
+
+ if self.type not in ('script', 'role'):
+ groups.append('hidden')
+
+ for group in itertools.islice(groups, 0, len(groups)):
+ if '/' in group:
+ parts = group.split('/')
+ for i in range(1, len(parts)):
+ groups.append('/'.join(parts[:i]))
+
+ if not any(g in self.non_posix for g in groups):
+ groups.append('posix')
+
+ # aliases
+
+ aliases = [self.name] + \
+ ['%s/' % g for g in groups] + \
+ ['%s/%s' % (g, self.name) for g in groups if g not in self.categories]
+
+ if 'hidden/' in aliases:
+ aliases = ['hidden/'] + ['hidden/%s' % a for a in aliases if not a.startswith('hidden/')]
+
+ self.aliases = tuple(sorted(set(aliases)))
+
+
+class TargetPatternsNotMatched(ApplicationError):
+ """One or more targets were not matched when a match was required."""
+ def __init__(self, patterns):
+ """
+ :type patterns: set[str]
+ """
+ self.patterns = sorted(patterns)
+
+ if len(patterns) > 1:
+ message = 'Target patterns not matched:\n%s' % '\n'.join(self.patterns)
+ else:
+ message = 'Target pattern not matched: %s' % self.patterns[0]
+
+ super(TargetPatternsNotMatched, self).__init__(message)