-rw-r--r--  .zuul.yaml                                      2
-rw-r--r--  doc/source/examples/zoo.cfg                     2
-rw-r--r--  playbooks/common/post-system-logs.yaml          5
-rw-r--r--  tests/base.py                                  15
-rw-r--r--  tests/nodepool/test_nodepool_integration.py    10
-rw-r--r--  tests/unit/test_nodepool.py                    18
-rw-r--r--  tests/unit/test_scheduler.py                   48
-rw-r--r--  tests/unit/test_zk.py                          37
-rwxr-xr-x  tools/test-setup.sh                             2
-rwxr-xr-x  zuul/cmd/scheduler.py                          14
-rw-r--r--  zuul/nodepool.py                               39
-rw-r--r--  zuul/scheduler.py                              29
-rwxr-xr-x  zuul/web/__init__.py                           24
-rw-r--r--  zuul/zk/__init__.py                           153
-rw-r--r--  zuul/zk/exceptions.py                          27
-rw-r--r--  zuul/zk/nodepool.py (renamed from zuul/zk.py) 858
16 files changed, 683 insertions(+), 600 deletions(-)
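
The net effect of this change: the old monolithic zuul.zk.ZooKeeper facade is split into a plain connection object (ZooKeeperClient) and a Nodepool-specific helper (ZooKeeperNodepool) that wraps it. A minimal sketch of the new wiring, assuming a ZooKeeper server at localhost:2181:

    from zuul.zk import ZooKeeperClient
    from zuul.zk.nodepool import ZooKeeperNodepool

    # One connection object, shared by all domain-specific helpers.
    zk_client = ZooKeeperClient()
    zk_client.connect('localhost:2181', timeout=10.0)

    # The Nodepool-facing API (hold requests, node locks, launchers)
    # now lives on a helper bound to that shared client.
    zk_nodepool = ZooKeeperNodepool(zk_client)
    print(zk_nodepool.getHoldRequests())

    zk_client.disconnect()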
diff --git a/.zuul.yaml b/.zuul.yaml
index 0e99c9f41..e35aa3594 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -236,6 +236,7 @@
tox_environment:
ZUUL_TEST_ROOT: /tmp/zuul-test
YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs"
+ post-run: playbooks/common/post-system-logs.yaml
- tox-py38:
irrelevant-files:
- zuul/cmd/migrate.py
@@ -243,6 +244,7 @@
timeout: 4800 # 80 minutes
nodeset: ubuntu-bionic
vars: *zuul_tox_vars
+ post-run: playbooks/common/post-system-logs.yaml
- zuul-build-dashboard-openstack-whitelabel
- zuul-build-dashboard-software-factory
- zuul-build-dashboard-opendev
diff --git a/doc/source/examples/zoo.cfg b/doc/source/examples/zoo.cfg
index 4d4fcc3ea..7f0cbf7cb 100644
--- a/doc/source/examples/zoo.cfg
+++ b/doc/source/examples/zoo.cfg
@@ -5,7 +5,7 @@ initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
-maxClientCnxns=60
+maxClientCnxns=1000
standaloneEnabled=true
admin.enableServer=true
server.1=examples_zk_1.examples_default:2888:3888
diff --git a/playbooks/common/post-system-logs.yaml b/playbooks/common/post-system-logs.yaml
new file mode 100644
index 000000000..830899c38
--- /dev/null
+++ b/playbooks/common/post-system-logs.yaml
@@ -0,0 +1,5 @@
+- hosts: all
+ tasks:
+
+ - name: Collect zookeeper logs
+ shell: "cp /var/log/zookeeper/zookeeper.log {{ zuul_output_dir }}/logs/zookeeper.log"
diff --git a/tests/base.py b/tests/base.py
index 3d87593ac..bb1cf2338 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -111,10 +111,10 @@ import zuul.merger.server
import zuul.model
import zuul.nodepool
import zuul.rpcclient
-import zuul.zk
import zuul.configloader
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
+from zuul.zk import ZooKeeperClient
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
@@ -3629,13 +3629,14 @@ class ChrootedKazooFixture(fixtures.Fixture):
for x in range(8))
rand_test_path = '%s_%s_%s' % (random_bits, os.getpid(), self.test_id)
- self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path
+ self.zookeeper_chroot = f"/test/{rand_test_path}"
self.addCleanup(self._cleanup)
# Ensure the chroot path exists and clean up any pre-existing znodes.
_tmp_client = kazoo.client.KazooClient(
- hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
+ hosts=f'{self.zookeeper_host}:{self.zookeeper_port}', timeout=10
+ )
_tmp_client.start()
if _tmp_client.exists(self.zookeeper_chroot):
@@ -3992,13 +3993,13 @@ class SchedulerTestApp:
self.config, self.sched)
merge_client = RecordingMergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
- zk = zuul.zk.ZooKeeper(enable_cache=True)
- zk.connect(self.zk_config, timeout=30.0)
+ zk_client = ZooKeeperClient()
+ zk_client.connect(self.zk_config, timeout=30.0)
self.sched.setExecutor(executor_client)
self.sched.setMerger(merge_client)
self.sched.setNodepool(nodepool)
- self.sched.setZooKeeper(zk)
+ self.sched.setZooKeeper(zk_client)
self.sched.start()
executor_client.gearman.waitForServer()
@@ -4626,7 +4627,7 @@ class ZuulTestCase(BaseTestCase):
self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
- self.scheds.execute(lambda app: app.sched.zk.disconnect())
+ self.scheds.execute(lambda app: app.sched.zk_client.disconnect())
self.printHistory()
# We whitelist watchdog threads as they have relatively long delays
# before noticing they should exit, but they should exit on their own.
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index bb6c8abaa..18fcbc179 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -31,9 +31,9 @@ class TestNodepoolIntegration(BaseTestCase):
super(TestNodepoolIntegration, self).setUp()
self.statsd = None
- self.zk = zuul.zk.ZooKeeper(enable_cache=True)
- self.addCleanup(self.zk.disconnect)
- self.zk.connect('localhost:2181')
+ self.zk_client = zuul.zk.ZooKeeperClient()
+ self.addCleanup(self.zk_client.disconnect)
+ self.zk_client.connect('localhost:2181')
self.hostname = socket.gethostname()
self.provisioned_requests = []
@@ -104,8 +104,8 @@ class TestNodepoolIntegration(BaseTestCase):
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job, 0)
- self.zk.client.stop()
- self.zk.client.start()
+ self.zk_client.client.stop()
+ self.zk_client.client.start()
self.fake_nodepool.paused = False
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
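
The stop()/start() pair above bounces the underlying Kazoo session so the test exercises Zuul's reconnect handling. The same trick in isolation, assuming a reachable ZooKeeper at localhost:2181:

    from zuul.zk import ZooKeeperClient

    zk_client = ZooKeeperClient()
    zk_client.connect('localhost:2181')

    # The raw KazooClient is exposed as .client; stopping and restarting
    # it simulates a lost and recovered session, which is what drives
    # the watch re-registration and request resubmission code paths.
    zk_client.client.stop()
    zk_client.client.start()

    zk_client.disconnect()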
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index 2326b1b1b..8b9a9adbe 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -15,11 +15,12 @@
import time
-import zuul.zk
-import zuul.nodepool
from zuul import model
+import zuul.nodepool
from tests.base import BaseTestCase, ChrootedKazooFixture, FakeNodepool
+from zuul.zk import ZooKeeperClient
+from zuul.zk.nodepool import ZooKeeperNodepool
class TestNodepool(BaseTestCase):
@@ -37,9 +38,10 @@ class TestNodepool(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
- self.zk = zuul.zk.ZooKeeper(enable_cache=True)
- self.addCleanup(self.zk.disconnect)
- self.zk.connect(self.zk_config)
+ self.zk_client = ZooKeeperClient()
+ self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
+ self.addCleanup(self.zk_client.disconnect)
+ self.zk_client.connect(self.zk_config)
self.hostname = 'nodepool-test-hostname'
self.provisioned_requests = []
@@ -105,8 +107,8 @@ class TestNodepool(BaseTestCase):
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
- self.zk.client.stop()
- self.zk.client.start()
+ self.zk_client.client.stop()
+ self.zk_client.client.start()
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
@@ -161,7 +163,7 @@ class TestNodepool(BaseTestCase):
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
- self.zk.deleteNodeRequest(request)
+ self.zk_nodepool.deleteNodeRequest(request)
# Accept the nodes
self.nodepool.acceptNodes(request, request.id)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 986091158..a58e056e6 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -190,9 +190,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@@ -220,9 +221,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@@ -251,9 +253,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@@ -1766,9 +1769,10 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@@ -1827,7 +1831,8 @@ class TestScheduler(ZuulTestCase):
# The hold request current_count should have incremented
# and we should have recorded the held node ID.
- request2 = self.scheds.first.sched.zk.getHoldRequest(request.id)
+ request2 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request.id)
self.assertEqual(request.current_count + 1, request2.current_count)
self.assertEqual(1, len(request2.nodes))
self.assertEqual(1, len(request2.nodes[0]["nodes"]))
@@ -1849,11 +1854,12 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(held_nodes, 1)
# request current_count should not have changed
- request3 = self.scheds.first.sched.zk.getHoldRequest(request2.id)
+ request3 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request2.id)
self.assertEqual(request2.current_count, request3.current_count)
# Deleting hold request should set held nodes to used
- self.scheds.first.sched.zk.deleteHoldRequest(request3)
+ self.scheds.first.sched.zk_nodepool.deleteHoldRequest(request3)
node_states = [n['state'] for n in self.fake_nodepool.getNodes()]
self.assertEqual(3, len(node_states))
self.assertEqual([zuul.model.STATE_USED] * 3, node_states)
@@ -1873,9 +1879,10 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
request = client.autohold_info(request.id)
@@ -1897,14 +1904,15 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
- request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
+ request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
+ request_list[0])
self.assertIsNotNone(request)
# Delete and verify no more requests
self.assertTrue(client.autohold_delete(request.id))
- request_list = self.scheds.first.sched.zk.getHoldRequests()
+ request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual([], request_list)
def _test_autohold_scoped(self, change_obj, change, ref):
@@ -5783,8 +5791,8 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
- self.scheds.execute(lambda app: app.sched.zk.client.stop())
- self.scheds.execute(lambda app: app.sched.zk.client.start())
+ self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
+ self.scheds.execute(lambda app: app.sched.zk_client.client.start())
self.fake_nodepool.unpause()
self.waitUntilSettled()
@@ -5819,8 +5827,8 @@ For CI problems and help debugging, contact ci@example.org"""
# The request is fulfilled, but the scheduler hasn't processed
# it yet. Reconnect ZK.
- self.scheds.execute(lambda app: app.sched.zk.client.stop())
- self.scheds.execute(lambda app: app.sched.zk.client.start())
+ self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
+ self.scheds.execute(lambda app: app.sched.zk_client.client.start())
# Allow the scheduler to continue and process the (now
# out-of-date) notification that nodes are ready.
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index d9942a90a..6bc2f3028 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -15,10 +15,12 @@
import testtools
-import zuul.zk
from zuul import model
+import zuul.zk.exceptions
from tests.base import BaseTestCase, ChrootedKazooFixture
+from zuul.zk import ZooKeeperClient
+from zuul.zk.nodepool import ZooKeeperNodepool
class TestZK(BaseTestCase):
@@ -33,9 +35,10 @@ class TestZK(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
- self.zk = zuul.zk.ZooKeeper(enable_cache=True)
- self.addCleanup(self.zk.disconnect)
- self.zk.connect(self.zk_config)
+ self.zk_client = ZooKeeperClient()
+ self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
+ self.addCleanup(self.zk_client.disconnect)
+ self.zk_client.connect(self.zk_config)
def _createRequest(self):
req = model.HoldRequest()
@@ -46,37 +49,37 @@ class TestZK(BaseTestCase):
def test_hold_requests_api(self):
# Test no requests returns empty list
- self.assertEqual([], self.zk.getHoldRequests())
+ self.assertEqual([], self.zk_nodepool.getHoldRequests())
# Test get on non-existent request is None
- self.assertIsNone(self.zk.getHoldRequest('anything'))
+ self.assertIsNone(self.zk_nodepool.getHoldRequest('anything'))
# Test creating a new request
req1 = self._createRequest()
- self.zk.storeHoldRequest(req1)
+ self.zk_nodepool.storeHoldRequest(req1)
self.assertIsNotNone(req1.id)
- self.assertEqual(1, len(self.zk.getHoldRequests()))
+ self.assertEqual(1, len(self.zk_nodepool.getHoldRequests()))
# Test getting the request
- req2 = self.zk.getHoldRequest(req1.id)
+ req2 = self.zk_nodepool.getHoldRequest(req1.id)
self.assertEqual(req1.toDict(), req2.toDict())
# Test updating the request
req2.reason = 'a new reason'
- self.zk.storeHoldRequest(req2)
- req2 = self.zk.getHoldRequest(req2.id)
+ self.zk_nodepool.storeHoldRequest(req2)
+ req2 = self.zk_nodepool.getHoldRequest(req2.id)
self.assertNotEqual(req1.reason, req2.reason)
# Test lock operations
- self.zk.lockHoldRequest(req2, blocking=False)
+ self.zk_nodepool.lockHoldRequest(req2, blocking=False)
with testtools.ExpectedException(
- zuul.zk.LockException,
+ zuul.zk.exceptions.LockException,
"Timeout trying to acquire lock .*"
):
- self.zk.lockHoldRequest(req2, blocking=True, timeout=2)
- self.zk.unlockHoldRequest(req2)
+ self.zk_nodepool.lockHoldRequest(req2, blocking=True, timeout=2)
+ self.zk_nodepool.unlockHoldRequest(req2)
self.assertIsNone(req2.lock)
# Test deleting the request
- self.zk.deleteHoldRequest(req1)
- self.assertEqual([], self.zk.getHoldRequests())
+ self.zk_nodepool.deleteHoldRequest(req1)
+ self.assertEqual([], self.zk_nodepool.getHoldRequests())
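
The lock behavior asserted above comes from kazoo's Lock recipe: a second blocking acquire on an already-held hold-request lock times out and surfaces as the package's own LockException. A standalone sketch, assuming a connected zk_nodepool and at least one existing hold request:

    from zuul.zk.exceptions import LockException

    req_id = zk_nodepool.getHoldRequests()[0]
    req = zk_nodepool.getHoldRequest(req_id)

    zk_nodepool.lockHoldRequest(req, blocking=False)
    try:
        # The lock is already held, so this times out after 2 seconds...
        zk_nodepool.lockHoldRequest(req, blocking=True, timeout=2)
    except LockException:
        pass  # ...and is re-raised as zuul.zk.exceptions.LockException.
    finally:
        zk_nodepool.unlockHoldRequest(req)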
diff --git a/tools/test-setup.sh b/tools/test-setup.sh
index 7416040ae..cb524f9c5 100755
--- a/tools/test-setup.sh
+++ b/tools/test-setup.sh
@@ -11,6 +11,8 @@ TOOLSDIR=$(dirname $0)
sudo service zookeeper stop
DATADIR=$(sed -n -e 's/^dataDir=//p' /etc/zookeeper/conf/zoo.cfg)
sudo mount -t tmpfs -o nodev,nosuid,size=500M none $DATADIR
+echo "autopurge.purgeInterval=1" | sudo tee -a /etc/zookeeper/conf/zoo.cfg
+echo "maxClientCnxns=1000" | sudo tee -a /etc/zookeeper/conf/zoo.cfg
# Prepare a tmpfs for Zuul test root
if [[ -n "${ZUUL_TEST_ROOT:-}" ]]; then
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 4602bb783..cbda4fbd8 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -20,13 +20,12 @@ import signal
import zuul.cmd
import zuul.executor.client
+from zuul.lib.config import get_default
+from zuul.lib.statsd import get_statsd_config
import zuul.merger.client
import zuul.nodepool
import zuul.scheduler
-import zuul.zk
-
-from zuul.lib.config import get_default
-from zuul.lib.statsd import get_statsd_config
+from zuul.zk import ZooKeeperClient
class Scheduler(zuul.cmd.ZuulDaemonApp):
@@ -144,7 +143,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
- zookeeper = zuul.zk.ZooKeeper(enable_cache=True)
+ zk_client = ZooKeeperClient()
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
@@ -153,7 +152,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
'session_timeout', 10.0))
- zookeeper.connect(
+ zk_client.connect(
zookeeper_hosts,
timeout=zookeeper_timeout,
tls_cert=zookeeper_tls_cert,
@@ -164,7 +163,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.sched.setExecutor(gearman)
self.sched.setMerger(merger)
self.sched.setNodepool(nodepool)
- self.sched.setZooKeeper(zookeeper)
+ self.sched.setZooKeeper(zk_client)
self.log.info('Starting scheduler')
try:
@@ -191,6 +190,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.exit_handler(signal.SIGINT, None)
else:
self.sched.join()
+ zk_client.disconnect()
def main():
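
A condensed sketch of the connection setup performed here, with hypothetical hosts and TLS paths standing in for the get_default() config lookups:

    from zuul.zk import ZooKeeperClient

    zk_client = ZooKeeperClient()
    zk_client.connect(
        'zk01.example.com:2281,zk02.example.com:2281',  # hypothetical hosts
        timeout=10.0,
        tls_cert='/etc/zuul/ssl/zk.crt',  # providing a key/cert pair
        tls_key='/etc/zuul/ssl/zk.key',   # switches Kazoo to use_ssl=True
        tls_ca='/etc/zuul/ssl/zk-ca.crt',
    )
    try:
        ...  # run the scheduler
    finally:
        zk_client.disconnect()  # now called explicitly on shutdown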
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 57d624488..a9732dec7 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -16,7 +16,7 @@ import time
from collections import defaultdict
from zuul import model
from zuul.lib.logutil import get_annotated_logger
-from zuul.zk import LockException
+from zuul.zk.exceptions import LockException
def add_resources(target, source):
@@ -115,7 +115,8 @@ class Nodepool(object):
self.requests[req.uid] = req
if nodeset.nodes:
- self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
+ self.sched.zk_nodepool.submitNodeRequest(req,
+ self._updateNodeRequest)
# Logged after submission so that we have the request id
log.info("Submitted node request %s", req)
self.emitStats(req)
@@ -132,7 +133,7 @@ class Nodepool(object):
if request.uid in self.requests:
request.canceled = True
try:
- self.sched.zk.deleteNodeRequest(request)
+ self.sched.zk_nodepool.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
@@ -149,7 +150,7 @@ class Nodepool(object):
if relative_priority is None:
return
try:
- self.sched.zk.lockNodeRequest(request, blocking=False)
+ self.sched.zk_nodepool.lockNodeRequest(request, blocking=False)
except LockException:
# It may be locked by nodepool, which is fine.
log.debug("Unable to revise locked node request %s", request)
@@ -157,7 +158,7 @@ class Nodepool(object):
try:
old_priority = request.relative_priority
request.relative_priority = relative_priority
- self.sched.zk.storeNodeRequest(request)
+ self.sched.zk_nodepool.storeNodeRequest(request)
log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
@@ -165,7 +166,7 @@ class Nodepool(object):
log.exception("Unable to update node request %s", request)
finally:
try:
- self.sched.zk.unlockNodeRequest(request)
+ self.sched.zk_nodepool.unlockNodeRequest(request)
except Exception:
log.exception("Unable to unlock node request %s", request)
@@ -190,7 +191,7 @@ class Nodepool(object):
node.comment = request.reason
if request.node_expiration:
node.hold_expiration = request.node_expiration
- self.sched.zk.storeNode(node)
+ self.sched.zk_nodepool.storeNode(node)
request.nodes.append(dict(
build=build.uuid,
@@ -205,10 +206,10 @@ class Nodepool(object):
# Give ourselves a few seconds to try to obtain the lock rather than
# immediately give up.
- self.sched.zk.lockHoldRequest(request, timeout=5)
+ self.sched.zk_nodepool.lockHoldRequest(request, timeout=5)
try:
- self.sched.zk.storeHoldRequest(request)
+ self.sched.zk_nodepool.storeHoldRequest(request)
except Exception:
# If we fail to update the request count, we won't consider it
# a real autohold error by passing the exception up. It will
@@ -219,7 +220,7 @@ class Nodepool(object):
finally:
# Although any exceptions thrown here are handled higher up in
# _doBuildCompletedEvent, we always want to try to unlock it.
- self.sched.zk.unlockHoldRequest(request)
+ self.sched.zk_nodepool.unlockHoldRequest(request)
def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,))
@@ -228,7 +229,7 @@ class Nodepool(object):
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_IN_USE
- self.sched.zk.storeNode(node)
+ self.sched.zk_nodepool.storeNode(node)
if node.resources:
add_resources(resources, node.resources)
if build_set and resources:
@@ -275,7 +276,7 @@ class Nodepool(object):
if node.resources:
add_resources(resources, node.resources)
node.state = model.STATE_USED
- self.sched.zk.storeNode(node)
+ self.sched.zk_nodepool.storeNode(node)
except Exception:
log.exception("Exception storing node %s "
"while unlocking:", node)
@@ -303,7 +304,7 @@ class Nodepool(object):
def _unlockNodes(self, nodes):
for node in nodes:
try:
- self.sched.zk.unlockNode(node)
+ self.sched.zk_nodepool.unlockNode(node)
except Exception:
self.log.exception("Error unlocking node:")
@@ -321,7 +322,7 @@ class Nodepool(object):
raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
- self.sched.zk.lockNode(node, timeout=30)
+ self.sched.zk_nodepool.lockNode(node, timeout=30)
locked_nodes.append(node)
except Exception:
self.log.exception("Error locking nodes:")
@@ -347,8 +348,8 @@ class Nodepool(object):
if deleted:
log.debug("Resubmitting lost node request %s", request)
request.id = None
- self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
-
+ self.sched.zk_nodepool.submitNodeRequest(request,
+ self._updateNodeRequest)
# Stop watching this request node
return False
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
@@ -397,13 +398,13 @@ class Nodepool(object):
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
try:
- if not self.sched.zk.nodeRequestExists(request):
+ if not self.sched.zk_nodepool.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting",
request.id)
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
- self.sched.zk.submitNodeRequest(
+ self.sched.zk_nodepool.submitNodeRequest(
request, self._updateNodeRequest)
return False
except Exception:
@@ -430,7 +431,7 @@ class Nodepool(object):
# succeeded, delete the request.
log.debug("Deleting node request %s", request)
try:
- self.sched.zk.deleteNodeRequest(request)
+ self.sched.zk_nodepool.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
request.failed = True
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index ad2baa7cb..306e4af9f 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -42,6 +42,7 @@ from zuul.lib.statsd import get_statsd
import zuul.lib.queue
import zuul.lib.repl
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
+from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@@ -419,8 +420,9 @@ class Scheduler(threading.Thread):
def setNodepool(self, nodepool):
self.nodepool = nodepool
- def setZooKeeper(self, zk):
- self.zk = zk
+ def setZooKeeper(self, zk_client):
+ self.zk_client = zk_client
+ self.zk_nodepool = ZooKeeperNodepool(zk_client)
def runStats(self):
while not self.stats_stop.wait(self._stats_interval):
@@ -652,15 +654,15 @@ class Scheduler(threading.Thread):
request.node_expiration = node_hold_expiration
# No need to lock it since we are creating a new one.
- self.zk.storeHoldRequest(request)
+ self.zk_nodepool.storeHoldRequest(request)
def autohold_list(self):
'''
Return current hold requests as a list of dicts.
'''
data = []
- for request_id in self.zk.getHoldRequests():
- request = self.zk.getHoldRequest(request_id)
+ for request_id in self.zk_nodepool.getHoldRequests():
+ request = self.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
data.append(request.toDict())
@@ -673,7 +675,7 @@ class Scheduler(threading.Thread):
:param str hold_request_id: The unique ID of the request to delete.
'''
try:
- hold_request = self.zk.getHoldRequest(hold_request_id)
+ hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@@ -689,8 +691,9 @@ class Scheduler(threading.Thread):
:param str hold_request_id: The unique ID of the request to delete.
'''
+ hold_request = None
try:
- hold_request = self.zk.getHoldRequest(hold_request_id)
+ hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@@ -702,7 +705,7 @@ class Scheduler(threading.Thread):
self.log.debug("Removing autohold %s", hold_request)
try:
- self.zk.deleteHoldRequest(hold_request)
+ self.zk_nodepool.deleteHoldRequest(hold_request)
except Exception:
self.log.exception(
"Error removing autohold request %s:", hold_request)
@@ -1491,15 +1494,15 @@ class Scheduler(threading.Thread):
return True
try:
- self.zk.lockHoldRequest(request)
+ self.zk_nodepool.lockHoldRequest(request)
self.log.info("Removing expired hold request %s", request)
- self.zk.deleteHoldRequest(request)
+ self.zk_nodepool.deleteHoldRequest(request)
except Exception:
self.log.exception(
"Failed to delete expired hold request %s", request)
finally:
try:
- self.zk.unlockHoldRequest(request)
+ self.zk_nodepool.unlockHoldRequest(request)
except Exception:
pass
@@ -1537,8 +1540,8 @@ class Scheduler(threading.Thread):
autohold = None
scope = Scope.NONE
self.log.debug("Checking build autohold key %s", autohold_key_base)
- for request_id in self.zk.getHoldRequests():
- request = self.zk.getHoldRequest(request_id)
+ for request_id in self.zk_nodepool.getHoldRequests():
+ request = self.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 3ae9c72a6..5ae23b736 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -28,13 +28,14 @@ import time
import select
import threading
+from zuul import exceptions
import zuul.lib.repl
+from zuul.lib import commandsocket
from zuul.lib.re2util import filter_allowed_disallowed
import zuul.model
-from zuul import exceptions
import zuul.rpcclient
-import zuul.zk
-from zuul.lib import commandsocket
+from zuul.zk import ZooKeeperClient
+from zuul.zk.nodepool import ZooKeeperNodepool
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
cherrypy.tools.websocket = WebSocketTool()
@@ -227,7 +228,8 @@ class ZuulWebAPI(object):
def __init__(self, zuulweb):
self.rpc = zuulweb.rpc
- self.zk = zuulweb.zk
+ self.zk_client = zuulweb.zk_client
+ self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.zuulweb = zuulweb
self.cache = {}
self.cache_time = {}
@@ -853,7 +855,7 @@ class ZuulWebAPI(object):
allowed_labels = data['allowed_labels']
disallowed_labels = data['disallowed_labels']
labels = set()
- for launcher in self.zk.getRegisteredLaunchers():
+ for launcher in self.zk_nodepool.getRegisteredLaunchers():
labels.update(filter_allowed_disallowed(
launcher.supported_labels,
allowed_labels, disallowed_labels))
@@ -867,7 +869,7 @@ class ZuulWebAPI(object):
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def nodes(self, tenant):
ret = []
- for node in self.zk.nodeIterator():
+ for node in self.zk_nodepool.nodeIterator():
node_data = {}
for key in ("id", "type", "connection_type", "external_id",
"provider", "state", "state_time", "comment"):
@@ -1221,11 +1223,11 @@ class ZuulWeb(object):
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca,
client_id='Zuul Web Server')
- self.zk = zuul.zk.ZooKeeper(enable_cache=True)
+ self.zk_client = ZooKeeperClient()
if zk_hosts:
- self.zk.connect(hosts=zk_hosts, read_only=True,
- timeout=zk_timeout, tls_cert=zk_tls_cert,
- tls_key=zk_tls_key, tls_ca=zk_tls_ca)
+ self.zk_client.connect(hosts=zk_hosts, read_only=True,
+ timeout=zk_timeout, tls_cert=zk_tls_cert,
+ tls_key=zk_tls_key, tls_ca=zk_tls_ca)
self.connections = connections
self.authenticators = authenticators
@@ -1382,7 +1384,7 @@ class ZuulWeb(object):
cherrypy.server.httpserver = None
self.wsplugin.unsubscribe()
self.stream_manager.stop()
- self.zk.disconnect()
+ self.zk_client.disconnect()
self.stop_repl()
self._command_running = False
self.command_socket.stop()
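
zuul-web only reads Nodepool data, so it connects read-only and layers ZooKeeperNodepool on top of the shared client. A sketch of how the labels endpoint gathers its data, under the same assumption:

    from zuul.zk import ZooKeeperClient
    from zuul.zk.nodepool import ZooKeeperNodepool

    zk_client = ZooKeeperClient()
    zk_client.connect('localhost:2181', read_only=True)
    zk_nodepool = ZooKeeperNodepool(zk_client)

    # Union of the labels reported by every registered launcher.
    labels = set()
    for launcher in zk_nodepool.getRegisteredLaunchers():
        labels.update(launcher.supported_labels)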
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
new file mode 100644
index 000000000..7642ffbd3
--- /dev/null
+++ b/zuul/zk/__init__.py
@@ -0,0 +1,153 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import logging
+import time
+from abc import ABCMeta
+from typing import Optional, List, Callable
+
+from kazoo.client import KazooClient
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.protocol.states import KazooState
+
+from zuul.zk.exceptions import NoClientException
+
+
+class ZooKeeperClient(object):
+ log = logging.getLogger("zuul.zk.base.ZooKeeperClient")
+
+ # Log zookeeper retry every 10 seconds
+ retry_log_rate = 10
+
+ def __init__(self):
+ """
+ Initialize the ZooKeeper base client object.
+ """
+ self.client: Optional[KazooClient] = None
+ self._last_retry_log: int = 0
+ self.on_connect_listeners: List[Callable[[], None]] = []
+ self.on_disconnect_listeners: List[Callable[[], None]] = []
+
+ def _connectionListener(self, state):
+ """
+ Listener method for Kazoo connection state changes.
+
+ .. warning:: This method must not block.
+ """
+ if state == KazooState.LOST:
+ self.log.debug("ZooKeeper connection: LOST")
+ elif state == KazooState.SUSPENDED:
+ self.log.debug("ZooKeeper connection: SUSPENDED")
+ else:
+ self.log.debug("ZooKeeper connection: CONNECTED")
+
+ @property
+ def connected(self):
+ return self.client and self.client.state == KazooState.CONNECTED
+
+ @property
+ def suspended(self):
+ return self.client and self.client.state == KazooState.SUSPENDED
+
+ @property
+ def lost(self):
+ return not self.client or self.client.state == KazooState.LOST
+
+ def logConnectionRetryEvent(self):
+ now = time.monotonic()
+ if now - self._last_retry_log >= self.retry_log_rate:
+ self.log.warning("Retrying zookeeper connection")
+ self._last_retry_log = now
+
+ def connect(self, hosts: str, read_only: bool = False,
+ timeout: float = 10.0, tls_cert: Optional[str] = None,
+ tls_key: Optional[str] = None,
+ tls_ca: Optional[str] = None):
+ """
+ Establish a connection with ZooKeeper cluster.
+
+ Convenience method if a pre-existing ZooKeeper connection is not
+ supplied to the ZooKeeper object at instantiation time.
+
+ :param str hosts: Comma-separated list of hosts to connect to (e.g.
+ 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
+ :param bool read_only: If True, establishes a read-only connection.
+ :param float timeout: The ZooKeeper session timeout, in
+ seconds (default: 10.0).
+ :param str tls_key: Path to TLS key
+ :param str tls_cert: Path to TLS cert
+ :param str tls_ca: Path to TLS CA cert
+ """
+ if self.client is None:
+ args = dict(hosts=hosts, read_only=read_only, timeout=timeout)
+ if tls_key:
+ args['use_ssl'] = True
+ args['keyfile'] = tls_key
+ args['certfile'] = tls_cert
+ args['ca'] = tls_ca
+ self.client = KazooClient(**args)
+ self.client.add_listener(self._connectionListener)
+ # Manually retry initial connection attempt
+ while True:
+ try:
+ self.client.start(1)
+ break
+ except KazooTimeoutError:
+ self.logConnectionRetryEvent()
+
+ for listener in self.on_connect_listeners:
+ listener()
+
+ def disconnect(self):
+ """
+ Close the ZooKeeper cluster connection.
+
+ You should call this method if you used connect() to establish a
+ cluster connection.
+ """
+ for listener in self.on_disconnect_listeners:
+ listener()
+
+ if self.client is not None and self.client.connected:
+ self.client.stop()
+ self.client.close()
+ self.client = None
+
+ def resetHosts(self, hosts):
+ """
+ Reset the ZooKeeper cluster connection host list.
+
+ :param str hosts: Comma-separated list of hosts to connect to (e.g.
+ 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
+ """
+ if self.client is not None:
+ self.client.set_hosts(hosts=hosts)
+
+
+class ZooKeeperBase(metaclass=ABCMeta):
+ """Base class for components that need to interact with Zookeeper."""
+
+ def __init__(self, client: ZooKeeperClient):
+ self.client = client
+ self.client.on_connect_listeners.append(self._onConnect)
+ self.client.on_disconnect_listeners.append(self._onDisconnect)
+
+ @property
+ def kazoo_client(self) -> KazooClient:
+ if not self.client.client:
+ raise NoClientException()
+ return self.client.client
+
+ def _onConnect(self):
+ pass
+
+ def _onDisconnect(self):
+ pass
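
ZooKeeperBase gives each domain helper a shared connection plus _onConnect/_onDisconnect hooks, which ZooKeeperClient fires from its listener lists. A minimal hypothetical subclass showing the contract:

    from kazoo.exceptions import NoNodeError

    from zuul.zk import ZooKeeperBase, ZooKeeperClient

    class ZooKeeperExample(ZooKeeperBase):
        """Hypothetical helper illustrating the ZooKeeperBase contract."""
        ROOT = '/zuul/example'

        def _onConnect(self):
            # Runs after ZooKeeperClient.connect() succeeds, via the
            # on_connect_listeners list populated in ZooKeeperBase.
            self.kazoo_client.ensure_path(self.ROOT)

        def names(self):
            # kazoo_client raises NoClientException if used before connect.
            try:
                return self.kazoo_client.get_children(self.ROOT)
            except NoNodeError:
                return []

    zk_client = ZooKeeperClient()
    example = ZooKeeperExample(zk_client)  # registers the hooks
    zk_client.connect('localhost:2181')    # fires _onConnect
    print(example.names())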
diff --git a/zuul/zk/exceptions.py b/zuul/zk/exceptions.py
new file mode 100644
index 000000000..5237dea2a
--- /dev/null
+++ b/zuul/zk/exceptions.py
@@ -0,0 +1,27 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from kazoo.exceptions import KazooException
+
+
+class ZuulZooKeeperException(KazooException):
+ """Base exception class for all custom ZK exceptions"""
+ pass
+
+
+class LockException(ZuulZooKeeperException):
+ pass
+
+
+class NoClientException(ZuulZooKeeperException):
+
+ def __init__(self):
+ super().__init__("No zookeeper client!")
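
Because ZuulZooKeeperException derives from KazooException, one handler can catch Zuul-specific and native Kazoo failures together. A small sketch:

    from kazoo.exceptions import KazooException

    from zuul.zk.exceptions import LockException, NoClientException

    for exc in (LockException("lock is held elsewhere"), NoClientException()):
        try:
            raise exc
        except KazooException as e:
            # Both land here: LockException and NoClientException are
            # KazooExceptions by way of ZuulZooKeeperException.
            print(type(e).__name__, e)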
diff --git a/zuul/zk.py b/zuul/zk/nodepool.py
index 980249526..f28faf463 100644
--- a/zuul/zk.py
+++ b/zuul/zk/nodepool.py
@@ -13,235 +13,348 @@
import json
import logging
import time
+from typing import Dict, Optional, List
-from kazoo.client import KazooClient, KazooState
-from kazoo import exceptions as kze
-from kazoo.handlers.threading import KazooTimeoutError
-from kazoo.recipe.cache import TreeCache, TreeEvent
+from kazoo.exceptions import NoNodeError, LockTimeout
+from kazoo.recipe.cache import TreeCache
+from kazoo.recipe.cache import TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
+from zuul.model import HoldRequest
+from zuul.zk import ZooKeeperClient, ZooKeeperBase
+from zuul.zk.exceptions import LockException
-class LockException(Exception):
- pass
-
-
-class ZooKeeper(object):
- '''
- Class implementing the ZooKeeper interface.
-
- This class uses the facade design pattern to keep common interaction
- with the ZooKeeper API simple and consistent for the caller, and
- limits coupling between objects. It allows for more complex interactions
- by providing direct access to the client connection when needed (though
- that is discouraged). It also provides for a convenient entry point for
- testing only ZooKeeper interactions.
- '''
-
- log = logging.getLogger("zuul.zk.ZooKeeper")
-
+class ZooKeeperNodepool(ZooKeeperBase):
+ """
+ Class implementing Nodepool related ZooKeeper interface.
+ """
+ NODES_ROOT = "/nodepool/nodes"
+ LAUNCHER_ROOT = "/nodepool/launchers"
REQUEST_ROOT = '/nodepool/requests'
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
- NODE_ROOT = '/nodepool/nodes'
HOLD_REQUEST_ROOT = '/zuul/hold-requests'
- # Log zookeeper retry every 10 seconds
- retry_log_rate = 10
-
- def __init__(self, enable_cache=True):
- '''
- Initialize the ZooKeeper object.
-
- :param bool enable_cache: When True, enables caching of ZooKeeper
- objects (e.g., HoldRequests).
- '''
- self.client = None
- self._became_lost = False
- self._last_retry_log = 0
- self.enable_cache = enable_cache
+ log = logging.getLogger("zuul.zk.nodepool.ZooKeeperNodepool")
+ def __init__(self, client: ZooKeeperClient, enable_cache: bool = True):
+ super().__init__(client)
+ self.enable_cache = enable_cache # type: bool
# The caching model we use is designed around handing out model
# data as objects. To do this, we use two caches: one is a TreeCache
# which contains raw znode data (among other details), and one for
# storing that data serialized as objects. This allows us to return
# objects from the APIs, and avoids calling the methods to serialize
# the data into objects more than once.
- self._hold_request_tree = None
- self._cached_hold_requests = {}
-
- def _dictToStr(self, data):
- return json.dumps(data).encode('utf8')
-
- def _strToDict(self, data):
- return json.loads(data.decode('utf8'))
-
- def _connection_listener(self, state):
- '''
- Listener method for Kazoo connection state changes.
-
- .. warning:: This method must not block.
- '''
- if state == KazooState.LOST:
- self.log.debug("ZooKeeper connection: LOST")
- self._became_lost = True
- elif state == KazooState.SUSPENDED:
- self.log.debug("ZooKeeper connection: SUSPENDED")
+ self._hold_request_tree: Optional[TreeCache] = None
+ self._cached_hold_requests: Optional[Dict[str, HoldRequest]] = {}
+ if self.client.connected:
+ self._onConnect()
+
+ def _onConnect(self):
+ if self.enable_cache:
+ self._hold_request_tree = TreeCache(self.kazoo_client,
+ self.HOLD_REQUEST_ROOT)
+ self._hold_request_tree.listen_fault(self._cacheFaultListener)
+ self._hold_request_tree.listen(self._holdRequestCacheListener)
+ self._hold_request_tree.start()
+
+ def _onDisconnect(self):
+ if self._hold_request_tree is not None:
+ self._hold_request_tree.close()
+ self._hold_request_tree = None
+
+ def _launcherPath(self, launcher):
+ return "%s/%s" % (self.LAUNCHER_ROOT, launcher)
+
+ def _nodePath(self, node):
+ return "%s/%s" % (self.NODES_ROOT, node)
+
+ def _cacheFaultListener(self, e):
+ self.log.exception(e)
+
+ def getRegisteredLaunchers(self):
+ """
+ Get a list of all launchers that have registered with ZooKeeper.
+
+ :returns: A list of Launcher objects, or empty list if none are found.
+ """
+ try:
+ launcher_ids = self.kazoo_client\
+ .get_children(self.LAUNCHER_ROOT)
+ except NoNodeError:
+ return []
+
+ objs = []
+ for launcher in launcher_ids:
+ path = self._launcherPath(launcher)
+ try:
+ data, _ = self.kazoo_client.get(path)
+ except NoNodeError:
+ # launcher disappeared
+ continue
+
+ objs.append(Launcher.fromDict(json.loads(data.decode('utf8'))))
+ return objs
+
+ def getNodes(self):
+ """
+ Get the current list of all nodes.
+
+ :returns: A list of nodes.
+ """
+ try:
+ return self.kazoo_client.get_children(self.NODES_ROOT)
+ except NoNodeError:
+ return []
+
+ def _getNode(self, node):
+ """
+ Get the data for a specific node.
+
+ :param str node: The node ID.
+
+ :returns: The node data, or None if the node was not found.
+ """
+ path = self._nodePath(node)
+ try:
+ data, stat = self.kazoo_client.get(path)
+ except NoNodeError:
+ return None
+ if not data:
+ return None
+
+ d = json.loads(data.decode('utf8'))
+ d['id'] = node
+ return d
+
+ def nodeIterator(self):
+ """
+ Utility generator method for iterating through all nodes.
+ """
+ for node_id in self.getNodes():
+ node = self._getNode(node_id)
+ if node:
+ yield node
+
+ def getHoldRequests(self):
+ """
+ Get the current list of all hold requests.
+ """
+
+ try:
+ return sorted(self.kazoo_client
+ .get_children(self.HOLD_REQUEST_ROOT))
+ except NoNodeError:
+ return []
+
+ def getHoldRequest(self, hold_request_id):
+ path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id
+ try:
+ data, stat = self.kazoo_client.get(path)
+ except NoNodeError:
+ return None
+ if not data:
+ return None
+
+ obj = HoldRequest.fromDict(json.loads(data.decode('utf8')))
+ obj.id = hold_request_id
+ obj.stat = stat
+ return obj
+
+ def storeHoldRequest(self, request: HoldRequest):
+ """
+ Create or update a hold request.
+
+ If this is a new request with no value for the `id` attribute of the
+ passed in request, then `id` will be set with the unique request
+ identifier after successful creation.
+
+ :param HoldRequest request: Object representing the hold request.
+ """
+ if request.id is None:
+ path = self.kazoo_client.create(
+ self.HOLD_REQUEST_ROOT + "/",
+ value=request.serialize(),
+ sequence=True,
+ makepath=True)
+ request.id = path.split('/')[-1]
else:
- self.log.debug("ZooKeeper connection: CONNECTED")
+ path = self.HOLD_REQUEST_ROOT + "/" + request.id
+ self.kazoo_client.set(path, request.serialize())
- @property
- def connected(self):
- return self.client.state == KazooState.CONNECTED
+ def _markHeldNodesAsUsed(self, request: HoldRequest):
+ """
+ Changes the state for each held node for the hold request to 'used'.
- @property
- def suspended(self):
- return self.client.state == KazooState.SUSPENDED
+ :returns: True if all nodes marked USED, False otherwise.
+ """
+ def getHeldNodeIDs(req: HoldRequest) -> List[str]:
+ node_ids: List[str] = []
+ for data in req.nodes:
+ # TODO(Shrews): Remove type check at some point.
+ # When autoholds were initially changed to be stored in ZK,
+ # the node IDs were originally stored as a list of strings.
+ # A later change embedded them within a dict. Handle both
+ # cases here to deal with the upgrade.
+ if isinstance(data, dict):
+ node_ids += data['nodes']
+ else:
+ node_ids.append(data)
+ return node_ids
- @property
- def lost(self):
- return self.client.state == KazooState.LOST
+ failure = False
+ for node_id in getHeldNodeIDs(request):
+ node = self._getNode(node_id)
+ if not node or node['state'] == zuul.model.STATE_USED:
+ continue
- @property
- def didLoseConnection(self):
- return self._became_lost
-
- def resetLostFlag(self):
- self._became_lost = False
-
- def logConnectionRetryEvent(self):
- now = time.monotonic()
- if now - self._last_retry_log >= self.retry_log_rate:
- self.log.warning("Retrying zookeeper connection")
- self._last_retry_log = now
-
- def connect(self, hosts, read_only=False, timeout=10.0,
- tls_cert=None, tls_key=None, tls_ca=None):
- '''
- Establish a connection with ZooKeeper cluster.
-
- Convenience method if a pre-existing ZooKeeper connection is not
- supplied to the ZooKeeper object at instantiation time.
-
- :param str hosts: Comma-separated list of hosts to connect to (e.g.
- 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
- :param bool read_only: If True, establishes a read-only connection.
- :param float timeout: The ZooKeeper session timeout, in
- seconds (default: 10.0).
- :param str tls_key: Path to TLS key
- :param str tls_cert: Path to TLS cert
- :param str tls_ca: Path to TLS CA cert
- '''
-
- if self.client is None:
- args = dict(hosts=hosts,
- read_only=read_only,
- timeout=timeout,
- )
- if tls_key:
- args['use_ssl'] = True
- args['keyfile'] = tls_key
- args['certfile'] = tls_cert
- args['ca'] = tls_ca
- self.client = KazooClient(**args)
- self.client.add_listener(self._connection_listener)
- # Manually retry initial connection attempt
- while True:
+ node['state'] = zuul.model.STATE_USED
+
+ name = None
+ label = None
+ if 'name' in node:
+ name = node['name']
+ if 'label' in node:
+ label = node['label']
+
+ node_obj = zuul.model.Node(name, label)
+ node_obj.updateFromDict(node)
+
+ try:
+ self.lockNode(node_obj, blocking=False)
+ self.storeNode(node_obj)
+ except Exception:
+ self.log.exception("Cannot change HELD node state to USED "
+ "for node %s in request %s",
+ node_obj.id, request.id)
+ failure = True
+ finally:
try:
- self.client.start(1)
- break
- except KazooTimeoutError:
- self.logConnectionRetryEvent()
+ if node_obj.lock:
+ self.unlockNode(node_obj)
+ except Exception:
+ self.log.exception(
+ "Failed to unlock HELD node %s for request %s",
+ node_obj.id, request.id)
- if self.enable_cache:
- self._hold_request_tree = TreeCache(self.client,
- self.HOLD_REQUEST_ROOT)
- self._hold_request_tree.listen_fault(self.cacheFaultListener)
- self._hold_request_tree.listen(self.holdRequestCacheListener)
- self._hold_request_tree.start()
+ return not failure
- def cacheFaultListener(self, e):
- self.log.exception(e)
+ def deleteHoldRequest(self, request: HoldRequest):
+ """
+ Delete a hold request.
- def holdRequestCacheListener(self, event):
- '''
- Keep the hold request object cache in sync with the TreeCache.
- '''
+ :param HoldRequest request: Object representing the hold request.
+ """
+ if not self._markHeldNodesAsUsed(request):
+ self.log.info("Unable to delete hold request %s because "
+ "not all nodes marked as USED.", request.id)
+ return
+
+ path = self.HOLD_REQUEST_ROOT + "/" + request.id
try:
- self._holdRequestCacheListener(event)
- except Exception:
- self.log.exception(
- "Exception in hold request cache update for event: %s", event)
+ self.kazoo_client.delete(path, recursive=True)
+ except NoNodeError:
+ pass
- def _holdRequestCacheListener(self, event):
- if hasattr(event.event_data, 'path'):
- # Ignore root node
- path = event.event_data.path
- if path == self.HOLD_REQUEST_ROOT:
- return
+ def lockHoldRequest(self, request: HoldRequest,
+ blocking: bool = True, timeout: Optional[int] = None):
+ """
+    Lock a hold request.
- if event.event_type not in (TreeEvent.NODE_ADDED,
- TreeEvent.NODE_UPDATED,
- TreeEvent.NODE_REMOVED):
- return
+ This will set the `lock` attribute of the request object when the
+ lock is successfully acquired.
- path = event.event_data.path
- request_id = path.rsplit('/', 1)[1]
+ :param request: The hold request to lock.
+ :param blocking: Block until lock is obtained or return immediately.
+ :param timeout: Don't wait forever to acquire the lock.
+ """
+ if not request.id:
+ raise LockException(
+ "Hold request without an ID cannot be locked: %s" % request)
- if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
- # Requests with no data are invalid
- if not event.event_data.data:
- return
+ path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id)
+ try:
+ lock = Lock(self.kazoo_client, path)
+ have_lock = lock.acquire(blocking, timeout)
+ except LockTimeout:
+ raise LockException("Timeout trying to acquire lock %s" % path)
+
+ # If we aren't blocking, it's possible we didn't get the lock
+ # because someone else has it.
+ if not have_lock:
+ raise LockException("Did not get lock on %s" % path)
+
+ request.lock = lock
- # Perform an in-place update of the already cached request
- d = self._bytesToDict(event.event_data.data)
- old_request = self._cached_hold_requests.get(request_id)
- if old_request:
- if event.event_data.stat.version <= old_request.stat.version:
- # Don't update to older data
+ def unlockHoldRequest(self, request: HoldRequest):
+ """
+ Unlock a hold request.
+
+ The request must already have been locked.
+
+ :param HoldRequest request: The request to unlock.
+
+ :raises: ZKLockException if the request is not currently locked.
+ """
+ if request.lock is None:
+ raise LockException("Request %s does not hold a lock" % request)
+ request.lock.release()
+ request.lock = None
+
+ def _holdRequestCacheListener(self, event):
+ """
+ Keep the hold request object cache in sync with the TreeCache.
+ """
+ try:
+ if hasattr(event.event_data, 'path'):
+ # Ignore root node
+ path = event.event_data.path
+ if path == self.HOLD_REQUEST_ROOT:
return
- old_request.updateFromDict(d)
- old_request.stat = event.event_data.stat
- else:
- request = zuul.model.HoldRequest.fromDict(d)
- request.id = request_id
- request.stat = event.event_data.stat
- self._cached_hold_requests[request_id] = request
-
- elif event.event_type == TreeEvent.NODE_REMOVED:
- try:
- del self._cached_hold_requests[request_id]
- except KeyError:
- pass
- def disconnect(self):
- '''
- Close the ZooKeeper cluster connection.
+ if event.event_type not in (TreeEvent.NODE_ADDED,
+ TreeEvent.NODE_UPDATED,
+ TreeEvent.NODE_REMOVED):
+ return
- You should call this method if you used connect() to establish a
- cluster connection.
- '''
- if self._hold_request_tree is not None:
- self._hold_request_tree.close()
- self._hold_request_tree = None
+ path = event.event_data.path
+ request_id = path.rsplit('/', 1)[1]
- if self.client is not None and self.client.connected:
- self.client.stop()
- self.client.close()
- self.client = None
+ if event.event_type in (
+ TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
+ # Requests with no data are invalid
+ if not event.event_data.data:
+ return
- def resetHosts(self, hosts):
- '''
- Reset the ZooKeeper cluster connection host list.
+ # Perform an in-place update of the already cached request
+ d = json.loads(event.event_data.data.decode('utf8'))
+ old_request = self._cached_hold_requests.get(request_id)
+ if old_request:
+ if event.event_data.stat.version <= old_request.stat\
+ .version:
+ # Don't update to older data
+ return
+ old_request.updateFromDict(d)
+ old_request.stat = event.event_data.stat
+ else:
+ request = HoldRequest.fromDict(d)
+ request.id = request_id
+ request.stat = event.event_data.stat
+ self._cached_hold_requests[request_id] = request
- :param str hosts: Comma-separated list of hosts to connect to (e.g.
- 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
- '''
- if self.client is not None:
- self.client.set_hosts(hosts=hosts)
+ elif event.event_type == TreeEvent.NODE_REMOVED:
+ try:
+ del self._cached_hold_requests[request_id]
+ except KeyError:
+ pass
+ except Exception:
+ self.log.exception(
+ "Exception in hold request cache update for event: %s", event)
def submitNodeRequest(self, node_request, watcher):
- '''
+ """
Submit a request for nodes to Nodepool.
:param NodeRequest node_request: A NodeRequest with the
@@ -255,105 +368,106 @@ class ZooKeeper(object):
longer exists (notably, this will happen on disconnection
from ZooKeeper). The watcher should return False when
further updates are no longer necessary.
- '''
+ """
node_request.created_time = time.time()
data = node_request.toDict()
path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, node_request.priority)
- path = self.client.create(path, self._dictToStr(data),
- makepath=True,
- sequence=True, ephemeral=True)
+ path = self.kazoo_client.create(path, json.dumps(data).encode('utf8'),
+ makepath=True, sequence=True,
+ ephemeral=True)
reqid = path.split("/")[-1]
node_request.id = reqid
- def callback(data, stat):
- if data:
- self.updateNodeRequest(node_request, data)
- deleted = (data is None) # data *are* none
+ def callback(value, _):
+ if value:
+ self.updateNodeRequest(node_request, value)
+ deleted = (value is None) # data *are* none
return watcher(node_request, deleted)
- self.client.DataWatch(path, callback)
+ self.kazoo_client.DataWatch(path, callback)
def deleteNodeRequest(self, node_request):
- '''
+ """
Delete a request for nodes.
:param NodeRequest node_request: A NodeRequest with the
contents of the request.
- '''
-
+ """
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
try:
- self.client.delete(path)
- except kze.NoNodeError:
+ self.kazoo_client.delete(path)
+ except NoNodeError:
pass
def nodeRequestExists(self, node_request):
- '''
+ """
See if a NodeRequest exists in ZooKeeper.
:param NodeRequest node_request: A NodeRequest to verify.
:returns: True if the request exists, False otherwise.
- '''
+ """
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
- if self.client.exists(path):
+ if self.kazoo_client.exists(path):
return True
return False
def storeNodeRequest(self, node_request):
- '''Store the node request.
+ """
+ Store the node request.
The request is expected to already exist and is updated in its
entirety.
:param NodeRequest node_request: The request to update.
- '''
-
+ """
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
- self.client.set(path, self._dictToStr(node_request.toDict()))
+ self.kazoo_client.set(
+ path, json.dumps(node_request.toDict()).encode('utf8'))
def updateNodeRequest(self, node_request, data=None):
- '''Refresh an existing node request.
+ """
+ Refresh an existing node request.
:param NodeRequest node_request: The request to update.
:param dict data: The data to use; query ZK if absent.
- '''
+ """
if data is None:
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
- data, stat = self.client.get(path)
- data = self._strToDict(data)
+ data, stat = self.kazoo_client.get(path)
+ data = json.loads(data.decode('utf8'))
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
request_nodes[i].id = nodeid
- self.updateNode(request_nodes[i])
+ self._updateNode(request_nodes[i])
node_request.updateFromDict(data)
def storeNode(self, node):
- '''Store the node.
+ """
+ Store the node.
The node is expected to already exist and is updated in its
entirety.
:param Node node: The node to update.
- '''
-
- path = '%s/%s' % (self.NODE_ROOT, node.id)
- self.client.set(path, self._dictToStr(node.toDict()))
+ """
+ path = '%s/%s' % (self.NODES_ROOT, node.id)
+ self.kazoo_client.set(path, json.dumps(node.toDict()).encode('utf8'))
- def updateNode(self, node):
- '''Refresh an existing node.
+ def _updateNode(self, node):
+ """
+ Refresh an existing node.
:param Node node: The node to update.
- '''
-
- node_path = '%s/%s' % (self.NODE_ROOT, node.id)
- node_data, node_stat = self.client.get(node_path)
- node_data = self._strToDict(node_data)
+ """
+ node_path = '%s/%s' % (self.NODES_ROOT, node.id)
+ node_data, node_stat = self.kazoo_client.get(node_path)
+ node_data = json.loads(node_data.decode('utf8'))
node.updateFromDict(node_data)
def lockNode(self, node, blocking=True, timeout=None):
- '''
+ """
Lock a node.
This should be called as soon as a request is fulfilled and
@@ -362,13 +476,12 @@ class ZooKeeper(object):
node should be reclaimed.
:param Node node: The node which should be locked.
- '''
-
- lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
+ """
+ lock_path = '%s/%s/lock' % (self.NODES_ROOT, node.id)
try:
- lock = Lock(self.client, lock_path)
+ lock = Lock(self.kazoo_client, lock_path)
have_lock = lock.acquire(blocking, timeout)
- except kze.LockTimeout:
+ except LockTimeout:
raise LockException(
"Timeout trying to acquire lock %s" % lock_path)
@@ -380,13 +493,13 @@ class ZooKeeper(object):
node.lock = lock
def unlockNode(self, node):
- '''
+ """
Unlock a node.
The node must already have been locked.
:param Node node: The node which should be unlocked.
- '''
+ """
if node.lock is None:
raise LockException("Node %s does not hold a lock" % (node,))
@@ -394,7 +507,7 @@ class ZooKeeper(object):
node.lock = None
def lockNodeRequest(self, request, blocking=True, timeout=None):
- '''
+ """
Lock a node request.
This will set the `lock` attribute of the request object when the
@@ -409,16 +522,15 @@ class ZooKeeper(object):
:raises: TimeoutException if we failed to acquire the lock when
blocking with a timeout. ZKLockException if we are not blocking
and could not get the lock, or a lock is already held.
- '''
-
+ """
path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
+ lock = Lock(self.kazoo_client, path)
try:
- lock = Lock(self.client, path)
have_lock = lock.acquire(blocking, timeout)
- except kze.LockTimeout:
+ except LockTimeout:
raise LockException(
"Timeout trying to acquire lock %s" % path)
- except kze.NoNodeError:
+ except NoNodeError:
have_lock = False
self.log.error("Request not found for locking: %s", request)
@@ -431,7 +543,7 @@ class ZooKeeper(object):
self.updateNodeRequest(request)
def unlockNodeRequest(self, request):
- '''
+ """
Unlock a node request.
The request must already have been locked.
@@ -439,7 +551,7 @@ class ZooKeeper(object):
:param NodeRequest request: The request to unlock.
:raises: ZKLockException if the request is not currently locked.
- '''
+ """
if request.lock is None:
raise LockException(
"Request %s does not hold a lock" % request)
@@ -447,278 +559,40 @@ class ZooKeeper(object):
request.lock = None
def heldNodeCount(self, autohold_key):
- '''
+ """
Count the number of nodes being held for the given tenant/project/job.
:param tuple autohold_key: An ordered tuple with the tenant/project/job names.
- '''
+ """
identifier = " ".join(autohold_key)
try:
- nodes = self.client.get_children(self.NODE_ROOT)
- except kze.NoNodeError:
+ nodes = self.kazoo_client.get_children(self.NODES_ROOT)
+ except NoNodeError:
return 0
count = 0
for nodeid in nodes:
- node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+ node_path = '%s/%s' % (self.NODES_ROOT, nodeid)
try:
- node_data, node_stat = self.client.get(node_path)
- except kze.NoNodeError:
+ node_data, node_stat = self.kazoo_client.get(node_path)
+ except NoNodeError:
# Node got removed on us. Just ignore.
continue
if not node_data:
self.log.warning("Node ID %s has no data", nodeid)
continue
- node_data = self._strToDict(node_data)
+ node_data = json.loads(node_data.decode('utf8'))
if (node_data['state'] == zuul.model.STATE_HOLD and
node_data.get('hold_job') == identifier):
count += 1
return count
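# A small worked example of the identifier matching above: the autohold key
# is joined with spaces and compared against each node's hold_job field.
# The tenant/project/job values and the 'hold' state literal are made up.
autohold_key = ('tenant-one', 'review.example.org/org/project', 'linters')
identifier = " ".join(autohold_key)
node_data = {'state': 'hold',
             'hold_job': 'tenant-one review.example.org/org/project linters'}
assert (node_data['state'] == 'hold' and
        node_data.get('hold_job') == identifier)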
- # Copy of nodepool/zk.py begins here
- NODE_ROOT = "/nodepool/nodes"
- LAUNCHER_ROOT = "/nodepool/launchers"
-
- def _bytesToDict(self, data):
- return json.loads(data.decode('utf8'))
-
- def _launcherPath(self, launcher):
- return "%s/%s" % (self.LAUNCHER_ROOT, launcher)
-
- def _nodePath(self, node):
- return "%s/%s" % (self.NODE_ROOT, node)
-
- def getRegisteredLaunchers(self):
- '''
- Get a list of all launchers that have registered with ZooKeeper.
-
- :returns: A list of Launcher objects, or empty list if none are found.
- '''
- try:
- launcher_ids = self.client.get_children(self.LAUNCHER_ROOT)
- except kze.NoNodeError:
- return []
-
- objs = []
- for launcher in launcher_ids:
- path = self._launcherPath(launcher)
- try:
- data, _ = self.client.get(path)
- except kze.NoNodeError:
- # launcher disappeared
- continue
-
- objs.append(Launcher.fromDict(self._bytesToDict(data)))
- return objs
-
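# The method above shows a common ZooKeeper idiom: list the children, then
# fetch each one, treating NoNodeError at either step as "empty" or "already
# gone". A generic, hedged version of that read pattern:
import json
from kazoo.exceptions import NoNodeError

def list_znode_dicts(client, root):
    try:
        children = client.get_children(root)
    except NoNodeError:
        return []  # the root itself does not exist yet
    results = []
    for child in children:
        try:
            data, _ = client.get('%s/%s' % (root, child))
        except NoNodeError:
            continue  # child vanished between the two calls
        results.append(json.loads(data.decode('utf8')))
    return results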
- def getNodes(self):
- '''
- Get the current list of all nodes.
-
- :returns: A list of nodes.
- '''
- try:
- return self.client.get_children(self.NODE_ROOT)
- except kze.NoNodeError:
- return []
-
- def getNode(self, node):
- '''
- Get the data for a specific node.
-
- :param str node: The node ID.
-
- :returns: The node data, or None if the node was not found.
- '''
- path = self._nodePath(node)
- try:
- data, stat = self.client.get(path)
- except kze.NoNodeError:
- return None
- if not data:
- return None
-
- d = self._bytesToDict(data)
- d['id'] = node
- return d
-
- def nodeIterator(self):
- '''
- Utility generator method for iterating through all nodes.
- '''
- for node_id in self.getNodes():
- node = self.getNode(node_id)
- if node:
- yield node
-
- def getHoldRequests(self):
- '''
- Get the current list of all hold requests.
- '''
- try:
- return sorted(self.client.get_children(self.HOLD_REQUEST_ROOT))
- except kze.NoNodeError:
- return []
-
- def getHoldRequest(self, hold_request_id):
- path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id
- try:
- data, stat = self.client.get(path)
- except kze.NoNodeError:
- return None
- if not data:
- return None
-
- obj = zuul.model.HoldRequest.fromDict(self._strToDict(data))
- obj.id = hold_request_id
- obj.stat = stat
- return obj
-
- def storeHoldRequest(self, hold_request):
- '''
- Create or update a hold request.
-
- If this is a new request with no value for the `id` attribute of the
- passed in request, then `id` will be set with the unique request
- identifier after successful creation.
-
- :param HoldRequest hold_request: Object representing the hold request.
- '''
- if hold_request.id is None:
- path = self.client.create(
- self.HOLD_REQUEST_ROOT + "/",
- value=hold_request.serialize(),
- sequence=True,
- makepath=True)
- hold_request.id = path.split('/')[-1]
- else:
- path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
- self.client.set(path, hold_request.serialize())
-
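# The create/set split above uses ZooKeeper sequence znodes: creating with
# sequence=True makes the server append a monotonic counter to the path,
# which then serves as the request id. A minimal sketch; host, path, and
# payload are illustrative assumptions.
import kazoo.client

client = kazoo.client.KazooClient(hosts='127.0.0.1:2181')
client.start()
path = client.create('/zuul/hold-requests/', value=b'{"reason": "debug"}',
                     sequence=True, makepath=True)
new_id = path.split('/')[-1]  # e.g. '0000000000', assigned by the server
client.stop()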
- def _markHeldNodesAsUsed(self, hold_request):
- '''
- Change the state of each node held for the hold request to 'used'.
-
- :returns: True if all nodes marked USED, False otherwise.
- '''
- def getHeldNodeIDs(request):
- node_ids = []
- for data in request.nodes:
- # TODO(Shrews): Remove type check at some point.
- # When autoholds were initially changed to be stored in ZK,
- # the node IDs were originally stored as a list of strings.
- # A later change embedded them within a dict. Handle both
- # cases here to deal with the upgrade.
- if isinstance(data, dict):
- node_ids += data['nodes']
- else:
- node_ids.append(data)
- return node_ids
-
- failure = False
- for node_id in getHeldNodeIDs(hold_request):
- node = self.getNode(node_id)
- if not node or node['state'] == zuul.model.STATE_USED:
- continue
-
- node['state'] = zuul.model.STATE_USED
-
- name = None
- label = None
- if 'name' in node:
- name = node['name']
- if 'label' in node:
- label = node['label']
-
- node_obj = zuul.model.Node(name, label)
- node_obj.updateFromDict(node)
-
- try:
- self.lockNode(node_obj, blocking=False)
- self.storeNode(node_obj)
- except Exception:
- self.log.exception("Cannot change HELD node state to USED "
- "for node %s in request %s",
- node_obj.id, hold_request.id)
- failure = True
- finally:
- try:
- if node_obj.lock:
- self.unlockNode(node_obj)
- except Exception:
- self.log.exception(
- "Failed to unlock HELD node %s for request %s",
- node_obj.id, hold_request.id)
-
- return not failure
-
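# The inner getHeldNodeIDs above tolerates two on-disk formats for held node
# IDs (bare strings from old autoholds, dicts from newer ones). A standalone
# version of that upgrade-friendly parsing, with made-up sample data:
def held_node_ids(nodes):
    node_ids = []
    for data in nodes:
        if isinstance(data, dict):
            node_ids += data['nodes']  # newer format: {'nodes': [...]}
        else:
            node_ids.append(data)      # older format: bare ID string
    return node_ids

assert held_node_ids(['0001', {'nodes': ['0002', '0003']}]) == \
    ['0001', '0002', '0003']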
- def deleteHoldRequest(self, hold_request):
- '''
- Delete a hold request.
-
- :param HoldRequest hold_request: Object representing the hold request.
- '''
- if not self._markHeldNodesAsUsed(hold_request):
- self.log.info("Unable to delete hold request %s because "
- "not all nodes marked as USED.", hold_request.id)
- return
-
- path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
- try:
- self.client.delete(path, recursive=True)
- except kze.NoNodeError:
- pass
-
- def lockHoldRequest(self, request, blocking=True, timeout=None):
- '''
- Lock a hold request.
-
- This will set the `lock` attribute of the request object when the
- lock is successfully acquired.
-
- :param HoldRequest request: The hold request to lock.
- '''
- if not request.id:
- raise LockException(
- "Hold request without an ID cannot be locked: %s" % request)
-
- path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id)
- try:
- lock = Lock(self.client, path)
- have_lock = lock.acquire(blocking, timeout)
- except kze.LockTimeout:
- raise LockException(
- "Timeout trying to acquire lock %s" % path)
-
- # If we aren't blocking, it's possible we didn't get the lock
- # because someone else has it.
- if not have_lock:
- raise LockException("Did not get lock on %s" % path)
-
- request.lock = lock
-
- def unlockHoldRequest(self, request):
- '''
- Unlock a hold request.
-
- The request must already have been locked.
-
- :param HoldRequest request: The request to unlock.
-
- :raises: ZKLockException if the request is not currently locked.
- '''
- if request.lock is None:
- raise LockException(
- "Request %s does not hold a lock" % request)
- request.lock.release()
- request.lock = None
-
-class Launcher():
- '''
+class Launcher:
+ """
Class to describe a nodepool launcher.
- '''
+ """
def __init__(self):
self.id = None