author     Jürg Billeter <j@bitron.ch>   2019-06-13 17:43:11 +0200
committer  Jürg Billeter <j@bitron.ch>   2019-08-20 07:41:23 +0200
commit     904f77f01267b4607a2f0bd3687d8b6e6d296ec8 (patch)
tree       0000df57a7d31a92103f26e3b6d03f0b62481de3
parent     147dd700fa9bf9634e23c8a38173ca49699570c6 (diff)
download   buildstream-904f77f01267b4607a2f0bd3687d8b6e6d296ec8.tar.gz
_scheduler: Remove cache size job
Cache size will be tracked by buildbox-casd.
-rw-r--r--  src/buildstream/_scheduler/jobs/__init__.py        1
-rw-r--r--  src/buildstream/_scheduler/jobs/cachesizejob.py   48
-rw-r--r--  src/buildstream/_scheduler/queues/buildqueue.py   29
-rw-r--r--  src/buildstream/_scheduler/queues/pullqueue.py      6
-rw-r--r--  src/buildstream/_scheduler/scheduler.py           115
-rw-r--r--  tests/artifactcache/cache_size.py                  90
6 files changed, 2 insertions(+), 287 deletions(-)
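
For context: prior to this commit, BuildStream kept an in-process estimate of the artifact cache size and scheduled a dedicated job to recompute the real size whenever the estimate outgrew the quota; delegating size tracking to buildbox-casd makes that machinery redundant. The following is a simplified sketch of the removed flow, reconstructed from the hunks below (illustration only, not the literal code):

    # Sketch of the removed accounting flow (names taken from the hunks below).
    # BuildQueue.done() fed the size reported by Element._assemble() into an
    # in-process estimate, and asked the scheduler for a real recount once the
    # estimate exceeded the configured quota.
    def done(self, job, element, result, status):
        element._assemble_done()
        if status is JobStatus.OK:
            artifacts = self._scheduler.context.artifactcache
            artifacts.add_artifact_size(result)      # grow the estimate
            if artifacts.full():                     # estimate exceeds the quota
                self._scheduler.check_cache_size()   # schedule a CacheSizeJob
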
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
index 96062089f..9f081c8a0 100644
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -18,5 +18,4 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
from .elementjob import ElementJob
-from .cachesizejob import CacheSizeJob
from .job import JobStatus
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
deleted file mode 100644
index 581101c07..000000000
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author:
-# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
-#
-from .job import Job, JobStatus, ChildJob
-
-
-class CacheSizeJob(Job):
-    def __init__(self, *args, complete_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.set_name(self.action_name)
-        self._complete_cb = complete_cb
-
-        context = self._scheduler.context
-        self._casquota = context.get_casquota()
-
-    def parent_complete(self, status, result):
-        if status is JobStatus.OK:
-            self._casquota.set_cache_size(result)
-
-        if self._complete_cb:
-            self._complete_cb(status, result)
-
-    def create_child_job(self, *args, **kwargs):
-        return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs)
-
-
-class ChildCacheSizeJob(ChildJob):
-    def __init__(self, *args, casquota, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._casquota = casquota
-
-    def child_process(self):
-        return self._casquota.compute_cache_size()
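
The deleted CacheSizeJob used the scheduler's parent/child job split: the actual filesystem accounting ran in a child process via ChildCacheSizeJob.child_process(), and the parent applied the result to the CAS quota in parent_complete(). A condensed sketch of how the scheduler launched it, based on the scheduler.py hunk further down (simplified illustration, not the literal code):

    # Condensed from the scheduler.py hunk below: the parent process creates
    # the job with a completion callback, and starting it spawns the child
    # that computes the real size via CASQuota.compute_cache_size().
    job = CacheSizeJob(scheduler, _ACTION_NAME_CACHE_SIZE, 'cache_size/cache_size',
                       complete_cb=scheduler._cache_size_job_complete)
    scheduler._start_job(job)
    # On success, parent_complete() records the result with
    # casquota.set_cache_size(result) before invoking the callback.
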
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 1be3f7cd0..dc33e6510 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,6 @@
from datetime import timedelta
from . import Queue, QueueStatus
-from ..jobs import JobStatus
from ..resources import ResourceType
from ..._message import MessageType
@@ -73,39 +72,11 @@ class BuildQueue(Queue):
        return QueueStatus.READY
-    def _check_cache_size(self, job, element, artifact_size):
-
-        # After completing a build job, add the artifact size
-        # as returned from Element._assemble() to the estimated
-        # artifact cache size
-        #
-        context = self._scheduler.context
-        artifacts = context.artifactcache
-
-        artifacts.add_artifact_size(artifact_size)
-
-        # If the estimated size outgrows the quota, ask the scheduler
-        # to queue a job to actually check the real cache size.
-        #
-        if artifacts.full():
-            self._scheduler.check_cache_size()
-
    def done(self, job, element, result, status):
        # Inform element in main process that assembly is done
        element._assemble_done()
-        # This has to be done after _assemble_done, such that the
-        # element may register its cache key as required
-        #
-        # FIXME: Element._assemble() does not report both the failure state and the
-        #        size of the newly cached failed artifact, so we can only adjust the
-        #        artifact cache size for a successful build even though we know a
-        #        failed build also grows the artifact cache size.
-        #
-        if status is JobStatus.OK:
-            self._check_cache_size(job, element, result)
-
    def register_pending_element(self, element):
        # Set a "buildable" callback for an element not yet ready
        # to be processed in the build queue.
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 2c46cd2fd..7f4125099 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -52,12 +52,6 @@ class PullQueue(Queue):
        element._pull_done()
-        # Build jobs will check the "approximate" size first. Since we
-        # do not get an artifact size from pull jobs, we have to
-        # actually check the cache size.
-        if status is JobStatus.OK:
-            self._scheduler.check_cache_size()
-
    def register_pending_element(self, element):
        # Set a "can_query_cache"_callback for an element which is not
        # immediately ready to query the artifact cache so that it
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b191e7693..b28b26f0b 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -27,8 +27,8 @@ import datetime
from contextlib import contextmanager
# Local imports
-from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob
+from .resources import Resources
+from .jobs import JobStatus
from .._profile import Topics, PROFILER
@@ -39,11 +39,6 @@ class SchedStatus():
TERMINATED = 1
-# Some action names for the internal jobs we launch
-#
-_ACTION_NAME_CACHE_SIZE = 'size'
-
-
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -97,10 +92,6 @@ class Scheduler():
        self._queue_jobs = True   # Whether we should continue to queue jobs
        self._state = state
-        # State of cache management related jobs
-        self._cache_size_scheduled = False   # Whether we have a cache size job scheduled
-        self._cache_size_running = None      # A running CacheSizeJob, or None
-
        # Callbacks to report back to the Scheduler owner
        self._interrupt_callback = interrupt_callback
        self._ticker_callback = ticker_callback
@@ -139,9 +130,6 @@ class Scheduler():
        # Handle unix signals while running
        self._connect_signals()
-        # Check if we need to start with some cache maintenance
-        self._check_cache_management()
-
        # Start the profiler
        with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
            # Run the queues
@@ -268,51 +256,10 @@ class Scheduler():
        # Now check for more jobs
        self._sched()
-    # check_cache_size():
-    #
-    # Queues a cache size calculation job, after the cache
-    # size is calculated, a cleanup job will be run automatically
-    # if needed.
-    #
-    def check_cache_size(self):
-
-        # Here we assume we are called in response to a job
-        # completion callback, or before entering the scheduler.
-        #
-        # As such there is no need to call `_sched()` from here,
-        # and we prefer to run it once at the last moment.
-        #
-        self._cache_size_scheduled = True
-
    #######################################################
    #                  Local Private Methods              #
    #######################################################
-    # _check_cache_management()
-    #
-    # Run an initial check if we need to lock the cache
-    # resource and check the size and possibly launch
-    # a cleanup.
-    #
-    # Sessions which do not add to the cache are not affected.
-    #
-    def _check_cache_management(self):
-
-        # Only trigger the check for a scheduler run which has
-        # queues which require the CACHE resource.
-        if not any(q for q in self.queues
-                   if ResourceType.CACHE in q.resources):
-            return
-
-        # If the estimated size outgrows the quota, queue a job to
-        # actually check the real cache size initially, this one
-        # should have exclusive access to the cache to ensure nothing
-        # starts while we are checking the cache.
-        #
-        artifacts = self.context.artifactcache
-        if artifacts.full():
-            self._sched_cache_size_job(exclusive=True)
-
    # _start_job()
    #
    # Spanws a job
@@ -325,59 +272,6 @@
        self._state.add_task(job.action_name, job.name, self.elapsed_time())
        job.start()
-    # Callback for the cache size job
-    def _cache_size_job_complete(self, status, cache_size):
-
-        # Deallocate cache size job resources
-        self._cache_size_running = None
-        self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
-
-        # Unregister the exclusive interest if there was any
-        self.resources.unregister_exclusive_interest(
-            [ResourceType.CACHE], 'cache-size'
-        )
-
-    # _sched_cache_size_job()
-    #
-    # Runs a cache size job if one is scheduled to run now and
-    # sufficient recources are available.
-    #
-    # Args:
-    #    exclusive (bool): Run a cache size job immediately and
-    #                      hold the ResourceType.CACHE resource
-    #                      exclusively (used at startup).
-    #
-    def _sched_cache_size_job(self, *, exclusive=False):
-
-        # The exclusive argument is not intended (or safe) for arbitrary use.
-        if exclusive:
-            assert not self._cache_size_scheduled
-            assert not self._cache_size_running
-            assert not self._active_jobs
-            self._cache_size_scheduled = True
-
-        if self._cache_size_scheduled and not self._cache_size_running:
-
-            # Handle the exclusive launch
-            exclusive_resources = set()
-            if exclusive:
-                exclusive_resources.add(ResourceType.CACHE)
-                self.resources.register_exclusive_interest(
-                    exclusive_resources, 'cache-size'
-                )
-
-            # Reserve the resources (with the possible exclusive cache resource)
-            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
-                                      exclusive_resources):
-
-                # Update state and launch
-                self._cache_size_scheduled = False
-                self._cache_size_running = \
-                    CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
-                                 'cache_size/cache_size',
-                                 complete_cb=self._cache_size_job_complete)
-                self._start_job(self._cache_size_running)
-
    # _sched_queue_jobs()
    #
    # Ask the queues what jobs they want to schedule and schedule
@@ -440,11 +334,6 @@
        if not self.terminated:
            #
-            # Try the cache management jobs
-            #
-            self._sched_cache_size_job()
-
-            #
            # Run as many jobs as the queues can handle for the
            # available resources
            #
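
The deleted _sched_cache_size_job() also illustrates the scheduler's exclusive-resource pattern: register an exclusive interest in a resource under a name, reserve it together with a process token, and release/unregister it in the completion callback. A condensed sketch of that pattern, using the calls from the removed hunk above (illustration only, not the literal code):

    # Condensed from the removed hunk above: take the cache resource
    # exclusively for one job, then give it back when the job completes.
    exclusive = {ResourceType.CACHE}
    resources.register_exclusive_interest(exclusive, 'cache-size')
    if resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], exclusive):
        start_job()   # placeholder for launching the job while holding the cache
    # ...later, in the completion callback:
    resources.release([ResourceType.CACHE, ResourceType.PROCESS])
    resources.unregister_exclusive_interest([ResourceType.CACHE], 'cache-size')
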
diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py
deleted file mode 100644
index fb34b5fad..000000000
--- a/tests/artifactcache/cache_size.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Pylint doesn't play well with fixtures and dependency injection from pytest
-# pylint: disable=redefined-outer-name
-
-import os
-from unittest import mock
-
-from buildstream import _yaml
-from buildstream._cas.cascache import CACHE_SIZE_FILE
-from buildstream._exceptions import ErrorDomain
-from buildstream.testing import cli # pylint: disable=unused-import
-
-from tests.testutils import create_element_size
-
-# XXX: Currently lacking:
-# * A way to check whether it's faster to read cache size on
-# successive invocations.
-# * A way to check whether the cache size file has been read.
-
-
-def create_project(project_dir):
-    project_file = os.path.join(project_dir, "project.conf")
-    project_conf = {
-        "name": "test"
-    }
-    _yaml.roundtrip_dump(project_conf, project_file)
-    element_name = "test.bst"
-    create_element_size(element_name, project_dir, ".", [], 1024)
-
-
-def test_cache_size_roundtrip(cli, tmpdir):
-    # Builds (to put files in the cache), then invokes buildstream again
-    # to check nothing breaks
-
-    # Create project
-    project_dir = str(tmpdir)
-    create_project(project_dir)
-
-    # Build, to populate the cache
-    res = cli.run(project=project_dir, args=["build", "test.bst"])
-    res.assert_success()
-
-    # Show, to check that nothing breaks while reading cache size
-    res = cli.run(project=project_dir, args=["show", "test.bst"])
-    res.assert_success()
-
-
-def test_cache_size_write(cli, tmpdir):
-    # Builds (to put files in the cache), then checks a number is
-    # written to the cache size file.
-
-    project_dir = str(tmpdir)
-    create_project(project_dir)
-
-    # Artifact cache must be in a known place
-    casdir = os.path.join(project_dir, "cas")
-    cli.configure({"cachedir": project_dir})
-
-    # Build, to populate the cache
-    res = cli.run(project=project_dir, args=["build", "test.bst"])
-    res.assert_success()
-
-    # Inspect the artifact cache
-    sizefile = os.path.join(casdir, CACHE_SIZE_FILE)
-    assert os.path.isfile(sizefile)
-
-
-def test_quota_over_1024T(cli, tmpdir):
-    KiB = 1024
-    MiB = (KiB * 1024)
-    GiB = (MiB * 1024)
-    TiB = (GiB * 1024)
-
-    cli.configure({
-        'cache': {
-            'quota': 2048 * TiB
-        }
-    })
-    project = tmpdir.join("main")
-    os.makedirs(str(project))
-    _yaml.roundtrip_dump({'name': 'main'}, str(project.join("project.conf")))
-
-    volume_space_patch = mock.patch(
-        "buildstream._cas.CASQuota._get_cache_volume_size",
-        autospec=True,
-        return_value=(1025 * TiB, 1025 * TiB)
-    )
-
-    with volume_space_patch:
-        result = cli.run(project, args=["build", "file.bst"])
-        result.assert_main_error(ErrorDomain.CAS, 'insufficient-storage-for-quota')