diff options
Diffstat (limited to 'lib/ansible/executor/task_executor.py')
-rw-r--r-- | lib/ansible/executor/task_executor.py | 80 |
1 files changed, 51 insertions, 29 deletions
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index 2dcb5f9631..1417bc9d2c 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -35,7 +35,7 @@ from ansible.template import Templar from ansible.utils.encrypt import key_for_hostname from ansible.utils.listify import listify_lookup_plugin_terms from ansible.utils.unicode import to_unicode -from ansible.vars.unsafe_proxy import UnsafeProxy +from ansible.vars.unsafe_proxy import UnsafeProxy, wrap_var try: from __main__ import display @@ -67,6 +67,7 @@ class TaskExecutor: self._new_stdin = new_stdin self._loader = loader self._shared_loader_obj = shared_loader_obj + self._connection = None def run(self): ''' @@ -145,7 +146,7 @@ class TaskExecutor: except AttributeError: pass except Exception as e: - display.debug("error closing connection: %s" % to_unicode(e)) + display.debug(u"error closing connection: %s" % to_unicode(e)) def _get_loop_items(self): ''' @@ -153,16 +154,19 @@ class TaskExecutor: and returns the items result. ''' - # create a copy of the job vars here so that we can modify - # them temporarily without changing them too early for other - # parts of the code that might still need a pristine version - #vars_copy = self._job_vars.copy() - vars_copy = self._job_vars + # save the play context variables to a temporary dictionary, + # so that we can modify the job vars without doing a full copy + # and later restore them to avoid modifying things too early + play_context_vars = dict() + self._play_context.update_vars(play_context_vars) - # now we update them with the play context vars - self._play_context.update_vars(vars_copy) + old_vars = dict() + for k in play_context_vars.keys(): + if k in self._job_vars: + old_vars[k] = self._job_vars[k] + self._job_vars[k] = play_context_vars[k] - templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=vars_copy) + templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars) items = None if self._task.loop: if self._task.loop in self._shared_loader_obj.lookup_loader: @@ -179,16 +183,25 @@ class TaskExecutor: loop_terms = listify_lookup_plugin_terms(terms=self._task.loop_args, templar=templar, loader=self._loader, fail_on_undefined=True, convert_bare=True) except AnsibleUndefinedVariable as e: - if 'has no attribute' in str(e): + if u'has no attribute' in to_unicode(e): loop_terms = [] display.deprecated("Skipping task due to undefined attribute, in the future this will be a fatal error.") else: raise items = self._shared_loader_obj.lookup_loader.get(self._task.loop, loader=self._loader, - templar=templar).run(terms=loop_terms, variables=vars_copy) + templar=templar).run(terms=loop_terms, variables=self._job_vars) else: raise AnsibleError("Unexpected failure in finding the lookup named '%s' in the available lookup plugins" % self._task.loop) + # now we restore any old job variables that may have been modified, + # and delete them if they were in the play context vars but not in + # the old variables dictionary + for k in play_context_vars.keys(): + if k in old_vars: + self._job_vars[k] = old_vars[k] + else: + del self._job_vars[k] + if items: from ansible.vars.unsafe_proxy import UnsafeProxy for idx, item in enumerate(items): @@ -218,7 +231,7 @@ class TaskExecutor: tmp_task = self._task.copy() tmp_play_context = self._play_context.copy() except AnsibleParserError as e: - results.append(dict(failed=True, msg=str(e))) + results.append(dict(failed=True, msg=to_unicode(e))) continue # now we swap the internal task and play context with their copies, @@ -232,6 +245,7 @@ class TaskExecutor: # now update the result with the item info, and append the result # to the list of results res['item'] = item + #TODO: send item results to callback here, instead of all at the end results.append(res) return results @@ -302,6 +316,11 @@ class TaskExecutor: # do the same kind of post validation step on it here before we use it. self._play_context.post_validate(templar=templar) + # now that the play context is finalized, if the remote_addr is not set + # default to using the host's address field as the remote address + if not self._play_context.remote_addr: + self._play_context.remote_addr = self._host.address + # We also add "magic" variables back into the variables dict to make sure # a certain subset of variables exist. self._play_context.update_vars(variables) @@ -348,8 +367,13 @@ class TaskExecutor: self._task.args = variable_params # get the connection and the handler for this execution - self._connection = self._get_connection(variables=variables, templar=templar) - self._connection.set_host_overrides(host=self._host) + if not self._connection or not getattr(self._connection, 'connected', False) or self._play_context.remote_addr != self._connection._play_context.remote_addr: + self._connection = self._get_connection(variables=variables, templar=templar) + self._connection.set_host_overrides(host=self._host) + else: + # if connection is reused, its _play_context is no longer valid and needs + # to be replaced with the one templated above, in case other data changed + self._connection._play_context = self._play_context self._handler = self._get_action_handler(connection=self._connection, templar=templar) @@ -372,30 +396,36 @@ class TaskExecutor: # make a copy of the job vars here, in case we need to update them # with the registered variable value later on when testing conditions - #vars_copy = variables.copy() vars_copy = variables.copy() display.debug("starting attempt loop") result = None for attempt in range(retries): if attempt > 0: - display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-attempt, result), color="dark gray") + display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-attempt, result), color=C.COLOR_DEBUG) result['attempts'] = attempt + 1 display.debug("running the handler") try: result = self._handler.run(task_vars=variables) except AnsibleConnectionFailure as e: - return dict(unreachable=True, msg=str(e)) + return dict(unreachable=True, msg=to_unicode(e)) display.debug("handler run complete") + # update the local copy of vars with the registered value, if specified, + # or any facts which may have been generated by the module execution + if self._task.register: + vars_copy[self._task.register] = wrap_var(result.copy()) + if self._task.async > 0: # the async_wrapper module returns dumped JSON via its stdout # response, so we parse it here and replace the result try: + if 'skipped' in result and result['skipped'] or 'failed' in result and result['failed']: + return result result = json.loads(result.get('stdout')) except (TypeError, ValueError) as e: - return dict(failed=True, msg="The async task did not return valid JSON: %s" % str(e)) + return dict(failed=True, msg=u"The async task did not return valid JSON: %s" % to_unicode(e)) if self._task.poll > 0: result = self._poll_async_result(result=result, templar=templar) @@ -416,11 +446,6 @@ class TaskExecutor: return failed_when_result return False - # update the local copy of vars with the registered value, if specified, - # or any facts which may have been generated by the module execution - if self._task.register: - vars_copy[self._task.register] = result - if 'ansible_facts' in result: vars_copy.update(result['ansible_facts']) @@ -437,7 +462,7 @@ class TaskExecutor: if attempt < retries - 1: cond = Conditional(loader=self._loader) - cond.when = self._task.until + cond.when = [ self._task.until ] if cond.evaluate_conditional(templar, vars_copy): break @@ -450,7 +475,7 @@ class TaskExecutor: # do the final update of the local variables here, for both registered # values and any facts which may have been created if self._task.register: - variables[self._task.register] = result + variables[self._task.register] = wrap_var(result) if 'ansible_facts' in result: variables.update(result['ansible_facts']) @@ -528,9 +553,6 @@ class TaskExecutor: correct connection object from the list of connection plugins ''' - if not self._play_context.remote_addr: - self._play_context.remote_addr = self._host.address - if self._task.delegate_to is not None: # since we're delegating, we don't want to use interpreter values # which would have been set for the original target host |