author     David Shrewsbury <dshrewsb@redhat.com>  2019-06-05 15:42:18 -0400
committer  David Shrewsbury <dshrewsb@redhat.com>  2019-09-16 10:46:36 -0400
commit     f6b6991af29d18f5454c579aeef1f2297e6ba335 (patch)
tree       475325c5a7cd9733113ecacfcd634e299b30535e
parent     716ac1f2e18394dcb47ac6e02e12313b6fdf18ec (diff)
download   zuul-f6b6991af29d18f5454c579aeef1f2297e6ba335.tar.gz
Add caching of autohold requests
Change-Id: I94d4a0d2e8630d360ad7c5d07690b6ed33b22f75
-rw-r--r--  tests/base.py                                   6
-rw-r--r--  tests/nodepool/test_nodepool_integration.py     2
-rw-r--r--  tests/unit/test_nodepool.py                     2
-rw-r--r--  tests/unit/test_zk.py                           2
-rwxr-xr-x  zuul/cmd/scheduler.py                           2
-rw-r--r--  zuul/model.py                                  14
-rwxr-xr-x  zuul/web/__init__.py                            2
-rw-r--r--  zuul/zk.py                                     82
8 files changed, 104 insertions(+), 8 deletions(-)
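The surface of the change is small: ZooKeeper() grows an enable_cache flag, every call site passes it, and zuul/zk.py keeps a hold-request cache current via a kazoo TreeCache. A minimal sketch of the intended wiring, with a placeholder host string and timeout rather than values taken from this commit:

    import zuul.zk

    # Hypothetical setup; the host string and timeout are illustrative only.
    zk = zuul.zk.ZooKeeper(enable_cache=True)   # build the hold-request cache on connect
    zk.connect('localhost:2181', timeout=30.0)
    # ... use the client ...
    zk.disconnect()   # closes the TreeCache before stopping the kazoo client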
diff --git a/tests/base.py b/tests/base.py
index 4dcafe003..c3ff97cd4 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2953,7 +2953,7 @@ class ZuulTestCase(BaseTestCase):
self.merge_client = RecordingMergeClient(self.config, self.sched)
self.merge_server = None
self.nodepool = zuul.nodepool.Nodepool(self.sched)
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.zk.connect(self.zk_config, timeout=30.0)
self.fake_nodepool = FakeNodepool(
@@ -3371,8 +3371,10 @@ class ZuulTestCase(BaseTestCase):
'socketserver_Thread',
'GerritWebServer',
]
+ # Ignore Kazoo TreeCache threads that start with "Thread-"
threads = [t for t in threading.enumerate()
- if t.name not in whitelist]
+ if t.name not in whitelist
+ and not t.name.startswith("Thread-")]
if len(threads) > 1:
log_str = ""
for thread_id, stack_frame in sys._current_frames().items():
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index b608bba78..bb6c8abaa 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -31,7 +31,7 @@ class TestNodepoolIntegration(BaseTestCase):
super(TestNodepoolIntegration, self).setUp()
self.statsd = None
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect('localhost:2181')
self.hostname = socket.gethostname()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index e822a1024..2326b1b1b 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -37,7 +37,7 @@ class TestNodepool(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect(self.zk_config)
self.hostname = 'nodepool-test-hostname'
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 8a485457f..d9942a90a 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -33,7 +33,7 @@ class TestZK(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect(self.zk_config)
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 67484bf87..1eefbe713 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -136,7 +136,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
- zookeeper = zuul.zk.ZooKeeper()
+ zookeeper = zuul.zk.ZooKeeper(enable_cache=True)
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
diff --git a/zuul/model.py b/zuul/model.py
index 9219fc7df..0191d2c0e 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4647,6 +4647,7 @@ class WebInfo(object):
class HoldRequest(object):
def __init__(self):
self.lock = None
+ self.stat = None
self.id = None
self.tenant = None
self.project = None
@@ -4694,6 +4695,19 @@ class HoldRequest(object):
d['node_expiration'] = self.node_expiration
return d
+ def updateFromDict(self, d):
+ '''
+ Update current object with data from the given dictionary.
+ '''
+ self.tenant = d.get('tenant')
+ self.project = d.get('project')
+ self.job = d.get('job')
+ self.ref_filter = d.get('ref_filter')
+ self.max_count = d.get('max_count', 1)
+ self.current_count = d.get('current_count', 0)
+ self.reason = d.get('reason')
+ self.node_expiration = d.get('node_expiration')
+
def serialize(self):
'''
Return a representation of the object as a string.
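The new updateFromDict() complements the existing fromDict()/toDict() pair: instead of building a fresh object from newer znode data, the cache can refresh the object it already handed out, so references held by callers stay valid. A rough illustration with made-up field values:

    import zuul.model

    # Hypothetical values; the keys match those read by updateFromDict().
    request = zuul.model.HoldRequest.fromDict({
        'tenant': 'example-tenant',
        'project': 'org/project',
        'job': 'example-job',
        'ref_filter': '.*',
        'max_count': 1,
        'current_count': 0,
        'reason': 'debugging a failure',
        'node_expiration': None,
    })

    # Later, newer data arrives for the same request: mutate in place so
    # anything already pointing at this object sees the updated counts.
    request.updateFromDict({'tenant': 'example-tenant',
                            'project': 'org/project',
                            'job': 'example-job',
                            'max_count': 1,
                            'current_count': 1,
                            'reason': 'debugging a failure'})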
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 331df1e7a..4c5447365 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -984,7 +984,7 @@ class ZuulWeb(object):
# instanciate handlers
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
if zk_hosts:
self.zk.connect(hosts=zk_hosts, read_only=True)
self.connections = connections
diff --git a/zuul/zk.py b/zuul/zk.py
index 4a1aa09dc..e3691ce95 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,7 @@ import time
from kazoo.client import KazooClient, KazooState
from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.recipe.cache import TreeCache, TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
@@ -48,13 +49,26 @@ class ZooKeeper(object):
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
- def __init__(self):
+ def __init__(self, enable_cache=True):
'''
Initialize the ZooKeeper object.
+
+ :param bool enable_cache: When True, enables caching of ZooKeeper
+ objects (e.g., HoldRequests).
'''
self.client = None
self._became_lost = False
self._last_retry_log = 0
+ self.enable_cache = enable_cache
+
+ # The caching model we use is designed around handing out model
+ # data as objects. To do this, we use two caches: one is a TreeCache
+ # which contains raw znode data (among other details), and one for
+ # storing that data serialized as objects. This allows us to return
+ # objects from the APIs, and avoids calling the methods to serialize
+ # the data into objects more than once.
+ self._hold_request_tree = None
+ self._cached_hold_requests = {}
def _dictToStr(self, data):
return json.dumps(data).encode('utf8')
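The comment above describes a two-level cache: kazoo's TreeCache mirrors the raw znodes under the hold-request root, while a plain dict keyed by request id holds the deserialized HoldRequest objects, so the JSON-to-object conversion happens at most once per update. Reduced to its bare bones (the host and root path below are placeholders, not values from this commit), the arrangement looks roughly like this:

    from kazoo.client import KazooClient
    from kazoo.recipe.cache import TreeCache

    client = KazooClient(hosts='localhost:2181')   # placeholder host
    client.start()

    # Layer 1: raw znode data, kept in sync with ZooKeeper by kazoo.
    tree = TreeCache(client, '/zuul/hold-requests')   # illustrative root path
    tree.start()

    # Layer 2: one deserialized object per request id, handed out by the API.
    cached_hold_requests = {}   # request_id -> zuul.model.HoldRequest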
@@ -126,6 +140,67 @@ class ZooKeeper(object):
except KazooTimeoutError:
self.logConnectionRetryEvent()
+ if self.enable_cache:
+ self._hold_request_tree = TreeCache(self.client,
+ self.HOLD_REQUEST_ROOT)
+ self._hold_request_tree.listen_fault(self.cacheFaultListener)
+ self._hold_request_tree.listen(self.holdRequestCacheListener)
+ self._hold_request_tree.start()
+
+ def cacheFaultListener(self, e):
+ self.log.exception(e)
+
+ def holdRequestCacheListener(self, event):
+ '''
+ Keep the hold request object cache in sync with the TreeCache.
+ '''
+ try:
+ self._holdRequestCacheListener(event)
+ except Exception:
+ self.log.exception(
+ "Exception in hold request cache update for event: %s", event)
+
+ def _holdRequestCacheListener(self, event):
+ if hasattr(event.event_data, 'path'):
+ # Ignore root node
+ path = event.event_data.path
+ if path == self.HOLD_REQUEST_ROOT:
+ return
+
+ if event.event_type not in (TreeEvent.NODE_ADDED,
+ TreeEvent.NODE_UPDATED,
+ TreeEvent.NODE_REMOVED):
+ return
+
+ path = event.event_data.path
+ request_id = path.rsplit('/', 1)[1]
+
+ if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
+ # Requests with no data are invalid
+ if not event.event_data.data:
+ return
+
+ # Perform an in-place update of the already cached request
+ d = self._bytesToDict(event.event_data.data)
+ old_request = self._cached_hold_requests.get(request_id)
+ if old_request:
+ if event.event_data.stat.version <= old_request.stat.version:
+ # Don't update to older data
+ return
+ old_request.updateFromDict(d)
+ old_request.stat = event.event_data.stat
+ else:
+ request = zuul.model.HoldRequest.fromDict(d)
+ request.id = request_id
+ request.stat = event.event_data.stat
+ self._cached_hold_requests[request_id] = request
+
+ elif event.event_type == TreeEvent.NODE_REMOVED:
+ try:
+ del self._cached_hold_requests[request_id]
+ except KeyError:
+ pass
+
def disconnect(self):
'''
Close the ZooKeeper cluster connection.
@@ -133,6 +208,10 @@ class ZooKeeper(object):
You should call this method if you used connect() to establish a
cluster connection.
'''
+ if self._hold_request_tree is not None:
+ self._hold_request_tree.close()
+ self._hold_request_tree = None
+
if self.client is not None and self.client.connected:
self.client.stop()
self.client.close()
@@ -480,6 +559,7 @@ class ZooKeeper(object):
obj = zuul.model.HoldRequest.fromDict(self._strToDict(data))
obj.id = hold_request_id
+ obj.stat = stat
return obj
def storeHoldRequest(self, hold_request):
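Recording the znode stat on each returned object gives the cache listener a version to compare against, and it also opens the door to serving reads straight from memory. As a purely hypothetical sketch, not part of this commit, accessors over the cache could look like this:

    def getCachedHoldRequests(self):
        # Hypothetical helper: hand out the objects that
        # holdRequestCacheListener() keeps up to date.
        return list(self._cached_hold_requests.values())

    def getCachedHoldRequest(self, hold_request_id):
        # Single-request variant; returns None if the id is unknown or the
        # request was dropped by a NODE_REMOVED event.
        return self._cached_hold_requests.get(hold_request_id)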