summaryrefslogtreecommitdiff
path: root/tests/testutils/artifactshare.py
blob: a15a7c27e9e2080cd0a7b627f5975b205a36cdce (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import os
import shutil
import signal
import sys
from collections import namedtuple
from contextlib import ExitStack, contextmanager
from concurrent import futures
from multiprocessing import Process, Queue

import grpc

from buildstream._cas import CASCache
from buildstream._cas.casserver import create_server
from buildstream._exceptions import CASError
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2


class BaseArtifactShare:
    def __init__(self):
        q = Queue()

        self.process = Process(target=self.run, args=(q,))
        self.process.start()

        # Retrieve port from server subprocess
        port = q.get()

        if port is None:
            raise Exception("Error occurred when starting artifact server.")

        self.repo = "http://localhost:{}".format(port)

    # run():
    #
    # Run the artifact server.
    #
    def run(self, q):
        with ExitStack() as stack:
            try:
                # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
                # properly executing cleanup code in `finally` clauses and context managers.
                # This is required to terminate buildbox-casd on SIGTERM.
                signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))

                try:
                    from pytest_cov.embed import cleanup_on_sigterm
                except ImportError:
                    pass
                else:
                    cleanup_on_sigterm()

                server = stack.enter_context(self._create_server())
                port = server.add_insecure_port("localhost:0")
                server.start()
            except Exception:
                q.put(None)
                raise

            # Send port to parent
            q.put(port)

            # Sleep until termination by signal
            signal.pause()

    # _create_server()
    #
    # Create the server that will be run in the process
    #
    def _create_server(self):
        raise NotImplementedError()

    # close():
    #
    # Remove the artifact share.
    #
    def close(self):
        self.process.terminate()
        self.process.join()


# DummyArtifactShare()
#
# A dummy artifact share without any capabilities
#
class DummyArtifactShare(BaseArtifactShare):
    @contextmanager
    def _create_server(self):
        max_workers = (os.cpu_count() or 1) * 5
        server = grpc.server(futures.ThreadPoolExecutor(max_workers))

        yield server


# ArtifactShare()
#
# Abstract class providing scaffolding for
# generating data to be used with various sources
#
# Args:
#    directory (str): The base temp directory for the test
#    cache_quota (int): Maximum amount of disk space to use
#    casd (bool): Allow write access via casd
#    enable_push (bool): Whether the share should allow pushes
#
class ArtifactShare(BaseArtifactShare):
    def __init__(self, directory, *, quota=None, casd=False, index_only=False):

        # The working directory for the artifact share (in case it
        # needs to do something outside of its backend's storage folder).
        #
        self.directory = os.path.abspath(directory)

        # The directory the actual repo will be stored in.
        #
        # Unless this gets more complicated, just use this directly
        # in tests as a remote artifact push/pull configuration
        #
        self.repodir = os.path.join(self.directory, "repo")
        os.makedirs(self.repodir)
        self.artifactdir = os.path.join(self.repodir, "artifacts", "refs")
        os.makedirs(self.artifactdir)
        self.sourcedir = os.path.join(self.repodir, "source_protos", "refs")
        os.makedirs(self.sourcedir)

        logdir = os.path.join(self.directory, "logs") if casd else None

        self.cas = CASCache(self.repodir, casd=casd, log_directory=logdir)

        self.quota = quota
        self.index_only = index_only

        super().__init__()

    def _create_server(self):
        return create_server(self.repodir, quota=self.quota, enable_push=True, index_only=self.index_only,)

    # has_object():
    #
    # Checks whether the object is present in the share
    #
    # Args:
    #    digest (str): The object's digest
    #
    # Returns:
    #    (bool): True if the object exists in the share, otherwise false.
    def has_object(self, digest):

        assert isinstance(digest, remote_execution_pb2.Digest)

        object_path = self.cas.objpath(digest)

        return os.path.exists(object_path)

    def get_artifact_proto(self, artifact_name):
        artifact_proto = artifact_pb2.Artifact()
        artifact_path = os.path.join(self.artifactdir, artifact_name)

        try:
            with open(artifact_path, "rb") as f:
                artifact_proto.ParseFromString(f.read())
        except FileNotFoundError:
            return None

        return artifact_proto

    def get_source_proto(self, source_name):
        source_proto = source_pb2.Source()
        source_path = os.path.join(self.sourcedir, source_name)

        try:
            with open(source_path, "rb") as f:
                source_proto.ParseFromString(f.read())
        except FileNotFoundError:
            return None

        return source_proto

    def get_cas_files(self, artifact_proto):

        reachable = set()

        def reachable_dir(digest):
            self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True)

        try:
            if str(artifact_proto.files):
                reachable_dir(artifact_proto.files)

            if str(artifact_proto.buildtree):
                reachable_dir(artifact_proto.buildtree)

            if str(artifact_proto.public_data):
                if not os.path.exists(self.cas.objpath(artifact_proto.public_data)):
                    return None

            for log_file in artifact_proto.logs:
                if not os.path.exists(self.cas.objpath(log_file.digest)):
                    return None

            return artifact_proto.files

        except CASError:
            return None

        except FileNotFoundError:
            return None

    # has_artifact():
    #
    # Checks whether the artifact is present in the share
    #
    # Args:
    #    artifact_name (str): The composed complete artifact name
    #
    # Returns:
    #    (ArtifactProto): artifact digest if the artifact exists in the share, otherwise None.
    def get_artifact(self, artifact_name):
        artifact_proto = self.get_artifact_proto(artifact_name)
        if not artifact_proto:
            return None
        return self.get_cas_files(artifact_proto)

    # close():
    #
    # Remove the artifact share.
    #
    def close(self):
        super().close()

        self.cas.release_resources()

        shutil.rmtree(self.directory)


# create_artifact_share()
#
# Create an ArtifactShare for use in a test case
#
@contextmanager
def create_artifact_share(directory, *, quota=None, casd=False):
    share = ArtifactShare(directory, quota=quota, casd=casd)
    try:
        yield share
    finally:
        share.close()


@contextmanager
def create_split_share(directory1, directory2, *, quota=None, casd=False):
    index = ArtifactShare(directory1, quota=quota, casd=casd, index_only=True)
    storage = ArtifactShare(directory2, quota=quota, casd=casd)

    try:
        yield index, storage
    finally:
        index.close()
        storage.close()


# create_dummy_artifact_share()
#
# Create a dummy artifact share that doesn't have any capabilities
#
@contextmanager
def create_dummy_artifact_share():
    share = DummyArtifactShare()
    try:
        yield share
    finally:
        share.close()


statvfs_result = namedtuple("statvfs_result", "f_blocks f_bfree f_bsize f_bavail")


# Assert that a given artifact is in the share
#
def assert_shared(cli, share, project, element_name, *, project_name="test"):
    if not share.get_artifact(cli.get_artifact_name(project, project_name, element_name)):
        raise AssertionError(
            "Artifact share at {} does not contain the expected element {}".format(share.repo, element_name)
        )


# Assert that a given artifact is not in the share
#
def assert_not_shared(cli, share, project, element_name, *, project_name="test"):
    if share.get_artifact(cli.get_artifact_name(project, project_name, element_name)):
        raise AssertionError(
            "Artifact share at {} unexpectedly contains the element {}".format(share.repo, element_name)
        )