diff options
Diffstat (limited to 'lib/ansible/plugins/strategy/__init__.py')
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 456 |
1 files changed, 238 insertions, 218 deletions
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index aaa164ee75..d6db1dbde8 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -19,14 +19,18 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import os +import threading import time +from collections import deque from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, string_types from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable +from ansible.executor import action_write_locks from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host @@ -50,25 +54,6 @@ except ImportError: __all__ = ['StrategyBase'] -if 'action_write_locks' not in globals(): - # Do not initialize this more than once because it seems to bash - # the existing one. multiprocessing must be reloading the module - # when it forks? - action_write_locks = dict() - - # Below is a Lock for use when we weren't expecting a named module. - # It gets used when an action plugin directly invokes a module instead - # of going through the strategies. Slightly less efficient as all - # processes with unexpected module names will wait on this lock - action_write_locks[None] = Lock() - - # These plugins are called directly by action plugins (not going through - # a strategy). We precreate them here as an optimization - mods = set(p['name'] for p in Facts.PKG_MGRS) - mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) - for mod_name in mods: - action_write_locks[mod_name] = Lock() - # TODO: this should probably be in the plugins/__init__.py, with # a smarter mechanism to set all of the attributes based on # the loaders created there @@ -86,6 +71,25 @@ class SharedPluginLoaderObj: self.module_loader = module_loader +_sentinel = object() +def results_thread_main(strategy): + #print("RESULT THREAD STARTING: %s" % threading.current_thread()) + while True: + try: + result = strategy._final_q.get() + if type(result) == object: + break + else: + #print("result in thread is: %s" % result._result) + strategy._results_lock.acquire() + strategy._results.append(result) + strategy._results_lock.release() + except (IOError, EOFError): + break + except Queue.Empty: + pass + #print("RESULT THREAD EXITED: %s" % threading.current_thread()) + class StrategyBase: ''' @@ -104,6 +108,7 @@ class StrategyBase: self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False) + # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display @@ -115,6 +120,18 @@ class StrategyBase: # outstanding tasks still in queue self._blocked_hosts = dict() + self._results = deque() + self._results_lock = threading.Condition(threading.Lock()) + + #print("creating thread for strategy %s" % id(self)) + self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) + self._results_thread.daemon = True + self._results_thread.start() + + def cleanup(self): + self._final_q.put(_sentinel) + self._results_thread.join() + def run(self, iterator, play_context, result=0): # save the failed/unreachable hosts, as the run_handlers() # method will clear that information during its execution @@ -174,10 +191,9 @@ class StrategyBase: # tasks inside of play_iterator so we'd have to extract them to do it # there. - global action_write_locks - if task.action not in action_write_locks: + if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) - action_write_locks[task.action] = Lock() + action_write_locks.action_write_locks[task.action] = Lock() # and then queue the new task try: @@ -211,7 +227,7 @@ class StrategyBase: return display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) - def _process_pending_results(self, iterator, one_pass=False): + def _process_pending_results(self, iterator, one_pass=False, max_passes=None): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). @@ -270,228 +286,232 @@ class StrategyBase: else: return False - passes = 0 - while not self._tqm._terminated: + cur_pass = 0 + while True: try: - task_result = self._final_q.get(timeout=0.001) - original_host = get_original_host(task_result._host) - original_task = iterator.get_original_task(original_host, task_result._task) - task_result._host = original_host - task_result._task = original_task - - # send callbacks for 'non final' results - if '_ansible_retry' in task_result._result: - self._tqm.send_callback('v2_runner_retry', task_result) - continue - elif '_ansible_item_result' in task_result._result: - if task_result.is_failed() or task_result.is_unreachable(): - self._tqm.send_callback('v2_runner_item_on_failed', task_result) - elif task_result.is_skipped(): - self._tqm.send_callback('v2_runner_item_on_skipped', task_result) - else: - if 'diff' in task_result._result: - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) - self._tqm.send_callback('v2_runner_item_on_ok', task_result) - continue + self._results_lock.acquire() + task_result = self._results.pop() + except IndexError: + break + finally: + self._results_lock.release() + + original_host = get_original_host(task_result._host) + original_task = iterator.get_original_task(original_host, task_result._task) + task_result._host = original_host + task_result._task = original_task + + # send callbacks for 'non final' results + if '_ansible_retry' in task_result._result: + self._tqm.send_callback('v2_runner_retry', task_result) + continue + elif '_ansible_item_result' in task_result._result: + if task_result.is_failed() or task_result.is_unreachable(): + self._tqm.send_callback('v2_runner_item_on_failed', task_result) + elif task_result.is_skipped(): + self._tqm.send_callback('v2_runner_item_on_skipped', task_result) + else: + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) + self._tqm.send_callback('v2_runner_item_on_ok', task_result) + continue + + if original_task.register: + #print("^ REGISTERING RESULT %s" % original_task.register) + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] + + clean_copy = strip_internal_keys(task_result._result) + if 'invocation' in clean_copy: + del clean_copy['invocation'] - if original_task.register: + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) + + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + if not original_task.ignore_errors: + display.debug("marking %s as failed" % original_host.name) if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + # if we're using run_once, we have to fail every host here + for h in self._inventory.get_hosts(iterator._play.hosts): + if h.name not in self._tqm._unreachable_hosts: + state, _ = iterator.get_next_task_for_host(h, peek=True) + iterator.mark_host_failed(h) + state, new_task = iterator.get_next_task_for_host(h, peek=True) else: - host_list = [original_host] - - clean_copy = strip_internal_keys(task_result._result) - if 'invocation' in clean_copy: - del clean_copy['invocation'] - - for target_host in host_list: - self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) - - # all host status messages contain 2 entries: (msg, task_result) - role_ran = False - if task_result.is_failed(): - role_ran = True - if not original_task.ignore_errors: - display.debug("marking %s as failed" % original_host.name) - if original_task.run_once: - # if we're using run_once, we have to fail every host here - for h in self._inventory.get_hosts(iterator._play.hosts): - if h.name not in self._tqm._unreachable_hosts: - state, _ = iterator.get_next_task_for_host(h, peek=True) - iterator.mark_host_failed(h) - state, new_task = iterator.get_next_task_for_host(h, peek=True) - else: - iterator.mark_host_failed(original_host) + iterator.mark_host_failed(original_host) - # only add the host to the failed list officially if it has - # been failed by the iterator - if iterator.is_failed(original_host): - self._tqm._failed_hosts[original_host.name] = True - self._tqm._stats.increment('failures', original_host.name) - else: - # otherwise, we grab the current state and if we're iterating on - # the rescue portion of a block then we save the failed task in a - # special var for use within the rescue/always - state, _ = iterator.get_next_task_for_host(original_host, peek=True) - if state.run_state == iterator.ITERATING_RESCUE: - self._variable_manager.set_nonpersistent_facts( - original_host, - dict( - ansible_failed_task=original_task.serialize(), - ansible_failed_result=task_result._result, - ), - ) + # only add the host to the failed list officially if it has + # been failed by the iterator + if iterator.is_failed(original_host): + self._tqm._failed_hosts[original_host.name] = True + self._tqm._stats.increment('failures', original_host.name) else: - self._tqm._stats.increment('ok', original_host.name) - self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) - elif task_result.is_unreachable(): - self._tqm._unreachable_hosts[original_host.name] = True - self._tqm._stats.increment('dark', original_host.name) - self._tqm.send_callback('v2_runner_on_unreachable', task_result) - elif task_result.is_skipped(): - self._tqm._stats.increment('skipped', original_host.name) - self._tqm.send_callback('v2_runner_on_skipped', task_result) + # otherwise, we grab the current state and if we're iterating on + # the rescue portion of a block then we save the failed task in a + # special var for use within the rescue/always + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + if state.run_state == iterator.ITERATING_RESCUE: + self._variable_manager.set_nonpersistent_facts( + original_host, + dict( + ansible_failed_task=original_task.serialize(), + ansible_failed_result=task_result._result, + ), + ) else: - role_ran = True + self._tqm._stats.increment('ok', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) + elif task_result.is_unreachable(): + self._tqm._unreachable_hosts[original_host.name] = True + self._tqm._stats.increment('dark', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True - if original_task.loop: - # this task had a loop, and has more than one result, so - # loop over all of them instead of a single result - result_items = task_result._result.get('results', []) - else: - result_items = [ task_result._result ] - - for result_item in result_items: - if '_ansible_notify' in result_item: - if task_result.is_changed(): - # The shared dictionary for notified handlers is a proxy, which - # does not detect when sub-objects within the proxy are modified. - # So, per the docs, we reassign the list so the proxy picks up and - # notifies all other threads - for handler_name in result_item['_ansible_notify']: - # Find the handler using the above helper. First we look up the - # dependency chain of the current task (if it's from a role), otherwise - # we just look through the list of handlers in the current play/all - # roles and use the first one that matches the notify name - if handler_name in self._listening_handlers: - for listening_handler_name in self._listening_handlers[handler_name]: - listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) - if listening_handler is None: - raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) - if original_host not in self._notified_handlers[listening_handler]: - self._notified_handlers[listening_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) + else: + result_items = [ task_result._result ] + + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. + # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + if handler_name in self._listening_handlers: + for listening_handler_name in self._listening_handlers[handler_name]: + listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) + if listening_handler is None: + raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) + if original_host not in self._notified_handlers[listening_handler]: + self._notified_handlers[listening_handler].append(original_host) + display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + else: + target_handler = search_handler_blocks(handler_name, iterator._play.handlers) + if target_handler is not None: + if original_host not in self._notified_handlers[target_handler]: + self._notified_handlers[target_handler].append(original_host) + # FIXME: should this be a callback? + display.vv("NOTIFIED HANDLER %s" % (handler_name,)) else: - target_handler = search_handler_blocks(handler_name, iterator._play.handlers) - if target_handler is not None: - if original_host not in self._notified_handlers[target_handler]: + # As there may be more than one handler with the notified name as the + # parent, so we just keep track of whether or not we found one at all + found = False + for target_handler in self._notified_handlers: + if parent_handler_match(target_handler, handler_name): self._notified_handlers[target_handler].append(original_host) - # FIXME: should this be a callback? - display.vv("NOTIFIED HANDLER %s" % (handler_name,)) - else: - # As there may be more than one handler with the notified name as the - # parent, so we just keep track of whether or not we found one at all - found = False - for target_handler in self._notified_handlers: - if parent_handler_match(target_handler, handler_name): - self._notified_handlers[target_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) - found = True - - # and if none were found, then we raise an error - if not found: - raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) - - - if 'add_host' in result_item: - # this task added a new host (add_host module) - new_host_info = result_item.get('add_host', dict()) - self._add_host(new_host_info, iterator) - - elif 'add_group' in result_item: - # this task added a new group (group_by module) - self._add_group(original_host, result_item) - - elif 'ansible_facts' in result_item: - loop_var = 'item' - if original_task.loop_control: - loop_var = original_task.loop_control.loop_var or 'item' - - item = result_item.get(loop_var, None) - - if original_task.action == 'include_vars': - for (var_name, var_value) in iteritems(result_item['ansible_facts']): - # find the host we're actually refering too here, which may - # be a host that is not really in inventory at all - if original_task.delegate_to is not None and original_task.delegate_facts: - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) - self.add_tqm_variables(task_vars, play=iterator._play) - if item is not None: - task_vars[loop_var] = item - templar = Templar(loader=self._loader, variables=task_vars) - host_name = templar.template(original_task.delegate_to) - actual_host = self._inventory.get_host(host_name) - if actual_host is None: - actual_host = Host(name=host_name) - else: - actual_host = original_host - - if original_task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [actual_host] + display.vv("NOTIFIED HANDLER %s" % (target_handler.get_name(),)) + found = True + + # and if none were found, then we raise an error + if not found: + raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) + + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._add_host(new_host_info, iterator) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._add_group(original_host, result_item) + + elif 'ansible_facts' in result_item: + loop_var = 'item' + if original_task.loop_control: + loop_var = original_task.loop_control.loop_var or 'item' + + item = result_item.get(loop_var, None) + + if original_task.action == 'include_vars': + for (var_name, var_value) in iteritems(result_item['ansible_facts']): + # find the host we're actually refering too here, which may + # be a host that is not really in inventory at all + if original_task.delegate_to is not None and original_task.delegate_facts: + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=original_task) + self.add_tqm_variables(task_vars, play=iterator._play) + if item is not None: + task_vars[loop_var] = item + templar = Templar(loader=self._loader, variables=task_vars) + host_name = templar.template(original_task.delegate_to) + actual_host = self._inventory.get_host(host_name) + if actual_host is None: + actual_host = Host(name=host_name) + else: + actual_host = original_host - for target_host in host_list: - self._variable_manager.set_host_variable(target_host, var_name, var_value) - else: if original_task.run_once: host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] else: - host_list = [original_host] + host_list = [actual_host] for target_host in host_list: - if original_task.action == 'set_fact': - self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) - else: - self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + self._variable_manager.set_host_variable(target_host, var_name, var_value) + else: + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] - if 'diff' in task_result._result: - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) + for target_host in host_list: + if original_task.action == 'set_fact': + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + else: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) - if original_task.action not in ['include', 'include_role']: - self._tqm._stats.increment('ok', original_host.name) - if 'changed' in task_result._result and task_result._result['changed']: - self._tqm._stats.increment('changed', original_host.name) + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) - # finally, send the ok for this task - self._tqm.send_callback('v2_runner_on_ok', task_result) + if original_task.action not in ['include', 'include_role']: + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) - self._pending_results -= 1 - if original_host.name in self._blocked_hosts: - del self._blocked_hosts[original_host.name] + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) - # If this is a role task, mark the parent role as being run (if - # the task was ok or failed, but not skipped or unreachable) - if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? - # lookup the role in the ROLE_CACHE to make sure we're dealing - # with the correct object and mark it as executed - for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): - if role_obj._uuid == original_task._role._uuid: - role_obj._had_task_run[original_host.name] = True + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] - ret_results.append(task_result) + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: #TODO: and original_task.action != 'include_role':? + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True - except Queue.Empty: - passes += 1 - if passes > 2: - break + ret_results.append(task_result) - if one_pass: + if one_pass or max_passes is not None and (cur_pass+1) >= max_passes: break + cur_pass += 1 + return ret_results def _wait_on_pending_results(self, iterator): |