diff options
Diffstat (limited to 'lib/ansible/executor')
-rw-r--r-- | lib/ansible/executor/process/worker.py | 22 | ||||
-rw-r--r-- | lib/ansible/executor/task_executor.py | 9 | ||||
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 7 |
3 files changed, 28 insertions, 10 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index a8bb3d8085..4754d76790 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -21,6 +21,7 @@ __metaclass__ = type from ansible.compat.six.moves import queue +import json import multiprocessing import os import signal @@ -43,6 +44,7 @@ from ansible.executor.task_executor import TaskExecutor from ansible.executor.task_result import TaskResult from ansible.playbook.handler import Handler from ansible.playbook.task import Task +from ansible.vars.unsafe_proxy import AnsibleJSONUnsafeDecoder from ansible.utils.debug import debug @@ -59,9 +61,9 @@ class WorkerProcess(multiprocessing.Process): def __init__(self, tqm, main_q, rslt_q, loader): # takes a task queue manager as the sole param: - self._main_q = main_q - self._rslt_q = rslt_q - self._loader = loader + self._main_q = main_q + self._rslt_q = rslt_q + self._loader = loader # dupe stdin, if we have one self._new_stdin = sys.stdin @@ -97,9 +99,9 @@ class WorkerProcess(multiprocessing.Process): while True: task = None try: + debug("waiting for a message...") (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get() - debug("there's work to be done!") - debug("got a task/handler to work on: %s" % task) + debug("there's work to be done! got a task/handler to work on: %s" % task) # because the task queue manager starts workers (forks) before the # playbook is loaded, set the basedir of the loader inherted by @@ -114,7 +116,15 @@ class WorkerProcess(multiprocessing.Process): # execute the task and build a TaskResult from the result debug("running TaskExecutor() for %s/%s" % (host, task)) - executor_result = TaskExecutor(host, task, job_vars, play_context, self._new_stdin, self._loader, shared_loader_obj).run() + executor_result = TaskExecutor( + host, + task, + job_vars, + play_context, + self._new_stdin, + self._loader, + shared_loader_obj, + ).run() debug("done running TaskExecutor() for %s/%s" % (host, task)) task_result = TaskResult(host, task, executor_result) diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index 4b03dc0846..48018f12d3 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -156,7 +156,8 @@ class TaskExecutor: # create a copy of the job vars here so that we can modify # them temporarily without changing them too early for other # parts of the code that might still need a pristine version - vars_copy = self._job_vars.copy() + #vars_copy = self._job_vars.copy() + vars_copy = self._job_vars # now we update them with the play context vars self._play_context.update_vars(vars_copy) @@ -196,7 +197,8 @@ class TaskExecutor: # make copies of the job vars and task so we can add the item to # the variables and re-validate the task with the item variable - task_vars = self._job_vars.copy() + #task_vars = self._job_vars.copy() + task_vars = self._job_vars items = self._squash_items(items, task_vars) for item in items: @@ -340,7 +342,8 @@ class TaskExecutor: # make a copy of the job vars here, in case we need to update them # with the registered variable value later on when testing conditions - vars_copy = variables.copy() + #vars_copy = variables.copy() + vars_copy = variables self._display.debug("starting attempt loop") result = None diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index ae41afb600..b18e07a544 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -92,8 +92,13 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() + num_hosts = len(inventory.get_hosts()) + num_workers = self._options.forks + if num_workers > num_hosts: + num_workers = num_hosts + self._workers = [] - for i in range(self._options.forks): + for i in xrange(num_workers): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() |