summaryrefslogtreecommitdiff
path: root/lib/ansible/plugins/strategy
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2019-08-28 18:47:39 -0500
committerGitHub <noreply@github.com>2019-08-28 18:47:39 -0500
commit51b33b79c0f2a3dea0ba30f6278ec4d538b5b623 (patch)
tree48f595307e555445cf90473d08d447c7dc5e45ca /lib/ansible/plugins/strategy
parent7d1a981b61eb996c64c406a92f73022d4d3a041b (diff)
downloadansible-51b33b79c0f2a3dea0ba30f6278ec4d538b5b623.tar.gz
T woerner max concurrent (#60702)
* play, block, task: New attribute forks With this it is possible to limit the number of concurrent task runs. forks can now be used in play, block and task. If forks is set in different levels in the chain, then the smallest value will be used for the task. The attribute has been added to the Base class as a list to easily provide all the values that have been set in the different levels of the chain. A warning has been added because of the conflict with run_once. forks will be ignored in this case. The forks limitation in StrategyBase._queue_task is not used for the free strategy. Signed-off-by: Thomas Woerner <twoerner@redhat.com> * Handle forks in free strategy The forks attribute for the free strategy is handled in run in the free StrategyModule. This is dony by counting the amount of tasks where the uuid is the same as the current task, that should be queued next. If this amount is bigger or equal to the forks attribute from the chain (task, block, play), then it will be skipped to the next host. Like it is also done with blocked_hosts. Signed-off-by: Thomas Woerner <twoerner@redhat.com> * Test cases for forks with linear and free strategy With ansible_python_interpreter defined in inventory file using ansible_playbook_python. Signed-off-by: Thomas Woerner <twoerner@redhat.com> * Changing forks keyword to throttle and adding some more docs
Diffstat (limited to 'lib/ansible/plugins/strategy')
-rw-r--r--lib/ansible/plugins/strategy/__init__.py31
-rw-r--r--lib/ansible/plugins/strategy/free.py37
2 files changed, 58 insertions, 10 deletions
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 3679d4f02b..09a1e0355f 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -155,6 +155,11 @@ class StrategyBase:
code useful to all strategies like running handlers, cleanup actions, etc.
'''
+ # by default, strategies should support throttling but we allow individual
+ # strategies to disable this and either forego supporting it or managing
+ # the throttling internally (as `free` does)
+ ALLOW_BASE_THROTTLING = True
+
def __init__(self, tqm):
self._tqm = tqm
self._inventory = tqm.get_inventory()
@@ -310,6 +315,14 @@ class StrategyBase:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = Lock()
+ # create a templar and template things we need later for the queuing process
+ templar = Templar(loader=self._loader, variables=task_vars)
+
+ try:
+ throttle = int(templar.template(task.throttle))
+ except Exception as e:
+ raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)
+
# and then queue the new task
try:
queued = False
@@ -330,9 +343,25 @@ class StrategyBase:
worker_prc.start()
display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
queued = True
+
self._cur_worker += 1
- if self._cur_worker >= len(self._workers):
+
+ # Determine the "rewind point" of the worker list. This means we start
+ # iterating over the list of workers until the end of the list is found.
+ # Normally, that is simply the length of the workers list (as determined
+ # by the forks or serial setting), however a task/block/play may "throttle"
+ # that limit down.
+ rewind_point = len(self._workers)
+ if throttle > 0 and self.ALLOW_BASE_THROTTLING:
+ if task.run_once:
+ display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
+ else:
+ if throttle <= rewind_point:
+ display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
+ rewind_point = throttle
+ if self._cur_worker >= rewind_point:
self._cur_worker = 0
+
if queued:
break
elif self._cur_worker == starting_worker:
diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py
index ef998321fe..264c0e1095 100644
--- a/lib/ansible/plugins/strategy/free.py
+++ b/lib/ansible/plugins/strategy/free.py
@@ -47,6 +47,9 @@ display = Display()
class StrategyModule(StrategyBase):
+ # This strategy manages throttling on its own, so we don't want it done in queue_task
+ ALLOW_BASE_THROTTLING = False
+
def _filter_notified_hosts(self, notified_hosts):
'''
Filter notified hosts accordingly to strategy
@@ -118,7 +121,31 @@ class StrategyModule(StrategyBase):
display.debug("this host has work to do", host=host_name)
# check to see if this host is blocked (still executing a previous task)
- if host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]:
+ if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]):
+
+ display.debug("getting variables", host=host_name)
+ task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
+ _hosts=self._hosts_cache,
+ _hosts_all=self._hosts_cache_all)
+ self.add_tqm_variables(task_vars, play=iterator._play)
+ templar = Templar(loader=self._loader, variables=task_vars)
+ display.debug("done getting variables", host=host_name)
+
+ try:
+ throttle = int(templar.template(task.throttle))
+ except Exception as e:
+ raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)
+
+ if throttle > 0:
+ same_tasks = 0
+ for worker in self._workers:
+ if worker and worker.is_alive() and worker._task._uuid == task._uuid:
+ same_tasks += 1
+
+ display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks))
+ if same_tasks >= throttle:
+ break
+
# pop the task, mark the host blocked, and queue it
self._blocked_hosts[host_name] = True
(state, task) = iterator.get_next_task_for_host(host)
@@ -130,14 +157,6 @@ class StrategyModule(StrategyBase):
# corresponding action plugin
action = None
- display.debug("getting variables", host=host_name)
- task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
- _hosts=self._hosts_cache,
- _hosts_all=self._hosts_cache_all)
- self.add_tqm_variables(task_vars, play=iterator._play)
- templar = Templar(loader=self._loader, variables=task_vars)
- display.debug("done getting variables", host=host_name)
-
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
display.debug("done templating", host=host_name)