diff options
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 36 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 11 |
2 files changed, 39 insertions, 8 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 8c09151f26..1f74c85fd0 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -23,10 +23,13 @@ import multiprocessing
 import os
 import tempfile
 
+from collections import namedtuple
+
 from ansible import constants as C
 from ansible.errors import AnsibleError
 from ansible.executor.play_iterator import PlayIterator
 from ansible.executor.process.result import ResultProcess
+from ansible.executor.task_result import TaskResult
 from ansible.executor.stats import AggregateStats
 from ansible.playbook.block import Block
 from ansible.playbook.play_context import PlayContext
@@ -45,6 +48,7 @@ except ImportError:
 
 __all__ = ['TaskQueueManager']
 
+WorkerSlot = namedtuple("WorkerSlot", "proc host task")
 
 class TaskQueueManager:
 
@@ -110,7 +114,7 @@ class TaskQueueManager:
 
         for i in range(num):
             rslt_q = multiprocessing.Queue()
-            self._workers.append([None, rslt_q])
+            self._workers.append([WorkerSlot(None, None, None), rslt_q])
 
         self._result_prc = ResultProcess(self._final_q, self._workers)
         self._result_prc.start()
@@ -290,11 +294,11 @@ class TaskQueueManager:
         if self._result_prc:
             self._result_prc.terminate()
 
-        for (worker_prc, rslt_q) in self._workers:
+        for (worker_slot, rslt_q) in self._workers:
             rslt_q.close()
-            if worker_prc and worker_prc.is_alive():
+            if worker_slot.proc and worker_slot.proc.is_alive():
                 try:
-                    worker_prc.terminate()
+                    worker_slot.proc.terminate()
                 except AttributeError:
                     pass
 
@@ -311,11 +315,33 @@ class TaskQueueManager:
         return self._loader
 
     def get_workers(self):
-        return self._workers[:]
+        return self._workers
 
     def terminate(self):
         self._terminated = True
 
+    def handle_dead_workers(self):
+        display.debug("handling dead workers")
+        for idx, (worker_slot, rslt_q) in enumerate(self._workers):
+            if hasattr(worker_slot.proc, 'exitcode'):
+                if worker_slot.proc.exitcode in (-9, -15):
+                    tr = TaskResult(
+                        host=worker_slot.host,
+                        task=worker_slot.task,
+                        return_data=dict(
+                            failed=True,
+                            msg="The worker process handling this host died or was killed unexpectedly.",
+                        ),
+                    )
+                    rslt_q.put(tr)
+                    self._workers[idx][0] = WorkerSlot(None, None, None)
+        display.debug("done handling dead workers")
+
+        display.debug("checking to make sure the result proc is ok")
+        if hasattr(self._result_prc, 'exitcode') and self._result_prc.exitcode in (-9, -15):
+            raise AnsibleError("the result subprocess died unexpectedly, exiting")
+        display.debug("done checking to make sure the result proc is ok")
+
     def send_callback(self, method_name, *args, **kwargs):
         for callback_plugin in [self._stdout_callback] + self._callback_plugins:
             # a plugin that set self.disabled to True will not be called
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index d7441b0908..e44914a3ae 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -33,6 +33,7 @@ from ansible import constants as C
 from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
 from ansible.executor.play_iterator import PlayIterator
 from ansible.executor.process.worker import WorkerProcess
+from ansible.executor.task_queue_manager import WorkerSlot
 from ansible.executor.task_result import TaskResult
 from ansible.inventory.host import Host
 from ansible.inventory.group import Group
@@ -188,12 +189,15 @@ class StrategyBase:
             # way to share them with the forked processes
             shared_loader_obj = SharedPluginLoaderObj()
 
+            # make sure the worker pool is safe
+            self._tqm.handle_dead_workers()
+
             queued = False
             while True:
-                (worker_prc, rslt_q) = self._workers[self._cur_worker]
-                if worker_prc is None or not worker_prc.is_alive():
+                (worker_slot, rslt_q) = self._workers[self._cur_worker]
+                if worker_slot.proc is None or not worker_slot.proc.is_alive():
                     worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
-                    self._workers[self._cur_worker][0] = worker_prc
+                    self._workers[self._cur_worker][0] = WorkerSlot(proc=worker_prc, host=host, task=task)
                     worker_prc.start()
                     queued = True
                 self._cur_worker += 1
@@ -219,6 +223,7 @@ class StrategyBase:
 
         ret_results = []
 
+        self._tqm.handle_dead_workers()
         while not self._final_q.empty() and not self._tqm._terminated:
             try:
                 result = self._final_q.get()