author    Tom Mewett <tom.mewett@codethink.co.uk>  2019-08-21 16:45:57 +0100
committer bst-marge-bot <marge-bot@buildstream.build>  2019-08-30 09:28:19 +0000
commit    8b6da7d72637b630bbc06242b9a074ab4dbbe989 (patch)
tree      98a9dd91fb57db7c5a2f10112f6d96b6b05ff776
parent    4e0d56dfd0bad8234ddeb0ee5abb6224aa723091 (diff)
download  buildstream-8b6da7d72637b630bbc06242b9a074ab4dbbe989.tar.gz
tests: Modify all tests using subprocesses to use in_subprocess mark
Additionally, test code that was previously executed by a subfunction (in the forked process) has been folded into the test function itself, as the separation is now redundant. This removes some duplicated code for setting up the context, project, etc.
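
For illustration only, a minimal sketch of the test shape this change produces, assuming the in_subprocess mark is registered by the suite's conftest; the test name and body below are hypothetical and not part of this commit:

    import os
    import pytest

    # Hypothetical sketch: the in_subprocess mark (assumed to be provided by the
    # test suite's conftest) runs the whole test in a forked child, so gRPC work
    # can live directly in the test body instead of being wrapped in a
    # multiprocessing.Process with a Queue to report errors back to the parent,
    # as the removed _queue_wrapper helpers did.
    @pytest.mark.in_subprocess
    def test_example(tmpdir):
        workdir = os.path.join(str(tmpdir), "work")
        os.makedirs(workdir)
        # ... set up the context/project and make gRPC calls directly here ...
        assert os.path.isdir(workdir)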
-rw-r--r--  tests/artifactcache/artifactservice.py  170
-rw-r--r--  tests/artifactcache/pull.py              185
-rw-r--r--  tests/artifactcache/push.py              135
-rw-r--r--  tests/internals/storage.py                62
-rw-r--r--  tests/internals/storage_vdir_import.py    97
5 files changed, 163 insertions, 486 deletions
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
index 3d229e24f..b3bc7c218 100644
--- a/tests/artifactcache/artifactservice.py
+++ b/tests/artifactcache/artifactservice.py
@@ -17,9 +17,7 @@
# Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
-import signal
from urllib.parse import urlparse
-from multiprocessing import Process, Queue
import grpc
import pytest
@@ -30,128 +28,84 @@ from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactService
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2 as re_pb2
from buildstream import utils
-from buildstream import _signals
from tests.testutils.artifactshare import create_artifact_share
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
+@pytest.mark.in_subprocess
def test_artifact_get_not_found(tmpdir):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir) as share:
# set up artifact service stub
url = urlparse(share.repo)
- queue = Queue()
- process = Process(target=_queue_wrapper, args=(_artifact_request, queue, url))
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ artifact_stub = ArtifactServiceStub(channel)
+
+ # Run GetArtifact and check it throws a not found error
+ request = GetArtifactRequest()
+ request.cache_key = "@artifact/something/not_there"
try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _artifact_request(url, queue):
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
-
- # Run GetArtifact and check it throws a not found error
- request = GetArtifactRequest()
- request.cache_key = "@artifact/something/not_there"
- try:
- artifact_stub.GetArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.NOT_FOUND
- assert e.details() == "Artifact proto not found"
- queue.put(None)
- else:
- assert False
+ artifact_stub.GetArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.NOT_FOUND
+ assert e.details() == "Artifact proto not found"
+ else:
+ assert False
# Successfully getting the artifact
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir, casd=True) as share:
- queue = Queue()
- process = Process(target=_queue_wrapper, args=(_update_artifact, queue, share, files))
+ # put files object
+ if files == "present":
+ directory = re_pb2.Directory()
+ digest = share.cas.add_object(buffer=directory.SerializeToString())
+ elif files == "invalid":
+ digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
+ elif files == "absent":
+ digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _update_artifact(share, files, *, queue):
- # put files object
- if files == "present":
- directory = re_pb2.Directory()
- digest = share.cas.add_object(buffer=directory.SerializeToString())
- elif files == "invalid":
- digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
- elif files == "absent":
- digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
-
- url = urlparse(share.repo)
-
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
-
- # initialise an artifact
- artifact = Artifact()
- artifact.version = 0
- artifact.build_success = True
- artifact.strong_key = "abcdefghijklmnop"
- artifact.files.hash = "hashymchashash"
- artifact.files.size_bytes = 10
-
- artifact.files.CopyFrom(digest)
-
- # Put it in the artifact share with an UpdateArtifactRequest
- request = UpdateArtifactRequest()
- request.artifact.CopyFrom(artifact)
- request.cache_key = "a-cache-key"
-
- # should return the same artifact back
- if files == "present":
- response = artifact_stub.UpdateArtifact(request)
+ url = urlparse(share.repo)
+
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ artifact_stub = ArtifactServiceStub(channel)
+
+ # initialise an artifact
+ artifact = Artifact()
+ artifact.version = 0
+ artifact.build_success = True
+ artifact.strong_key = "abcdefghijklmnop"
+ artifact.files.hash = "hashymchashash"
+ artifact.files.size_bytes = 10
+
+ artifact.files.CopyFrom(digest)
+
+ # Put it in the artifact share with an UpdateArtifactRequest
+ request = UpdateArtifactRequest()
+ request.artifact.CopyFrom(artifact)
+ request.cache_key = "a-cache-key"
+
+ # should return the same artifact back
+ if files == "present":
+ response = artifact_stub.UpdateArtifact(request)
+ assert response == artifact
+ else:
+ try:
+ artifact_stub.UpdateArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
+ if files == "absent":
+ assert e.details() == "Artifact files specified but no files found"
+ elif files == "invalid":
+ assert e.details() == "Artifact files specified but directory not found"
+ return
+
+ # If we uploaded the artifact check GetArtifact
+ request = GetArtifactRequest()
+ request.cache_key = "a-cache-key"
+
+ response = artifact_stub.GetArtifact(request)
assert response == artifact
- else:
- try:
- artifact_stub.UpdateArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
- if files == "absent":
- assert e.details() == "Artifact files specified but no files found"
- elif files == "invalid":
- assert e.details() == "Artifact files specified but directory not found"
- queue.put(None)
- return
-
- # If we uploaded the artifact check GetArtifact
- request = GetArtifactRequest()
- request.cache_key = "a-cache-key"
-
- response = artifact_stub.GetArtifact(request)
- assert response == artifact
- queue.put(None)
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 6003cea41..71db3e338 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -1,13 +1,11 @@
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import _yaml, _signals, utils
+from buildstream import _yaml
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -22,17 +20,6 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
def tree_maker(cas, tree, directory):
if tree.root.ByteSize() == 0:
tree.root.CopyFrom(directory)
@@ -46,6 +33,7 @@ def tree_maker(cas, tree, directory):
tree_maker(cas, tree, child_directory)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -96,58 +84,28 @@ def test_pull(cli, tmpdir, datafiles):
element_key = cli.get_element_key(project_dir, 'target.bst')
assert not cli.artifact.is_cached(cache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull, queue, user_config_file, project_dir,
- cache_dir, 'target.bst', element_key))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
- assert cli.artifact.is_cached(cache_dir, element, element_key)
-
-
-def _test_pull(user_config_file, project_dir, cache_dir,
- element_name, element_key, queue):
- with dummy_context(config=user_config_file) as context:
- context.cachedir = cache_dir
- context.casdir = os.path.join(cache_dir, 'cas')
- context.tmpdir = os.path.join(cache_dir, 'tmp')
+ context.cachedir = cache_dir
+ context.casdir = os.path.join(cache_dir, 'cas')
+ context.tmpdir = os.path.join(cache_dir, 'tmp')
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Load the target element
- element = project.load_elements([element_name])[0]
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert artifactcache.pull(element, element_key), "Pull operation failed"
- if artifactcache.has_push_remotes(plugin=element):
- # Push the element's artifact
- if not artifactcache.pull(element, element_key):
- queue.put("Pull operation failed")
- else:
- queue.put(None)
- else:
- queue.put("No remote configured for element {}".format(element_name))
+ assert cli.artifact.is_cached(cache_dir, element, element_key)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -196,76 +154,11 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Retrieve the Directory object from the cached artifact
artifact_digest = cli.artifact.get_digest(rootcache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_tree, queue, user_config_file, project_dir,
- artifact_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- tree_hash, tree_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert tree_hash and tree_size
-
- # Now delete the artifact locally
- cli.remove_artifact_from_cache(project_dir, 'target.bst')
-
- # Assert that we are not cached locally anymore
- assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-
- tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
- size_bytes=tree_size)
-
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull_tree, queue, user_config_file, project_dir,
- tree_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- directory_hash, directory_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
- assert directory_hash and not directory_size
-
- directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
- size_bytes=directory_size)
-
- # Ensure the entire Tree stucture has been pulled
- assert os.path.exists(cas.objpath(directory_digest))
-
-
-def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local artifact cache and cas handle
- artifactcache = context.artifactcache
- cas = context.get_cascache()
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ artifactcache = context.artifactcache
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes()
- if artifactcache.has_push_remotes():
directory = remote_execution_pb2.Directory()
with open(cas.objpath(artifact_digest), 'rb') as f:
@@ -277,27 +170,27 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
# Push the Tree as a regular message
tree_digest = artifactcache.push_message(project, tree)
+ tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
+ assert tree_hash and tree_size
- queue.put((tree_digest.hash, tree_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ # Now delete the artifact locally
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
+ # Assert that we are not cached locally anymore
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
+ size_bytes=tree_size)
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
-
- if artifactcache.has_push_remotes():
# Pull the artifact using the Tree object
directory_digest = artifactcache.pull_tree(project, artifact_digest)
- queue.put((directory_digest.hash, directory_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ directory_hash, directory_size = directory_digest.hash, directory_digest.size_bytes
+
+ # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
+ assert directory_hash and not directory_size
+
+ directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
+ size_bytes=directory_size)
+
+ # Ensure the entire Tree stucture has been pulled
+ assert os.path.exists(cas.objpath(directory_digest))
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 81d75023d..20d9ccfec 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -1,13 +1,11 @@
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import _yaml, _signals, utils, Scope
+from buildstream import _yaml, Scope
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -22,17 +20,7 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_push(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -73,61 +61,28 @@ def test_push(cli, tmpdir, datafiles):
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cli.artifact.is_cached(rootcache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push, queue, user_config_file, project_dir,
- 'target.bst'))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
- assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
-
-
-def _test_push(user_config_file, project_dir, element_name, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Load the target element
- element = project.load_elements([element_name])[0]
+ # Ensure the element's artifact memeber is initialised
+ # This is duplicated from Pipeline.resolve_elements()
+ # as this test does not use the cli frontend.
+ for e in element.dependencies(Scope.ALL):
+ # Determine initial element state.
+ e._update_state()
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element.dependencies(Scope.ALL):
- # Determine initial element state.
- e._update_state()
+ # Manually setup the CAS remotes
+ artifactcache.setup_remotes(use_config=True)
+ artifactcache.initialize_remotes()
- # Manually setup the CAS remotes
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert element._push(), "Push operation failed"
- if artifactcache.has_push_remotes(plugin=element):
- # Push the element's artifact
- if not element._push():
- queue.put("Push operation failed")
- else:
- queue.put(None)
- else:
- queue.put("No remote configured for element {}".format(element_name))
+ assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)
@@ -151,52 +106,28 @@ def test_push_message(tmpdir, datafiles):
# Write down the user configuration file
_yaml.roundtrip_dump(user_config, file=user_config_file)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_message, queue, user_config_file,
- project_dir))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- message_hash, message_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert message_hash and message_size
- message_digest = remote_execution_pb2.Digest(hash=message_hash,
- size_bytes=message_size)
- assert share.has_object(message_digest)
-
-
-def _test_push_message(user_config_file, project_dir, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ with dummy_context(config=user_config_file) as context:
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Manually setup the artifact remote
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
+ # Manually setup the artifact remote
+ artifactcache.setup_remotes(use_config=True)
+ artifactcache.initialize_remotes()
+ assert artifactcache.has_push_remotes()
- if artifactcache.has_push_remotes():
- # Create an example message object
command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
working_directory='/buildstream-build',
output_directories=['/buildstream-install'])
# Push the message object
command_digest = artifactcache.push_message(project, command)
+ message_hash, message_size = command_digest.hash, command_digest.size_bytes
- queue.put((command_digest.hash, command_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ assert message_hash and message_size
+ message_digest = remote_execution_pb2.Digest(hash=message_hash,
+ size_bytes=message_size)
+ assert share.has_object(message_digest)
diff --git a/tests/internals/storage.py b/tests/internals/storage.py
index 385162c13..d846e4b1f 100644
--- a/tests/internals/storage.py
+++ b/tests/internals/storage.py
@@ -1,11 +1,8 @@
from contextlib import contextmanager
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import utils, _signals
from buildstream._cas import CASCache
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
@@ -16,19 +13,6 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args)
- except Exception as e:
- queue.put(str(e))
- raise
-
- queue.put(None)
-
-
@contextmanager
def setup_backend(backend_class, tmpdir):
if backend_class == FileBasedDirectory:
@@ -41,7 +25,11 @@ def setup_backend(backend_class, tmpdir):
cas_cache.release_resources()
-def _test_import_subprocess(tmpdir, datafiles, backend):
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("backend", [
+ FileBasedDirectory, CasBasedDirectory])
+@pytest.mark.datafiles(DATA_DIR)
+def test_import(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
with setup_backend(backend, str(tmpdir)) as c:
@@ -51,27 +39,11 @@ def _test_import_subprocess(tmpdir, datafiles, backend):
assert "bin/hello" in c.list_relative_paths()
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("backend", [
FileBasedDirectory, CasBasedDirectory])
@pytest.mark.datafiles(DATA_DIR)
-def test_import(tmpdir, datafiles, backend):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_import_subprocess, queue,
- tmpdir, datafiles, backend))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
+def test_modified_file_list(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
overlay = os.path.join(str(datafiles), "overlay")
@@ -86,23 +58,3 @@ def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
assert "bin/bash" in c.list_relative_paths()
assert "bin/bash" in c.list_modified_paths()
assert "bin/hello" not in c.list_modified_paths()
-
-
-@pytest.mark.parametrize("backend", [
- FileBasedDirectory, CasBasedDirectory])
-@pytest.mark.datafiles(DATA_DIR)
-def test_modified_file_list(tmpdir, datafiles, backend):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_modified_file_list_subprocess, queue,
- tmpdir, datafiles, backend))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index e0165fc13..4599d4d5d 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -14,14 +14,11 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from hashlib import sha256
-import multiprocessing
import os
import random
-import signal
import pytest
-from buildstream import utils, _signals
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
from buildstream._cas import CASCache
@@ -58,34 +55,6 @@ RANDOM_SEED = 69105
NUM_RANDOM_TESTS = 10
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args)
- except Exception as e:
- queue.put(str(e))
- raise
-
- queue.put(None)
-
-
-def _run_test_in_subprocess(func, *args):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper, args=(func, queue, *args))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
def generate_import_roots(rootno, directory):
rootname = "root{}".format(rootno)
rootdir = os.path.join(directory, "content", rootname)
@@ -213,7 +182,7 @@ def directory_not_empty(path):
return os.listdir(path)
-def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False):
+def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
cas_cache = CASCache(tmpdir)
try:
# Create some fake content
@@ -269,26 +238,21 @@ def _import_test_subprocess(tmpdir, original, overlay, generator_function, verif
cas_cache.release_resources()
-def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
- _run_test_in_subprocess(_import_test_subprocess, tmpdir, original, overlay, generator_function, verify_contents)
-
-
-# It's possible to parameterize on both original and overlay values,
-# but this leads to more tests being listed in the output than are
-# comfortable.
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("original", range(1, len(root_filesets) + 1))
-def test_fixed_cas_import(tmpdir, original):
- for overlay in range(1, len(root_filesets) + 1):
- _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
+@pytest.mark.parametrize("overlay", range(1, len(root_filesets) + 1))
+def test_fixed_cas_import(tmpdir, original, overlay):
+ _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("original", range(1, NUM_RANDOM_TESTS + 1))
-def test_random_cas_import(tmpdir, original):
- for overlay in range(1, NUM_RANDOM_TESTS + 1):
- _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
+@pytest.mark.parametrize("overlay", range(1, NUM_RANDOM_TESTS + 1))
+def test_random_cas_import(tmpdir, original, overlay):
+ _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
-def _listing_test_subprocess(tmpdir, root, generator_function):
+def _listing_test(tmpdir, root, generator_function):
cas_cache = CASCache(tmpdir)
try:
# Create some fake content
@@ -305,22 +269,21 @@ def _listing_test_subprocess(tmpdir, root, generator_function):
cas_cache.release_resources()
-def _listing_test(tmpdir, root, generator_function):
- _run_test_in_subprocess(_listing_test_subprocess, tmpdir, root, generator_function)
-
-
@pytest.mark.parametrize("root", range(1, 11))
+@pytest.mark.in_subprocess
def test_random_directory_listing(tmpdir, root):
_listing_test(str(tmpdir), root, generate_random_root)
@pytest.mark.parametrize("root", [1, 2, 3, 4, 5])
+@pytest.mark.in_subprocess
def test_fixed_directory_listing(tmpdir, root):
_listing_test(str(tmpdir), root, generate_import_roots)
# Check that the vdir is decending and readable
-def _test_descend_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_descend(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -343,14 +306,11 @@ def _test_descend_subprocess(tmpdir):
cas_cache.release_resources()
-def test_descend(tmpdir):
- _run_test_in_subprocess(_test_descend_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Make sure the correct erros are raised when trying
# to decend in to files or links to files
-def _test_bad_symlinks_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_symlinks(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -381,13 +341,10 @@ def _test_bad_symlinks_subprocess(tmpdir):
cas_cache.release_resources()
-def test_bad_symlinks(tmpdir):
- _run_test_in_subprocess(_test_bad_symlinks_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check decend accross relitive link
-def _test_relative_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_relative_symlink(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -410,13 +367,10 @@ def _test_relative_symlink_subprocess(tmpdir):
cas_cache.release_resources()
-def test_relative_symlink(tmpdir):
- _run_test_in_subprocess(_test_relative_symlink_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check deccend accross abs link
-def _test_abs_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_abs_symlink(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -440,13 +394,10 @@ def _test_abs_symlink_subprocess(tmpdir):
cas_cache.release_resources()
-def test_abs_symlink(tmpdir):
- _run_test_in_subprocess(_test_abs_symlink_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check symlink can not escape root
-def _test_bad_sym_escape_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_sym_escape(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -468,7 +419,3 @@ def _test_bad_sym_escape_subprocess(tmpdir):
assert error.reason == "directory-not-found"
finally:
cas_cache.release_resources()
-
-
-def test_bad_sym_escape(tmpdir):
- _run_test_in_subprocess(_test_bad_sym_escape_subprocess, tmpdir)