author    Tristan Van Berkom <tristan.van.berkom@gmail.com>  2019-01-18 16:35:23 +0000
committer Tristan Van Berkom <tristan.van.berkom@gmail.com>  2019-01-18 16:35:23 +0000
commit    4236bcc7a2a1f13fd64a4262bee25c3e0e2b0d5a (patch)
tree      c722bcccceb741d2c7ed20d036a0c0816cda72d9
parent    ad2df651bc3fd0d3bfe915ed81a0b5df6a0e2ca2 (diff)
parent    a895cb2ad9c06b257ca821ec9ad6e3fb1eee3028 (diff)
download  buildstream-4236bcc7a2a1f13fd64a4262bee25c3e0e2b0d5a.tar.gz
Merge branch 'tristan/fix-bzr-race' into 'master'
Fix bzr race conditions

Closes #868

See merge request BuildStream/buildstream!1083
-rw-r--r--  buildstream/plugins/sources/bzr.py  127
-rw-r--r--  tests/frontend/track.py              60
-rw-r--r--  tests/testutils/runcli.py            22
3 files changed, 116 insertions, 93 deletions
diff --git a/buildstream/plugins/sources/bzr.py b/buildstream/plugins/sources/bzr.py
index f52472918..5b993c043 100644
--- a/buildstream/plugins/sources/bzr.py
+++ b/buildstream/plugins/sources/bzr.py
@@ -56,6 +56,7 @@ details on common configuration options for sources.
import os
import shutil
+import fcntl
from contextlib import contextmanager
from buildstream import Source, SourceError, Consistency
@@ -84,10 +85,12 @@ class BzrSource(Source):
if self.ref is None or self.tracking is None:
return Consistency.INCONSISTENT
- if self._check_ref():
- return Consistency.CACHED
- else:
- return Consistency.RESOLVED
+ # Lock for the _check_ref()
+ with self._locked():
+ if self._check_ref():
+ return Consistency.CACHED
+ else:
+ return Consistency.RESOLVED
def load_ref(self, node):
self.ref = self.node_get_member(node, str, 'ref', None)
@@ -100,7 +103,7 @@ class BzrSource(Source):
def track(self):
with self.timed_activity("Tracking {}".format(self.url),
- silent_nested=True):
+ silent_nested=True), self._locked():
self._ensure_mirror(skip_ref_check=True)
ret, out = self.check_output([self.host_bzr, "version-info",
"--custom", "--template={revno}",
@@ -114,7 +117,7 @@ class BzrSource(Source):
def fetch(self):
with self.timed_activity("Fetching {}".format(self.url),
- silent_nested=True):
+ silent_nested=True), self._locked():
self._ensure_mirror()
def stage(self, directory):
@@ -141,6 +144,26 @@ class BzrSource(Source):
"--directory={}".format(directory), url],
fail="Failed to switch workspace's parent branch to {}".format(url))
+ # _locked()
+ #
+ # This context manager ensures exclusive access to the
+ # bzr repository.
+ #
+ @contextmanager
+ def _locked(self):
+ lockdir = os.path.join(self.get_mirror_directory(), 'locks')
+ lockfile = os.path.join(
+ lockdir,
+ utils.url_directory_name(self.original_url) + '.lock'
+ )
+ os.makedirs(lockdir, exist_ok=True)
+ with open(lockfile, 'w') as lock:
+ fcntl.flock(lock, fcntl.LOCK_EX)
+ try:
+ yield
+ finally:
+ fcntl.flock(lock, fcntl.LOCK_UN)
+
def _check_ref(self):
# If the mirror doesnt exist yet, then we dont have the ref
if not os.path.exists(self._get_branch_dir()):
@@ -157,83 +180,27 @@ class BzrSource(Source):
return os.path.join(self.get_mirror_directory(),
utils.url_directory_name(self.original_url))
- def _atomic_replace_mirrordir(self, srcdir):
- """Helper function to safely replace the mirror dir"""
+ def _ensure_mirror(self, skip_ref_check=False):
+ mirror_dir = self._get_mirror_dir()
+ bzr_metadata_dir = os.path.join(mirror_dir, ".bzr")
+ if not os.path.exists(bzr_metadata_dir):
+ self.call([self.host_bzr, "init-repo", "--no-trees", mirror_dir],
+ fail="Failed to initialize bzr repository")
+
+ branch_dir = os.path.join(mirror_dir, self.tracking)
+ branch_url = self.url + "/" + self.tracking
+ if not os.path.exists(branch_dir):
+ # `bzr branch` the branch if it doesn't exist
+ # to get the upstream code
+ self.call([self.host_bzr, "branch", branch_url, branch_dir],
+ fail="Failed to branch from {} to {}".format(branch_url, branch_dir))
- if not os.path.exists(self._get_mirror_dir()):
- # Just move the srcdir to the mirror dir
- try:
- os.rename(srcdir, self._get_mirror_dir())
- except OSError as e:
- raise SourceError("{}: Failed to move srcdir '{}' to mirror dir '{}'"
- .format(str(self), srcdir, self._get_mirror_dir())) from e
else:
- # Atomically swap the backup dir.
- backupdir = self._get_mirror_dir() + ".bak"
- try:
- os.rename(self._get_mirror_dir(), backupdir)
- except OSError as e:
- raise SourceError("{}: Failed to move mirrordir '{}' to backup dir '{}'"
- .format(str(self), self._get_mirror_dir(), backupdir)) from e
+ # `bzr pull` the branch if it does exist
+ # to get any changes to the upstream code
+ self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url],
+ fail="Failed to pull new changes for {}".format(branch_dir))
- try:
- os.rename(srcdir, self._get_mirror_dir())
- except OSError as e:
- # Attempt to put the backup back!
- os.rename(backupdir, self._get_mirror_dir())
- raise SourceError("{}: Failed to replace bzr repo '{}' with '{}"
- .format(str(self), srcdir, self._get_mirror_dir())) from e
- finally:
- if os.path.exists(backupdir):
- shutil.rmtree(backupdir)
-
- @contextmanager
- def _atomic_repodir(self):
- """Context manager for working in a copy of the bzr repository
-
- Yields:
- (str): A path to the copy of the bzr repo
-
- This should be used because bzr does not give any guarantees of
- atomicity, and aborting an operation at the wrong time (or
- accidentally running multiple concurrent operations) can leave the
- repo in an inconsistent state.
- """
- with self.tempdir() as repodir:
- mirror_dir = self._get_mirror_dir()
- if os.path.exists(mirror_dir):
- try:
- # shutil.copytree doesn't like it if destination exists
- shutil.rmtree(repodir)
- shutil.copytree(mirror_dir, repodir)
- except (shutil.Error, OSError) as e:
- raise SourceError("{}: Failed to copy bzr repo from '{}' to '{}'"
- .format(str(self), mirror_dir, repodir)) from e
-
- yield repodir
- self._atomic_replace_mirrordir(repodir)
-
- def _ensure_mirror(self, skip_ref_check=False):
- with self._atomic_repodir() as repodir:
- # Initialize repo if no metadata
- bzr_metadata_dir = os.path.join(repodir, ".bzr")
- if not os.path.exists(bzr_metadata_dir):
- self.call([self.host_bzr, "init-repo", "--no-trees", repodir],
- fail="Failed to initialize bzr repository")
-
- branch_dir = os.path.join(repodir, self.tracking)
- branch_url = self.url + "/" + self.tracking
- if not os.path.exists(branch_dir):
- # `bzr branch` the branch if it doesn't exist
- # to get the upstream code
- self.call([self.host_bzr, "branch", branch_url, branch_dir],
- fail="Failed to branch from {} to {}".format(branch_url, branch_dir))
-
- else:
- # `bzr pull` the branch if it does exist
- # to get any changes to the upstream code
- self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url],
- fail="Failed to pull new changes for {}".format(branch_dir))
if not skip_ref_check and not self._check_ref():
raise SourceError("Failed to ensure ref '{}' was mirrored".format(self.ref),
reason="ref-not-mirrored")
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 82e8ec4ce..d149bd050 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -73,14 +73,36 @@ def test_track(cli, tmpdir, datafiles, ref_storage, kind):
assert not os.path.exists(os.path.join(project, 'project.refs'))
+# NOTE:
+#
+# This test checks that recursive tracking works by observing
+# element states after running a recursive tracking operation.
+#
+# However, this test is ALSO valuable as it stresses the source
+# plugins in a situation where many source plugins are operating
+# at once on the same backing repository.
+#
+# Do not change this test to use a separate 'Repo' per element
+# as that would defeat the purpose of the stress test, otherwise
+# please refactor that aspect into another test.
+#
@pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.parametrize("amount", [(1), (10)])
@pytest.mark.parametrize("kind", [(kind) for kind in ALL_REPO_KINDS])
-def test_track_recurse(cli, tmpdir, datafiles, kind):
+def test_track_recurse(cli, tmpdir, datafiles, kind, amount):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
- element_dep_name = 'track-test-dep-{}.bst'.format(kind)
- element_target_name = 'track-test-target-{}.bst'.format(kind)
+
+ # Try to actually launch as many fetch jobs as possible at the same time
+ #
+ # This stresses the Source plugins and helps to ensure that
+ # they handle concurrent access to the store correctly.
+ cli.configure({
+ 'scheduler': {
+ 'fetchers': amount,
+ }
+ })
# Create our repo object of the given source type with
# the dev files, and then collect the initial ref.
@@ -89,18 +111,26 @@ def test_track_recurse(cli, tmpdir, datafiles, kind):
ref = repo.create(dev_files_path)
# Write out our test targets
- generate_element(repo, os.path.join(element_path, element_dep_name))
- generate_element(repo, os.path.join(element_path, element_target_name),
- dep_name=element_dep_name)
+ element_names = []
+ last_element_name = None
+ for i in range(amount + 1):
+ element_name = 'track-test-{}-{}.bst'.format(kind, i + 1)
+ filename = os.path.join(element_path, element_name)
+
+ element_names.append(element_name)
+
+ generate_element(repo, filename, dep_name=last_element_name)
+ last_element_name = element_name
# Assert that a fetch is needed
- assert cli.get_element_state(project, element_dep_name) == 'no reference'
- assert cli.get_element_state(project, element_target_name) == 'no reference'
+ states = cli.get_element_states(project, last_element_name)
+ for element_name in element_names:
+ assert states[element_name] == 'no reference'
# Now first try to track it
result = cli.run(project=project, args=[
'source', 'track', '--deps', 'all',
- element_target_name])
+ last_element_name])
result.assert_success()
# And now fetch it: The Source has probably already cached the
@@ -109,12 +139,16 @@ def test_track_recurse(cli, tmpdir, datafiles, kind):
# is the job of fetch.
result = cli.run(project=project, args=[
'source', 'fetch', '--deps', 'all',
- element_target_name])
+ last_element_name])
result.assert_success()
- # Assert that the dependency is buildable and the target is waiting
- assert cli.get_element_state(project, element_dep_name) == 'buildable'
- assert cli.get_element_state(project, element_target_name) == 'waiting'
+ # Assert that the base is buildable and the rest are waiting
+ states = cli.get_element_states(project, last_element_name)
+ for element_name in element_names:
+ if element_name == element_names[0]:
+ assert states[element_name] == 'buildable'
+ else:
+ assert states[element_name] == 'waiting'
@pytest.mark.datafiles(DATA_DIR)
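
The widened test above deliberately points many fetchers at one backing repository, so that several source plugin instances contend for the same mirror at once. As a standalone illustration of that contention (not part of the patch, paths hypothetical), the sketch below spawns several processes that all take the same exclusive flock; each one simply waits its turn rather than racing on the shared directory.

import fcntl
import multiprocessing

# Illustrative only: several workers contending for one exclusive advisory lock.
def worker(lockfile, index):
    with open(lockfile, 'w') as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)   # blocks until the lock is free
        print("worker {} holds the lock".format(index))
        # ... a real source plugin would run `bzr branch` / `bzr pull` here ...
        fcntl.flock(lock, fcntl.LOCK_UN)

if __name__ == '__main__':
    lockfile = '/tmp/example-mirror.lock'  # hypothetical path
    procs = [multiprocessing.Process(target=worker, args=(lockfile, i)) for i in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
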
diff --git a/tests/testutils/runcli.py b/tests/testutils/runcli.py
index 770f6289a..d3f5113a0 100644
--- a/tests/testutils/runcli.py
+++ b/tests/testutils/runcli.py
@@ -375,6 +375,9 @@ class Cli():
# Fetch an element state by name by
# invoking bst show on the project with the CLI
#
+ # If you need to get the states of multiple elements,
+ # then use get_element_states(s) instead.
+ #
def get_element_state(self, project, element_name):
result = self.run(project=project, silent=True, args=[
'show',
@@ -385,6 +388,25 @@ class Cli():
result.assert_success()
return result.output.strip()
+ # Fetch the states of elements for a given target / deps
+ #
+ # Returns a dictionary with the element names as keys
+ #
+ def get_element_states(self, project, target, deps='all'):
+ result = self.run(project=project, silent=True, args=[
+ 'show',
+ '--deps', deps,
+ '--format', '%{name}||%{state}',
+ target
+ ])
+ result.assert_success()
+ lines = result.output.splitlines()
+ states = {}
+ for line in lines:
+ split = line.split(sep='||')
+ states[split[0]] = split[1]
+ return states
+
# Fetch an element's cache key by invoking bst show
# on the project with the CLI
#
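
The new get_element_states() helper returns a name-to-state mapping parsed from `bst show` output via the '||' separator. A brief usage sketch, assuming a Cli fixture and an element called 'target.bst' (both hypothetical here, mirroring the pattern in tests/frontend/track.py):

# Hypothetical usage inside a test
states = cli.get_element_states(project, 'target.bst', deps='all')
for name, state in states.items():
    print("{}: {}".format(name, state))
assert states['target.bst'] in ('no reference', 'waiting', 'buildable')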