summaryrefslogtreecommitdiff
path: root/lib/ansible/plugins/strategy/linear.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/plugins/strategy/linear.py')
-rw-r--r--lib/ansible/plugins/strategy/linear.py102
1 files changed, 81 insertions, 21 deletions
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index 65240ef8fa..804cfadc77 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -54,7 +54,8 @@ class StrategyModule(StrategyBase):
host_tasks = {}
display.debug("building list of next tasks for hosts")
for host in hosts:
- host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True)
+ if not iterator.is_failed(host):
+ host_tasks[host.name] = iterator.get_next_task_for_host(host, peek=True)
display.debug("done building task lists")
num_setups = 0
@@ -62,19 +63,26 @@ class StrategyModule(StrategyBase):
num_rescue = 0
num_always = 0
- lowest_cur_block = len(iterator._blocks)
-
display.debug("counting tasks in each state of execution")
- for (k, v) in iteritems(host_tasks):
- if v is None:
- continue
-
+ host_tasks_to_run = [(host, state_task)
+ for host, state_task in iteritems(host_tasks)
+ if state_task and state_task[1]]
+
+ if host_tasks_to_run:
+ lowest_cur_block = min(
+ (s.cur_block for h, (s, t) in host_tasks_to_run
+ if s.run_state != PlayIterator.ITERATING_COMPLETE))
+ else:
+ # empty host_tasks_to_run will just run till the end of the function
+ # without ever touching lowest_cur_block
+ lowest_cur_block = None
+
+ for (k, v) in host_tasks_to_run:
(s, t) = v
- if t is None:
- continue
- if s.cur_block < lowest_cur_block and s.run_state != PlayIterator.ITERATING_COMPLETE:
- lowest_cur_block = s.cur_block
+ if s.cur_block > lowest_cur_block:
+ # Not the current block, ignore it
+ continue
if s.run_state == PlayIterator.ITERATING_SETUP:
num_setups += 1
@@ -98,7 +106,7 @@ class StrategyModule(StrategyBase):
rvals = []
display.debug("starting to advance hosts")
for host in hosts:
- host_state_task = host_tasks[host.name]
+ host_state_task = host_tasks.get(host.name)
if host_state_task is None:
continue
(s, t) = host_state_task
@@ -169,6 +177,10 @@ class StrategyModule(StrategyBase):
skip_rest = False
choose_step = True
+ # flag set if task is set to any_errors_fatal
+ any_errors_fatal = False
+
+ results = []
for (host, task) in host_tasks:
if not task:
continue
@@ -179,14 +191,15 @@ class StrategyModule(StrategyBase):
run_once = False
work_to_do = True
+ if task.any_errors_fatal:
+ any_errors_fatal = True
+
# test to see if the task across all hosts points to an action plugin which
# sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
# will only send this task to the first host in the list.
try:
action = action_loader.get(task.action, class_only=True)
- if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False):
- run_once = True
except KeyError:
# we don't care here, because the action may simply not have a
# corresponding action plugin
@@ -218,6 +231,8 @@ class StrategyModule(StrategyBase):
templar = Templar(loader=self._loader, variables=task_vars)
display.debug("done getting variables")
+ run_once = templar.template(task.run_once)
+
if not callback_sent:
display.debug("sending task start callback, copying the task so we can template it temporarily")
saved_name = task.name
@@ -240,15 +255,17 @@ class StrategyModule(StrategyBase):
self._queue_task(host, task, task_vars, play_context)
# if we're bypassing the host loop, break out now
- if run_once:
+ if run_once or getattr(action, 'BYPASS_HOST_LOOP', False):
break
+ results += self._process_pending_results(iterator, one_pass=True)
+
# go to next host/task group
if skip_rest:
continue
display.debug("done queuing things up, now waiting for results queue to drain")
- results = self._wait_on_pending_results(iterator)
+ results += self._wait_on_pending_results(iterator)
host_results.extend(results)
if not work_to_do and len(iterator.get_failed_hosts()) > 0:
@@ -258,51 +275,94 @@ class StrategyModule(StrategyBase):
break
try:
- included_files = IncludedFile.process_include_results(host_results, self._tqm,
- iterator=iterator, loader=self._loader, variable_manager=self._variable_manager)
+ included_files = IncludedFile.process_include_results(
+ host_results,
+ self._tqm,
+ iterator=iterator,
+ inventory=self._inventory,
+ loader=self._loader,
+ variable_manager=self._variable_manager
+ )
except AnsibleError as e:
return False
+ include_failure = False
if len(included_files) > 0:
+ display.debug("we have included files to process")
noop_task = Task()
noop_task.action = 'meta'
noop_task.args['_raw_params'] = 'noop'
noop_task.set_loader(iterator._play._loader)
+ display.debug("generating all_blocks data")
all_blocks = dict((host, []) for host in hosts_left)
+ display.debug("done generating all_blocks data")
for included_file in included_files:
+ display.debug("processing included file: %s" % included_file._filename)
# included hosts get the task list while those excluded get an equal-length
# list of noop tasks, to make sure that they continue running in lock-step
try:
new_blocks = self._load_included_file(included_file, iterator=iterator)
+ display.debug("iterating over new_blocks loaded from include file")
for new_block in new_blocks:
+ task_vars = self._variable_manager.get_vars(
+ loader=self._loader,
+ play=iterator._play,
+ task=included_file._task,
+ )
+ display.debug("filtering new block on tags")
+ final_block = new_block.filter_tagged_tasks(play_context, task_vars)
+ display.debug("done filtering new block on tags")
+
noop_block = Block(parent_block=task._block)
noop_block.block = [noop_task for t in new_block.block]
noop_block.always = [noop_task for t in new_block.always]
noop_block.rescue = [noop_task for t in new_block.rescue]
+
for host in hosts_left:
if host in included_file._hosts:
- task_vars = self._variable_manager.get_vars(loader=self._loader,
- play=iterator._play, host=host, task=included_file._task)
- final_block = new_block.filter_tagged_tasks(play_context, task_vars)
all_blocks[host].append(final_block)
else:
all_blocks[host].append(noop_block)
+ display.debug("done iterating over new_blocks loaded from include file")
except AnsibleError as e:
for host in included_file._hosts:
self._tqm._failed_hosts[host.name] = True
iterator.mark_host_failed(host)
display.error(e, wrap_text=False)
+ include_failure = True
continue
# finally go through all of the hosts and append the
# accumulated blocks to their list of tasks
+ display.debug("extending task lists for all hosts with included blocks")
+
for host in hosts_left:
iterator.add_tasks(host, all_blocks[host])
+ display.debug("done extending task lists")
+ display.debug("done processing included files")
+
display.debug("results queue empty")
+
+ display.debug("checking for any_errors_fatal")
+ failed_hosts = []
+ for res in results:
+ if res.is_failed() or res.is_unreachable():
+ failed_hosts.append(res._host.name)
+
+ # if any_errors_fatal and we had an error, mark all hosts as failed
+ if any_errors_fatal and len(failed_hosts) > 0:
+ for host in hosts_left:
+ # don't double-mark hosts, or the iterator will potentially
+ # fail them out of the rescue/always states
+ if host.name not in failed_hosts:
+ self._tqm._failed_hosts[host.name] = True
+ iterator.mark_host_failed(host)
+ display.debug("done checking for any_errors_fatal")
+
except (IOError, EOFError) as e:
display.debug("got IOError/EOFError in task loop: %s" % e)
# most likely an abort, return failed