diff options
Diffstat (limited to 'zuul/driver/timer/__init__.py')
-rw-r--r-- | zuul/driver/timer/__init__.py | 106 |
1 files changed, 59 insertions, 47 deletions
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py index 4f11a583b..619b1a8ff 100644 --- a/zuul/driver/timer/__init__.py +++ b/zuul/driver/timer/__init__.py @@ -130,6 +130,24 @@ class TimerDriver(Driver, TriggerInterface): pipeline.name) continue + self._addJobsInner(tenant, pipeline, trigger, timespec, + jobs) + + def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs): + # jobs is a list that we mutate + for project_name, pcs in tenant.layout.project_configs.items(): + # timer operates on branch heads and doesn't need + # speculative layouts to decide if it should be + # enqueued or not. So it can be decided on cached + # data if it needs to run or not. + pcst = tenant.layout.getAllProjectConfigs(project_name) + if not [True for pc in pcst if pipeline.name in pc.pipelines]: + continue + + (trusted, project) = tenant.getProject(project_name) + try: + for branch in project.source.getProjectBranches( + project, tenant): # The 'misfire_grace_time' argument is set to None to # disable checking if the job missed its run time window. # This ensures we don't miss a trigger when the job is @@ -137,11 +155,17 @@ class TimerDriver(Driver, TriggerInterface): # delays are not a problem for our trigger use-case. job = self.apsched.add_job( self._onTrigger, trigger=trigger, - args=(tenant, pipeline.name, timespec,), + args=(tenant, pipeline.name, project_name, + branch, timespec,), misfire_grace_time=None) jobs.append(job) + except Exception: + self.log.exception("Unable to create APScheduler job for " + "%s %s %s", + tenant, pipeline, project) - def _onTrigger(self, tenant, pipeline_name, timespec): + def _onTrigger(self, tenant, pipeline_name, project_name, branch, + timespec): if not self.election_won: return @@ -150,55 +174,43 @@ class TimerDriver(Driver, TriggerInterface): return try: - self._dispatchEvent(tenant, pipeline_name, timespec) + self._dispatchEvent(tenant, pipeline_name, project_name, + branch, timespec) except Exception: self.stop_event.set() self.log.exception("Error when dispatching timer event") - def _dispatchEvent(self, tenant, pipeline_name, timespec): - self.log.debug('Got trigger for tenant %s and pipeline %s with ' - 'timespec %s', tenant.name, pipeline_name, timespec) - for project_name, pcs in tenant.layout.project_configs.items(): - try: - # timer operates on branch heads and doesn't need - # speculative layouts to decide if it should be - # enqueued or not. So it can be decided on cached - # data if it needs to run or not. - pcst = tenant.layout.getAllProjectConfigs(project_name) - if not [True for pc in pcst if pipeline_name in pc.pipelines]: - continue - - (trusted, project) = tenant.getProject(project_name) - for branch in project.source.getProjectBranches( - project, tenant): - try: - event = TimerTriggerEvent() - event.type = 'timer' - event.timespec = timespec - event.forced_pipeline = pipeline_name - event.project_hostname = project.canonical_hostname - event.project_name = project.name - event.ref = 'refs/heads/%s' % branch - event.branch = branch - event.zuul_event_id = str(uuid4().hex) - event.timestamp = time.time() - # Refresh the branch in order to update the item in the - # change cache. - change_key = project.source.getChangeKey(event) - with self.project_update_locks[project.canonical_name]: - project.source.getChange(change_key, refresh=True, - event=event) - log = get_annotated_logger(self.log, event) - log.debug("Adding event") - self.sched.addTriggerEvent(self.name, event) - except Exception: - self.log.exception("Error dispatching timer event for " - "project %s branch %s", - project, branch) - except Exception: - self.log.exception("Error dispatching timer event for " - "project %s", - project) + def _dispatchEvent(self, tenant, pipeline_name, project_name, + branch, timespec): + self.log.debug('Got trigger for tenant %s and pipeline %s ' + 'project %s branch %s with timespec %s', + tenant.name, pipeline_name, project_name, + branch, timespec) + try: + (trusted, project) = tenant.getProject(project_name) + event = TimerTriggerEvent() + event.type = 'timer' + event.timespec = timespec + event.forced_pipeline = pipeline_name + event.project_hostname = project.canonical_hostname + event.project_name = project.name + event.ref = 'refs/heads/%s' % branch + event.branch = branch + event.zuul_event_id = str(uuid4().hex) + event.timestamp = time.time() + # Refresh the branch in order to update the item in the + # change cache. + change_key = project.source.getChangeKey(event) + with self.project_update_locks[project.canonical_name]: + project.source.getChange(change_key, refresh=True, + event=event) + log = get_annotated_logger(self.log, event) + log.debug("Adding event") + self.sched.addTriggerEvent(self.name, event) + except Exception: + self.log.exception("Error dispatching timer event for " + "tenant %s project %s branch %s", + tenant, project_name, branch) def stop(self): self.stopped = True |