1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
"""Tools for detecting changes in a commit."""
import os
from itertools import chain
from typing import Any, Dict, Iterable, Set, Optional, List
import structlog
from git import DiffIndex, Repo
from evergreen import EvergreenApi
# Module-level structured logger, named after this module.
LOGGER = structlog.get_logger(__name__)
# Maps a repository's `git_dir` to the revision that repository should be compared against.
RevisionMap = Dict[str, str]
def _get_id_from_repo(repo: Repo) -> str:
    """
    Work out the identifier used for the given repository.

    :param repo: Repository to identify.
    :return: "mongo" when the repository's working directory is the current working
        directory, otherwise the last path component of its working directory.
    """
    working_dir = repo.working_dir
    return "mongo" if working_dir == os.getcwd() else os.path.basename(working_dir)
def generate_revision_map(repos: List[Repo], revisions_data: Dict[str, str]) -> RevisionMap:
    """
    Build a revision map for the given repositories from a dictionary of revisions.

    Each repository is looked up in `revisions_data` by its identifier and stored under
    its `git_dir`. Repositories with no (or an empty) revision are left out of the result.

    :param repos: Repositories to generate map for.
    :param revisions_data: Dictionary of revisions to use for repositories, keyed by
        repository identifier.
    :return: Map of repositories to revisions.
    """
    revision_by_git_dir = {}
    for repo in repos:
        revision_by_git_dir[repo.git_dir] = revisions_data.get(_get_id_from_repo(repo))

    # Keep only the repositories for which a revision was actually found.
    return {git_dir: revision for git_dir, revision in revision_by_git_dir.items() if revision}
def generate_revision_map_from_manifest(repos: List[Repo], task_id: str,
                                        evg_api: EvergreenApi) -> RevisionMap:
    """
    Build a revision map for the given repositories from an evergreen task's manifest.

    :param repos: Repositories to generate map for.
    :param task_id: Id of evergreen task running.
    :param evg_api: Evergreen API object.
    :return: Map of repositories to revisions.
    """
    manifest = evg_api.manifest_for_task(task_id)

    revisions_data = {}
    for module_name, module in manifest.modules.items():
        revisions_data[module_name] = module.revision
    # The manifest's own revision is stored under "mongo"; assigned last, so it replaces
    # any module entry that uses the same name.
    revisions_data["mongo"] = manifest.revision

    return generate_revision_map(repos, revisions_data)
def _paths_for_iter(diff, iter_type):
    """
    Get the set of all the files in the given diff for the specified change type.

    Both sides of every change are collected, so a rename contributes its old path as
    well as its new path.

    :param diff: git diff to query.
    :param iter_type: Iter type ['M', 'A', 'R', 'D'].
    :return: set of changed files.
    """
    paths = set()
    # Walk the diff once, taking both sides of each change.
    for change in diff.iter_change_type(iter_type):
        paths.update((change.a_path, change.b_path))
    # GitPython's Diff documentation notes that one side may be None (a_path for a new
    # file, b_path for a deleted file); drop it so only real paths are returned.
    paths.discard(None)
    return paths
def _modified_files_for_diff(diff: DiffIndex, log: Any) -> Set:
    """
    Collect every file touched by the given git diff.

    Only the 'M', 'A', 'R' and 'D' change types are queried; the paths found for each
    type are logged at debug level before being merged into the result.

    :param diff: Git diff information.
    :param log: Logger for logging.
    :return: Set of files that were modified in diff.
    """
    # (log message, git change type) pairs, in the order they are queried and logged.
    change_types = (
        ("modified files", 'M'),
        ("added files", 'A'),
        ("renamed files", 'R'),
        ("deleted files", 'D'),
    )

    all_files = set()
    for message, change_type in change_types:
        files = _paths_for_iter(diff, change_type)
        log.debug(message, files=files)
        all_files |= files
    return all_files
def find_changed_files(repo: Repo, revision_map: Optional[RevisionMap] = None) -> Set[str]:
    """
    Find the files in a repository that have changed or are untracked.

    Three sources are combined: the diff labelled "working tree diff" (`index.diff(None)`),
    the diff of the index against the revision mapped to this repository (HEAD's commit
    when none is mapped), and the repository's untracked files.

    :param repo: Git repository.
    :param revision_map: Map of revisions to compare against for repos.
    :return: Set of changed files, as paths relative to the current working directory.
    """
    LOGGER.info("Getting diff for repo", repo=repo.git_dir)
    revisions = revision_map or {}

    working_tree_diff = repo.index.diff(None)
    unstaged = _modified_files_for_diff(working_tree_diff, LOGGER.bind(diff="working tree diff"))

    # Compare the index with the mapped revision, defaulting to the commit at HEAD.
    index_diff = repo.index.diff(revisions.get(repo.git_dir, repo.head.commit))
    staged = _modified_files_for_diff(index_diff, LOGGER.bind(diff="index diff"))

    untracked = set(repo.untracked_files)
    LOGGER.info("untracked files", files=untracked, diff="untracked diff")

    # Git reports paths relative to the repository; re-express them relative to the cwd.
    changed_files = set()
    for path in unstaged | staged | untracked:
        changed_files.add(
            os.path.relpath(f"{repo.working_dir}/{os.path.normpath(path)}", os.getcwd()))
    return changed_files
def find_changed_files_in_repos(repos: Iterable[Repo],
                                revision_map: Optional[RevisionMap] = None) -> Set[str]:
    """
    Find the changed files across several repositories.

    Runs `find_changed_files` on each repository and merges the results.

    :param repos: List of repos containing changed files.
    :param revision_map: Map of revisions to compare against for repos.
    :return: Set of changed files.
    """
    changed_files: Set[str] = set()
    for repo in repos:
        changed_files.update(find_changed_files(repo, revision_map))
    return changed_files
|