summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-06-23 15:22:06 +0000
committerGerrit Code Review <review@openstack.org>2022-06-23 15:22:06 +0000
commitf2d4ff276b2e22be982cf261864dd938f707288a (patch)
tree263ff953fabb8fc71f175f6b9ce583d429a5d0b0
parent2f2e0ce28ce0275160865a33d76a866bd7f53f03 (diff)
parent42e1e1e324ac9d29b15dc4c8ba2ca58d9b219e88 (diff)
downloadzuul-f2d4ff276b2e22be982cf261864dd938f707288a.tar.gz
Merge "Parallelize config cache loading"
-rw-r--r--tests/unit/test_client.py3
-rw-r--r--zuul/configloader.py190
-rw-r--r--zuul/zk/__init__.py9
3 files changed, 121 insertions, 81 deletions
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 307051419..b51639952 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
def assertFinalState(self):
pass
+ def assertCleanShutdown(self):
+ pass
+
def test_delete_state(self):
# Shut everything down (as much as possible) to reduce
# logspam and errors.
diff --git a/zuul/configloader.py b/zuul/configloader.py
index f4c274669..eb468518f 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -12,6 +12,7 @@
import collections
from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor, as_completed
import copy
import itertools
import os
@@ -1608,7 +1609,7 @@ class TenantParser(object):
}
return vs.Schema(tenant)
- def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None,
+ def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
layout_uuid=None, branch_cache_min_ltimes=None,
ignore_cat_exception=True):
# Note: This vs schema validation is not necessary in most cases as we
@@ -1659,7 +1660,15 @@ class TenantParser(object):
# We prepare a stack to store config loading issues
loading_errors = model.LoadingErrors()
+ # Get branches in parallel
+ branch_futures = {}
for tpc in config_tpcs + untrusted_tpcs:
+ future = executor.submit(self._getProjectBranches,
+ tenant, tpc, branch_cache_min_ltimes)
+ branch_futures[future] = tpc
+
+ for branch_future in as_completed(branch_futures.keys()):
+ tpc = branch_futures[branch_future]
source_context = model.ProjectContext(
tpc.project.canonical_name, tpc.project.name)
with project_configuration_exceptions(source_context,
@@ -1682,7 +1691,7 @@ class TenantParser(object):
# already cached. Full reconfigurations start with an empty
# cache.
self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
- ignore_cat_exception)
+ executor, ignore_cat_exception)
# Then collect the appropriate YAML based on this tenant
# config.
@@ -1879,7 +1888,7 @@ class TenantParser(object):
raise Exception("Unable to parse project %s", conf)
return projects
- def loadTenantProjects(self, conf_tenant):
+ def loadTenantProjects(self, conf_tenant, executor):
config_projects = []
untrusted_projects = []
@@ -1887,6 +1896,7 @@ class TenantParser(object):
'secret', 'project-template', 'nodeset',
'queue'])
+ futures = []
for source_name, conf_source in conf_tenant.get('source', {}).items():
source = self.connections.getSource(source_name)
@@ -1895,7 +1905,8 @@ class TenantParser(object):
# tpcs = TenantProjectConfigs
tpcs = self._getProjects(source, conf_repo, current_include)
for tpc in tpcs:
- self._loadProjectKeys(source_name, tpc.project)
+ futures.append(executor.submit(
+ self._loadProjectKeys, source_name, tpc.project))
config_projects.append(tpc)
current_include = frozenset(default_include - set(['pipeline']))
@@ -1903,13 +1914,16 @@ class TenantParser(object):
tpcs = self._getProjects(source, conf_repo,
current_include)
for tpc in tpcs:
- self._loadProjectKeys(source_name, tpc.project)
+ futures.append(executor.submit(
+ self._loadProjectKeys, source_name, tpc.project))
untrusted_projects.append(tpc)
+ for f in futures:
+ f.result()
return config_projects, untrusted_projects
def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
- ignore_cat_exception=True):
+ executor, ignore_cat_exception=True):
# min_ltimes can be the following: None (that means that we
# should not use the file cache at all) or a nested dict of
# project and branch to ltime. A value of None usually means
@@ -1973,6 +1987,7 @@ class TenantParser(object):
jobs = []
+ futures = []
for project in itertools.chain(
tenant.config_projects, tenant.untrusted_projects):
tpc = tenant.project_configs[project.canonical_name]
@@ -1986,67 +2001,13 @@ class TenantParser(object):
# If all config classes are excluded then do not
# request any getFiles jobs.
continue
+ futures.append(executor.submit(self._cacheTenantYAMLBranch,
+ abide, tenant, loading_errors,
+ min_ltimes, tpc, project,
+ branch, jobs))
+ for future in futures:
+ future.result()
- source_context = model.SourceContext(
- project.canonical_name, project.name,
- project.connection_name, branch, '', False)
- if min_ltimes is not None:
- files_cache = self.unparsed_config_cache.getFilesCache(
- project.canonical_name, branch)
- branch_cache = abide.getUnparsedBranchCache(
- project.canonical_name, branch)
- try:
- pb_ltime = min_ltimes[project.canonical_name][branch]
- except KeyError:
- self.log.exception(
- "Min. ltime missing for project/branch")
- pb_ltime = -1
-
- # If our unparsed branch cache is valid for the
- # time, then we don't need to do anything else.
- if branch_cache.isValidFor(tpc, pb_ltime):
- min_ltimes[project.canonical_name][branch] =\
- branch_cache.ltime
- continue
-
- with self.unparsed_config_cache.readLock(
- project.canonical_name):
- if files_cache.isValidFor(tpc, pb_ltime):
- self.log.debug(
- "Using files from cache for project "
- "%s @%s: %s",
- project.canonical_name, branch,
- list(files_cache.keys()))
- self._updateUnparsedBranchCache(
- abide, tenant, source_context, files_cache,
- loading_errors, files_cache.ltime,
- min_ltimes)
- continue
-
- extra_config_files = abide.getExtraConfigFiles(project.name)
- extra_config_dirs = abide.getExtraConfigDirs(project.name)
- if not self.merger:
- with project_configuration_exceptions(source_context,
- loading_errors):
- raise Exception(
- "Configuration files missing from cache. "
- "Check Zuul scheduler logs for more information.")
- continue
- ltime = self.zk_client.getCurrentLtime()
- job = self.merger.getFiles(
- project.source.connection.connection_name,
- project.name, branch,
- files=(['zuul.yaml', '.zuul.yaml'] +
- list(extra_config_files)),
- dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
- self.log.debug("Submitting cat job %s for %s %s %s" % (
- job, project.source.connection.connection_name,
- project.name, branch))
- job.extra_config_files = extra_config_files
- job.extra_config_dirs = extra_config_dirs
- job.ltime = ltime
- job.source_context = source_context
- jobs.append(job)
try:
self._processCatJobs(abide, tenant, loading_errors, jobs,
min_ltimes)
@@ -2063,7 +2024,76 @@ class TenantParser(object):
if not ignore_cat_exception:
raise
+ def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
+ tpc, project, branch, jobs):
+ # This is the middle section of _cacheTenantYAML, called for
+ # each project-branch. It's a separate method so we can
+ # execute it in parallel. The "jobs" argument is mutated and
+ # accumulates a list of all merger jobs submitted.
+ source_context = model.SourceContext(
+ project.canonical_name, project.name,
+ project.connection_name, branch, '', False)
+ if min_ltimes is not None:
+ files_cache = self.unparsed_config_cache.getFilesCache(
+ project.canonical_name, branch)
+ branch_cache = abide.getUnparsedBranchCache(
+ project.canonical_name, branch)
+ try:
+ pb_ltime = min_ltimes[project.canonical_name][branch]
+ except KeyError:
+ self.log.exception(
+ "Min. ltime missing for project/branch")
+ pb_ltime = -1
+
+ # If our unparsed branch cache is valid for the
+ # time, then we don't need to do anything else.
+ if branch_cache.isValidFor(tpc, pb_ltime):
+ min_ltimes[project.canonical_name][branch] =\
+ branch_cache.ltime
+ return
+
+ with self.unparsed_config_cache.readLock(
+ project.canonical_name):
+ if files_cache.isValidFor(tpc, pb_ltime):
+ self.log.debug(
+ "Using files from cache for project "
+ "%s @%s: %s",
+ project.canonical_name, branch,
+ list(files_cache.keys()))
+ self._updateUnparsedBranchCache(
+ abide, tenant, source_context, files_cache,
+ loading_errors, files_cache.ltime,
+ min_ltimes)
+ return
+
+ extra_config_files = abide.getExtraConfigFiles(project.name)
+ extra_config_dirs = abide.getExtraConfigDirs(project.name)
+ if not self.merger:
+ with project_configuration_exceptions(source_context,
+ loading_errors):
+ raise Exception(
+ "Configuration files missing from cache. "
+ "Check Zuul scheduler logs for more information.")
+ return
+ ltime = self.zk_client.getCurrentLtime()
+ job = self.merger.getFiles(
+ project.source.connection.connection_name,
+ project.name, branch,
+ files=(['zuul.yaml', '.zuul.yaml'] +
+ list(extra_config_files)),
+ dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
+ self.log.debug("Submitting cat job %s for %s %s %s" % (
+ job, project.source.connection.connection_name,
+ project.name, branch))
+ job.extra_config_files = extra_config_files
+ job.extra_config_dirs = extra_config_dirs
+ job.ltime = ltime
+ job.source_context = source_context
+ jobs.append(job)
+
def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
+ # Called at the end of _cacheTenantYAML after all cat jobs
+ # have been submitted
for job in jobs:
self.log.debug("Waiting for cat job %s" % (job,))
res = job.wait(self.merger.git_timeout)
@@ -2545,15 +2575,17 @@ class ConfigLoader(object):
# Pre-load TenantProjectConfigs so we can get and cache all of a
# project's config files (incl. tenant specific extra config) at once.
- for tenant_name, unparsed_config in tenants_to_load.items():
- config_tpcs, untrusted_tpcs = (
- self.tenant_parser.loadTenantProjects(unparsed_config)
- )
- abide.clearTPCs(tenant_name)
- for tpc in config_tpcs:
- abide.addConfigTPC(tenant_name, tpc)
- for tpc in untrusted_tpcs:
- abide.addUntrustedTPC(tenant_name, tpc)
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ for tenant_name, unparsed_config in tenants_to_load.items():
+ config_tpcs, untrusted_tpcs = (
+ self.tenant_parser.loadTenantProjects(unparsed_config,
+ executor)
+ )
+ abide.clearTPCs(tenant_name)
+ for tpc in config_tpcs:
+ abide.addConfigTPC(tenant_name, tpc)
+ for tpc in untrusted_tpcs:
+ abide.addUntrustedTPC(tenant_name, tpc)
def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
min_ltimes=None, layout_uuid=None,
@@ -2632,9 +2664,11 @@ class ConfigLoader(object):
return None
unparsed_config = unparsed_abide.tenants[tenant_name]
- new_tenant = self.tenant_parser.fromYaml(
- abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid,
- branch_cache_min_ltimes, ignore_cat_exception)
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ new_tenant = self.tenant_parser.fromYaml(
+ abide, unparsed_config, ansible_manager, executor,
+ min_ltimes, layout_uuid, branch_cache_min_ltimes,
+ ignore_cat_exception)
# Copy tenants dictionary to not break concurrent iterations.
tenants = abide.tenants.copy()
tenants[tenant_name] = new_tenant
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index 195197c5f..8ddc7511d 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -17,7 +17,7 @@ from threading import Thread
from typing import List, Callable
from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.protocol.states import KazooState
@@ -211,8 +211,11 @@ class ZooKeeperClient(object):
try:
zstat = self.client.set("/zuul/ltime", b"")
except NoNodeError:
- self.client.create("/zuul/ltime", b"", makepath=True)
- zstat = self.client.set("/zuul/ltime", b"")
+ try:
+ self.client.create("/zuul/ltime", b"", makepath=True)
+ zstat = self.client.set("/zuul/ltime", b"")
+ except NodeExistsError:
+ zstat = self.client.set("/zuul/ltime", b"")
return zstat.last_modified_transaction_id