diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-14 07:24:19 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-14 07:24:19 +0000 |
commit | b3817226286a0c60b7ca955686b767bc40fb1051 (patch) | |
tree | 5d9380ac5d23eac47e03bcdd89ae0c2a7a867ea0 | |
parent | 5dce02bbd68be31b67b080ac6480af359f826609 (diff) | |
parent | 711618fc299cb6442e2e8c14ead8f8d6b76487d1 (diff) | |
download | buildstream-b3817226286a0c60b7ca955686b767bc40fb1051.tar.gz |
Merge branch 'tristan/fix-bzr-race-1.2' into 'bst-1.2'
Backport bzr source plugin race condition fix to 1.2
See merge request BuildStream/buildstream!1287
-rw-r--r-- | buildstream/plugins/sources/bzr.py | 127 | ||||
-rw-r--r-- | tests/frontend/track.py | 60 | ||||
-rw-r--r-- | tests/testutils/runcli.py | 22 |
3 files changed, 116 insertions, 93 deletions
diff --git a/buildstream/plugins/sources/bzr.py b/buildstream/plugins/sources/bzr.py index c1e289704..60478a3cf 100644 --- a/buildstream/plugins/sources/bzr.py +++ b/buildstream/plugins/sources/bzr.py @@ -57,6 +57,7 @@ bzr - stage files from a bazaar repository import os import shutil +import fcntl from contextlib import contextmanager from buildstream import Source, SourceError, Consistency @@ -85,10 +86,12 @@ class BzrSource(Source): if self.ref is None or self.tracking is None: return Consistency.INCONSISTENT - if self._check_ref(): - return Consistency.CACHED - else: - return Consistency.RESOLVED + # Lock for the _check_ref() + with self._locked(): + if self._check_ref(): + return Consistency.CACHED + else: + return Consistency.RESOLVED def load_ref(self, node): self.ref = self.node_get_member(node, str, 'ref', None) @@ -101,7 +104,7 @@ class BzrSource(Source): def track(self): with self.timed_activity("Tracking {}".format(self.url), - silent_nested=True): + silent_nested=True), self._locked(): self._ensure_mirror(skip_ref_check=True) ret, out = self.check_output([self.host_bzr, "version-info", "--custom", "--template={revno}", @@ -115,7 +118,7 @@ class BzrSource(Source): def fetch(self): with self.timed_activity("Fetching {}".format(self.url), - silent_nested=True): + silent_nested=True), self._locked(): self._ensure_mirror() def stage(self, directory): @@ -140,6 +143,26 @@ class BzrSource(Source): "--directory={}".format(directory), url], fail="Failed to switch workspace's parent branch to {}".format(url)) + # _locked() + # + # This context manager ensures exclusive access to the + # bzr repository. 
+ # + @contextmanager + def _locked(self): + lockdir = os.path.join(self.get_mirror_directory(), 'locks') + lockfile = os.path.join( + lockdir, + utils.url_directory_name(self.original_url) + '.lock' + ) + os.makedirs(lockdir, exist_ok=True) + with open(lockfile, 'w') as lock: + fcntl.flock(lock, fcntl.LOCK_EX) + try: + yield + finally: + fcntl.flock(lock, fcntl.LOCK_UN) + def _check_ref(self): # If the mirror doesnt exist yet, then we dont have the ref if not os.path.exists(self._get_branch_dir()): @@ -156,83 +179,27 @@ class BzrSource(Source): return os.path.join(self.get_mirror_directory(), utils.url_directory_name(self.original_url)) - def _atomic_replace_mirrordir(self, srcdir): - """Helper function to safely replace the mirror dir""" + def _ensure_mirror(self, skip_ref_check=False): + mirror_dir = self._get_mirror_dir() + bzr_metadata_dir = os.path.join(mirror_dir, ".bzr") + if not os.path.exists(bzr_metadata_dir): + self.call([self.host_bzr, "init-repo", "--no-trees", mirror_dir], + fail="Failed to initialize bzr repository") + + branch_dir = os.path.join(mirror_dir, self.tracking) + branch_url = self.url + "/" + self.tracking + if not os.path.exists(branch_dir): + # `bzr branch` the branch if it doesn't exist + # to get the upstream code + self.call([self.host_bzr, "branch", branch_url, branch_dir], + fail="Failed to branch from {} to {}".format(branch_url, branch_dir)) - if not os.path.exists(self._get_mirror_dir()): - # Just move the srcdir to the mirror dir - try: - os.rename(srcdir, self._get_mirror_dir()) - except OSError as e: - raise SourceError("{}: Failed to move srcdir '{}' to mirror dir '{}'" - .format(str(self), srcdir, self._get_mirror_dir())) from e else: - # Atomically swap the backup dir. 
- backupdir = self._get_mirror_dir() + ".bak" - try: - os.rename(self._get_mirror_dir(), backupdir) - except OSError as e: - raise SourceError("{}: Failed to move mirrordir '{}' to backup dir '{}'" - .format(str(self), self._get_mirror_dir(), backupdir)) from e + # `bzr pull` the branch if it does exist + # to get any changes to the upstream code + self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url], + fail="Failed to pull new changes for {}".format(branch_dir)) - try: - os.rename(srcdir, self._get_mirror_dir()) - except OSError as e: - # Attempt to put the backup back! - os.rename(backupdir, self._get_mirror_dir()) - raise SourceError("{}: Failed to replace bzr repo '{}' with '{}" - .format(str(self), srcdir, self._get_mirror_dir())) from e - finally: - if os.path.exists(backupdir): - shutil.rmtree(backupdir) - - @contextmanager - def _atomic_repodir(self): - """Context manager for working in a copy of the bzr repository - - Yields: - (str): A path to the copy of the bzr repo - - This should be used because bzr does not give any guarantees of - atomicity, and aborting an operation at the wrong time (or - accidentally running multiple concurrent operations) can leave the - repo in an inconsistent state. 
- """ - with self.tempdir() as repodir: - mirror_dir = self._get_mirror_dir() - if os.path.exists(mirror_dir): - try: - # shutil.copytree doesn't like it if destination exists - shutil.rmtree(repodir) - shutil.copytree(mirror_dir, repodir) - except (shutil.Error, OSError) as e: - raise SourceError("{}: Failed to copy bzr repo from '{}' to '{}'" - .format(str(self), mirror_dir, repodir)) from e - - yield repodir - self._atomic_replace_mirrordir(repodir) - - def _ensure_mirror(self, skip_ref_check=False): - with self._atomic_repodir() as repodir: - # Initialize repo if no metadata - bzr_metadata_dir = os.path.join(repodir, ".bzr") - if not os.path.exists(bzr_metadata_dir): - self.call([self.host_bzr, "init-repo", "--no-trees", repodir], - fail="Failed to initialize bzr repository") - - branch_dir = os.path.join(repodir, self.tracking) - branch_url = self.url + "/" + self.tracking - if not os.path.exists(branch_dir): - # `bzr branch` the branch if it doesn't exist - # to get the upstream code - self.call([self.host_bzr, "branch", branch_url, branch_dir], - fail="Failed to branch from {} to {}".format(branch_url, branch_dir)) - - else: - # `bzr pull` the branch if it does exist - # to get any changes to the upstream code - self.call([self.host_bzr, "pull", "--directory={}".format(branch_dir), branch_url], - fail="Failed to pull new changes for {}".format(branch_dir)) if not skip_ref_check and not self._check_ref(): raise SourceError("Failed to ensure ref '{}' was mirrored".format(self.ref), reason="ref-not-mirrored") diff --git a/tests/frontend/track.py b/tests/frontend/track.py index c7921fe4c..cd9f92ac4 100644 --- a/tests/frontend/track.py +++ b/tests/frontend/track.py @@ -73,14 +73,36 @@ def test_track(cli, tmpdir, datafiles, ref_storage, kind): assert not os.path.exists(os.path.join(project, 'project.refs')) +# NOTE: +# +# This test checks that recursive tracking works by observing +# element states after running a recursive tracking operation. 
+# +# However, this test is ALSO valuable as it stresses the source +# plugins in a situation where many source plugins are operating +# at once on the same backing repository. +# +# Do not change this test to use a separate 'Repo' per element +# as that would defeat the purpose of the stress test, otherwise +# please refactor that aspect into another test. +# @pytest.mark.datafiles(DATA_DIR) +@pytest.mark.parametrize("amount", [(1), (10)]) @pytest.mark.parametrize("kind", [(kind) for kind in ALL_REPO_KINDS]) -def test_track_recurse(cli, tmpdir, datafiles, kind): +def test_track_recurse(cli, tmpdir, datafiles, kind, amount): project = os.path.join(datafiles.dirname, datafiles.basename) dev_files_path = os.path.join(project, 'files', 'dev-files') element_path = os.path.join(project, 'elements') - element_dep_name = 'track-test-dep-{}.bst'.format(kind) - element_target_name = 'track-test-target-{}.bst'.format(kind) + + # Try to actually launch as many fetch jobs as possible at the same time + # + # This stresses the Source plugins and helps to ensure that + # they handle concurrent access to the store correctly. + cli.configure({ + 'scheduler': { + 'fetchers': amount, + } + }) # Create our repo object of the given source type with # the dev files, and then collect the initial ref. 
@@ -89,18 +111,26 @@ def test_track_recurse(cli, tmpdir, datafiles, kind): ref = repo.create(dev_files_path) # Write out our test targets - generate_element(repo, os.path.join(element_path, element_dep_name)) - generate_element(repo, os.path.join(element_path, element_target_name), - dep_name=element_dep_name) + element_names = [] + last_element_name = None + for i in range(amount + 1): + element_name = 'track-test-{}-{}.bst'.format(kind, i + 1) + filename = os.path.join(element_path, element_name) + + element_names.append(element_name) + + generate_element(repo, filename, dep_name=last_element_name) + last_element_name = element_name # Assert that a fetch is needed - assert cli.get_element_state(project, element_dep_name) == 'no reference' - assert cli.get_element_state(project, element_target_name) == 'no reference' + states = cli.get_element_states(project, last_element_name) + for element_name in element_names: + assert states[element_name] == 'no reference' # Now first try to track it result = cli.run(project=project, args=[ 'track', '--deps', 'all', - element_target_name]) + last_element_name]) result.assert_success() # And now fetch it: The Source has probably already cached the @@ -109,12 +139,16 @@ def test_track_recurse(cli, tmpdir, datafiles, kind): # is the job of fetch. 
result = cli.run(project=project, args=[ 'fetch', '--deps', 'all', - element_target_name]) + last_element_name]) result.assert_success() - # Assert that the dependency is buildable and the target is waiting - assert cli.get_element_state(project, element_dep_name) == 'buildable' - assert cli.get_element_state(project, element_target_name) == 'waiting' + # Assert that the base is buildable and the rest are waiting + states = cli.get_element_states(project, last_element_name) + for element_name in element_names: + if element_name == element_names[0]: + assert states[element_name] == 'buildable' + else: + assert states[element_name] == 'waiting' @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/testutils/runcli.py b/tests/testutils/runcli.py index 3535e94ea..805b1192d 100644 --- a/tests/testutils/runcli.py +++ b/tests/testutils/runcli.py @@ -343,6 +343,9 @@ class Cli(): # Fetch an element state by name by # invoking bst show on the project with the CLI # + # If you need to get the states of multiple elements, + # then use get_element_states(s) instead. + # def get_element_state(self, project, element_name): result = self.run(project=project, silent=True, args=[ 'show', @@ -353,6 +356,25 @@ class Cli(): result.assert_success() return result.output.strip() + # Fetch the states of elements for a given target / deps + # + # Returns a dictionary with the element names as keys + # + def get_element_states(self, project, target, deps='all'): + result = self.run(project=project, silent=True, args=[ + 'show', + '--deps', deps, + '--format', '%{name}||%{state}', + target + ]) + result.assert_success() + lines = result.output.splitlines() + states = {} + for line in lines: + split = line.split(sep='||') + states[split[0]] = split[1] + return states + # Fetch an element's cache key by invoking bst show # on the project with the CLI # |