summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjctanner <tanner.jc@gmail.com>2016-07-14 16:37:35 -0400
committerJames Tanner <tanner.jc@gmail.com>2016-07-14 17:31:09 -0400
commitb7479a1dc666ada3db68d5cb6b74c159105e1807 (patch)
tree99cc10e76d1988af436eaf227c77797b5b5789ea
parentfb6e58e88882b73c2fddb7664bf163730efa15d4 (diff)
downloadansible-b7479a1dc666ada3db68d5cb6b74c159105e1807.tar.gz
Add a function to check for killed processes in all strategies (#16684)
* Add a function to check for killed processes so that if any threads are sigkilled or sigtermed, the entire playbook execution is aborted. (cherry picked from commit 238c6461f643d7610896c89c08f0e57eff16d0e5)
-rw-r--r--lib/ansible/executor/task_queue_manager.py12
-rw-r--r--lib/ansible/plugins/strategy/__init__.py7
-rw-r--r--test/units/plugins/strategies/test_strategy_base.py6
3 files changed, 23 insertions, 2 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 5415411c31..95d3ce6833 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -309,6 +309,18 @@ class TaskQueueManager:
def terminate(self):
self._terminated = True
+ def has_dead_workers(self):
+
+ # [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>,
+ # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])>
+
+ defunct = False
+ for idx,x in enumerate(self._workers):
+ if hasattr(x[0], 'exitcode'):
+ if x[0].exitcode in [-9, -15]:
+ defunct = True
+ return defunct
+
def send_callback(self, method_name, *args, **kwargs):
for callback_plugin in [self._stdout_callback] + self._callback_plugins:
# a plugin that set self.disabled to True will not be called
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 2ba53aef47..785c045240 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -443,9 +443,14 @@ class StrategyBase:
display.debug("waiting for pending results...")
while self._pending_results > 0 and not self._tqm._terminated:
+
+ if self._tqm.has_dead_workers():
+ raise AnsibleError("A worker was found in a dead state")
+
results = self._process_pending_results(iterator)
ret_results.extend(results)
- time.sleep(0.0001)
+ time.sleep(0.005)
+
display.debug("no more pending results, returning what we have")
return ret_results
diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py
index 0c0ad108bb..35519ec9d9 100644
--- a/test/units/plugins/strategies/test_strategy_base.py
+++ b/test/units/plugins/strategies/test_strategy_base.py
@@ -162,7 +162,7 @@ class TestStrategyBase(unittest.TestCase):
raise Queue.Empty
else:
return queue_items.pop()
-
+
mock_queue = MagicMock()
mock_queue.empty.side_effect = _queue_empty
mock_queue.get.side_effect = _queue_get
@@ -228,6 +228,10 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._variable_manager = mock_var_mgr
strategy_base._blocked_hosts = dict()
+ def _has_dead_workers():
+ return False
+
+ strategy_base._tqm.has_dead_workers = _has_dead_workers
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)