summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_internal/ci/__init__.py
blob: 5e53b15075c50ac7d50f92868c8646054f49b96f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""Support code for CI environments."""
from __future__ import annotations

import abc
import base64
import json
import os
import tempfile
import typing as t

from ..encoding import (
    to_bytes,
    to_text,
)

from ..io import (
    read_text_file,
    write_text_file,
)

from ..config import (
    CommonConfig,
    TestConfig,
)

from ..util import (
    ApplicationError,
    display,
    get_subclasses,
    import_plugins,
    raw_command,
    cache,
)


class ChangeDetectionNotSupported(ApplicationError):
    """
    Exception for cases where change detection is not supported.
    This module only defines the exception and never raises it; presumably it is raised by CIProvider.detect_changes implementations (confirm against the provider plugins).
    """


class CIProvider(metaclass=abc.ABCMeta):
    """
    Base class for CI provider plugins.
    Subclasses are discovered by get_ci_provider(), which sorts them by priority and then class name, and instantiates the first one whose is_supported() returns True.
    """

    # Sort key used by get_ci_provider() when ordering candidate providers; lower values are tried first.
    priority = 500

    @staticmethod
    @abc.abstractmethod
    def is_supported() -> bool:
        """
        Return True if this provider is supported in the current running environment.
        This is called on the class itself, before any instance is created.
        """

    @property
    @abc.abstractmethod
    def code(self) -> str:
        """
        Return a unique code representing this provider.
        When the code is falsy, get_ci_provider() does not display the 'Detected CI provider' message.
        """

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Return descriptive name for this provider, as shown in the 'Detected CI provider' message."""

    @abc.abstractmethod
    def generate_resource_prefix(self) -> str:
        """Return a resource prefix specific to this CI provider."""

    @abc.abstractmethod
    def get_base_commit(self, args: CommonConfig) -> str:
        """Return the base commit or an empty string."""

    @abc.abstractmethod
    def detect_changes(self, args: TestConfig) -> t.Optional[list[str]]:
        """
        Initialize change detection.
        NOTE(review): the meaning of a None result versus a list of strings is not defined here; confirm against the provider implementations.
        """

    @abc.abstractmethod
    def supports_core_ci_auth(self) -> bool:
        """Return True if Ansible Core CI is supported."""

    @abc.abstractmethod
    def prepare_core_ci_auth(self) -> dict[str, t.Any]:
        """Return authentication details for Ansible Core CI."""

    @abc.abstractmethod
    def get_git_details(self, args: CommonConfig) -> t.Optional[dict[str, t.Any]]:
        """Return details about git in the current environment, or None."""


@cache
def get_ci_provider() -> CIProvider:
    """
    Return a CI provider instance for the current environment.
    The 'ci' plugins are imported, then every CIProvider subclass is tried in order of priority (lowest first) and class name.
    The first subclass whose is_supported() returns True is instantiated and returned.
    An ApplicationError is raised if no subclass reports support, instead of failing later with an AttributeError on None.
    """
    import_plugins('ci')

    candidates = sorted(get_subclasses(CIProvider), key=lambda subclass: (subclass.priority, subclass.__name__))

    for candidate in candidates:
        if candidate.is_supported():
            provider = candidate()
            break
    else:
        # the loop finished without finding a match, so there is nothing valid to return
        raise ApplicationError('No supported CI provider was found for the current environment.')

    # providers with a falsy code are selected without being announced
    if provider.code:
        display.info('Detected CI provider: %s' % provider.name)

    return provider


