summary refs log tree commit diff
diff options
context:
space:
mode:
author James E. Blair <jim@acmegating.com> 2021-07-02 11:38:58 -0700
committer James E. Blair <jim@acmegating.com> 2021-07-02 15:22:29 -0700
commit 10966948d723ea75ca845f77d22b8623cb44eba4 (patch)
tree 2f0fcd1e721ee482a2f67bb00dada2e34f25825f
parent 4f0ecff3a3d6f0cbe389e2ff78a682d73b4ef52a (diff)
download zuul-10966948d723ea75ca845f77d22b8623cb44eba4.tar.gz
Add ExistingDataWatch class
This adds a modified kazoo.DataWatch class which does not set watches on paths that don't exist. This is mostly so that when a DataWatched path is deleted, we don't leave a watch in place. Without that, the executor API will leak watches on the ZK cluster. We vendor the entire watchers.py file from kazoo (ASL2 licensed) to avoid any issues with API changes. Separately we will see if they are interested in this class upstream. This updates the tests to use the wchp four-letter-word (4lw) command to test that we don't leak watches. Depends-On: https://review.opendev.org/c/zuul/zuul-jobs/+/799334 Change-Id: Ie4491eef03b58d858d95c78ba9454839d169cff1
-rw-r--r-- tests/unit/test_zk.py 23
-rw-r--r-- tools/zoo.cfg 1
-rw-r--r-- zuul/zk/executor.py 14
-rw-r--r-- zuul/zk/watchers.py 482
4 files changed, 518 insertions, 2 deletions
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 61c4966f4..9318ba0dd 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -320,6 +320,27 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
items.extend(self._get_zk_tree(path))
return items
+ def _get_watches(self):
+ chroot = self.zk_chroot_fixture.zookeeper_chroot
+ data = self.zk_client.client.command(b'wchp')
+ ret = {}
+ sessions = None
+ for line in data.split('\n'):
+ if line.startswith('\t'):
+ if sessions is not None:
+ sessions.append(line.strip())
+ else:
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith(chroot):
+ line = line[len(chroot):]
+ sessions = []
+ ret[line] = sessions
+ else:
+ sessions = None
+ return ret
+
def test_build_request(self):
# Test the lifecycle of a build request
request_queue = queue.Queue()
@@ -398,6 +419,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
server.fulfillCancel(a)
server.unlock(a)
self.assertEqual(client.get(a.path).state, BuildRequest.COMPLETED)
+ self.assertNotEqual(self._get_watches(), {})
# Scheduler removes build request on completion
client.remove(sched_a)
@@ -408,6 +430,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
self.assertEqual(self._get_zk_tree(
client.BUILD_REQUEST_ROOT + '/zones'), [])
self.assertEqual(self._get_zk_tree(client.LOCK_ROOT), [])
+ self.assertEqual(self._get_watches(), {})
def test_build_request_remove(self):
# Test the scheduler forcibly removing a request (perhaps the
diff --git a/tools/zoo.cfg b/tools/zoo.cfg
index bf1ac6cf7..7d9f3cee4 100644
--- a/tools/zoo.cfg
+++ b/tools/zoo.cfg
@@ -14,3 +14,4 @@ serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
secureClientPort=2281
ssl.keyStore.location=/var/certs/keystores/zuul-test-zookeeper.pem
ssl.trustStore.location=/var/certs/certs/cacert.pem
+4lw.commands.whitelist=*
diff --git a/zuul/zk/executor.py b/zuul/zk/executor.py
index e11af572e..2effe7063 100644
--- a/zuul/zk/executor.py
+++ b/zuul/zk/executor.py
@@ -27,6 +27,7 @@ from zuul.model import BuildRequest
from zuul.zk import ZooKeeperSimpleBase
from zuul.zk.exceptions import BuildRequestNotFound
from zuul.zk import sharding
+from zuul.zk.watchers import ExistingDataWatch
class BuildRequestEvent(Enum):
@@ -175,8 +176,9 @@ class ExecutorApi(ZooKeeperSimpleBase):
)
for req_path in new_build_requests:
- self.kazoo_client.DataWatch(req_path,
- self._makeBuildStateWatcher(req_path))
+ ExistingDataWatch(self.kazoo_client,
+ req_path,
+ self._makeBuildStateWatcher(req_path))
# Notify the user about new build requests if a callback is provided,
# but only if there are new requests (we don't want to fire on the
@@ -313,6 +315,10 @@ class ExecutorApi(ZooKeeperSimpleBase):
return build_request
def remove(self, build_request):
+ log = get_annotated_logger(
+ self.log, event=None, build=build_request.uuid
+ )
+ log.debug("Removing build request %s", build_request)
try:
# As the build node might contain children (result, data, ...) we
# must delete it recursively.
@@ -326,6 +332,10 @@ class ExecutorApi(ZooKeeperSimpleBase):
self.kazoo_client.delete(path, recursive=True)
except NoNodeError:
pass
+ try:
+ self.kazoo_client.get(build_request.path)
+ except NoNodeError:
+ pass
def _watchBuildEvents(self, actions, event=None):
if event is None:
diff --git a/zuul/zk/watchers.py b/zuul/zk/watchers.py
new file mode 100644
index 000000000..bfb9bc161
--- /dev/null
+++ b/zuul/zk/watchers.py
@@ -0,0 +1,482 @@
+# This file is from the Kazoo project
+# https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/watchers.py
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Higher level child and data watching API's.
+
+:Maintainer: Ben Bangert <ben@groovie.org>
+:Status: Production
+
+.. note::
+
+ :ref:`DataWatch` and :ref:`ChildrenWatch` may only handle a single
+ function, attempts to associate a single instance with multiple functions
+ will result in an exception being thrown.
+
+"""
+from functools import partial, wraps
+import logging
+import time
+import warnings
+
+from kazoo.exceptions import (
+ ConnectionClosedError,
+ NoNodeError,
+ KazooException
+)
+from kazoo.protocol.states import KazooState
+from kazoo.retry import KazooRetry
+
+
+log = logging.getLogger(__name__)
+
+
+_STOP_WATCHING = object()
+
+
+def _ignore_closed(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except ConnectionClosedError:
+ pass
+ return wrapper
+
+
+class DataWatch(object):
+ """Watches a node for data updates and calls the specified
+ function each time it changes
+
+ The function will also be called the very first time its
+ registered to get the data.
+
+ Returning `False` from the registered function will disable future
+ data change calls. If the client connection is closed (using the
+ close command), the DataWatch will no longer get updates.
+
+ If the function supplied takes three arguments, then the third one
+ will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will
+ only be set if the change to the data occurs as a result of the
+ server notifying the watch that there has been a change. Events
+ like reconnection or the first call will not include an event.
+
+ If the node does not exist, then the function will be called with
+ ``None`` for all values.
+
+ .. tip::
+
+ Because :class:`DataWatch` can watch nodes that don't exist, it
+ can be used alternatively as a higher-level Exists watcher that
+ survives reconnections and session loss.
+
+ Example with client:
+
+ .. code-block:: python
+
+ @client.DataWatch('/path/to/watch')
+ def my_func(data, stat):
+ print("Data is %s" % data)
+ print("Version is %s" % stat.version)
+
+ # Above function is called immediately and prints
+
+ # Or if you want the event object
+ @client.DataWatch('/path/to/watch')
+ def my_func(data, stat, event):
+ print("Data is %s" % data)
+ print("Version is %s" % stat.version)
+ print("Event is %s" % event)
+
+ .. versionchanged:: 1.2
+
+ DataWatch now ignores additional arguments that were previously
+ passed to it and warns that they are no longer respected.
+
+ """
+ def __init__(self, client, path, func=None, *args, **kwargs):
+ """Create a data watcher for a path
+
+ :param client: A zookeeper client.
+ :type client: :class:`~kazoo.client.KazooClient`
+ :param path: The path to watch for data changes on.
+ :type path: str
+ :param func: Function to call initially and every time the
+ node changes. `func` will be called with a
+ tuple, the value of the node and a
+ :class:`~kazoo.client.ZnodeStat` instance.
+ :type func: callable
+
+ """
+ self._client = client
+ self._path = path
+ self._func = func
+ self._stopped = False
+ self._run_lock = client.handler.lock_object()
+ self._version = None
+ self._retry = KazooRetry(max_tries=None,
+ sleep_func=client.handler.sleep_func)
+ self._include_event = None
+ self._ever_called = False
+ self._used = False
+
+ if args or kwargs:
+ warnings.warn('Passing additional arguments to DataWatch is'
+ ' deprecated. ignore_missing_node is now assumed '
+ ' to be True by default, and the event will be '
+ ' sent if the function can handle receiving it',
+ DeprecationWarning, stacklevel=2)
+
+ # Register our session listener if we're going to resume
+ # across session losses
+ if func is not None:
+ self._used = True
+ self._client.add_listener(self._session_watcher)
+ self._get_data()
+
+ def __call__(self, func):
+ """Callable version for use as a decorator
+
+ :param func: Function to call initially and every time the
+ data changes. `func` will be called with a
+ tuple, the value of the node and a
+ :class:`~kazoo.client.ZnodeStat` instance.
+ :type func: callable
+
+ """
+ if self._used:
+ raise KazooException(
+ "A function has already been associated with this "
+ "DataWatch instance.")
+
+ self._func = func
+
+ self._used = True
+ self._client.add_listener(self._session_watcher)
+ self._get_data()
+ return func
+
+ def _log_func_exception(self, data, stat, event=None):
+ try:
+ # For backwards compatibility, don't send event to the
+ # callback unless the send_event is set in constructor
+ if not self._ever_called:
+ self._ever_called = True
+ try:
+ result = self._func(data, stat, event)
+ except TypeError:
+ result = self._func(data, stat)
+ if result is False:
+ self._stopped = True
+ self._func = None
+ self._client.remove_listener(self._session_watcher)
+ except Exception as exc:
+ log.exception(exc)
+ raise
+
+ @_ignore_closed
+ def _get_data(self, event=None):
+ # Ensure this runs one at a time, possible because the session
+ # watcher may trigger a run
+ with self._run_lock:
+ if self._stopped:
+ return
+
+ initial_version = self._version
+
+ try:
+ data, stat = self._retry(self._client.get,
+ self._path, self._watcher)
+ except NoNodeError:
+ data = None
+
+ # This will set 'stat' to None if the node does not yet
+ # exist.
+ stat = self._retry(self._client.exists, self._path,
+ self._watcher)
+ if stat:
+ self._client.handler.spawn(self._get_data)
+ return
+
+ # No node data, clear out version
+ if stat is None:
+ self._version = None
+ else:
+ self._version = stat.mzxid
+
+ # Call our function if its the first time ever, or if the
+ # version has changed
+ if initial_version != self._version or not self._ever_called:
+ self._log_func_exception(data, stat, event)
+
+ def _watcher(self, event):
+ self._get_data(event=event)
+
+ def _set_watch(self, state):
+ with self._run_lock:
+ self._watch_established = state
+
+ def _session_watcher(self, state):
+ if state == KazooState.CONNECTED:
+ self._client.handler.spawn(self._get_data)
+
+
+class ChildrenWatch(object):
+ """Watches a node for children updates and calls the specified
+ function each time it changes
+
+ The function will also be called the very first time its
+ registered to get children.
+
+ Returning `False` from the registered function will disable future
+ children change calls. If the client connection is closed (using
+ the close command), the ChildrenWatch will no longer get updates.
+
+ if send_event=True in __init__, then the function will always be
+ called with second parameter, ``event``. Upon initial call or when
+ recovering a lost session the ``event`` is always ``None``.
+ Otherwise it's a :class:`~kazoo.prototype.state.WatchedEvent`
+ instance.
+
+ Example with client:
+
+ .. code-block:: python
+
+ @client.ChildrenWatch('/path/to/watch')
+ def my_func(children):
+ print "Children are %s" % children
+
+ # Above function is called immediately and prints children
+
+ """
+ def __init__(self, client, path, func=None,
+ allow_session_lost=True, send_event=False):
+ """Create a children watcher for a path
+
+ :param client: A zookeeper client.
+ :type client: :class:`~kazoo.client.KazooClient`
+ :param path: The path to watch for children on.
+ :type path: str
+ :param func: Function to call initially and every time the
+ children change. `func` will be called with a
+ single argument, the list of children.
+ :type func: callable
+ :param allow_session_lost: Whether the watch should be
+ re-registered if the zookeeper
+ session is lost.
+ :type allow_session_lost: bool
+ :type send_event: bool
+ :param send_event: Whether the function should be passed the
+ event sent by ZooKeeper or None upon
+ initialization (see class documentation)
+
+ The path must already exist for the children watcher to
+ run.
+
+ """
+ self._client = client
+ self._path = path
+ self._func = func
+ self._send_event = send_event
+ self._stopped = False
+ self._watch_established = False
+ self._allow_session_lost = allow_session_lost
+ self._run_lock = client.handler.lock_object()
+ self._prior_children = None
+ self._used = False
+
+ # Register our session listener if we're going to resume
+ # across session losses
+ if func is not None:
+ self._used = True
+ if allow_session_lost:
+ self._client.add_listener(self._session_watcher)
+ self._get_children()
+
+ def __call__(self, func):
+ """Callable version for use as a decorator
+
+ :param func: Function to call initially and every time the
+ children change. `func` will be called with a
+ single argument, the list of children.
+ :type func: callable
+
+ """
+ if self._used:
+ raise KazooException(
+ "A function has already been associated with this "
+ "ChildrenWatch instance.")
+
+ self._func = func
+
+ self._used = True
+ if self._allow_session_lost:
+ self._client.add_listener(self._session_watcher)
+ self._get_children()
+ return func
+
+ @_ignore_closed
+ def _get_children(self, event=None):
+ with self._run_lock: # Ensure this runs one at a time
+ if self._stopped:
+ return
+
+ try:
+ children = self._client.retry(self._client.get_children,
+ self._path, self._watcher)
+ except NoNodeError:
+ self._stopped = True
+ return
+
+ if not self._watch_established:
+ self._watch_established = True
+
+ if self._prior_children is not None and \
+ self._prior_children == children:
+ return
+
+ self._prior_children = children
+
+ try:
+ if self._send_event:
+ result = self._func(children, event)
+ else:
+ result = self._func(children)
+ if result is False:
+ self._stopped = True
+ self._func = None
+ if self._allow_session_lost:
+ self._client.remove_listener(self._session_watcher)
+ except Exception as exc:
+ log.exception(exc)
+ raise
+
+ def _watcher(self, event):
+ if event.type != "NONE":
+ self._get_children(event)
+
+ def _session_watcher(self, state):
+ if state in (KazooState.LOST, KazooState.SUSPENDED):
+ self._watch_established = False
+ elif (state == KazooState.CONNECTED and
+ not self._watch_established and not self._stopped):
+ self._client.handler.spawn(self._get_children)
+
+
+class PatientChildrenWatch(object):
+ """Patient Children Watch that returns values after the children
+ of a node don't change for a period of time
+
+ A separate watcher for the children of a node, that ignores
+ changes within a boundary time and sets the result only when the
+ boundary time has elapsed with no children changes.
+
+ Example::
+
+ watcher = PatientChildrenWatch(client, '/some/path',
+ time_boundary=5)
+ async_object = watcher.start()
+
+ # Blocks until the children have not changed for time boundary
+ # (5 in this case) seconds, returns children list and an
+ # async_result that will be set if the children change in the
+ # future
+ children, child_async = async_object.get()
+
+ .. note::
+
+ This Watch is different from :class:`DataWatch` and
+ :class:`ChildrenWatch` as it only returns once, does not take
+ a function that is called, and provides an
+ :class:`~kazoo.interfaces.IAsyncResult` object that can be
+ checked to see if the children have changed later.
+
+ """
+ def __init__(self, client, path, time_boundary=30):
+ self.client = client
+ self.path = path
+ self.children = []
+ self.time_boundary = time_boundary
+ self.children_changed = client.handler.event_object()
+
+ def start(self):
+ """Begin the watching process asynchronously
+
+ :returns: An :class:`~kazoo.interfaces.IAsyncResult` instance
+ that will be set when no change has occurred to the
+ children for time boundary seconds.
+
+ """
+ self.asy = asy = self.client.handler.async_result()
+ self.client.handler.spawn(self._inner_start)
+ return asy
+
+ def _inner_start(self):
+ try:
+ while True:
+ async_result = self.client.handler.async_result()
+ self.children = self.client.retry(
+ self.client.get_children, self.path,
+ partial(self._children_watcher, async_result))
+ self.client.handler.sleep_func(self.time_boundary)
+
+ if self.children_changed.is_set():
+ self.children_changed.clear()
+ else:
+ break
+
+ self.asy.set((self.children, async_result))
+ except Exception as exc:
+ self.asy.set_exception(exc)
+
+ def _children_watcher(self, async_result, event):
+ self.children_changed.set()
+ async_result.set(time.time())
+
+
+# Local modifications below:
+#
+class ExistingDataWatch(DataWatch):
+ @_ignore_closed
+ def _get_data(self, event=None):
+ # Ensure this runs one at a time, possible because the session
+ # watcher may trigger a run
+ with self._run_lock:
+ if self._stopped:
+ return
+
+ initial_version = self._version
+
+ try:
+ data, stat = self._retry(self._client.get,
+ self._path, self._watcher)
+ except NoNodeError:
+ data = stat = None
+
+ # No node data, clear out version
+ if stat is None:
+ self._version = None
+ else:
+ self._version = stat.mzxid
+
+ # Call our function if its the first time ever, or if the
+ # version has changed
+ if initial_version != self._version or not self._ever_called:
+ self._log_func_exception(data, stat, event)
+
+ # If the node doesn't exist, we won't be watching any more
+ if stat is None:
+ self._stopped = True
+ self._func = None
+ self._client.remove_listener(self._session_watcher)