diff options
author | James Cammarata <jimi@sngx.net> | 2016-09-14 10:12:37 -0500 |
---|---|---|
committer | James Cammarata <jimi@sngx.net> | 2016-09-15 00:47:47 -0500 |
commit | 16e0cfa3181ca55d7eb25006117c904da9170580 (patch) | |
tree | 029e883dd7df31416e691ce7204cb00e70837370 | |
parent | 600915aa97cd70dc3b6c5e5a3d1944f48c14597e (diff) | |
download | ansible-testing.tar.gz |
Testing threading hang fixes (testing)
-rw-r--r-- | lib/ansible/executor/action_write_locks.py | 5 | ||||
-rw-r--r-- | lib/ansible/executor/process/worker.py | 20 | ||||
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 76 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 6 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/linear.py | 3 | ||||
-rw-r--r-- | shippable.yml | 49 |
6 files changed, 85 insertions, 74 deletions
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py index 413d56d9d7..124904391f 100644 --- a/lib/ansible/executor/action_write_locks.py +++ b/lib/ansible/executor/action_write_locks.py @@ -20,6 +20,7 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type from multiprocessing import Lock +from multiprocessing.util import register_after_fork from ansible.module_utils.facts import Facts if 'action_write_locks' not in globals(): @@ -39,5 +40,7 @@ if 'action_write_locks' not in globals(): mods = set(p['name'] for p in Facts.PKG_MGRS) mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) for mod_name in mods: - action_write_locks[mod_name] = Lock() + l = Lock() + action_write_locks[mod_name] = l + register_after_fork(l, l.release) diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index d93de24ab3..4b7279ffdb 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -55,8 +55,9 @@ class WorkerProcess(multiprocessing.Process): for reading later. 
''' - def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj, event): + print(u"WORKER STARTING INIT: %s - %s" % (to_text(host), to_text(task))) super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: self._rslt_q = rslt_q @@ -67,6 +68,7 @@ class WorkerProcess(multiprocessing.Process): self._loader = loader self._variable_manager = variable_manager self._shared_loader_obj = shared_loader_obj + self._event = event self._task_vars = task_vars @@ -85,6 +87,15 @@ class WorkerProcess(multiprocessing.Process): except (AttributeError, ValueError): # couldn't get stdin's fileno, so we just carry on pass + print(u"WORKER DONE WITH INIT: %s - %s" % (to_text(host), to_text(task))) + + def _bootstrap(self): + print(u"WORKER BOOTSTRAPPING: %s - %s" % (to_text(self._host), to_text(self._task))) + return super(WorkerProcess, self)._bootstrap() + + def start(self, tqm): + print(u"WORKER CALLING START: %s - %s" % (to_text(self._host), to_text(self._task))) + super(WorkerProcess, self).start() def run(self): ''' @@ -93,6 +104,8 @@ class WorkerProcess(multiprocessing.Process): signify that they are ready for their next task. 
''' + print(u"WORKER STARTING RUN: %s - %s" % (to_text(self._host), to_text(self._task))) + self._event.set() #import cProfile, pstats, StringIO #pr = cProfile.Profile() #pr.enable() @@ -103,6 +116,7 @@ class WorkerProcess(multiprocessing.Process): try: # execute the task and build a TaskResult from the result display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task)) + print(u"WORKER RUNNING TASK: %s - %s" % (to_text(self._host), to_text(self._task))) executor_result = TaskExecutor( self._host, self._task, @@ -113,6 +127,7 @@ class WorkerProcess(multiprocessing.Process): self._shared_loader_obj, self._rslt_q ).run() + print(u"WORKER DONE RUNNING TASK: %s - %s" % (to_text(self._host), to_text(self._task))) display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task)) self._host.vars = dict() @@ -131,6 +146,8 @@ class WorkerProcess(multiprocessing.Process): self._rslt_q.put(task_result, block=False) except Exception as e: + print(u"WORKER EXCEPTION: %s" % to_text(e)) + print(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): try: self._host.vars = dict() @@ -151,4 +168,5 @@ class WorkerProcess(multiprocessing.Process): #with open('worker_%06d.stats' % os.getpid(), 'w') as f: # f.write(s.getvalue()) + print(u"WORKER DONE: %s - %s" % (to_text(self._host), to_text(self._task))) sys.exit(0) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 6744367363..95c33030fe 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -26,6 +26,7 @@ import threading import time from collections import deque +from multiprocessing.util import register_after_fork from ansible import constants as C from ansible.compat.six import string_types @@ -126,6 +127,9 @@ class TaskQueueManager: # the "queue" for the background thread to use 
self._queued_tasks = deque() self._queued_tasks_lock = threading.Lock() + register_after_fork(self._queued_tasks_lock, self._queued_tasks_lock.release) + self._thread_event = threading.Event() + self._thread_event.set() # the background queuing thread self._queue_thread = None @@ -137,58 +141,74 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - def _queue_thread_main(self): + @staticmethod + def _queue_thread_main(tqm): # create a dummy object with plugin loaders set as an easier # way to share them with the forked processes shared_loader_obj = SharedPluginLoaderObj() display.debug("queuing thread starting") - while not self._terminated: + print_count = 0 + while not tqm._terminated and tqm._thread_event.is_set(): available_workers = [] - for idx, entry in enumerate(self._workers): + for idx, entry in enumerate(tqm._workers): (worker_prc, _) = entry if worker_prc is None or not worker_prc.is_alive(): available_workers.append(idx) - if len(available_workers) == 0: + if len(available_workers) == 0 or len(tqm._queued_tasks) == 0: + print_count += 1 + if print_count > 100: + print("available workers: %s/%s, len queued tasks: %s" % (len(available_workers), len(tqm._workers), len(tqm._queued_tasks))) + print_count = 0 time.sleep(0.01) continue for worker_idx in available_workers: try: - self._queued_tasks_lock.acquire() - (host, task, task_vars, play_context) = self._queued_tasks.pop() + tqm._queued_tasks_lock.acquire() + (host, task, task_vars, play_context) = tqm._queued_tasks.pop() except IndexError: break finally: - self._queued_tasks_lock.release() + tqm._queued_tasks_lock.release() if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) action_write_locks.action_write_locks[task.action] = multiprocessing.Lock() + register_after_fork(action_write_locks.action_write_locks[task.action], 
action_write_locks.action_write_locks[task.action].release) try: + e = multiprocessing.Event() worker_prc = WorkerProcess( - self._final_q, - self._iterator._play, + tqm._final_q, + tqm._iterator._play, host, task, task_vars, play_context, - self._loader, - self._variable_manager, + tqm._loader, + tqm._variable_manager, shared_loader_obj, + e, ) - self._workers[worker_idx][0] = worker_prc - worker_prc.start() - display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers))) + print("%s: starting worker" % os.getpid()) + worker_prc.start(tqm) + e.wait() + print("%s: putting worker in slot %s" % (os.getpid(), worker_idx)) + tqm._workers[worker_idx][0] = worker_prc + print("%s: done with worker" % os.getpid()) + display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(tqm._workers))) except (EOFError, IOError, AssertionError) as e: # most likely an abort + print("%s: WORKER QUEUE ERROR: %s" % (os.getpid(), e)) display.debug("got an error while queuing: %s" % e) break + tqm._thread_event.set() + print("%s QUEUE THREAD EXITING" % os.getpid()) display.debug("queuing thread exiting") def queue_task(self, host, task, task_vars, play_context): @@ -197,18 +217,20 @@ class TaskQueueManager: self._queued_tasks_lock.release() def queue_multiple_tasks(self, items, play_context): - for item in items: - (host, task, task_vars) = item + + try: self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) + for item in items: + (host, task, task_vars) = item + self._queued_tasks.append((host, task, task_vars, play_context)) + finally: self._queued_tasks_lock.release() def _initialize_processes(self, num): self._workers = [] for i in range(num): - rslt_q = multiprocessing.Queue() - self._workers.append([None, rslt_q]) + self._workers.append([None, None]) def _initialize_notified_handlers(self, play): ''' @@ -307,9 +329,17 @@ class TaskQueueManager: if not self._callbacks_loaded: self.load_callbacks() - if 
self._queue_thread is None: - self._queue_thread = threading.Thread(target=self._queue_thread_main) - self._queue_thread.start() + if self._queue_thread is not None: + self._thread_event.clear() + self._thread_event.wait() + + self._queue_thread = threading.Thread( + target=self._queue_thread_main, + args=(self,), + name="WorkerQueueThread", + ) + self._queue_thread.daemon = True + self._queue_thread.start() all_vars = self._variable_manager.get_vars(loader=self._loader, play=play) templar = Templar(loader=self._loader, variables=all_vars) @@ -399,7 +429,7 @@ class TaskQueueManager: def _cleanup_processes(self): for (worker_prc, rslt_q) in self._workers: - rslt_q.close() + #rslt_q.close() if worker_prc and worker_prc.is_alive(): try: worker_prc.terminate() diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 65b645a3d0..6dbbabcfc7 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -412,11 +412,17 @@ class StrategyBase: display.debug("waiting for pending results...") dead_check = 10 + print_count = 0 while self._pending_results > 0 and not self._tqm._terminated: results = self._process_pending_results(iterator) ret_results.extend(results) + print_count += 1 + if print_count > 1000: + print("waiting on pending results still: %s" % self._pending_results) + print_count = 0 + dead_check -= 1 if dead_check == 0: if self._pending_results > 0 and self._tqm.has_dead_workers(): diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index a843ae57e6..e741bd78d3 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -275,7 +275,8 @@ class StrategyModule(StrategyBase): if self._pending_results > 0: results += self._process_pending_results(iterator, one_pass=True) - self._tqm.queue_multiple_tasks(items_to_queue, play_context) + if len(items_to_queue) > 0: + 
self._tqm.queue_multiple_tasks(items_to_queue, play_context) # go to next host/task group if skip_rest: diff --git a/shippable.yml b/shippable.yml index f10b7b2299..addcf41202 100644 --- a/shippable.yml +++ b/shippable.yml @@ -8,55 +8,8 @@ matrix: exclude: - env: TEST=none include: - - env: TEST=remote TARGET=all PLATFORM=osx VERSION=10.11 + - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1404 ANSIBLE_DEBUG=1 TEST_FLAGS='-vv' - - env: TEST=remote TARGET=all PLATFORM=freebsd VERSION=10.3-STABLE - - - env: TEST=remote TARGET=ci_win1 PLATFORM=windows VERSION=2012-R2_RTM - - env: TEST=remote TARGET=ci_win2 PLATFORM=windows VERSION=2012-R2_RTM - - env: TEST=remote TARGET=ci_win3 PLATFORM=windows VERSION=2012-R2_RTM - - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:centos6 - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:centos7 - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:fedora-rawhide - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:fedora23 - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:opensuseleap - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1204 PRIVILEGED=true - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1404 PRIVILEGED=true - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1604 - - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1 - - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:centos6 - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:centos7 - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:fedora-rawhide - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:fedora23 - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:opensuseleap - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1204 - - env: 
TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1404 - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1604 - - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1 - - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:centos6 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:centos7 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:fedora-rawhide - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:fedora23 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:opensuseleap - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1204 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1404 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1604 - - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1 - - - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py24 - python: 2.7 - - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py26 - python: 2.6 - - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py27 - python: 2.7 - - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py35 - python: 3.5 - - - env: TEST=code-smell INSTALL_DEPS=1 - python: 2.7 build: pre_ci_boot: options: "--privileged=false --net=bridge" |