summary | refs | log | tree | commit | diff
path: root/zuul
diff options
context:
space:
mode:
author Zuul <zuul@review.opendev.org> 2021-09-18 07:32:50 +0000
committer Gerrit Code Review <review@openstack.org> 2021-09-18 07:32:50 +0000
commit fc4324e4a4d19f5d2c6ce669241faacb8a1b2294 (patch)
tree 5ee9d2dcd5152a4b66f16954d7d86243031cda52 /zuul
parent 7af43971f25c594f0f95a976f8eeb4027c584b78 (diff)
parent 0d635181f8d693c57b59d5144b997c0c8fe715ed (diff)
download zuul-fc4324e4a4d19f5d2c6ce669241faacb8a1b2294.tar.gz
Merge "Periodically maintain connection caches"
Diffstat (limited to 'zuul')
-rw-r--r-- zuul/connection/__init__.py 10
-rw-r--r-- zuul/driver/gerrit/gerritconnection.py 10
-rw-r--r-- zuul/driver/git/gitconnection.py 7
-rw-r--r-- zuul/driver/github/githubconnection.py 7
-rw-r--r-- zuul/driver/gitlab/gitlabconnection.py 7
-rw-r--r-- zuul/driver/pagure/pagureconnection.py 7
-rw-r--r-- zuul/model.py 5
-rw-r--r-- zuul/scheduler.py 32
-rw-r--r-- zuul/zk/change_cache.py 16
9 files changed, 47 insertions, 54 deletions
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 480ee7daa..ff6c8b32f 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -77,13 +77,13 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
def registerScheduler(self, sched) -> None:
self.sched = sched
- def maintainCache(self, relevant):
+ def maintainCache(self, relevant, max_age):
+ """Remove stale changes from the cache.
- """Make cache contain relevant changes.
-
- This lets the user supply a list of change objects that are
+ This lets the user supply a list of change cache keys that are
still in use. Anything in our cache that isn't in the supplied
- list should be safe to remove from the cache."""
+ list and is older than the given max. age (in seconds) should
+ be safe to remove from the cache."""
pass
def getWebController(self, zuul_web):
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 29a3eec79..ba7beb411 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -738,14 +738,8 @@ class GerritConnection(BaseConnection):
except KeyError:
pass
- def maintainCache(self, relevant):
- # This lets the user supply a list of change objects that are
- # still in use. Anything in our cache that isn't in the supplied
- # list should be safe to remove from the cache.
- for change in self._change_cache:
- if change not in relevant:
- self._change_cache.delete(change.cache_stat.key)
- # TODO: remove entries older than X
+ def maintainCache(self, relevant, max_age):
+ self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs):
diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py
index 36b7242cd..61796a50b 100644
--- a/zuul/driver/git/gitconnection.py
+++ b/zuul/driver/git/gitconnection.py
@@ -103,11 +103,8 @@ class GitConnection(BaseConnection):
refs[ref] = sha
return refs
- def maintainCache(self, relevant):
- for change in self._change_cache:
- if change not in relevant:
- self._change_cache.delete(change.cache_stat.key)
- # TODO: remove entries older than X
+ def maintainCache(self, relevant, max_age):
+ self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def getChange(self, event, refresh=False):
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 9fd9c0a2e..f6a45732f 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -1262,11 +1262,8 @@ class GithubConnection(CachedBranchConnection):
return self._github_client_manager.getGithubClient(
project_name=project_name, zuul_event_id=zuul_event_id)
- def maintainCache(self, relevant):
- for change in self._change_cache:
- if change not in relevant:
- self._change_cache.delete(change.cache_stat.key)
- # TODO: remove entries older than X
+ def maintainCache(self, relevant, max_age):
+ self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs):
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index f26b19582..fed4eca21 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -459,11 +459,8 @@ class GitlabConnection(CachedBranchConnection):
if hasattr(self, 'gitlab_event_connector'):
self._stop_event_connector()
- def maintainCache(self, relevant):
- for change in self._change_cache:
- if change not in relevant:
- self._change_cache.delete(change.cache_stat.key)
- # TODO: remove entries older than X
+ def maintainCache(self, relevant, max_age):
+ self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs):
diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py
index 084585d38..016ef82b8 100644
--- a/zuul/driver/pagure/pagureconnection.py
+++ b/zuul/driver/pagure/pagureconnection.py
@@ -545,11 +545,8 @@ class PagureConnection(BaseConnection):
"Fetching project %s webhook token from API" % project)
return token
- def maintainCache(self, relevant):
- for change in self._change_cache:
- if change not in relevant:
- self._change_cache.delete(change.cache_stat.key)
- # TODO: remove entries older than X
+ def maintainCache(self, relevant, max_age):
+ self._change_cache.prune(relevant, max_age)
self._change_cache.cleanup()
def updateChangeAttributes(self, change, **attrs):
diff --git a/zuul/model.py b/zuul/model.py
index c3918857c..d1b248e87 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -3570,7 +3570,8 @@ class Bundle:
# Cache info of a ref
-CacheStat = namedtuple("CacheStat", ["key", "uuid", "version"])
+CacheStat = namedtuple("CacheStat",
+ ["key", "uuid", "version", "last_modified"])
class Ref(object):
@@ -3583,7 +3584,7 @@ class Ref(object):
self.newrev = None
self.files = []
# Cache info about this ref:
- # CacheStat(cache key, uuid, version)
+ # CacheStat(cache key, uuid, version, last_modified)
self.cache_stat = None
@property
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8cfaa0966..7dad6432f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -543,6 +543,7 @@ class Scheduler(threading.Thread):
self._runConfigCacheCleanup()
self._runExecutorApiCleanup()
self._runMergerApiCleanup()
+ self.maintainConnectionCache()
finally:
self.general_cleanup_lock.release()
# This has its own locking
@@ -1273,8 +1274,6 @@ class Scheduler(threading.Thread):
if old_tenant:
self._reenqueueTenant(old_tenant, tenant)
- # TODOv3(jeblair): update for tenants
- # self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@@ -1604,21 +1603,22 @@ class Scheduler(threading.Thread):
pipeline.state = pipeline.STATE_NORMAL
def maintainConnectionCache(self):
- # TODOv3(jeblair): update for tenants
relevant = set()
- for tenant in self.abide.tenants.values():
- for pipeline in tenant.layout.pipelines.values():
- self.log.debug("Gather relevant cache items for: %s" %
- pipeline)
-
- for item in pipeline.getAllItems():
- relevant.add(item.change)
- relevant.update(item.change.getRelatedChanges())
- for connection in self.connections.values():
- connection.maintainCache(relevant)
- self.log.debug(
- "End maintain connection cache for: %s" % connection)
- self.log.debug("Connection cache size: %s" % len(relevant))
+ with self.layout_lock:
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ self.log.debug("Gather relevant cache items for: %s",
+ pipeline)
+ for item in pipeline.getAllItems():
+ relevant.add(item.change.cache_stat.key)
+ relevant.update(item.change.getRelatedChanges())
+
+ # We'll only remove changes older than `max_age` from the cache, as
+ # it may take a while for an event that was processed by a connection
+ # (which updated/populated the cache) to end up in a pipeline.
+ for connection in self.connections.connections.values():
+ connection.maintainCache(relevant, max_age=7200) # 2h
+ self.log.debug("End maintain connection cache for: %s", connection)
def process_tenant_trigger_queue(self, tenant):
try:
diff --git a/zuul/zk/change_cache.py b/zuul/zk/change_cache.py
index 55bb68ff8..dea69013c 100644
--- a/zuul/zk/change_cache.py
+++ b/zuul/zk/change_cache.py
@@ -18,6 +18,7 @@ import json
import logging
import os
import threading
+import time
import uuid
from collections import defaultdict
from collections.abc import Iterable
@@ -127,11 +128,18 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
data_uuid = data.decode("utf8")
self._get(key, data_uuid, zstat)
+ def prune(self, relevant, max_age=3600): # 1h
+ cutoff_time = time.time() - max_age
+ outdated = {c.cache_stat.key for c in list(self._change_cache.values())
+ if c.cache_stat.last_modified < cutoff_time}
+ to_prune = outdated - set(relevant)
+ for key in to_prune:
+ self.delete(key)
+
def cleanup(self):
valid_uuids = {c.cache_stat.uuid
for c in list(self._change_cache.values())}
stale_uuids = self._data_cleanup_candidates - valid_uuids
- self.log.debug("Cleaning up stale data: %s", stale_uuids)
for data_uuid in stale_uuids:
self.kazoo_client.delete(self._dataPath(data_uuid), recursive=True)
@@ -185,7 +193,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
else:
change = self._changeFromData(data)
- change.cache_stat = model.CacheStat(key, data_uuid, zstat.version)
+ change.cache_stat = model.CacheStat(key, data_uuid, zstat.version,
+ zstat.last_modified)
# Use setdefault here so we only have a single instance of a change
# around. In case of a concurrent get this might return a different
# change instance than the one we just created.
@@ -220,7 +229,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
except (BadVersionError, NodeExistsError, NoNodeError) as exc:
raise ConcurrentUpdateError from exc
- change.cache_stat = model.CacheStat(key, data_uuid, zstat.version)
+ change.cache_stat = model.CacheStat(
+ key, data_uuid, zstat.version, zstat.last_modified)
self._change_cache[key] = change
def _setData(self, data):