author     James E. Blair <jim@acmegating.com>   2022-11-21 13:33:31 -0800
committer  James E. Blair <jim@acmegating.com>   2022-11-21 13:36:45 -0800
commit     592d47648ef64bedbd4e309b78251318edeb923a (patch)
tree       ef12a01bedf3c71403598ed72329b714f6108730
parent     c8aac6a118b84fd28ea454161d33f28184c4bd0b (diff)
Avoid replacing timer apscheduler jobs

If a timer trigger is configured with a large jitter and a
reconfiguration happens within the jitter time, it is possible to
miss an expected scheduled trigger, because the act of
reconfiguration removes and re-adds all of a tenant's timer trigger
apscheduler jobs.

To avoid this situation, we try to preserve any jobs with identical
configurations.

Change-Id: I5d3a4d7be891fcb4b9a3f268ee347f2069aaded3
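The pattern at the heart of the change, sketched outside the driver
(a minimal illustration under stated assumptions, not Zuul's actual
code): desired jobs are keyed by their args tuple, existing
apscheduler jobs whose key still appears after a reconfiguration are
reused unchanged (keeping their already-computed, jittered next run
time), and only leftover jobs are removed. The names on_trigger and
reconfigure and the fixed minute='*' cron spec are illustrative
assumptions; the APScheduler calls themselves (add_job, Job.remove)
are the real API.

    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    scheduler.start()
    current_jobs = {}  # args tuple -> apscheduler Job

    def on_trigger(tenant_name, pipeline_name, project_name, branch,
                   timespec):
        print(tenant_name, pipeline_name, project_name, branch, timespec)

    def reconfigure(wanted_args):
        # 'wanted_args' is an iterable of args tuples describing the
        # jobs the new layout needs.
        new_jobs = {}
        for args in wanted_args:
            job = current_jobs.get(args)
            if job is None:
                # misfire_grace_time=None: never skip a run that
                # fires late (e.g. under scheduler load).
                job = scheduler.add_job(
                    on_trigger, trigger='cron', minute='*',
                    args=args, misfire_grace_time=None)
            new_jobs[args] = job
        # Remove only the jobs that no longer appear in the new layout;
        # surviving jobs are carried over untouched.
        for args in set(current_jobs) - set(new_jobs):
            current_jobs[args].remove()
        current_jobs.clear()
        current_jobs.update(new_jobs)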
-rw-r--r--  tests/unit/test_scheduler.py   25
-rw-r--r--  zuul/driver/timer/__init__.py  60
2 files changed, 62 insertions(+), 23 deletions(-)
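Note the companion change visible in the diff below: _onTrigger and
_dispatchEvent now take tenant_name instead of the tenant object, and
the tenant is looked up via self.sched.abide.tenants at dispatch
time. A plausible reading (hedged, not stated in the commit message):
preserved jobs are matched by their args tuple, so the args must
compare equal across reconfigurations and must not pin a stale tenant
or layout object; plain strings satisfy both. A toy illustration with
hypothetical values:

    # Keys built from value types stay stable across reconfigurations...
    old_key = ('tenant-one', 'periodic', 'org/project', 'master',
               '0 0 * * *')
    new_key = ('tenant-one', 'periodic', 'org/project', 'master',
               '0 0 * * *')
    assert old_key == new_key
    # ...whereas a key containing a freshly-built tenant object would
    # compare unequal by identity, so every job would be replaced anyway.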
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6c55952a2..6a0da1279 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -4442,6 +4442,9 @@ class TestScheduler(ZuulTestCase):
# with a configuration which does not include a
# timer-triggered job so that we have an opportunity to set
# the hold flag before the first job.
+ timer = self.scheds.first.sched.connections.drivers['timer']
+ start_jobs = timer.apsched.get_jobs()
+
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
@@ -4450,6 +4453,7 @@ class TestScheduler(ZuulTestCase):
self.commitConfigUpdate('common-config', config_file)
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ first_jobs = timer.apsched.get_jobs()
# Collect the currently cached branches in order to later check,
# that the timer driver refreshes the cache.
cached_versions = {}
@@ -4487,6 +4491,7 @@ class TestScheduler(ZuulTestCase):
# below don't race against more jobs being queued.
self.commitConfigUpdate('common-config', 'layouts/no-timer.yaml')
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ second_jobs = timer.apsched.get_jobs()
self.waitUntilSettled()
# If APScheduler is in mid-event when we remove the job, we
# can end up with one more event firing, so give it an extra
@@ -4513,6 +4518,11 @@ class TestScheduler(ZuulTestCase):
self.assertGreater(change.cache_version,
cached_versions[branch])
+ # We start with no jobs, and our first reconfigure should add jobs
+ self.assertTrue(len(first_jobs) > len(start_jobs))
+ # Our second reconfigure should return us to no jobs
+ self.assertEqual(start_jobs, second_jobs)
+
def _create_dummy_event(self, project, branch):
event = zuul.model.TriggerEvent()
event.type = 'test'
@@ -4532,6 +4542,21 @@ class TestScheduler(ZuulTestCase):
"Test that a periodic job with a jitter is triggered"
self._test_timer('layouts/timer-jitter.yaml')
+ @simple_layout('layouts/timer-jitter.yaml')
+ def test_timer_preserve_jobs(self):
+ # This tests that we keep the same apsched jobs if possible
+ # when reconfiguring. If a reconfiguration happens during the
+ # "jitter" period, we might end up not running jobs unless we
+ # preserve the exact job object across reconfiguration.
+ timer = self.scheds.first.sched.connections.drivers['timer']
+ old_jobs = timer.apsched.get_jobs()
+
+ self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+
+ new_jobs = timer.apsched.get_jobs()
+
+ self.assertEqual(old_jobs, new_jobs)
+
def test_idle(self):
"Test that frequent periodic jobs work"
# This test can not use simple_layout because it must start
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py
index 7922cc604..37fdcf580 100644
--- a/zuul/driver/timer/__init__.py
+++ b/zuul/driver/timer/__init__.py
@@ -83,7 +83,6 @@ class TimerDriver(Driver, TriggerInterface):
self.log.debug("Timer election tenure ended")
def reconfigure(self, tenant):
- self._removeJobs(tenant)
if self.stopped:
return
if not self.apsched:
@@ -93,14 +92,19 @@ class TimerDriver(Driver, TriggerInterface):
self.apsched.start()
self._addJobs(tenant)
- def _removeJobs(self, tenant):
- jobs = self.tenant_jobs.get(tenant.name, [])
- for job in jobs:
+ def _removeJobs(self, tenant, new_jobs):
+ # Compare existing jobs to new jobs and remove any that should
+ # not be present.
+ existing_jobs = self.tenant_jobs.get(tenant.name)
+ if not existing_jobs:
+ return
+ to_remove = set(existing_jobs.keys()) - set(new_jobs.keys())
+ for key in to_remove:
+ job = existing_jobs[key]
job.remove()
def _addJobs(self, tenant):
- jobs = []
- self.tenant_jobs[tenant.name] = jobs
+ jobs = {}
for pipeline in tenant.layout.pipelines.values():
for ef in pipeline.manager.event_filters:
if not isinstance(ef.trigger, timertrigger.TimerTrigger):
@@ -141,9 +145,12 @@ class TimerDriver(Driver, TriggerInterface):
self._addJobsInner(tenant, pipeline, trigger, timespec,
jobs)
+ self._removeJobs(tenant, jobs)
+ self.tenant_jobs[tenant.name] = jobs
def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs):
- # jobs is a list that we mutate
+ # jobs is a dict of args->job that we mutate
+ existing_jobs = self.tenant_jobs.get(tenant.name, {})
for project_name, pcs in tenant.layout.project_configs.items():
# timer operates on branch heads and doesn't need
# speculative layouts to decide if it should be
@@ -157,23 +164,29 @@ class TimerDriver(Driver, TriggerInterface):
try:
for branch in project.source.getProjectBranches(
project, tenant):
- # The 'misfire_grace_time' argument is set to None to
- # disable checking if the job missed its run time window.
- # This ensures we don't miss a trigger when the job is
- # delayed due to e.g. high scheduler load. Those short
- # delays are not a problem for our trigger use-case.
- job = self.apsched.add_job(
- self._onTrigger, trigger=trigger,
- args=(tenant, pipeline.name, project_name,
- branch, timespec,),
- misfire_grace_time=None)
- jobs.append(job)
+ args = (tenant.name, pipeline.name, project_name,
+ branch, timespec,)
+ existing_job = existing_jobs.get(args)
+ if existing_job:
+ job = existing_job
+ else:
+ # The 'misfire_grace_time' argument is set to
+ # None to disable checking if the job missed
+ # its run time window. This ensures we don't
+ # miss a trigger when the job is delayed due
+ # to e.g. high scheduler load. Those short
+ # delays are not a problem for our trigger
+ # use-case.
+ job = self.apsched.add_job(
+ self._onTrigger, trigger=trigger,
+ args=args, misfire_grace_time=None)
+ jobs[job.args] = job
except Exception:
self.log.exception("Unable to create APScheduler job for "
"%s %s %s",
tenant, pipeline, project)
- def _onTrigger(self, tenant, pipeline_name, project_name, branch,
+ def _onTrigger(self, tenant_name, pipeline_name, project_name, branch,
timespec):
if not self.election_won:
return
@@ -188,19 +201,20 @@ class TimerDriver(Driver, TriggerInterface):
}
with self.tracer.start_as_current_span(
"TimerEvent", attributes=attributes):
- self._dispatchEvent(tenant, pipeline_name, project_name,
+ self._dispatchEvent(tenant_name, pipeline_name, project_name,
branch, timespec)
except Exception:
self.stop_event.set()
self.log.exception("Error when dispatching timer event")
- def _dispatchEvent(self, tenant, pipeline_name, project_name,
+ def _dispatchEvent(self, tenant_name, pipeline_name, project_name,
branch, timespec):
self.log.debug('Got trigger for tenant %s and pipeline %s '
'project %s branch %s with timespec %s',
- tenant.name, pipeline_name, project_name,
+ tenant_name, pipeline_name, project_name,
branch, timespec)
try:
+ tenant = self.sched.abide.tenants[tenant_name]
(trusted, project) = tenant.getProject(project_name)
event = TimerTriggerEvent()
event.type = 'timer'
@@ -224,7 +238,7 @@ class TimerDriver(Driver, TriggerInterface):
except Exception:
self.log.exception("Error dispatching timer event for "
"tenant %s project %s branch %s",
- tenant, project_name, branch)
+ tenant_name, project_name, branch)
def stop(self):
self.log.debug("Stopping timer driver")