diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-15 08:45:34 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-04-15 08:45:34 +0000 |
commit | 4aafa38e0d3c73b2fb2663984ce170e7d7ed8e95 (patch) | |
tree | cc946f5b1d045d719d6cbbc886f3b288ad378e97 | |
parent | b3817226286a0c60b7ca955686b767bc40fb1051 (diff) | |
parent | 2db11763d50b2295ab49b44c5ed0adee38ad67fe (diff) | |
download | buildstream-4aafa38e0d3c73b2fb2663984ce170e7d7ed8e95.tar.gz |
Merge branch 'tristan/platform-cache-quota-1.2' into 'bst-1.2'
Cache quote related backports
See merge request BuildStream/buildstream!1288
23 files changed, 466 insertions, 140 deletions
diff --git a/buildstream/_artifactcache/__init__.py b/buildstream/_artifactcache/__init__.py index fad483a57..76435e06f 100644 --- a/buildstream/_artifactcache/__init__.py +++ b/buildstream/_artifactcache/__init__.py @@ -18,3 +18,4 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE +from .artifactcache import ArtifactCacheUsage diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 965d6e132..0beab5537 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -83,6 +83,39 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl ArtifactCacheSpec.__new__.__defaults__ = (None, None, None) +# ArtifactCacheUsage +# +# A simple object to report the current artifact cache +# usage details. +# +# Note that this uses the user configured cache quota +# rather than the internal quota with protective headroom +# removed, to provide a more sensible value to display to +# the user. +# +# Args: +# artifacts (ArtifactCache): The artifact cache to get the status of +# +class ArtifactCacheUsage(): + + def __init__(self, artifacts): + context = artifacts.context + self.quota_config = context.config_cache_quota # Configured quota + self.quota_size = artifacts._cache_quota_original # Resolved cache quota in bytes + self.used_size = artifacts.get_cache_size() # Size used by artifacts in bytes + self.used_percent = 0 # Percentage of the quota used + if self.quota_size is not None: + self.used_percent = int(self.used_size * 100 / self.quota_size) + + # Formattable into a human readable string + # + def __str__(self): + return "{} / {} ({}%)" \ + .format(utils._pretty_size(self.used_size, dec_places=1), + self.quota_config, + self.used_percent) + + # An ArtifactCache manages artifacts. # # Args: @@ -100,6 +133,7 @@ class ArtifactCache(): self._required_elements = set() # The elements required for this session self._cache_size = None # The current cache size, sometimes it's an estimate self._cache_quota = None # The cache quota + self._cache_quota_original = None # The cache quota as specified by the user, in bytes self._cache_lower_threshold = None # The target cache size for a cleanup os.makedirs(self.extractdir, exist_ok=True) @@ -242,11 +276,33 @@ class ArtifactCache(): # # Clean the artifact cache as much as possible. # + # Args: + # progress (callable): A callback to call when a ref is removed + # # Returns: # (int): The size of the cache after having cleaned up # - def clean(self): + def clean(self, progress=None): artifacts = self.list_artifacts() + context = self.context + + # Some accumulative statistics + removed_ref_count = 0 + space_saved = 0 + + # Start off with an announcement with as much info as possible + volume_size, volume_avail = self._get_cache_volume_size() + self._message(MessageType.STATUS, "Starting cache cleanup", + detail=("Elements required by the current build plan: {}\n" + + "User specified quota: {} ({})\n" + + "Cache usage: {}\n" + + "Cache volume: {} total, {} available") + .format(len(self._required_elements), + context.config_cache_quota, + utils._pretty_size(self._cache_quota_original, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + utils._pretty_size(volume_size, dec_places=2), + utils._pretty_size(volume_avail, dec_places=2))) # Build a set of the cache keys which are required # based on the required elements at cleanup time @@ -271,11 +327,18 @@ class ArtifactCache(): # can't remove them, we have to abort the build. # # FIXME: Asking the user what to do may be neater + # default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], 'buildstream.conf') - detail = ("There is not enough space to complete the build.\n" - "Please increase the cache-quota in {}." - .format(self.context.config_origin or default_conf)) + detail = ("Aborted after removing {} refs and saving {} disk space.\n" + "The remaining {} in the cache is required by the {} elements in your build plan\n\n" + "There is not enough space to complete the build.\n" + "Please increase the cache-quota in {} and/or make more disk space." + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + len(self._required_elements), + (context.config_origin or default_conf))) if self.has_quota_exceeded(): raise ArtifactError("Cache too full. Aborting.", @@ -290,10 +353,33 @@ class ArtifactCache(): # Remove the actual artifact, if it's not required. size = self.remove(to_remove) + removed_ref_count += 1 + space_saved += size + + self._message(MessageType.STATUS, + "Freed {: <7} {}".format( + utils._pretty_size(size, dec_places=2), + to_remove)) + # Remove the size from the removed size self.set_cache_size(self._cache_size - size) - # This should be O(1) if implemented correctly + # User callback + # + # Currently this process is fairly slow, but we should + # think about throttling this progress() callback if this + # becomes too intense. + if progress: + progress() + + # Informational message about the side effects of the cleanup + self._message(MessageType.INFO, "Cleanup completed", + detail=("Removed {} refs and saving {} disk space.\n" + + "Cache usage is now: {}") + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2))) + return self.get_cache_size() # compute_cache_size() @@ -305,7 +391,14 @@ class ArtifactCache(): # (int): The size of the artifact cache. # def compute_cache_size(self): - self._cache_size = self.calculate_cache_size() + old_cache_size = self._cache_size + new_cache_size = self.calculate_cache_size() + + if old_cache_size != new_cache_size: + self._cache_size = new_cache_size + + usage = ArtifactCacheUsage(self) + self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage)) return self._cache_size @@ -333,7 +426,7 @@ class ArtifactCache(): # it is greater than the actual cache size. # # Returns: - # (int) An approximation of the artifact cache size. + # (int) An approximation of the artifact cache size, in bytes. # def get_cache_size(self): @@ -378,6 +471,13 @@ class ArtifactCache(): # Abstract methods for subclasses to implement # ################################################ + # preflight(): + # + # Preflight check. + # + def preflight(self): + pass + # update_atime() # # Update the atime of an artifact. @@ -667,21 +767,16 @@ class ArtifactCache(): else: headroom = 2e9 - artifactdir_volume = self.context.artifactdir - while not os.path.exists(artifactdir_volume): - artifactdir_volume = os.path.dirname(artifactdir_volume) - try: - cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume) + cache_quota = utils._parse_size(self.context.config_cache_quota, + self.context.artifactdir) except utils.UtilError as e: raise LoadError(LoadErrorReason.INVALID_DATA, "{}\nPlease specify the value in bytes or as a % of full disk space.\n" "\nValid values are, for example: 800M 10G 1T 50%\n" .format(str(e))) from e - stat = os.statvfs(artifactdir_volume) - available_space = (stat.f_bsize * stat.f_bavail) - + total_size, available_space = self._get_cache_volume_size() cache_size = self.get_cache_size() # Ensure system has enough storage for the cache_quota @@ -697,15 +792,22 @@ class ArtifactCache(): "Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) + "BuildStream requires a minimum cache quota of 2G.") elif cache_quota > cache_size + available_space: # Check maximum - raise LoadError(LoadErrorReason.INVALID_DATA, - ("Your system does not have enough available " + - "space to support the cache quota specified.\n" + - "You currently have:\n" + - "- {used} of cache in use at {local_cache_path}\n" + - "- {available} of available system storage").format( - used=utils._pretty_size(cache_size), - local_cache_path=self.context.artifactdir, - available=utils._pretty_size(available_space))) + if '%' in self.context.config_cache_quota: + available = (available_space / total_size) * 100 + available = '{}% of total disk space'.format(round(available, 1)) + else: + available = utils._pretty_size(available_space) + + raise ArtifactError("Your system does not have enough available " + + "space to support the cache quota specified.", + detail=("You have specified a quota of {quota} total disk space.\n" + + "The filesystem containing {local_cache_path} only " + + "has {available_size} available.") + .format( + quota=self.context.config_cache_quota, + local_cache_path=self.context.artifactdir, + available_size=available), + reason='insufficient-storage-for-quota') # Place a slight headroom (2e9 (2GB) on the cache_quota) into # cache_quota to try and avoid exceptions. @@ -714,9 +816,26 @@ class ArtifactCache(): # if we end up writing more than 2G, but hey, this stuff is # already really fuzzy. # + self._cache_quota_original = cache_quota self._cache_quota = cache_quota - headroom self._cache_lower_threshold = self._cache_quota / 2 + # _get_cache_volume_size() + # + # Get the available space and total space for the volume on + # which the artifact cache is located. + # + # Returns: + # (int): The total number of bytes on the volume + # (int): The number of available bytes on the volume + # + # NOTE: We use this stub to allow the test cases + # to override what an artifact cache thinks + # about it's disk size and available bytes. + # + def _get_cache_volume_size(self): + return utils._get_volume_size(self.context.artifactdir) + # _configured_remote_artifact_cache_specs(): # diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 20fc9847d..9bad0df5e 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -61,7 +61,6 @@ class BlobNotFound(ArtifactError): # # Args: # context (Context): The BuildStream context -# enable_push (bool): Whether pushing is allowed by the platform # # Pushing is explicitly disabled by the platform in some cases, # like when we are falling back to functioning without using @@ -69,7 +68,7 @@ class BlobNotFound(ArtifactError): # class CASCache(ArtifactCache): - def __init__(self, context, *, enable_push=True): + def __init__(self, context): super().__init__(context) self.casdir = os.path.join(context.artifactdir, 'cas') @@ -78,8 +77,6 @@ class CASCache(ArtifactCache): self._calculate_cache_quota() - self._enable_push = enable_push - # Per-project list of _CASRemote instances. self._remotes = {} @@ -90,6 +87,12 @@ class CASCache(ArtifactCache): # Implementation of abstract methods # ################################################ + def preflight(self): + if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or + not os.path.isdir(os.path.join(self.casdir, 'objects'))): + raise ArtifactError("CAS repository check failed for '{}'" + .format(self.casdir)) + def contains(self, element, key): refpath = self._refpath(self.get_artifact_fullname(element, key)) @@ -221,7 +224,7 @@ class CASCache(ArtifactCache): return bool(remotes_for_project) def has_push_remotes(self, *, element=None): - if not self._has_push_remotes or not self._enable_push: + if not self._has_push_remotes: # No project has push remotes return False elif element is None: diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index e18689c78..4a9d5191a 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -37,8 +37,6 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc from .._exceptions import ArtifactError from .._context import Context -from .cascache import CASCache - # The default limit for gRPC messages is 4 MiB. # Limit payload to 1 MiB to leave sufficient headroom for metadata. @@ -50,6 +48,13 @@ class ArtifactTooLargeException(Exception): pass +# We need a message handler because this will own an ArtifactCache +# which can in turn fire messages. +def message_handler(message, context): + logging.info(message.message) + logging.info(message.detail) + + # create_server(): # # Create gRPC CAS artifact server as specified in the Remote Execution API. @@ -63,8 +68,9 @@ def create_server(repo, *, enable_push, min_head_size=int(2e9)): context = Context() context.artifactdir = os.path.abspath(repo) + context.set_message_handler(message_handler) - artifactcache = CASCache(context) + artifactcache = context.artifactcache # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 diff --git a/buildstream/_context.py b/buildstream/_context.py index 8dde091d3..997b82959 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -29,7 +29,8 @@ from . import _yaml from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end -from ._artifactcache import ArtifactCache +from ._artifactcache import ArtifactCache, ArtifactCacheUsage +from ._artifactcache.cascache import CASCache from ._workspaces import Workspaces from .plugin import Plugin @@ -113,6 +114,7 @@ class Context(): self._cache_key = None self._message_handler = None self._message_depth = deque() + self._artifactcache = None self._projects = [] self._project_overrides = {} self._workspaces = None @@ -228,6 +230,23 @@ class Context(): "{}: on-error should be one of: {}".format( provenance, ", ".join(valid_actions))) + @property + def artifactcache(self): + if not self._artifactcache: + self._artifactcache = CASCache(self) + + return self._artifactcache + + # get_artifact_cache_usage() + # + # Fetches the current usage of the artifact cache + # + # Returns: + # (ArtifactCacheUsage): The current status + # + def get_artifact_cache_usage(self): + return ArtifactCacheUsage(self.artifactcache) + # add_project(): # # Add a project to the context. diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 312f11d0c..f48d9cbe1 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -199,7 +199,7 @@ class App(): if option_value is not None: setattr(self.context, context_attr, option_value) try: - Platform.create_instance(self.context) + Platform.get_platform() except BstError as e: self._error_exit(e, "Error instantiating platform") @@ -215,6 +215,13 @@ class App(): # Propagate pipeline feedback to the user self.context.set_message_handler(self._message_handler) + # Preflight the artifact cache after initializing logging, + # this can cause messages to be emitted. + try: + self.context.artifactcache.preflight() + except BstError as e: + self._error_exit(e, "Error instantiating artifact cache") + # # Load the Project # diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py index fd1a5acf1..9ec7d75e0 100644 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -353,13 +353,17 @@ class _StatusHeader(): def render(self, line_length, elapsed): project = self._context.get_toplevel_project() line_length = max(line_length, 80) - size = 0 - text = '' + # + # Line 1: Session time, project name, session / total elements + # + # ========= 00:00:00 project-name (143/387) ========= + # session = str(len(self._stream.session_elements)) total = str(len(self._stream.total_elements)) - # Format and calculate size for target and overall time code + size = 0 + text = '' size += len(total) + len(session) + 4 # Size for (N/N) with a leading space size += 8 # Size of time code size += len(project.name) + 1 @@ -372,6 +376,12 @@ class _StatusHeader(): self._format_profile.fmt(')') line1 = self._centered(text, size, line_length, '=') + + # + # Line 2: Dynamic list of queue status reports + # + # (Fetched:0 117 0)→ (Built:4 0 0) + # size = 0 text = '' @@ -389,10 +399,28 @@ class _StatusHeader(): line2 = self._centered(text, size, line_length, ' ') - size = 24 - text = self._format_profile.fmt("~~~~~ ") + \ - self._content_profile.fmt('Active Tasks') + \ - self._format_profile.fmt(" ~~~~~") + # + # Line 3: Cache usage percentage report + # + # ~~~~~~ cache: 69% ~~~~~~ + # + usage = self._context.get_artifact_cache_usage() + usage_percent = '{}%'.format(usage.used_percent) + + size = 21 + size += len(usage_percent) + if usage.used_percent >= 95: + formatted_usage_percent = self._error_profile.fmt(usage_percent) + elif usage.used_percent >= 80: + formatted_usage_percent = self._content_profile.fmt(usage_percent) + else: + formatted_usage_percent = self._success_profile.fmt(usage_percent) + + text = self._format_profile.fmt("~~~~~~ ") + \ + self._content_profile.fmt('cache') + \ + self._format_profile.fmt(': ') + \ + formatted_usage_percent + \ + self._format_profile.fmt(' ~~~~~~') line3 = self._centered(text, size, line_length, ' ') return line1 + '\n' + line2 + '\n' + line3 diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index 3a41e1052..a772f3248 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -452,6 +452,7 @@ class LogLine(Widget): values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S') values["Project"] = "{} ({})".format(project.name, project.directory) values["Targets"] = ", ".join([t.name for t in stream.targets]) + values["Cache Usage"] = "{}".format(context.get_artifact_cache_usage()) text += self._format_values(values) # User configurations diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index eec60b193..1d7476776 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -28,7 +28,6 @@ from .. import Consistency from .. import _yaml from ..element import Element from .._profile import Topics, profile_start, profile_end -from .._platform import Platform from .._includes import Includes from .types import Symbol, Dependency @@ -533,8 +532,7 @@ class Loader(): raise LoadError(LoadErrorReason.INVALID_DATA, "{}: Expected junction but element kind is {}".format(filename, meta_element.kind)) - platform = Platform.get_platform() - element = Element._new_from_meta(meta_element, platform.artifactcache) + element = Element._new_from_meta(meta_element, self._context.artifactcache) element._preflight() for source in element.sources(): diff --git a/buildstream/_platform/linux.py b/buildstream/_platform/linux.py index a5fd0d687..7af1a2283 100644 --- a/buildstream/_platform/linux.py +++ b/buildstream/_platform/linux.py @@ -17,11 +17,11 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import os import subprocess from .. import _site from .. import utils -from .._artifactcache.cascache import CASCache from .._message import Message, MessageType from ..sandbox import SandboxBwrap @@ -30,17 +30,15 @@ from . import Platform class Linux(Platform): - def __init__(self, context): + def __init__(self): - super().__init__(context) + super().__init__() - self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8) - self._user_ns_available = self._check_user_ns_available(context) - self._artifact_cache = CASCache(context, enable_push=self._user_ns_available) + self._uid = os.geteuid() + self._gid = os.getegid() - @property - def artifactcache(self): - return self._artifact_cache + self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8) + self._user_ns_available = self._check_user_ns_available() def create_sandbox(self, *args, **kwargs): # Inform the bubblewrap sandbox as to whether it can use user namespaces or not @@ -48,10 +46,19 @@ class Linux(Platform): kwargs['die_with_parent_available'] = self._die_with_parent_available return SandboxBwrap(*args, **kwargs) + def check_sandbox_config(self, config): + if self._user_ns_available: + # User namespace support allows arbitrary build UID/GID settings. + return True + else: + # Without user namespace support, the UID/GID in the sandbox + # will match the host UID/GID. + return config.build_uid == self._uid and config.build_gid == self._gid + ################################################ # Private Methods # ################################################ - def _check_user_ns_available(self, context): + def _check_user_ns_available(self): # Here, lets check if bwrap is able to create user namespaces, # issue a warning if it's not available, and save the state @@ -75,9 +82,4 @@ class Linux(Platform): return True else: - context.message( - Message(None, MessageType.WARN, - "Unable to create user namespaces with bubblewrap, resorting to fallback", - detail="Some builds may not function due to lack of uid / gid 0, " + - "artifacts created will not be trusted for push purposes.")) return False diff --git a/buildstream/_platform/platform.py b/buildstream/_platform/platform.py index 8a074eb62..b37964986 100644 --- a/buildstream/_platform/platform.py +++ b/buildstream/_platform/platform.py @@ -29,17 +29,13 @@ class Platform(): # Platform() # # A class to manage platform-specific details. Currently holds the - # sandbox factory, the artifact cache and staging operations, as - # well as platform helpers. + # sandbox factory as well as platform helpers. # - # Args: - # context (context): The project context - # - def __init__(self, context): - self.context = context + def __init__(self): + pass @classmethod - def create_instance(cls, *args, **kwargs): + def _create_instance(cls): if sys.platform.startswith('linux'): backend = 'linux' else: @@ -58,23 +54,15 @@ class Platform(): else: raise PlatformError("No such platform: '{}'".format(backend)) - cls._instance = PlatformImpl(*args, **kwargs) + cls._instance = PlatformImpl() @classmethod def get_platform(cls): if not cls._instance: - raise PlatformError("Platform needs to be initialized first") + cls._create_instance() return cls._instance ################################################################## - # Platform properties # - ################################################################## - @property - def artifactcache(self): - raise ImplError("Platform {platform} does not implement an artifactcache" - .format(platform=type(self).__name__)) - - ################################################################## # Sandbox functions # ################################################################## @@ -92,3 +80,7 @@ class Platform(): def create_sandbox(self, *args, **kwargs): raise ImplError("Platform {platform} does not implement create_sandbox()" .format(platform=type(self).__name__)) + + def check_sandbox_config(self, config): + raise ImplError("Platform {platform} does not implement check_sandbox_config()" + .format(platform=type(self).__name__)) diff --git a/buildstream/_platform/unix.py b/buildstream/_platform/unix.py index 0306a4ac5..7aa8cbc0d 100644 --- a/buildstream/_platform/unix.py +++ b/buildstream/_platform/unix.py @@ -19,7 +19,6 @@ import os -from .._artifactcache.cascache import CASCache from .._exceptions import PlatformError from ..sandbox import SandboxChroot @@ -28,18 +27,21 @@ from . import Platform class Unix(Platform): - def __init__(self, context): + def __init__(self): - super().__init__(context) - self._artifact_cache = CASCache(context) + super().__init__() + + self._uid = os.geteuid() + self._gid = os.getegid() # Not necessarily 100% reliable, but we want to fail early. - if os.geteuid() != 0: + if self._uid != 0: raise PlatformError("Root privileges are required to run without bubblewrap.") - @property - def artifactcache(self): - return self._artifact_cache - def create_sandbox(self, *args, **kwargs): return SandboxChroot(*args, **kwargs) + + def check_sandbox_config(self, config): + # With the chroot sandbox, the UID/GID in the sandbox + # will match the host UID/GID (typically 0/0). + return config.build_uid == self._uid and config.build_gid == self._gid diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index fb56ca016..a96b92353 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -17,7 +17,6 @@ # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # from .job import Job, JobStatus -from ..._platform import Platform class CacheSizeJob(Job): @@ -25,8 +24,8 @@ class CacheSizeJob(Job): super().__init__(*args, **kwargs) self._complete_cb = complete_cb - platform = Platform.get_platform() - self._artifacts = platform.artifactcache + context = self._scheduler.context + self._artifacts = context.artifactcache def child_process(self): return self._artifacts.compute_cache_size() diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 97b45901f..a1d49f339 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -17,7 +17,6 @@ # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # from .job import Job, JobStatus -from ..._platform import Platform class CleanupJob(Job): @@ -25,11 +24,24 @@ class CleanupJob(Job): super().__init__(*args, **kwargs) self._complete_cb = complete_cb - platform = Platform.get_platform() - self._artifacts = platform.artifactcache + context = self._scheduler.context + self._artifacts = context.artifactcache def child_process(self): - return self._artifacts.clean() + def progress(): + self.send_message('update-cache-size', + self._artifacts.get_cache_size()) + return self._artifacts.clean(progress) + + def handle_message(self, message_type, message): + + # Update the cache size in the main process as we go, + # this provides better feedback in the UI. + if message_type == 'update-cache-size': + self._artifacts.set_cache_size(message) + return True + + return False def parent_complete(self, status, result): if status == JobStatus.OK: diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 348204750..b8b4a2c76 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -58,10 +58,10 @@ class JobStatus(): # Used to distinguish between status messages and return values -class Envelope(): +class _Envelope(): def __init__(self, message_type, message): - self._message_type = message_type - self._message = message + self.message_type = message_type + self.message = message # Process class that doesn't call waitpid on its own. @@ -263,10 +263,37 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # send_message() + # + # To be called from inside Job.child_process() implementations + # to send messages to the main process during processing. + # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + ####################################################### # Abstract Methods # ####################################################### + # handle_message() + # + # Handle a custom message. This will be called in the main process in + # response to any messages sent to the main proces using the + # Job.send_message() API from inside a Job.child_process() implementation + # + # Args: + # message_type (str): A string to identify the message type + # message (any): A simple serializable object + # + # Returns: + # (bool): Should return a truthy value if message_type is handled. + # + def handle_message(self, message_type, message): + return False + # parent_complete() # # This will be executed after the job finishes, and is expected to @@ -404,7 +431,7 @@ class Job(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -430,7 +457,7 @@ class Job(): else: # No exception occurred in the action - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -457,7 +484,7 @@ class Job(): domain = e.domain reason = e.reason - envelope = Envelope('error', { + envelope = _Envelope('error', { 'domain': domain, 'reason': reason }) @@ -475,7 +502,7 @@ class Job(): # def _child_send_result(self, result): if result is not None: - envelope = Envelope('result', result) + envelope = _Envelope('result', result) self._queue.put(envelope) # _child_shutdown() @@ -512,7 +539,7 @@ class Job(): if message.message_type == MessageType.LOG: return - self._queue.put(Envelope('message', message)) + self._queue.put(_Envelope('message', message)) # _parent_shutdown() # @@ -573,24 +600,28 @@ class Job(): if not self._listening: return - if envelope._message_type == 'message': + if envelope.message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope._message) - elif envelope._message_type == 'error': + self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope._message['domain'], - envelope._message['reason']) - elif envelope._message_type == 'result': + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': assert self._result is None - self._result = envelope._message - elif envelope._message_type == 'child_data': + self._result = envelope.message + elif envelope.message_type == 'child_data': # If we retry a job, we assign a new value to this - self.child_data = envelope._message - else: - raise Exception() + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) # _parent_process_queue() # diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index df8364552..49ded7e2c 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,6 @@ from . import Queue, QueueStatus from ..jobs import JobStatus from ..resources import ResourceType -from ..._platform import Platform # A queue which assembles elements @@ -55,8 +54,8 @@ class BuildQueue(Queue): # as returned from Element._assemble() to the estimated # artifact cache size # - platform = Platform.get_platform() - artifacts = platform.artifactcache + context = self._scheduler.context + artifacts = context.artifactcache artifacts.add_artifact_size(artifact_size) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 30e70a4fc..68c115c1b 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -29,7 +29,6 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType from .jobs import JobStatus, CacheSizeJob, CleanupJob -from .._platform import Platform # A decent return code for Scheduler.run() @@ -286,6 +285,8 @@ class Scheduler(): # Callback for the cache size job def _cache_size_job_complete(self, status, cache_size): + context = self.context + artifacts = context.artifactcache # Deallocate cache size job resources self._cache_size_running = None @@ -295,9 +296,6 @@ class Scheduler(): if status != JobStatus.OK: return - platform = Platform.get_platform() - artifacts = platform.artifactcache - if artifacts.has_quota_exceeded(): self._cleanup_scheduled = True diff --git a/buildstream/_stream.py b/buildstream/_stream.py index de3ae464c..310bfbde0 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -32,7 +32,6 @@ from ._exceptions import StreamError, ImplError, BstError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue from ._pipeline import Pipeline, PipelineSelection -from ._platform import Platform from . import utils, _yaml, _site from . import Scope, Consistency @@ -71,8 +70,7 @@ class Stream(): # # Private members # - self._platform = Platform.get_platform() - self._artifacts = self._platform.artifactcache + self._artifacts = context.artifactcache self._context = context self._project = project self._pipeline = Pipeline(context, project, self._artifacts) diff --git a/buildstream/element.py b/buildstream/element.py index 7be27cb2a..898ff0784 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -269,6 +269,16 @@ class Element(Plugin): # Extract Sandbox config self.__sandbox_config = self.__extract_sandbox_config(meta) + # Extract Sandbox config + self.__sandbox_config = self.__extract_sandbox_config(meta) + + self.__sandbox_config_supported = True + platform = Platform.get_platform() + if not platform.check_sandbox_config(self.__sandbox_config): + # Local sandbox does not fully support specified sandbox config. + # This will taint the artifact, disable pushing. + self.__sandbox_config_supported = False + def __lt__(self, other): return self.name < other.name @@ -1483,6 +1493,11 @@ class Element(Plugin): context = self._get_context() with self._output_file() as output_file: + if not self.__sandbox_config_supported: + self.warn("Sandbox configuration is not supported by the platform.", + detail="Falling back to UID {} GID {}. Artifact will not be pushed." + .format(self.__sandbox_config.build_uid, self.__sandbox_config.build_gid)) + # Explicitly clean it up, keep the build dir around if exceptions are raised os.makedirs(context.builddir, exist_ok=True) rootdir = tempfile.mkdtemp(prefix="{}-".format(self.normal_name), dir=context.builddir) @@ -2042,7 +2057,8 @@ class Element(Plugin): workspaced_dependencies = self.__get_artifact_metadata_workspaced_dependencies() # Other conditions should be or-ed - self.__tainted = workspaced or workspaced_dependencies + self.__tainted = (workspaced or workspaced_dependencies or + not self.__sandbox_config_supported) return self.__tainted diff --git a/buildstream/utils.py b/buildstream/utils.py index e3e9dc8c0..d02777897 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -583,6 +583,27 @@ def _get_dir_size(path): return get_size(path) +# _get_volume_size(): +# +# Gets the overall usage and total size of a mounted filesystem in bytes. +# +# Args: +# path (str): The path to check +# +# Returns: +# (int): The total number of bytes on the volume +# (int): The number of available bytes on the volume +# +def _get_volume_size(path): + try: + stat_ = os.statvfs(path) + except OSError as e: + raise UtilError("Failed to retrieve stats on volume for path '{}': {}" + .format(path, e)) from e + + return stat_.f_bsize * stat_.f_blocks, stat_.f_bsize * stat_.f_bavail + + # _parse_size(): # # Convert a string representing data size to a number of @@ -617,8 +638,7 @@ def _parse_size(size, volume): if num > 100: raise UtilError("{}% is not a valid percentage value.".format(num)) - stat_ = os.statvfs(volume) - disk_size = stat_.f_blocks * stat_.f_bsize + disk_size, _ = _get_volume_size(volume) return disk_size * (num / 100) diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py index 0d12cda8c..63ab9ad07 100644 --- a/tests/artifactcache/cache_size.py +++ b/tests/artifactcache/cache_size.py @@ -1,8 +1,10 @@ import os import pytest +from unittest import mock from buildstream import _yaml from buildstream._artifactcache import CACHE_SIZE_FILE +from buildstream._exceptions import ErrorDomain from tests.testutils import cli, create_element_size @@ -60,3 +62,29 @@ def test_cache_size_write(cli, tmpdir): with open(sizefile, "r") as f: size_data = f.read() size = int(size_data) + + +def test_quota_over_1024T(cli, tmpdir): + KiB = 1024 + MiB = (KiB * 1024) + GiB = (MiB * 1024) + TiB = (GiB * 1024) + + cli.configure({ + 'cache': { + 'quota': 2048 * TiB + } + }) + project = tmpdir.join("main") + os.makedirs(str(project)) + _yaml.dump({'name': 'main'}, str(project.join("project.conf"))) + + volume_space_patch = mock.patch( + "buildstream._artifactcache.ArtifactCache._get_cache_volume_size", + autospec=True, + return_value=(1025 * TiB, 1025 * TiB) + ) + + with volume_space_patch: + result = cli.run(project, args=["build", "file.bst"]) + result.assert_main_error(ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota') diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index f8b928cbf..ce8e6c9e8 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -1,6 +1,7 @@ import os import pytest +from unittest import mock from buildstream import _yaml from buildstream._exceptions import ErrorDomain, LoadErrorReason @@ -282,18 +283,28 @@ def test_never_delete_required_track(cli, datafiles, tmpdir): # Ensure that only valid cache quotas make it through the loading # process. -@pytest.mark.parametrize("quota,success", [ - ("1", True), - ("1K", True), - ("50%", True), - ("infinity", True), - ("0", True), - ("-1", False), - ("pony", False), - ("200%", False) +# +# This test virtualizes the condition to assume a storage volume +# has 10K total disk space, and 6K of it is already in use (not +# including any space used by the artifact cache). +# +@pytest.mark.parametrize("quota,err_domain,err_reason", [ + # Valid configurations + ("1", 'success', None), + ("1K", 'success', None), + ("50%", 'success', None), + ("infinity", 'success', None), + ("0", 'success', None), + # Invalid configurations + ("-1", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA), + ("pony", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA), + ("200%", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA), + # Not enough space for these caches + ("7K", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota'), + ("70%", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota') ]) @pytest.mark.datafiles(DATA_DIR) -def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success): +def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, err_domain, err_reason): project = os.path.join(datafiles.dirname, datafiles.basename) os.makedirs(os.path.join(project, 'elements')) @@ -303,11 +314,39 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success): } }) - res = cli.run(project=project, args=['workspace', 'list']) - if success: + # We patch how we get space information + # Ideally we would instead create a FUSE device on which we control + # everything. + # If the value is a percentage, we fix the current values to take into + # account the block size, since this is important in how we compute the size + + if quota.endswith("%"): # We set the used space at 60% of total space + stats = os.statvfs(".") + free_space = 0.6 * stats.f_bsize * stats.f_blocks + total_space = stats.f_bsize * stats.f_blocks + else: + free_space = 6000 + total_space = 10000 + + volume_space_patch = mock.patch( + "buildstream._artifactcache.ArtifactCache._get_cache_volume_size", + autospec=True, + return_value=(total_space, free_space), + ) + + cache_size_patch = mock.patch( + "buildstream._artifactcache.ArtifactCache.get_cache_size", + autospec=True, + return_value=0, + ) + + with volume_space_patch, cache_size_patch: + res = cli.run(project=project, args=['workspace', 'list']) + + if err_domain == 'success': res.assert_success() else: - res.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA) + res.assert_main_error(err_domain, err_reason) @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 1e417c592..c7987e02c 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -2,6 +2,7 @@ import string import pytest import subprocess import os +import sys import shutil import signal from collections import namedtuple @@ -10,7 +11,6 @@ from contextlib import contextmanager from multiprocessing import Process, Queue from buildstream import _yaml -from buildstream._artifactcache.cascache import CASCache from buildstream._artifactcache.casserver import create_server from buildstream._context import Context from buildstream._exceptions import ArtifactError @@ -50,8 +50,9 @@ class ArtifactShare(): context = Context() context.artifactdir = self.repodir + context.set_message_handler(self._message_handler) - self.cas = CASCache(context) + self.cas = context.artifactcache self.total_space = total_space self.free_space = free_space @@ -167,6 +168,13 @@ class ArtifactShare(): f_bavail=self.free_space - repo_size, f_bsize=1) + def _message_handler(self, message, context): + # We need a message handler because this will own an ArtifactCache + # which can in turn fire messages. + + # Just unconditionally print the messages to stderr + print(message.message, file=sys.stderr) + # create_artifact_share() # |