diff options
author | James Cammarata <jimi@sngx.net> | 2015-11-16 13:41:21 -0500 |
---|---|---|
committer | James Cammarata <jimi@sngx.net> | 2015-11-16 14:33:22 -0500 |
commit | e60c8839fd3fa67e18197926bcb3adffa13f7ebc (patch) | |
tree | b5cb54ea731107531552a02f2c60f76ee1848a43 | |
parent | 29389bc3389012bbd29b9b701e398bb9734f9344 (diff) | |
download | ansible-hostvars_manager.tar.gz |
Additional fixes to correct bugs in hostvars manager implementationhostvars_manager
-rw-r--r-- | lib/ansible/executor/process/worker.py | 1 | ||||
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 47 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 50 | ||||
-rw-r--r-- | lib/ansible/vars/hostvars.py | 24 |
4 files changed, 43 insertions, 79 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 1ae832778b..a1a83a5dda 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -107,6 +107,7 @@ class WorkerProcess(multiprocessing.Process): job_vars = json.loads(zlib.decompress(zip_vars)) else: job_vars = zip_vars + job_vars['hostvars'] = self._hostvars.hostvars() debug("there's work to be done! got a task/handler to work on: %s" % task) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 2f4868081f..3e62cb3c99 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -93,14 +93,14 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - def _initialize_processes(self, num, hostvars_manager): + def _initialize_processes(self, num): self._workers = [] for i in xrange(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() - prc = WorkerProcess(self, main_q, rslt_q, hostvars_manager, self._loader) + prc = WorkerProcess(self, main_q, rslt_q, self._hostvars_manager, self._loader) prc.start() self._workers.append((prc, main_q, rslt_q)) @@ -175,36 +175,42 @@ class TaskQueueManager: are done with the current task). ''' + if not self._callbacks_loaded: + self.load_callbacks() + + all_vars = self._variable_manager.get_vars(loader=self._loader, play=play) + templar = Templar(loader=self._loader, variables=all_vars) + + new_play = play.copy() + new_play.post_validate(templar) + class HostVarsManager(SyncManager): pass - def get_hostvars(): - return hostvars - hostvars = HostVars( - play=play, + play=new_play, inventory=self._inventory, variable_manager=self._variable_manager, loader=self._loader, ) - HostVarsManager.register('hostvars', get_hostvars, DictProxy) - hostvars_manager = HostVarsManager() - hostvars_manager.start() + HostVarsManager.register( + 'hostvars', + callable=lambda: hostvars, + # FIXME: this is the list of exposed methods to the DictProxy object, plus our + # one special one (set_variable_manager). There's probably a better way + # to do this with a proper BaseProxy/DictProxy derivative + exposed=('set_variable_manager', '__contains__', '__delitem__', '__getitem__', + '__len__', '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', + 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'), + ) + self._hostvars_manager = HostVarsManager() + self._hostvars_manager.start() # Fork # of forks, # of hosts or serial, whichever is lowest - contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] + contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(new_play.hosts))] contenders = [ v for v in contenders if v is not None and v > 0 ] - self._initialize_processes(min(contenders), hostvars_manager) - - if not self._callbacks_loaded: - self.load_callbacks() - - all_vars = self._variable_manager.get_vars(loader=self._loader, play=play) - templar = Templar(loader=self._loader, variables=all_vars) - - new_play = play.copy() - new_play.post_validate(templar) + self._initialize_processes(min(contenders)) play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno()) for callback_plugin in self._callback_plugins: @@ -240,6 +246,7 @@ class TaskQueueManager: # and run the play using the strategy and cleanup on way out play_return = strategy.run(iterator, play_context) self._cleanup_processes() + self._hostvars_manager.shutdown() return play_return def cleanup(self): diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 501f8a6486..f1f4650529 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -153,24 +153,6 @@ class StrategyBase: # way to share them with the forked processes shared_loader_obj = SharedPluginLoaderObj() - def dump_dict(obj, indent=0): - print("%sdict:" % (" "*indent,)) - for k in obj.keys(): - dump_vars(obj[k], indent) - - def dump_list(obj, indent=0): - print("%slist:" % (" "*indent,)) - for item in obj: - dump_vars(item, indent) - - def dump_vars(obj, indent=0): - if isinstance(obj, dict): - dump_dict(obj, indent+2) - elif isinstance(obj, list): - dump_list(obj, indent+2) - else: - print("%svar: %s (%s)" % (" "*indent, obj, type(obj))) - # compress (and convert) the data if so configured, which can # help a lot when the variable dictionary is huge. We pop the # hostvars out of the task variables right now, due to the fact @@ -187,36 +169,6 @@ class StrategyBase: zip_vars = task_vars # noqa (pyflakes false positive because task_vars is deleted in the conditional above) # and queue the task - if False: #C.DEFAULT_DEBUG: - print("vars has %d keys" % len(zip_vars.keys())) - print("vars are:") - dump_vars(zip_vars) - - import datetime - import pickle - import cPickle - p1 = datetime.datetime.now() - d1 = cPickle.dumps(host, pickle.HIGHEST_PROTOCOL) - p2 = datetime.datetime.now() - d2 = cPickle.dumps(task, pickle.HIGHEST_PROTOCOL) - p3 = datetime.datetime.now() - d3 = cPickle.dumps(self._loader.get_basedir(), pickle.HIGHEST_PROTOCOL) - p4 = datetime.datetime.now() - d4 = cPickle.dumps(zip_vars, pickle.HIGHEST_PROTOCOL) - p5 = datetime.datetime.now() - d5 = cPickle.dumps(hostvars, pickle.HIGHEST_PROTOCOL) - p6 = datetime.datetime.now() - d6 = cPickle.dumps(play_context, pickle.HIGHEST_PROTOCOL) - p7 = datetime.datetime.now() - d7 = cPickle.dumps(shared_loader_obj, pickle.HIGHEST_PROTOCOL) - p8 = datetime.datetime.now() - print("time to serialize host: %s (size: %s)" % (p2-p1, len(d1))) - print("time to serialize task: %s (size: %s)" % (p3-p2, len(d2))) - print("time to serialize loader basedir: %s (size: %s)" % (p4-p3, len(d3))) - print("time to serialize vars: %s (size: %s)" % (p5-p4, len(d4))) - print("time to serialize hostvars: %s (size: %s)" % (p6-p5, len(d5))) - print("time to serialize play context: %s (size: %s)" % (p7-p6, len(d6))) - print("time to serialize shared plugin loader: %s (size: %s)" % (p8-p7, len(d7))) main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj)) self._pending_results += 1 @@ -321,6 +273,7 @@ class StrategyBase: var_value = wrap_var(result[3]) self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value}) + self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager) elif result[0] in ('set_host_var', 'set_host_facts'): host = result[1] @@ -351,6 +304,7 @@ class StrategyBase: self._variable_manager.set_nonpersistent_facts(target_host, facts) else: self._variable_manager.set_host_facts(target_host, facts) + self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager) else: raise AnsibleError("unknown result message received: %s" % result[0]) diff --git a/lib/ansible/vars/hostvars.py b/lib/ansible/vars/hostvars.py index 8829526c9f..3846b5117c 100644 --- a/lib/ansible/vars/hostvars.py +++ b/lib/ansible/vars/hostvars.py @@ -46,7 +46,6 @@ __all__ = ['HostVars'] class HostVars(collections.Mapping): ''' A special view of vars_cache that adds values from the inventory when needed. ''' - _exposed_ = ('__getitem__', '__contains__', '__iter__', '__len__') def __init__(self, play, inventory, variable_manager, loader): self._lookup = dict() self._inventory = inventory @@ -55,6 +54,9 @@ class HostVars(collections.Mapping): self._variable_manager = variable_manager self._cached_result = dict() + def set_variable_manager(self, variable_manager): + self._variable_manager = variable_manager + def _find_host(self, host_name): return self._inventory.get_host(host_name) @@ -66,18 +68,18 @@ class HostVars(collections.Mapping): data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False) # Using cache in order to avoid template call - sha1_hash = sha1(str(data).encode('utf-8')).hexdigest() - if sha1_hash in self._cached_result: - result = self._cached_result[sha1_hash] - else: - templar = Templar(variables=data, loader=self._loader) - result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS) - self._cached_result[sha1_hash] = result - - return result + #sha1_hash = sha1(str(data).encode('utf-8')).hexdigest() + #if sha1_hash in self._cached_result: + # result = self._cached_result[sha1_hash] + #else: + # templar = Templar(variables=data, loader=self._loader) + # result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS) + # self._cached_result[sha1_hash] = result + #return result + return data def __contains__(self, host_name): - return True #self._find_host(host_name) is not None + return self._find_host(host_name) is not None def __iter__(self): for host in self._inventory.get_hosts(ignore_limits_and_restrictions=True): |