author     Zuul <zuul@review.opendev.org>             2022-06-23 15:22:06 +0000
committer  Gerrit Code Review <review@openstack.org>  2022-06-23 15:22:06 +0000
commit     f2d4ff276b2e22be982cf261864dd938f707288a (patch)
tree       263ff953fabb8fc71f175f6b9ce583d429a5d0b0
parent     2f2e0ce28ce0275160865a33d76a866bd7f53f03 (diff)
parent     42e1e1e324ac9d29b15dc4c8ba2ca58d9b219e88 (diff)
download   zuul-f2d4ff276b2e22be982cf261864dd938f707288a.tar.gz
Merge "Parallelize config cache loading"
-rw-r--r--  tests/unit/test_client.py |   3
-rw-r--r--  zuul/configloader.py      | 190
-rw-r--r--  zuul/zk/__init__.py       |   9
3 files changed, 121 insertions, 81 deletions
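
As the zuul/configloader.py hunks below show, the executor is created at the two ConfigLoader call sites with ThreadPoolExecutor(max_workers=4) and passed down into TenantParser, so one pool is shared per loading pass and is shut down when the with block exits. A sketch of that ownership shape, with hypothetical names:

    from concurrent.futures import ThreadPoolExecutor

    class Parser:
        # Hypothetical stand-in for TenantParser: it borrows the executor
        # passed in by the caller rather than owning its own pool.
        def load(self, tenants, executor):
            futures = [executor.submit(self.load_one, t) for t in tenants]
            return [f.result() for f in futures]  # join; re-raises worker errors

        def load_one(self, tenant):
            return "loaded " + tenant

    parser = Parser()
    with ThreadPoolExecutor(max_workers=4) as executor:  # sizing as in the diff
        print(parser.load(["acme", "blue"], executor))
    # Exiting the with block shuts the pool down after all work completes.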
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 307051419..b51639952 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
     def assertFinalState(self):
         pass
 
+    def assertCleanShutdown(self):
+        pass
+
     def test_delete_state(self):
         # Shut everything down (as much as possible) to reduce
         # logspam and errors.
diff --git a/zuul/configloader.py b/zuul/configloader.py
index f4c274669..eb468518f 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -12,6 +12,7 @@
 import collections
 from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import copy
 import itertools
 import os
@@ -1608,7 +1609,7 @@ class TenantParser(object):
         }
         return vs.Schema(tenant)
 
-    def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None,
+    def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
                  layout_uuid=None, branch_cache_min_ltimes=None,
                  ignore_cat_exception=True):
         # Note: This vs schema validation is not necessary in most cases as we
@@ -1659,7 +1660,15 @@ class TenantParser(object):
         # We prepare a stack to store config loading issues
         loading_errors = model.LoadingErrors()
 
+        # Get branches in parallel
+        branch_futures = {}
         for tpc in config_tpcs + untrusted_tpcs:
+            future = executor.submit(self._getProjectBranches,
+                                     tenant, tpc, branch_cache_min_ltimes)
+            branch_futures[future] = tpc
+
+        for branch_future in as_completed(branch_futures.keys()):
+            tpc = branch_futures[branch_future]
             source_context = model.ProjectContext(
                 tpc.project.canonical_name, tpc.project.name)
             with project_configuration_exceptions(source_context,
@@ -1682,7 +1691,7 @@ class TenantParser(object):
         # already cached. Full reconfigurations start with an empty
         # cache.
         self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
-                              ignore_cat_exception)
+                              executor, ignore_cat_exception)
 
         # Then collect the appropriate YAML based on this tenant
         # config.
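
The loadTenantProjects and _cacheTenantYAML hunks that follow use a plainer submit-then-join shape: every unit of work is submitted first, then a final loop over f.result() acts as a barrier that also propagates the first worker exception. A minimal sketch, with a hypothetical load_key worker standing in for the key-loading call:

    from concurrent.futures import ThreadPoolExecutor

    def load_key(project):
        # Hypothetical stand-in for TenantParser._loadProjectKeys.
        return project.upper()

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for project in ["project-a", "project-b"]:
            futures.append(executor.submit(load_key, project))
        # Barrier: wait for every job; result() re-raises worker exceptions.
        for f in futures:
            f.result()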
@@ -1879,7 +1888,7 @@ class TenantParser(object):
             raise Exception("Unable to parse project %s", conf)
         return projects
 
-    def loadTenantProjects(self, conf_tenant):
+    def loadTenantProjects(self, conf_tenant, executor):
         config_projects = []
         untrusted_projects = []
 
@@ -1887,6 +1896,7 @@
                                      'secret', 'project-template',
                                      'nodeset', 'queue'])
 
+        futures = []
         for source_name, conf_source in conf_tenant.get('source', {}).items():
             source = self.connections.getSource(source_name)
 
@@ -1895,7 +1905,8 @@
             # tpcs = TenantProjectConfigs
             tpcs = self._getProjects(source, conf_repo, current_include)
             for tpc in tpcs:
-                self._loadProjectKeys(source_name, tpc.project)
+                futures.append(executor.submit(
+                    self._loadProjectKeys, source_name, tpc.project))
                 config_projects.append(tpc)
 
             current_include = frozenset(default_include - set(['pipeline']))
@@ -1903,13 +1914,16 @@
 
                 tpcs = self._getProjects(source, conf_repo, current_include)
                 for tpc in tpcs:
-                    self._loadProjectKeys(source_name, tpc.project)
+                    futures.append(executor.submit(
+                        self._loadProjectKeys, source_name, tpc.project))
                     untrusted_projects.append(tpc)
 
+        for f in futures:
+            f.result()
         return config_projects, untrusted_projects
 
     def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
-                         ignore_cat_exception=True):
+                         executor, ignore_cat_exception=True):
         # min_ltimes can be the following: None (that means that we
         # should not use the file cache at all) or a nested dict of
         # project and branch to ltime. A value of None usually means
@@ -1973,6 +1987,7 @@
 
         jobs = []
+        futures = []
         for project in itertools.chain(
                 tenant.config_projects, tenant.untrusted_projects):
             tpc = tenant.project_configs[project.canonical_name]
@@ -1986,67 +2001,13 @@
                 # If all config classes are excluded then do not
                 # request any getFiles jobs.
                 continue
+            futures.append(executor.submit(self._cacheTenantYAMLBranch,
+                                           abide, tenant, loading_errors,
+                                           min_ltimes, tpc, project,
+                                           branch, jobs))
+        for future in futures:
+            future.result()
-                source_context = model.SourceContext(
-                    project.canonical_name, project.name,
-                    project.connection_name, branch, '', False)
-                if min_ltimes is not None:
-                    files_cache = self.unparsed_config_cache.getFilesCache(
-                        project.canonical_name, branch)
-                    branch_cache = abide.getUnparsedBranchCache(
-                        project.canonical_name, branch)
-                    try:
-                        pb_ltime = min_ltimes[project.canonical_name][branch]
-                    except KeyError:
-                        self.log.exception(
-                            "Min. ltime missing for project/branch")
-                        pb_ltime = -1
-
-                    # If our unparsed branch cache is valid for the
-                    # time, then we don't need to do anything else.
-                    if branch_cache.isValidFor(tpc, pb_ltime):
-                        min_ltimes[project.canonical_name][branch] =\
-                            branch_cache.ltime
-                        continue
-
-                    with self.unparsed_config_cache.readLock(
-                            project.canonical_name):
-                        if files_cache.isValidFor(tpc, pb_ltime):
-                            self.log.debug(
-                                "Using files from cache for project "
-                                "%s @%s: %s",
-                                project.canonical_name, branch,
-                                list(files_cache.keys()))
-                            self._updateUnparsedBranchCache(
-                                abide, tenant, source_context, files_cache,
-                                loading_errors, files_cache.ltime,
-                                min_ltimes)
-                            continue
-
-                extra_config_files = abide.getExtraConfigFiles(project.name)
-                extra_config_dirs = abide.getExtraConfigDirs(project.name)
-                if not self.merger:
-                    with project_configuration_exceptions(source_context,
-                                                          loading_errors):
-                        raise Exception(
-                            "Configuration files missing from cache. "
-                            "Check Zuul scheduler logs for more information.")
-                    continue
-                ltime = self.zk_client.getCurrentLtime()
-                job = self.merger.getFiles(
-                    project.source.connection.connection_name,
-                    project.name, branch,
-                    files=(['zuul.yaml', '.zuul.yaml'] +
-                           list(extra_config_files)),
-                    dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
-                self.log.debug("Submitting cat job %s for %s %s %s" % (
-                    job, project.source.connection.connection_name,
-                    project.name, branch))
-                job.extra_config_files = extra_config_files
-                job.extra_config_dirs = extra_config_dirs
-                job.ltime = ltime
-                job.source_context = source_context
-                jobs.append(job)
 
         try:
             self._processCatJobs(abide, tenant, loading_errors, jobs,
                                  min_ltimes)
@@ -2063,7 +2024,76 @@
             if not ignore_cat_exception:
                 raise
 
+    def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
+                               tpc, project, branch, jobs):
+        # This is the middle section of _cacheTenantYAML, called for
+        # each project-branch. It's a separate method so we can
+        # execute it in parallel. The "jobs" argument is mutated and
+        # accumulates a list of all merger jobs submitted.
+        source_context = model.SourceContext(
+            project.canonical_name, project.name,
+            project.connection_name, branch, '', False)
+        if min_ltimes is not None:
+            files_cache = self.unparsed_config_cache.getFilesCache(
+                project.canonical_name, branch)
+            branch_cache = abide.getUnparsedBranchCache(
+                project.canonical_name, branch)
+            try:
+                pb_ltime = min_ltimes[project.canonical_name][branch]
+            except KeyError:
+                self.log.exception(
+                    "Min. ltime missing for project/branch")
+                pb_ltime = -1
+
+            # If our unparsed branch cache is valid for the
+            # time, then we don't need to do anything else.
+            if branch_cache.isValidFor(tpc, pb_ltime):
+                min_ltimes[project.canonical_name][branch] =\
+                    branch_cache.ltime
+                return
+
+            with self.unparsed_config_cache.readLock(
+                    project.canonical_name):
+                if files_cache.isValidFor(tpc, pb_ltime):
+                    self.log.debug(
+                        "Using files from cache for project "
+                        "%s @%s: %s",
+                        project.canonical_name, branch,
+                        list(files_cache.keys()))
+                    self._updateUnparsedBranchCache(
+                        abide, tenant, source_context, files_cache,
+                        loading_errors, files_cache.ltime,
+                        min_ltimes)
+                    return
+
+        extra_config_files = abide.getExtraConfigFiles(project.name)
+        extra_config_dirs = abide.getExtraConfigDirs(project.name)
+        if not self.merger:
+            with project_configuration_exceptions(source_context,
+                                                  loading_errors):
+                raise Exception(
+                    "Configuration files missing from cache. "
+                    "Check Zuul scheduler logs for more information.")
+            return
+        ltime = self.zk_client.getCurrentLtime()
+        job = self.merger.getFiles(
+            project.source.connection.connection_name,
+            project.name, branch,
+            files=(['zuul.yaml', '.zuul.yaml'] +
+                   list(extra_config_files)),
+            dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
+        self.log.debug("Submitting cat job %s for %s %s %s" % (
+            job, project.source.connection.connection_name,
+            project.name, branch))
+        job.extra_config_files = extra_config_files
+        job.extra_config_dirs = extra_config_dirs
+        job.ltime = ltime
+        job.source_context = source_context
+        jobs.append(job)
+
     def _processCatJobs(self, abide, tenant, loading_errors, jobs,
                         min_ltimes):
+        # Called at the end of _cacheTenantYAML after all cat jobs
+        # have been submitted
         for job in jobs:
             self.log.debug("Waiting for cat job %s" % (job,))
             res = job.wait(self.merger.git_timeout)
@@ -2545,15 +2575,17 @@ class ConfigLoader(object):
 
         # Pre-load TenantProjectConfigs so we can get and cache all of a
         # project's config files (incl. tenant specific extra config) at once.
-        for tenant_name, unparsed_config in tenants_to_load.items():
-            config_tpcs, untrusted_tpcs = (
-                self.tenant_parser.loadTenantProjects(unparsed_config)
-            )
-            abide.clearTPCs(tenant_name)
-            for tpc in config_tpcs:
-                abide.addConfigTPC(tenant_name, tpc)
-            for tpc in untrusted_tpcs:
-                abide.addUntrustedTPC(tenant_name, tpc)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            for tenant_name, unparsed_config in tenants_to_load.items():
+                config_tpcs, untrusted_tpcs = (
+                    self.tenant_parser.loadTenantProjects(unparsed_config,
+                                                          executor)
+                )
+                abide.clearTPCs(tenant_name)
+                for tpc in config_tpcs:
+                    abide.addConfigTPC(tenant_name, tpc)
+                for tpc in untrusted_tpcs:
+                    abide.addUntrustedTPC(tenant_name, tpc)
 
     def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
                    min_ltimes=None, layout_uuid=None,
@@ -2632,9 +2664,11 @@
             return None
 
         unparsed_config = unparsed_abide.tenants[tenant_name]
-        new_tenant = self.tenant_parser.fromYaml(
-            abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid,
-            branch_cache_min_ltimes, ignore_cat_exception)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            new_tenant = self.tenant_parser.fromYaml(
+                abide, unparsed_config, ansible_manager, executor,
+                min_ltimes, layout_uuid, branch_cache_min_ltimes,
+                ignore_cat_exception)
         # Copy tenants dictionary to not break concurrent iterations.
         tenants = abide.tenants.copy()
         tenants[tenant_name] = new_tenant
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index 195197c5f..8ddc7511d 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -17,7 +17,7 @@ from threading import Thread
 from typing import List, Callable
 
 from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, NodeExistsError
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.protocol.states import KazooState
 
@@ -211,8 +211,11 @@
         try:
             zstat = self.client.set("/zuul/ltime", b"")
         except NoNodeError:
-            self.client.create("/zuul/ltime", b"", makepath=True)
-            zstat = self.client.set("/zuul/ltime", b"")
+            try:
+                self.client.create("/zuul/ltime", b"", makepath=True)
+                zstat = self.client.set("/zuul/ltime", b"")
+            except NodeExistsError:
+                zstat = self.client.set("/zuul/ltime", b"")
         return zstat.last_modified_transaction_id