summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Westphahl <simon.westphahl@bmw.de>2021-05-04 15:05:45 +0200
committerJames E. Blair <jim@acmegating.com>2021-05-24 09:31:15 -0700
commit62526da69f3fffd17ca9da2288ce41c97f435977 (patch)
tree73258694548947fd26d290e3f9292525943cfdef
parent997f88185a9e7889e75e1e862a86bef6800075a3 (diff)
downloadzuul-62526da69f3fffd17ca9da2288ce41c97f435977.tar.gz
Cache unparsed config files in Zookeeper
Cache raw config files in Zookeeper so we can load the config on multiple schedulers with only one round trip to the mergers. In case no cache ltime is given, the configloader will get the content of the config files via a cat job and store the returned config in Zookeeper. For tenant reconfigurations the 'zuul_event_ltime' of the corresponding event will be passed as the cache ltime to 'reloadTenant()'. This has the advantage that in case a project is part of multiple tenants, only the first tenant will request the config from the mergers. All following tenants can get the config from Zookeeper. In a later step we can save the logical timestamp of a tenant reconfiguration in the 'global layout state'. This timestamp can then be used as the cache ltime by other schedulers to update their local layout. Change-Id: Ia3d2a124eaf6ee9d82c94a0375ee7ecba61caadf
-rw-r--r--tests/base.py9
-rw-r--r--tests/unit/test_configloader.py57
-rw-r--r--zuul/configloader.py115
-rw-r--r--zuul/scheduler.py19
4 files changed, 157 insertions, 43 deletions
diff --git a/tests/base.py b/tests/base.py
index ced25e6b1..1500bbc68 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -4576,6 +4576,15 @@ class ZuulTestCase(BaseTestCase):
with open(private_key_file, 'w') as o:
o.write(i.read())
def getCurrentLtime(self):
    """Get the logical timestamp (zxid) as seen by the Zookeeper cluster.

    Issues the four-letter "srvr" admin command against the connected
    server and parses the reported zxid, a monotonically increasing
    hexadecimal transaction id.

    :returns: the current zxid as an int.
    :raises RuntimeError: if no zxid line is found in the srvr output.
    """
    result = self.zk_client.client.command(b"srvr")
    for line in result.splitlines():
        # Fix: the pattern previously lacked a '+' quantifier on the hex
        # character class, so only the first digit of the zxid was
        # captured and the returned ltime was truncated.
        match = re.match(r"zxid:\s+0x(?P<zxid>[a-f0-9]+)", line, re.I)
        if match:
            return int(match.group("zxid"), 16)
    raise RuntimeError("Could not find zxid in Zookeeper srvr output")
def copyDirToRepo(self, project, source_path):
self.init_repo(project)
diff --git a/tests/unit/test_configloader.py b/tests/unit/test_configloader.py
index 275692381..fd79e2968 100644
--- a/tests/unit/test_configloader.py
+++ b/tests/unit/test_configloader.py
@@ -18,6 +18,7 @@ import logging
import textwrap
import testtools
+from zuul import model
from zuul.configloader import AuthorizationRuleParser, safe_load_yaml
from tests.base import ZuulTestCase
@@ -521,6 +522,62 @@ class TestConfigConflict(ZuulTestCase):
jobs)
class TestUnparsedConfigCache(ZuulTestCase):
    """Exercise the Zookeeper-backed cache of raw (unparsed) config files."""

    tenant_config_file = 'config/single-tenant/main.yaml'

    def test_config_caching(self):
        # Fix: removed a stray space in the attribute access
        # ("sched. unparsed_config_cache"), which would raise at runtime.
        cache = self.scheds.first.sched.unparsed_config_cache
        tenant = self.scheds.first.sched.abide.tenants["tenant-one"]

        # common-config carries the tenant's config; its cache entry
        # should be valid and contain exactly the zuul.yaml file.
        common_cache = cache.getFilesCache("review.example.com/common-config",
                                           "master")
        tpc = tenant.project_configs["review.example.com/common-config"]
        self.assertTrue(common_cache.isValidFor(tpc, cache_ltime=-1))
        self.assertEqual(len(common_cache), 1)
        self.assertIn("zuul.yaml", common_cache)
        self.assertTrue(len(common_cache["zuul.yaml"]) > 0)

        project_cache = cache.getFilesCache("review.example.com/org/project",
                                            "master")
        # Cache of org/project should be valid but empty (no in-repo config)
        tpc = tenant.project_configs["review.example.com/org/project"]
        self.assertTrue(project_cache.isValidFor(tpc, cache_ltime=-1))
        self.assertEqual(len(project_cache), 0)

    def test_cache_use(self):
        sched = self.scheds.first.sched
        # Stop cleanup thread so it's not removing projects from the cache
        # during the test.
        sched.cleanup_stop.set()
        sched.cleanup_thread.join()
        tenant = sched.abide.tenants['tenant-one']
        _, project = tenant.getProject('org/project2')

        # Get the current ltime from Zookeeper and run a full reconfiguration,
        # so that we know all items in the cache have a larger ltime.
        ltime = self.getCurrentLtime()
        self.scheds.first.fullReconfigure()

        # Clear the unparsed branch cache so all projects (except for
        # org/project2) are retrieved from the cache in Zookeeper.
        sched.abide.unparsed_project_branch_cache.clear()
        self.gearman_server.jobs_history.clear()

        # Create a tenant reconfiguration event with a known ltime that is
        # smaller than the ltime of the items in the cache.
        event = model.TenantReconfigureEvent(
            tenant.name, project.canonical_name, branch_name=None)
        event.zuul_event_ltime = ltime
        sched.management_events.put(event, needs_result=False)
        self.waitUntilSettled()

        # As the cache should be valid, we only expect a cat job for
        # org/project2
        cat_jobs = [job for job in self.gearman_server.jobs_history
                    if job.name == b"merger:cat"]
        self.assertEqual(len(cat_jobs), 1)
