summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2015-11-16 13:41:21 -0500
committerJames Cammarata <jimi@sngx.net>2015-11-16 14:33:22 -0500
commite60c8839fd3fa67e18197926bcb3adffa13f7ebc (patch)
treeb5cb54ea731107531552a02f2c60f76ee1848a43
parent29389bc3389012bbd29b9b701e398bb9734f9344 (diff)
downloadansible-hostvars_manager.tar.gz
Additional fixes to correct bugs in hostvars manager implementationhostvars_manager
-rw-r--r--lib/ansible/executor/process/worker.py1
-rw-r--r--lib/ansible/executor/task_queue_manager.py47
-rw-r--r--lib/ansible/plugins/strategy/__init__.py50
-rw-r--r--lib/ansible/vars/hostvars.py24
4 files changed, 43 insertions, 79 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index 1ae832778b..a1a83a5dda 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -107,6 +107,7 @@ class WorkerProcess(multiprocessing.Process):
job_vars = json.loads(zlib.decompress(zip_vars))
else:
job_vars = zip_vars
+
job_vars['hostvars'] = self._hostvars.hostvars()
debug("there's work to be done! got a task/handler to work on: %s" % task)
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 2f4868081f..3e62cb3c99 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -93,14 +93,14 @@ class TaskQueueManager:
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
- def _initialize_processes(self, num, hostvars_manager):
+ def _initialize_processes(self, num):
self._workers = []
for i in xrange(num):
main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue()
- prc = WorkerProcess(self, main_q, rslt_q, hostvars_manager, self._loader)
+ prc = WorkerProcess(self, main_q, rslt_q, self._hostvars_manager, self._loader)
prc.start()
self._workers.append((prc, main_q, rslt_q))
@@ -175,36 +175,42 @@ class TaskQueueManager:
are done with the current task).
'''
+ if not self._callbacks_loaded:
+ self.load_callbacks()
+
+ all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
+ templar = Templar(loader=self._loader, variables=all_vars)
+
+ new_play = play.copy()
+ new_play.post_validate(templar)
+
class HostVarsManager(SyncManager):
pass
- def get_hostvars():
- return hostvars
-
hostvars = HostVars(
- play=play,
+ play=new_play,
inventory=self._inventory,
variable_manager=self._variable_manager,
loader=self._loader,
)
- HostVarsManager.register('hostvars', get_hostvars, DictProxy)
- hostvars_manager = HostVarsManager()
- hostvars_manager.start()
+ HostVarsManager.register(
+ 'hostvars',
+ callable=lambda: hostvars,
+ # FIXME: this is the list of exposed methods to the DictProxy object, plus our
+ # one special one (set_variable_manager). There's probably a better way
+ # to do this with a proper BaseProxy/DictProxy derivative
+ exposed=('set_variable_manager', '__contains__', '__delitem__', '__getitem__',
+ '__len__', '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
+ 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'),
+ )
+ self._hostvars_manager = HostVarsManager()
+ self._hostvars_manager.start()
# Fork # of forks, # of hosts or serial, whichever is lowest
- contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
+ contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(new_play.hosts))]
contenders = [ v for v in contenders if v is not None and v > 0 ]
- self._initialize_processes(min(contenders), hostvars_manager)
-
- if not self._callbacks_loaded:
- self.load_callbacks()
-
- all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
- templar = Templar(loader=self._loader, variables=all_vars)
-
- new_play = play.copy()
- new_play.post_validate(templar)
+ self._initialize_processes(min(contenders))
play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
for callback_plugin in self._callback_plugins:
@@ -240,6 +246,7 @@ class TaskQueueManager:
# and run the play using the strategy and cleanup on way out
play_return = strategy.run(iterator, play_context)
self._cleanup_processes()
+ self._hostvars_manager.shutdown()
return play_return
def cleanup(self):
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 501f8a6486..f1f4650529 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -153,24 +153,6 @@ class StrategyBase:
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
- def dump_dict(obj, indent=0):
- print("%sdict:" % (" "*indent,))
- for k in obj.keys():
- dump_vars(obj[k], indent)
-
- def dump_list(obj, indent=0):
- print("%slist:" % (" "*indent,))
- for item in obj:
- dump_vars(item, indent)
-
- def dump_vars(obj, indent=0):
- if isinstance(obj, dict):
- dump_dict(obj, indent+2)
- elif isinstance(obj, list):
- dump_list(obj, indent+2)
- else:
- print("%svar: %s (%s)" % (" "*indent, obj, type(obj)))
-
# compress (and convert) the data if so configured, which can
# help a lot when the variable dictionary is huge. We pop the
# hostvars out of the task variables right now, due to the fact
@@ -187,36 +169,6 @@ class StrategyBase:
zip_vars = task_vars # noqa (pyflakes false positive because task_vars is deleted in the conditional above)
# and queue the task
- if False: #C.DEFAULT_DEBUG:
- print("vars has %d keys" % len(zip_vars.keys()))
- print("vars are:")
- dump_vars(zip_vars)
-
- import datetime
- import pickle
- import cPickle
- p1 = datetime.datetime.now()
- d1 = cPickle.dumps(host, pickle.HIGHEST_PROTOCOL)
- p2 = datetime.datetime.now()
- d2 = cPickle.dumps(task, pickle.HIGHEST_PROTOCOL)
- p3 = datetime.datetime.now()
- d3 = cPickle.dumps(self._loader.get_basedir(), pickle.HIGHEST_PROTOCOL)
- p4 = datetime.datetime.now()
- d4 = cPickle.dumps(zip_vars, pickle.HIGHEST_PROTOCOL)
- p5 = datetime.datetime.now()
- d5 = cPickle.dumps(hostvars, pickle.HIGHEST_PROTOCOL)
- p6 = datetime.datetime.now()
- d6 = cPickle.dumps(play_context, pickle.HIGHEST_PROTOCOL)
- p7 = datetime.datetime.now()
- d7 = cPickle.dumps(shared_loader_obj, pickle.HIGHEST_PROTOCOL)
- p8 = datetime.datetime.now()
- print("time to serialize host: %s (size: %s)" % (p2-p1, len(d1)))
- print("time to serialize task: %s (size: %s)" % (p3-p2, len(d2)))
- print("time to serialize loader basedir: %s (size: %s)" % (p4-p3, len(d3)))
- print("time to serialize vars: %s (size: %s)" % (p5-p4, len(d4)))
- print("time to serialize hostvars: %s (size: %s)" % (p6-p5, len(d5)))
- print("time to serialize play context: %s (size: %s)" % (p7-p6, len(d6)))
- print("time to serialize shared plugin loader: %s (size: %s)" % (p8-p7, len(d7)))
main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj))
self._pending_results += 1
@@ -321,6 +273,7 @@ class StrategyBase:
var_value = wrap_var(result[3])
self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value})
+ self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
elif result[0] in ('set_host_var', 'set_host_facts'):
host = result[1]
@@ -351,6 +304,7 @@ class StrategyBase:
self._variable_manager.set_nonpersistent_facts(target_host, facts)
else:
self._variable_manager.set_host_facts(target_host, facts)
+ self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
else:
raise AnsibleError("unknown result message received: %s" % result[0])
diff --git a/lib/ansible/vars/hostvars.py b/lib/ansible/vars/hostvars.py
index 8829526c9f..3846b5117c 100644
--- a/lib/ansible/vars/hostvars.py
+++ b/lib/ansible/vars/hostvars.py
@@ -46,7 +46,6 @@ __all__ = ['HostVars']
class HostVars(collections.Mapping):
''' A special view of vars_cache that adds values from the inventory when needed. '''
- _exposed_ = ('__getitem__', '__contains__', '__iter__', '__len__')
def __init__(self, play, inventory, variable_manager, loader):
self._lookup = dict()
self._inventory = inventory
@@ -55,6 +54,9 @@ class HostVars(collections.Mapping):
self._variable_manager = variable_manager
self._cached_result = dict()
+ def set_variable_manager(self, variable_manager):
+ self._variable_manager = variable_manager
+
def _find_host(self, host_name):
return self._inventory.get_host(host_name)
@@ -66,18 +68,18 @@ class HostVars(collections.Mapping):
data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False)
# Using cache in order to avoid template call
- sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
- if sha1_hash in self._cached_result:
- result = self._cached_result[sha1_hash]
- else:
- templar = Templar(variables=data, loader=self._loader)
- result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
- self._cached_result[sha1_hash] = result
-
- return result
+ #sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
+ #if sha1_hash in self._cached_result:
+ # result = self._cached_result[sha1_hash]
+ #else:
+ # templar = Templar(variables=data, loader=self._loader)
+ # result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
+ # self._cached_result[sha1_hash] = result
+ #return result
+ return data
def __contains__(self, host_name):
- return True #self._find_host(host_name) is not None
+ return self._find_host(host_name) is not None
def __iter__(self):
for host in self._inventory.get_hosts(ignore_limits_and_restrictions=True):