diff options
Diffstat (limited to 'lib/ansible/executor/process')
-rw-r--r-- | lib/ansible/executor/process/worker.py | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index a8bb3d8085..4754d76790 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -21,6 +21,7 @@ __metaclass__ = type from ansible.compat.six.moves import queue +import json import multiprocessing import os import signal @@ -43,6 +44,7 @@ from ansible.executor.task_executor import TaskExecutor from ansible.executor.task_result import TaskResult from ansible.playbook.handler import Handler from ansible.playbook.task import Task +from ansible.vars.unsafe_proxy import AnsibleJSONUnsafeDecoder from ansible.utils.debug import debug @@ -59,9 +61,9 @@ class WorkerProcess(multiprocessing.Process): def __init__(self, tqm, main_q, rslt_q, loader): # takes a task queue manager as the sole param: - self._main_q = main_q - self._rslt_q = rslt_q - self._loader = loader + self._main_q = main_q + self._rslt_q = rslt_q + self._loader = loader # dupe stdin, if we have one self._new_stdin = sys.stdin @@ -97,9 +99,9 @@ class WorkerProcess(multiprocessing.Process): while True: task = None try: + debug("waiting for a message...") (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get() - debug("there's work to be done!") - debug("got a task/handler to work on: %s" % task) + debug("there's work to be done! got a task/handler to work on: %s" % task) # because the task queue manager starts workers (forks) before the # playbook is loaded, set the basedir of the loader inherted by @@ -114,7 +116,15 @@ class WorkerProcess(multiprocessing.Process): # execute the task and build a TaskResult from the result debug("running TaskExecutor() for %s/%s" % (host, task)) - executor_result = TaskExecutor(host, task, job_vars, play_context, self._new_stdin, self._loader, shared_loader_obj).run() + executor_result = TaskExecutor( + host, + task, + job_vars, + play_context, + self._new_stdin, + self._loader, + shared_loader_obj, + ).run() debug("done running TaskExecutor() for %s/%s" % (host, task)) task_result = TaskResult(host, task, executor_result) |