diff options
author | Sloane Hertel <19572925+s-hertel@users.noreply.github.com> | 2022-03-23 16:41:27 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-23 16:41:27 -0400 |
commit | f96a661adaf627764fc3527cab1ca4d829d69ea1 (patch) | |
tree | 1b231cc291cf69ccc5e0431465dfee4f2811adb7 /lib/ansible/galaxy | |
parent | 21827522dc447e92a1581a16ee46033fd2bebbd7 (diff) | |
download | ansible-f96a661adaf627764fc3527cab1ca4d829d69ea1.tar.gz |
ansible-galaxy - add configuration options for more flexible collection signature verification (#77026)
* Add a toggle to control the number of signatures required to verify the authenticity of a collection
* Make the default number of required valid signatures 1
* Add option to make signature verification strict and fail if there are no valid signatures (e.g. "+1")
* Use a regex to validate --required-valid-signature-count
* Add a toggle to limit the gpg status codes that are considered a failure
* Update documentation and changelog
* Add unit and integration tests for the new options
* Fixes #77146
Fix using user-provided signatures when running 'ansible-galaxy collection verify ns.coll --offline'
Add a test for a user-provided signature when running ansible-galaxy collection verify with --offline
Fix displaying overall gpg failure without extra verbosity
Add a test for displaying gpg failure without verbosity
Improve documentation to be more clear that signature verification only currently applies to collections directly sourced from Galaxy servers
Diffstat (limited to 'lib/ansible/galaxy')
-rw-r--r-- | lib/ansible/galaxy/collection/__init__.py | 149 | ||||
-rw-r--r-- | lib/ansible/galaxy/collection/concrete_artifact_manager.py | 27 |
2 files changed, 136 insertions, 40 deletions
diff --git a/lib/ansible/galaxy/collection/__init__.py b/lib/ansible/galaxy/collection/__init__.py index 2e6565d5ff..cdb80069dd 100644 --- a/lib/ansible/galaxy/collection/__init__.py +++ b/lib/ansible/galaxy/collection/__init__.py @@ -12,6 +12,7 @@ import functools import json import os import queue +import re import shutil import stat import sys @@ -75,7 +76,8 @@ from ansible.galaxy.collection.galaxy_api_proxy import MultiGalaxyAPIProxy from ansible.galaxy.collection.gpg import ( run_gpg_verify, parse_gpg_errors, - get_signature_from_source + get_signature_from_source, + GPG_ERROR_MAP, ) from ansible.galaxy.dependency_resolution import ( build_collection_dependency_resolver, @@ -103,12 +105,15 @@ MANIFEST_FILENAME = 'MANIFEST.json' ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'installed']) +SIGNATURE_COUNT_RE = r"^(?P<strict>\+)?(?:(?P<count>\d+)|(?P<all>all))$" + class CollectionSignatureError(Exception): - def __init__(self, reasons=None, stdout=None, rc=None): + def __init__(self, reasons=None, stdout=None, rc=None, ignore=False): self.reasons = reasons self.stdout = stdout self.rc = rc + self.ignore = ignore self._reason_wrapper = None @@ -197,11 +202,11 @@ def verify_local_collection( return result manifest_file = os.path.join(to_text(b_collection_path, errors='surrogate_or_strict'), MANIFEST_FILENAME) - signatures = [] + signatures = list(local_collection.signatures) if verify_local_only and local_collection.source_info is not None: - signatures = [info["signature"] for info in local_collection.source_info["signatures"]] + signatures = [info["signature"] for info in local_collection.source_info["signatures"]] + signatures elif not verify_local_only and remote_collection.signatures: - signatures = remote_collection.signatures + signatures = list(remote_collection.signatures) + signatures keyring_configured = artifacts_manager.keyring is not None if not keyring_configured and signatures: @@ -213,17 +218,18 @@ def verify_local_collection( "the origin of the collection. " "Skipping signature verification." ) - else: - for signature in signatures: - try: - verify_file_signature(manifest_file, to_text(signature, errors='surrogate_or_strict'), artifacts_manager.keyring) - except CollectionSignatureError as error: - display.vvvv(error.report(local_collection.fqcn)) - result.success = False - if not result.success: + elif keyring_configured: + if not verify_file_signatures( + local_collection.fqcn, + manifest_file, + signatures, + artifacts_manager.keyring, + artifacts_manager.required_successful_signature_count, + artifacts_manager.ignore_signature_errors, + ): + result.success = False return result - elif signatures: - display.vvvv(f"GnuPG signature verification succeeded, verifying contents of {local_collection}") + display.vvvv(f"GnuPG signature verification succeeded, verifying contents of {local_collection}") if verify_local_only: # since we're not downloading this, just seed it with the value from disk @@ -324,8 +330,58 @@ def verify_local_collection( return result -def verify_file_signature(manifest_file, detached_signature, keyring): - # type: (str, str, str) -> None +def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, required_successful_count, ignore_signature_errors): + # type: (str, str, list[str], str, str, list[str]) -> bool + successful = 0 + error_messages = [] + + signature_count_requirements = re.match(SIGNATURE_COUNT_RE, required_successful_count).groupdict() + + strict = signature_count_requirements['strict'] or False + require_all = signature_count_requirements['all'] + require_count = signature_count_requirements['count'] + if require_count is not None: + require_count = int(require_count) + + for signature in detached_signatures: + signature = to_text(signature, errors='surrogate_or_strict') + try: + verify_file_signature(manifest_file, signature, keyring, ignore_signature_errors) + except CollectionSignatureError as error: + if error.ignore: + # Do not include ignored errors in either the failed or successful count + continue + error_messages.append(error.report(fqcn)) + else: + successful += 1 + + if require_all: + continue + + if successful == require_count: + break + + if strict and not successful: + verified = False + display.display(f"Signature verification failed for '{fqcn}': no successful signatures") + elif require_all: + verified = not error_messages + if not verified: + display.display(f"Signature verification failed for '{fqcn}': some signatures failed") + else: + verified = not detached_signatures or require_count == successful + if not verified: + display.display(f"Signature verification failed for '{fqcn}': fewer successful signatures than required") + + if not verified: + for msg in error_messages: + display.vvvv(msg) + + return verified + + +def verify_file_signature(manifest_file, detached_signature, keyring, ignore_signature_errors): + # type: (str, str, str, list[str]) -> None """Run the gpg command and parse any errors. Raises CollectionSignatureError on failure.""" gpg_result, gpg_verification_rc = run_gpg_verify(manifest_file, detached_signature, keyring, display) @@ -336,8 +392,19 @@ def verify_file_signature(manifest_file, detached_signature, keyring): except StopIteration: pass else: - reasons = set(error.get_gpg_error_description() for error in chain([error], errors)) - raise CollectionSignatureError(reasons=reasons, stdout=gpg_result, rc=gpg_verification_rc) + reasons = [] + ignored_reasons = 0 + + for error in chain([error], errors): + # Get error status (dict key) from the class (dict value) + status_code = list(GPG_ERROR_MAP.keys())[list(GPG_ERROR_MAP.values()).index(error.__class__)] + if status_code in ignore_signature_errors: + ignored_reasons += 1 + reasons.append(error.get_gpg_error_description()) + + ignore = len(reasons) == ignored_reasons + raise CollectionSignatureError(reasons=set(reasons), stdout=gpg_result, rc=gpg_verification_rc, ignore=ignore) + if gpg_verification_rc: raise CollectionSignatureError(stdout=gpg_result, rc=gpg_verification_rc) @@ -756,6 +823,18 @@ def verify_collections( local_collection = Candidate.from_dir_path( b_search_path, artifacts_manager, ) + supplemental_signatures = [ + get_signature_from_source(source, display) + for source in collection.signature_sources or [] + ] + local_collection = Candidate( + local_collection.fqcn, + local_collection.ver, + local_collection.src, + local_collection.type, + signatures=frozenset(supplemental_signatures), + ) + break else: raise AnsibleError(message=default_err) @@ -764,9 +843,6 @@ def verify_collections( remote_collection = None else: signatures = api_proxy.get_signatures(local_collection) - # NOTE: If there are no Galaxy server signatures, only user-provided signature URLs, - # NOTE: those alone validate the MANIFEST.json and the remote collection is not downloaded. - # NOTE: The remote MANIFEST.json is only used in verification if there are no signatures. signatures.extend([ get_signature_from_source(source, display) for source in collection.signature_sources or [] @@ -784,7 +860,10 @@ def verify_collections( try: # NOTE: If there are no signatures, trigger the lookup. If found, # NOTE: it'll cache download URL and token in artifact manager. - if not signatures: + # NOTE: If there are no Galaxy server signatures, only user-provided signature URLs, + # NOTE: those alone validate the MANIFEST.json and the remote collection is not downloaded. + # NOTE: The remote MANIFEST.json is only used in verification if there are no signatures. + if not signatures and not collection.signature_sources: api_proxy.get_collection_version_metadata( remote_collection, ) @@ -1211,7 +1290,9 @@ def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses? b_collection_path, artifacts_manager._b_working_directory, collection.signatures, - artifacts_manager.keyring + artifacts_manager.keyring, + artifacts_manager.required_successful_signature_count, + artifacts_manager.ignore_signature_errors, ) if (collection.is_online_index_pointer and isinstance(collection.src, GalaxyAPI)): write_source_metadata( @@ -1249,23 +1330,17 @@ def write_source_metadata(collection, b_collection_path, artifacts_manager): raise -def verify_artifact_manifest(manifest_file, signatures, keyring): - # type: (str, str, str) -> None +def verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors): + # type: (str, list[str], str, str, list[str]) -> None failed_verify = False coll_path_parts = to_text(manifest_file, errors='surrogate_or_strict').split(os.path.sep) collection_name = '%s.%s' % (coll_path_parts[-3], coll_path_parts[-2]) # get 'ns' and 'coll' from /path/to/ns/coll/MANIFEST.json - for signature in signatures: - try: - verify_file_signature(manifest_file, to_text(signature, errors='surrogate_or_strict'), keyring) - except CollectionSignatureError as error: - display.vvvv(error.report(collection_name)) - failed_verify = True - if failed_verify: + if not verify_file_signatures(collection_name, manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors): raise AnsibleError(f"Not installing {collection_name} because GnuPG signature verification failed.") display.vvvv(f"GnuPG signature verification succeeded for {collection_name}") -def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring): +def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors): """Install a collection from tarball under a given path. :param b_coll_targz_path: Collection tarball to be installed. @@ -1273,15 +1348,17 @@ def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatur :param b_temp_path: Temporary dir path. :param signatures: frozenset of signatures to verify the MANIFEST.json :param keyring: The keyring used during GPG verification + :param required_signature_count: The number of signatures that must successfully verify the collection + :param ignore_signature_errors: GPG errors to ignore during signature verification """ try: with tarfile.open(b_coll_targz_path, mode='r') as collection_tar: # Verify the signature on the MANIFEST.json before extracting anything else _extract_tar_file(collection_tar, MANIFEST_FILENAME, b_collection_path, b_temp_path) - if signatures and keyring is not None: + if keyring is not None: manifest_file = os.path.join(to_text(b_collection_path, errors='surrogate_or_strict'), MANIFEST_FILENAME) - verify_artifact_manifest(manifest_file, signatures, keyring) + verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors) files_member_obj = collection_tar.getmember('FILES.json') with _tarfile_extract(collection_tar, files_member_obj) as (dummy, files_obj): diff --git a/lib/ansible/galaxy/collection/concrete_artifact_manager.py b/lib/ansible/galaxy/collection/concrete_artifact_manager.py index bdc2b9c4c3..9aa450eb03 100644 --- a/lib/ansible/galaxy/collection/concrete_artifact_manager.py +++ b/lib/ansible/galaxy/collection/concrete_artifact_manager.py @@ -56,9 +56,8 @@ class ConcreteArtifactsManager: * caching all of above * retrieving the metadata out of the downloaded artifacts """ - - def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60): - # type: (bytes, bool, str, int) -> None + def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60, required_signature_count=None, ignore_signature_errors=None): + # type: (bytes, bool, str, int, str, list[str]) -> None """Initialize ConcreteArtifactsManager caches and costraints.""" self._validate_certs = validate_certs # type: bool self._artifact_cache = {} # type: dict[bytes, bytes] @@ -70,11 +69,23 @@ class ConcreteArtifactsManager: self._supplemental_signature_cache = {} # type: dict[str, str] self._keyring = keyring # type: str self.timeout = timeout # type: int + self._required_signature_count = required_signature_count # type: str + self._ignore_signature_errors = ignore_signature_errors # type: list[str] @property def keyring(self): return self._keyring + @property + def required_successful_signature_count(self): + return self._required_signature_count + + @property + def ignore_signature_errors(self): + if self._ignore_signature_errors is None: + return [] + return self._ignore_signature_errors + def get_galaxy_artifact_source_info(self, collection): # type: (Candidate) -> dict[str, str | list[dict[str, str]]] server = collection.src.api_server @@ -321,6 +332,8 @@ class ConcreteArtifactsManager: temp_dir_base, # type: str validate_certs=True, # type: bool keyring=None, # type: str + required_signature_count=None, # type: str + ignore_signature_errors=None, # type: list[str] ): # type: (...) -> t.Iterator[ConcreteArtifactsManager] """Custom ConcreteArtifactsManager constructor with temp dir. @@ -335,7 +348,13 @@ class ConcreteArtifactsManager: ) b_temp_path = to_bytes(temp_path, errors='surrogate_or_strict') try: - yield cls(b_temp_path, validate_certs, keyring=keyring) + yield cls( + b_temp_path, + validate_certs, + keyring=keyring, + required_signature_count=required_signature_count, + ignore_signature_errors=ignore_signature_errors + ) finally: rmtree(b_temp_path) |