author     Tristan Maat <tristan.maat@codethink.co.uk>             2018-07-17 13:02:57 +0100
committer  Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-07-18 14:45:59 +0900
commit     7229d2e5d03227f174ac359fd1a73ef50c071c5a (patch)
tree       4313de7b8c216485ee3dd53f3db0679029e75fde
parent     1ec5c7b1207f212e127b15da4094ffe99504301b (diff)
download   buildstream-7229d2e5d03227f174ac359fd1a73ef50c071c5a.tar.gz
Compute the artifact cache size after each build/pull

Each successful build job now reports the size of the artifact it
committed back to the main process, where it is added to a running
estimate of the cache size. When that estimate exceeds the configured
quota, and after every pull (which reports no size), the scheduler
launches a CacheSizeJob to compute the real on-disk size of the CAS.
-rw-r--r--  buildstream/_artifactcache/artifactcache.py |  1
-rw-r--r--  buildstream/_artifactcache/cascache.py       | 11
-rw-r--r--  buildstream/_scheduler/jobs/__init__.py      |  1
-rw-r--r--  buildstream/_scheduler/jobs/cachesizejob.py  | 82
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py  | 17
-rw-r--r--  buildstream/_scheduler/queues/pullqueue.py   |  5
-rw-r--r--  buildstream/_scheduler/scheduler.py          | 12
7 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 3541f244e..9abe68cd0 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -85,6 +85,7 @@ class ArtifactCache():
self.project_remote_specs = {}
self._local = False
+ self.cache_size = None
os.makedirs(context.artifactdir, exist_ok=True)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index f20250766..4f1d8ac18 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -77,7 +77,7 @@ class CASCache(ArtifactCache):
def extract(self, element, key):
ref = self.get_artifact_fullname(element, key)
- tree = self.resolve_ref(ref)
+ tree = self.resolve_ref(ref, update_mtime=True)
dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, tree.hash)
if os.path.isdir(dest):
@@ -113,6 +113,8 @@ class CASCache(ArtifactCache):
for ref in refs:
self.set_ref(ref, tree)
+ self.cache_size = None
+
def diff(self, element, key_a, key_b, *, subdir=None):
ref_a = self.get_artifact_fullname(element, key_a)
ref_b = self.get_artifact_fullname(element, key_b)
@@ -448,6 +450,13 @@ class CASCache(ArtifactCache):
except FileNotFoundError as e:
raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ def calculate_cache_size(self):
+ if self.cache_size is None:
+ self.cache_size = utils._get_dir_size(self.casdir)
+ self.estimated_size = self.cache_size
+
+ return self.cache_size
+
# list_artifacts():
#
# List cached artifacts in Least Recently Modified (LRM) order.
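Taken together, the artifactcache.py and cascache.py hunks above implement a
simple memoization: cache_size is initialised to None in ArtifactCache's
constructor, computed on first request by walking the CAS directory, and
invalidated whenever commit() records new refs. A minimal standalone sketch of
the same pattern; the directory walk here is only a stand-in for BuildStream's
utils._get_dir_size():

    import os

    def _get_dir_size(path):
        # Stand-in for buildstream.utils._get_dir_size(): total bytes under path.
        total = 0
        for root, _, files in os.walk(path):
            for name in files:
                fullpath = os.path.join(root, name)
                if not os.path.islink(fullpath):
                    total += os.path.getsize(fullpath)
        return total

    class SizeCachingCAS:
        def __init__(self, casdir):
            self.casdir = casdir
            self.cache_size = None              # None means "not computed yet"

        def calculate_cache_size(self):
            if self.cache_size is None:         # expensive walk only on a miss
                self.cache_size = _get_dir_size(self.casdir)
            return self.cache_size

        def commit(self):
            self.cache_size = None              # new refs: cached size is stale

Resetting the value to None rather than trying to adjust it keeps the
bookkeeping trivially correct; the next caller simply pays for one full walk.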
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 0030f5c97..981558641 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1 +1,2 @@
from .elementjob import ElementJob
+from .cachesizejob import CacheSizeJob
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 000000000..ffb945e43
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message, MessageType
+
+
+class CacheSizeJob(Job):
+ def __init__(self, *args, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
+ self._cache = Platform._instance.artifactcache
+
+ def child_process(self):
+ return self._cache.calculate_cache_size()
+
+ def parent_complete(self, success, result):
+ self._cache._set_cache_size(result)
+ if self._complete_cb:
+ self._complete_cb(result)
+
+ @contextmanager
+ def child_logging_enabled(self, logfile):
+ self._logfile = logfile.format(pid=os.getpid())
+ yield self._logfile
+ self._logfile = None
+
+ def message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
+
+ def child_log(self, message):
+ with open(self._logfile, 'a+') as log:
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+
+ template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ message_text = template.format(timecode=timecode,
+ type=message.message_type.upper(),
+ name='cache_size',
+ message=message.message,
+ detail=detail)
+
+ log.write('{}\n'.format(message_text))
+ log.flush()
+
+ return message
+
+ def child_process_data(self):
+ return {}
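CacheSizeJob splits its work across the scheduler's fork boundary:
child_process() runs in a forked child and returns the computed size, while
parent_complete() receives that result back in the main process and records
it. A rough analogue of that handoff using plain multiprocessing, as an
illustration only; the real Job base class also handles logging, retries and
the asyncio event loop:

    import multiprocessing

    def _measure(conn, path):
        # Child side, like CacheSizeJob.child_process(): do the expensive
        # work and ship the result back over a pipe.
        conn.send(len(path))  # placeholder for the real size computation
        conn.close()

    def check_cache_size(path, complete_cb=None):
        # Parent side, like CacheSizeJob.parent_complete(): collect the
        # child's result and hand it to an optional callback.
        parent_conn, child_conn = multiprocessing.Pipe()
        proc = multiprocessing.Process(target=_measure, args=(child_conn, path))
        proc.start()
        result = parent_conn.recv()
        proc.join()
        if complete_cb is not None:
            complete_cb(result)
        return result

    if __name__ == '__main__':
        check_cache_size('/path/to/cas', complete_cb=print)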
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 7f8ac9e8f..376ef5ae2 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,27 @@ class BuildQueue(Queue):
return QueueStatus.READY
+ def _check_cache_size(self, job, element):
+ if not job.child_data:
+ return
+
+ artifact_size = job.child_data.get('artifact_size', False)
+
+ if artifact_size:
+ cache = element._get_artifact_cache()
+ cache._add_artifact_size(artifact_size)
+
+ if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
+ self._scheduler._check_cache_size_real()
+
def done(self, job, element, result, success):
if success:
# Inform element in main process that assembly is done
element._assemble_done()
+ # This has to be done after _assemble_done, such that the
+ # element may register its cache key as required
+ self._check_cache_size(job, element)
+
return True
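The accounting in _check_cache_size() is deliberately cheap: every successful
build job reports the size of what it committed through job.child_data, the
parent adds it to a running estimate, and only when that estimate crosses the
quota does the scheduler pay for a real disk walk. Roughly, with a
hypothetical class whose method names mirror the calls in the hunk above:

    class QuotaTracker:
        def __init__(self, cache_quota):
            self.cache_quota = cache_quota
            self._estimated_size = 0

        def _add_artifact_size(self, artifact_size):
            # Called once per finished build with the size it reported.
            self._estimated_size += artifact_size

        def get_approximate_cache_size(self):
            return self._estimated_size

        def over_quota(self):
            # Triggers the expensive _check_cache_size_real() path.
            return self.get_approximate_cache_size() > self.cache_quota

    tracker = QuotaTracker(cache_quota=10 * 1024 ** 3)   # e.g. a 10 GiB quota
    tracker._add_artifact_size(512 * 1024 ** 2)          # a build added 512 MiB
    assert not tracker.over_quota()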
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index efaa59ef3..430afc410 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -59,6 +59,11 @@ class PullQueue(Queue):
element._pull_done()
+ # Build jobs will check the "approximate" size first. Since we
+ # do not get an artifact size from pull jobs, we have to
+ # actually check the cache size.
+ self._scheduler._check_cache_size_real()
+
# Element._pull() returns True if it downloaded an artifact,
# here we want to appear skipped if we did not download.
return result
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index bc182db32..a11134cc8 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,7 +27,8 @@ import datetime
from contextlib import contextmanager
# Local imports
-from .resources import Resources
+from .resources import Resources, ResourceType
+from .jobs import CacheSizeJob
# A decent return code for Scheduler.run()
@@ -312,6 +313,15 @@ class Scheduler():
self.schedule_jobs(ready)
self._sched()
+ def _check_cache_size_real(self):
+ logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
+ job = CacheSizeJob(self, 'cache_size', logpath,
+ resources=[ResourceType.CACHE,
+ ResourceType.PROCESS],
+ exclusive_resources=[ResourceType.CACHE],
+ complete_cb=None)
+ self.schedule_jobs([job])
+
# _suspend_jobs()
#
# Suspend all ongoing jobs.
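_check_cache_size_real() claims ResourceType.CACHE exclusively, so at most one
size computation runs at a time and it cannot race with jobs that are writing
to the cache. The scheduler's Resources class achieves this with per-resource
bookkeeping on the event loop rather than with locks, but the semantics are
roughly those of a readers-writer lock; a thread-based sketch, for
illustration only:

    import threading

    class SharedExclusiveResource:
        # Toy model of shared vs. exclusive resource access; BuildStream's
        # scheduler does equivalent bookkeeping without threads.
        def __init__(self):
            self._cond = threading.Condition()
            self._shared = 0
            self._exclusive = False

        def acquire(self, exclusive=False):
            with self._cond:
                if exclusive:
                    while self._exclusive or self._shared > 0:
                        self._cond.wait()
                    self._exclusive = True
                else:
                    while self._exclusive:
                        self._cond.wait()
                    self._shared += 1

        def release(self, exclusive=False):
            with self._cond:
                if exclusive:
                    self._exclusive = False
                else:
                    self._shared -= 1
                self._cond.notify_all()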