summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- lib/ansible/executor/task_queue_manager.py | 36
-rw-r--r-- lib/ansible/plugins/strategy/__init__.py | 11
2 files changed, 39 insertions, 8 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 8c09151f26..1f74c85fd0 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -23,10 +23,13 @@ import multiprocessing
import os
import tempfile
+from collections import namedtuple
+
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.result import ResultProcess
+from ansible.executor.task_result import TaskResult
from ansible.executor.stats import AggregateStats
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
@@ -45,6 +48,7 @@ except ImportError:
__all__ = ['TaskQueueManager']
+WorkerSlot = namedtuple("WorkerSlot", "proc host task")
class TaskQueueManager:
@@ -110,7 +114,7 @@ class TaskQueueManager:
for i in range(num):
rslt_q = multiprocessing.Queue()
- self._workers.append([None, rslt_q])
+ self._workers.append([WorkerSlot(None, None, None), rslt_q])
self._result_prc = ResultProcess(self._final_q, self._workers)
self._result_prc.start()
@@ -290,11 +294,11 @@ class TaskQueueManager:
if self._result_prc:
self._result_prc.terminate()
- for (worker_prc, rslt_q) in self._workers:
+ for (worker_slot, rslt_q) in self._workers:
rslt_q.close()
- if worker_prc and worker_prc.is_alive():
+ if worker_slot.proc and worker_slot.proc.is_alive():
try:
- worker_prc.terminate()
+ worker_slot.proc.terminate()
except AttributeError:
pass
@@ -311,11 +315,33 @@ class TaskQueueManager:
return self._loader
def get_workers(self):
- return self._workers[:]
+ return self._workers
def terminate(self):
self._terminated = True
+ def handle_dead_workers(self):
+ display.debug("handling dead workers")
+ for idx, (worker_slot, rslt_q) in enumerate(self._workers):
+ if hasattr(worker_slot.proc, 'exitcode'):
+ if worker_slot.proc.exitcode in (-9, -15):
+ tr = TaskResult(
+ host=worker_slot.host,
+ task=worker_slot.task,
+ return_data=dict(
+ failed=True,
+ msg="The worker process handling this host died or was killed unexpectedly.",
+ ),
+ )
+ rslt_q.put(tr)
+ self._workers[idx][0] = WorkerSlot(None, None, None)
+ display.debug("done handling dead workers")
+
+ display.debug("checking to make sure the result proc is ok")
+ if hasattr(self._result_prc, 'exitcode') and self._result_prc.exitcode in (-9, -15):
+ raise AnsibleError("the result subprocess died unexpectedly, exiting")
+ display.debug("done checking to make sure the result proc is ok")
+
def send_callback(self, method_name, *args, **kwargs):
for callback_plugin in [self._stdout_callback] + self._callback_plugins:
# a plugin that set self.disabled to True will not be called
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index d7441b0908..e44914a3ae 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -33,6 +33,7 @@ from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
+from ansible.executor.task_queue_manager import WorkerSlot
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.inventory.group import Group
@@ -188,12 +189,15 @@ class StrategyBase:
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
+ # make sure the worker pool is safe
+ self._tqm.handle_dead_workers()
+
queued = False
while True:
- (worker_prc, rslt_q) = self._workers[self._cur_worker]
- if worker_prc is None or not worker_prc.is_alive():
+ (worker_slot, rslt_q) = self._workers[self._cur_worker]
+ if worker_slot.proc is None or not worker_slot.proc.is_alive():
worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
- self._workers[self._cur_worker][0] = worker_prc
+ self._workers[self._cur_worker][0] = WorkerSlot(proc=worker_prc, host=host, task=task)
worker_prc.start()
queued = True
self._cur_worker += 1
@@ -219,6 +223,7 @@ class StrategyBase:
ret_results = []
+ self._tqm.handle_dead_workers()
while not self._final_q.empty() and not self._tqm._terminated:
try:
result = self._final_q.get()