diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2019-01-19 13:51:45 -0500 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2019-01-24 11:55:24 -0500 |
commit | ce01f87e3ac759b12eece4b20be1987ac0e6151c (patch) | |
tree | e8013092c4b514daa5cf6cb4688aca02782041c7 | |
parent | 3e36e3635ee01521facf146958cfedee3e923cdc (diff) | |
download | buildstream-ce01f87e3ac759b12eece4b20be1987ac0e6151c.tar.gz |
_scheduler/scheduler.py: Run cache size exclusively at startup
When running any session that has Queues which require ResourceType.CACHE,
check if the loaded estimated cache size exceeds the quota and, if so,
lock the ResourceType.CACHE resource exclusively right away and run
an exclusive initial cache size job.
This ensures that we clean up first, if deemed necessary, before doing
anything which might add to the cache at startup time.
This is a partial fix for issue #737
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 61 |
1 files changed, 59 insertions, 2 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 9b688d1dd..eb67fed68 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -151,6 +151,9 @@ class Scheduler(): # Handle unix signals while running self._connect_signals() + # Check if we need to start with some cache maintenance + self._check_cache_management() + # Run the queues self._sched() self.loop.run_forever() @@ -272,6 +275,31 @@ class Scheduler(): # Local Private Methods # ####################################################### + # _check_cache_management() + # + # Run an initial check if we need to lock the cache + # resource and check the size and possibly launch + # a cleanup. + # + # Sessions which do not add to the cache are not affected. + # + def _check_cache_management(self): + + # Only trigger the check for a scheduler run which has + # queues which require the CACHE resource. + if not any(q for q in self.queues + if ResourceType.CACHE in q.resources): + return + + # If the estimated size outgrows the quota, queue a job to + # actually check the real cache size initially, this one + # should have exclusive access to the cache to ensure nothing + # starts while we are checking the cache. + # + artifacts = self.context.artifactcache + if artifacts.has_quota_exceeded(): + self._sched_cache_size_job(exclusive=True) + # _spawn_job() # # Spanws a job @@ -292,6 +320,11 @@ class Scheduler(): self._cache_size_running = None self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) + # Unregister the exclusive interest if there was any + self.resources.unregister_exclusive_interest( + [ResourceType.CACHE], 'cache-size' + ) + # Schedule a cleanup job if we've hit the threshold if status != JobStatus.OK: return @@ -344,11 +377,35 @@ class Scheduler(): # Runs a cache size job if one is scheduled to run now and # sufficient recources are available. 
# - def _sched_cache_size_job(self): + # Args: + # exclusive (bool): Run a cache size job immediately and + # hold the ResourceType.CACHE resource + # exclusively (used at startup). + # + def _sched_cache_size_job(self, *, exclusive=False): + + # The exclusive argument is not intended (or safe) for arbitrary use. + if exclusive: + assert not self._cache_size_scheduled + assert not self._cache_size_running + assert not self._active_jobs + self._cache_size_scheduled = True if self._cache_size_scheduled and not self._cache_size_running: - if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]): + # Handle the exclusive launch + exclusive_resources = set() + if exclusive: + exclusive_resources.add(ResourceType.CACHE) + self.resources.register_exclusive_interest( + exclusive_resources, 'cache-size' + ) + + # Reserve the resources (with the possible exclusive cache resource) + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], + exclusive_resources): + + # Update state and launch self._cache_size_scheduled = False self._cache_size_running = \ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, |