summary | refs | log | tree | commit | diff
path: root/lib/ansible/executor/task_queue_manager.py
diff options
context:
space:
mode:
author: James Cammarata <jimi@sngx.net> 2016-09-14 10:12:37 -0500
committer: James Cammarata <jimi@sngx.net> 2016-09-15 00:47:47 -0500
commit: 16e0cfa3181ca55d7eb25006117c904da9170580 (patch)
tree: 029e883dd7df31416e691ce7204cb00e70837370 /lib/ansible/executor/task_queue_manager.py
parent: 600915aa97cd70dc3b6c5e5a3d1944f48c14597e (diff)
download: ansible-testing.tar.gz
Testing threading hang fixes [testing]
Diffstat (limited to 'lib/ansible/executor/task_queue_manager.py')
-rw-r--r-- lib/ansible/executor/task_queue_manager.py | 76
1 files changed, 53 insertions, 23 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 6744367363..95c33030fe 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -26,6 +26,7 @@ import threading
import time
from collections import deque
+from multiprocessing.util import register_after_fork
from ansible import constants as C
from ansible.compat.six import string_types
@@ -126,6 +127,9 @@ class TaskQueueManager:
# the "queue" for the background thread to use
self._queued_tasks = deque()
self._queued_tasks_lock = threading.Lock()
+ register_after_fork(self._queued_tasks_lock, self._queued_tasks_lock.release)
+ self._thread_event = threading.Event()
+ self._thread_event.set()
# the background queuing thread
self._queue_thread = None
@@ -137,58 +141,74 @@ class TaskQueueManager:
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
- def _queue_thread_main(self):
+ @staticmethod
+ def _queue_thread_main(tqm):
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
display.debug("queuing thread starting")
- while not self._terminated:
+ print_count = 0
+ while not tqm._terminated and tqm._thread_event.is_set():
available_workers = []
- for idx, entry in enumerate(self._workers):
+ for idx, entry in enumerate(tqm._workers):
(worker_prc, _) = entry
if worker_prc is None or not worker_prc.is_alive():
available_workers.append(idx)
- if len(available_workers) == 0:
+ if len(available_workers) == 0 or len(tqm._queued_tasks) == 0:
+ print_count += 1
+ if print_count > 100:
+ print("available workers: %s/%s, len queued tasks: %s" % (len(available_workers), len(tqm._workers), len(tqm._queued_tasks)))
+ print_count = 0
time.sleep(0.01)
continue
for worker_idx in available_workers:
try:
- self._queued_tasks_lock.acquire()
- (host, task, task_vars, play_context) = self._queued_tasks.pop()
+ tqm._queued_tasks_lock.acquire()
+ (host, task, task_vars, play_context) = tqm._queued_tasks.pop()
except IndexError:
break
finally:
- self._queued_tasks_lock.release()
+ tqm._queued_tasks_lock.release()
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = multiprocessing.Lock()
+ register_after_fork(action_write_locks.action_write_locks[task.action], action_write_locks.action_write_locks[task.action].release)
try:
+ e = multiprocessing.Event()
worker_prc = WorkerProcess(
- self._final_q,
- self._iterator._play,
+ tqm._final_q,
+ tqm._iterator._play,
host,
task,
task_vars,
play_context,
- self._loader,
- self._variable_manager,
+ tqm._loader,
+ tqm._variable_manager,
shared_loader_obj,
+ e,
)
- self._workers[worker_idx][0] = worker_prc
- worker_prc.start()
- display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers)))
+ print("%s: starting worker" % os.getpid())
+ worker_prc.start(tqm)
+ e.wait()
+ print("%s: putting worker in slot %s" % (os.getpid(), worker_idx))
+ tqm._workers[worker_idx][0] = worker_prc
+ print("%s: done with worker" % os.getpid())
+ display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(tqm._workers)))
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
+ print("%s: WORKER QUEUE ERROR: %s" % (os.getpid(), e))
display.debug("got an error while queuing: %s" % e)
break
+ tqm._thread_event.set()
+ print("%s QUEUE THREAD EXITING" % os.getpid())
display.debug("queuing thread exiting")
def queue_task(self, host, task, task_vars, play_context):
@@ -197,18 +217,20 @@ class TaskQueueManager:
self._queued_tasks_lock.release()
def queue_multiple_tasks(self, items, play_context):
- for item in items:
- (host, task, task_vars) = item
+
+ try:
self._queued_tasks_lock.acquire()
- self._queued_tasks.append((host, task, task_vars, play_context))
+ for item in items:
+ (host, task, task_vars) = item
+ self._queued_tasks.append((host, task, task_vars, play_context))
+ finally:
self._queued_tasks_lock.release()
def _initialize_processes(self, num):
self._workers = []
for i in range(num):
- rslt_q = multiprocessing.Queue()
- self._workers.append([None, rslt_q])
+ self._workers.append([None, None])
def _initialize_notified_handlers(self, play):
'''
@@ -307,9 +329,17 @@ class TaskQueueManager:
if not self._callbacks_loaded:
self.load_callbacks()
- if self._queue_thread is None:
- self._queue_thread = threading.Thread(target=self._queue_thread_main)
- self._queue_thread.start()
+ if self._queue_thread is not None:
+ self._thread_event.clear()
+ self._thread_event.wait()
+
+ self._queue_thread = threading.Thread(
+ target=self._queue_thread_main,
+ args=(self,),
+ name="WorkerQueueThread",
+ )
+ self._queue_thread.daemon = True
+ self._queue_thread.start()
all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
templar = Templar(loader=self._loader, variables=all_vars)
@@ -399,7 +429,7 @@ class TaskQueueManager:
def _cleanup_processes(self):
for (worker_prc, rslt_q) in self._workers:
- rslt_q.close()
+ #rslt_q.close()
if worker_prc and worker_prc.is_alive():
try:
worker_prc.terminate()