summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/task_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/task_executor.py')
-rw-r--r--lib/ansible/executor/task_executor.py80
1 files changed, 51 insertions, 29 deletions
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 2dcb5f9631..1417bc9d2c 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -35,7 +35,7 @@ from ansible.template import Templar
from ansible.utils.encrypt import key_for_hostname
from ansible.utils.listify import listify_lookup_plugin_terms
from ansible.utils.unicode import to_unicode
-from ansible.vars.unsafe_proxy import UnsafeProxy
+from ansible.vars.unsafe_proxy import UnsafeProxy, wrap_var
try:
from __main__ import display
@@ -67,6 +67,7 @@ class TaskExecutor:
self._new_stdin = new_stdin
self._loader = loader
self._shared_loader_obj = shared_loader_obj
+ self._connection = None
def run(self):
'''
@@ -145,7 +146,7 @@ class TaskExecutor:
except AttributeError:
pass
except Exception as e:
- display.debug("error closing connection: %s" % to_unicode(e))
+ display.debug(u"error closing connection: %s" % to_unicode(e))
def _get_loop_items(self):
'''
@@ -153,16 +154,19 @@ class TaskExecutor:
and returns the items result.
'''
- # create a copy of the job vars here so that we can modify
- # them temporarily without changing them too early for other
- # parts of the code that might still need a pristine version
- #vars_copy = self._job_vars.copy()
- vars_copy = self._job_vars
+ # save the play context variables to a temporary dictionary,
+ # so that we can modify the job vars without doing a full copy
+ # and later restore them to avoid modifying things too early
+ play_context_vars = dict()
+ self._play_context.update_vars(play_context_vars)
- # now we update them with the play context vars
- self._play_context.update_vars(vars_copy)
+ old_vars = dict()
+ for k in play_context_vars.keys():
+ if k in self._job_vars:
+ old_vars[k] = self._job_vars[k]
+ self._job_vars[k] = play_context_vars[k]
- templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=vars_copy)
+ templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
items = None
if self._task.loop:
if self._task.loop in self._shared_loader_obj.lookup_loader:
@@ -179,16 +183,25 @@ class TaskExecutor:
loop_terms = listify_lookup_plugin_terms(terms=self._task.loop_args, templar=templar,
loader=self._loader, fail_on_undefined=True, convert_bare=True)
except AnsibleUndefinedVariable as e:
- if 'has no attribute' in str(e):
+ if u'has no attribute' in to_unicode(e):
loop_terms = []
display.deprecated("Skipping task due to undefined attribute, in the future this will be a fatal error.")
else:
raise
items = self._shared_loader_obj.lookup_loader.get(self._task.loop, loader=self._loader,
- templar=templar).run(terms=loop_terms, variables=vars_copy)
+ templar=templar).run(terms=loop_terms, variables=self._job_vars)
else:
raise AnsibleError("Unexpected failure in finding the lookup named '%s' in the available lookup plugins" % self._task.loop)
+ # now we restore any old job variables that may have been modified,
+ # and delete them if they were in the play context vars but not in
+ # the old variables dictionary
+ for k in play_context_vars.keys():
+ if k in old_vars:
+ self._job_vars[k] = old_vars[k]
+ else:
+ del self._job_vars[k]
+
if items:
from ansible.vars.unsafe_proxy import UnsafeProxy
for idx, item in enumerate(items):
@@ -218,7 +231,7 @@ class TaskExecutor:
tmp_task = self._task.copy()
tmp_play_context = self._play_context.copy()
except AnsibleParserError as e:
- results.append(dict(failed=True, msg=str(e)))
+ results.append(dict(failed=True, msg=to_unicode(e)))
continue
# now we swap the internal task and play context with their copies,
@@ -232,6 +245,7 @@ class TaskExecutor:
# now update the result with the item info, and append the result
# to the list of results
res['item'] = item
+ #TODO: send item results to callback here, instead of all at the end
results.append(res)
return results
@@ -302,6 +316,11 @@ class TaskExecutor:
# do the same kind of post validation step on it here before we use it.
self._play_context.post_validate(templar=templar)
+ # now that the play context is finalized, if the remote_addr is not set
+ # default to using the host's address field as the remote address
+ if not self._play_context.remote_addr:
+ self._play_context.remote_addr = self._host.address
+
# We also add "magic" variables back into the variables dict to make sure
# a certain subset of variables exist.
self._play_context.update_vars(variables)
@@ -348,8 +367,13 @@ class TaskExecutor:
self._task.args = variable_params
# get the connection and the handler for this execution
- self._connection = self._get_connection(variables=variables, templar=templar)
- self._connection.set_host_overrides(host=self._host)
+ if not self._connection or not getattr(self._connection, 'connected', False) or self._play_context.remote_addr != self._connection._play_context.remote_addr:
+ self._connection = self._get_connection(variables=variables, templar=templar)
+ self._connection.set_host_overrides(host=self._host)
+ else:
+ # if connection is reused, its _play_context is no longer valid and needs
+ # to be replaced with the one templated above, in case other data changed
+ self._connection._play_context = self._play_context
self._handler = self._get_action_handler(connection=self._connection, templar=templar)
@@ -372,30 +396,36 @@ class TaskExecutor:
# make a copy of the job vars here, in case we need to update them
# with the registered variable value later on when testing conditions
- #vars_copy = variables.copy()
vars_copy = variables.copy()
display.debug("starting attempt loop")
result = None
for attempt in range(retries):
if attempt > 0:
- display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-attempt, result), color="dark gray")
+ display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-attempt, result), color=C.COLOR_DEBUG)
result['attempts'] = attempt + 1
display.debug("running the handler")
try:
result = self._handler.run(task_vars=variables)
except AnsibleConnectionFailure as e:
- return dict(unreachable=True, msg=str(e))
+ return dict(unreachable=True, msg=to_unicode(e))
display.debug("handler run complete")
+ # update the local copy of vars with the registered value, if specified,
+ # or any facts which may have been generated by the module execution
+ if self._task.register:
+ vars_copy[self._task.register] = wrap_var(result.copy())
+
if self._task.async > 0:
# the async_wrapper module returns dumped JSON via its stdout
# response, so we parse it here and replace the result
try:
+ if 'skipped' in result and result['skipped'] or 'failed' in result and result['failed']:
+ return result
result = json.loads(result.get('stdout'))
except (TypeError, ValueError) as e:
- return dict(failed=True, msg="The async task did not return valid JSON: %s" % str(e))
+ return dict(failed=True, msg=u"The async task did not return valid JSON: %s" % to_unicode(e))
if self._task.poll > 0:
result = self._poll_async_result(result=result, templar=templar)
@@ -416,11 +446,6 @@ class TaskExecutor:
return failed_when_result
return False
- # update the local copy of vars with the registered value, if specified,
- # or any facts which may have been generated by the module execution
- if self._task.register:
- vars_copy[self._task.register] = result
-
if 'ansible_facts' in result:
vars_copy.update(result['ansible_facts'])
@@ -437,7 +462,7 @@ class TaskExecutor:
if attempt < retries - 1:
cond = Conditional(loader=self._loader)
- cond.when = self._task.until
+ cond.when = [ self._task.until ]
if cond.evaluate_conditional(templar, vars_copy):
break
@@ -450,7 +475,7 @@ class TaskExecutor:
# do the final update of the local variables here, for both registered
# values and any facts which may have been created
if self._task.register:
- variables[self._task.register] = result
+ variables[self._task.register] = wrap_var(result)
if 'ansible_facts' in result:
variables.update(result['ansible_facts'])
@@ -528,9 +553,6 @@ class TaskExecutor:
correct connection object from the list of connection plugins
'''
- if not self._play_context.remote_addr:
- self._play_context.remote_addr = self._host.address
-
if self._task.delegate_to is not None:
# since we're delegating, we don't want to use interpreter values
# which would have been set for the original target host