summaryrefslogtreecommitdiff
path: root/lib/ansible/executor
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2015-11-04 11:26:06 -0500
committerJames Cammarata <jimi@sngx.net>2015-11-04 11:26:06 -0500
commit89d713aa520fe17131efe78218f20ad09fe99688 (patch)
tree08105f897a84dff59371a36c5f0d4e558c75dd1f /lib/ansible/executor
parent61ace1d3c486b294a1badad7b5f60fc40b763755 (diff)
downloadansible-perf_improvements.tar.gz
Fixing up performanceperf_improvements
Diffstat (limited to 'lib/ansible/executor')
-rw-r--r--lib/ansible/executor/process/worker.py22
-rw-r--r--lib/ansible/executor/task_executor.py9
-rw-r--r--lib/ansible/executor/task_queue_manager.py7
3 files changed, 28 insertions, 10 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index a8bb3d8085..4754d76790 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -21,6 +21,7 @@ __metaclass__ = type
from ansible.compat.six.moves import queue
+import json
import multiprocessing
import os
import signal
@@ -43,6 +44,7 @@ from ansible.executor.task_executor import TaskExecutor
from ansible.executor.task_result import TaskResult
from ansible.playbook.handler import Handler
from ansible.playbook.task import Task
+from ansible.vars.unsafe_proxy import AnsibleJSONUnsafeDecoder
from ansible.utils.debug import debug
@@ -59,9 +61,9 @@ class WorkerProcess(multiprocessing.Process):
def __init__(self, tqm, main_q, rslt_q, loader):
# takes a task queue manager as the sole param:
- self._main_q = main_q
- self._rslt_q = rslt_q
- self._loader = loader
+ self._main_q = main_q
+ self._rslt_q = rslt_q
+ self._loader = loader
# dupe stdin, if we have one
self._new_stdin = sys.stdin
@@ -97,9 +99,9 @@ class WorkerProcess(multiprocessing.Process):
while True:
task = None
try:
+ debug("waiting for a message...")
(host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get()
- debug("there's work to be done!")
- debug("got a task/handler to work on: %s" % task)
+ debug("there's work to be done! got a task/handler to work on: %s" % task)
# because the task queue manager starts workers (forks) before the
# playbook is loaded, set the basedir of the loader inherted by
@@ -114,7 +116,15 @@ class WorkerProcess(multiprocessing.Process):
# execute the task and build a TaskResult from the result
debug("running TaskExecutor() for %s/%s" % (host, task))
- executor_result = TaskExecutor(host, task, job_vars, play_context, self._new_stdin, self._loader, shared_loader_obj).run()
+ executor_result = TaskExecutor(
+ host,
+ task,
+ job_vars,
+ play_context,
+ self._new_stdin,
+ self._loader,
+ shared_loader_obj,
+ ).run()
debug("done running TaskExecutor() for %s/%s" % (host, task))
task_result = TaskResult(host, task, executor_result)
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 4b03dc0846..48018f12d3 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -156,7 +156,8 @@ class TaskExecutor:
# create a copy of the job vars here so that we can modify
# them temporarily without changing them too early for other
# parts of the code that might still need a pristine version
- vars_copy = self._job_vars.copy()
+ #vars_copy = self._job_vars.copy()
+ vars_copy = self._job_vars
# now we update them with the play context vars
self._play_context.update_vars(vars_copy)
@@ -196,7 +197,8 @@ class TaskExecutor:
# make copies of the job vars and task so we can add the item to
# the variables and re-validate the task with the item variable
- task_vars = self._job_vars.copy()
+ #task_vars = self._job_vars.copy()
+ task_vars = self._job_vars
items = self._squash_items(items, task_vars)
for item in items:
@@ -340,7 +342,8 @@ class TaskExecutor:
# make a copy of the job vars here, in case we need to update them
# with the registered variable value later on when testing conditions
- vars_copy = variables.copy()
+ #vars_copy = variables.copy()
+ vars_copy = variables
self._display.debug("starting attempt loop")
result = None
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index ae41afb600..b18e07a544 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -92,8 +92,13 @@ class TaskQueueManager:
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
+ num_hosts = len(inventory.get_hosts())
+ num_workers = self._options.forks
+ if num_workers > num_hosts:
+ num_workers = num_hosts
+
self._workers = []
- for i in range(self._options.forks):
+ for i in xrange(num_workers):
main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue()