summaryrefslogtreecommitdiff
path: root/tests/unit/seed/embed/test_bootstrap_link_via_app_data.py
blob: fac9ac6992ac82ccd91deb884d4aa8a49f3e6cb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from __future__ import annotations

import contextlib
import os
import sys
from pathlib import Path
from stat import S_IWGRP, S_IWOTH, S_IWUSR
from subprocess import Popen, check_call
from threading import Thread

import pytest
from pytest_mock import MockerFixture

from virtualenv.discovery import cached_py_info
from virtualenv.discovery.py_info import PythonInfo
from virtualenv.info import fs_supports_symlink
from virtualenv.run import cli_run
from virtualenv.seed.wheels.embed import BUNDLE_FOLDER, BUNDLE_SUPPORT
from virtualenv.util.path import safe_delete


@pytest.mark.slow()
@pytest.mark.parametrize("copies", [False, True] if fs_supports_symlink() else [True])
def test_seed_link_via_app_data(tmp_path, coverage_env, current_fastest, copies):
    """Seed an environment through app-data, then uninstall, downgrade, re-create and finally strip it."""

    def run_ok(command):
        # spawn the command, wait for it to finish and insist on a zero exit code
        child = Popen(command)
        child.communicate()
        assert not child.returncode

    interpreter = PythonInfo.current_system()
    bundled = BUNDLE_SUPPORT[interpreter.version_release_str]
    pip_version = bundled["pip"].split("-")[1]
    setuptools_version = bundled["setuptools"].split("-")[1]
    # the space in the directory name ensures generated scripts work when their path contains one
    env_dir = str(tmp_path / "en v")
    create_cmd = [
        env_dir,
        "--no-periodic-update",
        "--seeder",
        "app-data",
        "--extra-search-dir",
        str(BUNDLE_FOLDER),
        "--download",
        "--pip",
        pip_version,
        "--setuptools",
        setuptools_version,
        "--reset-app-data",
        "--creator",
        current_fastest,
        "-vv",
        *([] if copies else ["--symlink-app-data"]),
    ]
    session = cli_run(create_cmd)
    coverage_env()
    assert session

    site_package = session.creator.purelib
    pip = site_package / "pip"
    setuptools = site_package / "setuptools"

    # both seed packages must be present right after the first creation
    initial_content = set(site_package.iterdir())
    assert pip in initial_content
    assert setuptools in initial_content

    # every pip entry point variant must exist and be runnable
    major = interpreter.version_info.major
    minor = interpreter.version_info.minor
    exe_suffix = session.creator.exe.suffix
    script_dir = session.creator.script_dir
    pip_executables = [
        script_dir / f"pip{variant}{exe_suffix}" for variant in ("", f"{major}", f"{major}.{minor}", f"-{major}.{minor}")
    ]
    for pip_exe in pip_executables:
        assert pip_exe.exists()
        run_ok([str(pip_exe), "--version", "--disable-pip-version-check"])

    # remove setuptools through the pip script; pip itself has to stay in place
    pip_script = str(session.creator.script("pip"))
    uninstall_args = ["--verbose", "--disable-pip-version-check", "uninstall", "-y", "setuptools"]
    run_ok([pip_script, *uninstall_args])
    assert site_package.exists()

    content_after_uninstall = set(site_package.iterdir())
    assert pip in content_after_uninstall
    assert setuptools not in content_after_uninstall

    # put an older setuptools in place, so the next creation has to replace it with the requested one
    requirement = f"setuptools<{bundled['setuptools'].split('-')[1]}"
    run_ok([pip_script, "--verbose", "--disable-pip-version-check", "install", requirement])
    assert site_package.exists()
    content_after_downgrade = set(site_package.iterdir())
    assert setuptools in content_after_downgrade

    # a second run with the same arguments must give the same content (covers overwrite and cache reuse)
    session = cli_run(create_cmd)
    coverage_env()
    assert session
    content_after_recreate = set(site_package.iterdir())
    assert initial_content == content_after_recreate

    # Windows does not allow removing an executable while it is running, so pip is uninstalled via python -m pip
    run_ok([str(session.creator.exe), "-m", "pip", *uninstall_args, "pip", "wheel"])

    # pip is greedy here: removing all packages may remove the site-package directory too
    if not site_package.exists():
        return
    purelib = session.creator.purelib
    patch_files = {purelib / f"_virtualenv.{extension}" for extension in ("py", "pyc", "pth")}
    patch_files.add(purelib / "__pycache__")
    leftovers = set(site_package.iterdir()) - patch_files
    assert not leftovers, "\n".join(map(str, leftovers))


@contextlib.contextmanager
def read_only_dir(d):
    """Temporarily clear every write permission bit in the directory tree rooted at ``d``.

    The directory itself, each sub-directory reached by ``os.walk`` and the files inside them lose their
    user/group/other write bits for the duration of the ``with`` block. On exit every touched entry gets back
    exactly the mode it had before, so no write bit is granted that was not there originally.

    :param d: root of the tree to make read-only (converted with ``str`` before walking)
    """
    write = S_IWUSR | S_IWGRP | S_IWOTH
    original_modes = {}  # path -> st_mode as it was before this context manager changed it
    try:
        for root, _, filenames in os.walk(str(d)):
            for path in (root, *(os.path.join(root, name) for name in filenames)):
                mode = os.stat(path).st_mode
                os.chmod(path, mode & ~write)
                # only remember entries that were really changed, so the restore never touches anything else
                original_modes[path] = mode
        yield
    finally:
        # also runs when the walk above fails half way, undoing the partial change
        for path, mode in original_modes.items():
            # an entry may have been deleted while the tree was read-only; nothing to restore then
            with contextlib.suppress(FileNotFoundError):
                os.chmod(path, mode)


