summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py')
-rw-r--r--test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py218
1 files changed, 92 insertions, 126 deletions
diff --git a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
index f18477d529..8af944bc61 100644
--- a/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
+++ b/test/lib/ansible_test/_util/controller/sanity/validate-modules/validate_modules/main.py
@@ -25,9 +25,7 @@ import json
import errno
import os
import re
-import subprocess
import sys
-import tempfile
import traceback
import warnings
@@ -307,8 +305,8 @@ class ModuleValidator(Validator):
ACCEPTLIST_FUTURE_IMPORTS = frozenset(('absolute_import', 'division', 'print_function'))
- def __init__(self, path, analyze_arg_spec=False, collection=None, collection_version=None,
- base_branch=None, git_cache=None, reporter=None, routing=None, plugin_type='module'):
+ def __init__(self, path, git_cache: GitCache, analyze_arg_spec=False, collection=None, collection_version=None,
+ reporter=None, routing=None, plugin_type='module'):
super(ModuleValidator, self).__init__(reporter=reporter or Reporter())
self.path = path
@@ -334,8 +332,8 @@ class ModuleValidator(Validator):
self.collection_version_str = collection_version
self.collection_version = SemanticVersion(collection_version)
- self.base_branch = base_branch
- self.git_cache = git_cache or GitCache()
+ self.git_cache = git_cache
+ self.base_module = self.git_cache.get_original_path(self.path)
self._python_module_override = False
@@ -347,11 +345,6 @@ class ModuleValidator(Validator):
except Exception:
self.ast = None
- if base_branch:
- self.base_module = self._get_base_file()
- else:
- self.base_module = None
-
def _create_version(self, v, collection_name=None):
if not v:
raise ValueError('Empty string is not a valid version')
@@ -374,13 +367,7 @@ class ModuleValidator(Validator):
return self
def __exit__(self, exc_type, exc_value, traceback):
- if not self.base_module:
- return
-
- try:
- os.remove(self.base_module)
- except Exception:
- pass
+ pass
@property
def object_name(self):
@@ -423,35 +410,9 @@ class ModuleValidator(Validator):
except AttributeError:
return False
- def _get_base_branch_module_path(self):
- """List all paths within lib/ansible/modules to try and match a moved module"""
- return self.git_cache.base_module_paths.get(self.object_name)
-
- def _has_alias(self):
- """Return true if the module has any aliases."""
- return self.object_name in self.git_cache.head_aliased_modules
-
- def _get_base_file(self):
- # In case of module moves, look for the original location
- base_path = self._get_base_branch_module_path()
-
- command = ['git', 'show', '%s:%s' % (self.base_branch, base_path or self.path)]
- p = subprocess.run(command, stdin=subprocess.DEVNULL, capture_output=True, check=False)
-
- if int(p.returncode) != 0:
- return None
-
- t = tempfile.NamedTemporaryFile(delete=False)
- t.write(p.stdout)
- t.close()
-
- return t.name
-
- def _is_new_module(self):
- if self._has_alias():
- return False
-
- return not self.object_name.startswith('_') and bool(self.base_branch) and not bool(self.base_module)
+ def _is_new_module(self) -> bool | None:
+ """Return True if the content is new, False if it is not and None if the information is not available."""
+ return self.git_cache.is_new(self.path)
def _check_interpreter(self, powershell=False):
if powershell:
@@ -2006,7 +1967,7 @@ class ModuleValidator(Validator):
)
def _check_for_new_args(self, doc):
- if not self.base_branch or self._is_new_module():
+ if not self.base_module:
return
with CaptureStd():
@@ -2238,7 +2199,7 @@ class ModuleValidator(Validator):
# We can only validate PowerShell arg spec if it is using the new Ansible.Basic.AnsibleModule util
pattern = r'(?im)^#\s*ansiblerequires\s+\-csharputil\s*Ansible\.Basic'
if re.search(pattern, self.text) and self.object_name not in self.PS_ARG_VALIDATE_REJECTLIST:
- with ModuleValidator(docs_path, base_branch=self.base_branch, git_cache=self.git_cache) as docs_mv:
+ with ModuleValidator(docs_path, git_cache=self.git_cache) as docs_mv:
docs = docs_mv._validate_docs()[1]
self._validate_ansible_module_call(docs)
@@ -2283,6 +2244,84 @@ class PythonPackageValidator(Validator):
)
+class GitCache(metaclass=abc.ABCMeta):
+ """Base class for access to original files."""
+ @abc.abstractmethod
+ def get_original_path(self, path: str) -> str | None:
+ """Return the path to the original version of the specified file, or None if there isn't one."""
+
+ @abc.abstractmethod
+ def is_new(self, path: str) -> bool | None:
+ """Return True if the content is new, False if it is not and None if the information is not available."""
+
+ @staticmethod
+ def create(original_plugins: str | None, plugin_type: str) -> GitCache:
+ return CoreGitCache(original_plugins, plugin_type) if original_plugins else NoOpGitCache()
+
+
+class CoreGitCache(GitCache):
+ """Provides access to original files when testing core."""
+ def __init__(self, original_plugins: str | None, plugin_type: str) -> None:
+ super().__init__()
+
+ self.original_plugins = original_plugins
+
+ rel_path = 'lib/ansible/modules/' if plugin_type == 'module' else f'lib/ansible/plugins/{plugin_type}/'
+ head_tree = self._find_files(rel_path)
+
+ head_aliased_modules = set()
+
+ for path in head_tree:
+ filename = os.path.basename(path)
+
+ if filename.startswith('_') and filename != '__init__.py':
+ if os.path.islink(path):
+ head_aliased_modules.add(os.path.basename(os.path.realpath(path)))
+
+ self._head_aliased_modules = head_aliased_modules
+
+ def get_original_path(self, path: str) -> str | None:
+ """Return the path to the original version of the specified file, or None if there isn't one."""
+ path = os.path.join(self.original_plugins, path)
+
+ if not os.path.exists(path):
+ path = None
+
+ return path
+
+ def is_new(self, path: str) -> bool | None:
+ """Return True if the content is new, False if it is not and None if the information is not available."""
+ if os.path.basename(path).startswith('_'):
+ return False
+
+ if os.path.basename(path) in self._head_aliased_modules:
+ return False
+
+ return not self.get_original_path(path)
+
+ @staticmethod
+ def _find_files(path: str) -> list[str]:
+ """Return a list of files found in the specified directory."""
+ paths = []
+
+ for (dir_path, dir_names, file_names) in os.walk(path):
+ for file_name in file_names:
+ paths.append(os.path.join(dir_path, file_name))
+
+ return sorted(paths)
+
+
+class NoOpGitCache(GitCache):
+ """Provides a no-op interface for access to original files."""
+ def get_original_path(self, path: str) -> str | None:
+ """Return the path to the original version of the specified file, or None if there isn't one."""
+ return None
+
+ def is_new(self, path: str) -> bool | None:
+ """Return True if the content is new, False if it is not and None if the information is not available."""
+ return None
+
+
def re_compile(value):
"""
Argparse expects things to raise TypeError, re.compile raises an re.error
@@ -2308,8 +2347,6 @@ def run():
type=re_compile)
parser.add_argument('--arg-spec', help='Analyze module argument spec',
action='store_true', default=False)
- parser.add_argument('--base-branch', default=None,
- help='Used in determining if new options were added')
parser.add_argument('--format', choices=['json', 'plain'], default='plain',
help='Output format. Default: "%(default)s"')
parser.add_argument('--output', default='-',
@@ -2326,13 +2363,14 @@ def run():
parser.add_argument('--plugin-type',
default='module',
help='The plugin type to validate. Defaults to %(default)s')
+ parser.add_argument('--original-plugins')
args = parser.parse_args()
args.plugins = [m.rstrip('/') for m in args.plugins]
reporter = Reporter()
- git_cache = GitCache(args.base_branch, args.plugin_type)
+ git_cache = GitCache.create(args.original_plugins, args.plugin_type)
check_dirs = set()
@@ -2357,7 +2395,7 @@ def run():
if ModuleValidator.is_on_rejectlist(path):
continue
with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
- analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+ analyze_arg_spec=args.arg_spec,
git_cache=git_cache, reporter=reporter, routing=routing,
plugin_type=args.plugin_type) as mv1:
mv1.validate()
@@ -2382,7 +2420,7 @@ def run():
if ModuleValidator.is_on_rejectlist(path):
continue
with ModuleValidator(path, collection=args.collection, collection_version=args.collection_version,
- analyze_arg_spec=args.arg_spec, base_branch=args.base_branch,
+ analyze_arg_spec=args.arg_spec,
git_cache=git_cache, reporter=reporter, routing=routing,
plugin_type=args.plugin_type) as mv2:
mv2.validate()
@@ -2398,78 +2436,6 @@ def run():
sys.exit(reporter.json(warnings=args.warnings, output=args.output))
-class GitCache:
- def __init__(self, base_branch, plugin_type):
- self.base_branch = base_branch
- self.plugin_type = plugin_type
-
- self.rel_path = 'lib/ansible/modules/'
- if plugin_type != 'module':
- self.rel_path = 'lib/ansible/plugins/%s/' % plugin_type
-
- if self.base_branch:
- self.base_tree = self._git(['ls-tree', '-r', '--name-only', self.base_branch, self.rel_path])
- else:
- self.base_tree = []
-
- try:
- self.head_tree = self._git(['ls-tree', '-r', '--name-only', 'HEAD', self.rel_path])
- except GitError as ex:
- if ex.status == 128:
- # fallback when there is no .git directory
- self.head_tree = self._get_module_files()
- else:
- raise
- except OSError as ex:
- if ex.errno == errno.ENOENT:
- # fallback when git is not installed
- self.head_tree = self._get_module_files()
- else:
- raise
-
- allowed_exts = ('.py', '.ps1')
- if plugin_type != 'module':
- allowed_exts = ('.py', )
- self.base_module_paths = dict((os.path.basename(p), p) for p in self.base_tree if os.path.splitext(p)[1] in allowed_exts)
-
- self.base_module_paths.pop('__init__.py', None)
-
- self.head_aliased_modules = set()
-
- for path in self.head_tree:
- filename = os.path.basename(path)
-
- if filename.startswith('_') and filename != '__init__.py':
- if os.path.islink(path):
- self.head_aliased_modules.add(os.path.basename(os.path.realpath(path)))
-
- def _get_module_files(self):
- module_files = []
-
- for (dir_path, dir_names, file_names) in os.walk(self.rel_path):
- for file_name in file_names:
- module_files.append(os.path.join(dir_path, file_name))
-
- return module_files
-
- @staticmethod
- def _git(args):
- cmd = ['git'] + args
- p = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
-
- if p.returncode != 0:
- raise GitError(p.stderr, p.returncode)
-
- return p.stdout.splitlines()
-
-
-class GitError(Exception):
- def __init__(self, message, status):
- super(GitError, self).__init__(message)
-
- self.status = status
-
-
def main():
try:
run()