summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix Edel <felix.edel@bmw.de>2021-07-05 13:58:01 +0200
committerJames E. Blair <jim@acmegating.com>2021-07-13 14:57:01 -0700
commit8f8f4f18987719b7c0a3f7c86ebf8e545d977ff9 (patch)
tree5fccf00f4177be35e3009f7d628d57b535febd52
parent95ee8e81506f286218464567ca947c5ecfb7fedc (diff)
downloadzuul-8f8f4f18987719b7c0a3f7c86ebf8e545d977ff9.tar.gz
Switch to ZooKeeper backed NodesProvisionedEvents
This puts the NodesProvisionedEvents into ZooKeeper. With this, all result events are now in ZooKeeper. To make the NodesProvisionedEvent serializable, we cannot store the whole NodeRequest object in the event anymore. Instead we are using the request.id, job name and the buildset UUID to look up the corresponding NodeRequest object from the buildset when the NodesProvisionedEvent is handled. As a result of this we have to find a way to return the NodeSet even in case the NodeRequest or the buildset cannot be found anymore (e.g. due to a gate reset). In this case we look up the NodeRequest directly from ZooKeeper and provide a faked NodeSet which allows us to retrieve the node information from Zookeeper (via the update mechanism of the NodeRequest in the Nodepool client). Finally, we can get rid of the local result event queue in the scheduler as now all result events are in ZooKeeper. Together with that the `zuul.scheduler.eventqueues.result` gauge was also removed. Change-Id: Ib5e0f13d25a21ebad908d38f0201e92b704a1c85
-rw-r--r--doc/source/reference/monitoring.rst5
-rw-r--r--releasenotes/notes/remove-global-result-event-queue-f00bf6e4dab2720a.yaml3
-rw-r--r--tests/base.py1
-rw-r--r--tests/unit/test_nodepool.py64
-rw-r--r--web/public/openapi.yaml3
-rw-r--r--web/src/pages/Status.jsx5
-rw-r--r--zuul/manager/__init__.py5
-rw-r--r--zuul/model.py19
-rw-r--r--zuul/scheduler.py82
-rw-r--r--zuul/zk/nodepool.py32
10 files changed, 139 insertions, 80 deletions
diff --git a/doc/source/reference/monitoring.rst b/doc/source/reference/monitoring.rst
index e19b7ce76..c4514718c 100644
--- a/doc/source/reference/monitoring.rst
+++ b/doc/source/reference/monitoring.rst
@@ -474,11 +474,6 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Holds metrics about the event queue lengths in the Zuul scheduler.
- .. stat:: result
- :type: gauge
-
- The size of the current result event queue.
-
.. stat:: management
:type: gauge
diff --git a/releasenotes/notes/remove-global-result-event-queue-f00bf6e4dab2720a.yaml b/releasenotes/notes/remove-global-result-event-queue-f00bf6e4dab2720a.yaml
new file mode 100644
index 000000000..425527c15
--- /dev/null
+++ b/releasenotes/notes/remove-global-result-event-queue-f00bf6e4dab2720a.yaml
@@ -0,0 +1,3 @@
+---
+upgrade:
+ - The `zuul.scheduler.eventqueues.result` gauge was removed
diff --git a/tests/base.py b/tests/base.py
index 5d39725aa..e0e932894 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -4176,7 +4176,6 @@ class SchedulerTestApp:
# longer use global management events.
self.event_queues = [
self.sched.reconfigure_event_queue,
- self.sched.result_event_queue,
]
def start(self, validate_tenants: list):
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index f08e47fdd..cb73cf282 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -202,3 +202,67 @@ class TestNodepool(BaseTestCase):
self.assertEqual(request1.state, 'fulfilled')
self.assertEqual(request2.state, 'fulfilled')
self.assertTrue(request2.state_time < request1.state_time)
+
+ def test_get_node_request_with_nodeset(self):
+ # Test that we are able to deserialize a node request from ZK and
+ # update the node information while providing a valid NodeSet.
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
+ nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+
+ request = self.nodepool.requestNodes(
+ "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Look up the node request from ZooKeeper while providing the original
+ # nodeset.
+ restored_request = self.zk_nodepool.getNodeRequest(request.id, nodeset)
+
+ # As we've provided the origial nodeset when retrieving the node
+ # request from ZooKeeper, they should be the same
+ self.assertEqual(restored_request.nodeset, nodeset)
+
+ # And the nodeset should contain the original data
+ restored_nodes = restored_request.nodeset.getNodes()
+ self.assertEqual(restored_nodes[0].name, ['controller', 'foo'])
+ self.assertEqual(restored_nodes[1].name, ['compute'])
+
+ def test_get_node_request_without_nodeset(self):
+ # Test that we are able to deserialize a node request from ZK and
+ # update the node information without providing a NodeSet object.
+
+ # This is used in case something went wrong when processing the
+ # NodesProvisionedEvents in the scheduler and the original NodeRequest
+ # and/or NodeSet objects are not available anymore.
+ nodeset = model.NodeSet()
+ nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
+ nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+ job = model.Job('testjob')
+ job.nodeset = nodeset
+
+ request = self.nodepool.requestNodes(
+ "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
+ self.waitForRequests()
+ self.assertEqual(len(self.provisioned_requests), 1)
+ self.assertEqual(request.state, 'fulfilled')
+
+ # Look up the node request from ZooKeeper while providing no nodeset
+ # will result in a fake nodeset being created for the node update.
+ restored_request = self.zk_nodepool.getNodeRequest(request.id)
+
+ # As we didn't provide a nodeset, the nodepool client will create a
+ # fake one to look up the node information from nodepool.
+ self.assertFalse(nodeset == restored_request.nodeset)
+
+ restored_nodes = restored_request.nodeset.getNodes()
+ self.assertEqual(len(restored_nodes), 2)
+ self.assertEqual(restored_nodes[0].label, 'ubuntu-xenial')
+ self.assertEqual(restored_nodes[1].label, 'ubuntu-xenial')
+ # As the nodes were faked, they don't have the same name like the
+ # original ones from the config
+ self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0")
+ self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1")
diff --git a/web/public/openapi.yaml b/web/public/openapi.yaml
index 6313d1263..312a1907e 100644
--- a/web/public/openapi.yaml
+++ b/web/public/openapi.yaml
@@ -308,9 +308,6 @@ paths:
items:
$ref: '#/components/schemas/pipelineStatus'
type: array
- result_event_queue:
- description: The number of completed events
- type: integer
trigger_event_queue:
description: The number of running events
type: integer
diff --git a/web/src/pages/Status.jsx b/web/src/pages/Status.jsx
index 38490a923..ac3dc7840 100644
--- a/web/src/pages/Status.jsx
+++ b/web/src/pages/Status.jsx
@@ -171,10 +171,7 @@ class StatusPage extends React.Component {
}</span> events,&nbsp;
<span>{status.management_event_queue ?
status.management_event_queue.length : '0'
- }</span> management events,&nbsp;
- <span>{status.result_event_queue ?
- status.result_event_queue.length : '0'
- }</span> results.
+ }</span> management events.
</p>
)
}
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0c10bc661..702316e0e 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -1482,12 +1482,13 @@ class PipelineManager(metaclass=ABCMeta):
repo_state[connection] = event.repo_state[connection]
build_set.repo_state_state = build_set.COMPLETE
- def onNodesProvisioned(self, event, build_set):
+ def onNodesProvisioned(self, request, build_set):
# TODOv3(jeblair): handle provisioning failure here
- request = event.request
log = get_annotated_logger(self.log, request.event_id)
build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
+ # TODO (felix): Check if the failed is still needed as the
+ # NodesProvisionedEvents are now in ZooKeeper.
if request.failed or not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job_name)
diff --git a/zuul/model.py b/zuul/model.py
index 80114a689..bd34e7bf3 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -871,6 +871,7 @@ class NodeRequest(object):
self._state = data['state']
self.state_time = data['state_time']
self.relative_priority = data.get('relative_priority', 0)
+ self.event_id = data['event_id']
@classmethod
def fromDict(cls, data):
@@ -4096,20 +4097,26 @@ class NodesProvisionedEvent(ResultEvent):
:arg NodeRequest request: The fulfilled node request.
"""
- def __init__(self, request):
- self.request = request
- self.build_set_uuid = request.build_set_uuid
- self.request_id = request.id
+ def __init__(self, request_id, job_name, build_set_uuid):
+ self.request_id = request_id
+ # We have to use the job_name to look up empty node requests from the
+ # buildset (as empty node requests don't have an id).
+ self.job_name = job_name
+ self.build_set_uuid = build_set_uuid
def toDict(self):
return {
- "request": self.request,
+ "request_id": self.request_id,
+ "job_name": self.job_name,
+ "build_set_uuid": self.build_set_uuid,
}
@classmethod
def fromDict(cls, data):
return cls(
- data.get("request"),
+ data.get("request_id"),
+ data.get("job_name"),
+ data.get("build_set_uuid"),
)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 70b2f0890..cdeaba0ee 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -163,7 +163,6 @@ class Scheduler(threading.Thread):
# TODO (swestphahl): Remove after we've refactored reconfigurations
# to be performed on the tenant level.
self.reconfigure_event_queue = NamedQueue("ReconfigureEventQueue")
- self.result_event_queue = NamedQueue("ResultEventQueue")
self.event_watcher = EventWatcher(
self.zk_client, self.wake_event.set
)
@@ -405,8 +404,6 @@ class Scheduler(threading.Thread):
merge_running += running
self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
- self.statsd.gauge('zuul.scheduler.eventqueues.result',
- self.result_event_queue.qsize())
self.statsd.gauge('zuul.scheduler.eventqueues.management',
self.reconfigure_event_queue.qsize())
base = 'zuul.scheduler.eventqueues.connection'
@@ -588,9 +585,10 @@ class Scheduler(threading.Thread):
self.pipeline_result_events[tenant_name][pipeline_name].put(event)
def onNodesProvisioned(self, req):
- event = NodesProvisionedEvent(req)
- self.result_event_queue.put(event)
- self.wake_event.set()
+ tenant_name = req.tenant_name
+ pipeline_name = req.pipeline_name
+ event = NodesProvisionedEvent(req.id, req.job_name, req.build_set_uuid)
+ self.pipeline_result_events[tenant_name][pipeline_name].put(event)
def reconfigureTenant(self, tenant, project, event):
self.log.debug("Submitting tenant reconfiguration event for "
@@ -1344,11 +1342,6 @@ class Scheduler(threading.Thread):
if not self._stopped:
self.process_reconfigure_queue()
- # TODO(swestphahl): Remove legacy result event queue after
- # NodesProvisionedEvents are dispatched via Zookeeper.
- if not self._stopped:
- self.process_result_queue()
-
# Process tenant management events separate from other events
# as they might reload the tenant.
for tenant in self.abide.tenants.values():
@@ -1622,21 +1615,6 @@ class Scheduler(threading.Thread):
pipeline.name
].ack(event)
- def process_result_queue(self):
- # TODO (felix): The old result event queue is still used for the nodes
- # provisioned results and will be removed once we move those to ZK as
- # well.
- while not self.result_event_queue.empty() and not self._stopped:
- self.log.debug("Fetching result event")
- event = self.result_event_queue.get()
- try:
- if isinstance(event, NodesProvisionedEvent):
- self._doNodesProvisionedEvent(event)
- else:
- self.log.error("Unable to handle event %s", event)
- finally:
- self.result_event_queue.task_done()
-
def _process_result_event(self, event, pipeline):
if isinstance(event, BuildStartedEvent):
self._doBuildStartedEvent(event)
@@ -1650,6 +1628,8 @@ class Scheduler(threading.Thread):
self._doMergeCompletedEvent(event, pipeline)
elif isinstance(event, FilesChangesCompletedEvent):
self._doFilesChangesCompletedEvent(event, pipeline)
+ elif isinstance(event, NodesProvisionedEvent):
+ self._doNodesProvisionedEvent(event, pipeline)
else:
self.log.error("Unable to handle event %s", event)
@@ -1839,40 +1819,41 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onFilesChangesCompleted(event, build_set)
- def _doNodesProvisionedEvent(self, event):
- request = event.request
+ def _doNodesProvisionedEvent(self, event, pipeline):
request_id = event.request_id
- tenant_name = request.tenant_name
- pipeline_name = request.pipeline_name
- log = get_annotated_logger(self.log, request.event_id)
+ # Look up the buildset to access the local node request object
+ build_set = self._getBuildSetFromPipeline(event, pipeline)
+ if not build_set:
+ # Directly look up the node request in ZK and provide a dummy
+ # nodeset, so we can return the nodes to nodepool.
+ request = self.zk_nodepool.getNodeRequest(request_id)
+ if request.fulfilled:
+ self.nodepool.returnNodeSet(request.nodeset,
+ zuul_event_id=request.event_id)
+ return
+
+ request = build_set.getJobNodeRequest(event.job_name)
+ if not request:
+ # Directly look up the node request in ZK and provide a dummy
+ # nodeset, so we can return the nodes to nodepool.
+ request = self.zk_nodepool.getNodeRequest(request_id)
+ if request.fulfilled:
+ self.nodepool.returnNodeSet(request.nodeset,
+ zuul_event_id=request.event_id)
+ return
ready = self.nodepool.checkNodeRequest(request, request_id)
if not ready:
return
+ log = get_annotated_logger(self.log, request.event_id)
+
# If the request failed, we must directly delete it as the nodes will
# never be accepted.
if request.state == STATE_FAILED:
self.nodepool.deleteNodeRequest(request)
- # Look up the pipeline by its name
- # TODO (felix): The pipeline lookup can be removed once the
- # NodesProvisionedEvents are in ZooKeeper.
- pipeline = None
- tenant = self.abide.tenants.get(tenant_name)
- for pl in tenant.layout.pipelines.values():
- if pl.name == pipeline_name:
- pipeline = pl
- break
-
- build_set = self._getBuildSetFromPipeline(event, pipeline)
- if not build_set:
- if request.fulfilled:
- self.nodepool.returnNodeSet(request.nodeset,
- zuul_event_id=request.event_id)
- return
-
if request.job_name not in [x.name for x in build_set.item.getJobs()]:
log.warning("Item %s does not contain job %s "
"for node request %s",
@@ -1883,7 +1864,7 @@ class Scheduler(threading.Thread):
zuul_event_id=request.event_id)
return
- pipeline.manager.onNodesProvisioned(event, build_set)
+ pipeline.manager.onNodesProvisioned(request, build_set)
def formatStatusJSON(self, tenant_name):
# TODOv3(jeblair): use tenants
@@ -1895,9 +1876,6 @@ class Scheduler(threading.Thread):
data['trigger_event_queue'] = {}
data['trigger_event_queue']['length'] = len(
self.trigger_events[tenant_name])
- data['result_event_queue'] = {}
- data['result_event_queue']['length'] = \
- self.result_event_queue.qsize()
data['management_event_queue'] = {}
data['management_event_queue']['length'] = len(
self.management_events[tenant_name]
diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py
index 3101f8d2f..7d67662e4 100644
--- a/zuul/zk/nodepool.py
+++ b/zuul/zk/nodepool.py
@@ -21,7 +21,7 @@ from kazoo.recipe.cache import TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
-from zuul.model import HoldRequest, NodeRequest
+from zuul.model import HoldRequest, Node, NodeRequest, NodeSet
from zuul.zk import ZooKeeperClient, ZooKeeperBase
from zuul.zk.exceptions import LockException
@@ -400,14 +400,14 @@ class ZooKeeperNodepool(ZooKeeperBase):
self.kazoo_client.DataWatch(path, callback)
- def getNodeRequest(self, node_request_id, nodeset):
+ def getNodeRequest(self, node_request_id, nodeset=None):
"""
Retrieve a NodeRequest from a given path in ZooKeeper.
- The nodeset provided to this method will be set on the NodeRequest
- before updating it. This will ensure that all nodes are updated as
- well.
-
+ The serialized version of the NodeRequest doesn't contain a NodeSet, so
+ we have to add this to the request manually. The nodeset provided to
+ this method will be set on the NodeRequest before updating it. This
+ will ensure that all nodes are updated as well.
"""
path = f"{self.REQUEST_ROOT}/{node_request_id}"
@@ -419,7 +419,25 @@ class ZooKeeperNodepool(ZooKeeperBase):
if not data:
return None
- obj = NodeRequest.fromDict(json.loads(data.decode("utf-8")))
+ json_data = json.loads(data.decode("utf-8"))
+
+ obj = NodeRequest.fromDict(json_data)
+ if nodeset is None:
+ # If no NodeSet is provided, we create one on-the-fly based on the
+ # list of labels (node_types) stored in the NodeRequest's znode
+ # data.
+ # This is necessary as the logic in the updateNodeRequest() method
+ # below will update each node "in-place" an thus, we have to ensure
+ # that the NodeRequest has a valid NodeSet with all nodes available
+ # in advance.
+ # This is only used to return the nodes to nodepool in case
+ # something went wrong and the original NodeRequest and/or NodeSet
+ # objects are not available anymore.
+ nodeset = NodeSet()
+ for i, node_type in enumerate(json_data["node_types"]):
+ node = Node(name=f"{node_type}-{i}", label=node_type)
+ nodeset.addNode(node)
+
obj.nodeset = nodeset
# Using updateNodeRequest() here will ensure that the nodes are also
# updated. Doing the update in here directly rather than calling it