summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2016-09-16 00:14:53 -0500
committerJames Cammarata <jimi@sngx.net>2016-09-17 08:12:52 -0500
commit5a57c66e3c14e26fa5f20f1ce9bb038c667b19e5 (patch)
tree465666889713c1e17a84ae9804d373b52f2ca240
parentdfb1c0647e907dd27509922da995932dcc6be8db (diff)
downloadansible-threaded_receiver.tar.gz
Moving result reading to a background threadthreaded_receiver
-rw-r--r--lib/ansible/executor/action_write_locks.py43
-rw-r--r--lib/ansible/executor/module_common.py8
-rw-r--r--lib/ansible/executor/task_queue_manager.py1
-rw-r--r--lib/ansible/playbook/conditional.py3
-rw-r--r--lib/ansible/plugins/strategy/__init__.py456
-rw-r--r--lib/ansible/plugins/strategy/linear.py8
-rw-r--r--test/units/plugins/strategies/test_strategy_base.py89
7 files changed, 372 insertions, 236 deletions
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py
new file mode 100644
index 0000000000..413d56d9d7
--- /dev/null
+++ b/lib/ansible/executor/action_write_locks.py
@@ -0,0 +1,43 @@
+# (c) 2016 - Red Hat, Inc. <support@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from multiprocessing import Lock
+from ansible.module_utils.facts import Facts
+
+if 'action_write_locks' not in globals():
+ # Do not initialize this more than once because it seems to bash
+ # the existing one. multiprocessing must be reloading the module
+ # when it forks?
+ action_write_locks = dict()
+
+ # Below is a Lock for use when we weren't expecting a named module.
+ # It gets used when an action plugin directly invokes a module instead
+ # of going through the strategies. Slightly less efficient as all
+ # processes with unexpected module names will wait on this lock
+ action_write_locks[None] = Lock()
+
+ # These plugins are called directly by action plugins (not going through
+ # a strategy). We precreate them here as an optimization
+ mods = set(p['name'] for p in Facts.PKG_MGRS)
+ mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
+ for mod_name in mods:
+ action_write_locks[mod_name] = Lock()
+
diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py
index d7dac13de0..86549ea0b7 100644
--- a/lib/ansible/executor/module_common.py
+++ b/lib/ansible/executor/module_common.py
@@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text
# Must import strategy and use write_locks from there
# If we import write_locks directly then we end up binding a
# variable to the object and then it never gets updated.
-from ansible.plugins import strategy
+from ansible.executor import action_write_locks
try:
from __main__ import display
@@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta
display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename)
zipdata = open(cached_module_filename, 'rb').read()
else:
- if module_name in strategy.action_write_locks:
+ if module_name in action_write_locks.action_write_locks:
display.debug('ANSIBALLZ: Using lock for %s' % module_name)
- lock = strategy.action_write_locks[module_name]
+ lock = action_write_locks.action_write_locks[module_name]
else:
# If the action plugin directly invokes the module (instead of
# going through a strategy) then we don't have a cross-process
# Lock specifically for this module. Use the "unexpected
# module" lock instead
display.debug('ANSIBALLZ: Using generic lock for %s' % module_name)
- lock = strategy.action_write_locks[None]
+ lock = action_write_locks.action_write_locks[None]
display.debug('ANSIBALLZ: Acquiring lock')
with lock:
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index c003275306..2e6948f1e0 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -285,6 +285,7 @@ class TaskQueueManager:
for host_name in iterator.get_failed_hosts():
self._failed_hosts[host_name] = True
+ strategy.cleanup()
self._cleanup_processes()
return play_return
diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py
index 14f50f8829..1fb54df998 100644
--- a/lib/ansible/playbook/conditional.py
+++ b/lib/ansible/playbook/conditional.py
@@ -25,6 +25,7 @@ from ansible.compat.six import text_type
from ansible.errors import AnsibleError, AnsibleUndefinedVariable
from ansible.playbook.attribute import FieldAttribute
from ansible.template import Templar
+from ansible.module_utils._text import to_native
class Conditional:
@@ -72,7 +73,7 @@ class Conditional:
if not self._check_conditional(conditional, templar, all_vars):
return False
except Exception as e:
- raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (conditional, e), obj=ds)
+ raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)
return True
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index aaa164ee75..d6db1dbde8 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -19,14 +19,18 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+import os
+import threading
import time
+from collections import deque
from multiprocessing import Lock
from jinja2.exceptions import UndefinedError
from ansible.compat.six.moves import queue as Queue
from ansible.compat.six import iteritems, string_types
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
+from ansible.executor import action_write_locks
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
@@ -50,25 +54,6 @@ except ImportError:
__all__ = ['StrategyBase']
-if 'action_write_locks' not in globals():
- # Do not initialize this more than once because it seems to bash
- # the existing one. multiprocessing must be reloading the module
- # when it forks?
- action_write_locks = dict()
-
- # Below is a Lock for use when we weren't expecting a named module.
- # It gets used when an action plugin directly invokes a module instead
- # of going through the strategies. Slightly less efficient as all
- # processes with unexpected module names will wait on this lock
- action_write_locks[None] = Lock()
-
- # These plugins are called directly by action plugins (not going through
- # a strategy). We precreate them here as an optimization
- mods = set(p['name'] for p in Facts.PKG_MGRS)
- mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
- for mod_name in mods:
- action_write_locks[mod_name] = Lock()
-
# TODO: this should probably be in the plugins/__init__.py, with
# a smarter mechanism to set all of the attributes based on
# the loaders created there
@@ -86,6 +71,25 @@ class SharedPluginLoaderObj:
self.module_loader = module_loader
+_sentinel = object()
+def results_thread_main(strategy):
+ #print("RESULT THREAD STARTING: %s" % threading.current_thread())
+ while True:
+ try:
+ result = strategy._final_q.get()
+ if type(result) == object:
+ break
+ else:
+ #print("result in thread is: %s" % result._result)
+ strategy._results_lock.acquire()
+ strategy._results.append(result)
+ strategy._results_lock.release()
+ except (IOError, EOFError):
+ break
+ except Queue.Empty:
+ pass
+ #print("RESULT THREAD EXITED: %s" % threading.current_thread())
+
class StrategyBase:
'''
@@ -104,6 +108,7 @@ class StrategyBase:
self._final_q = tqm._final_q
self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False)
+
# Backwards compat: self._display isn't really needed, just import the global display and use that.
self._display = display
@@ -115,6 +120,18 @@ class StrategyBase:
# outstanding tasks still in queue
self._blocked_hosts = dict()
+ self._results = deque()
+ self._results_lock = threading.Condition(threading.Lock())
+
+ #print("creating thread for strategy %s" % id(self))
+ self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
+ self._results_thread.daemon = True
+ self._results_thread.start()
+
+ def cleanup(self):
+ self._final_q.put(_sentinel)
+ self._results_thread.join()
+
def run(self, iterator, play_context, result=0):
# save the failed/unreachable hosts, as the run_handlers()
# method will clear that information during its execution
@@ -174,10 +191,9 @@ class StrategyBase:
# tasks inside of play_iterator so we'd have to extract them to do it
# there.
- global action_write_locks
- if task.action not in action_write_locks:
+ if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
- action_write_locks[task.action] = Lock()
+ action_write_locks.action_write_locks[task.action] = Lock()
# and then queue the new task
try:
@@ -211,7 +227,7 @@ class StrategyBase:
return
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
- def _process_pending_results(self, iterator, one_pass=False):
+ def _process_pending_results(self, iterator, one_pass=False, max_passes=None):
'''
Reads results off the final queue and takes appropriate action
based on the result (executing callbacks, updating state, etc.).
@@ -270,228 +286,232 @@ class StrategyBase:
else:
return False
- passes = 0
- while not self._tqm._terminated:
+ cur_pass = 0
+ while True:
try:
- task_result = self._final_q.get(timeout=0.001)
- original_host = get_original_host(task_result._host)
- original_task = iterator.get_original_task(original_host, task_result._task)
- task_result._host = original_host
- task_result._task = original_task
-
- # send callbacks for 'non final' results
- if '_ansible_retry' in task_result._result:
- self._tqm.send_callback('v2_runner_retry', task_result)
- continue
- elif '_ansible_item_result' in task_result._result:
- if task_result.is_failed() or task_result.is_unreachable():
- self._tqm.send_callback('v2_runner_item_on_failed', task_result)
- elif task_result.is_skipped():
- self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
- else:
- if 'diff' in task_result._result:
- if self._diff:
- self._tqm.send_callback('v2_on_file_diff', task_result)
- self._tqm.send_callback('v2_runner_item_on_ok', task_result)
- continue
+ self._results_lock.acquire()
+ task_result = self._results.pop()
+ except IndexError:
+ break
+ finally:
+ self._results_lock.release()
+
+ original_host = get_original_host(task_result._host)
+ original_task = iterator.get_original_task(original_host, task_result._task)
+ task_result._host = original_host
+ task_result._task = original_task
+
+ # send callbacks for 'non final' results
+ if '_ansible_retry' in task_result._result:
+ self._tqm.send_callback('v2_runner_retry', task_result)
+ continue
+ elif '_ansible_item_result' in task_result._result:
+ if task_result.is_failed() or task_result.is_unreachable():
+ self._tqm.send_callback('v2_runner_item_on_failed', task_result)
+ elif task_result.is_skipped():
+ self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
+ else:
+ if 'diff' in task_result._result:
+ if self._diff:
+ self._tqm.send_callback('v2_on_file_diff', task_result)
+ self._tqm.send_callback('v2_runner_item_on_ok', task_result)
+ continue
+
+ if original_task.register:
+ #print("^ REGISTERING RESULT %s" % original_task.register)
+ if original_task.run_once:
+ host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ else:
+ host_list = [original_host]
+
+ clean_copy = strip_internal_keys(task_result._result)
+ if 'invocation' in clean_copy:
+ del clean_copy['invocation']
- if original_task.register:
+ for target_host in host_list:
+ self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
+
+ # all host status messages contain 2 entries: (msg, task_result)
+ role_ran = False
+ if task_result.is_failed():
+ role_ran = True
+ if not original_task.ignore_errors:
+ display.debug("marking %s as failed" % original_host.name)
if original_task.run_once:
- host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ # if we're using run_once, we have to fail every host here
+ for h in self._inventory.get_hosts(iterator._play.hosts):
+ if h.name not in self._tqm._unreachable_hosts:
+ state, _ = iterator.get_next_task_for_host(h, peek=True)
+ iterator.mark_host_failed(h)
+ state, new_task = iterator.get_next_task_for_host(h, peek=True)
else:
- host_list = [original_host]
-
- clean_copy = strip_internal_keys(task_result._result)
- if 'invocation' in clean_copy:
- del clean_copy['invocation']
-
- for target_host in host_list:
- self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
-
- # all host status messages contain 2 entries: (msg, task_result)
- role_ran = False
- if task_result.is_failed():
- role_ran = True
- if not original_task.ignore_errors:
- display.debug("marking %s as failed" % original_host.name)
- if original_task.run_once:
- # if we're using run_once, we have to fail every host here
- for h in self._inventory.get_hosts(iterator._play.hosts):
- if h.name not in self._tqm._unreachable_hosts:
- state, _ = iterator.get_next_task_for_host(h, peek=True)
- iterator.mark_host_failed(h)
- state, new_task = iterator.get_next_task_for_host(h, peek=True)
- else:
- iterator.mark_host_failed(original_host)
+ iterator.mark_host_failed(original_host)
- # only add the host to the failed list officially if it has
- # been failed by the iterator
- if iterator.is_failed(original_host):
- self._tqm._failed_hosts[original_host.name] = True
- self._tqm._stats.increment('failures', original_host.name)
- else:
- # otherwise, we grab the current state and if we're iterating on
- # the rescue portion of a block then we save the failed task in a
- # special var for use within the rescue/always
- state, _ = iterator.get_next_task_for_host(original_host, peek=True)
- if state.run_state == iterator.ITERATING_RESCUE:
- self._variable_manager.set_nonpersistent_facts(
- original_host,
- dict(
- ansible_failed_task=original_task.serialize(),
- ansible_failed_result=task_result._result,
- ),
- )
+ # only add the host to the failed list officially if it has
+ # been failed by the iterator
+ if iterator.is_failed(original_host):
+ self._tqm._failed_hosts[original_host.name] = True
+ self._tqm._stats.increment('failures', original_host.name)
else:
- self._tqm._stats.increment('ok', original_host.name)
- self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
- elif task_result.is_unreachable():
- self._tqm._unreachable_hosts[original_host.name] = True
- self._tqm._stats.increment('dark', original_host.name)
- self._tqm.send_callback('v2_runner_on_unreachable', task_result)
- elif task_result.is_skipped():
- self._tqm._stats.increment('skipped', original_host.name)
- self._tqm.send_callback('v2_runner_on_skipped', task_result)
+ # otherwise, we grab the current state and if we're iterating on
+ # the rescue portion of a block then we save the failed task in a
+ # special var for use within the rescue/always
+ state, _ = iterator.get_next_task_for_host(original_host, peek=True)
+ if state.run_state == iterator.ITERATING_RESCUE:
+ self._variable_manager.set_nonpersistent_facts(
+ original_host,
+ dict(
+ ansible_failed_task=original_task.serialize(),
+ ansible_failed_result=task_result._result,
+ ),
+ )
else:
- role_ran = True
+ self._tqm._stats.increment('ok', original_host.name)
+ self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
+ elif task_result.is_unreachable():
+ self._tqm._unreachable_hosts[original_host.name] = True
+ self._tqm._stats.increment('dark', original_host.name)
+ self._tqm.send_callback('v2_runner_on_unreachable', task_result)
+ elif task_result.is_skipped():
+ self._tqm._stats.increment('skipped', original_host.name)
+ self._tqm.send_callback('v2_runner_on_skipped', task_result)
+ else:
+ role_ran = True
- if original_task.loop:
- # this task had a loop, and has more than one result, so
- # loop over all of them instead of a single result
- result_items = task_result._result.get('results', [])
- else:
- result_items = [ task_result._result ]
-
- for result_item in result_items:
- if '_ansible_notify' in result_item:
- if task_result.is_changed():
- # The shared dictionary for notified handlers is a proxy, which
- # does not detect when sub-objects within the proxy are modified.
- # So, per the docs, we reassign the list so the proxy picks up and
- # notifies all other threads
- for handler_name in result_item['_ansible_notify']:
- # Find the handler using the above helper. First we look up the
- # dependency chain of the current task (if it's from a role), otherwise
- # we just look through the list of handlers in the current play/all
- # roles and use the first one that matches the notify name
- if handler_name in self._listening_handlers:
- for listening_handler_name in self._listening_handlers[handler_name]:
- listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
- if listening_handler is None:
- raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
- if original_host not in self._notified_handlers[listening_handler]:
- self._notified_handlers[listening_handler].append(original_host)
- display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
+ if original_task.loop:
+ # this task had a loop, and has more than one result, so
+ # loop over all of them instead of a single result
+ result_items = task_result._result.get('results', [])
+ else:
+ result_items = [ task_result._result ]
+
+ for result_item in result_items:
+ if '_ansible_notify' in result_item:
+ if task_result.is_changed():
+ # The shared dictionary for notified handlers is a proxy, which
+ # does not detect when sub-objects within the proxy are modified.
+ # So, per the docs, we reassign the list so the proxy picks up and
+ # notifies all other threads
+ for handler_name in result_item['_ansible_notify']:
+ # Find the handler using the above helper. First we look up the
+ # dependency chain of the current task (if it's from a role), otherwise
+ # we just look through the list of handlers in the current play/all
+ # roles and use the first one that matches the notify name
+ if handler_name in self._listening_handlers:
+ for listening_handler_name in self._listening_handlers[handler_name]:
+ listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
+ if listening_handler is None:
+ raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
+ if original_host not in self._notified_handlers[listening_handler]:
+ self._notified_handlers[listening_handler].append(original_host)
+ display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
+ else:
+ target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
+ if target_handler is not None:
+ if original_host not in self._notified_handlers[target_handler]:
+ self._notified_handlers[target_handler].append(original_host)
+ # FIXME: should this be a callback?
+ display.vv("NOTIFIED HANDLER %s" % (handler_name,))
else:
- target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
- if target_handler is not None:
- if original_host not in self._notified_handlers[target_handler]:
+ # As there may be more than one handler with the notified name as the
+ # parent, so we just keep track of whether or not we found one at all
+ found = False
+ for target_handler in self._notified_handlers:
+ if parent_handler_match(target_handler, handler_name):
self._notified_handlers[target_handler].append(original_host)
- # FIXME: should this be a callback?
- display.vv("NOTIFIED HANDLER %s" % (handler_name,))
- else:
- # As there may be more than one handler with the notified name as the
- # parent, so we just keep track of whether or not we found one at all
- found = False
- for target_handler in self._notified_handlers:
- if parent_handler_match(target_handler, handler_name):
- self._notified_handlers[target_handler].append(original_host)
- display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
- found = True
-
- # and if none were found, then we raise an error
- if not found:
- raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
-
-
- if 'add_host' in result_item:
- # this task added a new host (add_host module)
- new_host_info = result_item.get('add_host', dict())
- self._add_host(new_host_info, iterator)
-
- elif 'add_group' in result_item:
- # this task added a new group (group_by module)
- self._add_group(original_host, result_item)
-
- elif 'ansible_facts' in result_item:
- loop_var = 'item'
- if original_task.loop_control:
- loop_var = original_task.loop_control.loop_var or 'item'
-
- item = result_item.get(loop_var, None)
-
- if original_task.action == 'include_vars':
- for (var_name, var_value) in iteritems(result_item['ansible_facts']):
- # find the host we're actually refering too here, which may
- # be a host that is not really in inventory at all
- if original_task.delegate_to is not None and original_task.delegate_facts:
- task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task)
- self.add_tqm_variables(task_vars, play=iterator._play)
- if item is not None:
- task_vars[loop_var] = item
- templar = Templar(loader=self._loader, variables=task_vars)
- host_name = templar.template(original_task.delegate_to)
- actual_host = self._inventory.get_host(host_name)
- if actual_host is None:
- actual_host = Host(name=host_name)
- else:
- actual_host = original_host
-
- if original_task.run_once:
- host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
- else:
- host_list = [actual_host]
+ display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),))
+ found = True
+
+ # and if none were found, then we raise an error
+ if not found:
+ raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
+
+
+ if 'add_host' in result_item:
+ # this task added a new host (add_host module)
+ new_host_info = result_item.get('add_host', dict())
+ self._add_host(new_host_info, iterator)
+
+ elif 'add_group' in result_item:
+ # this task added a new group (group_by module)
+ self._add_group(original_host, result_item)
+
+ elif 'ansible_facts' in result_item:
+ loop_var = 'item'
+ if original_task.loop_control:
+ loop_var = original_task.loop_control.loop_var or 'item'
+
+ item = result_item.get(loop_var, None)
+
+ if original_task.action == 'include_vars':
+ for (var_name, var_value) in iteritems(result_item['ansible_facts']):
+ # find the host we're actually refering too here, which may
+ # be a host that is not really in inventory at all
+ if original_task.delegate_to is not None and original_task.delegate_facts:
+ task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task)
+ self.add_tqm_variables(task_vars, play=iterator._play)
+ if item is not None:
+ task_vars[loop_var] = item
+ templar = Templar(loader=self._loader, variables=task_vars)
+ host_name = templar.template(original_task.delegate_to)
+ actual_host = self._inventory.get_host(host_name)
+ if actual_host is None:
+ actual_host = Host(name=host_name)
+ else:
+ actual_host = original_host
- for target_host in host_list:
- self._variable_manager.set_host_variable(target_host, var_name, var_value)
- else:
if original_task.run_once:
host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
else:
- host_list = [original_host]
+ host_list = [actual_host]
for target_host in host_list:
- if original_task.action == 'set_fact':
- self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
- else:
- self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
+ self._variable_manager.set_host_variable(target_host, var_name, var_value)
+ else:
+ if original_task.run_once:
+ host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ else:
+ host_list = [original_host]
- if 'diff' in task_result._result:
- if self._diff:
- self._tqm.send_callback('v2_on_file_diff', task_result)
+ for target_host in host_list:
+ if original_task.action == 'set_fact':
+ self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
+ else:
+ self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
- if original_task.action not in ['include', 'include_role']:
- self._tqm._stats.increment('ok', original_host.name)
- if 'changed' in task_result._result and task_result._result['changed']:
- self._tqm._stats.increment('changed', original_host.name)
+ if 'diff' in task_result._result:
+ if self._diff:
+ self._tqm.send_callback('v2_on_file_diff', task_result)
- # finally, send the ok for this task
- self._tqm.send_callback('v2_runner_on_ok', task_result)
+ if original_task.action not in ['include', 'include_role']:
+ self._tqm._stats.increment('ok', original_host.name)
+ if 'changed' in task_result._result and task_result._result['changed']:
+ self._tqm._stats.increment('changed', original_host.name)
- self._pending_results -= 1
- if original_host.name in self._blocked_hosts:
- del self._blocked_hosts[original_host.name]
+ # finally, send the ok for this task
+ self._tqm.send_callback('v2_runner_on_ok', task_result)
- # If this is a role task, mark the parent role as being run (if
- # the task was ok or failed, but not skipped or unreachable)
- if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':?
- # lookup the role in the ROLE_CACHE to make sure we're dealing
- # with the correct object and mark it as executed
- for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
- if role_obj._uuid == original_task._role._uuid:
- role_obj._had_task_run[original_host.name] = True
+ self._pending_results -= 1
+ if original_host.name in self._blocked_hosts:
+ del self._blocked_hosts[original_host.name]
- ret_results.append(task_result)
+ # If this is a role task, mark the parent role as being run (if
+ # the task was ok or failed, but not skipped or unreachable)
+ if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':?
+ # lookup the role in the ROLE_CACHE to make sure we're dealing
+ # with the correct object and mark it as executed
+ for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
+ if role_obj._uuid == original_task._role._uuid:
+ role_obj._had_task_run[original_host.name] = True
- except Queue.Empty:
- passes += 1
- if passes > 2:
- break
+ ret_results.append(task_result)
- if one_pass:
+ if one_pass or max_passes is not None and (cur_pass+1) >= max_passes:
break
+ cur_pass += 1
+
return ret_results
def _wait_on_pending_results(self, iterator):
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index c6ef0a347b..8f42ef81b5 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -263,17 +263,15 @@ class StrategyModule(StrategyBase):
if run_once:
break
- # FIXME: probably not required here any more with the result proc
- # having been removed, so there's no only a single result
- # queue for the main thread
- results += self._process_pending_results(iterator, one_pass=True)
+ results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))
# go to next host/task group
if skip_rest:
continue
display.debug("done queuing things up, now waiting for results queue to drain")
- results += self._wait_on_pending_results(iterator)
+ if self._pending_results > 0:
+ results += self._wait_on_pending_results(iterator)
host_results.extend(results)
all_role_blocks = []
diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py
index 49fd095bd4..faa6173eb4 100644
--- a/test/units/plugins/strategies/test_strategy_base.py
+++ b/test/units/plugins/strategies/test_strategy_base.py
@@ -45,16 +45,49 @@ class TestStrategyBase(unittest.TestCase):
pass
def test_strategy_base_init(self):
+ queue_items = []
+ def _queue_empty(*args, **kwargs):
+ return len(queue_items) == 0
+ def _queue_get(*args, **kwargs):
+ if len(queue_items) == 0:
+ raise Queue.Empty
+ else:
+ return queue_items.pop()
+ def _queue_put(item, *args, **kwargs):
+ queue_items.append(item)
+
+ mock_queue = MagicMock()
+ mock_queue.empty.side_effect = _queue_empty
+ mock_queue.get.side_effect = _queue_get
+ mock_queue.put.side_effect = _queue_put
+
mock_tqm = MagicMock(TaskQueueManager)
- mock_tqm._final_q = MagicMock()
+ mock_tqm._final_q = mock_queue
mock_tqm._options = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
strategy_base = StrategyBase(tqm=mock_tqm)
+ strategy_base.cleanup()
def test_strategy_base_run(self):
+ queue_items = []
+ def _queue_empty(*args, **kwargs):
+ return len(queue_items) == 0
+ def _queue_get(*args, **kwargs):
+ if len(queue_items) == 0:
+ raise Queue.Empty
+ else:
+ return queue_items.pop()
+ def _queue_put(item, *args, **kwargs):
+ queue_items.append(item)
+
+ mock_queue = MagicMock()
+ mock_queue.empty.side_effect = _queue_empty
+ mock_queue.get.side_effect = _queue_get
+ mock_queue.put.side_effect = _queue_put
+
mock_tqm = MagicMock(TaskQueueManager)
- mock_tqm._final_q = MagicMock()
+ mock_tqm._final_q = mock_queue
mock_tqm._stats = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
@@ -87,8 +120,25 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm._unreachable_hosts = dict(host1=True)
mock_iterator.get_failed_hosts.return_value = []
self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_UNREACHABLE_HOSTS)
+ strategy_base.cleanup()
def test_strategy_base_get_hosts(self):
+ queue_items = []
+ def _queue_empty(*args, **kwargs):
+ return len(queue_items) == 0
+ def _queue_get(*args, **kwargs):
+ if len(queue_items) == 0:
+ raise Queue.Empty
+ else:
+ return queue_items.pop()
+ def _queue_put(item, *args, **kwargs):
+ queue_items.append(item)
+
+ mock_queue = MagicMock()
+ mock_queue.empty.side_effect = _queue_empty
+ mock_queue.get.side_effect = _queue_get
+ mock_queue.put.side_effect = _queue_put
+
mock_hosts = []
for i in range(0, 5):
mock_host = MagicMock()
@@ -100,7 +150,7 @@ class TestStrategyBase(unittest.TestCase):
mock_inventory.get_hosts.return_value = mock_hosts
mock_tqm = MagicMock()
- mock_tqm._final_q = MagicMock()
+ mock_tqm._final_q = mock_queue
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
mock_tqm.get_inventory.return_value = mock_inventory
@@ -120,6 +170,7 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm._unreachable_hosts = ["host02"]
self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:])
+ strategy_base.cleanup()
@patch.object(WorkerProcess, 'run')
def test_strategy_base_queue_task(self, mock_worker):
@@ -178,10 +229,13 @@ class TestStrategyBase(unittest.TestCase):
raise Queue.Empty
else:
return queue_items.pop()
+ def _queue_put(item, *args, **kwargs):
+ queue_items.append(item)
mock_queue = MagicMock()
mock_queue.empty.side_effect = _queue_empty
mock_queue.get.side_effect = _queue_get
+ mock_queue.put.side_effect = _queue_put
mock_tqm._final_q = mock_queue
mock_tqm._stats = MagicMock()
@@ -272,7 +326,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
mock_iterator.is_failed.return_value = True
- results = strategy_base._process_pending_results(iterator=mock_iterator)
+ results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
@@ -306,7 +360,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- results = strategy_base._process_pending_results(iterator=mock_iterator)
+ results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -314,7 +368,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- results = strategy_base._process_pending_results(iterator=mock_iterator)
+ results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -322,7 +376,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
- results = strategy_base._process_pending_results(iterator=mock_iterator)
+ results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
@@ -341,6 +395,7 @@ class TestStrategyBase(unittest.TestCase):
#queue_items.append(('bad'))
#self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
+ strategy_base.cleanup()
def test_strategy_base_load_included_file(self):
fake_loader = DictDataLoader({
@@ -351,13 +406,30 @@ class TestStrategyBase(unittest.TestCase):
""",
})
+ queue_items = []
+ def _queue_empty(*args, **kwargs):
+ return len(queue_items) == 0
+ def _queue_get(*args, **kwargs):
+ if len(queue_items) == 0:
+ raise Queue.Empty
+ else:
+ return queue_items.pop()
+ def _queue_put(item, *args, **kwargs):
+ queue_items.append(item)
+
+ mock_queue = MagicMock()
+ mock_queue.empty.side_effect = _queue_empty
+ mock_queue.get.side_effect = _queue_get
+ mock_queue.put.side_effect = _queue_put
+
mock_tqm = MagicMock()
- mock_tqm._final_q = MagicMock()
+ mock_tqm._final_q = mock_queue
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
strategy_base = StrategyBase(tqm=mock_tqm)
strategy_base._loader = fake_loader
+ strategy_base.cleanup()
mock_play = MagicMock()
@@ -443,4 +515,5 @@ class TestStrategyBase(unittest.TestCase):
result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context)
finally:
+ strategy_base.cleanup()
tqm.cleanup()