diff options
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_artifactcache.py | 46 | ||||
-rw-r--r-- | buildstream/_basecache.py | 42 | ||||
-rw-r--r-- | buildstream/_frontend/cli.py | 7 | ||||
-rw-r--r-- | buildstream/_frontend/widget.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/__init__.py | 3 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/artifactpushqueue.py (renamed from buildstream/_scheduler/queues/pushqueue.py) | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/sourcepushqueue.py | 42 | ||||
-rw-r--r-- | buildstream/_sourcecache.py | 70 | ||||
-rw-r--r-- | buildstream/_stream.py | 37 | ||||
-rw-r--r-- | buildstream/element.py | 44 | ||||
-rw-r--r-- | buildstream/plugintestutils/runcli.py | 2 | ||||
-rw-r--r-- | buildstream/source.py | 7 |
13 files changed, 240 insertions, 71 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index f97da4661..3ca6c6e60 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -263,48 +263,6 @@ class ArtifactCache(BaseCache): return self.cas.diff(ref_a, ref_b, subdir=subdir) - # has_fetch_remotes(): - # - # Check whether any remote repositories are available for fetching. - # - # Args: - # element (Element): The Element to check - # - # Returns: True if any remote repositories are configured, False otherwise - # - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) - - # has_push_remotes(): - # - # Check whether any remote repositories are available for pushing. - # - # Args: - # element (Element): The Element to check - # - # Returns: True if any remote repository is configured, False otherwise - # - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes - return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - # push(): # # Push committed artifact to remote repository. @@ -337,7 +295,7 @@ class ArtifactCache(BaseCache): element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) pushed = True else: - element.info("Remote ({}) already has {} cached".format( + element.info("Remote ({}) already has artifact {} cached".format( remote.spec.url, element._get_brief_display_key() )) @@ -372,7 +330,7 @@ class ArtifactCache(BaseCache): # no need to pull from additional remotes return True else: - element.info("Remote ({}) does not have {} cached".format( + element.info("Remote ({}) does not have artifact {} cached".format( remote.spec.url, element._get_brief_display_key() )) diff --git a/buildstream/_basecache.py b/buildstream/_basecache.py index a8c58e48f..696cbf9c1 100644 --- a/buildstream/_basecache.py +++ b/buildstream/_basecache.py @@ -190,6 +190,48 @@ class BaseCache(): self._remotes[project] = project_remotes + # has_fetch_remotes(): + # + # Check whether any remote repositories are available for fetching. + # + # Args: + # plugin (Plugin): The Plugin to check + # + # Returns: True if any remote repositories are configured, False otherwise + # + def has_fetch_remotes(self, *, plugin=None): + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif plugin is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[plugin._get_project()] + return bool(remotes_for_project) + + # has_push_remotes(): + # + # Check whether any remote repositories are available for pushing. + # + # Args: + # element (Element): The Element to check + # + # Returns: True if any remote repository is configured, False otherwise + # + def has_push_remotes(self, *, plugin=None): + if not self._has_push_remotes: + # No project has push remotes + return False + elif plugin is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[plugin._get_project()] + return any(remote.spec.push for remote in remotes_for_project) + ################################################ # Local Private Methods # ################################################ diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 25298a684..a1a780cc4 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -622,10 +622,12 @@ def source(): help="Track new source references before fetching") @click.option('--track-cross-junctions', '-J', default=False, is_flag=True, help="Allow tracking to cross junction boundaries") +@click.option('--remote', '-r', default=None, + help="The URL of the remote source cache (defaults to the first configured cache)") @click.argument('elements', nargs=-1, type=click.Path(readable=False)) @click.pass_obj -def source_fetch(app, elements, deps, track_, except_, track_cross_junctions): +def source_fetch(app, elements, deps, track_, except_, track_cross_junctions, remote): """Fetch sources required to build the pipeline Specifying no elements will result in fetching the default targets @@ -666,7 +668,8 @@ def source_fetch(app, elements, deps, track_, except_, track_cross_junctions): selection=deps, except_targets=except_, track_targets=track_, - track_cross_junctions=track_cross_junctions) + track_cross_junctions=track_cross_junctions, + remote=remote) ################################################################## diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index ef31b8ba7..f092cb5ec 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -201,7 +201,7 @@ class ElementName(Widget): if not action_name: action_name = "Main" - return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \ + return self.content_profile.fmt("{: >8}".format(action_name.lower())) + \ self.format_profile.fmt(':') + self.content_profile.fmt(name) diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index 470859864..d2f458fa5 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -20,9 +20,10 @@ from .queues import Queue, QueueStatus from .queues.fetchqueue import FetchQueue +from .queues.sourcepushqueue import SourcePushQueue from .queues.trackqueue import TrackQueue from .queues.buildqueue import BuildQueue -from .queues.pushqueue import PushQueue +from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/artifactpushqueue.py index 35532d23d..b861d4fc7 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/artifactpushqueue.py @@ -26,7 +26,7 @@ from ..._exceptions import SkipJob # A queue which pushes element artifacts # -class PushQueue(Queue): +class ArtifactPushQueue(Queue): action_name = "Push" complete_name = "Pushed" diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 546c65b65..9edeebb1d 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -73,5 +73,8 @@ class FetchQueue(Queue): element._fetch_done() - # Successful fetch, we must be CACHED now - assert element._get_consistency() == Consistency.CACHED + # Successful fetch, we must be CACHED or in the sourcecache + if self._fetch_original: + assert element._get_consistency() == Consistency.CACHED + else: + assert element._source_cached() diff --git a/buildstream/_scheduler/queues/sourcepushqueue.py b/buildstream/_scheduler/queues/sourcepushqueue.py new file mode 100644 index 000000000..c38460e6a --- /dev/null +++ b/buildstream/_scheduler/queues/sourcepushqueue.py @@ -0,0 +1,42 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> + +from . import Queue, QueueStatus +from ..resources import ResourceType +from ..._exceptions import SkipJob + + +# A queue which pushes staged sources +# +class SourcePushQueue(Queue): + + action_name = "Src-push" + complete_name = "Sources pushed" + resources = [ResourceType.UPLOAD] + + def process(self, element): + # Returns whether a source was pushed or not + if not element._source_push(): + raise SkipJob(self.action_name) + + def status(self, element): + if element._skip_source_push(): + return QueueStatus.SKIP + + return QueueStatus.READY diff --git a/buildstream/_sourcecache.py b/buildstream/_sourcecache.py index b21edaa81..219ecb1e9 100644 --- a/buildstream/_sourcecache.py +++ b/buildstream/_sourcecache.py @@ -20,7 +20,7 @@ from ._cas import CASRemoteSpec from .storage._casbaseddirectory import CasBasedDirectory from ._basecache import BaseCache -from ._exceptions import CASCacheError, SourceCacheError +from ._exceptions import CASError, CASCacheError, SourceCacheError from . import utils @@ -143,3 +143,71 @@ class SourceCache(BaseCache): raise SourceCacheError("Error exporting source: {}".format(e)) return CasBasedDirectory(self.cas, digest=digest) + + # pull() + # + # Attempts to pull sources from configure remote source caches. + # + # Args: + # source (Source): The source we want to fetch + # progress (callable|None): The progress callback + # + # Returns: + # (bool): True if pull successful, False if not + def pull(self, source, *, progress=None): + ref = source._get_source_name() + + project = source._get_project() + + display_key = source._get_brief_display_key() + + for remote in self._remotes[project]: + try: + source.status("Pulling source {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + source.info("Pulled source {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + source.info("Remote ({}) does not have source {} cached".format( + remote.spec.url, display_key)) + except CASError as e: + raise SourceCacheError("Failed to pull source {}: {}".format( + display_key, e)) from e + return False + + # push() + # + # Push a source to configured remote source caches + # + # Args: + # source (Source): source to push + # + # Returns: + # (Bool): whether it pushed to a remote source cache + # + def push(self, source): + ref = source._get_source_name() + project = source._get_project() + + # find configured push remotes for this source + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + pushed = False + + display_key = source._get_brief_display_key() + for remote in push_remotes: + remote.init() + source.status("Pushing source {} -> {}".format(display_key, remote.spec.url)) + if self.cas.push([ref], remote): + source.info("Pushed source {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + source.info("Remote ({}) already has source {} cached" + .format(remote.spec.url, display_key)) + + return pushed diff --git a/buildstream/_stream.py b/buildstream/_stream.py index d77b8d33b..d2ece163d 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -34,7 +34,8 @@ from fnmatch import fnmatch from ._artifactelement import verify_artifact_ref from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, CASCacheError from ._message import Message, MessageType -from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue +from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ + SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, profile_start, profile_end from .types import _KeyStrength @@ -77,6 +78,7 @@ class Stream(): # Private members # self._artifacts = context.artifactcache + self._sourcecache = context.sourcecache self._context = context self._project = project self._pipeline = Pipeline(context, project, self._artifacts) @@ -106,7 +108,7 @@ class Stream(): # targets (list of str): Targets to pull # selection (PipelineSelection): The selection mode for the specified targets # except_targets (list of str): Specified targets to except from fetching - # use_artifact_config (bool): If artifact remote config should be loaded + # use_artifact_config (bool): If artifact remote configs should be loaded # # Returns: # (list of Element): The selected elements @@ -239,6 +241,7 @@ class Stream(): ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, artifact_remote_url=remote, + use_source_config=True, fetch_subprojects=True, dynamic_plan=True) @@ -259,10 +262,14 @@ class Stream(): self._add_queue(PullQueue(self._scheduler)) self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) + self._add_queue(BuildQueue(self._scheduler)) if self._artifacts.has_push_remotes(): - self._add_queue(PushQueue(self._scheduler)) + self._add_queue(ArtifactPushQueue(self._scheduler)) + + if self._sourcecache.has_push_remotes(): + self._add_queue(SourcePushQueue(self._scheduler)) # Enqueue elements # @@ -281,12 +288,14 @@ class Stream(): # except_targets (list of str): Specified targets to except from fetching # track_targets (bool): Whether to track selected targets in addition to fetching # track_cross_junctions (bool): Whether tracking should cross junction boundaries + # remote (str|None): The URL of a specific remote server to pull from. # def fetch(self, targets, *, selection=PipelineSelection.PLAN, except_targets=None, track_targets=False, - track_cross_junctions=False): + track_cross_junctions=False, + remote=None): if track_targets: track_targets = targets @@ -297,13 +306,19 @@ class Stream(): track_selection = PipelineSelection.NONE track_except_targets = () + use_source_config = True + if remote: + use_source_config = False + elements, track_elements = \ self._load(targets, track_targets, selection=selection, track_selection=track_selection, except_targets=except_targets, track_except_targets=track_except_targets, track_cross_junctions=track_cross_junctions, - fetch_subprojects=True) + fetch_subprojects=True, + use_source_config=use_source_config, + source_remote_url=remote) # Delegated to a shared fetch method self._fetch(elements, track_elements=track_elements) @@ -424,7 +439,7 @@ class Stream(): self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(require_buildtrees) - push_queue = PushQueue(self._scheduler) + push_queue = ArtifactPushQueue(self._scheduler) self._add_queue(push_queue) self._enqueue_plan(elements, queue=push_queue) self._run() @@ -986,7 +1001,9 @@ class Stream(): # track_cross_junctions (bool): Whether tracking should cross junction boundaries # ignore_junction_targets (bool): Whether junction targets should be filtered out # use_artifact_config (bool): Whether to initialize artifacts with the config - # artifact_remote_url (bool): A remote url for initializing the artifacts + # use_source_config (bool): Whether to initialize remote source caches with the config + # artifact_remote_url (str): A remote url for initializing the artifacts + # source_remote_url (str): A remote url for initializing source caches # fetch_subprojects (bool): Whether to fetch subprojects while loading # # Returns: @@ -1002,7 +1019,9 @@ class Stream(): track_cross_junctions=False, ignore_junction_targets=False, use_artifact_config=False, + use_source_config=False, artifact_remote_url=None, + source_remote_url=None, fetch_subprojects=False, dynamic_plan=False, load_refs=False): @@ -1087,6 +1106,7 @@ class Stream(): # Connect to remote caches, this needs to be done before resolving element state self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) + self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_remote_url) # Now move on to loading primary selection. # @@ -1106,7 +1126,7 @@ class Stream(): required_elements = functools.partial(self._pipeline.dependencies, elements, Scope.ALL) self._artifacts.mark_required_elements(required_elements()) - self._context.sourcecache.mark_required_sources( + self._sourcecache.mark_required_sources( itertools.chain.from_iterable( [element.sources() for element in required_elements()])) @@ -1217,6 +1237,7 @@ class Stream(): if track_elements: self._enqueue_plan(track_elements, queue=track_queue) + self._enqueue_plan(fetch_plan) self._run() diff --git a/buildstream/element.py b/buildstream/element.py index cb04a9c15..b27f3e7df 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -213,6 +213,7 @@ class Element(Plugin): self.__weak_cache_key = None # Our cached weak cache key self.__strict_cache_key = None # Our cached cache key for strict builds self.__artifacts = context.artifactcache # Artifact cache + self.__sourcecache = context.sourcecache # Source cache self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state self.__strong_cached = None # Whether we have a cached artifact self.__weak_cached = None # Whether we have a cached artifact @@ -1810,7 +1811,7 @@ class Element(Plugin): # Pull is pending if artifact remote server available # and pull has not been attempted yet - return self.__artifacts.has_fetch_remotes(element=self) and not self.__pull_done + return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done # _pull_done() # @@ -1855,6 +1856,25 @@ class Element(Plugin): # Notify successfull download return True + def _skip_source_push(self): + if not self.__sources or self._get_workspace(): + return True + return not (self.__sourcecache.has_push_remotes(plugin=self) and + self._source_cached()) + + def _source_push(self): + # try and push sources if we've got them + if self.__sourcecache.has_push_remotes(plugin=self) and self._source_cached(): + sources = list(self.sources()) + if sources: + source_pushed = self.__sourcecache.push(sources[-1]) + + if not source_pushed: + return False + + # Notify successful upload + return True + # _skip_push(): # # Determine whether we should create a push job for this element. @@ -1863,7 +1883,7 @@ class Element(Plugin): # (bool): True if this element does not need a push job to be created # def _skip_push(self): - if not self.__artifacts.has_push_remotes(element=self): + if not self.__artifacts.has_push_remotes(plugin=self): # No push remotes for this element's project return True @@ -2111,16 +2131,20 @@ class Element(Plugin): # def _fetch(self, fetch_original=False): previous_sources = [] - source = None - sourcecache = self._get_context().sourcecache - - # check whether the final source is cached - for source in self.sources(): - pass + sources = self.__sources + if sources and not fetch_original: + source = sources[-1] + if self.__sourcecache.contains(source): + return - if source and not fetch_original and sourcecache.contains(source): - return + # try and fetch from source cache + if source._get_consistency() < Consistency.CACHED and \ + self.__sourcecache.has_fetch_remotes() and \ + not self.__sourcecache.contains(source): + if self.__sourcecache.pull(source): + return + # We need to fetch original sources for source in self.sources(): source_consistency = source._get_consistency() if source_consistency != Consistency.CACHED: diff --git a/buildstream/plugintestutils/runcli.py b/buildstream/plugintestutils/runcli.py index c08dd0ff3..71d4b4039 100644 --- a/buildstream/plugintestutils/runcli.py +++ b/buildstream/plugintestutils/runcli.py @@ -225,7 +225,7 @@ class Result(): # (list): A list of element names # def get_tracked_elements(self): - tracked = re.findall(r'\[track:(\S+)\s*]', self.stderr) + tracked = re.findall(r'\[\s*track:(\S+)\s*]', self.stderr) if tracked is None: return [] diff --git a/buildstream/source.py b/buildstream/source.py index af89ff8aa..9e1a8ef3e 100644 --- a/buildstream/source.py +++ b/buildstream/source.py @@ -986,6 +986,13 @@ class Source(Plugin): self.get_kind(), self._key) + def _get_brief_display_key(self): + context = self._get_context() + key = self._key + + length = min(len(key), context.log_key_length) + return key[:length] + ############################################################# # Local Private Methods # ############################################################# |