summaryrefslogtreecommitdiff
path: root/zuul/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'zuul/scheduler.py')
-rw-r--r--zuul/scheduler.py123
1 files changed, 66 insertions, 57 deletions
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index a546339c3..b8314f162 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1548,7 +1548,6 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.log.debug("Updating system config")
self.updateSystemConfig()
with self.layout_lock:
@@ -2126,70 +2125,74 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
- try:
- if not self._stopped:
- self.process_reconfigure_queue()
+ with self.statsd_timer("zuul.scheduler.run_handler"):
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
- if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.updateSystemConfig()
+ def _run(self):
+ if not self._stopped:
+ self.process_reconfigure_queue()
- for tenant_name in self.unparsed_abide.tenants:
- if self._stopped:
- break
+ if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+ self.updateSystemConfig()
- tenant = self.abide.tenants.get(tenant_name)
- if not tenant:
- continue
+ for tenant_name in self.unparsed_abide.tenants:
+ if self._stopped:
+ break
- # This will also forward events for the pipelines
- # (e.g. enqueue or dequeue events) to the matching
- # pipeline event queues that are processed afterwards.
- self.process_tenant_management_queue(tenant)
+ tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ continue
- if self._stopped:
- break
+ # This will also forward events for the pipelines
+ # (e.g. enqueue or dequeue events) to the matching
+ # pipeline event queues that are processed afterwards.
+ self.process_tenant_management_queue(tenant)
- try:
- with tenant_read_lock(
- self.zk_client, tenant_name, blocking=False
- ) as tlock:
- if not self.isTenantLayoutUpToDate(tenant_name):
- continue
+ if self._stopped:
+ break
- # Get tenant again, as it might have been updated
- # by a tenant reconfig or layout change.
- tenant = self.abide.tenants[tenant_name]
+ try:
+ with tenant_read_lock(
+ self.zk_client, tenant_name, blocking=False
+ ) as tlock:
+ if not self.isTenantLayoutUpToDate(tenant_name):
+ continue
- if not self._stopped:
- # This will forward trigger events to pipeline
- # event queues that are processed below.
- self.process_tenant_trigger_queue(tenant)
+ # Get tenant again, as it might have been updated
+ # by a tenant reconfig or layout change.
+ tenant = self.abide.tenants[tenant_name]
- self.process_pipelines(tenant, tlock)
- except LockException:
- self.log.debug("Skipping locked tenant %s",
- tenant.name)
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- local_state = self.local_layout_state.get(
- tenant_name)
- if (remote_state is None or
- local_state is None or
- remote_state > local_state):
- # Let's keep looping until we've updated to the
- # latest tenant layout.
- self.wake_event.set()
- except Exception:
- self.log.exception("Exception processing tenant %s:",
- tenant_name)
- # There may still be more events to process
- self.wake_event.set()
+ if not self._stopped:
+ # This will forward trigger events to pipeline
+ # event queues that are processed below.
+ self.process_tenant_trigger_queue(tenant)
+
+ self.process_pipelines(tenant, tlock)
+ except LockException:
+ self.log.debug("Skipping locked tenant %s",
+ tenant.name)
+ remote_state = self.tenant_layout_state.get(
+ tenant_name)
+ local_state = self.local_layout_state.get(
+ tenant_name)
+ if (remote_state is None or
+ local_state is None or
+ remote_state > local_state):
+ # Let's keep looping until we've updated to the
+ # latest tenant layout.
+ self.wake_event.set()
except Exception:
- self.log.exception("Exception in run handler:")
+ self.log.exception("Exception processing tenant %s:",
+ tenant_name)
# There may still be more events to process
self.wake_event.set()
- finally:
- self.run_handler_lock.release()
def primeSystemConfig(self):
with self.layout_lock:
@@ -2207,6 +2210,7 @@ class Scheduler(threading.Thread):
def updateSystemConfig(self):
with self.layout_lock:
+ self.log.debug("Updating system config")
self.unparsed_abide, self.globals = self.system_config_cache.get()
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
@@ -2241,7 +2245,12 @@ class Scheduler(threading.Thread):
self.zk_client, tenant.name, pipeline.name,
blocking=False) as lock,\
self.createZKContext(lock, self.log) as ctx:
+ self.log.debug("Processing pipeline %s in tenant %s",
+ pipeline.name, tenant.name)
with pipeline.manager.currentContext(ctx):
+ if ((tenant.name, pipeline.name) in
+ self._profile_pipelines):
+ ctx.profile = True
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
tenant, pipeline)
@@ -2310,14 +2319,10 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = True
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = False
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:
@@ -2382,6 +2387,8 @@ class Scheduler(threading.Thread):
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ self.log.debug("Processing tenant trigger events in %s",
+ tenant.name)
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
@@ -2590,6 +2597,8 @@ class Scheduler(threading.Thread):
"Skipping management event queue for tenant %s",
tenant.name)
return
+ self.log.debug("Processing tenant management events in %s",
+ tenant.name)
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"