author     bst-marge-bot <marge-bot@buildstream.build>    2019-08-30 10:51:17 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>    2019-08-30 10:51:17 +0000
commit     f3baddd706c90d0e9406eee30bf4304ad3e53ea3 (patch)
tree       1e601d5f141396ed0bad655da216511a3994c5d8
parent     aa9bc230681486fea5f7d23e35bcfd62ad673310 (diff)
parent     ec8b3130fd17fd80369c8e04351e5f7779b0c538 (diff)
download   buildstream-f3baddd706c90d0e9406eee30bf4304ad3e53ea3.tar.gz
Merge branch 'tmewett/test-in-subprocess' into 'master'
Add in_subprocess pytest mark and modify tests which run in another process to use it
Closes #1108
See merge request BuildStream/buildstream!1557
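In practice, a test that needs to create gRPC channels now just carries the new mark instead of hand-rolling a fork. A minimal sketch of a marked test (the test body is illustrative, not taken from this commit):

```python
import pytest

@pytest.mark.in_subprocess
def test_talks_to_grpc(tmpdir):
    # Dispatched by the pytest_runtest_protocol hook in tests/conftest.py,
    # which forks the test into a child process so gRPC threads never
    # touch the main pytest process.
    assert tmpdir.check(dir=True)  # illustrative; real tests exercise gRPC stubs
```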
-rw-r--r--  setup.cfg                               |   4
-rw-r--r--  src/buildstream/testing/_forked.py      |  94
-rw-r--r--  tests/artifactcache/artifactservice.py  | 170
-rw-r--r--  tests/artifactcache/pull.py             | 185
-rw-r--r--  tests/artifactcache/push.py             | 135
-rwxr-xr-x  tests/conftest.py                       |  22
-rw-r--r--  tests/internals/storage.py              |  62
-rw-r--r--  tests/internals/storage_vdir_import.py  | 110
8 files changed, 287 insertions, 495 deletions
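Condensed, the mechanical change applied across the test files below looks like this; the sketch is simplified from the hunks that follow, with `_do_work` standing in for the old per-test helpers (`_test_pull`, `_update_artifact`, and so on):

```python
import multiprocessing

import pytest

def _do_work(queue):
    # Stand-in for the old per-test helpers; the real ones open gRPC
    # channels, which must not happen in the main BuildStream process.
    queue.put(None)  # None signals success, an error string signals failure

# Before: each test forked by hand and shuttled errors over a Queue.
def test_old_style():
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=_do_work, args=(queue,))
    process.start()
    error = queue.get()
    process.join()
    assert not error

# After: the forking lives in shared infrastructure (tests/conftest.py plus
# src/buildstream/testing/_forked.py) and the body asserts directly.
@pytest.mark.in_subprocess
def test_new_style():
    assert True  # real tests run their gRPC work inline here
```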
diff --git a/setup.cfg b/setup.cfg
--- a/setup.cfg
+++ b/setup.cfg
@@ -20,6 +20,10 @@ env =
     D:XDG_CACHE_HOME=./tmp/cache
     D:XDG_CONFIG_HOME=./tmp/config
     D:XDG_DATA_HOME=./tmp/share
+markers =
+    integration: run test only if --integration option is specified
+    remoteexecution: run test only if --remote-execution option is specified
+    in_subprocess: run test in a Python process forked from the main one
 
 [pycodestyle]
 max-line-length = 119
diff --git a/src/buildstream/testing/_forked.py b/src/buildstream/testing/_forked.py
new file mode 100644
index 000000000..af5e9c070
--- /dev/null
+++ b/src/buildstream/testing/_forked.py
@@ -0,0 +1,94 @@
+# This code was based on pytest-forked, commit 6098c1, found here:
+# <https://github.com/pytest-dev/pytest-forked>
+# Its copyright notice is included below.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import os
+import marshal
+
+import py
+import pytest
+# XXX Using pytest private internals here
+from _pytest import runner
+
+EXITSTATUS_TESTEXIT = 4
+
+
+# copied from xdist remote
+def serialize_report(rep):
+    d = rep.__dict__.copy()
+    if hasattr(rep.longrepr, 'toterminal'):
+        d['longrepr'] = str(rep.longrepr)
+    else:
+        d['longrepr'] = rep.longrepr
+    for name in d:
+        if isinstance(d[name], py.path.local):  # pylint: disable=no-member
+            d[name] = str(d[name])
+        elif name == "result":
+            d[name] = None  # for now
+    return d
+
+
+def forked_run_report(item):
+    def runforked():
+        try:
+            reports = runner.runtestprotocol(item, log=False)
+        except KeyboardInterrupt:
+            os._exit(EXITSTATUS_TESTEXIT)
+        return marshal.dumps([serialize_report(x) for x in reports])
+
+    ff = py.process.ForkedFunc(runforked)  # pylint: disable=no-member
+    result = ff.waitfinish()
+    if result.retval is not None:
+        report_dumps = marshal.loads(result.retval)
+        return [runner.TestReport(**x) for x in report_dumps]
+    else:
+        if result.exitstatus == EXITSTATUS_TESTEXIT:
+            pytest.exit("forked test item %s raised Exit" % (item,))
+        return [report_process_crash(item, result)]
+
+
+def report_process_crash(item, result):
+    try:
+        from _pytest.compat import getfslineno
+    except ImportError:
+        # pytest<4.2
+        path, lineno = item._getfslineno()
+    else:
+        path, lineno = getfslineno(item)
+    info = ("%s:%s: running the test CRASHED with signal %d" %
+            (path, lineno, result.signal))
+
+    # We need to create a CallInfo instance that is pre-initialised to contain
+    # info about an exception. We do this by using a function which does
+    # 0/0. Also, the API varies between pytest versions.
+    has_from_call = getattr(runner.CallInfo, "from_call", None) is not None
+    if has_from_call:  # pytest >= 4.1
+        call = runner.CallInfo.from_call(lambda: 0 / 0, "???")
+    else:
+        call = runner.CallInfo(lambda: 0 / 0, "???")
+    call.excinfo = info
+
+    rep = runner.pytest_runtest_makereport(item, call)
+    if result.out:
+        rep.sections.append(("captured stdout", result.out))
+    if result.err:
+        rep.sections.append(("captured stderr", result.err))
+    return rep
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
index 3d229e24f..b3bc7c218 100644
--- a/tests/artifactcache/artifactservice.py
+++ b/tests/artifactcache/artifactservice.py
@@ -17,9 +17,7 @@
 #  Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
 #
 import os
-import signal
 from urllib.parse import urlparse
-from multiprocessing import Process, Queue
 
 import grpc
 import pytest
@@ -30,128 +28,84 @@ from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub
 from buildstream._protos.build.bazel.remote.execution.v2 \
     import remote_execution_pb2 as re_pb2
 from buildstream import utils
-from buildstream import _signals
 
 from tests.testutils.artifactshare import create_artifact_share
 
 
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
-    try:
-        target(*args, queue=queue)
-    except Exception as e:
-        queue.put(str(e))
-        raise
-
-
+@pytest.mark.in_subprocess
 def test_artifact_get_not_found(tmpdir):
     sharedir = os.path.join(str(tmpdir), "share")
     with create_artifact_share(sharedir) as share:
         # set up artifact service stub
         url = urlparse(share.repo)
-        queue = Queue()
-        process = Process(target=_queue_wrapper, args=(_artifact_request, queue, url))
+        channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+        artifact_stub = ArtifactServiceStub(channel)
+
+        # Run GetArtifact and check it throws a not found error
+        request = GetArtifactRequest()
+        request.cache_key = "@artifact/something/not_there"
         try:
-            with _signals.blocked([signal.SIGINT], ignore=False):
-                process.start()
-            error = queue.get()
-            process.join()
-        except KeyboardInterrupt:
-            utils._kill_process_tree(process.pid)
-            raise
-
-        assert not error
-
-
-def _artifact_request(url, queue):
-    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
-    artifact_stub = ArtifactServiceStub(channel)
-
-    # Run GetArtifact and check it throws a not found error
-    request = GetArtifactRequest()
-    request.cache_key = "@artifact/something/not_there"
-    try:
-        artifact_stub.GetArtifact(request)
-    except grpc.RpcError as e:
-        assert e.code() == grpc.StatusCode.NOT_FOUND
-        assert e.details() == "Artifact proto not found"
-        queue.put(None)
-    else:
-        assert False
+            artifact_stub.GetArtifact(request)
+        except grpc.RpcError as e:
+            assert e.code() == grpc.StatusCode.NOT_FOUND
+            assert e.details() == "Artifact proto not found"
+        else:
+            assert False  # Successfully getting the artifact
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.parametrize("files", ["present", "absent", "invalid"])
 def test_update_artifact(tmpdir, files):
     sharedir = os.path.join(str(tmpdir), "share")
     with create_artifact_share(sharedir, casd=True) as share:
-        queue = Queue()
-        process = Process(target=_queue_wrapper, args=(_update_artifact, queue, share, files))
+        # put files object
+        if files == "present":
+            directory = re_pb2.Directory()
+            digest = share.cas.add_object(buffer=directory.SerializeToString())
+        elif files == "invalid":
+            digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
+        elif files == "absent":
+            digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
 
-        try:
-            with _signals.blocked([signal.SIGINT], ignore=False):
-                process.start()
-            error = queue.get()
-            process.join()
-        except KeyboardInterrupt:
-            utils._kill_process_tree(process.pid)
-            raise
-
-        assert not error
-
-
-def _update_artifact(share, files, *, queue):
-    # put files object
-    if files == "present":
-        directory = re_pb2.Directory()
-        digest = share.cas.add_object(buffer=directory.SerializeToString())
-    elif files == "invalid":
-        digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
-    elif files == "absent":
-        digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
-
-    url = urlparse(share.repo)
-
-    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
-    artifact_stub = ArtifactServiceStub(channel)
-
-    # initialise an artifact
-    artifact = Artifact()
-    artifact.version = 0
-    artifact.build_success = True
-    artifact.strong_key = "abcdefghijklmnop"
-    artifact.files.hash = "hashymchashash"
-    artifact.files.size_bytes = 10
-
-    artifact.files.CopyFrom(digest)
-
-    # Put it in the artifact share with an UpdateArtifactRequest
-    request = UpdateArtifactRequest()
-    request.artifact.CopyFrom(artifact)
-    request.cache_key = "a-cache-key"
-
-    # should return the same artifact back
-    if files == "present":
-        response = artifact_stub.UpdateArtifact(request)
+        url = urlparse(share.repo)
+
+        channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+        artifact_stub = ArtifactServiceStub(channel)
+
+        # initialise an artifact
+        artifact = Artifact()
+        artifact.version = 0
+        artifact.build_success = True
+        artifact.strong_key = "abcdefghijklmnop"
+        artifact.files.hash = "hashymchashash"
+        artifact.files.size_bytes = 10
+
+        artifact.files.CopyFrom(digest)
+
+        # Put it in the artifact share with an UpdateArtifactRequest
+        request = UpdateArtifactRequest()
+        request.artifact.CopyFrom(artifact)
+        request.cache_key = "a-cache-key"
+
+        # should return the same artifact back
+        if files == "present":
+            response = artifact_stub.UpdateArtifact(request)
+            assert response == artifact
+        else:
+            try:
+                artifact_stub.UpdateArtifact(request)
+            except grpc.RpcError as e:
+                assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
+                if files == "absent":
+                    assert e.details() == "Artifact files specified but no files found"
+                elif files == "invalid":
+                    assert e.details() == "Artifact files specified but directory not found"
+            return
+
+        # If we uploaded the artifact check GetArtifact
+        request = GetArtifactRequest()
+        request.cache_key = "a-cache-key"
+
+        response = artifact_stub.GetArtifact(request)
         assert response == artifact
-    else:
-        try:
-            artifact_stub.UpdateArtifact(request)
-        except grpc.RpcError as e:
-            assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
-            if files == "absent":
-                assert e.details() == "Artifact files specified but no files found"
-            elif files == "invalid":
-                assert e.details() == "Artifact files specified but directory not found"
-        queue.put(None)
-        return
-
-    # If we uploaded the artifact check GetArtifact
-    request = GetArtifactRequest()
-    request.cache_key = "a-cache-key"
-
-    response = artifact_stub.GetArtifact(request)
-    assert response == artifact
-    queue.put(None)
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 6003cea41..71db3e338 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -1,13 +1,11 @@
 # Pylint doesn't play well with fixtures and dependency injection from pytest
 # pylint: disable=redefined-outer-name
 
-import multiprocessing
 import os
-import signal
 
 import pytest
 
-from buildstream import _yaml, _signals, utils
+from buildstream import _yaml
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream.testing import cli  # pylint: disable=unused-import
@@ -22,17 +20,6 @@ DATA_DIR = os.path.join(
 )
 
 
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
-    try:
-        target(*args, queue=queue)
-    except Exception as e:
-        queue.put(str(e))
-        raise
-
-
 def tree_maker(cas, tree, directory):
     if tree.root.ByteSize() == 0:
         tree.root.CopyFrom(directory)
@@ -46,6 +33,7 @@ def tree_maker(cas, tree, directory):
         tree_maker(cas, tree, child_directory)
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.datafiles(DATA_DIR)
 def test_pull(cli, tmpdir, datafiles):
     project_dir = str(datafiles)
@@ -96,58 +84,28 @@ def test_pull(cli, tmpdir, datafiles):
     element_key = cli.get_element_key(project_dir, 'target.bst')
     assert not cli.artifact.is_cached(cache_dir, element, element_key)
 
-    queue = multiprocessing.Queue()
-    # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_pull, queue, user_config_file, project_dir,
-                                            cache_dir, 'target.bst', element_key))
-
-    try:
-        # Keep SIGINT blocked in the child process
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-
-        error = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert not error
-    assert cli.artifact.is_cached(cache_dir, element, element_key)
-
-
-def _test_pull(user_config_file, project_dir, cache_dir,
-               element_name, element_key, queue):
-    with dummy_context(config=user_config_file) as context:
-        context.cachedir = cache_dir
-        context.casdir = os.path.join(cache_dir, 'cas')
-        context.tmpdir = os.path.join(cache_dir, 'tmp')
+        context.cachedir = cache_dir
+        context.casdir = os.path.join(cache_dir, 'cas')
+        context.tmpdir = os.path.join(cache_dir, 'tmp')
 
-        # Load the project manually
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
+        # Load the project manually
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
 
-        # Create a local artifact cache handle
-        artifactcache = context.artifactcache
+        # Create a local artifact cache handle
+        artifactcache = context.artifactcache
 
-        # Load the target element
-        element = project.load_elements([element_name])[0]
+        # Manually setup the CAS remote
+        artifactcache.setup_remotes(use_config=True)
 
-        # Manually setup the CAS remote
-        artifactcache.setup_remotes(use_config=True)
+        assert artifactcache.has_push_remotes(plugin=element), \
+            "No remote configured for element target.bst"
+        assert artifactcache.pull(element, element_key), "Pull operation failed"
 
-        if artifactcache.has_push_remotes(plugin=element):
-            # Push the element's artifact
-            if not artifactcache.pull(element, element_key):
-                queue.put("Pull operation failed")
-            else:
-                queue.put(None)
-        else:
-            queue.put("No remote configured for element {}".format(element_name))
+        assert cli.artifact.is_cached(cache_dir, element, element_key)
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.datafiles(DATA_DIR)
 def test_pull_tree(cli, tmpdir, datafiles):
     project_dir = str(datafiles)
@@ -196,76 +154,11 @@ def test_pull_tree(cli, tmpdir, datafiles):
     # Retrieve the Directory object from the cached artifact
     artifact_digest = cli.artifact.get_digest(rootcache_dir, element, element_key)
 
-    queue = multiprocessing.Queue()
-    # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_push_tree, queue, user_config_file, project_dir,
-                                            artifact_digest))
-
-    try:
-        # Keep SIGINT blocked in the child process
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-
-        tree_hash, tree_size = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert tree_hash and tree_size
-
-    # Now delete the artifact locally
-    cli.remove_artifact_from_cache(project_dir, 'target.bst')
-
-    # Assert that we are not cached locally anymore
-    assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-
-    tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
-                                              size_bytes=tree_size)
-
-    queue = multiprocessing.Queue()
-    # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_pull_tree, queue, user_config_file, project_dir,
-                                            tree_digest))
-
-    try:
-        # Keep SIGINT blocked in the child process
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-
-        directory_hash, directory_size = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
-    assert directory_hash and not directory_size
-
-    directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
-                                                   size_bytes=directory_size)
-
-    # Ensure the entire Tree structure has been pulled
-    assert os.path.exists(cas.objpath(directory_digest))
-
-
-def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
-    with dummy_context(config=user_config_file) as context:
-        # Load the project manually
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
-
-        # Create a local artifact cache and cas handle
-        artifactcache = context.artifactcache
-        cas = context.get_cascache()
-
-        # Manually setup the CAS remote
-        artifactcache.setup_remotes(use_config=True)
+    artifactcache = context.artifactcache
+    # Manually setup the CAS remote
+    artifactcache.setup_remotes(use_config=True)
+    assert artifactcache.has_push_remotes()
 
-        if artifactcache.has_push_remotes():
         directory = remote_execution_pb2.Directory()
 
         with open(cas.objpath(artifact_digest), 'rb') as f:
@@ -277,27 +170,27 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
 
         # Push the Tree as a regular message
         tree_digest = artifactcache.push_message(project, tree)
+        tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
+        assert tree_hash and tree_size
 
-        queue.put((tree_digest.hash, tree_digest.size_bytes))
-    else:
-        queue.put("No remote configured")
+        # Now delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
 
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
 
-def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue):
-    with dummy_context(config=user_config_file) as context:
-        # Load the project manually
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
+    tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
+                                              size_bytes=tree_size)
 
-        # Create a local artifact cache handle
-        artifactcache = context.artifactcache
-
-        # Manually setup the CAS remote
-        artifactcache.setup_remotes(use_config=True)
-
-        if artifactcache.has_push_remotes():
         # Pull the artifact using the Tree object
         directory_digest = artifactcache.pull_tree(project, artifact_digest)
-            queue.put((directory_digest.hash, directory_digest.size_bytes))
-        else:
-            queue.put("No remote configured")
+        directory_hash, directory_size = directory_digest.hash, directory_digest.size_bytes
+
+        # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
+        assert directory_hash and not directory_size
+
+        directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
+                                                       size_bytes=directory_size)
+
+        # Ensure the entire Tree structure has been pulled
+        assert os.path.exists(cas.objpath(directory_digest))
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 81d75023d..20d9ccfec 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -1,13 +1,11 @@
 # Pylint doesn't play well with fixtures and dependency injection from pytest
 # pylint: disable=redefined-outer-name
 
-import multiprocessing
 import os
-import signal
 
 import pytest
 
-from buildstream import _yaml, _signals, utils, Scope
+from buildstream import _yaml, Scope
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream.testing import cli  # pylint: disable=unused-import
@@ -22,17 +20,7 @@ DATA_DIR = os.path.join(
 )
 
 
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
-    try:
-        target(*args, queue=queue)
-    except Exception as e:
-        queue.put(str(e))
-        raise
-
-
+@pytest.mark.in_subprocess
 @pytest.mark.datafiles(DATA_DIR)
 def test_push(cli, tmpdir, datafiles):
     project_dir = str(datafiles)
@@ -73,61 +61,28 @@
     element_key = cli.get_element_key(project_dir, 'target.bst')
     assert cli.artifact.is_cached(rootcache_dir, element, element_key)
 
-    queue = multiprocessing.Queue()
-    # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_push, queue, user_config_file, project_dir,
-                                            'target.bst'))
-
-    try:
-        # Keep SIGINT blocked in the child process
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-
-        error = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert not error
-    assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
-
-
-def _test_push(user_config_file, project_dir, element_name, queue):
-    with dummy_context(config=user_config_file) as context:
-        # Load the project manually
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
-
-        # Create a local artifact cache handle
-        artifactcache = context.artifactcache
+        # Create a local artifact cache handle
+        artifactcache = context.artifactcache
 
-        # Load the target element
-        element = project.load_elements([element_name])[0]
+        # Ensure the element's artifact member is initialised
+        # This is duplicated from Pipeline.resolve_elements()
+        # as this test does not use the cli frontend.
+        for e in element.dependencies(Scope.ALL):
+            # Determine initial element state.
+            e._update_state()
 
-        # Ensure the element's artifact member is initialised
-        # This is duplicated from Pipeline.resolve_elements()
-        # as this test does not use the cli frontend.
-        for e in element.dependencies(Scope.ALL):
-            # Determine initial element state.
-            e._update_state()
+        # Manually setup the CAS remotes
+        artifactcache.setup_remotes(use_config=True)
+        artifactcache.initialize_remotes()
 
-        # Manually setup the CAS remotes
-        artifactcache.setup_remotes(use_config=True)
-        artifactcache.initialize_remotes()
+        assert artifactcache.has_push_remotes(plugin=element), \
+            "No remote configured for element target.bst"
+        assert element._push(), "Push operation failed"
 
-        if artifactcache.has_push_remotes(plugin=element):
-            # Push the element's artifact
-            if not element._push():
-                queue.put("Push operation failed")
-            else:
-                queue.put(None)
-        else:
-            queue.put("No remote configured for element {}".format(element_name))
+        assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.datafiles(DATA_DIR)
 def test_push_message(tmpdir, datafiles):
     project_dir = str(datafiles)
@@ -151,52 +106,28 @@
     # Write down the user configuration file
     _yaml.roundtrip_dump(user_config, file=user_config_file)
 
-    queue = multiprocessing.Queue()
-    # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_push_message, queue, user_config_file,
-                                            project_dir))
-
-    try:
-        # Keep SIGINT blocked in the child process
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-
-        message_hash, message_size = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert message_hash and message_size
-    message_digest = remote_execution_pb2.Digest(hash=message_hash,
-                                                 size_bytes=message_size)
-    assert share.has_object(message_digest)
-
-
-def _test_push_message(user_config_file, project_dir, queue):
-    with dummy_context(config=user_config_file) as context:
-        # Load the project manually
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
+    with dummy_context(config=user_config_file) as context:
+        # Load the project manually
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
 
-        # Create a local artifact cache handle
-        artifactcache = context.artifactcache
+        # Create a local artifact cache handle
+        artifactcache = context.artifactcache
 
-        # Manually setup the artifact remote
-        artifactcache.setup_remotes(use_config=True)
-        artifactcache.initialize_remotes()
+        # Manually setup the artifact remote
+        artifactcache.setup_remotes(use_config=True)
+        artifactcache.initialize_remotes()
+        assert artifactcache.has_push_remotes()
 
-        if artifactcache.has_push_remotes():
-            # Create an example message object
         command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
                                                working_directory='/buildstream-build',
                                                output_directories=['/buildstream-install'])
 
         # Push the message object
        command_digest = artifactcache.push_message(project, command)
+        message_hash, message_size = command_digest.hash, command_digest.size_bytes
 
-        queue.put((command_digest.hash, command_digest.size_bytes))
-    else:
-        queue.put("No remote configured")
+        assert message_hash and message_size
+        message_digest = remote_execution_pb2.Digest(hash=message_hash,
+                                                     size_bytes=message_size)
+        assert share.has_object(message_digest)
diff --git a/tests/conftest.py b/tests/conftest.py
index d6b0b02e0..7728fb5c8 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,6 +23,7 @@
 import os
 
 import pytest
 
 from buildstream.testing import register_repo_kind, sourcetests_collection_hook
+from buildstream.testing._forked import forked_run_report
 from buildstream.testing.integration import integration_cache  # pylint: disable=unused-import
@@ -68,6 +69,27 @@ def pytest_runtest_setup(item):
 
 
 #################################################
+#              in_subprocess mark               #
+#################################################
+#
+# Various issues can occur when forking the Python process and using gRPC,
+# due to its multithreading. As BuildStream forks for parallelisation, gRPC
+# features are restricted to child processes, so tests using them must also
+# run as child processes. The in_subprocess mark handles this.
+# See <https://github.com/grpc/grpc/blob/master/doc/fork_support.md>.
+#
+@pytest.mark.tryfirst
+def pytest_runtest_protocol(item):
+    if item.get_closest_marker('in_subprocess') is not None:
+        reports = forked_run_report(item)
+        for rep in reports:
+            item.ihook.pytest_runtest_logreport(report=rep)
+        return True
+    else:
+        return None
+
+
+#################################################
 #            remote_services fixture            #
 #################################################
 #
diff --git a/tests/internals/storage.py b/tests/internals/storage.py
index 385162c13..d846e4b1f 100644
--- a/tests/internals/storage.py
+++ b/tests/internals/storage.py
@@ -1,11 +1,8 @@
 from contextlib import contextmanager
-import multiprocessing
 import os
-import signal
 
 import pytest
 
-from buildstream import utils, _signals
 from buildstream._cas import CASCache
 from buildstream.storage._casbaseddirectory import CasBasedDirectory
 from buildstream.storage._filebaseddirectory import FileBasedDirectory
@@ -16,19 +13,6 @@ DATA_DIR = os.path.join(
 )
 
 
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
-    try:
-        target(*args)
-    except Exception as e:
-        queue.put(str(e))
-        raise
-
-    queue.put(None)
-
-
 @contextmanager
 def setup_backend(backend_class, tmpdir):
     if backend_class == FileBasedDirectory:
@@ -41,7 +25,11 @@ def setup_backend(backend_class, tmpdir):
         cas_cache.release_resources()
 
 
-def _test_import_subprocess(tmpdir, datafiles, backend):
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("backend", [
+    FileBasedDirectory, CasBasedDirectory])
+@pytest.mark.datafiles(DATA_DIR)
+def test_import(tmpdir, datafiles, backend):
     original = os.path.join(str(datafiles), "original")
 
     with setup_backend(backend, str(tmpdir)) as c:
@@ -51,27 +39,11 @@ def _test_import_subprocess(tmpdir, datafiles, backend):
         assert "bin/hello" in c.list_relative_paths()
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.parametrize("backend", [
     FileBasedDirectory, CasBasedDirectory])
 @pytest.mark.datafiles(DATA_DIR)
-def test_import(tmpdir, datafiles, backend):
-    queue = multiprocessing.Queue()
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_import_subprocess, queue,
-                                            tmpdir, datafiles, backend))
-    try:
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-        error = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert not error
-
-
-def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
+def test_modified_file_list(tmpdir, datafiles, backend):
     original = os.path.join(str(datafiles), "original")
     overlay = os.path.join(str(datafiles), "overlay")
 
@@ -86,23 +58,3 @@
         assert "bin/bash" in c.list_relative_paths()
         assert "bin/bash" in c.list_modified_paths()
         assert "bin/hello" not in c.list_modified_paths()
-
-
-@pytest.mark.parametrize("backend", [
-    FileBasedDirectory, CasBasedDirectory])
-@pytest.mark.datafiles(DATA_DIR)
-def test_modified_file_list(tmpdir, datafiles, backend):
-    queue = multiprocessing.Queue()
-    process = multiprocessing.Process(target=_queue_wrapper,
-                                      args=(_test_modified_file_list_subprocess, queue,
-                                            tmpdir, datafiles, backend))
-    try:
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-        error = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert not error
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index e0165fc13..8fbf4142f 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -14,14 +14,11 @@
 #  You should have received a copy of the GNU Lesser General Public
 #  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 from hashlib import sha256
-import multiprocessing
 import os
 import random
-import signal
 
 import pytest
 
-from buildstream import utils, _signals
 from buildstream.storage._casbaseddirectory import CasBasedDirectory
 from buildstream.storage._filebaseddirectory import FileBasedDirectory
 from buildstream._cas import CASCache
@@ -43,47 +40,14 @@ from buildstream.storage.directory import VirtualDirectoryError
 root_filesets = [
     [('a/b/c/textfile1', 'F', 'This is textfile 1\n')],
     [('a/b/c/textfile1', 'F', 'This is the replacement textfile 1\n')],
-    [('a/b/d', 'D', '')],
-    [('a/b/e', 'S', '/a/b/d')],
     [('a/b/f', 'S', '/a/b/c')],
-    [('a/b/d', 'D', ''), ('a/b/e', 'S', '/a/b/d')],
     [('a/b/c', 'D', ''), ('a/b/f', 'S', '/a/b/c')],
-    [('a/c', 'F', 'This is textfile 1\n')],
-    [('a/b/e', 'F', 'This is textfile 1\n')],
-    [('a/b/c', 'D', '')]
+    [('a/b/f', 'F', 'This is textfile 1\n')],
 ]
 
 empty_hash_ref = sha256().hexdigest()
 RANDOM_SEED = 69105
-NUM_RANDOM_TESTS = 10
-
-
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
-    try:
-        target(*args)
-    except Exception as e:
-        queue.put(str(e))
-        raise
-
-    queue.put(None)
-
-
-def _run_test_in_subprocess(func, *args):
-    queue = multiprocessing.Queue()
-    process = multiprocessing.Process(target=_queue_wrapper, args=(func, queue, *args))
-    try:
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            process.start()
-        error = queue.get()
-        process.join()
-    except KeyboardInterrupt:
-        utils._kill_process_tree(process.pid)
-        raise
-
-    assert not error
+NUM_RANDOM_TESTS = 4
 
 
 def generate_import_roots(rootno, directory):
@@ -213,7 +177,7 @@ def directory_not_empty(path):
     return os.listdir(path)
 
 
-def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False):
+def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
     cas_cache = CASCache(tmpdir)
     try:
         # Create some fake content
@@ -269,26 +233,21 @@ def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False):
         cas_cache.release_resources()
 
 
-def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
-    _run_test_in_subprocess(_import_test_subprocess, tmpdir, original, overlay, generator_function, verify_contents)
-
-
-# It's possible to parameterize on both original and overlay values,
-# but this leads to more tests being listed in the output than are
-# comfortable.
+@pytest.mark.in_subprocess
 @pytest.mark.parametrize("original", range(1, len(root_filesets) + 1))
-def test_fixed_cas_import(tmpdir, original):
-    for overlay in range(1, len(root_filesets) + 1):
-        _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
+@pytest.mark.parametrize("overlay", range(1, len(root_filesets) + 1))
+def test_fixed_cas_import(tmpdir, original, overlay):
+    _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
 
 
+@pytest.mark.in_subprocess
 @pytest.mark.parametrize("original", range(1, NUM_RANDOM_TESTS + 1))
-def test_random_cas_import(tmpdir, original):
-    for overlay in range(1, NUM_RANDOM_TESTS + 1):
-        _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
+@pytest.mark.parametrize("overlay", range(1, NUM_RANDOM_TESTS + 1))
+def test_random_cas_import(tmpdir, original, overlay):
+    _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
 
 
-def _listing_test_subprocess(tmpdir, root, generator_function):
+def _listing_test(tmpdir, root, generator_function):
     cas_cache = CASCache(tmpdir)
     try:
         # Create some fake content
@@ -305,22 +264,21 @@
         cas_cache.release_resources()
 
 
-def _listing_test(tmpdir, root, generator_function):
-    _run_test_in_subprocess(_listing_test_subprocess, tmpdir, root, generator_function)
-
-
-@pytest.mark.parametrize("root", range(1, 11))
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("root", range(1, NUM_RANDOM_TESTS + 1))
 def test_random_directory_listing(tmpdir, root):
     _listing_test(str(tmpdir), root, generate_random_root)
 
 
-@pytest.mark.parametrize("root", [1, 2, 3, 4, 5])
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("root", range(1, len(root_filesets) + 1))
 def test_fixed_directory_listing(tmpdir, root):
     _listing_test(str(tmpdir), root, generate_import_roots)
 
 
 # Check that the vdir is descending and readable
-def _test_descend_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_descend(tmpdir):
     cas_dir = os.path.join(str(tmpdir), 'cas')
     cas_cache = CASCache(cas_dir)
     try:
@@ -343,14 +301,11 @@
         cas_cache.release_resources()
 
 
-def test_descend(tmpdir):
-    _run_test_in_subprocess(_test_descend_subprocess, tmpdir)
-
-
 # Check symlink logic for edge cases
 # Make sure the correct errors are raised when trying
 # to descend into files or links to files
-def _test_bad_symlinks_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_symlinks(tmpdir):
     cas_dir = os.path.join(str(tmpdir), 'cas')
     cas_cache = CASCache(cas_dir)
     try:
@@ -381,13 +336,10 @@
         cas_cache.release_resources()
 
 
-def test_bad_symlinks(tmpdir):
-    _run_test_in_subprocess(_test_bad_symlinks_subprocess, tmpdir)
-
-
 # Check symlink logic for edge cases
 # Check descend across relative link
-def _test_relative_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_relative_symlink(tmpdir):
     cas_dir = os.path.join(str(tmpdir), 'cas')
     cas_cache = CASCache(cas_dir)
     try:
@@ -410,13 +362,10 @@
         cas_cache.release_resources()
 
 
-def test_relative_symlink(tmpdir):
-    _run_test_in_subprocess(_test_relative_symlink_subprocess, tmpdir)
-
-
 # Check symlink logic for edge cases
 # Check descend across absolute link
-def _test_abs_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_abs_symlink(tmpdir):
     cas_dir = os.path.join(str(tmpdir), 'cas')
     cas_cache = CASCache(cas_dir)
     try:
@@ -440,13 +389,10 @@
         cas_cache.release_resources()
 
 
-def test_abs_symlink(tmpdir):
-    _run_test_in_subprocess(_test_abs_symlink_subprocess, tmpdir)
-
-
 # Check symlink logic for edge cases
 # Check symlink cannot escape root
-def _test_bad_sym_escape_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_sym_escape(tmpdir):
     cas_dir = os.path.join(str(tmpdir), 'cas')
     cas_cache = CASCache(cas_dir)
     try:
@@ -468,7 +414,3 @@
         assert error.reason == "directory-not-found"
     finally:
         cas_cache.release_resources()
-
-
-def test_bad_sym_escape(tmpdir):
-    _run_test_in_subprocess(_test_bad_sym_escape_subprocess, tmpdir)
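For reference, the forked runner added in `_forked.py` is built on `py.process.ForkedFunc`. A minimal sketch of that API as exercised above (illustrative values; POSIX-only, since it relies on `os.fork()`):

```python
import py

def child():
    return 42  # marshalled back to the parent on a clean exit

ff = py.process.ForkedFunc(child)  # the fork happens here
result = ff.waitfinish()
assert result.exitstatus == 0      # a crash would set result.signal instead
assert result.retval == 42         # None when the child died abnormally
print(result.out, result.err)      # captured child stdout/stderr
```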