summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: James Cammarata <jimi@sngx.net> 2016-07-13 12:44:36 -0500
committer: James Cammarata <jimi@sngx.net> 2016-07-13 12:44:36 -0500
commit: 0902e54ac8a289c1e3d193725777f4759eb6cb0e (patch)
tree: bffa99bb2b6e1516cef67368f9fa046e2935c52b
parent: 3c39bb5633d0cbfa9cf22f9a4038296caef9c622 (diff)
download: ansible-gracefully_handle_killed_subprocesses.tar.gz
Gracefully handle terminated subprocesses (branch: gracefully_handle_killed_subprocesses)
This patch changes the TaskQueueManager (TQM) to track not only the worker process it created, but also the host and task assigned to that worker. If a worker is killed outside of Ansible's control, the affected host/task is marked as failed instead of the whole playbook being stopped.
-rw-r--r-- lib/ansible/executor/task_queue_manager.py | 36
-rw-r--r-- lib/ansible/plugins/strategy/__init__.py | 11
2 files changed, 39 insertions, 8 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 8c09151f26..1f74c85fd0 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -23,10 +23,13 @@ import multiprocessing
import os
import tempfile
+from collections import namedtuple
+
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.result import ResultProcess
+from ansible.executor.task_result import TaskResult
from ansible.executor.stats import AggregateStats
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
@@ -45,6 +48,7 @@ except ImportError:
__all__ = ['TaskQueueManager']
+WorkerSlot = namedtuple("WorkerSlot", "proc host task")
class TaskQueueManager:
@@ -110,7 +114,7 @@ class TaskQueueManager:
for i in range(num):
rslt_q = multiprocessing.Queue()
- self._workers.append([None, rslt_q])
+ self._workers.append([WorkerSlot(None, None, None), rslt_q])
self._result_prc = ResultProcess(self._final_q, self._workers)
self._result_prc.start()
@@ -290,11 +294,11 @@ class TaskQueueManager:
if self._result_prc:
self._result_prc.terminate()
- for (worker_prc, rslt_q) in self._workers:
+ for (worker_slot, rslt_q) in self._workers:
rslt_q.close()
- if worker_prc and worker_prc.is_alive():
+ if worker_slot.proc and worker_slot.proc.is_alive():
try:
- worker_prc.terminate()
+ worker_slot.proc.terminate()
except AttributeError:
pass
@@ -311,11 +315,33 @@ class TaskQueueManager:
return self._loader
def get_workers(self):
- return self._workers[:]
+ return self._workers
def terminate(self):
self._terminated = True
+ def handle_dead_workers(self):
+ display.debug("handling dead workers")
+ for idx, (worker_slot, rslt_q) in enumerate(self._workers):
+ if hasattr(worker_slot.proc, 'exitcode'):
+ if worker_slot.proc.exitcode in (-9, -15):
+ tr = TaskResult(
+ host=worker_slot.host,
+ task=worker_slot.task,
+ return_data=dict(
+ failed=True,
+ msg="The worker process handling this host died or was killed unexpectedly.",
+ ),
+ )
+ rslt_q.put(tr)
+ self._workers[idx][0] = WorkerSlot(None, None, None)
+ display.debug("done handling dead workers")
+
+ display.debug("checking to make sure the result proc is ok")
+ if hasattr(self._result_prc, 'exitcode') and self._result_prc.exitcode in (-9, -15):
+ raise AnsibleError("the result subprocess died unexpectedly, exiting")
+ display.debug("done checking to make sure the result proc is ok")
+
def send_callback(self, method_name, *args, **kwargs):
for callback_plugin in [self._stdout_callback] + self._callback_plugins:
# a plugin that set self.disabled to True will not be called
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index d7441b0908..e44914a3ae 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -33,6 +33,7 @@ from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.process.worker import WorkerProcess
+from ansible.executor.task_queue_manager import WorkerSlot
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.inventory.group import Group
@@ -188,12 +189,15 @@ class StrategyBase:
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
+ # make sure the worker pool is safe
+ self._tqm.handle_dead_workers()
+
queued = False
while True:
- (worker_prc, rslt_q) = self._workers[self._cur_worker]
- if worker_prc is None or not worker_prc.is_alive():
+ (worker_slot, rslt_q) = self._workers[self._cur_worker]
+ if worker_slot.proc is None or not worker_slot.proc.is_alive():
worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
- self._workers[self._cur_worker][0] = worker_prc
+ self._workers[self._cur_worker][0] = WorkerSlot(proc=worker_prc, host=host, task=task)
worker_prc.start()
queued = True
self._cur_worker += 1
@@ -219,6 +223,7 @@ class StrategyBase:
ret_results = []
+ self._tqm.handle_dead_workers()
while not self._final_q.empty() and not self._tqm._terminated:
try:
result = self._final_q.get()