summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-06-18 11:51:08 -0700
committerJames E. Blair <jim@acmegating.com>2022-06-19 12:31:02 -0700
commit42e1e1e324ac9d29b15dc4c8ba2ca58d9b219e88 (patch)
treeda9c1d4726d95b15bc936a0c534f51abe2c7643b
parentc4476d1b6aebec0ea3198e0203c7d35bedbea57a (diff)
downloadzuul-42e1e1e324ac9d29b15dc4c8ba2ca58d9b219e88.tar.gz
Parallelize config cache loading
Loading config involves significant network operations for each project: * Loading project keys * Asking the source for the list of branches for each project * Retrieving the config file contents from the ZK cache (if present) * Retrieving the config file contents from git (otherwise) Only the third item in that list is parallelized currently; the others are serialized. To parallelize the remainder, use a thread pool executor. The value of max_workers=4 is chosen as it appears in practice on OpenDev to make the most significant reduction in startup time while higher values make little difference (and could potentially contribute to DoS scenarios or local thread contention). Observed config priming times for various worker counts: 1: 282s 2: 181s 4: 144s 8: 146s Change-Id: I65472a8af96ed95eb28b88cc623ef103be76a46f
-rw-r--r--tests/unit/test_client.py3
-rw-r--r--zuul/configloader.py190
-rw-r--r--zuul/zk/__init__.py9
3 files changed, 121 insertions, 81 deletions
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 307051419..b51639952 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
def assertFinalState(self):
pass
+ def assertCleanShutdown(self):
+ pass
+
def test_delete_state(self):
# Shut everything down (as much as possible) to reduce
# logspam and errors.
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 8f453756b..3356d67e4 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -12,6 +12,7 @@
import collections
from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor, as_completed
import copy
import itertools
import os
@@ -1576,7 +1577,7 @@ class TenantParser(object):
}
return vs.Schema(tenant)
- def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None,
+ def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
layout_uuid=None, branch_cache_min_ltimes=None,
ignore_cat_exception=True):
# Note: This vs schema validation is not necessary in most cases as we
@@ -1622,7 +1623,15 @@ class TenantParser(object):
# We prepare a stack to store config loading issues
loading_errors = model.LoadingErrors()
+ # Get branches in parallel
+ branch_futures = {}
for tpc in config_tpcs + untrusted_tpcs:
+ future = executor.submit(self._getProjectBranches,
+ tenant, tpc, branch_cache_min_ltimes)
+ branch_futures[future] = tpc
+
+ for branch_future in as_completed(branch_futures.keys()):
+ tpc = branch_futures[branch_future]
source_context = model.ProjectContext(
tpc.project.canonical_name, tpc.project.name)
with project_configuration_exceptions(source_context,
@@ -1645,7 +1654,7 @@ class TenantParser(object):
# already cached. Full reconfigurations start with an empty
# cache.
self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
- ignore_cat_exception)
+ executor, ignore_cat_exception)
# Then collect the appropriate YAML based on this tenant
# config.
@@ -1816,7 +1825,7 @@ class TenantParser(object):
raise Exception("Unable to parse project %s", conf)
return projects
- def loadTenantProjects(self, conf_tenant):
+ def loadTenantProjects(self, conf_tenant, executor):
config_projects = []
untrusted_projects = []
@@ -1824,6 +1833,7 @@ class TenantParser(object):
'secret', 'project-template', 'nodeset',
'queue'])
+ futures = []
for source_name, conf_source in conf_tenant.get('source', {}).items():
source = self.connections.getSource(source_name)
@@ -1832,7 +1842,8 @@ class TenantParser(object):
# tpcs = TenantProjectConfigs
tpcs = self._getProjects(source, conf_repo, current_include)
for tpc in tpcs:
- self._loadProjectKeys(source_name, tpc.project)
+ futures.append(executor.submit(
+ self._loadProjectKeys, source_name, tpc.project))
config_projects.append(tpc)
current_include = frozenset(default_include - set(['pipeline']))
@@ -1840,13 +1851,16 @@ class TenantParser(object):
tpcs = self._getProjects(source, conf_repo,
current_include)
for tpc in tpcs:
- self._loadProjectKeys(source_name, tpc.project)
+ futures.append(executor.submit(
+ self._loadProjectKeys, source_name, tpc.project))
untrusted_projects.append(tpc)
+ for f in futures:
+ f.result()
return config_projects, untrusted_projects
def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
- ignore_cat_exception=True):
+ executor, ignore_cat_exception=True):
# min_ltimes can be the following: None (that means that we
# should not use the file cache at all) or a nested dict of
# project and branch to ltime. A value of None usually means
@@ -1910,6 +1924,7 @@ class TenantParser(object):
jobs = []
+ futures = []
for project in itertools.chain(
tenant.config_projects, tenant.untrusted_projects):
tpc = tenant.project_configs[project.canonical_name]
@@ -1923,67 +1938,13 @@ class TenantParser(object):
# If all config classes are excluded then do not
# request any getFiles jobs.
continue
+ futures.append(executor.submit(self._cacheTenantYAMLBranch,
+ abide, tenant, loading_errors,
+ min_ltimes, tpc, project,
+ branch, jobs))
+ for future in futures:
+ future.result()
- source_context = model.SourceContext(
- project.canonical_name, project.name,
- project.connection_name, branch, '', False)
- if min_ltimes is not None:
- files_cache = self.unparsed_config_cache.getFilesCache(
- project.canonical_name, branch)
- branch_cache = abide.getUnparsedBranchCache(
- project.canonical_name, branch)
- try:
- pb_ltime = min_ltimes[project.canonical_name][branch]
- except KeyError:
- self.log.exception(
- "Min. ltime missing for project/branch")
- pb_ltime = -1
-
- # If our unparsed branch cache is valid for the
- # time, then we don't need to do anything else.
- if branch_cache.isValidFor(tpc, pb_ltime):
- min_ltimes[project.canonical_name][branch] =\
- branch_cache.ltime
- continue
-
- with self.unparsed_config_cache.readLock(
- project.canonical_name):
- if files_cache.isValidFor(tpc, pb_ltime):
- self.log.debug(
- "Using files from cache for project "
- "%s @%s: %s",
- project.canonical_name, branch,
- list(files_cache.keys()))
- self._updateUnparsedBranchCache(
- abide, tenant, source_context, files_cache,
- loading_errors, files_cache.ltime,
- min_ltimes)
- continue
-
- extra_config_files = abide.getExtraConfigFiles(project.name)
- extra_config_dirs = abide.getExtraConfigDirs(project.name)
- if not self.merger:
- with project_configuration_exceptions(source_context,
- loading_errors):
- raise Exception(
- "Configuration files missing from cache. "
- "Check Zuul scheduler logs for more information.")
- continue
- ltime = self.zk_client.getCurrentLtime()
- job = self.merger.getFiles(
- project.source.connection.connection_name,
- project.name, branch,
- files=(['zuul.yaml', '.zuul.yaml'] +
- list(extra_config_files)),
- dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
- self.log.debug("Submitting cat job %s for %s %s %s" % (
- job, project.source.connection.connection_name,
- project.name, branch))
- job.extra_config_files = extra_config_files
- job.extra_config_dirs = extra_config_dirs
- job.ltime = ltime
- job.source_context = source_context
- jobs.append(job)
try:
self._processCatJobs(abide, tenant, loading_errors, jobs,
min_ltimes)
@@ -2000,7 +1961,76 @@ class TenantParser(object):
if not ignore_cat_exception:
raise
+ def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
+ tpc, project, branch, jobs):
+ # This is the middle section of _cacheTenantYAML, called for
+ # each project-branch. It's a separate method so we can
+ # execute it in parallel. The "jobs" argument is mutated and
+ # accumulates a list of all merger jobs submitted.
+ source_context = model.SourceContext(
+ project.canonical_name, project.name,
+ project.connection_name, branch, '', False)
+ if min_ltimes is not None:
+ files_cache = self.unparsed_config_cache.getFilesCache(
+ project.canonical_name, branch)
+ branch_cache = abide.getUnparsedBranchCache(
+ project.canonical_name, branch)
+ try:
+ pb_ltime = min_ltimes[project.canonical_name][branch]
+ except KeyError:
+ self.log.exception(
+ "Min. ltime missing for project/branch")
+ pb_ltime = -1
+
+ # If our unparsed branch cache is valid for the
+ # time, then we don't need to do anything else.
+ if branch_cache.isValidFor(tpc, pb_ltime):
+ min_ltimes[project.canonical_name][branch] =\
+ branch_cache.ltime
+ return
+
+ with self.unparsed_config_cache.readLock(
+ project.canonical_name):
+ if files_cache.isValidFor(tpc, pb_ltime):
+ self.log.debug(
+ "Using files from cache for project "
+ "%s @%s: %s",
+ project.canonical_name, branch,
+ list(files_cache.keys()))
+ self._updateUnparsedBranchCache(
+ abide, tenant, source_context, files_cache,
+ loading_errors, files_cache.ltime,
+ min_ltimes)
+ return
+
+ extra_config_files = abide.getExtraConfigFiles(project.name)
+ extra_config_dirs = abide.getExtraConfigDirs(project.name)
+ if not self.merger:
+ with project_configuration_exceptions(source_context,
+ loading_errors):
+ raise Exception(
+ "Configuration files missing from cache. "
+ "Check Zuul scheduler logs for more information.")
+ return
+ ltime = self.zk_client.getCurrentLtime()
+ job = self.merger.getFiles(
+ project.source.connection.connection_name,
+ project.name, branch,
+ files=(['zuul.yaml', '.zuul.yaml'] +
+ list(extra_config_files)),
+ dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
+ self.log.debug("Submitting cat job %s for %s %s %s" % (
+ job, project.source.connection.connection_name,
+ project.name, branch))
+ job.extra_config_files = extra_config_files
+ job.extra_config_dirs = extra_config_dirs
+ job.ltime = ltime
+ job.source_context = source_context
+ jobs.append(job)
+
def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
+ # Called at the end of _cacheTenantYAML after all cat jobs
+ # have been submitted
for job in jobs:
self.log.debug("Waiting for cat job %s" % (job,))
res = job.wait(self.merger.git_timeout)
@@ -2475,15 +2505,17 @@ class ConfigLoader(object):
# Pre-load TenantProjectConfigs so we can get and cache all of a
# project's config files (incl. tenant specific extra config) at once.
- for tenant_name, unparsed_config in tenants_to_load.items():
- config_tpcs, untrusted_tpcs = (
- self.tenant_parser.loadTenantProjects(unparsed_config)
- )
- abide.clearTPCs(tenant_name)
- for tpc in config_tpcs:
- abide.addConfigTPC(tenant_name, tpc)
- for tpc in untrusted_tpcs:
- abide.addUntrustedTPC(tenant_name, tpc)
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ for tenant_name, unparsed_config in tenants_to_load.items():
+ config_tpcs, untrusted_tpcs = (
+ self.tenant_parser.loadTenantProjects(unparsed_config,
+ executor)
+ )
+ abide.clearTPCs(tenant_name)
+ for tpc in config_tpcs:
+ abide.addConfigTPC(tenant_name, tpc)
+ for tpc in untrusted_tpcs:
+ abide.addUntrustedTPC(tenant_name, tpc)
def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
min_ltimes=None, layout_uuid=None,
@@ -2562,9 +2594,11 @@ class ConfigLoader(object):
return None
unparsed_config = unparsed_abide.tenants[tenant_name]
- new_tenant = self.tenant_parser.fromYaml(
- abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid,
- branch_cache_min_ltimes, ignore_cat_exception)
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ new_tenant = self.tenant_parser.fromYaml(
+ abide, unparsed_config, ansible_manager, executor,
+ min_ltimes, layout_uuid, branch_cache_min_ltimes,
+ ignore_cat_exception)
# Copy tenants dictionary to not break concurrent iterations.
tenants = abide.tenants.copy()
tenants[tenant_name] = new_tenant
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index 195197c5f..8ddc7511d 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -17,7 +17,7 @@ from threading import Thread
from typing import List, Callable
from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.protocol.states import KazooState
@@ -211,8 +211,11 @@ class ZooKeeperClient(object):
try:
zstat = self.client.set("/zuul/ltime", b"")
except NoNodeError:
- self.client.create("/zuul/ltime", b"", makepath=True)
- zstat = self.client.set("/zuul/ltime", b"")
+ try:
+ self.client.create("/zuul/ltime", b"", makepath=True)
+ zstat = self.client.set("/zuul/ltime", b"")
+ except NodeExistsError:
+ zstat = self.client.set("/zuul/ltime", b"")
return zstat.last_modified_transaction_id