summaryrefslogtreecommitdiff
path: root/zuul/driver
diff options
context:
space:
mode:
author: James E. Blair <jim@acmegating.com> 2022-11-21 13:33:31 -0800
committer: James E. Blair <jim@acmegating.com> 2022-11-21 13:36:45 -0800
commit: 592d47648ef64bedbd4e309b78251318edeb923a (patch)
tree: ef12a01bedf3c71403598ed72329b714f6108730 /zuul/driver
parent: c8aac6a118b84fd28ea454161d33f28184c4bd0b (diff)
download: zuul-592d47648ef64bedbd4e309b78251318edeb923a.tar.gz
Avoid replacing timer apscheduler jobs
If a timer trigger is configured with a large jitter and a reconfiguration happens within the jitter time, it is possible to miss an expected scheduled trigger because the act of reconfiguration removes and re-adds all of a tenant's timer trigger apscheduler jobs. To avoid this situation, we will try to preserve any jobs with identical configurations.

Change-Id: I5d3a4d7be891fcb4b9a3f268ee347f2069aaded3
Diffstat (limited to 'zuul/driver')
-rw-r--r-- zuul/driver/timer/__init__.py 60
1 file changed, 37 insertions, 23 deletions
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py
index 7922cc604..37fdcf580 100644
--- a/zuul/driver/timer/__init__.py
+++ b/zuul/driver/timer/__init__.py
@@ -83,7 +83,6 @@ class TimerDriver(Driver, TriggerInterface):
self.log.debug("Timer election tenure ended")
def reconfigure(self, tenant):
- self._removeJobs(tenant)
if self.stopped:
return
if not self.apsched:
@@ -93,14 +92,19 @@ class TimerDriver(Driver, TriggerInterface):
self.apsched.start()
self._addJobs(tenant)
- def _removeJobs(self, tenant):
- jobs = self.tenant_jobs.get(tenant.name, [])
- for job in jobs:
+ def _removeJobs(self, tenant, new_jobs):
+ # Compare existing jobs to new jobs and remove any that should
+ # not be present.
+ existing_jobs = self.tenant_jobs.get(tenant.name)
+ if not existing_jobs:
+ return
+ to_remove = set(existing_jobs.keys()) - set(new_jobs.keys())
+ for key in to_remove:
+ job = existing_jobs[key]
job.remove()
def _addJobs(self, tenant):
- jobs = []
- self.tenant_jobs[tenant.name] = jobs
+ jobs = {}
for pipeline in tenant.layout.pipelines.values():
for ef in pipeline.manager.event_filters:
if not isinstance(ef.trigger, timertrigger.TimerTrigger):
@@ -141,9 +145,12 @@ class TimerDriver(Driver, TriggerInterface):
self._addJobsInner(tenant, pipeline, trigger, timespec,
jobs)
+ self._removeJobs(tenant, jobs)
+ self.tenant_jobs[tenant.name] = jobs
def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs):
- # jobs is a list that we mutate
+ # jobs is a dict of args->job that we mutate
+ existing_jobs = self.tenant_jobs.get(tenant.name, {})
for project_name, pcs in tenant.layout.project_configs.items():
# timer operates on branch heads and doesn't need
# speculative layouts to decide if it should be
@@ -157,23 +164,29 @@ class TimerDriver(Driver, TriggerInterface):
try:
for branch in project.source.getProjectBranches(
project, tenant):
- # The 'misfire_grace_time' argument is set to None to
- # disable checking if the job missed its run time window.
- # This ensures we don't miss a trigger when the job is
- # delayed due to e.g. high scheduler load. Those short
- # delays are not a problem for our trigger use-case.
- job = self.apsched.add_job(
- self._onTrigger, trigger=trigger,
- args=(tenant, pipeline.name, project_name,
- branch, timespec,),
- misfire_grace_time=None)
- jobs.append(job)
+ args = (tenant.name, pipeline.name, project_name,
+ branch, timespec,)
+ existing_job = existing_jobs.get(args)
+ if existing_job:
+ job = existing_job
+ else:
+ # The 'misfire_grace_time' argument is set to
+ # None to disable checking if the job missed
+ # its run time window. This ensures we don't
+ # miss a trigger when the job is delayed due
+ # to e.g. high scheduler load. Those short
+ # delays are not a problem for our trigger
+ # use-case.
+ job = self.apsched.add_job(
+ self._onTrigger, trigger=trigger,
+ args=args, misfire_grace_time=None)
+ jobs[job.args] = job
except Exception:
self.log.exception("Unable to create APScheduler job for "
"%s %s %s",
tenant, pipeline, project)
- def _onTrigger(self, tenant, pipeline_name, project_name, branch,
+ def _onTrigger(self, tenant_name, pipeline_name, project_name, branch,
timespec):
if not self.election_won:
return
@@ -188,19 +201,20 @@ class TimerDriver(Driver, TriggerInterface):
}
with self.tracer.start_as_current_span(
"TimerEvent", attributes=attributes):
- self._dispatchEvent(tenant, pipeline_name, project_name,
+ self._dispatchEvent(tenant_name, pipeline_name, project_name,
branch, timespec)
except Exception:
self.stop_event.set()
self.log.exception("Error when dispatching timer event")
- def _dispatchEvent(self, tenant, pipeline_name, project_name,
+ def _dispatchEvent(self, tenant_name, pipeline_name, project_name,
branch, timespec):
self.log.debug('Got trigger for tenant %s and pipeline %s '
'project %s branch %s with timespec %s',
- tenant.name, pipeline_name, project_name,
+ tenant_name, pipeline_name, project_name,
branch, timespec)
try:
+ tenant = self.sched.abide.tenants[tenant_name]
(trusted, project) = tenant.getProject(project_name)
event = TimerTriggerEvent()
event.type = 'timer'
@@ -224,7 +238,7 @@ class TimerDriver(Driver, TriggerInterface):
except Exception:
self.log.exception("Error dispatching timer event for "
"tenant %s project %s branch %s",
- tenant, project_name, branch)
+ tenant_name, project_name, branch)
def stop(self):
self.log.debug("Stopping timer driver")