author | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2018-09-10 09:23:06 +0000 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2018-09-10 09:23:06 +0000 |
commit | 958ef8ef61eee2221af68f5e1b77c6976dde749f (patch) | |
tree | 3565612947bdcdc4fba11e7ca5ebef504dcb704b | |
parent | 886a836effa2ddaf740a1281487356e775d0c4f0 (diff) | |
parent | 6db54c8f156ad66572e760c38b96bcc355f586a5 (diff) | |
download | buildstream-958ef8ef61eee2221af68f5e1b77c6976dde749f.tar.gz |
Merge branch 'tristan/fix-cache-exclusivity' into 'master'
_scheduler/queues: Mark build and pull queue as requiring shared access to the CACHE
See merge request BuildStream/buildstream!775
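Editorial note: the crux of this merge is that build and pull jobs now hold the scheduler's CACHE resource in *shared* mode, while the cleanup and cache-size jobs claim it *exclusively*, so a cleanup can never run while another job is writing into the cache. Below is a minimal sketch of those shared/exclusive semantics — not BuildStream's actual scheduler code (that lives in `_scheduler/resources.py` in the diff below); the `ResourceLock` class is invented for illustration:

```python
# Minimal sketch of shared vs. exclusive resource holds, not
# BuildStream's actual scheduler code: build/pull jobs take the cache
# resource shared, a cleanup job must take it exclusively.

class ResourceLock:
    def __init__(self):
        self.shared_holders = 0      # e.g. running build/pull jobs
        self.exclusive_held = False  # e.g. a running cleanup job

    def try_acquire(self, exclusive=False):
        if self.exclusive_held:
            return False  # nobody may join while a cleanup runs
        if exclusive and self.shared_holders > 0:
            return False  # a cleanup waits for builds/pulls to drain
        if exclusive:
            self.exclusive_held = True
        else:
            self.shared_holders += 1
        return True

    def release(self, exclusive=False):
        if exclusive:
            self.exclusive_held = False
        else:
            self.shared_holders -= 1


lock = ResourceLock()
assert lock.try_acquire()                    # build job starts
assert not lock.try_acquire(exclusive=True)  # cleanup must wait
lock.release()                               # build job finishes
assert lock.try_acquire(exclusive=True)      # now cleanup runs alone
```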
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 169
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 8
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 14
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 14
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 6
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 34
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 4
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 2
-rw-r--r-- | buildstream/_scheduler/resources.py | 2
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 45
-rw-r--r-- | buildstream/element.py | 27
-rw-r--r-- | tests/artifactcache/expiry.py | 19
12 files changed, 198 insertions, 146 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index f162c7f08..9ac68b102 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -81,19 +81,16 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
 class ArtifactCache():
     def __init__(self, context):
         self.context = context
-        self.required_artifacts = set()
         self.extractdir = os.path.join(context.artifactdir, 'extract')
         self.tmpdir = os.path.join(context.artifactdir, 'tmp')
 
-        self.estimated_size = None
-
         self.global_remote_specs = []
         self.project_remote_specs = {}
 
-        self._local = False
-        self.cache_size = None
-        self.cache_quota = None
-        self.cache_lower_threshold = None
+        self._required_artifacts = set()       # The artifacts required for this session
+        self._cache_size = None                # The current cache size, sometimes it's an estimate
+        self._cache_quota = None               # The cache quota
+        self._cache_lower_threshold = None     # The target cache size for a cleanup
 
         os.makedirs(self.extractdir, exist_ok=True)
         os.makedirs(self.tmpdir, exist_ok=True)
@@ -212,8 +209,8 @@ class ArtifactCache():
         weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
 
         for key in (strong_key, weak_key):
-            if key and key not in self.required_artifacts:
-                self.required_artifacts.add(key)
+            if key and key not in self._required_artifacts:
+                self._required_artifacts.add(key)
 
                 # We also update the usage times of any artifacts
                 # we will be using, which helps prevent a
@@ -228,10 +225,16 @@ class ArtifactCache():
     #
     # Clean the artifact cache as much as possible.
     #
+    # Returns:
+    #     (int): The size of the cache after having cleaned up
+    #
     def clean(self):
         artifacts = self.list_artifacts()
 
-        while self.calculate_cache_size() >= self.cache_quota - self.cache_lower_threshold:
+        # Do a real computation of the cache size once, just in case
+        self.compute_cache_size()
+
+        while self.get_cache_size() >= self._cache_lower_threshold:
             try:
                 to_remove = artifacts.pop(0)
             except IndexError:
@@ -245,7 +248,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))
 
-                if self.calculate_cache_size() > self.cache_quota:
+                if self.get_quota_exceeded():
                     raise ArtifactError("Cache too full. Aborting.",
                                         detail=detail,
                                         reason="cache-too-full")
@@ -253,46 +256,94 @@ class ArtifactCache():
                     break
 
             key = to_remove.rpartition('/')[2]
-            if key not in self.required_artifacts:
+            if key not in self._required_artifacts:
+
+                # Remove the actual artifact, if it's not required.
                 size = self.remove(to_remove)
-                if size:
-                    self.cache_size -= size
+
+                # Subtract the removed size from the cache size
+                self.set_cache_size(self._cache_size - size)
 
         # This should be O(1) if implemented correctly
-        return self.calculate_cache_size()
+        return self.get_cache_size()
 
-    # get_approximate_cache_size()
+    # compute_cache_size()
     #
-    # A cheap method that aims to serve as an upper limit on the
-    # artifact cache size.
+    # Computes the real artifact cache size by calling
+    # the abstract calculate_cache_size() method.
     #
-    # The cache size reported by this function will normally be larger
-    # than the real cache size, since it is calculated using the
-    # pre-commit artifact size, but for very small artifacts in
-    # certain caches additional overhead could cause this to be
-    # smaller than, but close to, the actual size.
+    # Returns:
+    #     (int): The size of the artifact cache.
+    #
+    def compute_cache_size(self):
+        self._cache_size = self.calculate_cache_size()
+
+        return self._cache_size
+
+    # add_artifact_size()
     #
-    # Nonetheless, in practice this should be safe to use as an upper
-    # limit on the cache size.
+    # Adds the reported size of a newly cached artifact to the
+    # overall estimated size.
     #
-    # If the cache has built-in constant-time size reporting, please
-    # feel free to override this method with a more accurate
-    # implementation.
+    # Args:
+    #     artifact_size (int): The size to add.
+    #
+    def add_artifact_size(self, artifact_size):
+        cache_size = self.get_cache_size()
+        cache_size += artifact_size
+
+        self.set_cache_size(cache_size)
+
+    # get_cache_size()
+    #
+    # Fetches the cached size of the cache; this is sometimes
+    # an estimate and periodically adjusted to the real size
+    # when a cache size calculation job runs.
+    #
+    # When it is an estimate, the value is either correct, or
+    # it is greater than the actual cache size.
     #
     # Returns:
     #     (int) An approximation of the artifact cache size.
     #
-    def get_approximate_cache_size(self):
-        # If we don't currently have an estimate, figure out the real
-        # cache size.
-        if self.estimated_size is None:
+    def get_cache_size(self):
+
+        # If we don't currently have an estimate, figure out the real cache size.
+        if self._cache_size is None:
             stored_size = self._read_cache_size()
             if stored_size is not None:
-                self.estimated_size = stored_size
+                self._cache_size = stored_size
             else:
-                self.estimated_size = self.calculate_cache_size()
+                self.compute_cache_size()
+
+        return self._cache_size
 
-        return self.estimated_size
+    # set_cache_size()
+    #
+    # Forcefully set the overall cache size.
+    #
+    # This is used to update the size in the main process after
+    # having calculated it in a cleanup or a cache size calculation job.
+    #
+    # Args:
+    #     cache_size (int): The size to set.
+    #
+    def set_cache_size(self, cache_size):
+
+        assert cache_size is not None
+
+        self._cache_size = cache_size
+        self._write_cache_size(self._cache_size)
+
+    # get_quota_exceeded()
+    #
+    # Checks if the current artifact cache size exceeds the quota.
+    #
+    # Returns:
+    #     (bool): True if the quota is exceeded
+    #
+    def get_quota_exceeded(self):
+        return self.get_cache_size() > self._cache_quota
 
     ################################################
     # Abstract methods for subclasses to implement #
@@ -484,11 +535,8 @@ class ArtifactCache():
     #
     # Return the real artifact cache size.
     #
-    # Implementations should also use this to update estimated_size.
-    #
     # Returns:
-    #
-    # (int) The size of the artifact cache.
+    #     (int): The size of the artifact cache.
     #
     def calculate_cache_size(self):
         raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
@@ -535,39 +583,13 @@ class ArtifactCache():
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
 
-    # _add_artifact_size()
-    #
-    # Since we cannot keep track of the cache size between threads,
-    # this method will be called by the main process every time a
-    # process that added something to the cache finishes.
-    #
-    # This will then add the reported size to
-    # ArtifactCache.estimated_size.
-    #
-    def _add_artifact_size(self, artifact_size):
-        if not self.estimated_size:
-            self.estimated_size = self.calculate_cache_size()
-
-        self.estimated_size += artifact_size
-        self._write_cache_size(self.estimated_size)
-
-    # _set_cache_size()
-    #
-    # Similarly to the above method, when we calculate the actual size
-    # in a child thread, we can't update it. We instead pass the value
-    # back to the main thread and update it there.
-    #
-    def _set_cache_size(self, cache_size):
-        self.estimated_size = cache_size
-
-        # set_cache_size is called in cleanup, where it may set the cache to None
-        if self.estimated_size is not None:
-            self._write_cache_size(self.estimated_size)
-
     # _write_cache_size()
     #
     # Writes the given size of the artifact to the cache's size file
     #
+    # Args:
+    #     size (int): The size of the artifact cache to record
+    #
     def _write_cache_size(self, size):
         assert isinstance(size, int)
         size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
@@ -579,6 +601,9 @@ class ArtifactCache():
     # Reads and returns the size of the artifact cache that's stored in the
     # cache's size file
    #
+    # Returns:
+    #     (int): The size of the artifact cache, as recorded in the file
+    #
     def _read_cache_size(self):
         size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE)
@@ -628,13 +653,13 @@ class ArtifactCache():
         stat = os.statvfs(artifactdir_volume)
         available_space = (stat.f_bsize * stat.f_bavail)
 
-        cache_size = self.get_approximate_cache_size()
+        cache_size = self.get_cache_size()
 
         # Ensure system has enough storage for the cache_quota
         #
         # If cache_quota is none, set it to the maximum it could possibly be.
         #
-        # Also check that cache_quota is atleast as large as our headroom.
+        # Also check that cache_quota is at least as large as our headroom.
         #
         if cache_quota is None:  # Infinity, set to max system storage
             cache_quota = cache_size + available_space
@@ -660,8 +685,8 @@ class ArtifactCache():
         # if we end up writing more than 2G, but hey, this stuff is
         # already really fuzzy.
         #
-        self.cache_quota = cache_quota - headroom
-        self.cache_lower_threshold = self.cache_quota / 2
+        self._cache_quota = cache_quota - headroom
+        self._cache_lower_threshold = self._cache_quota / 2
 
 
 # _configured_remote_artifact_cache_specs():
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index ce2b874da..a93ec01ea 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -122,8 +122,6 @@ class CASCache(ArtifactCache):
         for ref in refs:
             self.set_ref(ref, tree)
 
-        self.cache_size = None
-
     def diff(self, element, key_a, key_b, *, subdir=None):
         ref_a = self.get_artifact_fullname(element, key_a)
         ref_b = self.get_artifact_fullname(element, key_b)
@@ -530,11 +528,7 @@ class CASCache(ArtifactCache):
             raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 
     def calculate_cache_size(self):
-        if self.cache_size is None:
-            self.cache_size = utils._get_dir_size(self.casdir)
-            self.estimated_size = self.cache_size
-
-        return self.cache_size
+        return utils._get_dir_size(self.casdir)
 
     # list_artifacts():
     #
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index f73a09c74..68cd91331 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -24,15 +24,19 @@ class CacheSizeJob(Job):
     def __init__(self, *args, complete_cb, **kwargs):
         super().__init__(*args, **kwargs)
         self._complete_cb = complete_cb
-        self._cache = Platform._instance.artifactcache
+
+        platform = Platform.get_platform()
+        self._artifacts = platform.artifactcache
 
     def child_process(self):
-        return self._cache.calculate_cache_size()
+        return self._artifacts.compute_cache_size()
 
     def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb(result)
+        if success:
+            self._artifacts.set_cache_size(result)
+
+            if self._complete_cb:
+                self._complete_cb(result)
 
     def child_process_data(self):
         return {}
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index bb78e8751..c22ce3b98 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -24,15 +24,19 @@ class CleanupJob(Job):
     def __init__(self, *args, complete_cb, **kwargs):
         super().__init__(*args, **kwargs)
         self._complete_cb = complete_cb
-        self._cache = Platform._instance.artifactcache
+
+        platform = Platform.get_platform()
+        self._artifacts = platform.artifactcache
 
     def child_process(self):
-        return self._cache.clean()
+        return self._artifacts.clean()
 
     def parent_complete(self, success, result):
-        self._cache._set_cache_size(result)
-        if self._complete_cb:
-            self._complete_cb()
+        if success:
+            self._artifacts.set_cache_size(result)
+
+            if self._complete_cb:
+                self._complete_cb()
 
     def child_process_data(self):
         return {}
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index fcad20ce4..8ce5c062f 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -109,13 +109,7 @@ class ElementJob(Job):
         data = {}
 
         workspace = self._element._get_workspace()
-        artifact_size = self._element._get_artifact_size()
-        cache_size = self._element._get_artifact_cache().calculate_cache_size()
-
         if workspace is not None:
             data['workspace'] = workspace.to_dict()
-        if artifact_size is not None:
-            data['artifact_size'] = artifact_size
-        data['cache_size'] = cache_size
 
         return data
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 2009fce97..6e7ce04aa 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -24,6 +24,7 @@ from . import Queue, QueueStatus
 from ..jobs import ElementJob
 from ..resources import ResourceType
 from ..._message import MessageType
+from ..._platform import Platform
 
 
 # A queue which assembles elements
@@ -32,7 +33,7 @@ class BuildQueue(Queue):
 
     action_name = "Build"
     complete_name = "Built"
-    resources = [ResourceType.PROCESS]
+    resources = [ResourceType.PROCESS, ResourceType.CACHE]
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -67,8 +68,7 @@ class BuildQueue(Queue):
         return super().enqueue(to_queue)
 
     def process(self, element):
-        element._assemble()
-        return element._get_unique_id()
+        return element._assemble()
 
     def status(self, element):
         # state of dependencies may have changed, recalculate element state
@@ -87,18 +87,22 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
-    def _check_cache_size(self, job, element):
-        if not job.child_data:
-            return
+    def _check_cache_size(self, job, element, artifact_size):
 
-        artifact_size = job.child_data.get('artifact_size', False)
+        # After completing a build job, add the artifact size
+        # as returned from Element._assemble() to the estimated
+        # artifact cache size
+        #
+        platform = Platform.get_platform()
+        artifacts = platform.artifactcache
 
-        if artifact_size:
-            cache = element._get_artifact_cache()
-            cache._add_artifact_size(artifact_size)
+        artifacts.add_artifact_size(artifact_size)
 
-            if cache.get_approximate_cache_size() > cache.cache_quota:
-                self._scheduler._check_cache_size_real()
+        # If the estimated size outgrows the quota, ask the scheduler
+        # to queue a job to actually check the real cache size.
+        #
+        if artifacts.get_quota_exceeded():
+            self._scheduler.check_cache_size()
 
     def done(self, job, element, result, success):
 
@@ -106,8 +110,8 @@
             # Inform element in main process that assembly is done
             element._assemble_done()
 
-            # This has to be done after _assemble_done, such that the
-            # element may register its cache key as required
-            self._check_cache_size(job, element)
+            # This has to be done after _assemble_done, such that the
+            # element may register its cache key as required
+            self._check_cache_size(job, element, result)
 
         return True
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index 5d732fcf8..e18967cf4 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -29,7 +29,7 @@ class PullQueue(Queue):
 
     action_name = "Pull"
     complete_name = "Pulled"
-    resources = [ResourceType.DOWNLOAD]
+    resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
 
     def process(self, element):
         # returns whether an artifact was downloaded or not
@@ -62,7 +62,7 @@ class PullQueue(Queue):
         # Build jobs will check the "approximate" size first. Since we
         # do not get an artifact size from pull jobs, we have to
         # actually check the cache size.
-        self._scheduler._check_cache_size_real()
+        self._scheduler.check_cache_size()
 
         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
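Editorial note: the build queue above never computes the cache size itself. It folds the size reported by `Element._assemble()` into a running estimate and only asks the scheduler for a real check once that estimate crosses the quota. A sketch of that flow using the method names introduced by this merge — the surrounding function is invented for illustration:

```python
# Hypothetical glue illustrating how BuildQueue._check_cache_size()
# above uses the new ArtifactCache API; only the method names are real.

def on_build_done(artifacts, scheduler, artifact_size):
    # Element._assemble() now returns the pre-commit size of the new
    # artifact; fold it into the estimated cache size. The estimate
    # only ever errs on the large side.
    artifacts.add_artifact_size(artifact_size)

    # Only when the estimate crosses the quota do we pay for a real
    # size calculation (and, if that confirms it, a cleanup).
    if artifacts.get_quota_exceeded():
        scheduler.check_cache_size()
```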
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 28da17711..472e033da 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -301,8 +301,6 @@ class Queue():
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
         self._update_workspaces(element, job)
-        if job.child_data:
-            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
index bbf851b06..fcf10d7bd 100644
--- a/buildstream/_scheduler/resources.py
+++ b/buildstream/_scheduler/resources.py
@@ -8,7 +8,7 @@ class ResourceType():
 class Resources():
     def __init__(self, num_builders, num_fetchers, num_pushers):
         self._max_resources = {
-            ResourceType.CACHE: 1,
+            ResourceType.CACHE: 0,
             ResourceType.DOWNLOAD: num_fetchers,
             ResourceType.PROCESS: num_builders,
             ResourceType.UPLOAD: num_pushers
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index aec9b6fc9..635b0628c 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -241,6 +241,25 @@ class Scheduler():
         self._schedule_queue_jobs()
         self._sched()
 
+    # check_cache_size():
+    #
+    # Queues a cache size calculation job; after the cache
+    # size is calculated, a cleanup job will be run automatically
+    # if needed.
+    #
+    # FIXME: This should ensure that only one cache size job
+    #        is ever pending at a given time. If a cache size
+    #        job is already running, it is correct to queue
+    #        a new one, it is incorrect to have more than one
+    #        of these jobs pending at a given time, though.
+    #
+    def check_cache_size(self):
+        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+                           resources=[ResourceType.CACHE,
+                                      ResourceType.PROCESS],
+                           complete_cb=self._run_cleanup)
+        self.schedule_jobs([job])
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -316,26 +335,32 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()
 
+    # _run_cleanup()
+    #
+    # Schedules the cache cleanup job if the passed size
+    # exceeds the cache quota.
+    #
+    # Args:
+    #    cache_size (int): The calculated cache size (ignored)
+    #
+    # NOTE: This runs in response to completion of the cache size
+    #       calculation job launched by Scheduler.check_cache_size(),
+    #       which will report the calculated cache size.
+    #
     def _run_cleanup(self, cache_size):
         platform = Platform.get_platform()
-        if cache_size and cache_size < platform.artifactcache.cache_quota:
+        artifacts = platform.artifactcache
+
+        if not artifacts.get_quota_exceeded():
             return
 
-        job = CleanupJob(self, 'cleanup', 'cleanup',
+        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          exclusive_resources=[ResourceType.CACHE],
                          complete_cb=None)
         self.schedule_jobs([job])
 
-    def _check_cache_size_real(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
-                           resources=[ResourceType.CACHE,
-                                      ResourceType.PROCESS],
-                           exclusive_resources=[ResourceType.CACHE],
-                           complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
-
     # _suspend_jobs()
     #
     # Suspend all ongoing jobs.
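Editorial note: the CleanupJob scheduled above lands in `ArtifactCache.clean()`, whose policy the first file of this diff spells out: delete artifacts in least-recently-used order until the cache drops below the lower threshold (half the quota), but never delete an artifact required by the current session, and abort if the quota is still exceeded once nothing removable remains. A simplified model of that loop, with plain dicts and sets standing in for the real CAS-backed cache:

```python
# Simplified model of ArtifactCache.clean(): plain dicts/sets stand in
# for the real CAS-backed cache; error reporting is reduced to a raise.

def clean(lru_artifacts, sizes, required, quota):
    lower_threshold = quota / 2
    cache_size = sum(sizes.values())

    for artifact in list(lru_artifacts):  # least recently used first
        if cache_size < lower_threshold:
            break  # cleaned up enough, stop deleting
        if artifact in required:
            continue  # never delete artifacts needed by this session
        lru_artifacts.remove(artifact)
        cache_size -= sizes.pop(artifact)
    else:
        # Ran out of removable artifacts; only fatal if still over quota.
        if cache_size > quota:
            raise RuntimeError("Cache too full. Aborting.")

    return cache_size  # the main process syncs this via set_cache_size()
```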
diff --git a/buildstream/element.py b/buildstream/element.py
index dd7ccfed3..6c01c0dd4 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -213,7 +213,6 @@ class Element(Plugin):
         self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
         self.__tainted = None                   # Whether the artifact is tainted and should not be shared
         self.__required = False                 # Whether the artifact is required in the current session
-        self.__artifact_size = None             # The size of data committed to the artifact cache
         self.__build_result = None              # The result of assembling this Element
         self._build_log_path = None             # The path of the build log for this Element
 
@@ -1509,6 +1508,9 @@ class Element(Plugin):
     #   - Call the public abstract methods for the build phase
     #   - Cache the resulting artifact
     #
+    # Returns:
+    #    (int): The size of the newly cached artifact
+    #
     def _assemble(self):
 
         # Assert call ordering
@@ -1655,7 +1657,7 @@ class Element(Plugin):
             }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 
             with self.timed_activity("Caching artifact"):
-                self.__artifact_size = utils._get_dir_size(assembledir)
+                artifact_size = utils._get_dir_size(assembledir)
                 self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 
         if collect is not None and collectvdir is None:
@@ -1667,6 +1669,8 @@ class Element(Plugin):
         # Finally cleanup the build dir
         cleanup_rootdir()
 
+        return artifact_size
+
     def _get_build_log(self):
         return self._build_log_path
 
@@ -1908,25 +1912,6 @@ class Element(Plugin):
         workspaces = self._get_context().get_workspaces()
         return workspaces.get_workspace(self._get_full_name())
 
-    # _get_artifact_size()
-    #
-    # Get the size of the artifact produced by this element in the
-    # current pipeline - if this element has not been assembled or
-    # pulled, this will be None.
-    #
-    # Note that this is the size of an artifact *before* committing it
-    # to the cache, the size on disk may differ. It can act as an
-    # approximate guide for when to do a proper size calculation.
-    #
-    # Returns:
-    #     (int|None): The size of the artifact
-    #
-    def _get_artifact_size(self):
-        return self.__artifact_size
-
-    def _get_artifact_cache(self):
-        return self.__artifacts
-
     # _write_script():
     #
     # Writes a script to the given directory.
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index 6190ae814..980710957 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -196,6 +196,22 @@ def test_keep_dependencies(cli, datafiles, tmpdir):
 
 
 # Assert that we never delete a dependency required for a build tree
+#
+# NOTE: This test expects that a build will fail if it attempts to
+#       put more artifacts in the cache than the quota can hold,
+#       and expects that the last two elements which don't fit into
+#       the quota won't even be built.
+#
+#       In real life, this will not be the case, since once we reach
+#       the estimated quota we launch a cache size calculation job and
+#       only launch a cleanup job when the size is calculated; and
+#       other build tasks will be scheduled while the cache size job
+#       is running.
+#
+#       This test only passes because we configure `builders` to 1,
+#       ensuring that the cache size job runs exclusively since it
+#       also requires a compute resource (a "builder").
+#
 @pytest.mark.datafiles(DATA_DIR)
 def test_never_delete_dependencies(cli, datafiles, tmpdir):
     project = os.path.join(datafiles.dirname, datafiles.basename)
@@ -204,6 +220,9 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir):
     cli.configure({
         'cache': {
             'quota': 10000000
+        },
+        'scheduler': {
+            'builders': 1
         }
     })
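Editorial note: one subtlety worth calling out from `get_cache_size()` in the first file of this diff is its fallback chain — use the in-memory value if present, else the size persisted in the artifact directory by a previous session, else do a full recalculation. A sketch of that chain restated as a free function; the `'cache_size'` filename and the dict-based state are assumptions for illustration (the real code uses a `CACHE_SIZE_FILE` constant whose value this diff doesn't show):

```python
# Fallback chain of ArtifactCache.get_cache_size(), restated as a free
# function. The 'cache_size' filename is an assumption; the real code
# uses the CACHE_SIZE_FILE constant, whose value this diff doesn't show.
import os

def get_cache_size(state, artifactdir, compute_cache_size):
    if state.get('cache_size') is None:
        size_file = os.path.join(artifactdir, 'cache_size')
        if os.path.isfile(size_file):
            # A previous session persisted the size; trust it for now.
            with open(size_file, 'r') as f:
                state['cache_size'] = int(f.read())
        else:
            # Last resort: walk the cache directory for the real size.
            state['cache_size'] = compute_cache_size()
    return state['cache_size']
```

Because the estimate only ever errs on the large side, a stale persisted value can trigger an unnecessary size-calculation job, but never lets the cache silently overrun its quota.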