summaryrefslogtreecommitdiff
path: root/lib/ansible
diff options
context:
space:
mode:
authorMichael DeHaan <michael@ansibleworks.com>2014-02-07 18:14:13 -0500
committerMichael DeHaan <michael@ansibleworks.com>2014-02-07 18:14:13 -0500
commit60d3611b702898e64f4f132d7dc7a6d3691c54e0 (patch)
treed6ff8e1e316d2effa4be7593f85babede17809d8 /lib/ansible
parent6685b4989e0763793e7b87170027af40f634ce46 (diff)
downloadansible-60d3611b702898e64f4f132d7dc7a6d3691c54e0.tar.gz
Revert "Revert "Add the fork manager code (related to previous patch).""
This reverts commit c53538dc77b46b50d50d4307d3d41616bd45a341.
Diffstat (limited to 'lib/ansible')
-rw-r--r--lib/ansible/runner/foon.py134
1 files changed, 134 insertions, 0 deletions
diff --git a/lib/ansible/runner/foon.py b/lib/ansible/runner/foon.py
new file mode 100644
index 0000000000..f084ba726e
--- /dev/null
+++ b/lib/ansible/runner/foon.py
@@ -0,0 +1,134 @@
+# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+
+
+
+import multiprocessing.pool as mpool
+import multiprocessing
+import sys
+import os
+
+POOL = None
+OLD_SIZE = 0
+
+class MyPool(mpool.Pool):
+
+ '''
+ What is Foon?
+ A Foon is another name for a Spork, which is a Fork plus a Spoon
+ this class is a wrapper around multiprocessing in Python
+ which deals with parallelism via Forks.
+ there is no Spoon.
+
+ Two things we do differently over stock multiprocessing:
+ * intercept exceptions
+ * duplicate stdin per host to enable the process to ask questions about host key checking
+
+ That's it. This class is specific to Ansible's runner forking and is not meant to be generic.
+ '''
+
+
+ def __init__(self, *args, **kwargs):
+ super(MyPool, self).__init__(*args, **kwargs)
+
+ # overriding map_async to catch exceptions and be extensible
+
+ def map_async(self, func, iterable, chunksize=None, callback=None):
+ '''
+ Asynchronous equivalent of `map()` builtin
+ '''
+
+ mapstar = mpool.mapstar
+
+ if not hasattr(iterable, '__len__'):
+ iterable = list(iterable)
+
+ if chunksize is None:
+ chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
+ if extra:
+ chunksize += 1
+
+ task_batches = MyPool._get_tasks(func, iterable, chunksize)
+
+ new_batches = []
+ stdins = []
+ for x in task_batches:
+ # make sure each batch has a different stdin
+ new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
+ (function, data_list) = x
+ new_data_list = []
+ for host_name in data_list:
+ new_data_list.append((host_name, new_stdin))
+ stdins.append(new_stdin)
+ new_batches.append((function, new_data_list))
+
+ result = mpool.MapResult(self._cache, chunksize, len(iterable), callback)
+
+ #for i, x in enumerate(task_batches):
+ # print "%s => %s" % (i,x)
+
+ self._taskqueue.put(
+ (
+ (
+ (result._job, i, mapstar, (x,), {}) for i, x in enumerate(new_batches) # task_batches)
+ ), None)
+ )
+
+ for x in stdins:
+ x.close()
+
+ return result
+
+
+
+
+class Foon(object):
+
+ def __init__(self):
+ self.set_size(0)
+
+ def make_pool(self, processes=None, initializer=None, initargs=()):
+ '''
+ Returns a process pool object
+ '''
+ return MyPool(processes, initializer, initargs)
+
+ def set_size(self, size):
+
+ global OLD_SIZE
+ global POOL
+
+ if size > OLD_SIZE or POOL is None:
+ OLD_SIZE = size
+ POOL = self.make_pool()
+
+
+ def map(self, function, data_list):
+
+ global POOL
+ try:
+ return POOL.map(function, data_list)
+ except KeyboardInterrupt:
+ print "KEYBOARD INTERRUPT!"
+ sys.exit(1)
+
+
+
+
+
+