diff options
author | James E. Blair <jim@acmegating.com> | 2022-11-21 13:33:31 -0800 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2022-11-21 13:36:45 -0800 |
commit | 592d47648ef64bedbd4e309b78251318edeb923a (patch) | |
tree | ef12a01bedf3c71403598ed72329b714f6108730 | |
parent | c8aac6a118b84fd28ea454161d33f28184c4bd0b (diff) | |
download | zuul-592d47648ef64bedbd4e309b78251318edeb923a.tar.gz |
Avoid replacing timer apscheduler jobs
If a timer trigger is configured with a large jitter and a
reconfiguration happens within the jitter time, it is possible
to miss an expected scheduled trigger because the act of
reconfiguration removes and re-adds all of a tenant's timer
trigger apscheduler jobs.
To avoid this situation, we will try to preserve any jobs with
identical configurations.
Change-Id: I5d3a4d7be891fcb4b9a3f268ee347f2069aaded3
-rw-r--r-- | tests/unit/test_scheduler.py | 25 | ||||
-rw-r--r-- | zuul/driver/timer/__init__.py | 60 |
2 files changed, 62 insertions(+), 23 deletions(-)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 6c55952a2..6a0da1279 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -4442,6 +4442,9 @@ class TestScheduler(ZuulTestCase): # with a configuration which does not include a # timer-triggered job so that we have an opportunity to set # the hold flag before the first job. + timer = self.scheds.first.sched.connections.drivers['timer'] + start_jobs = timer.apsched.get_jobs() + self.create_branch('org/project', 'stable') self.fake_gerrit.addEvent( self.fake_gerrit.getFakeBranchCreatedEvent( @@ -4450,6 +4453,7 @@ class TestScheduler(ZuulTestCase): self.commitConfigUpdate('common-config', config_file) self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + first_jobs = timer.apsched.get_jobs() # Collect the currently cached branches in order to later check, # that the timer driver refreshes the cache. cached_versions = {} @@ -4487,6 +4491,7 @@ class TestScheduler(ZuulTestCase): # below don't race against more jobs being queued. 
self.commitConfigUpdate('common-config', 'layouts/no-timer.yaml') self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + second_jobs = timer.apsched.get_jobs() self.waitUntilSettled() # If APScheduler is in mid-event when we remove the job, we # can end up with one more event firing, so give it an extra @@ -4513,6 +4518,11 @@ class TestScheduler(ZuulTestCase): self.assertGreater(change.cache_version, cached_versions[branch]) + # We start with no jobs, and our first reconfigure should add jobs + self.assertTrue(len(first_jobs) > len(start_jobs)) + # Our second reconfigure should return us to no jobs + self.assertEqual(start_jobs, second_jobs) + def _create_dummy_event(self, project, branch): event = zuul.model.TriggerEvent() event.type = 'test' @@ -4532,6 +4542,21 @@ class TestScheduler(ZuulTestCase): "Test that a periodic job with a jitter is triggered" self._test_timer('layouts/timer-jitter.yaml') + @simple_layout('layouts/timer-jitter.yaml') + def test_timer_preserve_jobs(self): + # This tests that we keep the same apsched jobs if possible + # when reconfiguring. If a reconfiguration happens during the + # "jitter" period, we might end up not running jobs unless we + # preserve the exact job object across reconfiguration. 
+ timer = self.scheds.first.sched.connections.drivers['timer'] + old_jobs = timer.apsched.get_jobs() + + self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + + new_jobs = timer.apsched.get_jobs() + + self.assertEqual(old_jobs, new_jobs) + def test_idle(self): "Test that frequent periodic jobs work" # This test can not use simple_layout because it must start diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py index 7922cc604..37fdcf580 100644 --- a/zuul/driver/timer/__init__.py +++ b/zuul/driver/timer/__init__.py @@ -83,7 +83,6 @@ class TimerDriver(Driver, TriggerInterface): self.log.debug("Timer election tenure ended") def reconfigure(self, tenant): - self._removeJobs(tenant) if self.stopped: return if not self.apsched: @@ -93,14 +92,19 @@ class TimerDriver(Driver, TriggerInterface): self.apsched.start() self._addJobs(tenant) - def _removeJobs(self, tenant): - jobs = self.tenant_jobs.get(tenant.name, []) - for job in jobs: + def _removeJobs(self, tenant, new_jobs): + # Compare existing jobs to new jobs and remove any that should + # not be present. 
+ existing_jobs = self.tenant_jobs.get(tenant.name) + if not existing_jobs: + return + to_remove = set(existing_jobs.keys()) - set(new_jobs.keys()) + for key in to_remove: + job = existing_jobs[key] job.remove() def _addJobs(self, tenant): - jobs = [] - self.tenant_jobs[tenant.name] = jobs + jobs = {} for pipeline in tenant.layout.pipelines.values(): for ef in pipeline.manager.event_filters: if not isinstance(ef.trigger, timertrigger.TimerTrigger): @@ -141,9 +145,12 @@ class TimerDriver(Driver, TriggerInterface): self._addJobsInner(tenant, pipeline, trigger, timespec, jobs) + self._removeJobs(tenant, jobs) + self.tenant_jobs[tenant.name] = jobs def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs): - # jobs is a list that we mutate + # jobs is a dict of args->job that we mutate + existing_jobs = self.tenant_jobs.get(tenant.name, {}) for project_name, pcs in tenant.layout.project_configs.items(): # timer operates on branch heads and doesn't need # speculative layouts to decide if it should be @@ -157,23 +164,29 @@ class TimerDriver(Driver, TriggerInterface): try: for branch in project.source.getProjectBranches( project, tenant): - # The 'misfire_grace_time' argument is set to None to - # disable checking if the job missed its run time window. - # This ensures we don't miss a trigger when the job is - # delayed due to e.g. high scheduler load. Those short - # delays are not a problem for our trigger use-case. - job = self.apsched.add_job( - self._onTrigger, trigger=trigger, - args=(tenant, pipeline.name, project_name, - branch, timespec,), - misfire_grace_time=None) - jobs.append(job) + args = (tenant.name, pipeline.name, project_name, + branch, timespec,) + existing_job = existing_jobs.get(args) + if existing_job: + job = existing_job + else: + # The 'misfire_grace_time' argument is set to + # None to disable checking if the job missed + # its run time window. This ensures we don't + # miss a trigger when the job is delayed due + # to e.g. 
high scheduler load. Those short + # delays are not a problem for our trigger + # use-case. + job = self.apsched.add_job( + self._onTrigger, trigger=trigger, + args=args, misfire_grace_time=None) + jobs[job.args] = job except Exception: self.log.exception("Unable to create APScheduler job for " "%s %s %s", tenant, pipeline, project) - def _onTrigger(self, tenant, pipeline_name, project_name, branch, + def _onTrigger(self, tenant_name, pipeline_name, project_name, branch, timespec): if not self.election_won: return @@ -188,19 +201,20 @@ class TimerDriver(Driver, TriggerInterface): } with self.tracer.start_as_current_span( "TimerEvent", attributes=attributes): - self._dispatchEvent(tenant, pipeline_name, project_name, + self._dispatchEvent(tenant_name, pipeline_name, project_name, branch, timespec) except Exception: self.stop_event.set() self.log.exception("Error when dispatching timer event") - def _dispatchEvent(self, tenant, pipeline_name, project_name, + def _dispatchEvent(self, tenant_name, pipeline_name, project_name, branch, timespec): self.log.debug('Got trigger for tenant %s and pipeline %s ' 'project %s branch %s with timespec %s', - tenant.name, pipeline_name, project_name, + tenant_name, pipeline_name, project_name, branch, timespec) try: + tenant = self.sched.abide.tenants[tenant_name] (trusted, project) = tenant.getProject(project_name) event = TimerTriggerEvent() event.type = 'timer' @@ -224,7 +238,7 @@ class TimerDriver(Driver, TriggerInterface): except Exception: self.log.exception("Error dispatching timer event for " "tenant %s project %s branch %s", - tenant, project_name, branch) + tenant_name, project_name, branch) def stop(self): self.log.debug("Stopping timer driver") |