summaryrefslogtreecommitdiff
path: root/test/lib/ansible_test/_internal/commands/coverage/combine.py
blob: 8712a4518faea5545f613b48fedee72889684523 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""Combine code coverage files."""
from __future__ import annotations

import collections.abc as c
import os
import json
import typing as t

from ...target import (
    walk_compile_targets,
    walk_powershell_targets,
)

from ...io import (
    read_text_file,
)

from ...util import (
    ANSIBLE_TEST_TOOLS_ROOT,
    display,
    ApplicationError,
    raw_command,
)

from ...util_common import (
    ResultType,
    write_json_file,
    write_json_test_results,
)

from ...executor import (
    Delegate,
)

from ...data import (
    data_context,
    PayloadConfig,
)

from ...host_configs import (
    DockerConfig,
    RemoteConfig,
)

from ...provisioning import (
    HostState,
    prepare_profiles,
)

from . import (
    enumerate_python_arcs,
    enumerate_powershell_lines,
    get_collection_path_regexes,
    get_all_coverage_files,
    get_python_coverage_files,
    get_python_modules,
    get_powershell_coverage_files,
    initialize_coverage,
    COVERAGE_OUTPUT_FILE_NAME,
    COVERAGE_GROUPS,
    CoverageConfig,
    PathChecker,
)

TValue = t.TypeVar('TValue')


def command_coverage_combine(args: CoverageCombineConfig) -> None:
    """Entry point for the coverage combine command: prepare the host profiles, then combine the coverage files."""
    # the list of generated files returned by combine_coverage_files is not needed here
    combine_coverage_files(args, prepare_profiles(args))  # coverage combine


def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
    """
    Combine the available coverage data and return the paths of the generated files.
    When delegation is requested nothing is combined here and Delegate is raised instead.
    """
    if not args.delegate:
        # PowerShell results are listed ahead of the Python results
        generated = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state)

        for generated_path in generated:
            display.info('Generated combined output: %s' % generated_path, verbosity=1)

        return generated

    if isinstance(args.controller, (DockerConfig, RemoteConfig)):
        exported_paths = []

        for candidate in get_all_coverage_files():
            # the text following the last `=` in the file name must begin with `coverage.combined`
            name_parts = os.path.basename(candidate).rsplit('=', 1)[-1].split('.')

            if name_parts[:2] == ['coverage', 'combined']:
                exported_paths.append(candidate)

        if not exported_paths:
            raise ExportedCoverageDataNotFound()

        # each pair is the path as found and the same path relative to the content root
        pairs = [(exported_path, os.path.relpath(exported_path, data_context().content.root)) for exported_path in exported_paths]

        def coverage_callback(payload_config: PayloadConfig) -> None:
            """Append the exported coverage files to the payload file list."""
            display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1)
            payload_config.files.extend(pairs)

        data_context().register_payload_callback(coverage_callback)

    raise Delegate(host_state=host_state)


class ExportedCoverageDataNotFound(ApplicationError):
    """Raised when exported (combined) coverage data is needed but none was found."""
    def __init__(self) -> None:
        # the adjacent literals form a single template, so the one placeholder is filled with the coverage results path
        message = (
            'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
            'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'
            'The exported files must be in the directory: %s/'
        ) % ResultType.COVERAGE.relative_path

        super().__init__(message)


def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
    """
    Merge the Python coverage files into one output file per coverage group.
    Returns the sorted paths of the output files which ended up non-empty.
    Nothing is written (and an empty list is returned) when the explain option is set.
    """
    coverage = initialize_coverage(args, host_state)
    modules = get_python_modules()
    coverage_files = get_python_coverage_files()

    def _default_stub_value(source_paths: list[str]) -> dict[str, set[tuple[int, int]]]:
        """Return an empty set of arcs for each of the given source paths."""
        return dict((source_path, set()) for source_path in source_paths)

    sources = _get_coverage_targets(args, walk_compile_targets)
    groups = _build_stub_groups(args, sources, _default_stub_value)

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for position, coverage_file in enumerate(coverage_files, start=1):
        display.info('[%4d/%4d] %s' % (position, len(coverage_files), coverage_file), verbosity=2)

        group = get_coverage_group(args, coverage_file)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_file)
            continue

        for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re):
            if args.export:
                filename = os.path.relpath(filename)  # exported paths must be relative since absolute paths may differ between systems

            # a group only comes into existence once one of its coverage files has reported arcs
            groups.setdefault(group, {}).setdefault(filename, set()).update(arcs)

    if args.export:
        output_prefix = os.path.join(args.export, '')  # the export directory, with a trailing separator
        output_suffix = '=coverage.combined'
    else:
        output_prefix = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
        output_suffix = ''

    path_checker = PathChecker(args, collection_search_re)
    output_files = []

    for group in sorted(groups):
        if args.explain:
            continue

        output_file = output_prefix + group + output_suffix
        updated = coverage.CoverageData(output_file)

        for filename, arcs in groups[group].items():
            if path_checker.check_path(filename):
                updated.add_arcs({filename: list(arcs)})

        if args.all:
            # register every known source with no arcs
            updated.add_arcs({source_path: [] for source_path, _source_line_count in sources})

        updated.write()  # always write files to make sure stale files do not exist

        if updated:
            # only report files which are non-empty to prevent coverage from reporting errors
            output_files.append(output_file)

    path_checker.report()

    return sorted(output_files)


def _command_coverage_combine_powershell(args: CoverageCombineConfig) -> list[str]:
    """
    Combine PowerShell coverage files and return a sorted list of the output files.
    Hit counts reported for the same file and line are summed within each coverage group.
    Nothing is written (and an empty list is returned) when the explain option is set.
    """
    coverage_files = get_powershell_coverage_files()

    def _default_stub_value(source_paths: list[str]) -> dict[str, dict[int, int]]:
        """Return zero-hit line coverage for each of the given source paths, as reported by the pwsh stub script."""
        if not source_paths:
            # nothing to stub, so avoid launching pwsh without any files to process
            return {}

        cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')]
        cmd.extend(source_paths)

        stubs = json.loads(raw_command(cmd, capture=True)[0])

        return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs)

    sources = _get_coverage_targets(args, walk_powershell_targets)
    groups = _build_stub_groups(args, sources, _default_stub_value)

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for counter, coverage_file in enumerate(coverage_files, start=1):
        display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)

        group = get_coverage_group(args, coverage_file)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_file)
            continue

        for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re):
            if args.export:
                filename = os.path.relpath(filename)  # exported paths must be relative since absolute paths may differ between systems

            # a group only comes into existence once one of its coverage files has reported hits
            file_coverage = groups.setdefault(group, {}).setdefault(filename, {})

            for line_no, hit_count in hits.items():
                file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count

    output_files = []

    path_checker = PathChecker(args)

    for group in sorted(groups):
        coverage_data = {filename: data for filename, data in groups[group].items() if path_checker.check_path(filename)}

        if args.all:
            # add zero-hit entries for every known source which has no coverage in this group
            missing_sources = [source for source, _source_line_count in sources if source not in coverage_data]
            coverage_data.update(_default_stub_value(missing_sources))

        if args.explain:
            continue

        if args.export:
            output_file = os.path.join(args.export, group + '=coverage.combined')
            write_json_file(output_file, coverage_data, formatted=False)
        else:
            result_name = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell'
            write_json_test_results(ResultType.COVERAGE, result_name, coverage_data, formatted=False)
            output_file = os.path.join(ResultType.COVERAGE.path, result_name)

        output_files.append(output_file)

    path_checker.report()

    return sorted(output_files)


def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) -> list[tuple[str, int]]:
    """
    Return a sorted list of (absolute path, line count) tuples for the targets provided by the given walk function.
    The list is empty unless the `all` or `stub` option is set.
    """
    if not (args.all or args.stub):
        return []

    def _measure(target_path: str) -> tuple[str, int]:
        """Pair the given path with the number of lines in the file."""
        return target_path, len(read_text_file(target_path).splitlines())

    # excludes symlinks of regular files to avoid reporting on the same file multiple times
    # in the future it would be nice to merge any coverage for symlinks into the real files
    return sorted(_measure(os.path.abspath(target.path)) for target in walk_func(include_symlinks=False))


def _build_stub_groups(
    args: CoverageCombineConfig,
    sources: list[tuple[str, int]],
    default_stub_value: c.Callable[[list[str]], dict[str, TValue]],
    *,
    stub_line_limit: int = 500000,
) -> dict[str, dict[str, TValue]]:
    """
    Split the given list of sources with line counts into stub groups and return them keyed by group name.
    Sources are added to a group in order until its cumulative line count exceeds stub_line_limit, at which point a new group is started.
    The limit is therefore a threshold rather than a hard maximum: the source which crosses it remains in the group it was added to.
    Each group consists of a dictionary of sources and default coverage stubs generated by the provided default_stub_value function.
    An empty dictionary is returned unless the `stub` option is set.
    """
    groups: dict[str, dict[str, TValue]] = {}

    if not args.stub:
        return groups

    stub_groups: list[list[str]] = [[]]
    stub_line_count = 0

    for source, source_line_count in sources:
        stub_groups[-1].append(source)
        stub_line_count += source_line_count

        if stub_line_count > stub_line_limit:
            stub_line_count = 0
            stub_groups.append([])

    for stub_number, stub_group in enumerate(stub_groups, start=1):
        # only the final group can be empty (no sources at all, or the last source crossed the limit)
        if stub_group:
            groups['=stub-%02d' % stub_number] = default_stub_value(stub_group)

    return groups


def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Optional[str]:
    """
    Return the coverage group name derived from the name of the given coverage file.
    None is returned when the file name does not consist of five `=` separated parts with the last one starting with `coverage.`.
    """
    parts = os.path.basename(coverage_file).split('=', 4)

    if not (len(parts) == 5 and parts[4].startswith('coverage.')):
        return None

    command, target, environment, version = parts[:4]

    names = {
        'command': command,
        'target': target,
        'environment': environment,
        'version': version,
    }

    # when exporting, parts which are not grouped on fall back to these values, or to `various` if absent
    export_names = {
        'version': version,
    }

    segments = []

    for part in COVERAGE_GROUPS:
        if part in args.group_by:
            segments.append(names[part])
        elif args.export:
            segments.append(export_names.get(part, 'various'))

    group = ''.join('=%s' % segment for segment in segments)

    # exported group names do not start with `=`
    return group.lstrip('=') if args.export else group


class CoverageCombineConfig(CoverageConfig):
    """Settings for the coverage combine command, taken from the given args object."""
    def __init__(self, args: t.Any) -> None:
        super().__init__(args)

        # a missing or empty group_by becomes an empty frozenset
        self.group_by: frozenset[str] = frozenset(args.group_by or ())
        self.all: bool = args.all
        self.stub: bool = args.stub

        # only available to coverage combine
        # NOTE(review): annotated as str, yet the fallback when `export` is absent from args is False -- confirm the intended type
        self.export: str = args.export if 'export' in args else False