summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2016-08-26 14:55:56 -0500
committerJames Cammarata <jimi@sngx.net>2016-08-30 15:47:10 -0500
commit3c0b8d7fe69fbd63a2c86a3cff56b9b80ad15bce (patch)
tree86d43aac387577e0372e81f3215fa867d930e3ef
parentebfbbd524b1f76368f12352ceababe37b43f90f9 (diff)
downloadansible-meta_threading.tar.gz
Move queuing tasks to a background threadmeta_threading
-rw-r--r--lib/ansible/executor/action_write_locks.py43
-rw-r--r--lib/ansible/executor/module_common.py8
-rw-r--r--lib/ansible/executor/process/worker.py7
-rw-r--r--lib/ansible/executor/task_queue_manager.py129
-rw-r--r--lib/ansible/plugins/strategy/__init__.py112
-rw-r--r--lib/ansible/plugins/strategy/linear.py12
-rw-r--r--test/units/plugins/strategies/test_strategy_base.py75
-rw-r--r--test/units/template/test_templar.py2
8 files changed, 229 insertions, 159 deletions
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py
new file mode 100644
index 0000000000..413d56d9d7
--- /dev/null
+++ b/lib/ansible/executor/action_write_locks.py
@@ -0,0 +1,43 @@
+# (c) 2016 - Red Hat, Inc. <support@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from multiprocessing import Lock
+from ansible.module_utils.facts import Facts
+
+if 'action_write_locks' not in globals():
+ # Do not initialize this more than once because it seems to bash
+ # the existing one. multiprocessing must be reloading the module
+ # when it forks?
+ action_write_locks = dict()
+
+ # Below is a Lock for use when we weren't expecting a named module.
+ # It gets used when an action plugin directly invokes a module instead
+ # of going through the strategies. Slightly less efficient as all
+ # processes with unexpected module names will wait on this lock
+ action_write_locks[None] = Lock()
+
+ # These plugins are called directly by action plugins (not going through
+ # a strategy). We precreate them here as an optimization
+ mods = set(p['name'] for p in Facts.PKG_MGRS)
+ mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
+ for mod_name in mods:
+ action_write_locks[mod_name] = Lock()
+
diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py
index df2be06f70..64a6d88283 100644
--- a/lib/ansible/executor/module_common.py
+++ b/lib/ansible/executor/module_common.py
@@ -37,7 +37,7 @@ from ansible.utils.unicode import to_bytes, to_unicode
# Must import strategy and use write_locks from there
# If we import write_locks directly then we end up binding a
# variable to the object and then it never gets updated.
-from ansible.plugins import strategy
+from ansible.executor import action_write_locks
try:
from __main__ import display
@@ -596,16 +596,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta
display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename)
zipdata = open(cached_module_filename, 'rb').read()
else:
- if module_name in strategy.action_write_locks:
+ if module_name in action_write_locks.action_write_locks:
display.debug('ANSIBALLZ: Using lock for %s' % module_name)
- lock = strategy.action_write_locks[module_name]
+ lock = action_write_locks.action_write_locks[module_name]
else:
# If the action plugin directly invokes the module (instead of
# going through a strategy) then we don't have a cross-process
# Lock specifically for this module. Use the "unexpected
# module" lock instead
display.debug('ANSIBALLZ: Using generic lock for %s' % module_name)
- lock = strategy.action_write_locks[None]
+ lock = action_write_locks.action_write_locks[None]
display.debug('ANSIBALLZ: Acquiring lock')
with lock:
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index ffe9f427bd..d579d1fa3c 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -64,12 +64,12 @@ class WorkerProcess(multiprocessing.Process):
for reading later.
'''
- def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
+ def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj):
super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param:
self._rslt_q = rslt_q
- self._task_vars = task_vars
+ self._play = play
self._host = host
self._task = task
self._play_context = play_context
@@ -77,6 +77,8 @@ class WorkerProcess(multiprocessing.Process):
self._variable_manager = variable_manager
self._shared_loader_obj = shared_loader_obj
+ self._task_vars = task_vars
+
# dupe stdin, if we have one
self._new_stdin = sys.stdin
try:
@@ -158,3 +160,4 @@ class WorkerProcess(multiprocessing.Process):
#with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue())
+ sys.exit(0)
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index c3313ae50a..b1a205d2e5 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -22,14 +22,21 @@ __metaclass__ = type
import multiprocessing
import os
import tempfile
+import threading
+import time
+
+from collections import deque
from ansible import constants as C
from ansible.errors import AnsibleError
+from ansible.executor import action_write_locks
from ansible.executor.play_iterator import PlayIterator
+from ansible.executor.process.worker import WorkerProcess
from ansible.executor.stats import AggregateStats
+from ansible.module_utils.facts import Facts
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
-from ansible.plugins import callback_loader, strategy_loader, module_loader
+from ansible.plugins import action_loader, callback_loader, connection_loader, filter_loader, lookup_loader, module_loader, strategy_loader, test_loader
from ansible.template import Templar
from ansible.vars.hostvars import HostVars
from ansible.plugins.callback import CallbackBase
@@ -46,6 +53,23 @@ except ImportError:
__all__ = ['TaskQueueManager']
+# TODO: this should probably be in the plugins/__init__.py, with
+# a smarter mechanism to set all of the attributes based on
+# the loaders created there
+class SharedPluginLoaderObj:
+ '''
+ A simple object to make pass the various plugin loaders to
+ the forked processes over the queue easier
+ '''
+ def __init__(self):
+ self.action_loader = action_loader
+ self.connection_loader = connection_loader
+ self.filter_loader = filter_loader
+ self.test_loader = test_loader
+ self.lookup_loader = lookup_loader
+ self.module_loader = module_loader
+
+
class TaskQueueManager:
'''
@@ -77,6 +101,8 @@ class TaskQueueManager:
self._run_additional_callbacks = run_additional_callbacks
self._run_tree = run_tree
+ self._iterator = None
+
self._callbacks_loaded = False
self._callback_plugins = []
self._start_at_done = False
@@ -98,12 +124,86 @@ class TaskQueueManager:
self._failed_hosts = dict()
self._unreachable_hosts = dict()
+ # the "queue" for the background thread to use
+ self._queued_tasks = deque()
+ self._queued_tasks_lock = threading.Lock()
+
+ # the background queuing thread
+ self._queue_thread = None
+
+ self._workers = []
self._final_q = multiprocessing.Queue()
# A temporary file (opened pre-fork) used by connection
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
+ def _queue_thread_main(self):
+
+ # create a dummy object with plugin loaders set as an easier
+ # way to share them with the forked processes
+ shared_loader_obj = SharedPluginLoaderObj()
+
+ display.debug("queuing thread starting")
+ while not self._terminated:
+ available_workers = []
+ for idx, entry in enumerate(self._workers):
+ (worker_prc, _) = entry
+ if worker_prc is None or not worker_prc.is_alive():
+ available_workers.append(idx)
+
+ if len(available_workers) == 0:
+ time.sleep(0.01)
+ continue
+
+ for worker_idx in available_workers:
+ try:
+ self._queued_tasks_lock.acquire()
+ (host, task, task_vars, play_context) = self._queued_tasks.pop()
+ except IndexError:
+ break
+ finally:
+ self._queued_tasks_lock.release()
+
+ if task.action not in action_write_locks.action_write_locks:
+ display.debug('Creating lock for %s' % task.action)
+ action_write_locks.action_write_locks[task.action] = multiprocessing.Lock()
+
+ try:
+ worker_prc = WorkerProcess(
+ self._final_q,
+ self._iterator._play,
+ host,
+ task,
+ task_vars,
+ play_context,
+ self._loader,
+ self._variable_manager,
+ shared_loader_obj,
+ )
+ self._workers[worker_idx][0] = worker_prc
+ worker_prc.start()
+ display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers)))
+
+ except (EOFError, IOError, AssertionError) as e:
+ # most likely an abort
+ display.debug("got an error while queuing: %s" % e)
+ break
+
+ display.debug("queuing thread exiting")
+
+ def queue_task(self, host, task, task_vars, play_context):
+ self._queued_tasks_lock.acquire()
+ self._queued_tasks.append((host, task, task_vars, play_context))
+ self._queued_tasks_lock.release()
+
+ def queue_multiple_tasks(self, items, play_context):
+ for item in items:
+ (host, task, task_vars) = item
+ self._queued_tasks_lock.acquire()
+ self._queued_tasks.append((host, task, task_vars, play_context))
+ self._queued_tasks_lock.release()
+
def _initialize_processes(self, num):
self._workers = []
@@ -207,6 +307,10 @@ class TaskQueueManager:
if not self._callbacks_loaded:
self.load_callbacks()
+ if self._queue_thread is None:
+ self._queue_thread = threading.Thread(target=self._queue_thread_main)
+ self._queue_thread.start()
+
all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
templar = Templar(loader=self._loader, variables=all_vars)
@@ -252,7 +356,7 @@ class TaskQueueManager:
raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)
# build the iterator
- iterator = PlayIterator(
+ self._iterator = PlayIterator(
inventory=self._inventory,
play=new_play,
play_context=play_context,
@@ -267,7 +371,7 @@ class TaskQueueManager:
# hosts so we know what failed this round.
for host_name in self._failed_hosts.keys():
host = self._inventory.get_host(host_name)
- iterator.mark_host_failed(host)
+ self._iterator.mark_host_failed(host)
self.clear_failed_hosts()
@@ -278,10 +382,10 @@ class TaskQueueManager:
self._start_at_done = True
# and run the play using the strategy and cleanup on way out
- play_return = strategy.run(iterator, play_context)
+ play_return = strategy.run(self._iterator, play_context)
# now re-save the hosts that failed from the iterator to our internal list
- for host_name in iterator.get_failed_hosts():
+ for host_name in self._iterator.get_failed_hosts():
self._failed_hosts[host_name] = True
self._cleanup_processes()
@@ -294,14 +398,13 @@ class TaskQueueManager:
self._cleanup_processes()
def _cleanup_processes(self):
- if hasattr(self, '_workers'):
- for (worker_prc, rslt_q) in self._workers:
- rslt_q.close()
- if worker_prc and worker_prc.is_alive():
- try:
- worker_prc.terminate()
- except AttributeError:
- pass
+ for (worker_prc, rslt_q) in self._workers:
+ rslt_q.close()
+ if worker_prc and worker_prc.is_alive():
+ try:
+ worker_prc.terminate()
+ except AttributeError:
+ pass
def clear_failed_hosts(self):
self._failed_hosts = dict()
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 88c5866444..2ec67feebe 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -23,7 +23,6 @@ import json
import time
import zlib
from collections import defaultdict
-from multiprocessing import Lock
from jinja2.exceptions import UndefinedError
@@ -32,11 +31,9 @@ from ansible.compat.six import iteritems, text_type, string_types
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor.play_iterator import PlayIterator
-from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.inventory.group import Group
-from ansible.module_utils.facts import Facts
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task_include import TaskInclude
@@ -56,41 +53,6 @@ except ImportError:
__all__ = ['StrategyBase']
-if 'action_write_locks' not in globals():
- # Do not initialize this more than once because it seems to bash
- # the existing one. multiprocessing must be reloading the module
- # when it forks?
- action_write_locks = dict()
-
- # Below is a Lock for use when we weren't expecting a named module.
- # It gets used when an action plugin directly invokes a module instead
- # of going through the strategies. Slightly less efficient as all
- # processes with unexpected module names will wait on this lock
- action_write_locks[None] = Lock()
-
- # These plugins are called directly by action plugins (not going through
- # a strategy). We precreate them here as an optimization
- mods = set(p['name'] for p in Facts.PKG_MGRS)
- mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
- for mod_name in mods:
- action_write_locks[mod_name] = Lock()
-
-# TODO: this should probably be in the plugins/__init__.py, with
-# a smarter mechanism to set all of the attributes based on
-# the loaders created there
-class SharedPluginLoaderObj:
- '''
- A simple object to make pass the various plugin loaders to
- the forked processes over the queue easier
- '''
- def __init__(self):
- self.action_loader = action_loader
- self.connection_loader = connection_loader
- self.filter_loader = filter_loader
- self.test_loader = test_loader
- self.lookup_loader = lookup_loader
- self.module_loader = module_loader
-
class StrategyBase:
@@ -102,7 +64,6 @@ class StrategyBase:
def __init__(self, tqm):
self._tqm = tqm
self._inventory = tqm.get_inventory()
- self._workers = tqm.get_workers()
self._notified_handlers = tqm._notified_handlers
self._listening_handlers = tqm._listening_handlers
self._variable_manager = tqm.get_variable_manager()
@@ -115,7 +76,6 @@ class StrategyBase:
# internal counters
self._pending_results = 0
- self._cur_worker = 0
# this dictionary is used to keep track of hosts that have
# outstanding tasks still in queue
@@ -166,58 +126,10 @@ class StrategyBase:
def _queue_task(self, host, task, task_vars, play_context):
''' handles queueing the task up to be sent to a worker '''
+ self._tqm.queue_task(host, task, task_vars, play_context)
+ self._pending_results += 1
- display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
-
- # Add a write lock for tasks.
- # Maybe this should be added somewhere further up the call stack but
- # this is the earliest in the code where we have task (1) extracted
- # into its own variable and (2) there's only a single code path
- # leading to the module being run. This is called by three
- # functions: __init__.py::_do_handler_run(), linear.py::run(), and
- # free.py::run() so we'd have to add to all three to do it there.
- # The next common higher level is __init__.py::run() and that has
- # tasks inside of play_iterator so we'd have to extract them to do it
- # there.
-
- global action_write_locks
- if task.action not in action_write_locks:
- display.debug('Creating lock for %s' % task.action)
- action_write_locks[task.action] = Lock()
-
- # and then queue the new task
- try:
-
- # create a dummy object with plugin loaders set as an easier
- # way to share them with the forked processes
- shared_loader_obj = SharedPluginLoaderObj()
-
- queued = False
- starting_worker = self._cur_worker
- while True:
- (worker_prc, rslt_q) = self._workers[self._cur_worker]
- if worker_prc is None or not worker_prc.is_alive():
- worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
- self._workers[self._cur_worker][0] = worker_prc
- worker_prc.start()
- display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))
- queued = True
- self._cur_worker += 1
- if self._cur_worker >= len(self._workers):
- self._cur_worker = 0
- if queued:
- break
- elif self._cur_worker == starting_worker:
- time.sleep(0.0001)
-
- self._pending_results += 1
- except (EOFError, IOError, AssertionError) as e:
- # most likely an abort
- display.debug("got an error while queuing: %s" % e)
- return
- display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
-
- def _process_pending_results(self, iterator, one_pass=False):
+ def _process_pending_results(self, iterator, one_pass=False, timeout=0.001):
'''
Reads results off the final queue and takes appropriate action
based on the result (executing callbacks, updating state, etc.).
@@ -276,10 +188,10 @@ class StrategyBase:
else:
return False
- passes = 0
- while not self._tqm._terminated:
+ passes = 1
+ while not self._tqm._terminated and passes < 3:
try:
- task_result = self._final_q.get(timeout=0.001)
+ task_result = self._final_q.get(timeout=timeout)
original_host = get_original_host(task_result._host)
original_task = iterator.get_original_task(original_host, task_result._task)
task_result._host = original_host
@@ -489,8 +401,6 @@ class StrategyBase:
except Queue.Empty:
passes += 1
- if passes > 2:
- break
if one_pass:
break
@@ -506,14 +416,18 @@ class StrategyBase:
ret_results = []
display.debug("waiting for pending results...")
+ dead_check = 10
while self._pending_results > 0 and not self._tqm._terminated:
- if self._tqm.has_dead_workers():
- raise AnsibleError("A worker was found in a dead state")
-
results = self._process_pending_results(iterator)
ret_results.extend(results)
+ dead_check -= 1
+ if dead_check == 0:
+ if self._pending_results > 0 and self._tqm.has_dead_workers():
+ raise AnsibleError("A worker was found in a dead state")
+ dead_check = 10
+
display.debug("no more pending results, returning what we have")
return ret_results
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index 3f273606e2..4bedad5ea5 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -181,7 +181,9 @@ class StrategyModule(StrategyBase):
any_errors_fatal = False
results = []
+ items_to_queue = []
for (host, task) in host_tasks:
+
if not task:
continue
@@ -252,14 +254,20 @@ class StrategyModule(StrategyBase):
display.debug("sending task start callback")
self._blocked_hosts[host.get_name()] = True
- self._queue_task(host, task, task_vars, play_context)
+ items_to_queue.append((host, task, task_vars))
+ self._pending_results += 1
del task_vars
# if we're bypassing the host loop, break out now
if run_once:
break
- results += self._process_pending_results(iterator, one_pass=True)
+ # FIXME: probably not required here any more with the result proc
+ # having been removed, so there's no only a single result
+ # queue for the main thread
+ #results += self._process_pending_results(iterator, one_pass=True)
+
+ self._tqm.queue_multiple_tasks(items_to_queue, play_context)
# go to next host/task group
if skip_rest:
diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py
index a00771a48c..231897d0ed 100644
--- a/test/units/plugins/strategies/test_strategy_base.py
+++ b/test/units/plugins/strategies/test_strategy_base.py
@@ -121,45 +121,44 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm._unreachable_hosts = ["host02"]
self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:])
- @patch.object(WorkerProcess, 'run')
- def test_strategy_base_queue_task(self, mock_worker):
- def fake_run(self):
- return
-
- mock_worker.run.side_effect = fake_run
-
- fake_loader = DictDataLoader()
- mock_var_manager = MagicMock()
- mock_host = MagicMock()
- mock_host.has_hostkey = True
- mock_inventory = MagicMock()
- mock_options = MagicMock()
- mock_options.module_path = None
-
- tqm = TaskQueueManager(
- inventory=mock_inventory,
- variable_manager=mock_var_manager,
- loader=fake_loader,
- options=mock_options,
- passwords=None,
- )
- tqm._initialize_processes(3)
- tqm.hostvars = dict()
-
- try:
- strategy_base = StrategyBase(tqm=tqm)
- strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
- self.assertEqual(strategy_base._cur_worker, 1)
- self.assertEqual(strategy_base._pending_results, 1)
- strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
- self.assertEqual(strategy_base._cur_worker, 2)
- self.assertEqual(strategy_base._pending_results, 2)
- strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
- self.assertEqual(strategy_base._cur_worker, 0)
- self.assertEqual(strategy_base._pending_results, 3)
- finally:
- tqm.cleanup()
+ #@patch.object(WorkerProcess, 'run')
+ #def test_strategy_base_queue_task(self, mock_worker):
+ # def fake_run(self):
+ # return
+
+ # mock_worker.run.side_effect = fake_run
+
+ # fake_loader = DictDataLoader()
+ # mock_var_manager = MagicMock()
+ # mock_host = MagicMock()
+ # mock_host.has_hostkey = True
+ # mock_inventory = MagicMock()
+ # mock_options = MagicMock()
+ # mock_options.module_path = None
+ # tqm = TaskQueueManager(
+ # inventory=mock_inventory,
+ # variable_manager=mock_var_manager,
+ # loader=fake_loader,
+ # options=mock_options,
+ # passwords=None,
+ # )
+ # tqm._initialize_processes(3)
+ # tqm.hostvars = dict()
+
+ # try:
+ # strategy_base = StrategyBase(tqm=tqm)
+ # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
+ # self.assertEqual(strategy_base._cur_worker, 1)
+ # self.assertEqual(strategy_base._pending_results, 1)
+ # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
+ # self.assertEqual(strategy_base._cur_worker, 2)
+ # self.assertEqual(strategy_base._pending_results, 2)
+ # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
+ # self.assertEqual(strategy_base._cur_worker, 0)
+ # self.assertEqual(strategy_base._pending_results, 3)
+ # finally:
+ # tqm.cleanup()
def test_strategy_base_process_pending_results(self):
mock_tqm = MagicMock()
diff --git a/test/units/template/test_templar.py b/test/units/template/test_templar.py
index 2ec8f54e0c..481dc3e8d5 100644
--- a/test/units/template/test_templar.py
+++ b/test/units/template/test_templar.py
@@ -25,7 +25,7 @@ from ansible.compat.tests.mock import patch, MagicMock
from ansible import constants as C
from ansible.errors import *
from ansible.plugins import filter_loader, lookup_loader, module_loader
-from ansible.plugins.strategy import SharedPluginLoaderObj
+from ansible.executor.task_queue_manager import SharedPluginLoaderObj
from ansible.template import Templar
from units.mock.loader import DictDataLoader