class AuthHelper(metaclass=abc.ABCMeta):
    """Helper which authenticates with Ansible Core CI using a public/private key pair."""

    def sign_request(self, request: dict[str, t.Any]) -> None:
        """Sign the given auth request in place by adding a base64 encoded 'signature' entry to it."""
        # keys are sorted so the same request always serializes to the same payload
        serialized_request = json.dumps(request, sort_keys=True)
        raw_signature = self.sign_bytes(to_bytes(serialized_request))
        encoded_signature = base64.b64encode(raw_signature)

        request.update(signature=to_text(encoded_signature))

    def initialize_private_key(self) -> str:
        """
        Return the private key, generating and storing a new one first if no key file exists yet.
        The key file lives in the user's home directory, so later ansible-test invocations find and reuse it instead of generating another key.
        """
        key_path = os.path.expanduser('~/.ansible-core-ci-private.key')

        if not os.path.exists(to_bytes(key_path)):
            # first use: create the key and persist it for subsequent invocations
            new_key_pem = self.generate_private_key()
            write_text_file(key_path, new_key_pem)
            return new_key_pem

        return read_text_file(key_path)

    @abc.abstractmethod
    def sign_bytes(self, payload_bytes: bytes) -> bytes:
        """Return the signature for the given payload, initializing a new key pair if required."""

    @abc.abstractmethod
    def publish_public_key(self, public_key_pem: str) -> None:
        """Publish the given PEM encoded public key."""

    @abc.abstractmethod
    def generate_private_key(self) -> str:
        """Generate a new key pair, publish its public key and return the private key."""


class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
    """Authentication helper for Ansible Core CI which performs key operations with the cryptography package."""

    def sign_bytes(self, payload_bytes: bytes) -> bytes:
        """Return an ECDSA SHA-256 signature of the payload, initializing a new key pair if required."""
        # cryptography is imported lazily so environments which do not use/provide it are unaffected
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.primitives.serialization import load_pem_private_key

        pem_bytes = to_bytes(self.initialize_private_key())
        key = load_pem_private_key(pem_bytes, None, default_backend())

        # the loader can return other key types; only elliptic curve keys are expected here
        assert isinstance(key, ec.EllipticCurvePrivateKey)

        return key.sign(payload_bytes, ec.ECDSA(hashes.SHA256()))

    def generate_private_key(self) -> str:
        """Create a SECP384R1 key pair, publish the PEM encoded public key and return the PEM encoded private key."""
        # cryptography is imported lazily so environments which do not use/provide it are unaffected
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import ec

        key_pair = ec.generate_private_key(ec.SECP384R1(), default_backend())
        public_half = key_pair.public_key()

        # the private key is stored unencrypted in PKCS8 form
        private_pem_bytes = key_pair.private_bytes(  # type: ignore[attr-defined]  # documented method, but missing from type stubs
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        private_pem = to_text(private_pem_bytes)

        public_pem_bytes = public_half.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        public_pem = to_text(public_pem_bytes)

        self.publish_public_key(public_pem)

        return private_pem


class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
    """Authentication helper for Ansible Core CI which performs key operations with the openssl command line tool."""

    def sign_bytes(self, payload_bytes: bytes) -> bytes:
        """Return a SHA-256 signature of the payload made with openssl, initializing a new key pair if required."""
        key_pem = self.initialize_private_key()

        # openssl works on files, so the key, payload and signature each get a temporary file which is removed on exit
        with tempfile.NamedTemporaryFile() as key_file, tempfile.NamedTemporaryFile() as data_file, tempfile.NamedTemporaryFile() as sig_file:
            key_file.write(to_bytes(key_pem))
            key_file.flush()  # make sure openssl sees the full contents when it opens the file by name

            data_file.write(payload_bytes)
            data_file.flush()

            sign_cmd = ['openssl', 'dgst', '-sha256', '-sign', key_file.name, '-out', sig_file.name, data_file.name]
            raw_command(sign_cmd, capture=True)

            signature = sig_file.read()

        return signature

    def generate_private_key(self) -> str:
        """Create a secp384r1 key pair with openssl, publish the public key and return the private key."""
        generate_cmd = ['openssl', 'ecparam', '-genkey', '-name', 'secp384r1', '-noout']
        derive_public_cmd = ['openssl', 'ec', '-pubout']

        private_pem = raw_command(generate_cmd, capture=True)[0]
        # the private key is passed to openssl as input data to derive the matching public key
        public_pem = raw_command(derive_public_cmd, data=private_pem, capture=True)[0]

        self.publish_public_key(public_pem)

        return private_pem