diff options
author | James Cammarata <jimi@sngx.net> | 2016-09-14 10:12:37 -0500 |
---|---|---|
committer | James Cammarata <jimi@sngx.net> | 2016-09-15 00:47:47 -0500 |
commit | 16e0cfa3181ca55d7eb25006117c904da9170580 (patch) | |
tree | 029e883dd7df31416e691ce7204cb00e70837370 /lib/ansible/executor/task_queue_manager.py | |
parent | 600915aa97cd70dc3b6c5e5a3d1944f48c14597e (diff) | |
download | ansible-testing.tar.gz |
Testing threading hang fixestesting
Diffstat (limited to 'lib/ansible/executor/task_queue_manager.py')
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 76 |
1 files changed, 53 insertions, 23 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 6744367363..95c33030fe 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -26,6 +26,7 @@ import threading import time from collections import deque +from multiprocessing.util import register_after_fork from ansible import constants as C from ansible.compat.six import string_types @@ -126,6 +127,9 @@ class TaskQueueManager: # the "queue" for the background thread to use self._queued_tasks = deque() self._queued_tasks_lock = threading.Lock() + register_after_fork(self._queued_tasks_lock, self._queued_tasks_lock.release) + self._thread_event = threading.Event() + self._thread_event.set() # the background queuing thread self._queue_thread = None @@ -137,58 +141,74 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - def _queue_thread_main(self): + @staticmethod + def _queue_thread_main(tqm): # create a dummy object with plugin loaders set as an easier # way to share them with the forked processes shared_loader_obj = SharedPluginLoaderObj() display.debug("queuing thread starting") - while not self._terminated: + print_count = 0 + while not tqm._terminated and tqm._thread_event.is_set(): available_workers = [] - for idx, entry in enumerate(self._workers): + for idx, entry in enumerate(tqm._workers): (worker_prc, _) = entry if worker_prc is None or not worker_prc.is_alive(): available_workers.append(idx) - if len(available_workers) == 0: + if len(available_workers) == 0 or len(tqm._queued_tasks) == 0: + print_count += 1 + if print_count > 100: + print("available workers: %s/%s, len queued tasks: %s" % (len(available_workers), len(tqm._workers), len(tqm._queued_tasks))) + print_count = 0 time.sleep(0.01) continue for worker_idx in available_workers: try: - self._queued_tasks_lock.acquire() - (host, task, task_vars, play_context) = self._queued_tasks.pop() + tqm._queued_tasks_lock.acquire() + (host, task, task_vars, play_context) = tqm._queued_tasks.pop() except IndexError: break finally: - self._queued_tasks_lock.release() + tqm._queued_tasks_lock.release() if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) action_write_locks.action_write_locks[task.action] = multiprocessing.Lock() + register_after_fork(action_write_locks.action_write_locks[task.action], action_write_locks.action_write_locks[task.action].release) try: + e = multiprocessing.Event() worker_prc = WorkerProcess( - self._final_q, - self._iterator._play, + tqm._final_q, + tqm._iterator._play, host, task, task_vars, play_context, - self._loader, - self._variable_manager, + tqm._loader, + tqm._variable_manager, shared_loader_obj, + e, ) - self._workers[worker_idx][0] = worker_prc - worker_prc.start() - display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers))) + print("%s: starting worker" % os.getpid()) + worker_prc.start(tqm) + e.wait() + print("%s: putting worker in slot %s" % (os.getpid(), worker_idx)) + tqm._workers[worker_idx][0] = worker_prc + print("%s: done with worker" % os.getpid()) + display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(tqm._workers))) except (EOFError, IOError, AssertionError) as e: # most likely an abort + print("%s: WORKER QUEUE ERROR: %s" % (os.getpid(), e)) display.debug("got an error while queuing: %s" % e) break + tqm._thread_event.set() + print("%s QUEUE THREAD EXITING" % os.getpid()) display.debug("queuing thread exiting") def queue_task(self, host, task, task_vars, play_context): @@ -197,18 +217,20 @@ class TaskQueueManager: self._queued_tasks_lock.release() def queue_multiple_tasks(self, items, play_context): - for item in items: - (host, task, task_vars) = item + + try: self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) + for item in items: + (host, task, task_vars) = item + self._queued_tasks.append((host, task, task_vars, play_context)) + finally: self._queued_tasks_lock.release() def _initialize_processes(self, num): self._workers = [] for i in range(num): - rslt_q = multiprocessing.Queue() - self._workers.append([None, rslt_q]) + self._workers.append([None, None]) def _initialize_notified_handlers(self, play): ''' @@ -307,9 +329,17 @@ class TaskQueueManager: if not self._callbacks_loaded: self.load_callbacks() - if self._queue_thread is None: - self._queue_thread = threading.Thread(target=self._queue_thread_main) - self._queue_thread.start() + if self._queue_thread is not None: + self._thread_event.clear() + self._thread_event.wait() + + self._queue_thread = threading.Thread( + target=self._queue_thread_main, + args=(self,), + name="WorkerQueueThread", + ) + self._queue_thread.daemon = True + self._queue_thread.start() all_vars = self._variable_manager.get_vars(loader=self._loader, play=play) templar = Templar(loader=self._loader, variables=all_vars) @@ -399,7 +429,7 @@ class TaskQueueManager: def _cleanup_processes(self): for (worker_prc, rslt_q) in self._workers: - rslt_q.close() + #rslt_q.close() if worker_prc and worker_prc.is_alive(): try: worker_prc.terminate() |