diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2018-07-17 13:02:57 +0100 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 14:45:59 +0900 |
commit | 7229d2e5d03227f174ac359fd1a73ef50c071c5a (patch) | |
tree | 4313de7b8c216485ee3dd53f3db0679029e75fde /buildstream | |
parent | 1ec5c7b1207f212e127b15da4094ffe99504301b (diff) | |
download | buildstream-7229d2e5d03227f174ac359fd1a73ef50c071c5a.tar.gz |
Compute the artifact cache size after each build/pull
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 1 | ||||
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 11 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/__init__.py | 1 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 82 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 17 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 12 |
7 files changed, 127 insertions, 2 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 3541f244e..9abe68cd0 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -85,6 +85,7 @@ class ArtifactCache(): self.project_remote_specs = {} self._local = False + self.cache_size = None os.makedirs(context.artifactdir, exist_ok=True) diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index f20250766..4f1d8ac18 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -77,7 +77,7 @@ class CASCache(ArtifactCache): def extract(self, element, key): ref = self.get_artifact_fullname(element, key) - tree = self.resolve_ref(ref) + tree = self.resolve_ref(ref, update_mtime=True) dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, tree.hash) if os.path.isdir(dest): @@ -113,6 +113,8 @@ class CASCache(ArtifactCache): for ref in refs: self.set_ref(ref, tree) + self.cache_size = None + def diff(self, element, key_a, key_b, *, subdir=None): ref_a = self.get_artifact_fullname(element, key_a) ref_b = self.get_artifact_fullname(element, key_b) @@ -448,6 +450,13 @@ class CASCache(ArtifactCache): except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + def calculate_cache_size(self): + if self.cache_size is None: + self.cache_size = utils._get_dir_size(self.casdir) + self.estimated_size = self.cache_size + + return self.cache_size + # list_artifacts(): # # List cached artifacts in Least Recently Modified (LRM) order. diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 0030f5c97..981558641 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -1 +1,2 @@ from .elementjob import ElementJob +from .cachesizejob import CacheSizeJob diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py new file mode 100644 index 000000000..ffb945e43 --- /dev/null +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -0,0 +1,82 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Author: +# Tristan Daniƫl Maat <tristan.maat@codethink.co.uk> +# +import os +from contextlib import contextmanager + +from .job import Job +from ..._platform import Platform +from ..._message import Message, MessageType + + +class CacheSizeJob(Job): + def __init__(self, *args, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._complete_cb = complete_cb + self._cache = Platform._instance.artifactcache + + def child_process(self): + return self._cache.calculate_cache_size() + + def parent_complete(self, success, result): + self._cache._set_cache_size(result) + if self._complete_cb: + self._complete_cb(result) + + @contextmanager + def child_logging_enabled(self, logfile): + self._logfile = logfile.format(pid=os.getpid()) + yield self._logfile + self._logfile = None + + def message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) + + def child_log(self, message): + with open(self._logfile, 'a+') as log: + INDENT = " " + EMPTYTIME = "--:--:--" + + template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + message_text = template.format(timecode=timecode, + type=message.message_type.upper(), + name='cache_size', + message=message.message, + detail=detail) + + log.write('{}\n'.format(message_text)) + log.flush() + + return message + + def child_process_data(self): + return {} diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 7f8ac9e8f..376ef5ae2 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -51,10 +51,27 @@ class BuildQueue(Queue): return QueueStatus.READY + def _check_cache_size(self, job, element): + if not job.child_data: + return + + artifact_size = job.child_data.get('artifact_size', False) + + if artifact_size: + cache = element._get_artifact_cache() + cache._add_artifact_size(artifact_size) + + if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota: + self._scheduler._check_cache_size_real() + def done(self, job, element, result, success): if success: # Inform element in main process that assembly is done element._assemble_done() + # This has to be done after _assemble_done, such that the + # element may register its cache key as required + self._check_cache_size(job, element) + return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index efaa59ef3..430afc410 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -59,6 +59,11 @@ class PullQueue(Queue): element._pull_done() + # Build jobs will check the "approximate" size first. Since we + # do not get an artifact size from pull jobs, we have to + # actually check the cache size. + self._scheduler._check_cache_size_real() + # Element._pull() returns True if it downloaded an artifact, # here we want to appear skipped if we did not download. return result diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index bc182db32..a11134cc8 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -27,7 +27,8 @@ import datetime from contextlib import contextmanager # Local imports -from .resources import Resources +from .resources import Resources, ResourceType +from .jobs import CacheSizeJob # A decent return code for Scheduler.run() @@ -312,6 +313,15 @@ class Scheduler(): self.schedule_jobs(ready) self._sched() + def _check_cache_size_real(self): + logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log') + job = CacheSizeJob(self, 'cache_size', logpath, + resources=[ResourceType.CACHE, + ResourceType.PROCESS], + exclusive_resources=[ResourceType.CACHE], + complete_cb=None) + self.schedule_jobs([job]) + # _suspend_jobs() # # Suspend all ongoing jobs. |