+
+
class TestAuthorizationRuleParser(ZuulTestCase):
tenant_config_file = 'config/tenant-parser/authorizations.yaml'
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 374ffe3a7..8f4cce83b 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1485,6 +1485,7 @@ class TenantParser(object):
self.scheduler = scheduler
self.merger = merger
self.keystorage = keystorage
+ self.unparsed_config_cache = self.scheduler.unparsed_config_cache
classes = vs.Any('pipeline', 'job', 'semaphore', 'project',
'project-template', 'nodeset', 'secret', 'queue')
@@ -1546,7 +1547,7 @@ class TenantParser(object):
}
return vs.Schema(tenant)
- def fromYaml(self, abide, conf, ansible_manager):
+ def fromYaml(self, abide, conf, ansible_manager, cache_ltime=None):
self.getSchema()(conf)
tenant = model.Tenant(conf['name'])
pcontext = ParseContext(self.connections, self.scheduler,
@@ -1604,7 +1605,7 @@ class TenantParser(object):
# Start by fetching any YAML needed by this tenant which isn't
# already cached. Full reconfigurations start with an empty
# cache.
- self._cacheTenantYAML(abide, tenant, loading_errors)
+ self._cacheTenantYAML(abide, tenant, loading_errors, cache_ltime)
# Then collect the appropriate YAML based on this tenant
# config.
@@ -1770,7 +1771,7 @@ class TenantParser(object):
return config_projects, untrusted_projects
- def _cacheTenantYAML(self, abide, tenant, loading_errors):
+ def _cacheTenantYAML(self, abide, tenant, loading_errors, cache_ltime):
jobs = []
for project in itertools.chain(
tenant.config_projects, tenant.untrusted_projects):
@@ -1790,6 +1791,24 @@ class TenantParser(object):
# If all config classes are excluded then do not
# request any getFiles jobs.
continue
+
+ source_context = model.SourceContext(
+ project, branch, '', False)
+ if cache_ltime is not None:
+ files_cache = self.unparsed_config_cache.getFilesCache(
+ project.canonical_name, branch)
+ with self.unparsed_config_cache.readLock(
+ project.canonical_name):
+ if files_cache.isValidFor(tpc, cache_ltime):
+ self.log.debug(
+ "Using files from cache for project %s @%s",
+ project.canonical_name, branch)
+ self._updateUnparsedBranchCache(
+ abide, tenant, source_context, files_cache,
+ loading_errors)
+ branch_cache.setValidFor(tpc)
+ continue
+
extra_config_files = abide.getExtraConfigFiles(project.name)
extra_config_dirs = abide.getExtraConfigDirs(project.name)
job = self.merger.getFiles(
@@ -1801,8 +1820,9 @@ class TenantParser(object):
self.log.debug("Submitting cat job %s for %s %s %s" % (
job, project.source.connection.connection_name,
project.name, branch))
- job.source_context = model.SourceContext(
- project, branch, '', False)
+ job.extra_config_files = extra_config_files
+ job.extra_config_dirs = extra_config_dirs
+ job.source_context = source_context
jobs.append(job)
branch_cache.setValidFor(tpc)
@@ -1817,41 +1837,54 @@ class TenantParser(object):
raise Exception("Cat job %s failed" % (job,))
self.log.debug("Cat job %s got files %s" %
(job, job.files.keys()))
- loaded = False
- files = sorted(job.files.keys())
- unparsed_config = model.UnparsedConfig()
- tpc = tenant.project_configs[
- job.source_context.project.canonical_name]
- for conf_root in (
- ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') +
- tpc.extra_config_files + tpc.extra_config_dirs):
- for fn in files:
- fn_root = fn.split('/')[0]
- if fn_root != conf_root or not job.files.get(fn):
+
+ self._updateUnparsedBranchCache(abide, tenant, job.source_context,
+ job.files, loading_errors)
+
+ # Save all config files in Zookeeper (not just for the current tpc)
+ files_cache = self.unparsed_config_cache.getFilesCache(
+ job.source_context.project.canonical_name,
+ job.source_context.branch)
+ with self.unparsed_config_cache.writeLock(project.canonical_name):
+ for fn, content in job.files.items():
+ # Cache file in Zookeeper
+ if content is not None:
+ files_cache[fn] = content
+ files_cache.setValidFor(job.extra_config_files,
+ job.extra_config_dirs)
+
def _updateUnparsedBranchCache(self, abide, tenant, source_context, files,
                               loading_errors):
    """Parse raw config file contents into the unparsed branch cache.

    Walks the recognized config roots (the standard zuul config
    files/dirs plus any tenant-declared extras) in priority order,
    parses each matching non-empty file and stores the result in the
    abide's per-project-branch cache.
    """
    tpc = tenant.project_configs[source_context.project.canonical_name]
    loaded = False
    conf_roots = (('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d')
                  + tpc.extra_config_files + tpc.extra_config_dirs)
    for conf_root in conf_roots:
        for fn in sorted(files.keys()):
            # Skip files outside this config root and empty contents.
            if fn.split('/')[0] != conf_root or not files.get(fn):
                continue
            # Don't load from more than one configuration in a
            # project-branch (unless an "extra" file/dir).
            is_extra = (conf_root in tpc.extra_config_files
                        or conf_root in tpc.extra_config_dirs)
            if not is_extra:
                if loaded and loaded != conf_root:
                    self.log.warning("Multiple configuration files in %s",
                                     source_context)
                    continue
                loaded = conf_root
            # Create a new source_context so we have unique filenames.
            ctx = source_context.copy()
            ctx.path = fn
            self.log.info("Loading configuration from %s" % (ctx,))
            incdata = self.loadProjectYAML(files[fn], ctx, loading_errors)
            branch_cache = abide.getUnparsedBranchCache(
                ctx.project.canonical_name, ctx.branch)
            branch_cache.put(ctx.path, incdata)
def _loadTenantYAML(self, abide, tenant, loading_errors):
config_projects_config = model.UnparsedConfig()
@@ -2262,7 +2295,7 @@ class ConfigLoader(object):
return abide
def reloadTenant(self, abide, tenant, ansible_manager,
- unparsed_abide=None):
+ unparsed_abide=None, cache_ltime=None):
new_abide = model.Abide()
new_abide.tenants = abide.tenants.copy()
new_abide.admin_rules = abide.admin_rules.copy()
@@ -2296,7 +2329,7 @@ class ConfigLoader(object):
# When reloading a tenant only, use cached data if available.
new_tenant = self.tenant_parser.fromYaml(
- new_abide, unparsed_config, ansible_manager)
+ new_abide, unparsed_config, ansible_manager, cache_ltime)
new_abide.tenants[tenant.name] = new_tenant
if len(new_tenant.layout.loading_errors):
self.log.warning(
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 5a38759ad..63d787b7c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -71,6 +71,7 @@ from zuul.zk import ZooKeeperClient
from zuul.zk.components import (
BaseComponent, ComponentRegistry, SchedulerComponent
)
+from zuul.zk.config_cache import UnparsedConfigCache
from zuul.zk.event_queues import (
GlobalEventWatcher,
GlobalManagementEventQueue,
@@ -158,6 +159,7 @@ class Scheduler(threading.Thread):
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
self.component_info.register()
self.component_registry = ComponentRegistry(self.zk_client)
+ self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
self.result_event_queue = NamedQueue("ResultEventQueue")
self.global_watcher = GlobalEventWatcher(
@@ -434,6 +436,14 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in semaphore cleanup:")
+ cached_projects = set(
+ self.unparsed_config_cache.listCachedProjects())
+ active_projects = set(
+ self.abide.unparsed_project_branch_cache.keys())
+ unused_projects = cached_projects - active_projects
+ for project_cname in unused_projects:
+ self.unparsed_config_cache.clearCache(project_cname)
+
def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time()
self.trigger_events.put(driver_name, event)
@@ -865,11 +875,16 @@ class Scheduler(threading.Thread):
project_name, branch_name)
self.abide.clearUnparsedBranchCache(project_name,
branch_name)
+ with self.unparsed_config_cache.writeLock(project_name):
+ self.unparsed_config_cache.clearCache(project_name,
+ branch_name)
+
old_tenant = self.abide.tenants[event.tenant_name]
loader = configloader.ConfigLoader(
self.connections, self, self.merger, self.keystore)
- abide = loader.reloadTenant(
- self.abide, old_tenant, self.ansible_manager)
+ abide = loader.reloadTenant(self.abide, old_tenant,
+ self.ansible_manager,
+ cache_ltime=event.zuul_event_ltime)
tenant = abide.tenants[event.tenant_name]
self._reconfigureTenant(tenant)
self.abide = abide