summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/process
diff options
context:
space:
mode:
authorMatt Martz <matt@sivel.net>2020-08-17 10:51:01 -0500
committerGitHub <noreply@github.com>2020-08-17 10:51:01 -0500
commit5821128995097831f3dd31fca80546c5d6392e66 (patch)
tree0f5f81c51d584ede8392593d21fcbcd9c442c5bc /lib/ansible/executor/process
parent92d59a58c09f2a8baf811abe1beb09e4f911eb54 (diff)
downloadansible-5821128995097831f3dd31fca80546c5d6392e66.tar.gz
Allow callbacks from forks (#70501)
* POC for supporting callback events that come from the worker * linting fixes. ci_complete * fix up units. ci_complete * Try moving the sentinel put higher. ci_complete * safeguards. ci_complete * Move queue killing to terminate * LINTING. ci_complete * Subclass Queue, to add helper send_callback method * Just use _final_q instead of adding another queue and thread * Revert a few changes * Add helper for inserting a TaskResult into the _final_q * Add changelog fragment * Address rebase issue * ci_complete * Add test to assert async poll callback from fork * Don't use full path * ci_complete * Use _results_lock as a context manager * Add new generic lock decorator, and use it with send_callback
Diffstat (limited to 'lib/ansible/executor/process')
-rw-r--r--lib/ansible/executor/process/worker.py15
1 files changed, 6 insertions, 9 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index 0b18fc351f..df3db35e4a 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -166,41 +166,38 @@ class WorkerProcess(multiprocessing_context.Process):
display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid))
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(
+
+ # put the result on the result queue
+ display.debug("sending task result for task %s" % self._task._uuid)
+ self._final_q.send_task_result(
self._host.name,
self._task._uuid,
executor_result,
task_fields=self._task.dump_attrs(),
)
-
- # put the result on the result queue
- display.debug("sending task result for task %s" % self._task._uuid)
- self._final_q.put(task_result)
display.debug("done sending task result for task %s" % self._task._uuid)
except AnsibleConnectionFailure:
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(
+ self._final_q.send_task_result(
self._host.name,
self._task._uuid,
dict(unreachable=True),
task_fields=self._task.dump_attrs(),
)
- self._final_q.put(task_result, block=False)
except Exception as e:
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
try:
self._host.vars = dict()
self._host.groups = []
- task_result = TaskResult(
+ self._final_q.send_task_result(
self._host.name,
self._task._uuid,
dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
task_fields=self._task.dump_attrs(),
)
- self._final_q.put(task_result, block=False)
except Exception:
display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))