Diffstat (limited to 'zuul/manager/__init__.py')
-rw-r--r--  zuul/manager/__init__.py  92
1 file changed, 65 insertions, 27 deletions
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 36361df11..c3d082a47 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if change.cache_stat.key.isSameChange(dep_change_key):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
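
The renamed check above treats a change as relevant when any version of it (or, per its comment, of its commit/topic dependencies) is already referenced by the pipeline's change list. Below is a minimal, self-contained sketch of that "same change, any version" matching idea; ToyChangeKey is a hypothetical stand-in for zuul.model.ChangeKey, whose real isSameChange compares a change's stable identity while ignoring the revision:

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class ToyChangeKey:
        # Hypothetical stand-in for ChangeKey: identity is
        # (connection, project, change number); revision is the patchset.
        connection: str
        project: str
        number: str
        revision: str

        def is_same_change(self, other):
            # "Same change" ignores the revision, so any patchset matches.
            return ((self.connection, self.project, self.number)
                    == (other.connection, other.project, other.number))

    def is_change_relevant(change_key, pipeline_keys, dep_keys=()):
        # Relevant if any version of the change, or of one of its
        # dependencies, is already referenced by the pipeline.
        if any(change_key.is_same_change(k) for k in pipeline_keys):
            return True
        return any(dep.is_same_change(k)
                   for dep in dep_keys for k in pipeline_keys)
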
@@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
- self.updateCommitDependencies(existing_change, None, event)
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -516,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta):
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
- change_queue=None, history=None, dependency_graph=None):
+ change_queue=None, history=None, dependency_graph=None,
+ skip_presence_check=False):
log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change)
@@ -531,7 +539,9 @@ class PipelineManager(metaclass=ABCMeta):
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
- if live and self.isChangeAlreadyInPipeline(change):
+ if (live and
+ self.isChangeAlreadyInPipeline(change) and
+ not skip_presence_check):
log.debug("Change %s is already in pipeline, ignoring" % change)
return True
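
The new skip_presence_check flag only loosens this early return (and the per-queue check further down): a live change that is already in the pipeline is normally ignored, unless the caller explicitly asks for a forced re-add. A tiny sketch of the guard, assuming the surrounding addChange() logic:

    def should_ignore_enqueue(live, already_in_pipeline, skip_presence_check):
        # Mirrors the guard added to addChange(): ignore a live change that
        # is already present, unless the presence check is skipped (as the
        # re-enqueue fallback later in this diff does).
        return live and already_in_pipeline and not skip_presence_check

    assert should_ignore_enqueue(True, True, False) is True    # normal dedup
    assert should_ignore_enqueue(True, True, True) is False    # forced re-add
    assert should_ignore_enqueue(False, True, False) is False  # non-live item
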
@@ -564,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -590,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("History after enqueuing changes ahead: %s", history)
if self.isChangeAlreadyInQueue(change, change_queue):
- log.debug("Change %s is already in queue, ignoring" % change)
- return True
+ if not skip_presence_check:
+ log.debug("Change %s is already in queue, ignoring",
+ change)
+ return True
cycle = []
if isinstance(change, model.Change):
@@ -625,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -857,7 +869,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
@@ -1448,16 +1460,17 @@ class PipelineManager(metaclass=ABCMeta):
item.bundle and
item.bundle.updatesConfig(tenant) and tpc is not None
):
- extra_config_files = set(tpc.extra_config_files)
- extra_config_dirs = set(tpc.extra_config_dirs)
- # Merge extra_config_files and extra_config_dirs of the
- # dependent change
- for item_ahead in item.items_ahead:
- tpc_ahead = tenant.project_configs.get(
- item_ahead.change.project.canonical_name)
- if tpc_ahead:
- extra_config_files.update(tpc_ahead.extra_config_files)
- extra_config_dirs.update(tpc_ahead.extra_config_dirs)
+ # Collect extra config files and dirs of required changes.
+ extra_config_files = set()
+ extra_config_dirs = set()
+ for merger_item in item.current_build_set.merger_items:
+ source = self.sched.connections.getSource(
+ merger_item["connection"])
+ project = source.getProject(merger_item["project"])
+ tpc = tenant.project_configs.get(project.canonical_name)
+ if tpc:
+ extra_config_files.update(tpc.extra_config_files)
+ extra_config_dirs.update(tpc.extra_config_dirs)
ready = self.scheduleMerge(
item,
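
Instead of merging in extra config paths only from items ahead, the new code derives them from every change that will actually be merged for this build set. A rough standalone sketch of that collection step, with get_project standing in for sched.connections.getSource(...).getProject(...):

    def collect_extra_config(merger_items, project_configs, get_project):
        # Union the extra config files/dirs declared by the tenant for
        # every project that appears in the item's merger items.
        extra_files, extra_dirs = set(), set()
        for merger_item in merger_items:
            project = get_project(merger_item["connection"],
                                  merger_item["project"])
            tpc = project_configs.get(project.canonical_name)
            if tpc is not None:
                extra_files.update(tpc.extra_config_files)
                extra_dirs.update(tpc.extra_config_dirs)
        return extra_files, extra_dirs
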
@@ -1554,6 +1567,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
+ quiet_dequeue = False
if item.isBundleFailing():
item.setDequeuedBundleFailing('Bundle is failing')
elif not meets_reqs:
@@ -1565,7 +1579,28 @@ class PipelineManager(metaclass=ABCMeta):
else:
msg = f'Change {clist} is needed.'
item.setDequeuedNeedingChange(msg)
- if item.live:
+ # If all the dependencies are already in the pipeline
+ # (but not ahead of this change), then we probably
+ # just added updated versions of them, possibly
+ # updating a cycle. In that case, attempt to
+ # re-enqueue this change with the updated deps.
+ if (item.live and
+ all([self.isChangeAlreadyInPipeline(c)
+ for c in needs_changes])):
+ # Try to enqueue; if that succeeds, keep this dequeue quiet
+ try:
+ log.info("Attempting re-enqueue of change %s",
+ item.change)
+ quiet_dequeue = self.addChange(
+ item.change, item.event,
+ enqueue_time=item.enqueue_time,
+ quiet=True,
+ skip_presence_check=True)
+ except Exception:
+ log.exception("Unable to re-enqueue change %s "
+ "which is missing dependencies",
+ item.change)
+ if item.live and not quiet_dequeue:
try:
self.reportItem(item)
except exceptions.MergeFailure:
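
The fallback above is the heart of the change: when an item is about to be dequeued only because its (possibly cycle-updated) dependencies sit elsewhere in the pipeline, the manager first tries a quiet re-enqueue with skip_presence_check=True and only reports the dequeue if that attempt fails. A condensed sketch of the control flow, with add_change and report_item passed in so it stays self-contained:

    def dequeue_or_reenqueue(item, deps_in_pipeline, add_change, report_item, log):
        # Returns True if the item was quietly re-enqueued instead of
        # being reported as dequeued.
        quiet_dequeue = False
        if item.live and deps_in_pipeline:
            try:
                quiet_dequeue = add_change(
                    item.change, item.event,
                    enqueue_time=item.enqueue_time,
                    quiet=True,
                    skip_presence_check=True)
            except Exception:
                log.exception("Unable to re-enqueue change %s", item.change)
        if item.live and not quiet_dequeue:
            report_item(item)
        return quiet_dequeue
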
@@ -2197,7 +2232,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2236,18 +2271,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
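
With the trigger_event parameter, the timing metrics no longer rely on item.event being the enqueuing event; they are emitted only when the caller passes the event that actually triggered the enqueue. A small sketch of the two measurements, assuming the event carries both timestamps used above:

    import time

    def event_timing_ms(trigger_event):
        # processing: scheduler arrival -> now; elapsed: event origin -> now.
        if trigger_event is None or not hasattr(
                trigger_event, 'arrived_at_scheduler_timestamp'):
            return None
        now = time.time()
        processing = (now - trigger_event.arrived_at_scheduler_timestamp) * 1000
        elapsed = (now - trigger_event.timestamp) * 1000
        return processing, elapsed  # both in milliseconds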