@pytest.fixture()
def read_only_app_data(temp_app_data):
    """Create the ``temp_app_data`` directory and hand it to the test while it is read-only."""
    temp_app_data.mkdir()
    guard = read_only_dir(temp_app_data)
    with guard:
        yield temp_app_data


@pytest.mark.skipif(sys.platform == "win32", reason="Windows only applies R/O to files")
@pytest.mark.usefixtures("read_only_app_data")
def test_base_bootstrap_link_via_app_data_not_writable(tmp_path, current_fastest):
    """Creating an environment must still succeed when the app-data folder is read-only."""
    target = str(tmp_path / "venv")
    arguments = ["--seeder", "app-data", "--creator", current_fastest, "-vv", target]
    outcome = cli_run(arguments)
    assert outcome


@pytest.mark.skipif(sys.platform == "win32", reason="Windows only applies R/O to files")
def test_populated_read_only_cache_and_symlinked_app_data(tmp_path, current_fastest, temp_app_data):
    """Populate the app-data cache, then re-create a symlinked environment from it while it is read-only."""
    dest = tmp_path / "venv"
    python = str(dest.joinpath("bin/python"))
    import_pip = (python, "-c", "import pip")
    cmd = ["--seeder", "app-data", "--creator", current_fastest, "--symlink-app-data", "-vv", str(dest)]

    # first run fills the cache and must give an environment that can import pip
    assert cli_run(cmd)
    check_call(import_pip)

    # drop the cached interpreter information so discovery is triggered again, then remove the environment
    cached_py_info._CACHE.clear()
    safe_delete(dest)

    # with the dedicated flag the creation has to succeed even though the cache cannot be written
    with read_only_dir(temp_app_data):
        assert cli_run(["--read-only-app-data", *cmd])
        check_call(import_pip)


@pytest.mark.skipif(sys.platform == "win32", reason="Windows only applies R/O to files")
def test_populated_read_only_cache_and_copied_app_data(tmp_path, current_fastest, temp_app_data):
    """Populate the app-data cache, then re-create an environment (no symlink flag) while it is read-only."""
    dest = tmp_path / "venv"
    cmd = ["--seeder", "app-data", "--creator", current_fastest, "-vv", "-p", "python", str(dest)]

    # first run fills the cache
    assert cli_run(cmd)

    # drop the cached interpreter information so discovery is triggered again, then remove the environment
    cached_py_info._CACHE.clear()
    safe_delete(dest)

    # with the dedicated flag the creation has to succeed even though the cache cannot be written
    with read_only_dir(temp_app_data):
        assert cli_run(["--read-only-app-data", *cmd])


@pytest.mark.slow()
@pytest.mark.parametrize("pkg", ["pip", "setuptools", "wheel"])
@pytest.mark.usefixtures("session_app_data", "current_fastest", "coverage_env")
def test_base_bootstrap_link_via_app_data_no(tmp_path, pkg):
    """Passing ``--no-<pkg>`` must leave that package out while the other two seed packages get installed."""
    seed_packages = {"pip", "setuptools", "wheel"}
    arguments = [str(tmp_path), "--seeder", "app-data", f"--no-{pkg}", "--wheel", "bundle", "--setuptools", "bundle"]
    purelib = cli_run(arguments).creator.purelib
    assert not (purelib / pkg).exists()
    for other in seed_packages - {pkg}:
        assert (purelib / other).exists()


@pytest.mark.usefixtures("temp_app_data")
def test_app_data_parallel_ok(tmp_path):
    """Two environments created from parallel threads must both finish without an exception."""
    failures = _run_parallel_threads(tmp_path)
    report = "\n".join(failures)
    assert not failures, report


@pytest.mark.usefixtures("temp_app_data")
def test_app_data_parallel_fail(tmp_path: Path, mocker: MockerFixture) -> None:
    """When building the image raises, both parallel runs must report that failure."""
    mocker.patch("virtualenv.seed.embed.via_app_data.pip_install.base.PipInstall.build_image", side_effect=RuntimeError)
    failures = _run_parallel_threads(tmp_path)
    assert len(failures) == 2
    expected_start = "failed to build image wheel because:\nTraceback"
    for failure in failures:
        assert failure.startswith(expected_start)
        assert "RuntimeError" in failure, failure


def _run_parallel_threads(tmp_path):
    """Create ``env1`` and ``env2`` below ``tmp_path`` from two threads at once.

    :param tmp_path: directory that receives the two environments
    :return: the ``str()`` of every exception raised by a run, in the order they were recorded
    """
    collected = []

    def _create(env_name):
        arguments = ["--seeder", "app-data", str(tmp_path / env_name), "--no-pip", "--no-setuptools", "--wheel", "bundle"]
        try:
            cli_run(arguments)
        except Exception as error:
            # failures are gathered on purpose so the calling test can inspect them
            collected.append(str(error))

    workers = []
    for index in range(1, 3):
        workers.append(Thread(target=_create, args=(f"env{index}",)))
    # start everything before waiting, so the runs really overlap
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return collected