summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-08-30 10:51:17 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-30 10:51:17 +0000
commitf3baddd706c90d0e9406eee30bf4304ad3e53ea3 (patch)
tree1e601d5f141396ed0bad655da216511a3994c5d8
parentaa9bc230681486fea5f7d23e35bcfd62ad673310 (diff)
parentec8b3130fd17fd80369c8e04351e5f7779b0c538 (diff)
downloadbuildstream-f3baddd706c90d0e9406eee30bf4304ad3e53ea3.tar.gz
Merge branch 'tmewett/test-in-subprocess' into 'master'
Add in_subprocess pytest mark and modify tests which run in another process to use it Closes #1108 See merge request BuildStream/buildstream!1557
-rw-r--r--setup.cfg4
-rw-r--r--src/buildstream/testing/_forked.py94
-rw-r--r--tests/artifactcache/artifactservice.py170
-rw-r--r--tests/artifactcache/pull.py185
-rw-r--r--tests/artifactcache/push.py135
-rwxr-xr-xtests/conftest.py22
-rw-r--r--tests/internals/storage.py62
-rw-r--r--tests/internals/storage_vdir_import.py110
8 files changed, 287 insertions, 495 deletions
diff --git a/setup.cfg b/setup.cfg
index 06dcd8fd4..4a0dfb54d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -20,6 +20,10 @@ env =
D:XDG_CACHE_HOME=./tmp/cache
D:XDG_CONFIG_HOME=./tmp/config
D:XDG_DATA_HOME=./tmp/share
+markers =
+ integration: run test only if --integration option is specified
+ remoteexecution: run test only if --remote-execution option is specified
+ in_subprocess: run test in a Python process forked from the main one
[pycodestyle]
max-line-length = 119
diff --git a/src/buildstream/testing/_forked.py b/src/buildstream/testing/_forked.py
new file mode 100644
index 000000000..af5e9c070
--- /dev/null
+++ b/src/buildstream/testing/_forked.py
@@ -0,0 +1,94 @@
+# This code was based on pytest-forked, commit 6098c1, found here:
+# <https://github.com/pytest-dev/pytest-forked>
+# Its copyright notice is included below.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import os
+import marshal
+
+import py
+import pytest
+# XXX Using pytest private internals here
+from _pytest import runner
+
+EXITSTATUS_TESTEXIT = 4
+
+
+# copied from xdist remote
+def serialize_report(rep):
+ d = rep.__dict__.copy()
+ if hasattr(rep.longrepr, 'toterminal'):
+ d['longrepr'] = str(rep.longrepr)
+ else:
+ d['longrepr'] = rep.longrepr
+ for name in d:
+ if isinstance(d[name], py.path.local): # pylint: disable=no-member
+ d[name] = str(d[name])
+ elif name == "result":
+ d[name] = None # for now
+ return d
+
+
+def forked_run_report(item):
+ def runforked():
+ try:
+ reports = runner.runtestprotocol(item, log=False)
+ except KeyboardInterrupt:
+ os._exit(EXITSTATUS_TESTEXIT)
+ return marshal.dumps([serialize_report(x) for x in reports])
+
+ ff = py.process.ForkedFunc(runforked) # pylint: disable=no-member
+ result = ff.waitfinish()
+ if result.retval is not None:
+ report_dumps = marshal.loads(result.retval)
+ return [runner.TestReport(**x) for x in report_dumps]
+ else:
+ if result.exitstatus == EXITSTATUS_TESTEXIT:
+ pytest.exit("forked test item %s raised Exit" % (item,))
+ return [report_process_crash(item, result)]
+
+
+def report_process_crash(item, result):
+ try:
+ from _pytest.compat import getfslineno
+ except ImportError:
+ # pytest<4.2
+ path, lineno = item._getfslineno()
+ else:
+ path, lineno = getfslineno(item)
+ info = ("%s:%s: running the test CRASHED with signal %d" %
+ (path, lineno, result.signal))
+
+ # We need to create a CallInfo instance that is pre-initialised to contain
+ # info about an exception. We do this by using a function which does
+ # 0/0. Also, the API varies between pytest versions.
+ has_from_call = getattr(runner.CallInfo, "from_call", None) is not None
+ if has_from_call: # pytest >= 4.1
+ call = runner.CallInfo.from_call(lambda: 0 / 0, "???")
+ else:
+ call = runner.CallInfo(lambda: 0 / 0, "???")
+ call.excinfo = info
+
+ rep = runner.pytest_runtest_makereport(item, call)
+ if result.out:
+ rep.sections.append(("captured stdout", result.out))
+ if result.err:
+ rep.sections.append(("captured stderr", result.err))
+ return rep
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
index 3d229e24f..b3bc7c218 100644
--- a/tests/artifactcache/artifactservice.py
+++ b/tests/artifactcache/artifactservice.py
@@ -17,9 +17,7 @@
# Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
-import signal
from urllib.parse import urlparse
-from multiprocessing import Process, Queue
import grpc
import pytest
@@ -30,128 +28,84 @@ from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactService
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2 as re_pb2
from buildstream import utils
-from buildstream import _signals
from tests.testutils.artifactshare import create_artifact_share
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
+@pytest.mark.in_subprocess
def test_artifact_get_not_found(tmpdir):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir) as share:
# set up artifact service stub
url = urlparse(share.repo)
- queue = Queue()
- process = Process(target=_queue_wrapper, args=(_artifact_request, queue, url))
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ artifact_stub = ArtifactServiceStub(channel)
+
+ # Run GetArtifact and check it throws a not found error
+ request = GetArtifactRequest()
+ request.cache_key = "@artifact/something/not_there"
try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _artifact_request(url, queue):
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
-
- # Run GetArtifact and check it throws a not found error
- request = GetArtifactRequest()
- request.cache_key = "@artifact/something/not_there"
- try:
- artifact_stub.GetArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.NOT_FOUND
- assert e.details() == "Artifact proto not found"
- queue.put(None)
- else:
- assert False
+ artifact_stub.GetArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.NOT_FOUND
+ assert e.details() == "Artifact proto not found"
+ else:
+ assert False
# Successfully getting the artifact
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
sharedir = os.path.join(str(tmpdir), "share")
with create_artifact_share(sharedir, casd=True) as share:
- queue = Queue()
- process = Process(target=_queue_wrapper, args=(_update_artifact, queue, share, files))
+ # put files object
+ if files == "present":
+ directory = re_pb2.Directory()
+ digest = share.cas.add_object(buffer=directory.SerializeToString())
+ elif files == "invalid":
+ digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
+ elif files == "absent":
+ digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _update_artifact(share, files, *, queue):
- # put files object
- if files == "present":
- directory = re_pb2.Directory()
- digest = share.cas.add_object(buffer=directory.SerializeToString())
- elif files == "invalid":
- digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
- elif files == "absent":
- digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
-
- url = urlparse(share.repo)
-
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
-
- # initialise an artifact
- artifact = Artifact()
- artifact.version = 0
- artifact.build_success = True
- artifact.strong_key = "abcdefghijklmnop"
- artifact.files.hash = "hashymchashash"
- artifact.files.size_bytes = 10
-
- artifact.files.CopyFrom(digest)
-
- # Put it in the artifact share with an UpdateArtifactRequest
- request = UpdateArtifactRequest()
- request.artifact.CopyFrom(artifact)
- request.cache_key = "a-cache-key"
-
- # should return the same artifact back
- if files == "present":
- response = artifact_stub.UpdateArtifact(request)
+ url = urlparse(share.repo)
+
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ artifact_stub = ArtifactServiceStub(channel)
+
+ # initialise an artifact
+ artifact = Artifact()
+ artifact.version = 0
+ artifact.build_success = True
+ artifact.strong_key = "abcdefghijklmnop"
+ artifact.files.hash = "hashymchashash"
+ artifact.files.size_bytes = 10
+
+ artifact.files.CopyFrom(digest)
+
+ # Put it in the artifact share with an UpdateArtifactRequest
+ request = UpdateArtifactRequest()
+ request.artifact.CopyFrom(artifact)
+ request.cache_key = "a-cache-key"
+
+ # should return the same artifact back
+ if files == "present":
+ response = artifact_stub.UpdateArtifact(request)
+ assert response == artifact
+ else:
+ try:
+ artifact_stub.UpdateArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
+ if files == "absent":
+ assert e.details() == "Artifact files specified but no files found"
+ elif files == "invalid":
+ assert e.details() == "Artifact files specified but directory not found"
+ return
+
+ # If we uploaded the artifact check GetArtifact
+ request = GetArtifactRequest()
+ request.cache_key = "a-cache-key"
+
+ response = artifact_stub.GetArtifact(request)
assert response == artifact
- else:
- try:
- artifact_stub.UpdateArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
- if files == "absent":
- assert e.details() == "Artifact files specified but no files found"
- elif files == "invalid":
- assert e.details() == "Artifact files specified but directory not found"
- queue.put(None)
- return
-
- # If we uploaded the artifact check GetArtifact
- request = GetArtifactRequest()
- request.cache_key = "a-cache-key"
-
- response = artifact_stub.GetArtifact(request)
- assert response == artifact
- queue.put(None)
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 6003cea41..71db3e338 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -1,13 +1,11 @@
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import _yaml, _signals, utils
+from buildstream import _yaml
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -22,17 +20,6 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
def tree_maker(cas, tree, directory):
if tree.root.ByteSize() == 0:
tree.root.CopyFrom(directory)
@@ -46,6 +33,7 @@ def tree_maker(cas, tree, directory):
tree_maker(cas, tree, child_directory)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -96,58 +84,28 @@ def test_pull(cli, tmpdir, datafiles):
element_key = cli.get_element_key(project_dir, 'target.bst')
assert not cli.artifact.is_cached(cache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull, queue, user_config_file, project_dir,
- cache_dir, 'target.bst', element_key))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
- assert cli.artifact.is_cached(cache_dir, element, element_key)
-
-
-def _test_pull(user_config_file, project_dir, cache_dir,
- element_name, element_key, queue):
- with dummy_context(config=user_config_file) as context:
- context.cachedir = cache_dir
- context.casdir = os.path.join(cache_dir, 'cas')
- context.tmpdir = os.path.join(cache_dir, 'tmp')
+ context.cachedir = cache_dir
+ context.casdir = os.path.join(cache_dir, 'cas')
+ context.tmpdir = os.path.join(cache_dir, 'tmp')
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Load the target element
- element = project.load_elements([element_name])[0]
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert artifactcache.pull(element, element_key), "Pull operation failed"
- if artifactcache.has_push_remotes(plugin=element):
- # Push the element's artifact
- if not artifactcache.pull(element, element_key):
- queue.put("Pull operation failed")
- else:
- queue.put(None)
- else:
- queue.put("No remote configured for element {}".format(element_name))
+ assert cli.artifact.is_cached(cache_dir, element, element_key)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -196,76 +154,11 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Retrieve the Directory object from the cached artifact
artifact_digest = cli.artifact.get_digest(rootcache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_tree, queue, user_config_file, project_dir,
- artifact_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- tree_hash, tree_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert tree_hash and tree_size
-
- # Now delete the artifact locally
- cli.remove_artifact_from_cache(project_dir, 'target.bst')
-
- # Assert that we are not cached locally anymore
- assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-
- tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
- size_bytes=tree_size)
-
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull_tree, queue, user_config_file, project_dir,
- tree_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- directory_hash, directory_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
- assert directory_hash and not directory_size
-
- directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
- size_bytes=directory_size)
-
- # Ensure the entire Tree stucture has been pulled
- assert os.path.exists(cas.objpath(directory_digest))
-
-
-def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local artifact cache and cas handle
- artifactcache = context.artifactcache
- cas = context.get_cascache()
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ artifactcache = context.artifactcache
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes()
- if artifactcache.has_push_remotes():
directory = remote_execution_pb2.Directory()
with open(cas.objpath(artifact_digest), 'rb') as f:
@@ -277,27 +170,27 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
# Push the Tree as a regular message
tree_digest = artifactcache.push_message(project, tree)
+ tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
+ assert tree_hash and tree_size
- queue.put((tree_digest.hash, tree_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ # Now delete the artifact locally
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
+ # Assert that we are not cached locally anymore
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
+ size_bytes=tree_size)
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
-
- if artifactcache.has_push_remotes():
# Pull the artifact using the Tree object
directory_digest = artifactcache.pull_tree(project, artifact_digest)
- queue.put((directory_digest.hash, directory_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ directory_hash, directory_size = directory_digest.hash, directory_digest.size_bytes
+
+ # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
+ assert directory_hash and not directory_size
+
+ directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
+ size_bytes=directory_size)
+
+ # Ensure the entire Tree stucture has been pulled
+ assert os.path.exists(cas.objpath(directory_digest))
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 81d75023d..20d9ccfec 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -1,13 +1,11 @@
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import _yaml, _signals, utils, Scope
+from buildstream import _yaml, Scope
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -22,17 +20,7 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_push(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -73,61 +61,28 @@ def test_push(cli, tmpdir, datafiles):
element_key = cli.get_element_key(project_dir, 'target.bst')
assert cli.artifact.is_cached(rootcache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push, queue, user_config_file, project_dir,
- 'target.bst'))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
- assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
-
-
-def _test_push(user_config_file, project_dir, element_name, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Load the target element
- element = project.load_elements([element_name])[0]
+ # Ensure the element's artifact memeber is initialised
+ # This is duplicated from Pipeline.resolve_elements()
+ # as this test does not use the cli frontend.
+ for e in element.dependencies(Scope.ALL):
+ # Determine initial element state.
+ e._update_state()
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element.dependencies(Scope.ALL):
- # Determine initial element state.
- e._update_state()
+ # Manually setup the CAS remotes
+ artifactcache.setup_remotes(use_config=True)
+ artifactcache.initialize_remotes()
- # Manually setup the CAS remotes
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert element._push(), "Push operation failed"
- if artifactcache.has_push_remotes(plugin=element):
- # Push the element's artifact
- if not element._push():
- queue.put("Push operation failed")
- else:
- queue.put(None)
- else:
- queue.put("No remote configured for element {}".format(element_name))
+ assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)
@@ -151,52 +106,28 @@ def test_push_message(tmpdir, datafiles):
# Write down the user configuration file
_yaml.roundtrip_dump(user_config, file=user_config_file)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_message, queue, user_config_file,
- project_dir))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- message_hash, message_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert message_hash and message_size
- message_digest = remote_execution_pb2.Digest(hash=message_hash,
- size_bytes=message_size)
- assert share.has_object(message_digest)
-
-
-def _test_push_message(user_config_file, project_dir, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ with dummy_context(config=user_config_file) as context:
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Manually setup the artifact remote
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
+ # Manually setup the artifact remote
+ artifactcache.setup_remotes(use_config=True)
+ artifactcache.initialize_remotes()
+ assert artifactcache.has_push_remotes()
- if artifactcache.has_push_remotes():
- # Create an example message object
command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
working_directory='/buildstream-build',
output_directories=['/buildstream-install'])
# Push the message object
command_digest = artifactcache.push_message(project, command)
+ message_hash, message_size = command_digest.hash, command_digest.size_bytes
- queue.put((command_digest.hash, command_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ assert message_hash and message_size
+ message_digest = remote_execution_pb2.Digest(hash=message_hash,
+ size_bytes=message_size)
+ assert share.has_object(message_digest)
diff --git a/tests/conftest.py b/tests/conftest.py
index d6b0b02e0..7728fb5c8 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,6 +23,7 @@ import os
import pytest
from buildstream.testing import register_repo_kind, sourcetests_collection_hook
+from buildstream.testing._forked import forked_run_report
from buildstream.testing.integration import integration_cache # pylint: disable=unused-import
@@ -68,6 +69,27 @@ def pytest_runtest_setup(item):
#################################################
+# in_subprocess mark #
+#################################################
+#
+# Various issues can occur when forking the Python process and using gRPC,
+# due to its multithreading. As BuildStream forks for parallelisation, gRPC
+# features are restricted to child processes, so tests using them must also
+# run as child processes. The in_subprocess mark handles this.
+# See <https://github.com/grpc/grpc/blob/master/doc/fork_support.md>.
+#
+@pytest.mark.tryfirst
+def pytest_runtest_protocol(item):
+ if item.get_closest_marker('in_subprocess') is not None:
+ reports = forked_run_report(item)
+ for rep in reports:
+ item.ihook.pytest_runtest_logreport(report=rep)
+ return True
+ else:
+ return None
+
+
+#################################################
# remote_services fixture #
#################################################
#
diff --git a/tests/internals/storage.py b/tests/internals/storage.py
index 385162c13..d846e4b1f 100644
--- a/tests/internals/storage.py
+++ b/tests/internals/storage.py
@@ -1,11 +1,8 @@
from contextlib import contextmanager
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import utils, _signals
from buildstream._cas import CASCache
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
@@ -16,19 +13,6 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args)
- except Exception as e:
- queue.put(str(e))
- raise
-
- queue.put(None)
-
-
@contextmanager
def setup_backend(backend_class, tmpdir):
if backend_class == FileBasedDirectory:
@@ -41,7 +25,11 @@ def setup_backend(backend_class, tmpdir):
cas_cache.release_resources()
-def _test_import_subprocess(tmpdir, datafiles, backend):
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("backend", [
+ FileBasedDirectory, CasBasedDirectory])
+@pytest.mark.datafiles(DATA_DIR)
+def test_import(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
with setup_backend(backend, str(tmpdir)) as c:
@@ -51,27 +39,11 @@ def _test_import_subprocess(tmpdir, datafiles, backend):
assert "bin/hello" in c.list_relative_paths()
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("backend", [
FileBasedDirectory, CasBasedDirectory])
@pytest.mark.datafiles(DATA_DIR)
-def test_import(tmpdir, datafiles, backend):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_import_subprocess, queue,
- tmpdir, datafiles, backend))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
-
-
-def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
+def test_modified_file_list(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
overlay = os.path.join(str(datafiles), "overlay")
@@ -86,23 +58,3 @@ def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
assert "bin/bash" in c.list_relative_paths()
assert "bin/bash" in c.list_modified_paths()
assert "bin/hello" not in c.list_modified_paths()
-
-
-@pytest.mark.parametrize("backend", [
- FileBasedDirectory, CasBasedDirectory])
-@pytest.mark.datafiles(DATA_DIR)
-def test_modified_file_list(tmpdir, datafiles, backend):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_modified_file_list_subprocess, queue,
- tmpdir, datafiles, backend))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index e0165fc13..8fbf4142f 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -14,14 +14,11 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from hashlib import sha256
-import multiprocessing
import os
import random
-import signal
import pytest
-from buildstream import utils, _signals
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
from buildstream._cas import CASCache
@@ -43,47 +40,14 @@ from buildstream.storage.directory import VirtualDirectoryError
root_filesets = [
[('a/b/c/textfile1', 'F', 'This is textfile 1\n')],
[('a/b/c/textfile1', 'F', 'This is the replacement textfile 1\n')],
- [('a/b/d', 'D', '')],
- [('a/b/e', 'S', '/a/b/d')],
[('a/b/f', 'S', '/a/b/c')],
- [('a/b/d', 'D', ''), ('a/b/e', 'S', '/a/b/d')],
[('a/b/c', 'D', ''), ('a/b/f', 'S', '/a/b/c')],
- [('a/c', 'F', 'This is textfile 1\n')],
- [('a/b/e', 'F', 'This is textfile 1\n')],
- [('a/b/c', 'D', '')]
+ [('a/b/f', 'F', 'This is textfile 1\n')],
]
empty_hash_ref = sha256().hexdigest()
RANDOM_SEED = 69105
-NUM_RANDOM_TESTS = 10
-
-
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args)
- except Exception as e:
- queue.put(str(e))
- raise
-
- queue.put(None)
-
-
-def _run_test_in_subprocess(func, *args):
- queue = multiprocessing.Queue()
- process = multiprocessing.Process(target=_queue_wrapper, args=(func, queue, *args))
- try:
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
+NUM_RANDOM_TESTS = 4
def generate_import_roots(rootno, directory):
@@ -213,7 +177,7 @@ def directory_not_empty(path):
return os.listdir(path)
-def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False):
+def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
cas_cache = CASCache(tmpdir)
try:
# Create some fake content
@@ -269,26 +233,21 @@ def _import_test_subprocess(tmpdir, original, overlay, generator_function, verif
cas_cache.release_resources()
-def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
- _run_test_in_subprocess(_import_test_subprocess, tmpdir, original, overlay, generator_function, verify_contents)
-
-
-# It's possible to parameterize on both original and overlay values,
-# but this leads to more tests being listed in the output than are
-# comfortable.
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("original", range(1, len(root_filesets) + 1))
-def test_fixed_cas_import(tmpdir, original):
- for overlay in range(1, len(root_filesets) + 1):
- _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
+@pytest.mark.parametrize("overlay", range(1, len(root_filesets) + 1))
+def test_fixed_cas_import(tmpdir, original, overlay):
+ _import_test(str(tmpdir), original, overlay, generate_import_roots, verify_contents=True)
+@pytest.mark.in_subprocess
@pytest.mark.parametrize("original", range(1, NUM_RANDOM_TESTS + 1))
-def test_random_cas_import(tmpdir, original):
- for overlay in range(1, NUM_RANDOM_TESTS + 1):
- _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
+@pytest.mark.parametrize("overlay", range(1, NUM_RANDOM_TESTS + 1))
+def test_random_cas_import(tmpdir, original, overlay):
+ _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
-def _listing_test_subprocess(tmpdir, root, generator_function):
+def _listing_test(tmpdir, root, generator_function):
cas_cache = CASCache(tmpdir)
try:
# Create some fake content
@@ -305,22 +264,21 @@ def _listing_test_subprocess(tmpdir, root, generator_function):
cas_cache.release_resources()
-def _listing_test(tmpdir, root, generator_function):
- _run_test_in_subprocess(_listing_test_subprocess, tmpdir, root, generator_function)
-
-
-@pytest.mark.parametrize("root", range(1, 11))
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("root", range(1, NUM_RANDOM_TESTS + 1))
def test_random_directory_listing(tmpdir, root):
_listing_test(str(tmpdir), root, generate_random_root)
-@pytest.mark.parametrize("root", [1, 2, 3, 4, 5])
+@pytest.mark.in_subprocess
+@pytest.mark.parametrize("root", range(1, len(root_filesets) + 1))
def test_fixed_directory_listing(tmpdir, root):
_listing_test(str(tmpdir), root, generate_import_roots)
# Check that the vdir is decending and readable
-def _test_descend_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_descend(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -343,14 +301,11 @@ def _test_descend_subprocess(tmpdir):
cas_cache.release_resources()
-def test_descend(tmpdir):
- _run_test_in_subprocess(_test_descend_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Make sure the correct erros are raised when trying
# to decend in to files or links to files
-def _test_bad_symlinks_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_symlinks(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -381,13 +336,10 @@ def _test_bad_symlinks_subprocess(tmpdir):
cas_cache.release_resources()
-def test_bad_symlinks(tmpdir):
- _run_test_in_subprocess(_test_bad_symlinks_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check decend accross relitive link
-def _test_relative_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_relative_symlink(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -410,13 +362,10 @@ def _test_relative_symlink_subprocess(tmpdir):
cas_cache.release_resources()
-def test_relative_symlink(tmpdir):
- _run_test_in_subprocess(_test_relative_symlink_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check deccend accross abs link
-def _test_abs_symlink_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_abs_symlink(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -440,13 +389,10 @@ def _test_abs_symlink_subprocess(tmpdir):
cas_cache.release_resources()
-def test_abs_symlink(tmpdir):
- _run_test_in_subprocess(_test_abs_symlink_subprocess, tmpdir)
-
-
# Check symlink logic for edgecases
# Check symlink can not escape root
-def _test_bad_sym_escape_subprocess(tmpdir):
+@pytest.mark.in_subprocess
+def test_bad_sym_escape(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
try:
@@ -468,7 +414,3 @@ def _test_bad_sym_escape_subprocess(tmpdir):
assert error.reason == "directory-not-found"
finally:
cas_cache.release_resources()
-
-
-def test_bad_sym_escape(tmpdir):
- _run_test_in_subprocess(_test_bad_sym_escape_subprocess, tmpdir)