summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2016-07-31 03:23:28 -0500
committerJames Cammarata <jimi@sngx.net>2016-08-08 15:58:46 -0500
commitd2b3b2c03e2934b126f5701e5f6e25821e2dbe35 (patch)
tree5f6885356a03e50afdafc6074096639dbc25b453
parent80385a47bda4207a39cc6437458bbe3c3bb34d63 (diff)
downloadansible-d2b3b2c03e2934b126f5701e5f6e25821e2dbe35.tar.gz
Performance improvements
-rw-r--r--lib/ansible/executor/play_iterator.py8
-rw-r--r--lib/ansible/executor/process/result.py196
-rw-r--r--lib/ansible/executor/process/worker.py18
-rw-r--r--lib/ansible/executor/task_executor.py6
-rw-r--r--lib/ansible/executor/task_queue_manager.py9
-rw-r--r--lib/ansible/executor/task_result.py4
-rw-r--r--lib/ansible/inventory/host.py10
-rw-r--r--lib/ansible/playbook/base.py7
-rw-r--r--lib/ansible/playbook/block.py10
-rw-r--r--lib/ansible/playbook/included_file.py14
-rw-r--r--lib/ansible/playbook/task.py7
-rw-r--r--lib/ansible/plugins/strategy/__init__.py428
-rw-r--r--lib/ansible/plugins/strategy/free.py1
-rw-r--r--lib/ansible/plugins/strategy/linear.py1
-rw-r--r--test/units/executor/test_task_executor.py2
-rw-r--r--test/units/plugins/strategies/test_strategy_base.py93
16 files changed, 332 insertions, 482 deletions
diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py
index 51d066ead3..03fdc0b006 100644
--- a/lib/ansible/executor/play_iterator.py
+++ b/lib/ansible/executor/play_iterator.py
@@ -508,6 +508,12 @@ class PlayIterator:
the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine.
'''
+
+ if isinstance(task, Task):
+ the_uuid = task._uuid
+ else:
+ the_uuid = task
+
def _search_block(block):
'''
helper method to check a block's task lists (block/rescue/always)
@@ -521,7 +527,7 @@ class PlayIterator:
res = _search_block(t)
if res:
return res
- elif t._uuid == task._uuid:
+ elif t._uuid == the_uuid:
return t
return None
diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py
deleted file mode 100644
index da06710293..0000000000
--- a/lib/ansible/executor/process/result.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-from ansible.compat.six.moves import queue
-from ansible.compat.six import iteritems, text_type
-from ansible.vars import strip_internal_keys
-
-import multiprocessing
-import time
-import traceback
-
-# TODO: not needed if we use the cryptography library with its default RNG
-# engine
-HAS_ATFORK=True
-try:
- from Crypto.Random import atfork
-except ImportError:
- HAS_ATFORK=False
-
-try:
- from __main__ import display
-except ImportError:
- from ansible.utils.display import Display
- display = Display()
-
-
-__all__ = ['ResultProcess']
-
-
-class ResultProcess(multiprocessing.Process):
- '''
- The result worker thread, which reads results from the results
- queue and fires off callbacks/etc. as necessary.
- '''
-
- def __init__(self, final_q, workers):
-
- # takes a task queue manager as the sole param:
- self._final_q = final_q
- self._workers = workers
- self._cur_worker = 0
- self._terminated = False
-
- super(ResultProcess, self).__init__()
-
- def _send_result(self, result):
- display.debug(u"sending result: %s" % ([text_type(x) for x in result],))
- self._final_q.put(result)
- display.debug("done sending result")
-
- def _read_worker_result(self):
- result = None
- starting_point = self._cur_worker
- while True:
- (worker_prc, rslt_q) = self._workers[self._cur_worker]
- self._cur_worker += 1
- if self._cur_worker >= len(self._workers):
- self._cur_worker = 0
-
- try:
- if not rslt_q.empty():
- display.debug("worker %d has data to read" % self._cur_worker)
- result = rslt_q.get()
- display.debug("got a result from worker %d: %s" % (self._cur_worker, result))
- break
- except queue.Empty:
- pass
-
- if self._cur_worker == starting_point:
- break
-
- return result
-
- def terminate(self):
- self._terminated = True
- super(ResultProcess, self).terminate()
-
- def run(self):
- '''
- The main thread execution, which reads from the results queue
- indefinitely and sends callbacks/etc. when results are received.
- '''
-
- if HAS_ATFORK:
- atfork()
-
- while True:
- try:
- result = self._read_worker_result()
- if result is None:
- time.sleep(0.005)
- continue
-
- # send callbacks for 'non final' results
- if '_ansible_retry' in result._result:
- self._send_result(('v2_runner_retry', result))
- continue
- elif '_ansible_item_result' in result._result:
- if result.is_failed() or result.is_unreachable():
- self._send_result(('v2_runner_item_on_failed', result))
- elif result.is_skipped():
- self._send_result(('v2_runner_item_on_skipped', result))
- else:
- self._send_result(('v2_runner_item_on_ok', result))
- if 'diff' in result._result:
- self._send_result(('v2_on_file_diff', result))
- continue
-
- clean_copy = strip_internal_keys(result._result)
- if 'invocation' in clean_copy:
- del clean_copy['invocation']
-
- # if this task is registering a result, do it now
- if result._task.register:
- self._send_result(('register_host_var', result._host, result._task, clean_copy))
-
- # send callbacks, execute other options based on the result status
- # TODO: this should all be cleaned up and probably moved to a sub-function.
- # the fact that this sometimes sends a TaskResult and other times
- # sends a raw dictionary back may be confusing, but the result vs.
- # results implementation for tasks with loops should be cleaned up
- # better than this
- if result.is_unreachable():
- self._send_result(('host_unreachable', result))
- elif result.is_failed():
- self._send_result(('host_task_failed', result))
- elif result.is_skipped():
- self._send_result(('host_task_skipped', result))
- else:
- if result._task.loop:
- # this task had a loop, and has more than one result, so
- # loop over all of them instead of a single result
- result_items = result._result.get('results', [])
- else:
- result_items = [ result._result ]
-
- for result_item in result_items:
- # if this task is notifying a handler, do it now
- if '_ansible_notify' in result_item:
- if result.is_changed():
- # The shared dictionary for notified handlers is a proxy, which
- # does not detect when sub-objects within the proxy are modified.
- # So, per the docs, we reassign the list so the proxy picks up and
- # notifies all other threads
- for notify in result_item['_ansible_notify']:
- self._send_result(('notify_handler', result, notify))
-
- if 'add_host' in result_item:
- # this task added a new host (add_host module)
- self._send_result(('add_host', result_item))
- elif 'add_group' in result_item:
- # this task added a new group (group_by module)
- self._send_result(('add_group', result._host, result_item))
- elif 'ansible_facts' in result_item:
- # if this task is registering facts, do that now
- loop_var = 'item'
- if result._task.loop_control:
- loop_var = result._task.loop_control.loop_var or 'item'
- item = result_item.get(loop_var, None)
- if result._task.action == 'include_vars':
- for (key, value) in iteritems(result_item['ansible_facts']):
- self._send_result(('set_host_var', result._host, result._task, item, key, value))
- else:
- self._send_result(('set_host_facts', result._host, result._task, item, result_item['ansible_facts']))
-
- # finally, send the ok for this task
- self._send_result(('host_task_ok', result))
-
- except queue.Empty:
- pass
- except (KeyboardInterrupt, SystemExit, IOError, EOFError):
- break
- except:
- # TODO: we should probably send a proper callback here instead of
- # simply dumping a stack trace on the screen
- traceback.print_exc()
- break
-
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index d1bc56f637..aeab3e75a7 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -100,6 +100,10 @@ class WorkerProcess(multiprocessing.Process):
signify that they are ready for their next task.
'''
+ #import cProfile, pstats, StringIO
+ #pr = cProfile.Profile()
+ #pr.enable()
+
if HAS_ATFORK:
atfork()
@@ -120,7 +124,7 @@ class WorkerProcess(multiprocessing.Process):
display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(self._host, self._task, executor_result)
+ task_result = TaskResult(self._host.name, self._task._uuid, executor_result)
# put the result on the result queue
display.debug("sending task result")
@@ -130,7 +134,7 @@ class WorkerProcess(multiprocessing.Process):
except AnsibleConnectionFailure:
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(self._host, self._task, dict(unreachable=True))
+ task_result = TaskResult(self._host.name, self._task._uuid, dict(unreachable=True))
self._rslt_q.put(task_result, block=False)
except Exception as e:
@@ -138,7 +142,7 @@ class WorkerProcess(multiprocessing.Process):
try:
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(self._host, self._task, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout=''))
+ task_result = TaskResult(self._host.name, self._task._uuid, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout=''))
self._rslt_q.put(task_result, block=False)
except:
display.debug(u"WORKER EXCEPTION: %s" % to_unicode(e))
@@ -146,3 +150,11 @@ class WorkerProcess(multiprocessing.Process):
display.debug("WORKER PROCESS EXITING")
+ #pr.disable()
+ #s = StringIO.StringIO()
+ #sortby = 'time'
+ #ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
+ #ps.print_stats()
+ #with open('worker_%06d.stats' % os.getpid(), 'w') as f:
+ # f.write(s.getvalue())
+
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index e34c4784e1..88f69070eb 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -246,7 +246,7 @@ class TaskExecutor:
task_vars[loop_var] = item
try:
- tmp_task = self._task.copy()
+ tmp_task = self._task.copy(exclude_tasks=True)
tmp_play_context = self._play_context.copy()
except AnsibleParserError as e:
results.append(dict(failed=True, msg=to_unicode(e)))
@@ -265,7 +265,7 @@ class TaskExecutor:
res[loop_var] = item
res['_ansible_item_result'] = True
- self._rslt_q.put(TaskResult(self._host, self._task, res), block=False)
+ self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, res), block=False)
results.append(res)
del task_vars[loop_var]
@@ -516,7 +516,7 @@ class TaskExecutor:
result['_ansible_retry'] = True
result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
- self._rslt_q.put(TaskResult(self._host, self._task, result), block=False)
+ self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result), block=False)
time.sleep(delay)
else:
if retries > 1:
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index b59ca0ab88..7212b06aea 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -26,7 +26,6 @@ import tempfile
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
-from ansible.executor.process.result import ResultProcess
from ansible.executor.stats import AggregateStats
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
@@ -81,7 +80,6 @@ class TaskQueueManager:
self._callbacks_loaded = False
self._callback_plugins = []
self._start_at_done = False
- self._result_prc = None
# make sure the module path (if specified) is parsed and
# added to the module_loader object
@@ -113,9 +111,6 @@ class TaskQueueManager:
rslt_q = multiprocessing.Queue()
self._workers.append([None, rslt_q])
- self._result_prc = ResultProcess(self._final_q, self._workers)
- self._result_prc.start()
-
def _initialize_notified_handlers(self, play):
'''
Clears and initializes the shared notified handlers dict with entries
@@ -299,9 +294,7 @@ class TaskQueueManager:
self._cleanup_processes()
def _cleanup_processes(self):
- if self._result_prc:
- self._result_prc.terminate()
-
+ if hasattr(self, '_workers'):
for (worker_prc, rslt_q) in self._workers:
rslt_q.close()
if worker_prc and worker_prc.is_alive():
diff --git a/lib/ansible/executor/task_result.py b/lib/ansible/executor/task_result.py
index f7ba7d092c..0f1967fe5c 100644
--- a/lib/ansible/executor/task_result.py
+++ b/lib/ansible/executor/task_result.py
@@ -41,7 +41,7 @@ class TaskResult:
def is_skipped(self):
# loop results
- if 'results' in self._result and self._task.loop:
+ if 'results' in self._result:
results = self._result['results']
# Loop tasks are only considered skipped if all items were skipped.
# some squashed results (eg, yum) are not dicts and can't be skipped individually
@@ -62,7 +62,7 @@ class TaskResult:
return self._check_key('unreachable')
def _check_key(self, key):
- if self._result.get('results', []) and self._task.loop:
+ if self._result.get('results', []):
flag = False
for res in self._result.get('results', []):
if isinstance(res, dict):
diff --git a/lib/ansible/inventory/host.py b/lib/ansible/inventory/host.py
index 36cc90f537..268eef2f46 100644
--- a/lib/ansible/inventory/host.py
+++ b/lib/ansible/inventory/host.py
@@ -64,12 +64,12 @@ class Host:
)
def deserialize(self, data):
- self.__init__()
+ self.__init__(gen_uuid=False)
self.name = data.get('name')
self.vars = data.get('vars', dict())
self.address = data.get('address', '')
- self._uuid = data.get('uuid', uuid.uuid4())
+ self._uuid = data.get('uuid', None)
self.implicit= data.get('implicit', False)
groups = data.get('groups', [])
@@ -78,7 +78,7 @@ class Host:
g.deserialize(group_data)
self.groups.append(g)
- def __init__(self, name=None, port=None):
+ def __init__(self, name=None, port=None, gen_uuid=True):
self.name = name
self.vars = {}
@@ -90,7 +90,9 @@ class Host:
self.set_variable('ansible_port', int(port))
self._gathered_facts = False
- self._uuid = uuid.uuid4()
+ self._uuid = None
+ if gen_uuid:
+ self._uuid = uuid.uuid4()
self.implicit = False
def __repr__(self):
diff --git a/lib/ansible/playbook/base.py b/lib/ansible/playbook/base.py
index 6bafe42844..b34bf448b0 100644
--- a/lib/ansible/playbook/base.py
+++ b/lib/ansible/playbook/base.py
@@ -80,6 +80,7 @@ class Base:
# every object gets a random uuid:
self._uuid = uuid.uuid4()
+ #self._uuid = 1
# and initialize the base attributes
self._initialize_base_attributes()
@@ -137,8 +138,8 @@ class Base:
'''
# check cache before retrieving attributes
- if self.__class__ in BASE_ATTRIBUTES:
- return BASE_ATTRIBUTES[self.__class__]
+ if self.__class__.__name__ in BASE_ATTRIBUTES:
+ return BASE_ATTRIBUTES[self.__class__.__name__]
# Cache init
base_attributes = dict()
@@ -147,7 +148,7 @@ class Base:
if name.startswith('_'):
name = name[1:]
base_attributes[name] = value
- BASE_ATTRIBUTES[self.__class__] = base_attributes
+ BASE_ATTRIBUTES[self.__class__.__name__] = base_attributes
return base_attributes
def _initialize_base_attributes(self):
diff --git a/lib/ansible/playbook/block.py b/lib/ansible/playbook/block.py
index 2994eadf56..016e53298b 100644
--- a/lib/ansible/playbook/block.py
+++ b/lib/ansible/playbook/block.py
@@ -185,7 +185,7 @@ class Block(Base, Become, Conditional, Taggable):
new_me._parent_block = None
if self._parent_block and not exclude_parent:
- new_me._parent_block = self._parent_block.copy(exclude_tasks=exclude_tasks)
+ new_me._parent_block = self._parent_block#.copy(exclude_tasks=exclude_tasks)
new_me._role = None
if self._role:
@@ -193,8 +193,8 @@ class Block(Base, Become, Conditional, Taggable):
new_me._task_include = None
if self._task_include:
- new_me._task_include = self._task_include.copy(exclude_block=True)
- new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True)
+ new_me._task_include = self._task_include#.copy(exclude_block=True)
+ #new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True)
return new_me
@@ -213,8 +213,8 @@ class Block(Base, Become, Conditional, Taggable):
if self._role is not None:
data['role'] = self._role.serialize()
- if self._task_include is not None:
- data['task_include'] = self._task_include.serialize()
+ #if self._task_include is not None:
+ # data['task_include'] = self._task_include.serialize()
if self._parent_block is not None:
data['parent_block'] = self._parent_block.copy(exclude_tasks=True).serialize()
diff --git a/lib/ansible/playbook/included_file.py b/lib/ansible/playbook/included_file.py
index 23a1f7860a..b6992d2738 100644
--- a/lib/ansible/playbook/included_file.py
+++ b/lib/ansible/playbook/included_file.py
@@ -60,8 +60,11 @@ class IncludedFile:
for res in results:
- if res._task.action == 'include':
- if res._task.loop:
+ original_host = res._host
+ original_task = res._task
+
+ if original_task.action == 'include':
+ if original_task.loop:
if 'results' not in res._result:
continue
include_results = res._result['results']
@@ -73,16 +76,13 @@ class IncludedFile:
if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result:
continue
- original_host = get_original_host(res._host)
- original_task = iterator.get_original_task(original_host, res._task)
-
task_vars = variable_manager.get_vars(loader=loader, play=iterator._play, host=original_host, task=original_task)
templar = Templar(loader=loader, variables=task_vars)
include_variables = include_result.get('include_variables', dict())
loop_var = 'item'
- if res._task.loop_control:
- loop_var = res._task.loop_control.loop_var or 'item'
+ if original_task.loop_control:
+ loop_var = original_task.loop_control.loop_var or 'item'
if loop_var in include_result:
task_vars[loop_var] = include_variables[loop_var] = include_result[loop_var]
diff --git a/lib/ansible/playbook/task.py b/lib/ansible/playbook/task.py
index 53e8d5ee05..e8b1b60f8c 100644
--- a/lib/ansible/playbook/task.py
+++ b/lib/ansible/playbook/task.py
@@ -326,12 +326,12 @@ class Task(Base, Conditional, Taggable, Become):
all_vars.update(self.vars)
return all_vars
- def copy(self, exclude_block=False):
+ def copy(self, exclude_block=False, exclude_tasks=False):
new_me = super(Task, self).copy()
new_me._block = None
if self._block and not exclude_block:
- new_me._block = self._block.copy()
+ new_me._block = self._block#.copy(exclude_tasks=exclude_tasks)
new_me._role = None
if self._role:
@@ -339,7 +339,8 @@ class Task(Base, Conditional, Taggable, Become):
new_me._task_include = None
if self._task_include:
- new_me._task_include = self._task_include.copy(exclude_block=exclude_block)
+ new_me._task_include = self._task_include#.copy(exclude_block=True)
+ #new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True)
return new_me
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 8954d520e1..c4dab7d241 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -43,7 +43,8 @@ from ansible.plugins import action_loader, connection_loader, filter_loader, loo
from ansible.template import Templar
from ansible.utils.unicode import to_unicode
from ansible.vars.unsafe_proxy import wrap_var
-from ansible.vars import combine_vars
+from ansible.vars import combine_vars, strip_internal_keys
+
try:
from __main__ import display
@@ -174,6 +175,7 @@ class StrategyBase:
# The next common higher level is __init__.py::run() and that has
# tasks inside of play_iterator so we'd have to extract them to do it
# there.
+
global action_write_locks
if task.action not in action_write_locks:
display.debug('Creating lock for %s' % task.action)
@@ -189,21 +191,22 @@ class StrategyBase:
shared_loader_obj = SharedPluginLoaderObj()
queued = False
+ starting_worker = self._cur_worker
while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive():
- worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
+ worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc
worker_prc.start()
queued = True
self._cur_worker += 1
if self._cur_worker >= len(self._workers):
self._cur_worker = 0
- time.sleep(0.005)
if queued:
break
+ elif self._cur_worker == starting_worker:
+ time.sleep(0.0001)
- del task_vars
self._pending_results += 1
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
@@ -219,222 +222,233 @@ class StrategyBase:
ret_results = []
+ def get_original_host(host_name):
+ host_name = to_unicode(host_name)
+ if host_name in self._inventory._hosts_cache:
+ return self._inventory._hosts_cache[host_name]
+ else:
+ return self._inventory.get_host(host_name)
+
+ def search_handler_blocks(handler_name, handler_blocks):
+ for handler_block in handler_blocks:
+ for handler_task in handler_block.block:
+ handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task)
+ templar = Templar(loader=self._loader, variables=handler_vars)
+ try:
+ # first we check with the full result of get_name(), which may
+ # include the role name (if the handler is from a role). If that
+ # is not found, we resort to the simple name field, which doesn't
+ # have anything extra added to it.
+ target_handler_name = templar.template(handler_task.name)
+ if target_handler_name == handler_name:
+ return handler_task
+ else:
+ target_handler_name = templar.template(handler_task.get_name())
+ if target_handler_name == handler_name:
+ return handler_task
+ except (UndefinedError, AnsibleUndefinedVariable) as e:
+ # We skip this handler due to the fact that it may be using
+ # a variable in the name that was conditionally included via
+ # set_fact or some other method, and we don't want to error
+ # out unnecessarily
+ continue
+ return None
+
while not self._final_q.empty() and not self._tqm._terminated:
try:
- result = self._final_q.get()
- display.debug("got result from result worker: %s" % ([text_type(x) for x in result],))
-
- # helper method, used to find the original host from the one
- # returned in the result/message, which has been serialized and
- # thus had some information stripped from it to speed up the
- # serialization process
- def get_original_host(host):
- if host.name in self._inventory._hosts_cache:
- return self._inventory._hosts_cache[host.name]
+ task_result = self._final_q.get(block=False)
+ original_host = get_original_host(task_result._host)
+ original_task = iterator.get_original_task(original_host, task_result._task)
+ task_result._host = original_host
+ task_result._task = original_task
+
+ # send callbacks for 'non final' results
+ if '_ansible_retry' in task_result._result:
+ self._tqm.send_callback('v2_runner_retry', task_result)
+ continue
+ elif '_ansible_item_result' in task_result._result:
+ if task_result.is_failed() or task_result.is_unreachable():
+ self._tqm.send_callback('v2_runner_item_on_failed', task_result)
+ elif task_result.is_skipped():
+ self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else:
- return self._inventory.get_host(host.name)
+ self._tqm.send_callback('v2_runner_item_on_ok', task_result)
+ continue
- # all host status messages contain 2 entries: (msg, task_result)
- if result[0] in ('host_task_ok', 'host_task_failed', 'host_task_skipped', 'host_unreachable'):
- task_result = result[1]
- host = get_original_host(task_result._host)
- task = task_result._task
- if result[0] == 'host_task_failed' or task_result.is_failed():
- if not task.ignore_errors:
- display.debug("marking %s as failed" % host.name)
- if task.run_once:
- # if we're using run_once, we have to fail every host here
- [iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts]
- else:
- iterator.mark_host_failed(host)
+ if original_task.register:
+ if original_task.run_once:
+ host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ else:
+ host_list = [original_host]
- # only add the host to the failed list officially if it has
- # been failed by the iterator
- if iterator.is_failed(host):
- self._tqm._failed_hosts[host.name] = True
- self._tqm._stats.increment('failures', host.name)
- else:
- # otherwise, we grab the current state and if we're iterating on
- # the rescue portion of a block then we save the failed task in a
- # special var for use within the rescue/always
- state, _ = iterator.get_next_task_for_host(host, peek=True)
- if state.run_state == iterator.ITERATING_RESCUE:
- original_task = iterator.get_original_task(host, task)
- self._variable_manager.set_nonpersistent_facts(
- host,
- dict(
- ansible_failed_task=original_task.serialize(),
- ansible_failed_result=task_result._result,
- ),
- )
- else:
- self._tqm._stats.increment('ok', host.name)
- self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=task.ignore_errors)
- elif result[0] == 'host_unreachable':
- self._tqm._unreachable_hosts[host.name] = True
- self._tqm._stats.increment('dark', host.name)
- self._tqm.send_callback('v2_runner_on_unreachable', task_result)
- elif result[0] == 'host_task_skipped':
- self._tqm._stats.increment('skipped', host.name)
- self._tqm.send_callback('v2_runner_on_skipped', task_result)
- elif result[0] == 'host_task_ok':
- if task.action != 'include':
- self._tqm._stats.increment('ok', host.name)
- if 'changed' in task_result._result and task_result._result['changed']:
- self._tqm._stats.increment('changed', host.name)
- self._tqm.send_callback('v2_runner_on_ok', task_result)
+ clean_copy = strip_internal_keys(task_result._result)
+ if 'invocation' in clean_copy:
+ del clean_copy['invocation']
- if self._diff:
- self._tqm.send_callback('v2_on_file_diff', task_result)
+ for target_host in host_list:
+ self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy})
- self._pending_results -= 1
- if host.name in self._blocked_hosts:
- del self._blocked_hosts[host.name]
-
- # If this is a role task, mark the parent role as being run (if
- # the task was ok or failed, but not skipped or unreachable)
- if task_result._task._role is not None and result[0] in ('host_task_ok', 'host_task_failed'):
- # lookup the role in the ROLE_CACHE to make sure we're dealing
- # with the correct object and mark it as executed
- for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[task_result._task._role._role_name]):
- if role_obj._uuid == task_result._task._role._uuid:
- role_obj._had_task_run[host.name] = True
-
- ret_results.append(task_result)
-
- elif result[0] == 'add_host':
- result_item = result[1]
- new_host_info = result_item.get('add_host', dict())
-
- self._add_host(new_host_info, iterator)
-
- elif result[0] == 'add_group':
- host = get_original_host(result[1])
- result_item = result[2]
- self._add_group(host, result_item)
-
- elif result[0] == 'notify_handler':
- task_result = result[1]
- handler_name = result[2]
-
- original_host = get_original_host(task_result._host)
- original_task = iterator.get_original_task(original_host, task_result._task)
-
- def search_handler_blocks(handler_name, handler_blocks):
- for handler_block in handler_blocks:
- for handler_task in handler_block.block:
- handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task)
- templar = Templar(loader=self._loader, variables=handler_vars)
- try:
- # first we check with the full result of get_name(), which may
- # include the role name (if the handler is from a role). If that
- # is not found, we resort to the simple name field, which doesn't
- # have anything extra added to it.
- target_handler_name = templar.template(handler_task.name)
- if target_handler_name == handler_name:
- return handler_task
- else:
- target_handler_name = templar.template(handler_task.get_name())
- if target_handler_name == handler_name:
- return handler_task
- except (UndefinedError, AnsibleUndefinedVariable):
- # We skip this handler due to the fact that it may be using
- # a variable in the name that was conditionally included via
- # set_fact or some other method, and we don't want to error
- # out unnecessarily
- continue
- return None
-
- # Find the handler using the above helper. First we look up the
- # dependency chain of the current task (if it's from a role), otherwise
- # we just look through the list of handlers in the current play/all
- # roles and use the first one that matches the notify name
- if handler_name in self._listening_handlers:
- for listening_handler_name in self._listening_handlers[handler_name]:
- listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
- if listening_handler is None:
- raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
-
- if original_host not in self._notified_handlers[listening_handler]:
- self._notified_handlers[listening_handler].append(original_host)
- display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
- else:
- target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
- if target_handler is None:
- raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name)
-
- if target_handler in self._notified_handlers:
- if original_host not in self._notified_handlers[target_handler]:
- self._notified_handlers[target_handler].append(original_host)
- # FIXME: should this be a callback?
- display.vv("NOTIFIED HANDLER %s" % (handler_name,))
+ # all host status messages contain 2 entries: (msg, task_result)
+ role_ran = False
+ if task_result.is_failed():
+ role_ran = True
+ if not original_task.ignore_errors:
+ display.debug("marking %s as failed" % original_host.name)
+ if original_task.run_once:
+ # if we're using run_once, we have to fail every host here
+ [iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts]
else:
- raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
-
- elif result[0] == 'register_host_var':
- # essentially the same as 'set_host_var' below, however we
- # never follow the delegate_to value for registered vars and
- # the variable goes in the fact_cache
- host = get_original_host(result[1])
- task = result[2]
- var_value = wrap_var(result[3])
- var_name = task.register
-
- if task.run_once:
- host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
- else:
- host_list = [host]
+ iterator.mark_host_failed(original_host)
- for target_host in host_list:
- self._variable_manager.set_nonpersistent_facts(target_host, {var_name: var_value})
-
- elif result[0] in ('set_host_var', 'set_host_facts'):
- host = get_original_host(result[1])
- task = result[2]
- item = result[3]
-
- # find the host we're actually refering too here, which may
- # be a host that is not really in inventory at all
- if task.delegate_to is not None and task.delegate_facts:
- task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
- self.add_tqm_variables(task_vars, play=iterator._play)
- loop_var = 'item'
- if task.loop_control:
- loop_var = task.loop_control.loop_var or 'item'
- if item is not None:
- task_vars[loop_var] = item
- templar = Templar(loader=self._loader, variables=task_vars)
- host_name = templar.template(task.delegate_to)
- actual_host = self._inventory.get_host(host_name)
- if actual_host is None:
- actual_host = Host(name=host_name)
+ # only add the host to the failed list officially if it has
+ # been failed by the iterator
+ if iterator.is_failed(original_host):
+ self._tqm._failed_hosts[original_host.name] = True
+ self._tqm._stats.increment('failures', original_host.name)
+ else:
+ # otherwise, we grab the current state and if we're iterating on
+ # the rescue portion of a block then we save the failed task in a
+ # special var for use within the rescue/always
+ state, _ = iterator.get_next_task_for_host(original_host, peek=True)
+ if state.run_state == iterator.ITERATING_RESCUE:
+ self._variable_manager.set_nonpersistent_facts(
+ original_host,
+ dict(
+ ansible_failed_task=original_task.serialize(),
+ ansible_failed_result=task_result._result,
+ ),
+ )
else:
- actual_host = host
+ self._tqm._stats.increment('ok', original_host.name)
+ self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors)
+ elif task_result.is_unreachable():
+ self._tqm._unreachable_hosts[original_host.name] = True
+ self._tqm._stats.increment('dark', original_host.name)
+ self._tqm.send_callback('v2_runner_on_unreachable', task_result)
+ elif task_result.is_skipped():
+ self._tqm._stats.increment('skipped', original_host.name)
+ self._tqm.send_callback('v2_runner_on_skipped', task_result)
+ else:
+ role_ran = True
- if task.run_once:
- host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ if original_task.loop:
+ # this task had a loop, and has more than one result, so
+ # loop over all of them instead of a single result
+ result_items = task_result._result.get('results', [])
else:
- host_list = [actual_host]
-
- if result[0] == 'set_host_var':
- var_name = result[4]
- var_value = result[5]
- for target_host in host_list:
- self._variable_manager.set_host_variable(target_host, var_name, var_value)
- elif result[0] == 'set_host_facts':
- facts = result[4]
- for target_host in host_list:
- if task.action == 'set_fact':
- self._variable_manager.set_nonpersistent_facts(target_host, facts.copy())
+ result_items = [ task_result._result ]
+
+ for result_item in result_items:
+ if '_ansible_notify' in result_item:
+ if task_result.is_changed():
+ # The shared dictionary for notified handlers is a proxy, which
+ # does not detect when sub-objects within the proxy are modified.
+ # So, per the docs, we reassign the list so the proxy picks up and
+ # notifies all other threads
+ for handler_name in result_item['_ansible_notify']:
+ # Find the handler using the above helper. First we look up the
+ # dependency chain of the current task (if it's from a role), otherwise
+ # we just look through the list of handlers in the current play/all
+ # roles and use the first one that matches the notify name
+ if handler_name in self._listening_handlers:
+ for listening_handler_name in self._listening_handlers[handler_name]:
+ listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers)
+ if listening_handler is None:
+ raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name)
+ if original_host not in self._notified_handlers[listening_handler]:
+ self._notified_handlers[listening_handler].append(original_host)
+ display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,))
+
+ else:
+ target_handler = search_handler_blocks(handler_name, iterator._play.handlers)
+ if target_handler is None:
+ raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name)
+ if target_handler in self._notified_handlers:
+ if original_host not in self._notified_handlers[target_handler]:
+ self._notified_handlers[target_handler].append(original_host)
+ # FIXME: should this be a callback?
+ display.vv("NOTIFIED HANDLER %s" % (handler_name,))
+ else:
+ raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name)
+
+ if 'add_host' in result_item:
+ # this task added a new host (add_host module)
+ new_host_info = result_item.get('add_host', dict())
+ self._add_host(new_host_info, iterator)
+
+ elif 'add_group' in result_item:
+ # this task added a new group (group_by module)
+ self._add_group(original_host, result_item)
+
+ elif 'ansible_facts' in result_item:
+ loop_var = 'item'
+ if original_task.loop_control:
+ loop_var = original_task.loop_control.loop_var or 'item'
+
+ item = result_item.get(loop_var, None)
+
+ if original_task.action == 'include_vars':
+ for (var_name, var_value) in iteritems(result_item['ansible_facts']):
+ # find the host we're actually refering too here, which may
+ # be a host that is not really in inventory at all
+ if original_task.delegate_to is not None and original_task.delegate_facts:
+ task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
+ self.add_tqm_variables(task_vars, play=iterator._play)
+ if item is not None:
+ task_vars[loop_var] = item
+ templar = Templar(loader=self._loader, variables=task_vars)
+ host_name = templar.template(original_task.delegate_to)
+ actual_host = self._inventory.get_host(host_name)
+ if actual_host is None:
+ actual_host = Host(name=host_name)
+ else:
+ actual_host = original_host
+
+ if original_task.run_once:
+ host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ else:
+ host_list = [actual_host]
+
+ for target_host in host_list:
+ self._variable_manager.set_host_variable(target_host, var_name, var_value)
else:
- self._variable_manager.set_host_facts(target_host, facts.copy())
- elif result[0].startswith('v2_runner_item') or result[0] == 'v2_runner_retry':
- self._tqm.send_callback(result[0], result[1])
- elif result[0] == 'v2_on_file_diff':
- if self._diff:
- self._tqm.send_callback('v2_on_file_diff', result[1])
- else:
- raise AnsibleError("unknown result message received: %s" % result[0])
+ if original_task.run_once:
+ host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts]
+ else:
+ host_list = [original_host]
+
+ for target_host in host_list:
+ if original_task.action == 'set_fact':
+ self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy())
+ else:
+ self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy())
+
+ if 'diff' in task_result._result:
+ if self._diff:
+ self._tqm.send_callback('v2_on_file_diff', task_result)
+
+ if original_task.action != 'include':
+ self._tqm._stats.increment('ok', original_host.name)
+ if 'changed' in task_result._result and task_result._result['changed']:
+ self._tqm._stats.increment('changed', original_host.name)
+
+ # finally, send the ok for this task
+ self._tqm.send_callback('v2_runner_on_ok', task_result)
+
+ self._pending_results -= 1
+ if original_host.name in self._blocked_hosts:
+ del self._blocked_hosts[original_host.name]
+
+ # If this is a role task, mark the parent role as being run (if
+ # the task was ok or failed, but not skipped or unreachable)
+ if original_task._role is not None and role_ran:
+ # lookup the role in the ROLE_CACHE to make sure we're dealing
+ # with the correct object and mark it as executed
+ for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]):
+ if role_obj._uuid == original_task._role._uuid:
+ role_obj._had_task_run[original_host.name] = True
+
+ ret_results.append(task_result)
except Queue.Empty:
time.sleep(0.005)
diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py
index 89287c49ab..772db5cf5a 100644
--- a/lib/ansible/plugins/strategy/free.py
+++ b/lib/ansible/plugins/strategy/free.py
@@ -146,6 +146,7 @@ class StrategyModule(StrategyBase):
display.warning("Using any_errors_fatal with the free strategy is not supported, as tasks are executed independently on each host")
self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False)
self._queue_task(host, task, task_vars, play_context)
+ del task_vars
else:
display.debug("%s is blocked, skipping for now" % host_name)
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index 50028fbe1c..8253241792 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -253,6 +253,7 @@ class StrategyModule(StrategyBase):
self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, play_context)
+ del task_vars
# if we're bypassing the host loop, break out now
if run_once:
diff --git a/test/units/executor/test_task_executor.py b/test/units/executor/test_task_executor.py
index 3f9a614dfb..12d1330239 100644
--- a/test/units/executor/test_task_executor.py
+++ b/test/units/executor/test_task_executor.py
@@ -139,7 +139,7 @@ class TestTaskExecutor(unittest.TestCase):
mock_host = MagicMock()
- def _copy():
+ def _copy(exclude_block=False, exclude_tasks=False):
new_item = MagicMock()
return new_item
diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py
index 23c3ce40bb..46c3729fd2 100644
--- a/test/units/plugins/strategies/test_strategy_base.py
+++ b/test/units/plugins/strategies/test_strategy_base.py
@@ -19,6 +19,8 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
+import uuid
+
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
@@ -27,6 +29,7 @@ from ansible.plugins.strategy import StrategyBase
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.task_result import TaskResult
+from ansible.playbook.block import Block
from ansible.playbook.handler import Handler
from ansible.inventory.host import Host
@@ -183,11 +186,6 @@ class TestStrategyBase(unittest.TestCase):
mock_play = MagicMock()
- mock_iterator = MagicMock()
- mock_iterator._play = mock_play
- mock_iterator.mark_host_failed.return_value = None
- mock_iterator.get_next_task_for_host.return_value = (None, None)
-
mock_host = MagicMock()
mock_host.name = 'test01'
mock_host.vars = dict()
@@ -196,6 +194,8 @@ class TestStrategyBase(unittest.TestCase):
mock_task = MagicMock()
mock_task._role = None
mock_task.ignore_errors = False
+ mock_task._uuid = uuid.uuid4()
+ mock_task.loop = None
mock_handler_task = MagicMock(Handler)
mock_handler_task.name = 'test handler'
@@ -203,8 +203,16 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False
+ mock_iterator = MagicMock()
+ mock_iterator._play = mock_play
+ mock_iterator.mark_host_failed.return_value = None
+ mock_iterator.get_next_task_for_host.return_value = (None, None)
+ mock_iterator.get_original_task.return_value = mock_task
+
mock_handler_block = MagicMock()
mock_handler_block.block = [mock_handler_task]
+ mock_handler_block.rescue = []
+ mock_handler_block.always = []
mock_play.handlers = [mock_handler_block]
mock_tqm._notified_handlers = {mock_handler_task: []}
@@ -245,8 +253,8 @@ class TestStrategyBase(unittest.TestCase):
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
- task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True))
- queue_items.append(('host_task_ok', task_result))
+ task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True))
+ queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@@ -255,10 +263,11 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
- task_result = TaskResult(host=mock_host, task=mock_task, return_data='{"failed":true}')
- queue_items.append(('host_task_failed', task_result))
+ task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"failed":true}')
+ queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
+ mock_iterator.is_failed.return_value = True
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@@ -266,9 +275,10 @@ class TestStrategyBase(unittest.TestCase):
self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn('test01', mock_tqm._failed_hosts)
del mock_tqm._failed_hosts['test01']
+ mock_iterator.is_failed.return_value = False
- task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}')
- queue_items.append(('host_unreachable', task_result))
+ task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"unreachable": true}')
+ queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@@ -279,8 +289,8 @@ class TestStrategyBase(unittest.TestCase):
self.assertIn('test01', mock_tqm._unreachable_hosts)
del mock_tqm._unreachable_hosts['test01']
- task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}')
- queue_items.append(('host_task_skipped', task_result))
+ task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"skipped": true}')
+ queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
@@ -289,42 +299,44 @@ class TestStrategyBase(unittest.TestCase):
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
+ queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
-
- queue_items.append(('add_host', dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
results = strategy_base._process_pending_results(iterator=mock_iterator)
- self.assertEqual(len(results), 0)
- self.assertEqual(strategy_base._pending_results, 1)
- self.assertIn('test01', strategy_base._blocked_hosts)
+ self.assertEqual(len(results), 1)
+ self.assertEqual(strategy_base._pending_results, 0)
+ self.assertNotIn('test01', strategy_base._blocked_hosts)
- queue_items.append(('add_group', mock_host, dict(add_group=dict(group_name='foo'))))
+ queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
+ strategy_base._blocked_hosts['test01'] = True
+ strategy_base._pending_results = 1
results = strategy_base._process_pending_results(iterator=mock_iterator)
- self.assertEqual(len(results), 0)
- self.assertEqual(strategy_base._pending_results, 1)
- self.assertIn('test01', strategy_base._blocked_hosts)
+ self.assertEqual(len(results), 1)
+ self.assertEqual(strategy_base._pending_results, 0)
+ self.assertNotIn('test01', strategy_base._blocked_hosts)
- task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True))
- queue_items.append(('notify_handler', task_result, 'test handler'))
+ queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
+ strategy_base._blocked_hosts['test01'] = True
+ strategy_base._pending_results = 1
results = strategy_base._process_pending_results(iterator=mock_iterator)
- self.assertEqual(len(results), 0)
- self.assertEqual(strategy_base._pending_results, 1)
- self.assertIn('test01', strategy_base._blocked_hosts)
+ self.assertEqual(len(results), 1)
+ self.assertEqual(strategy_base._pending_results, 0)
+ self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn(mock_handler_task, strategy_base._notified_handlers)
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task])
- queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
- results = strategy_base._process_pending_results(iterator=mock_iterator)
- self.assertEqual(len(results), 0)
- self.assertEqual(strategy_base._pending_results, 1)
+ #queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
+ #results = strategy_base._process_pending_results(iterator=mock_iterator)
+ #self.assertEqual(len(results), 0)
+ #self.assertEqual(strategy_base._pending_results, 1)
- queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
- results = strategy_base._process_pending_results(iterator=mock_iterator)
- self.assertEqual(len(results), 0)
- self.assertEqual(strategy_base._pending_results, 1)
+ #queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
+ #results = strategy_base._process_pending_results(iterator=mock_iterator)
+ #self.assertEqual(len(results), 0)
+ #self.assertEqual(strategy_base._pending_results, 1)
- queue_items.append(('bad'))
- self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
+ #queue_items.append(('bad'))
+ #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
def test_strategy_base_load_included_file(self):
fake_loader = DictDataLoader({
@@ -377,6 +389,8 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task.action = 'foo'
mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False
+ mock_handler_task.listen = None
+ mock_handler_task._role = None
mock_handler = MagicMock()
mock_handler.block = [mock_handler_task]
@@ -395,8 +409,9 @@ class TestStrategyBase(unittest.TestCase):
mock_var_mgr = MagicMock()
mock_var_mgr.get_vars.return_value = dict()
- mock_iterator = MagicMock
+ mock_iterator = MagicMock()
mock_iterator._play = mock_play
+ mock_iterator.get_original_task.return_value = mock_handler_task
fake_loader = DictDataLoader()
mock_options = MagicMock()
@@ -420,7 +435,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._notified_handlers = {mock_handler_task: [mock_host]}
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False))
- tqm._final_q.put(('host_task_ok', task_result))
+ tqm._final_q.put(task_result)
result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context)
finally: