diff options
author | James Cammarata <jimi@sngx.net> | 2016-07-13 12:44:36 -0500 |
---|---|---|
committer | James Cammarata <jimi@sngx.net> | 2016-07-13 12:44:36 -0500 |
commit | 0902e54ac8a289c1e3d193725777f4759eb6cb0e (patch) | |
tree | bffa99bb2b6e1516cef67368f9fa046e2935c52b | |
parent | 3c39bb5633d0cbfa9cf22f9a4038296caef9c622 (diff) | |
download | ansible-gracefully_handle_killed_subprocesses.tar.gz |
Gracefully handle terminated subprocessesgracefully_handle_killed_subprocesses
This patch changes the TQM to track not only the worker proc created,
but the host and task assigned to that worker. In the event of kill
outside of the control of Ansible, the host/task is failed rather than
a complete stop of the playbook.
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 36 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 11 |
2 files changed, 39 insertions, 8 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 8c09151f26..1f74c85fd0 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -23,10 +23,13 @@ import multiprocessing import os import tempfile +from collections import namedtuple + from ansible import constants as C from ansible.errors import AnsibleError from ansible.executor.play_iterator import PlayIterator from ansible.executor.process.result import ResultProcess +from ansible.executor.task_result import TaskResult from ansible.executor.stats import AggregateStats from ansible.playbook.block import Block from ansible.playbook.play_context import PlayContext @@ -45,6 +48,7 @@ except ImportError: __all__ = ['TaskQueueManager'] +WorkerSlot = namedtuple("WorkerSlot", "proc host task") class TaskQueueManager: @@ -110,7 +114,7 @@ class TaskQueueManager: for i in range(num): rslt_q = multiprocessing.Queue() - self._workers.append([None, rslt_q]) + self._workers.append([WorkerSlot(None, None, None), rslt_q]) self._result_prc = ResultProcess(self._final_q, self._workers) self._result_prc.start() @@ -290,11 +294,11 @@ class TaskQueueManager: if self._result_prc: self._result_prc.terminate() - for (worker_prc, rslt_q) in self._workers: + for (worker_slot, rslt_q) in self._workers: rslt_q.close() - if worker_prc and worker_prc.is_alive(): + if worker_slot.proc and worker_slot.proc.is_alive(): try: - worker_prc.terminate() + worker_slot.proc.terminate() except AttributeError: pass @@ -311,11 +315,33 @@ class TaskQueueManager: return self._loader def get_workers(self): - return self._workers[:] + return self._workers def terminate(self): self._terminated = True + def handle_dead_workers(self): + display.debug("handling dead workers") + for idx, (worker_slot, rslt_q) in enumerate(self._workers): + if hasattr(worker_slot.proc, 'exitcode'): + if worker_slot.proc.exitcode in (-9, -15): + tr = TaskResult( + host=worker_slot.host, + task=worker_slot.task, + return_data=dict( + failed=True, + msg="The worker process handling this host died or was killed unexpectedly.", + ), + ) + rslt_q.put(tr) + self._workers[idx][0] = WorkerSlot(None, None, None) + display.debug("done handling dead workers") + + display.debug("checking to make sure the result proc is ok") + if hasattr(self._result_prc, 'exitcode') and self._result_prc.exitcode in (-9, -15): + raise AnsibleError("the result subprocess died unexpectedly, exiting") + display.debug("done checking to make sure the result proc is ok") + def send_callback(self, method_name, *args, **kwargs): for callback_plugin in [self._stdout_callback] + self._callback_plugins: # a plugin that set self.disabled to True will not be called diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index d7441b0908..e44914a3ae 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -33,6 +33,7 @@ from ansible import constants as C from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable from ansible.executor.play_iterator import PlayIterator from ansible.executor.process.worker import WorkerProcess +from ansible.executor.task_queue_manager import WorkerSlot from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host from ansible.inventory.group import Group @@ -188,12 +189,15 @@ class StrategyBase: # way to share them with the forked processes shared_loader_obj = SharedPluginLoaderObj() + # make sure the worker pool is safe + self._tqm.handle_dead_workers() + queued = False while True: - (worker_prc, rslt_q) = self._workers[self._cur_worker] - if worker_prc is None or not worker_prc.is_alive(): + (worker_slot, rslt_q) = self._workers[self._cur_worker] + if worker_slot.proc is None or not worker_slot.proc.is_alive(): worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) - self._workers[self._cur_worker][0] = worker_prc + self._workers[self._cur_worker][0] = WorkerSlot(proc=worker_prc, host=host, task=task) worker_prc.start() queued = True self._cur_worker += 1 @@ -219,6 +223,7 @@ class StrategyBase: ret_results = [] + self._tqm.handle_dead_workers() while not self._final_q.empty() and not self._tqm._terminated: try: result = self._final_q.get() |