diff options
-rw-r--r-- | .zuul.yaml | 2 | ||||
-rw-r--r-- | doc/source/examples/zoo.cfg | 2 | ||||
-rw-r--r-- | playbooks/common/post-system-logs.yaml | 5 | ||||
-rw-r--r-- | tests/base.py | 15 | ||||
-rw-r--r-- | tests/nodepool/test_nodepool_integration.py | 10 | ||||
-rw-r--r-- | tests/unit/test_nodepool.py | 18 | ||||
-rw-r--r-- | tests/unit/test_scheduler.py | 48 | ||||
-rw-r--r-- | tests/unit/test_zk.py | 37 | ||||
-rwxr-xr-x | tools/test-setup.sh | 2 | ||||
-rwxr-xr-x | zuul/cmd/scheduler.py | 14 | ||||
-rw-r--r-- | zuul/nodepool.py | 39 | ||||
-rw-r--r-- | zuul/scheduler.py | 29 | ||||
-rwxr-xr-x | zuul/web/__init__.py | 24 | ||||
-rw-r--r-- | zuul/zk/__init__.py | 153 | ||||
-rw-r--r-- | zuul/zk/exceptions.py | 27 | ||||
-rw-r--r-- | zuul/zk/nodepool.py (renamed from zuul/zk.py) | 858 |
16 files changed, 683 insertions, 600 deletions
diff --git a/.zuul.yaml b/.zuul.yaml index 0e99c9f41..e35aa3594 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -236,6 +236,7 @@ tox_environment: ZUUL_TEST_ROOT: /tmp/zuul-test YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs" + post-run: playbooks/common/post-system-logs.yaml - tox-py38: irrelevant-files: - zuul/cmd/migrate.py @@ -243,6 +244,7 @@ timeout: 4800 # 80 minutes nodeset: ubuntu-bionic vars: *zuul_tox_vars + post-run: playbooks/common/post-system-logs.yaml - zuul-build-dashboard-openstack-whitelabel - zuul-build-dashboard-software-factory - zuul-build-dashboard-opendev diff --git a/doc/source/examples/zoo.cfg b/doc/source/examples/zoo.cfg index 4d4fcc3ea..7f0cbf7cb 100644 --- a/doc/source/examples/zoo.cfg +++ b/doc/source/examples/zoo.cfg @@ -5,7 +5,7 @@ initLimit=5 syncLimit=2 autopurge.snapRetainCount=3 autopurge.purgeInterval=0 -maxClientCnxns=60 +maxClientCnxns=1000 standaloneEnabled=true admin.enableServer=true server.1=examples_zk_1.examples_default:2888:3888 diff --git a/playbooks/common/post-system-logs.yaml b/playbooks/common/post-system-logs.yaml new file mode 100644 index 000000000..830899c38 --- /dev/null +++ b/playbooks/common/post-system-logs.yaml @@ -0,0 +1,5 @@ +- hosts: all + tasks: + + - name: Collect zookeeper logs + shell: "cp /var/log/zookeeper/zookeeper.log {{ zuul_output_dir }}/logs/zookeeper.log" diff --git a/tests/base.py b/tests/base.py index 3d87593ac..bb1cf2338 100644 --- a/tests/base.py +++ b/tests/base.py @@ -111,10 +111,10 @@ import zuul.merger.server import zuul.model import zuul.nodepool import zuul.rpcclient -import zuul.zk import zuul.configloader from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger +from zuul.zk import ZooKeeperClient FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures') @@ -3629,13 +3629,14 @@ class ChrootedKazooFixture(fixtures.Fixture): for x in range(8)) rand_test_path = '%s_%s_%s' % (random_bits, os.getpid(), self.test_id) - self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path + self.zookeeper_chroot = f"/test/{rand_test_path}" self.addCleanup(self._cleanup) # Ensure the chroot path exists and clean up any pre-existing znodes. _tmp_client = kazoo.client.KazooClient( - hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port)) + hosts=f'{self.zookeeper_host}:{self.zookeeper_port}', timeout=10 + ) _tmp_client.start() if _tmp_client.exists(self.zookeeper_chroot): @@ -3992,13 +3993,13 @@ class SchedulerTestApp: self.config, self.sched) merge_client = RecordingMergeClient(self.config, self.sched) nodepool = zuul.nodepool.Nodepool(self.sched) - zk = zuul.zk.ZooKeeper(enable_cache=True) - zk.connect(self.zk_config, timeout=30.0) + zk_client = ZooKeeperClient() + zk_client.connect(self.zk_config, timeout=30.0) self.sched.setExecutor(executor_client) self.sched.setMerger(merge_client) self.sched.setNodepool(nodepool) - self.sched.setZooKeeper(zk) + self.sched.setZooKeeper(zk_client) self.sched.start() executor_client.gearman.waitForServer() @@ -4626,7 +4627,7 @@ class ZuulTestCase(BaseTestCase): self.rpcclient.shutdown() self.gearman_server.shutdown() self.fake_nodepool.stop() - self.scheds.execute(lambda app: app.sched.zk.disconnect()) + self.scheds.execute(lambda app: app.sched.zk_client.disconnect()) self.printHistory() # We whitelist watchdog threads as they have relatively long delays # before noticing they should exit, but they should exit on their own. diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py index bb6c8abaa..18fcbc179 100644 --- a/tests/nodepool/test_nodepool_integration.py +++ b/tests/nodepool/test_nodepool_integration.py @@ -31,9 +31,9 @@ class TestNodepoolIntegration(BaseTestCase): super(TestNodepoolIntegration, self).setUp() self.statsd = None - self.zk = zuul.zk.ZooKeeper(enable_cache=True) - self.addCleanup(self.zk.disconnect) - self.zk.connect('localhost:2181') + self.zk_client = zuul.zk.ZooKeeperClient() + self.addCleanup(self.zk_client.disconnect) + self.zk_client.connect('localhost:2181') self.hostname = socket.gethostname() self.provisioned_requests = [] @@ -104,8 +104,8 @@ class TestNodepoolIntegration(BaseTestCase): job.nodeset = nodeset self.fake_nodepool.paused = True request = self.nodepool.requestNodes(None, job, 0) - self.zk.client.stop() - self.zk.client.start() + self.zk_client.client.stop() + self.zk_client.client.start() self.fake_nodepool.paused = False self.waitForRequests() self.assertEqual(len(self.provisioned_requests), 1) diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py index 2326b1b1b..8b9a9adbe 100644 --- a/tests/unit/test_nodepool.py +++ b/tests/unit/test_nodepool.py @@ -15,11 +15,12 @@ import time -import zuul.zk -import zuul.nodepool from zuul import model +import zuul.nodepool from tests.base import BaseTestCase, ChrootedKazooFixture, FakeNodepool +from zuul.zk import ZooKeeperClient +from zuul.zk.nodepool import ZooKeeperNodepool class TestNodepool(BaseTestCase): @@ -37,9 +38,10 @@ class TestNodepool(BaseTestCase): self.zk_chroot_fixture.zookeeper_port, self.zk_chroot_fixture.zookeeper_chroot) - self.zk = zuul.zk.ZooKeeper(enable_cache=True) - self.addCleanup(self.zk.disconnect) - self.zk.connect(self.zk_config) + self.zk_client = ZooKeeperClient() + self.zk_nodepool = ZooKeeperNodepool(self.zk_client) + self.addCleanup(self.zk_client.disconnect) + self.zk_client.connect(self.zk_config) self.hostname = 'nodepool-test-hostname' self.provisioned_requests = [] @@ -105,8 +107,8 @@ class TestNodepool(BaseTestCase): job.nodeset = nodeset self.fake_nodepool.pause() request = self.nodepool.requestNodes(None, job, 0) - self.zk.client.stop() - self.zk.client.start() + self.zk_client.client.stop() + self.zk_client.client.start() self.fake_nodepool.unpause() self.waitForRequests() self.assertEqual(len(self.provisioned_requests), 1) @@ -161,7 +163,7 @@ class TestNodepool(BaseTestCase): self.assertEqual(len(self.provisioned_requests), 1) self.assertEqual(request.state, 'fulfilled') - self.zk.deleteNodeRequest(request) + self.zk_nodepool.deleteNodeRequest(request) # Accept the nodes self.nodepool.acceptNodes(request, request.id) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 986091158..a58e056e6 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -190,9 +190,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) self.assertEqual('tenant-one', request.tenant) self.assertEqual('review.example.com/org/project', request.project) @@ -220,9 +221,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) self.assertEqual('tenant-one', request.tenant) self.assertEqual('review.example.com/org/project', request.project) @@ -251,9 +253,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) self.assertEqual('tenant-one', request.tenant) self.assertEqual('review.example.com/org/project', request.project) @@ -1766,9 +1769,10 @@ class TestScheduler(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) self.assertEqual('tenant-one', request.tenant) self.assertEqual('review.example.com/org/project', request.project) @@ -1827,7 +1831,8 @@ class TestScheduler(ZuulTestCase): # The hold request current_count should have incremented # and we should have recorded the held node ID. - request2 = self.scheds.first.sched.zk.getHoldRequest(request.id) + request2 = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request.id) self.assertEqual(request.current_count + 1, request2.current_count) self.assertEqual(1, len(request2.nodes)) self.assertEqual(1, len(request2.nodes[0]["nodes"])) @@ -1849,11 +1854,12 @@ class TestScheduler(ZuulTestCase): self.assertEqual(held_nodes, 1) # request current_count should not have changed - request3 = self.scheds.first.sched.zk.getHoldRequest(request2.id) + request3 = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request2.id) self.assertEqual(request2.current_count, request3.current_count) # Deleting hold request should set held nodes to used - self.scheds.first.sched.zk.deleteHoldRequest(request3) + self.scheds.first.sched.zk_nodepool.deleteHoldRequest(request3) node_states = [n['state'] for n in self.fake_nodepool.getNodes()] self.assertEqual(3, len(node_states)) self.assertEqual([zuul.model.STATE_USED] * 3, node_states) @@ -1873,9 +1879,10 @@ class TestScheduler(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) request = client.autohold_info(request.id) @@ -1897,14 +1904,15 @@ class TestScheduler(ZuulTestCase): self.assertTrue(r) # There should be a record in ZooKeeper - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual(1, len(request_list)) - request = self.scheds.first.sched.zk.getHoldRequest(request_list[0]) + request = self.scheds.first.sched.zk_nodepool.getHoldRequest( + request_list[0]) self.assertIsNotNone(request) # Delete and verify no more requests self.assertTrue(client.autohold_delete(request.id)) - request_list = self.scheds.first.sched.zk.getHoldRequests() + request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests() self.assertEqual([], request_list) def _test_autohold_scoped(self, change_obj, change, ref): @@ -5783,8 +5791,8 @@ For CI problems and help debugging, contact ci@example.org""" self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) self.waitUntilSettled() - self.scheds.execute(lambda app: app.sched.zk.client.stop()) - self.scheds.execute(lambda app: app.sched.zk.client.start()) + self.scheds.execute(lambda app: app.sched.zk_client.client.stop()) + self.scheds.execute(lambda app: app.sched.zk_client.client.start()) self.fake_nodepool.unpause() self.waitUntilSettled() @@ -5819,8 +5827,8 @@ For CI problems and help debugging, contact ci@example.org""" # The request is fulfilled, but the scheduler hasn't processed # it yet. Reconnect ZK. - self.scheds.execute(lambda app: app.sched.zk.client.stop()) - self.scheds.execute(lambda app: app.sched.zk.client.start()) + self.scheds.execute(lambda app: app.sched.zk_client.client.stop()) + self.scheds.execute(lambda app: app.sched.zk_client.client.start()) # Allow the scheduler to continue and process the (now # out-of-date) notification that nodes are ready. diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index d9942a90a..6bc2f3028 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -15,10 +15,12 @@ import testtools -import zuul.zk from zuul import model +import zuul.zk.exceptions from tests.base import BaseTestCase, ChrootedKazooFixture +from zuul.zk import ZooKeeperClient +from zuul.zk.nodepool import ZooKeeperNodepool class TestZK(BaseTestCase): @@ -33,9 +35,10 @@ class TestZK(BaseTestCase): self.zk_chroot_fixture.zookeeper_port, self.zk_chroot_fixture.zookeeper_chroot) - self.zk = zuul.zk.ZooKeeper(enable_cache=True) - self.addCleanup(self.zk.disconnect) - self.zk.connect(self.zk_config) + self.zk_client = ZooKeeperClient() + self.zk_nodepool = ZooKeeperNodepool(self.zk_client) + self.addCleanup(self.zk_client.disconnect) + self.zk_client.connect(self.zk_config) def _createRequest(self): req = model.HoldRequest() @@ -46,37 +49,37 @@ class TestZK(BaseTestCase): def test_hold_requests_api(self): # Test no requests returns empty list - self.assertEqual([], self.zk.getHoldRequests()) + self.assertEqual([], self.zk_nodepool.getHoldRequests()) # Test get on non-existent request is None - self.assertIsNone(self.zk.getHoldRequest('anything')) + self.assertIsNone(self.zk_nodepool.getHoldRequest('anything')) # Test creating a new request req1 = self._createRequest() - self.zk.storeHoldRequest(req1) + self.zk_nodepool.storeHoldRequest(req1) self.assertIsNotNone(req1.id) - self.assertEqual(1, len(self.zk.getHoldRequests())) + self.assertEqual(1, len(self.zk_nodepool.getHoldRequests())) # Test getting the request - req2 = self.zk.getHoldRequest(req1.id) + req2 = self.zk_nodepool.getHoldRequest(req1.id) self.assertEqual(req1.toDict(), req2.toDict()) # Test updating the request req2.reason = 'a new reason' - self.zk.storeHoldRequest(req2) - req2 = self.zk.getHoldRequest(req2.id) + self.zk_nodepool.storeHoldRequest(req2) + req2 = self.zk_nodepool.getHoldRequest(req2.id) self.assertNotEqual(req1.reason, req2.reason) # Test lock operations - self.zk.lockHoldRequest(req2, blocking=False) + self.zk_nodepool.lockHoldRequest(req2, blocking=False) with testtools.ExpectedException( - zuul.zk.LockException, + zuul.zk.exceptions.LockException, "Timeout trying to acquire lock .*" ): - self.zk.lockHoldRequest(req2, blocking=True, timeout=2) - self.zk.unlockHoldRequest(req2) + self.zk_nodepool.lockHoldRequest(req2, blocking=True, timeout=2) + self.zk_nodepool.unlockHoldRequest(req2) self.assertIsNone(req2.lock) # Test deleting the request - self.zk.deleteHoldRequest(req1) - self.assertEqual([], self.zk.getHoldRequests()) + self.zk_nodepool.deleteHoldRequest(req1) + self.assertEqual([], self.zk_nodepool.getHoldRequests()) diff --git a/tools/test-setup.sh b/tools/test-setup.sh index 7416040ae..cb524f9c5 100755 --- a/tools/test-setup.sh +++ b/tools/test-setup.sh @@ -11,6 +11,8 @@ TOOLSDIR=$(dirname $0) sudo service zookeeper stop DATADIR=$(sed -n -e 's/^dataDir=//p' /etc/zookeeper/conf/zoo.cfg) sudo mount -t tmpfs -o nodev,nosuid,size=500M none $DATADIR +echo "autopurge.purgeInterval=1" | sudo tee -a /etc/zookeeper/conf/zoo.cfg +echo "maxClientCnxns=1000" | sudo tee -a /etc/zookeeper/conf/zoo.cfg # Prepare a tmpfs for Zuul test root if [[ -n "${ZUUL_TEST_ROOT:-}" ]]; then diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 4602bb783..cbda4fbd8 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -20,13 +20,12 @@ import signal import zuul.cmd import zuul.executor.client +from zuul.lib.config import get_default +from zuul.lib.statsd import get_statsd_config import zuul.merger.client import zuul.nodepool import zuul.scheduler -import zuul.zk - -from zuul.lib.config import get_default -from zuul.lib.statsd import get_statsd_config +from zuul.zk import ZooKeeperClient class Scheduler(zuul.cmd.ZuulDaemonApp): @@ -144,7 +143,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): merger = zuul.merger.client.MergeClient(self.config, self.sched) nodepool = zuul.nodepool.Nodepool(self.sched) - zookeeper = zuul.zk.ZooKeeper(enable_cache=True) + zk_client = ZooKeeperClient() zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None) if not zookeeper_hosts: raise Exception("The zookeeper hosts config value is required") @@ -153,7 +152,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca') zookeeper_timeout = float(get_default(self.config, 'zookeeper', 'session_timeout', 10.0)) - zookeeper.connect( + zk_client.connect( zookeeper_hosts, timeout=zookeeper_timeout, tls_cert=zookeeper_tls_cert, @@ -164,7 +163,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.sched.setExecutor(gearman) self.sched.setMerger(merger) self.sched.setNodepool(nodepool) - self.sched.setZooKeeper(zookeeper) + self.sched.setZooKeeper(zk_client) self.log.info('Starting scheduler') try: @@ -191,6 +190,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.exit_handler(signal.SIGINT, None) else: self.sched.join() + zk_client.disconnect() def main(): diff --git a/zuul/nodepool.py b/zuul/nodepool.py index 57d624488..a9732dec7 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -16,7 +16,7 @@ import time from collections import defaultdict from zuul import model from zuul.lib.logutil import get_annotated_logger -from zuul.zk import LockException +from zuul.zk.exceptions import LockException def add_resources(target, source): @@ -115,7 +115,8 @@ class Nodepool(object): self.requests[req.uid] = req if nodeset.nodes: - self.sched.zk.submitNodeRequest(req, self._updateNodeRequest) + self.sched.zk_nodepool.submitNodeRequest(req, + self._updateNodeRequest) # Logged after submission so that we have the request id log.info("Submitted node request %s", req) self.emitStats(req) @@ -132,7 +133,7 @@ class Nodepool(object): if request.uid in self.requests: request.canceled = True try: - self.sched.zk.deleteNodeRequest(request) + self.sched.zk_nodepool.deleteNodeRequest(request) except Exception: log.exception("Error deleting node request:") @@ -149,7 +150,7 @@ class Nodepool(object): if relative_priority is None: return try: - self.sched.zk.lockNodeRequest(request, blocking=False) + self.sched.zk_nodepool.lockNodeRequest(request, blocking=False) except LockException: # It may be locked by nodepool, which is fine. log.debug("Unable to revise locked node request %s", request) @@ -157,7 +158,7 @@ class Nodepool(object): try: old_priority = request.relative_priority request.relative_priority = relative_priority - self.sched.zk.storeNodeRequest(request) + self.sched.zk_nodepool.storeNodeRequest(request) log.debug("Revised relative priority of " "node request %s from %s to %s", request, old_priority, relative_priority) @@ -165,7 +166,7 @@ class Nodepool(object): log.exception("Unable to update node request %s", request) finally: try: - self.sched.zk.unlockNodeRequest(request) + self.sched.zk_nodepool.unlockNodeRequest(request) except Exception: log.exception("Unable to unlock node request %s", request) @@ -190,7 +191,7 @@ class Nodepool(object): node.comment = request.reason if request.node_expiration: node.hold_expiration = request.node_expiration - self.sched.zk.storeNode(node) + self.sched.zk_nodepool.storeNode(node) request.nodes.append(dict( build=build.uuid, @@ -205,10 +206,10 @@ class Nodepool(object): # Give ourselves a few seconds to try to obtain the lock rather than # immediately give up. - self.sched.zk.lockHoldRequest(request, timeout=5) + self.sched.zk_nodepool.lockHoldRequest(request, timeout=5) try: - self.sched.zk.storeHoldRequest(request) + self.sched.zk_nodepool.storeHoldRequest(request) except Exception: # If we fail to update the request count, we won't consider it # a real autohold error by passing the exception up. It will @@ -219,7 +220,7 @@ class Nodepool(object): finally: # Although any exceptions thrown here are handled higher up in # _doBuildCompletedEvent, we always want to try to unlock it. - self.sched.zk.unlockHoldRequest(request) + self.sched.zk_nodepool.unlockHoldRequest(request) def useNodeSet(self, nodeset, build_set=None, event=None): self.log.info("Setting nodeset %s in use" % (nodeset,)) @@ -228,7 +229,7 @@ class Nodepool(object): if node.lock is None: raise Exception("Node %s is not locked" % (node,)) node.state = model.STATE_IN_USE - self.sched.zk.storeNode(node) + self.sched.zk_nodepool.storeNode(node) if node.resources: add_resources(resources, node.resources) if build_set and resources: @@ -275,7 +276,7 @@ class Nodepool(object): if node.resources: add_resources(resources, node.resources) node.state = model.STATE_USED - self.sched.zk.storeNode(node) + self.sched.zk_nodepool.storeNode(node) except Exception: log.exception("Exception storing node %s " "while unlocking:", node) @@ -303,7 +304,7 @@ class Nodepool(object): def _unlockNodes(self, nodes): for node in nodes: try: - self.sched.zk.unlockNode(node) + self.sched.zk_nodepool.unlockNode(node) except Exception: self.log.exception("Error unlocking node:") @@ -321,7 +322,7 @@ class Nodepool(object): raise Exception("Node %s allocated to %s, not %s" % (node.id, node.allocated_to, request_id)) self.log.debug("Locking node %s" % (node,)) - self.sched.zk.lockNode(node, timeout=30) + self.sched.zk_nodepool.lockNode(node, timeout=30) locked_nodes.append(node) except Exception: self.log.exception("Error locking nodes:") @@ -347,8 +348,8 @@ class Nodepool(object): if deleted: log.debug("Resubmitting lost node request %s", request) request.id = None - self.sched.zk.submitNodeRequest(request, self._updateNodeRequest) - + self.sched.zk_nodepool.submitNodeRequest(request, + self._updateNodeRequest) # Stop watching this request node return False elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED): @@ -397,13 +398,13 @@ class Nodepool(object): # processing it. Nodepool will automatically reallocate the assigned # nodes in that situation. try: - if not self.sched.zk.nodeRequestExists(request): + if not self.sched.zk_nodepool.nodeRequestExists(request): log.info("Request %s no longer exists, resubmitting", request.id) request.id = None request.state = model.STATE_REQUESTED self.requests[request.uid] = request - self.sched.zk.submitNodeRequest( + self.sched.zk_nodepool.submitNodeRequest( request, self._updateNodeRequest) return False except Exception: @@ -430,7 +431,7 @@ class Nodepool(object): # succeeded, delete the request. log.debug("Deleting node request %s", request) try: - self.sched.zk.deleteNodeRequest(request) + self.sched.zk_nodepool.deleteNodeRequest(request) except Exception: log.exception("Error deleting node request:") request.failed = True diff --git a/zuul/scheduler.py b/zuul/scheduler.py index ad2baa7cb..306e4af9f 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -42,6 +42,7 @@ from zuul.lib.statsd import get_statsd import zuul.lib.queue import zuul.lib.repl from zuul.model import Build, HoldRequest, Tenant, TriggerEvent +from zuul.zk.nodepool import ZooKeeperNodepool COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl'] @@ -419,8 +420,9 @@ class Scheduler(threading.Thread): def setNodepool(self, nodepool): self.nodepool = nodepool - def setZooKeeper(self, zk): - self.zk = zk + def setZooKeeper(self, zk_client): + self.zk_client = zk_client + self.zk_nodepool = ZooKeeperNodepool(zk_client) def runStats(self): while not self.stats_stop.wait(self._stats_interval): @@ -652,15 +654,15 @@ class Scheduler(threading.Thread): request.node_expiration = node_hold_expiration # No need to lock it since we are creating a new one. - self.zk.storeHoldRequest(request) + self.zk_nodepool.storeHoldRequest(request) def autohold_list(self): ''' Return current hold requests as a list of dicts. ''' data = [] - for request_id in self.zk.getHoldRequests(): - request = self.zk.getHoldRequest(request_id) + for request_id in self.zk_nodepool.getHoldRequests(): + request = self.zk_nodepool.getHoldRequest(request_id) if not request: continue data.append(request.toDict()) @@ -673,7 +675,7 @@ class Scheduler(threading.Thread): :param str hold_request_id: The unique ID of the request to delete. ''' try: - hold_request = self.zk.getHoldRequest(hold_request_id) + hold_request = self.zk_nodepool.getHoldRequest(hold_request_id) except Exception: self.log.exception( "Error retrieving autohold ID %s:", hold_request_id) @@ -689,8 +691,9 @@ class Scheduler(threading.Thread): :param str hold_request_id: The unique ID of the request to delete. ''' + hold_request = None try: - hold_request = self.zk.getHoldRequest(hold_request_id) + hold_request = self.zk_nodepool.getHoldRequest(hold_request_id) except Exception: self.log.exception( "Error retrieving autohold ID %s:", hold_request_id) @@ -702,7 +705,7 @@ class Scheduler(threading.Thread): self.log.debug("Removing autohold %s", hold_request) try: - self.zk.deleteHoldRequest(hold_request) + self.zk_nodepool.deleteHoldRequest(hold_request) except Exception: self.log.exception( "Error removing autohold request %s:", hold_request) @@ -1491,15 +1494,15 @@ class Scheduler(threading.Thread): return True try: - self.zk.lockHoldRequest(request) + self.zk_nodepool.lockHoldRequest(request) self.log.info("Removing expired hold request %s", request) - self.zk.deleteHoldRequest(request) + self.zk_nodepool.deleteHoldRequest(request) except Exception: self.log.exception( "Failed to delete expired hold request %s", request) finally: try: - self.zk.unlockHoldRequest(request) + self.zk_nodepool.unlockHoldRequest(request) except Exception: pass @@ -1537,8 +1540,8 @@ class Scheduler(threading.Thread): autohold = None scope = Scope.NONE self.log.debug("Checking build autohold key %s", autohold_key_base) - for request_id in self.zk.getHoldRequests(): - request = self.zk.getHoldRequest(request_id) + for request_id in self.zk_nodepool.getHoldRequests(): + request = self.zk_nodepool.getHoldRequest(request_id) if not request: continue diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index 3ae9c72a6..5ae23b736 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -28,13 +28,14 @@ import time import select import threading +from zuul import exceptions import zuul.lib.repl +from zuul.lib import commandsocket from zuul.lib.re2util import filter_allowed_disallowed import zuul.model -from zuul import exceptions import zuul.rpcclient -import zuul.zk -from zuul.lib import commandsocket +from zuul.zk import ZooKeeperClient +from zuul.zk.nodepool import ZooKeeperNodepool STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') cherrypy.tools.websocket = WebSocketTool() @@ -227,7 +228,8 @@ class ZuulWebAPI(object): def __init__(self, zuulweb): self.rpc = zuulweb.rpc - self.zk = zuulweb.zk + self.zk_client = zuulweb.zk_client + self.zk_nodepool = ZooKeeperNodepool(self.zk_client) self.zuulweb = zuulweb self.cache = {} self.cache_time = {} @@ -853,7 +855,7 @@ class ZuulWebAPI(object): allowed_labels = data['allowed_labels'] disallowed_labels = data['disallowed_labels'] labels = set() - for launcher in self.zk.getRegisteredLaunchers(): + for launcher in self.zk_nodepool.getRegisteredLaunchers(): labels.update(filter_allowed_disallowed( launcher.supported_labels, allowed_labels, disallowed_labels)) @@ -867,7 +869,7 @@ class ZuulWebAPI(object): @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') def nodes(self, tenant): ret = [] - for node in self.zk.nodeIterator(): + for node in self.zk_nodepool.nodeIterator(): node_data = {} for key in ("id", "type", "connection_type", "external_id", "provider", "state", "state_time", "comment"): @@ -1221,11 +1223,11 @@ class ZuulWeb(object): self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca, client_id='Zuul Web Server') - self.zk = zuul.zk.ZooKeeper(enable_cache=True) + self.zk_client = ZooKeeperClient() if zk_hosts: - self.zk.connect(hosts=zk_hosts, read_only=True, - timeout=zk_timeout, tls_cert=zk_tls_cert, - tls_key=zk_tls_key, tls_ca=zk_tls_ca) + self.zk_client.connect(hosts=zk_hosts, read_only=True, + timeout=zk_timeout, tls_cert=zk_tls_cert, + tls_key=zk_tls_key, tls_ca=zk_tls_ca) self.connections = connections self.authenticators = authenticators @@ -1382,7 +1384,7 @@ class ZuulWeb(object): cherrypy.server.httpserver = None self.wsplugin.unsubscribe() self.stream_manager.stop() - self.zk.disconnect() + self.zk_client.disconnect() self.stop_repl() self._command_running = False self.command_socket.stop() diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py new file mode 100644 index 000000000..7642ffbd3 --- /dev/null +++ b/zuul/zk/__init__.py @@ -0,0 +1,153 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import time +from abc import ABCMeta +from typing import Optional, List, Callable + +from kazoo.client import KazooClient +from kazoo.handlers.threading import KazooTimeoutError +from kazoo.protocol.states import KazooState + +from zuul.zk.exceptions import NoClientException + + +class ZooKeeperClient(object): + log = logging.getLogger("zuul.zk.base.ZooKeeperClient") + + # Log zookeeper retry every 10 seconds + retry_log_rate = 10 + + def __init__(self): + """ + Initialize the ZooKeeper base client object. + """ + self.client: Optional[KazooClient] = None + self._last_retry_log: int = 0 + self.on_connect_listeners: List[Callable[[], None]] = [] + self.on_disconnect_listeners: List[Callable[[], None]] = [] + + def _connectionListener(self, state): + """ + Listener method for Kazoo connection state changes. + + .. warning:: This method must not block. + """ + if state == KazooState.LOST: + self.log.debug("ZooKeeper connection: LOST") + elif state == KazooState.SUSPENDED: + self.log.debug("ZooKeeper connection: SUSPENDED") + else: + self.log.debug("ZooKeeper connection: CONNECTED") + + @property + def connected(self): + return self.client and self.client.state == KazooState.CONNECTED + + @property + def suspended(self): + return self.client and self.client.state == KazooState.SUSPENDED + + @property + def lost(self): + return not self.client or self.client.state == KazooState.LOST + + def logConnectionRetryEvent(self): + now = time.monotonic() + if now - self._last_retry_log >= self.retry_log_rate: + self.log.warning("Retrying zookeeper connection") + self._last_retry_log = now + + def connect(self, hosts: str, read_only: bool = False, + timeout: float = 10.0, tls_cert: Optional[str] = None, + tls_key: Optional[str] = None, + tls_ca: Optional[str] = None): + """ + Establish a connection with ZooKeeper cluster. + + Convenience method if a pre-existing ZooKeeper connection is not + supplied to the ZooKeeper object at instantiation time. + + :param str hosts: Comma-separated list of hosts to connect to (e.g. + 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). + :param bool read_only: If True, establishes a read-only connection. + :param float timeout: The ZooKeeper session timeout, in + seconds (default: 10.0). + :param str tls_key: Path to TLS key + :param str tls_cert: Path to TLS cert + :param str tls_ca: Path to TLS CA cert + """ + if self.client is None: + args = dict(hosts=hosts, read_only=read_only, timeout=timeout) + if tls_key: + args['use_ssl'] = True + args['keyfile'] = tls_key + args['certfile'] = tls_cert + args['ca'] = tls_ca + self.client = KazooClient(**args) + self.client.add_listener(self._connectionListener) + # Manually retry initial connection attempt + while True: + try: + self.client.start(1) + break + except KazooTimeoutError: + self.logConnectionRetryEvent() + + for listener in self.on_connect_listeners: + listener() + + def disconnect(self): + """ + Close the ZooKeeper cluster connection. + + You should call this method if you used connect() to establish a + cluster connection. + """ + for listener in self.on_disconnect_listeners: + listener() + + if self.client is not None and self.client.connected: + self.client.stop() + self.client.close() + self.client = None + + def resetHosts(self, hosts): + """ + Reset the ZooKeeper cluster connection host list. + + :param str hosts: Comma-separated list of hosts to connect to (e.g. + 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). + """ + if self.client is not None: + self.client.set_hosts(hosts=hosts) + + +class ZooKeeperBase(metaclass=ABCMeta): + """Base class for components that need to interact with Zookeeper.""" + + def __init__(self, client: ZooKeeperClient): + self.client = client + self.client.on_connect_listeners.append(self._onConnect) + self.client.on_disconnect_listeners.append(self._onDisconnect) + + @property + def kazoo_client(self) -> KazooClient: + if not self.client.client: + raise NoClientException() + return self.client.client + + def _onConnect(self): + pass + + def _onDisconnect(self): + pass diff --git a/zuul/zk/exceptions.py b/zuul/zk/exceptions.py new file mode 100644 index 000000000..5237dea2a --- /dev/null +++ b/zuul/zk/exceptions.py @@ -0,0 +1,27 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from kazoo.exceptions import KazooException + + +class ZuulZooKeeperException(KazooException): + """Base exception class for all custom ZK exceptions""" + pass + + +class LockException(ZuulZooKeeperException): + pass + + +class NoClientException(ZuulZooKeeperException): + + def __init__(self): + super().__init__("No zookeeper client!") diff --git a/zuul/zk.py b/zuul/zk/nodepool.py index 980249526..f28faf463 100644 --- a/zuul/zk.py +++ b/zuul/zk/nodepool.py @@ -13,235 +13,348 @@ import json import logging import time +from typing import Dict, Optional, List -from kazoo.client import KazooClient, KazooState -from kazoo import exceptions as kze -from kazoo.handlers.threading import KazooTimeoutError -from kazoo.recipe.cache import TreeCache, TreeEvent +from kazoo.exceptions import NoNodeError, LockTimeout +from kazoo.recipe.cache import TreeCache +from kazoo.recipe.cache import TreeEvent from kazoo.recipe.lock import Lock import zuul.model +from zuul.model import HoldRequest +from zuul.zk import ZooKeeperClient, ZooKeeperBase +from zuul.zk.exceptions import LockException -class LockException(Exception): - pass - - -class ZooKeeper(object): - ''' - Class implementing the ZooKeeper interface. - - This class uses the facade design pattern to keep common interaction - with the ZooKeeper API simple and consistent for the caller, and - limits coupling between objects. It allows for more complex interactions - by providing direct access to the client connection when needed (though - that is discouraged). It also provides for a convenient entry point for - testing only ZooKeeper interactions. - ''' - - log = logging.getLogger("zuul.zk.ZooKeeper") - +class ZooKeeperNodepool(ZooKeeperBase): + """ + Class implementing Nodepool related ZooKeeper interface. + """ + NODES_ROOT = "/nodepool/nodes" + LAUNCHER_ROOT = "/nodepool/launchers" REQUEST_ROOT = '/nodepool/requests' REQUEST_LOCK_ROOT = "/nodepool/requests-lock" - NODE_ROOT = '/nodepool/nodes' HOLD_REQUEST_ROOT = '/zuul/hold-requests' - # Log zookeeper retry every 10 seconds - retry_log_rate = 10 - - def __init__(self, enable_cache=True): - ''' - Initialize the ZooKeeper object. - - :param bool enable_cache: When True, enables caching of ZooKeeper - objects (e.g., HoldRequests). - ''' - self.client = None - self._became_lost = False - self._last_retry_log = 0 - self.enable_cache = enable_cache + log = logging.getLogger("zuul.zk.nodepool.ZooKeeperNodepool") + def __init__(self, client: ZooKeeperClient, enable_cache: bool = True): + super().__init__(client) + self.enable_cache = enable_cache # type: bool # The caching model we use is designed around handing out model # data as objects. To do this, we use two caches: one is a TreeCache # which contains raw znode data (among other details), and one for # storing that data serialized as objects. This allows us to return # objects from the APIs, and avoids calling the methods to serialize # the data into objects more than once. - self._hold_request_tree = None - self._cached_hold_requests = {} - - def _dictToStr(self, data): - return json.dumps(data).encode('utf8') - - def _strToDict(self, data): - return json.loads(data.decode('utf8')) - - def _connection_listener(self, state): - ''' - Listener method for Kazoo connection state changes. - - .. warning:: This method must not block. - ''' - if state == KazooState.LOST: - self.log.debug("ZooKeeper connection: LOST") - self._became_lost = True - elif state == KazooState.SUSPENDED: - self.log.debug("ZooKeeper connection: SUSPENDED") + self._hold_request_tree: Optional[TreeCache] = None + self._cached_hold_requests: Optional[Dict[str, HoldRequest]] = {} + if self.client.connected: + self._onConnect() + + def _onConnect(self): + if self.enable_cache: + self._hold_request_tree = TreeCache(self.kazoo_client, + self.HOLD_REQUEST_ROOT) + self._hold_request_tree.listen_fault(self._cacheFaultListener) + self._hold_request_tree.listen(self._holdRequestCacheListener) + self._hold_request_tree.start() + + def _onDisconnect(self): + if self._hold_request_tree is not None: + self._hold_request_tree.close() + self._hold_request_tree = None + + def _launcherPath(self, launcher): + return "%s/%s" % (self.LAUNCHER_ROOT, launcher) + + def _nodePath(self, node): + return "%s/%s" % (self.NODES_ROOT, node) + + def _cacheFaultListener(self, e): + self.log.exception(e) + + def getRegisteredLaunchers(self): + """ + Get a list of all launchers that have registered with ZooKeeper. + + :returns: A list of Launcher objects, or empty list if none are found. + """ + try: + launcher_ids = self.kazoo_client\ + .get_children(self.LAUNCHER_ROOT) + except NoNodeError: + return [] + + objs = [] + for launcher in launcher_ids: + path = self._launcherPath(launcher) + try: + data, _ = self.kazoo_client.get(path) + except NoNodeError: + # launcher disappeared + continue + + objs.append(Launcher.fromDict(json.loads(data.decode('utf8')))) + return objs + + def getNodes(self): + """ + Get the current list of all nodes. + + :returns: A list of nodes. + """ + try: + return self.kazoo_client.get_children(self.NODES_ROOT) + except NoNodeError: + return [] + + def _getNode(self, node): + """ + Get the data for a specific node. + + :param str node: The node ID. + + :returns: The node data, or None if the node was not found. + """ + path = self._nodePath(node) + try: + data, stat = self.kazoo_client.get(path) + except NoNodeError: + return None + if not data: + return None + + d = json.loads(data.decode('utf8')) + d['id'] = node + return d + + def nodeIterator(self): + """ + Utility generator method for iterating through all nodes. + """ + for node_id in self.getNodes(): + node = self._getNode(node_id) + if node: + yield node + + def getHoldRequests(self): + """ + Get the current list of all hold requests. + """ + + try: + return sorted(self.kazoo_client + .get_children(self.HOLD_REQUEST_ROOT)) + except NoNodeError: + return [] + + def getHoldRequest(self, hold_request_id): + path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id + try: + data, stat = self.kazoo_client.get(path) + except NoNodeError: + return None + if not data: + return None + + obj = HoldRequest.fromDict(json.loads(data.decode('utf8'))) + obj.id = hold_request_id + obj.stat = stat + return obj + + def storeHoldRequest(self, request: HoldRequest): + """ + Create or update a hold request. + + If this is a new request with no value for the `id` attribute of the + passed in request, then `id` will be set with the unique request + identifier after successful creation. + + :param HoldRequest request: Object representing the hold request. + """ + if request.id is None: + path = self.kazoo_client.create( + self.HOLD_REQUEST_ROOT + "/", + value=request.serialize(), + sequence=True, + makepath=True) + request.id = path.split('/')[-1] else: - self.log.debug("ZooKeeper connection: CONNECTED") + path = self.HOLD_REQUEST_ROOT + "/" + request.id + self.kazoo_client.set(path, request.serialize()) - @property - def connected(self): - return self.client.state == KazooState.CONNECTED + def _markHeldNodesAsUsed(self, request: HoldRequest): + """ + Changes the state for each held node for the hold request to 'used'. - @property - def suspended(self): - return self.client.state == KazooState.SUSPENDED + :returns: True if all nodes marked USED, False otherwise. + """ + def getHeldNodeIDs(req: HoldRequest) -> List[str]: + node_ids: List[str] = [] + for data in req.nodes: + # TODO(Shrews): Remove type check at some point. + # When autoholds were initially changed to be stored in ZK, + # the node IDs were originally stored as a list of strings. + # A later change embedded them within a dict. Handle both + # cases here to deal with the upgrade. + if isinstance(data, dict): + node_ids += data['nodes'] + else: + node_ids.append(data) + return node_ids - @property - def lost(self): - return self.client.state == KazooState.LOST + failure = False + for node_id in getHeldNodeIDs(request): + node = self._getNode(node_id) + if not node or node['state'] == zuul.model.STATE_USED: + continue - @property - def didLoseConnection(self): - return self._became_lost - - def resetLostFlag(self): - self._became_lost = False - - def logConnectionRetryEvent(self): - now = time.monotonic() - if now - self._last_retry_log >= self.retry_log_rate: - self.log.warning("Retrying zookeeper connection") - self._last_retry_log = now - - def connect(self, hosts, read_only=False, timeout=10.0, - tls_cert=None, tls_key=None, tls_ca=None): - ''' - Establish a connection with ZooKeeper cluster. - - Convenience method if a pre-existing ZooKeeper connection is not - supplied to the ZooKeeper object at instantiation time. - - :param str hosts: Comma-separated list of hosts to connect to (e.g. - 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). - :param bool read_only: If True, establishes a read-only connection. - :param float timeout: The ZooKeeper session timeout, in - seconds (default: 10.0). - :param str tls_key: Path to TLS key - :param str tls_cert: Path to TLS cert - :param str tls_ca: Path to TLS CA cert - ''' - - if self.client is None: - args = dict(hosts=hosts, - read_only=read_only, - timeout=timeout, - ) - if tls_key: - args['use_ssl'] = True - args['keyfile'] = tls_key - args['certfile'] = tls_cert - args['ca'] = tls_ca - self.client = KazooClient(**args) - self.client.add_listener(self._connection_listener) - # Manually retry initial connection attempt - while True: + node['state'] = zuul.model.STATE_USED + + name = None + label = None + if 'name' in node: + name = node['name'] + if 'label' in node: + label = node['label'] + + node_obj = zuul.model.Node(name, label) + node_obj.updateFromDict(node) + + try: + self.lockNode(node_obj, blocking=False) + self.storeNode(node_obj) + except Exception: + self.log.exception("Cannot change HELD node state to USED " + "for node %s in request %s", + node_obj.id, request.id) + failure = True + finally: try: - self.client.start(1) - break - except KazooTimeoutError: - self.logConnectionRetryEvent() + if node_obj.lock: + self.unlockNode(node_obj) + except Exception: + self.log.exception( + "Failed to unlock HELD node %s for request %s", + node_obj.id, request.id) - if self.enable_cache: - self._hold_request_tree = TreeCache(self.client, - self.HOLD_REQUEST_ROOT) - self._hold_request_tree.listen_fault(self.cacheFaultListener) - self._hold_request_tree.listen(self.holdRequestCacheListener) - self._hold_request_tree.start() + return not failure - def cacheFaultListener(self, e): - self.log.exception(e) + def deleteHoldRequest(self, request: HoldRequest): + """ + Delete a hold request. - def holdRequestCacheListener(self, event): - ''' - Keep the hold request object cache in sync with the TreeCache. - ''' + :param HoldRequest request: Object representing the hold request. + """ + if not self._markHeldNodesAsUsed(request): + self.log.info("Unable to delete hold request %s because " + "not all nodes marked as USED.", request.id) + return + + path = self.HOLD_REQUEST_ROOT + "/" + request.id try: - self._holdRequestCacheListener(event) - except Exception: - self.log.exception( - "Exception in hold request cache update for event: %s", event) + self.kazoo_client.delete(path, recursive=True) + except NoNodeError: + pass - def _holdRequestCacheListener(self, event): - if hasattr(event.event_data, 'path'): - # Ignore root node - path = event.event_data.path - if path == self.HOLD_REQUEST_ROOT: - return + def lockHoldRequest(self, request: HoldRequest, + blocking: bool = True, timeout: Optional[int] = None): + """ + Lock a node request. - if event.event_type not in (TreeEvent.NODE_ADDED, - TreeEvent.NODE_UPDATED, - TreeEvent.NODE_REMOVED): - return + This will set the `lock` attribute of the request object when the + lock is successfully acquired. - path = event.event_data.path - request_id = path.rsplit('/', 1)[1] + :param request: The hold request to lock. + :param blocking: Block until lock is obtained or return immediately. + :param timeout: Don't wait forever to acquire the lock. + """ + if not request.id: + raise LockException( + "Hold request without an ID cannot be locked: %s" % request) - if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED): - # Requests with no data are invalid - if not event.event_data.data: - return + path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id) + try: + lock = Lock(self.kazoo_client, path) + have_lock = lock.acquire(blocking, timeout) + except LockTimeout: + raise LockException("Timeout trying to acquire lock %s" % path) + + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + raise LockException("Did not get lock on %s" % path) + + request.lock = lock - # Perform an in-place update of the already cached request - d = self._bytesToDict(event.event_data.data) - old_request = self._cached_hold_requests.get(request_id) - if old_request: - if event.event_data.stat.version <= old_request.stat.version: - # Don't update to older data + def unlockHoldRequest(self, request: HoldRequest): + """ + Unlock a hold request. + + The request must already have been locked. + + :param HoldRequest request: The request to unlock. + + :raises: ZKLockException if the request is not currently locked. + """ + if request.lock is None: + raise LockException("Request %s does not hold a lock" % request) + request.lock.release() + request.lock = None + + def _holdRequestCacheListener(self, event): + """ + Keep the hold request object cache in sync with the TreeCache. + """ + try: + if hasattr(event.event_data, 'path'): + # Ignore root node + path = event.event_data.path + if path == self.HOLD_REQUEST_ROOT: return - old_request.updateFromDict(d) - old_request.stat = event.event_data.stat - else: - request = zuul.model.HoldRequest.fromDict(d) - request.id = request_id - request.stat = event.event_data.stat - self._cached_hold_requests[request_id] = request - - elif event.event_type == TreeEvent.NODE_REMOVED: - try: - del self._cached_hold_requests[request_id] - except KeyError: - pass - def disconnect(self): - ''' - Close the ZooKeeper cluster connection. + if event.event_type not in (TreeEvent.NODE_ADDED, + TreeEvent.NODE_UPDATED, + TreeEvent.NODE_REMOVED): + return - You should call this method if you used connect() to establish a - cluster connection. - ''' - if self._hold_request_tree is not None: - self._hold_request_tree.close() - self._hold_request_tree = None + path = event.event_data.path + request_id = path.rsplit('/', 1)[1] - if self.client is not None and self.client.connected: - self.client.stop() - self.client.close() - self.client = None + if event.event_type in ( + TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED): + # Requests with no data are invalid + if not event.event_data.data: + return - def resetHosts(self, hosts): - ''' - Reset the ZooKeeper cluster connection host list. + # Perform an in-place update of the already cached request + d = json.loads(event.event_data.data.decode('utf8')) + old_request = self._cached_hold_requests.get(request_id) + if old_request: + if event.event_data.stat.version <= old_request.stat\ + .version: + # Don't update to older data + return + old_request.updateFromDict(d) + old_request.stat = event.event_data.stat + else: + request = HoldRequest.fromDict(d) + request.id = request_id + request.stat = event.event_data.stat + self._cached_hold_requests[request_id] = request - :param str hosts: Comma-separated list of hosts to connect to (e.g. - 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). - ''' - if self.client is not None: - self.client.set_hosts(hosts=hosts) + elif event.event_type == TreeEvent.NODE_REMOVED: + try: + del self._cached_hold_requests[request_id] + except KeyError: + pass + except Exception: + self.log.exception( + "Exception in hold request cache update for event: %s", event) def submitNodeRequest(self, node_request, watcher): - ''' + """ Submit a request for nodes to Nodepool. :param NodeRequest node_request: A NodeRequest with the @@ -255,105 +368,106 @@ class ZooKeeper(object): longer exists (notably, this will happen on disconnection from ZooKeeper). The watcher should return False when further updates are no longer necessary. - ''' + """ node_request.created_time = time.time() data = node_request.toDict() path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, node_request.priority) - path = self.client.create(path, self._dictToStr(data), - makepath=True, - sequence=True, ephemeral=True) + path = self.kazoo_client.create(path, json.dumps(data).encode('utf8'), + makepath=True, sequence=True, + ephemeral=True) reqid = path.split("/")[-1] node_request.id = reqid - def callback(data, stat): - if data: - self.updateNodeRequest(node_request, data) - deleted = (data is None) # data *are* none + def callback(value, _): + if value: + self.updateNodeRequest(node_request, value) + deleted = (value is None) # data *are* none return watcher(node_request, deleted) - self.client.DataWatch(path, callback) + self.kazoo_client.DataWatch(path, callback) def deleteNodeRequest(self, node_request): - ''' + """ Delete a request for nodes. :param NodeRequest node_request: A NodeRequest with the contents of the request. - ''' - + """ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) try: - self.client.delete(path) - except kze.NoNodeError: + self.kazoo_client.delete(path) + except NoNodeError: pass def nodeRequestExists(self, node_request): - ''' + """ See if a NodeRequest exists in ZooKeeper. :param NodeRequest node_request: A NodeRequest to verify. :returns: True if the request exists, False otherwise. - ''' + """ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) - if self.client.exists(path): + if self.kazoo_client.exists(path): return True return False def storeNodeRequest(self, node_request): - '''Store the node request. + """ + Store the node request. The request is expected to already exist and is updated in its entirety. :param NodeRequest node_request: The request to update. - ''' - + """ path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) - self.client.set(path, self._dictToStr(node_request.toDict())) + self.kazoo_client.set( + path, json.dumps(node_request.toDict()).encode('utf8')) def updateNodeRequest(self, node_request, data=None): - '''Refresh an existing node request. + """ + Refresh an existing node request. :param NodeRequest node_request: The request to update. :param dict data: The data to use; query ZK if absent. - ''' + """ if data is None: path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) - data, stat = self.client.get(path) - data = self._strToDict(data) + data, stat = self.kazoo_client.get(path) + data = json.loads(data.decode('utf8')) request_nodes = list(node_request.nodeset.getNodes()) for i, nodeid in enumerate(data.get('nodes', [])): request_nodes[i].id = nodeid - self.updateNode(request_nodes[i]) + self._updateNode(request_nodes[i]) node_request.updateFromDict(data) def storeNode(self, node): - '''Store the node. + """ + Store the node. The node is expected to already exist and is updated in its entirety. :param Node node: The node to update. - ''' - - path = '%s/%s' % (self.NODE_ROOT, node.id) - self.client.set(path, self._dictToStr(node.toDict())) + """ + path = '%s/%s' % (self.NODES_ROOT, node.id) + self.kazoo_client.set(path, json.dumps(node.toDict()).encode('utf8')) - def updateNode(self, node): - '''Refresh an existing node. + def _updateNode(self, node): + """ + Refresh an existing node. :param Node node: The node to update. - ''' - - node_path = '%s/%s' % (self.NODE_ROOT, node.id) - node_data, node_stat = self.client.get(node_path) - node_data = self._strToDict(node_data) + """ + node_path = '%s/%s' % (self.NODES_ROOT, node.id) + node_data, node_stat = self.kazoo_client.get(node_path) + node_data = json.loads(node_data.decode('utf8')) node.updateFromDict(node_data) def lockNode(self, node, blocking=True, timeout=None): - ''' + """ Lock a node. This should be called as soon as a request is fulfilled and @@ -362,13 +476,12 @@ class ZooKeeper(object): node should be reclaimed. :param Node node: The node which should be locked. - ''' - - lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id) + """ + lock_path = '%s/%s/lock' % (self.NODES_ROOT, node.id) try: - lock = Lock(self.client, lock_path) + lock = Lock(self.kazoo_client, lock_path) have_lock = lock.acquire(blocking, timeout) - except kze.LockTimeout: + except LockTimeout: raise LockException( "Timeout trying to acquire lock %s" % lock_path) @@ -380,13 +493,13 @@ class ZooKeeper(object): node.lock = lock def unlockNode(self, node): - ''' + """ Unlock a node. The node must already have been locked. :param Node node: The node which should be unlocked. - ''' + """ if node.lock is None: raise LockException("Node %s does not hold a lock" % (node,)) @@ -394,7 +507,7 @@ class ZooKeeper(object): node.lock = None def lockNodeRequest(self, request, blocking=True, timeout=None): - ''' + """ Lock a node request. This will set the `lock` attribute of the request object when the @@ -409,16 +522,15 @@ class ZooKeeper(object): :raises: TimeoutException if we failed to acquire the lock when blocking with a timeout. ZKLockException if we are not blocking and could not get the lock, or a lock is already held. - ''' - + """ path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id) + lock = Lock(self.kazoo_client, path) try: - lock = Lock(self.client, path) have_lock = lock.acquire(blocking, timeout) - except kze.LockTimeout: + except LockTimeout: raise LockException( "Timeout trying to acquire lock %s" % path) - except kze.NoNodeError: + except NoNodeError: have_lock = False self.log.error("Request not found for locking: %s", request) @@ -431,7 +543,7 @@ class ZooKeeper(object): self.updateNodeRequest(request) def unlockNodeRequest(self, request): - ''' + """ Unlock a node request. The request must already have been locked. @@ -439,7 +551,7 @@ class ZooKeeper(object): :param NodeRequest request: The request to unlock. :raises: ZKLockException if the request is not currently locked. - ''' + """ if request.lock is None: raise LockException( "Request %s does not hold a lock" % request) @@ -447,278 +559,40 @@ class ZooKeeper(object): request.lock = None def heldNodeCount(self, autohold_key): - ''' + """ Count the number of nodes being held for the given tenant/project/job. :param set autohold_key: A set with the tenant/project/job names. - ''' + """ identifier = " ".join(autohold_key) try: - nodes = self.client.get_children(self.NODE_ROOT) - except kze.NoNodeError: + nodes = self.kazoo_client.get_children(self.NODES_ROOT) + except NoNodeError: return 0 count = 0 for nodeid in nodes: - node_path = '%s/%s' % (self.NODE_ROOT, nodeid) + node_path = '%s/%s' % (self.NODES_ROOT, nodeid) try: - node_data, node_stat = self.client.get(node_path) - except kze.NoNodeError: + node_data, node_stat = self.kazoo_client.get(node_path) + except NoNodeError: # Node got removed on us. Just ignore. continue if not node_data: self.log.warning("Node ID %s has no data", nodeid) continue - node_data = self._strToDict(node_data) + node_data = json.loads(node_data.decode('utf8')) if (node_data['state'] == zuul.model.STATE_HOLD and node_data.get('hold_job') == identifier): count += 1 return count - # Copy of nodepool/zk.py begins here - NODE_ROOT = "/nodepool/nodes" - LAUNCHER_ROOT = "/nodepool/launchers" - - def _bytesToDict(self, data): - return json.loads(data.decode('utf8')) - - def _launcherPath(self, launcher): - return "%s/%s" % (self.LAUNCHER_ROOT, launcher) - - def _nodePath(self, node): - return "%s/%s" % (self.NODE_ROOT, node) - - def getRegisteredLaunchers(self): - ''' - Get a list of all launchers that have registered with ZooKeeper. - - :returns: A list of Launcher objects, or empty list if none are found. - ''' - try: - launcher_ids = self.client.get_children(self.LAUNCHER_ROOT) - except kze.NoNodeError: - return [] - - objs = [] - for launcher in launcher_ids: - path = self._launcherPath(launcher) - try: - data, _ = self.client.get(path) - except kze.NoNodeError: - # launcher disappeared - continue - - objs.append(Launcher.fromDict(self._bytesToDict(data))) - return objs - - def getNodes(self): - ''' - Get the current list of all nodes. - - :returns: A list of nodes. - ''' - try: - return self.client.get_children(self.NODE_ROOT) - except kze.NoNodeError: - return [] - - def getNode(self, node): - ''' - Get the data for a specific node. - - :param str node: The node ID. - - :returns: The node data, or None if the node was not found. - ''' - path = self._nodePath(node) - try: - data, stat = self.client.get(path) - except kze.NoNodeError: - return None - if not data: - return None - - d = self._bytesToDict(data) - d['id'] = node - return d - - def nodeIterator(self): - ''' - Utility generator method for iterating through all nodes. - ''' - for node_id in self.getNodes(): - node = self.getNode(node_id) - if node: - yield node - - def getHoldRequests(self): - ''' - Get the current list of all hold requests. - ''' - try: - return sorted(self.client.get_children(self.HOLD_REQUEST_ROOT)) - except kze.NoNodeError: - return [] - - def getHoldRequest(self, hold_request_id): - path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id - try: - data, stat = self.client.get(path) - except kze.NoNodeError: - return None - if not data: - return None - - obj = zuul.model.HoldRequest.fromDict(self._strToDict(data)) - obj.id = hold_request_id - obj.stat = stat - return obj - - def storeHoldRequest(self, hold_request): - ''' - Create or update a hold request. - - If this is a new request with no value for the `id` attribute of the - passed in request, then `id` will be set with the unique request - identifier after successful creation. - - :param HoldRequest hold_request: Object representing the hold request. - ''' - if hold_request.id is None: - path = self.client.create( - self.HOLD_REQUEST_ROOT + "/", - value=hold_request.serialize(), - sequence=True, - makepath=True) - hold_request.id = path.split('/')[-1] - else: - path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id - self.client.set(path, hold_request.serialize()) - - def _markHeldNodesAsUsed(self, hold_request): - ''' - Changes the state for each held node for the hold request to 'used'. - - :returns: True if all nodes marked USED, False otherwise. - ''' - def getHeldNodeIDs(request): - node_ids = [] - for data in request.nodes: - # TODO(Shrews): Remove type check at some point. - # When autoholds were initially changed to be stored in ZK, - # the node IDs were originally stored as a list of strings. - # A later change embedded them within a dict. Handle both - # cases here to deal with the upgrade. - if isinstance(data, dict): - node_ids += data['nodes'] - else: - node_ids.append(data) - return node_ids - - failure = False - for node_id in getHeldNodeIDs(hold_request): - node = self.getNode(node_id) - if not node or node['state'] == zuul.model.STATE_USED: - continue - - node['state'] = zuul.model.STATE_USED - - name = None - label = None - if 'name' in node: - name = node['name'] - if 'label' in node: - label = node['label'] - - node_obj = zuul.model.Node(name, label) - node_obj.updateFromDict(node) - - try: - self.lockNode(node_obj, blocking=False) - self.storeNode(node_obj) - except Exception: - self.log.exception("Cannot change HELD node state to USED " - "for node %s in request %s", - node_obj.id, hold_request.id) - failure = True - finally: - try: - if node_obj.lock: - self.unlockNode(node_obj) - except Exception: - self.log.exception( - "Failed to unlock HELD node %s for request %s", - node_obj.id, hold_request.id) - - return not failure - - def deleteHoldRequest(self, hold_request): - ''' - Delete a hold request. - - :param HoldRequest hold_request: Object representing the hold request. - ''' - if not self._markHeldNodesAsUsed(hold_request): - self.log.info("Unable to delete hold request %s because " - "not all nodes marked as USED.", hold_request.id) - return - - path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id - try: - self.client.delete(path, recursive=True) - except kze.NoNodeError: - pass - - def lockHoldRequest(self, request, blocking=True, timeout=None): - ''' - Lock a node request. - - This will set the `lock` attribute of the request object when the - lock is successfully acquired. - - :param HoldRequest request: The hold request to lock. - ''' - if not request.id: - raise LockException( - "Hold request without an ID cannot be locked: %s" % request) - - path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id) - try: - lock = Lock(self.client, path) - have_lock = lock.acquire(blocking, timeout) - except kze.LockTimeout: - raise LockException( - "Timeout trying to acquire lock %s" % path) - - # If we aren't blocking, it's possible we didn't get the lock - # because someone else has it. - if not have_lock: - raise LockException("Did not get lock on %s" % path) - - request.lock = lock - - def unlockHoldRequest(self, request): - ''' - Unlock a hold request. - - The request must already have been locked. - - :param HoldRequest request: The request to unlock. - - :raises: ZKLockException if the request is not currently locked. - ''' - if request.lock is None: - raise LockException( - "Request %s does not hold a lock" % request) - request.lock.release() - request.lock = None - -class Launcher(): - ''' +class Launcher: + """ Class to describe a nodepool launcher. - ''' + """ def __init__(self): self.id = None |