summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Zuul <zuul@review.opendev.org> 2023-02-15 08:43:19 +0000
committer: Gerrit Code Review <review@openstack.org> 2023-02-15 08:43:19 +0000
commit: af96e5786f7bf83c40bd5f5973d20d20c46ae33b (patch)
tree: 8cbd5fbeff052e925152e5ffeff85b100c900f3b
parent: b1b8c00ddd646c5ed51b36ffd260102c2e3873cd (diff)
parent: 95ecb41c51c4a766d254ec05f32ec9c1f0472d4f (diff)
download: zuul-af96e5786f7bf83c40bd5f5973d20d20c46ae33b.tar.gz
Merge "Add scheduler run handler metric"
-rw-r--r-- doc/source/monitoring.rst 6
-rw-r--r-- tests/unit/test_scheduler.py 1
-rw-r--r-- zuul/scheduler.py 108
3 files changed, 63 insertions, 52 deletions
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index a87356e45..f40bee445 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -716,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the current connection event queue.
+ .. stat:: run_handler
+ :type: timer
+
+ A timer metric reporting the time taken for one scheduler run
+ handler iteration.
+
.. stat:: time_query
:type: timer
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index b3948e519..1a5657ed6 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -461,6 +461,7 @@ class TestScheduler(ZuulTestCase):
'zuul.mergers.online', value='1', kind='g')
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
value='0', kind='g')
+ self.assertReportedStat('zuul.scheduler.run_handler', kind='ms')
# Catch time / monotonic errors
for key in [
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a546339c3..63f3d7389 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2126,70 +2126,74 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
- try:
- if not self._stopped:
- self.process_reconfigure_queue()
+ with self.statsd_timer("zuul.scheduler.run_handler"):
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
- if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.updateSystemConfig()
+ def _run(self):
+ if not self._stopped:
+ self.process_reconfigure_queue()
- for tenant_name in self.unparsed_abide.tenants:
- if self._stopped:
- break
+ if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+ self.updateSystemConfig()
- tenant = self.abide.tenants.get(tenant_name)
- if not tenant:
- continue
+ for tenant_name in self.unparsed_abide.tenants:
+ if self._stopped:
+ break
- # This will also forward events for the pipelines
- # (e.g. enqueue or dequeue events) to the matching
- # pipeline event queues that are processed afterwards.
- self.process_tenant_management_queue(tenant)
+ tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ continue
- if self._stopped:
- break
+ # This will also forward events for the pipelines
+ # (e.g. enqueue or dequeue events) to the matching
+ # pipeline event queues that are processed afterwards.
+ self.process_tenant_management_queue(tenant)
- try:
- with tenant_read_lock(
- self.zk_client, tenant_name, blocking=False
- ) as tlock:
- if not self.isTenantLayoutUpToDate(tenant_name):
- continue
+ if self._stopped:
+ break
- # Get tenant again, as it might have been updated
- # by a tenant reconfig or layout change.
- tenant = self.abide.tenants[tenant_name]
+ try:
+ with tenant_read_lock(
+ self.zk_client, tenant_name, blocking=False
+ ) as tlock:
+ if not self.isTenantLayoutUpToDate(tenant_name):
+ continue
- if not self._stopped:
- # This will forward trigger events to pipeline
- # event queues that are processed below.
- self.process_tenant_trigger_queue(tenant)
+ # Get tenant again, as it might have been updated
+ # by a tenant reconfig or layout change.
+ tenant = self.abide.tenants[tenant_name]
- self.process_pipelines(tenant, tlock)
- except LockException:
- self.log.debug("Skipping locked tenant %s",
- tenant.name)
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- local_state = self.local_layout_state.get(
- tenant_name)
- if (remote_state is None or
- local_state is None or
- remote_state > local_state):
- # Let's keep looping until we've updated to the
- # latest tenant layout.
- self.wake_event.set()
- except Exception:
- self.log.exception("Exception processing tenant %s:",
- tenant_name)
- # There may still be more events to process
- self.wake_event.set()
+ if not self._stopped:
+ # This will forward trigger events to pipeline
+ # event queues that are processed below.
+ self.process_tenant_trigger_queue(tenant)
+
+ self.process_pipelines(tenant, tlock)
+ except LockException:
+ self.log.debug("Skipping locked tenant %s",
+ tenant.name)
+ remote_state = self.tenant_layout_state.get(
+ tenant_name)
+ local_state = self.local_layout_state.get(
+ tenant_name)
+ if (remote_state is None or
+ local_state is None or
+ remote_state > local_state):
+ # Let's keep looping until we've updated to the
+ # latest tenant layout.
+ self.wake_event.set()
except Exception:
- self.log.exception("Exception in run handler:")
+ self.log.exception("Exception processing tenant %s:",
+ tenant_name)
# There may still be more events to process
self.wake_event.set()
- finally:
- self.run_handler_lock.release()
def primeSystemConfig(self):
with self.layout_lock: