summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Martz <matt@sivel.net>2022-01-21 13:20:12 -0600
committerGitHub <noreply@github.com>2022-01-21 13:20:12 -0600
commit30aeba87c319f82a8342db1ca8582a644e7684b8 (patch)
tree2bc6ad08df1a2607cf76378e8370739e209282e2
parentfdb93a912b527633d32516bf2971572b930b83ea (diff)
downloadansible-30aeba87c319f82a8342db1ca8582a644e7684b8.tar.gz
[stable-2.12] Resolve perf issue with async callback events (#76783) (#76816)
* Don't send full task with async callback events. Fixes #76729 * Use args for async_status task, instead of k=v * Make sure we send back the async task attrs for polling * Add clog frag * load is a staticmethod (cherry picked from commit 96ce480) Co-authored-by: Matt Martz <matt@sivel.net>
-rw-r--r--changelogs/fragments/76729-async-callback-perf.yml3
-rw-r--r--lib/ansible/executor/task_executor.py12
-rw-r--r--lib/ansible/plugins/strategy/__init__.py16
3 files changed, 22 insertions, 9 deletions
diff --git a/changelogs/fragments/76729-async-callback-perf.yml b/changelogs/fragments/76729-async-callback-perf.yml
new file mode 100644
index 0000000000..64d9299554
--- /dev/null
+++ b/changelogs/fragments/76729-async-callback-perf.yml
@@ -0,0 +1,3 @@
+bugfixes:
+- async - Improve performance of sending async callback events by never sending the full task through the queue
+ (https://github.com/ansible/ansible/issues/76729)
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 8ff764731c..1a7c666fea 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -622,14 +622,14 @@ class TaskExecutor:
self._final_q.send_callback(
'v2_runner_on_async_failed',
TaskResult(self._host.name,
- self._task, # We send the full task here, because the controller knows nothing about it, the TE created it
+ self._task._uuid,
result,
task_fields=self._task.dump_attrs()))
else:
self._final_q.send_callback(
'v2_runner_on_async_ok',
TaskResult(self._host.name,
- self._task, # We send the full task here, because the controller knows nothing about it, the TE created it
+ self._task._uuid,
result,
task_fields=self._task.dump_attrs()))
@@ -798,7 +798,7 @@ class TaskExecutor:
# that (with a sleep for "poll" seconds between each retry) until the
# async time limit is exceeded.
- async_task = Task().load(dict(action='async_status jid=%s' % async_jid, environment=self._task.environment))
+ async_task = Task.load(dict(action='async_status', args={'jid': async_jid}, environment=self._task.environment))
# FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized
# Because this is an async task, the action handler is async. However,
@@ -850,9 +850,9 @@ class TaskExecutor:
'v2_runner_on_async_poll',
TaskResult(
self._host.name,
- async_task, # We send the full task here, because the controller knows nothing about it, the TE created it
+ async_task._uuid,
async_result,
- task_fields=self._task.dump_attrs(),
+ task_fields=async_task.dump_attrs(),
),
)
@@ -864,7 +864,7 @@ class TaskExecutor:
else:
# If the async task finished, automatically cleanup the temporary
# status file left behind.
- cleanup_task = Task().load(
+ cleanup_task = Task.load(
{
'async_status': {
'jid': async_jid,
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index f415709a54..29de68ce08 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -46,6 +46,7 @@ from ansible.playbook.conditional import Conditional
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
+from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.plugins import loader as plugin_loader
from ansible.template import Templar
@@ -469,9 +470,18 @@ class StrategyBase:
if isinstance(task_result._task, string_types):
# If the value is a string, it is ``Task._uuid``
queue_cache_entry = (task_result._host.name, task_result._task)
- found_task = self._queued_task_cache.get(queue_cache_entry)['task']
- original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
- original_task._parent = found_task._parent
+ try:
+ found_task = self._queued_task_cache[queue_cache_entry]['task']
+ except KeyError:
+ # This should only happen due to an implicit task created by the
+ # TaskExecutor, restrict this behavior to the explicit use case
+ # of an implicit async_status task
+ if task_result._task_fields.get('action') != 'async_status':
+ raise
+ original_task = Task()
+ else:
+ original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
+ original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields)
task_result._task = original_task