diff options
author | Zuul <zuul@review.opendev.org> | 2021-09-18 07:32:50 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-09-18 07:32:50 +0000 |
commit | fc4324e4a4d19f5d2c6ce669241faacb8a1b2294 (patch) | |
tree | 5ee9d2dcd5152a4b66f16954d7d86243031cda52 /zuul | |
parent | 7af43971f25c594f0f95a976f8eeb4027c584b78 (diff) | |
parent | 0d635181f8d693c57b59d5144b997c0c8fe715ed (diff) | |
download | zuul-fc4324e4a4d19f5d2c6ce669241faacb8a1b2294.tar.gz |
Merge "Periodically maintain connection caches"
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/connection/__init__.py | 10 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 10 | ||||
-rw-r--r-- | zuul/driver/git/gitconnection.py | 7 | ||||
-rw-r--r-- | zuul/driver/github/githubconnection.py | 7 | ||||
-rw-r--r-- | zuul/driver/gitlab/gitlabconnection.py | 7 | ||||
-rw-r--r-- | zuul/driver/pagure/pagureconnection.py | 7 | ||||
-rw-r--r-- | zuul/model.py | 5 | ||||
-rw-r--r-- | zuul/scheduler.py | 32 | ||||
-rw-r--r-- | zuul/zk/change_cache.py | 16 |
9 files changed, 47 insertions, 54 deletions
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py index 480ee7daa..ff6c8b32f 100644 --- a/zuul/connection/__init__.py +++ b/zuul/connection/__init__.py @@ -77,13 +77,13 @@ class BaseConnection(object, metaclass=abc.ABCMeta): def registerScheduler(self, sched) -> None: self.sched = sched - def maintainCache(self, relevant): + def maintainCache(self, relevant, max_age): + """Remove stale changes from the cache. - """Make cache contain relevant changes. - - This lets the user supply a list of change objects that are + This lets the user supply a list of change cache keys that are still in use. Anything in our cache that isn't in the supplied - list should be safe to remove from the cache.""" + list and is older than the given max. age (in seconds) should + be safe to remove from the cache.""" pass def getWebController(self, zuul_web): diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 29a3eec79..ba7beb411 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -738,14 +738,8 @@ class GerritConnection(BaseConnection): except KeyError: pass - def maintainCache(self, relevant): - # This lets the user supply a list of change objects that are - # still in use. Anything in our cache that isn't in the supplied - # list should be safe to remove from the cache. - for change in self._change_cache: - if change not in relevant: - self._change_cache.delete(change.cache_stat.key) - # TODO: remove entries older than X + def maintainCache(self, relevant, max_age): + self._change_cache.prune(relevant, max_age) self._change_cache.cleanup() def updateChangeAttributes(self, change, **attrs): diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py index 36b7242cd..61796a50b 100644 --- a/zuul/driver/git/gitconnection.py +++ b/zuul/driver/git/gitconnection.py @@ -103,11 +103,8 @@ class GitConnection(BaseConnection): refs[ref] = sha return refs - def maintainCache(self, relevant): - for change in self._change_cache: - if change not in relevant: - self._change_cache.delete(change.cache_stat.key) - # TODO: remove entries older than X + def maintainCache(self, relevant, max_age): + self._change_cache.prune(relevant, max_age) self._change_cache.cleanup() def getChange(self, event, refresh=False): diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 9fd9c0a2e..f6a45732f 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -1262,11 +1262,8 @@ class GithubConnection(CachedBranchConnection): return self._github_client_manager.getGithubClient( project_name=project_name, zuul_event_id=zuul_event_id) - def maintainCache(self, relevant): - for change in self._change_cache: - if change not in relevant: - self._change_cache.delete(change.cache_stat.key) - # TODO: remove entries older than X + def maintainCache(self, relevant, max_age): + self._change_cache.prune(relevant, max_age) self._change_cache.cleanup() def updateChangeAttributes(self, change, **attrs): diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index f26b19582..fed4eca21 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -459,11 +459,8 @@ class GitlabConnection(CachedBranchConnection): if hasattr(self, 'gitlab_event_connector'): self._stop_event_connector() - def maintainCache(self, relevant): - for change in self._change_cache: - if change not in relevant: - self._change_cache.delete(change.cache_stat.key) - # TODO: remove entries older than X + def maintainCache(self, relevant, max_age): + self._change_cache.prune(relevant, max_age) self._change_cache.cleanup() def updateChangeAttributes(self, change, **attrs): diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index 084585d38..016ef82b8 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ b/zuul/driver/pagure/pagureconnection.py @@ -545,11 +545,8 @@ class PagureConnection(BaseConnection): "Fetching project %s webhook token from API" % project) return token - def maintainCache(self, relevant): - for change in self._change_cache: - if change not in relevant: - self._change_cache.delete(change.cache_stat.key) - # TODO: remove entries older than X + def maintainCache(self, relevant, max_age): + self._change_cache.prune(relevant, max_age) self._change_cache.cleanup() def updateChangeAttributes(self, change, **attrs): diff --git a/zuul/model.py b/zuul/model.py index c3918857c..d1b248e87 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -3570,7 +3570,8 @@ class Bundle: # Cache info of a ref -CacheStat = namedtuple("CacheStat", ["key", "uuid", "version"]) +CacheStat = namedtuple("CacheStat", + ["key", "uuid", "version", "last_modified"]) class Ref(object): @@ -3583,7 +3584,7 @@ class Ref(object): self.newrev = None self.files = [] # Cache info about this ref: - # CacheStat(cache key, uuid, version) + # CacheStat(cache key, uuid, version, last_modified) self.cache_stat = None @property diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 8cfaa0966..7dad6432f 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -543,6 +543,7 @@ class Scheduler(threading.Thread): self._runConfigCacheCleanup() self._runExecutorApiCleanup() self._runMergerApiCleanup() + self.maintainConnectionCache() finally: self.general_cleanup_lock.release() # This has its own locking @@ -1273,8 +1274,6 @@ class Scheduler(threading.Thread): if old_tenant: self._reenqueueTenant(old_tenant, tenant) - # TODOv3(jeblair): update for tenants - # self.maintainConnectionCache() self.connections.reconfigureDrivers(tenant) # TODOv3(jeblair): remove postconfig calls? @@ -1604,21 +1603,22 @@ class Scheduler(threading.Thread): pipeline.state = pipeline.STATE_NORMAL def maintainConnectionCache(self): - # TODOv3(jeblair): update for tenants relevant = set() - for tenant in self.abide.tenants.values(): - for pipeline in tenant.layout.pipelines.values(): - self.log.debug("Gather relevant cache items for: %s" % - pipeline) - - for item in pipeline.getAllItems(): - relevant.add(item.change) - relevant.update(item.change.getRelatedChanges()) - for connection in self.connections.values(): - connection.maintainCache(relevant) - self.log.debug( - "End maintain connection cache for: %s" % connection) - self.log.debug("Connection cache size: %s" % len(relevant)) + with self.layout_lock: + for tenant in self.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + self.log.debug("Gather relevant cache items for: %s", + pipeline) + for item in pipeline.getAllItems(): + relevant.add(item.change.cache_stat.key) + relevant.update(item.change.getRelatedChanges()) + + # We'll only remove changes older than `max_age` from the cache, as + # it may take a while for an event that was processed by a connection + # (which updated/populated the cache) to end up in a pipeline. + for connection in self.connections.connections.values(): + connection.maintainCache(relevant, max_age=7200) # 2h + self.log.debug("End maintain connection cache for: %s", connection) def process_tenant_trigger_queue(self, tenant): try: diff --git a/zuul/zk/change_cache.py b/zuul/zk/change_cache.py index 55bb68ff8..dea69013c 100644 --- a/zuul/zk/change_cache.py +++ b/zuul/zk/change_cache.py @@ -18,6 +18,7 @@ import json import logging import os import threading +import time import uuid from collections import defaultdict from collections.abc import Iterable @@ -127,11 +128,18 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): data_uuid = data.decode("utf8") self._get(key, data_uuid, zstat) + def prune(self, relevant, max_age=3600): # 1h + cutoff_time = time.time() - max_age + outdated = {c.cache_stat.key for c in list(self._change_cache.values()) + if c.cache_stat.last_modified < cutoff_time} + to_prune = outdated - set(relevant) + for key in to_prune: + self.delete(key) + def cleanup(self): valid_uuids = {c.cache_stat.uuid for c in list(self._change_cache.values())} stale_uuids = self._data_cleanup_candidates - valid_uuids - self.log.debug("Cleaning up stale data: %s", stale_uuids) for data_uuid in stale_uuids: self.kazoo_client.delete(self._dataPath(data_uuid), recursive=True) @@ -185,7 +193,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): else: change = self._changeFromData(data) - change.cache_stat = model.CacheStat(key, data_uuid, zstat.version) + change.cache_stat = model.CacheStat(key, data_uuid, zstat.version, + zstat.last_modified) # Use setdefault here so we only have a single instance of a change # around. In case of a concurrent get this might return a different # change instance than the one we just created. @@ -220,7 +229,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): except (BadVersionError, NodeExistsError, NoNodeError) as exc: raise ConcurrentUpdateError from exc - change.cache_stat = model.CacheStat(key, data_uuid, zstat.version) + change.cache_stat = model.CacheStat( + key, data_uuid, zstat.version, zstat.last_modified) self._change_cache[key] = change def _setData(self, data): |