summaryrefslogtreecommitdiff
path: root/lib/ansible
diff options
context:
space:
mode:
authorMichael DeHaan <michael@ansibleworks.com>2014-02-07 20:38:20 -0500
committerMichael DeHaan <michael@ansibleworks.com>2014-02-07 20:38:20 -0500
commit633d685bb73b3af91117cee9ae6d25cf992fc026 (patch)
treed6ff8e1e316d2effa4be7593f85babede17809d8 /lib/ansible
parentad8c275887086fb3d8534852a9285160f7c6eb29 (diff)
downloadansible-633d685bb73b3af91117cee9ae6d25cf992fc026.tar.gz
Revert "Revert "Revert "This patch makes Ansible reuse fork allocation between seperate instantations of the runner API, therefore the overhead of recreating forks"""
This reverts commit 73ca1a173c8ede76f19ebce5d6a018fd41e30c68.
Diffstat (limited to 'lib/ansible')
-rw-r--r--lib/ansible/runner/__init__.py68
1 files changed, 47 insertions, 21 deletions
diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py
index 9160a0b592..528dfb42ff 100644
--- a/lib/ansible/runner/__init__.py
+++ b/lib/ansible/runner/__init__.py
@@ -49,7 +49,6 @@ from ansible.module_common import ModuleReplacer
module_replacer = ModuleReplacer(strip_comments=False)
-NEED_ATFORK=False
HAS_ATFORK=True
try:
from Crypto.Random import atfork
@@ -61,28 +60,30 @@ multiprocessing_runner = None
OUTPUT_LOCKFILE = tempfile.TemporaryFile()
PROCESS_LOCKFILE = tempfile.TemporaryFile()
-from foon import Foon
-
-FOON = Foon()
-
################################################
-class KeyboardInterruptError(Exception):
- pass
-
-def _executor_hook(params):
-
- (host, my_stdin) = params
+def _executor_hook(job_queue, result_queue, new_stdin):
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6
- if HAS_ATFORK and NEED_ATFORK:
+ if HAS_ATFORK:
atfork()
- try:
- return multiprocessing_runner._executor(host, my_stdin)
- except KeyboardInterrupt:
- raise KeyboardInterruptError()
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ while not job_queue.empty():
+ try:
+ host = job_queue.get(block=False)
+ return_data = multiprocessing_runner._executor(host, new_stdin)
+ result_queue.put(return_data)
+
+ if 'LEGACY_TEMPLATE_WARNING' in return_data.flags:
+ # pass data back up across the multiprocessing fork boundary
+ template.Flags.LEGACY_TEMPLATE_WARNING = True
+
+ except Queue.Empty:
+ pass
+ except:
+ traceback.print_exc()
class HostVars(dict):
''' A special view of setup_cache that adds values from the inventory when needed. '''
@@ -208,9 +209,6 @@ class Runner(object):
else:
self.transport = "ssh"
- if self.transport == "paramiko":
- global NEED_ATFORK
- NEED_ATFORK=True
# misc housekeeping
if subset and self.inventory._subset is None:
@@ -1058,11 +1056,39 @@ class Runner(object):
# *****************************************************
+
def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required '''
- FOON.set_size(self.forks)
- return FOON.map(_executor_hook, hosts)
+ manager = multiprocessing.Manager()
+ job_queue = manager.Queue()
+ for host in hosts:
+ job_queue.put(host)
+ result_queue = manager.Queue()
+
+ workers = []
+ for i in range(self.forks):
+ new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
+ prc = multiprocessing.Process(target=_executor_hook,
+ args=(job_queue, result_queue, new_stdin))
+ prc.start()
+ workers.append(prc)
+
+ try:
+ for worker in workers:
+ worker.join()
+ except KeyboardInterrupt:
+ for worker in workers:
+ worker.terminate()
+ worker.join()
+
+ results = []
+ try:
+ while not result_queue.empty():
+ results.append(result_queue.get(block=False))
+ except socket.error:
+ raise errors.AnsibleError("<interrupted>")
+ return results
# *****************************************************