diff options
Diffstat (limited to 'zuul/scheduler.py')
-rw-r--r-- | zuul/scheduler.py | 123 |
1 files changed, 66 insertions, 57 deletions
diff --git a/zuul/scheduler.py b/zuul/scheduler.py index a546339c3..b8314f162 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1548,7 +1548,6 @@ class Scheduler(threading.Thread): # This is called in the scheduler loop after another thread submits # a request if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.log.debug("Updating system config") self.updateSystemConfig() with self.layout_lock: @@ -2126,70 +2125,74 @@ class Scheduler(threading.Thread): return self.log.debug("Run handler awake") self.run_handler_lock.acquire() - try: - if not self._stopped: - self.process_reconfigure_queue() + with self.statsd_timer("zuul.scheduler.run_handler"): + try: + self._run() + except Exception: + self.log.exception("Exception in run handler:") + # There may still be more events to process + self.wake_event.set() + finally: + self.run_handler_lock.release() - if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.updateSystemConfig() + def _run(self): + if not self._stopped: + self.process_reconfigure_queue() - for tenant_name in self.unparsed_abide.tenants: - if self._stopped: - break + if self.unparsed_abide.ltime < self.system_config_cache.ltime: + self.updateSystemConfig() - tenant = self.abide.tenants.get(tenant_name) - if not tenant: - continue + for tenant_name in self.unparsed_abide.tenants: + if self._stopped: + break - # This will also forward events for the pipelines - # (e.g. enqueue or dequeue events) to the matching - # pipeline event queues that are processed afterwards. - self.process_tenant_management_queue(tenant) + tenant = self.abide.tenants.get(tenant_name) + if not tenant: + continue - if self._stopped: - break + # This will also forward events for the pipelines + # (e.g. enqueue or dequeue events) to the matching + # pipeline event queues that are processed afterwards. + self.process_tenant_management_queue(tenant) - try: - with tenant_read_lock( - self.zk_client, tenant_name, blocking=False - ) as tlock: - if not self.isTenantLayoutUpToDate(tenant_name): - continue + if self._stopped: + break - # Get tenant again, as it might have been updated - # by a tenant reconfig or layout change. - tenant = self.abide.tenants[tenant_name] + try: + with tenant_read_lock( + self.zk_client, tenant_name, blocking=False + ) as tlock: + if not self.isTenantLayoutUpToDate(tenant_name): + continue - if not self._stopped: - # This will forward trigger events to pipeline - # event queues that are processed below. - self.process_tenant_trigger_queue(tenant) + # Get tenant again, as it might have been updated + # by a tenant reconfig or layout change. + tenant = self.abide.tenants[tenant_name] - self.process_pipelines(tenant, tlock) - except LockException: - self.log.debug("Skipping locked tenant %s", - tenant.name) - remote_state = self.tenant_layout_state.get( - tenant_name) - local_state = self.local_layout_state.get( - tenant_name) - if (remote_state is None or - local_state is None or - remote_state > local_state): - # Let's keep looping until we've updated to the - # latest tenant layout. - self.wake_event.set() - except Exception: - self.log.exception("Exception processing tenant %s:", - tenant_name) - # There may still be more events to process - self.wake_event.set() + if not self._stopped: + # This will forward trigger events to pipeline + # event queues that are processed below. + self.process_tenant_trigger_queue(tenant) + + self.process_pipelines(tenant, tlock) + except LockException: + self.log.debug("Skipping locked tenant %s", + tenant.name) + remote_state = self.tenant_layout_state.get( + tenant_name) + local_state = self.local_layout_state.get( + tenant_name) + if (remote_state is None or + local_state is None or + remote_state > local_state): + # Let's keep looping until we've updated to the + # latest tenant layout. + self.wake_event.set() except Exception: - self.log.exception("Exception in run handler:") + self.log.exception("Exception processing tenant %s:", + tenant_name) # There may still be more events to process self.wake_event.set() - finally: - self.run_handler_lock.release() def primeSystemConfig(self): with self.layout_lock: @@ -2207,6 +2210,7 @@ class Scheduler(threading.Thread): def updateSystemConfig(self): with self.layout_lock: + self.log.debug("Updating system config") self.unparsed_abide, self.globals = self.system_config_cache.get() self.ansible_manager = AnsibleManager( default_version=self.globals.default_ansible_version) @@ -2241,7 +2245,12 @@ class Scheduler(threading.Thread): self.zk_client, tenant.name, pipeline.name, blocking=False) as lock,\ self.createZKContext(lock, self.log) as ctx: + self.log.debug("Processing pipeline %s in tenant %s", + pipeline.name, tenant.name) with pipeline.manager.currentContext(ctx): + if ((tenant.name, pipeline.name) in + self._profile_pipelines): + ctx.profile = True with self.statsd_timer(f'{stats_key}.handling'): refreshed = self._process_pipeline( tenant, pipeline) @@ -2310,14 +2319,10 @@ class Scheduler(threading.Thread): stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}' ctx = pipeline.manager.current_context - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = True with self.statsd_timer(f'{stats_key}.refresh'): pipeline.change_list.refresh(ctx) pipeline.summary.refresh(ctx) pipeline.state.refresh(ctx) - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = False pipeline.state.setDirty(self.zk_client.client) if pipeline.state.old_queues: @@ -2382,6 +2387,8 @@ class Scheduler(threading.Thread): with trigger_queue_lock( self.zk_client, tenant.name, blocking=False ): + self.log.debug("Processing tenant trigger events in %s", + tenant.name) # Update the pipeline changes ctx = self.createZKContext(None, self.log) for pipeline in tenant.layout.pipelines.values(): @@ -2590,6 +2597,8 @@ class Scheduler(threading.Thread): "Skipping management event queue for tenant %s", tenant.name) return + self.log.debug("Processing tenant management events in %s", + tenant.name) self._process_tenant_management_queue(tenant) except LockException: self.log.debug("Skipping locked management event queue" |