summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: James Cammarata <jimi@sngx.net> 2016-09-14 10:12:37 -0500
committer: James Cammarata <jimi@sngx.net> 2016-09-15 00:47:47 -0500
commit: 16e0cfa3181ca55d7eb25006117c904da9170580 (patch)
tree: 029e883dd7df31416e691ce7204cb00e70837370
parent: 600915aa97cd70dc3b6c5e5a3d1944f48c14597e (diff)
download: ansible-testing.tar.gz
Testing threading hang fixes (branch: testing)
-rw-r--r-- lib/ansible/executor/action_write_locks.py 5
-rw-r--r-- lib/ansible/executor/process/worker.py 20
-rw-r--r-- lib/ansible/executor/task_queue_manager.py 76
-rw-r--r-- lib/ansible/plugins/strategy/__init__.py 6
-rw-r--r-- lib/ansible/plugins/strategy/linear.py 3
-rw-r--r-- shippable.yml 49
6 files changed, 85 insertions, 74 deletions
diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py
index 413d56d9d7..124904391f 100644
--- a/lib/ansible/executor/action_write_locks.py
+++ b/lib/ansible/executor/action_write_locks.py
@@ -20,6 +20,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from multiprocessing import Lock
+from multiprocessing.util import register_after_fork
from ansible.module_utils.facts import Facts
if 'action_write_locks' not in globals():
@@ -39,5 +40,7 @@ if 'action_write_locks' not in globals():
mods = set(p['name'] for p in Facts.PKG_MGRS)
mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
for mod_name in mods:
- action_write_locks[mod_name] = Lock()
+ l = Lock()
+ action_write_locks[mod_name] = l
+ register_after_fork(l, l.release)
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index d93de24ab3..4b7279ffdb 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -55,8 +55,9 @@ class WorkerProcess(multiprocessing.Process):
for reading later.
'''
- def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj):
+ def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj, event):
+ print(u"WORKER STARTING INIT: %s - %s" % (to_text(host), to_text(task)))
super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param:
self._rslt_q = rslt_q
@@ -67,6 +68,7 @@ class WorkerProcess(multiprocessing.Process):
self._loader = loader
self._variable_manager = variable_manager
self._shared_loader_obj = shared_loader_obj
+ self._event = event
self._task_vars = task_vars
@@ -85,6 +87,15 @@ class WorkerProcess(multiprocessing.Process):
except (AttributeError, ValueError):
# couldn't get stdin's fileno, so we just carry on
pass
+ print(u"WORKER DONE WITH INIT: %s - %s" % (to_text(host), to_text(task)))
+
+ def _bootstrap(self):
+ print(u"WORKER BOOTSTRAPPING: %s - %s" % (to_text(self._host), to_text(self._task)))
+ return super(WorkerProcess, self)._bootstrap()
+
+ def start(self, tqm):
+ print(u"WORKER CALLING START: %s - %s" % (to_text(self._host), to_text(self._task)))
+ super(WorkerProcess, self).start()
def run(self):
'''
@@ -93,6 +104,8 @@ class WorkerProcess(multiprocessing.Process):
signify that they are ready for their next task.
'''
+ print(u"WORKER STARTING RUN: %s - %s" % (to_text(self._host), to_text(self._task)))
+ self._event.set()
#import cProfile, pstats, StringIO
#pr = cProfile.Profile()
#pr.enable()
@@ -103,6 +116,7 @@ class WorkerProcess(multiprocessing.Process):
try:
# execute the task and build a TaskResult from the result
display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task))
+ print(u"WORKER RUNNING TASK: %s - %s" % (to_text(self._host), to_text(self._task)))
executor_result = TaskExecutor(
self._host,
self._task,
@@ -113,6 +127,7 @@ class WorkerProcess(multiprocessing.Process):
self._shared_loader_obj,
self._rslt_q
).run()
+ print(u"WORKER DONE RUNNING TASK: %s - %s" % (to_text(self._host), to_text(self._task)))
display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task))
self._host.vars = dict()
@@ -131,6 +146,8 @@ class WorkerProcess(multiprocessing.Process):
self._rslt_q.put(task_result, block=False)
except Exception as e:
+ print(u"WORKER EXCEPTION: %s" % to_text(e))
+ print(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
try:
self._host.vars = dict()
@@ -151,4 +168,5 @@ class WorkerProcess(multiprocessing.Process):
#with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue())
+ print(u"WORKER DONE: %s - %s" % (to_text(self._host), to_text(self._task)))
sys.exit(0)
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index 6744367363..95c33030fe 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -26,6 +26,7 @@ import threading
import time
from collections import deque
+from multiprocessing.util import register_after_fork
from ansible import constants as C
from ansible.compat.six import string_types
@@ -126,6 +127,9 @@ class TaskQueueManager:
# the "queue" for the background thread to use
self._queued_tasks = deque()
self._queued_tasks_lock = threading.Lock()
+ register_after_fork(self._queued_tasks_lock, self._queued_tasks_lock.release)
+ self._thread_event = threading.Event()
+ self._thread_event.set()
# the background queuing thread
self._queue_thread = None
@@ -137,58 +141,74 @@ class TaskQueueManager:
# plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile()
- def _queue_thread_main(self):
+ @staticmethod
+ def _queue_thread_main(tqm):
# create a dummy object with plugin loaders set as an easier
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
display.debug("queuing thread starting")
- while not self._terminated:
+ print_count = 0
+ while not tqm._terminated and tqm._thread_event.is_set():
available_workers = []
- for idx, entry in enumerate(self._workers):
+ for idx, entry in enumerate(tqm._workers):
(worker_prc, _) = entry
if worker_prc is None or not worker_prc.is_alive():
available_workers.append(idx)
- if len(available_workers) == 0:
+ if len(available_workers) == 0 or len(tqm._queued_tasks) == 0:
+ print_count += 1
+ if print_count > 100:
+ print("available workers: %s/%s, len queued tasks: %s" % (len(available_workers), len(tqm._workers), len(tqm._queued_tasks)))
+ print_count = 0
time.sleep(0.01)
continue
for worker_idx in available_workers:
try:
- self._queued_tasks_lock.acquire()
- (host, task, task_vars, play_context) = self._queued_tasks.pop()
+ tqm._queued_tasks_lock.acquire()
+ (host, task, task_vars, play_context) = tqm._queued_tasks.pop()
except IndexError:
break
finally:
- self._queued_tasks_lock.release()
+ tqm._queued_tasks_lock.release()
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = multiprocessing.Lock()
+ register_after_fork(action_write_locks.action_write_locks[task.action], action_write_locks.action_write_locks[task.action].release)
try:
+ e = multiprocessing.Event()
worker_prc = WorkerProcess(
- self._final_q,
- self._iterator._play,
+ tqm._final_q,
+ tqm._iterator._play,
host,
task,
task_vars,
play_context,
- self._loader,
- self._variable_manager,
+ tqm._loader,
+ tqm._variable_manager,
shared_loader_obj,
+ e,
)
- self._workers[worker_idx][0] = worker_prc
- worker_prc.start()
- display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers)))
+ print("%s: starting worker" % os.getpid())
+ worker_prc.start(tqm)
+ e.wait()
+ print("%s: putting worker in slot %s" % (os.getpid(), worker_idx))
+ tqm._workers[worker_idx][0] = worker_prc
+ print("%s: done with worker" % os.getpid())
+ display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(tqm._workers)))
except (EOFError, IOError, AssertionError) as e:
# most likely an abort
+ print("%s: WORKER QUEUE ERROR: %s" % (os.getpid(), e))
display.debug("got an error while queuing: %s" % e)
break
+ tqm._thread_event.set()
+ print("%s QUEUE THREAD EXITING" % os.getpid())
display.debug("queuing thread exiting")
def queue_task(self, host, task, task_vars, play_context):
@@ -197,18 +217,20 @@ class TaskQueueManager:
self._queued_tasks_lock.release()
def queue_multiple_tasks(self, items, play_context):
- for item in items:
- (host, task, task_vars) = item
+
+ try:
self._queued_tasks_lock.acquire()
- self._queued_tasks.append((host, task, task_vars, play_context))
+ for item in items:
+ (host, task, task_vars) = item
+ self._queued_tasks.append((host, task, task_vars, play_context))
+ finally:
self._queued_tasks_lock.release()
def _initialize_processes(self, num):
self._workers = []
for i in range(num):
- rslt_q = multiprocessing.Queue()
- self._workers.append([None, rslt_q])
+ self._workers.append([None, None])
def _initialize_notified_handlers(self, play):
'''
@@ -307,9 +329,17 @@ class TaskQueueManager:
if not self._callbacks_loaded:
self.load_callbacks()
- if self._queue_thread is None:
- self._queue_thread = threading.Thread(target=self._queue_thread_main)
- self._queue_thread.start()
+ if self._queue_thread is not None:
+ self._thread_event.clear()
+ self._thread_event.wait()
+
+ self._queue_thread = threading.Thread(
+ target=self._queue_thread_main,
+ args=(self,),
+ name="WorkerQueueThread",
+ )
+ self._queue_thread.daemon = True
+ self._queue_thread.start()
all_vars = self._variable_manager.get_vars(loader=self._loader, play=play)
templar = Templar(loader=self._loader, variables=all_vars)
@@ -399,7 +429,7 @@ class TaskQueueManager:
def _cleanup_processes(self):
for (worker_prc, rslt_q) in self._workers:
- rslt_q.close()
+ #rslt_q.close()
if worker_prc and worker_prc.is_alive():
try:
worker_prc.terminate()
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 65b645a3d0..6dbbabcfc7 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -412,11 +412,17 @@ class StrategyBase:
display.debug("waiting for pending results...")
dead_check = 10
+ print_count = 0
while self._pending_results > 0 and not self._tqm._terminated:
results = self._process_pending_results(iterator)
ret_results.extend(results)
+ print_count += 1
+ if print_count > 1000:
+ print("waiting on pending results still: %s" % self._pending_results)
+ print_count = 0
+
dead_check -= 1
if dead_check == 0:
if self._pending_results > 0 and self._tqm.has_dead_workers():
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py
index a843ae57e6..e741bd78d3 100644
--- a/lib/ansible/plugins/strategy/linear.py
+++ b/lib/ansible/plugins/strategy/linear.py
@@ -275,7 +275,8 @@ class StrategyModule(StrategyBase):
if self._pending_results > 0:
results += self._process_pending_results(iterator, one_pass=True)
- self._tqm.queue_multiple_tasks(items_to_queue, play_context)
+ if len(items_to_queue) > 0:
+ self._tqm.queue_multiple_tasks(items_to_queue, play_context)
# go to next host/task group
if skip_rest:
diff --git a/shippable.yml b/shippable.yml
index f10b7b2299..addcf41202 100644
--- a/shippable.yml
+++ b/shippable.yml
@@ -8,55 +8,8 @@ matrix:
exclude:
- env: TEST=none
include:
- - env: TEST=remote TARGET=all PLATFORM=osx VERSION=10.11
+ - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1404 ANSIBLE_DEBUG=1 TEST_FLAGS='-vv'
- - env: TEST=remote TARGET=all PLATFORM=freebsd VERSION=10.3-STABLE
-
- - env: TEST=remote TARGET=ci_win1 PLATFORM=windows VERSION=2012-R2_RTM
- - env: TEST=remote TARGET=ci_win2 PLATFORM=windows VERSION=2012-R2_RTM
- - env: TEST=remote TARGET=ci_win3 PLATFORM=windows VERSION=2012-R2_RTM
-
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:centos6
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:centos7
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:fedora-rawhide
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:fedora23
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:opensuseleap
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1204 PRIVILEGED=true
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1404 PRIVILEGED=true
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1604
- - env: TEST=integration TARGET=destructive IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1
-
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:centos6
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:centos7
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:fedora-rawhide
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:fedora23
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:opensuseleap
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1204
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1404
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1604
- - env: TEST=integration TARGET=non_destructive IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1
-
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:centos6
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:centos7
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:fedora-rawhide
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:fedora23
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:opensuseleap
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1204
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1404
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1604
- - env: TEST=integration TARGET=other IMAGE=ansible/ansible:ubuntu1604py3 PYTHON3=1
-
- - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py24
- python: 2.7
- - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py26
- python: 2.6
- - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py27
- python: 2.7
- - env: TEST=sanity INSTALL_DEPS=1 TOXENV=py35
- python: 3.5
-
- - env: TEST=code-smell INSTALL_DEPS=1
- python: 2.7
build:
pre_ci_boot:
options: "--privileged=false --net=bridge"