summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Zuul <zuul@review.opendev.org> 2022-09-01 01:46:42 +0000
committer Gerrit Code Review <review@openstack.org> 2022-09-01 01:46:42 +0000
commit 64fa9db5799d33838267183f5c6734d4ba5ee7c9 (patch)
tree 3c05e2c63f22768420a782f3f17e2a8f4bf0addc
parent ea27f7a12424be833e42d580d1ba78d15f37c85d (diff)
parent 36de4939a8692529355b422cea3ae6a0f777f7e4 (diff)
download zuul-64fa9db5799d33838267183f5c6734d4ba5ee7c9.tar.gz
Merge "Do not use _display outside the main thread in zuul_stream"
-rw-r--r-- playbooks/zuul-stream/functional.yaml 5
-rw-r--r-- zuul/ansible/base/callback/zuul_stream.py 33
2 files changed, 20 insertions, 18 deletions
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml
index b8a44a87c..cf60d2cf6 100644
--- a/playbooks/zuul-stream/functional.yaml
+++ b/playbooks/zuul-stream/functional.yaml
@@ -31,11 +31,6 @@
mv job-output.txt job-output-success-19887.txt
mv job-output.json job-output-success-19887.json
- - name: Check protocol version
- assert:
- that:
- - "'[node1] Reports streaming version: 1' in _success_output.stdout"
-
# Streamer puts out a line like
# [node1] Starting to log 916b2084-4bbb-80e5-248e-000000000016-1-node1 for task TASK: Print binary data
# One of the tasks in job-output shows find: results;
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index f31983ed6..39d3aa953 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -121,6 +121,21 @@ class CallbackModule(default.CallbackModule):
self._logger = logging.getLogger('zuul.executor.ansible')
def _log(self, msg, ts=None, job=True, executor=False, debug=False):
+ # With the default "linear" strategy (and likely others),
+ # Ansible will send the on_task_start callback, and then fork
+ # a worker process to execute that task. Since we spawn a
+ # thread in the on_task_start callback, we can end up emitting
+ # a log message in this method while Ansible is forking. If a
+ # forked process inherits a Python file object (i.e., stdout)
+ # that is locked by a thread that doesn't exist in the fork
+ # (i.e., this one), it can deadlock when trying to flush the
+ # file object. To minimize the chances of that happening, we
+ # should avoid using _display outside the main thread.
+ # Therefore:
+
+ # Do not set executor=True from any calls from a thread
+ # spawned in this callback.
+
msg = msg.rstrip()
if job:
now = ts or datetime.datetime.now()
@@ -143,10 +158,6 @@ class CallbackModule(default.CallbackModule):
s.settimeout(None)
return s
except socket.timeout:
- self._log(
- "Timeout exception waiting for the logger. "
- "Please check connectivity to [%s:%s]"
- % (ip, port), executor=True)
self._log_streamline(
"localhost",
"Timeout exception waiting for the logger. "
@@ -155,16 +166,12 @@ class CallbackModule(default.CallbackModule):
return None
except Exception:
if logger_retries % 10 == 0:
- self._log("[%s] Waiting on logger" % host,
- executor=True, debug=True)
+ self._log("[%s] Waiting on logger" % host)
logger_retries += 1
time.sleep(0.1)
continue
def _read_log(self, host, ip, port, log_id, task_name, hosts):
- self._log("[%s] Starting to log %s for task %s"
- % (host, log_id, task_name), job=False, executor=True)
-
s = self._read_log_connect(host, ip, port)
if s is None:
# Can't connect; _read_log_connect() already logged an
@@ -188,9 +195,6 @@ class CallbackModule(default.CallbackModule):
return
else:
self._zuul_console_version = int(buff)
- self._log('[%s] Reports streaming version: %d' %
- (host, self._zuul_console_version),
- job=False, executor=True)
if self._zuul_console_version >= 1:
msg = 's:%s\n' % log_id
@@ -349,6 +353,9 @@ class CallbackModule(default.CallbackModule):
log_id = "%s-%s-%s" % (
self._task._uuid, count, log_host)
+ self._log("[%s] Starting to log %s for task %s"
+ % (host, log_id, task_name),
+ job=False, executor=True)
streamer = threading.Thread(
target=self._read_log, args=(
host, ip, port, log_id, task_name, hosts))
@@ -369,7 +376,7 @@ class CallbackModule(default.CallbackModule):
streamer.join(30)
if streamer.is_alive():
msg = "[Zuul] Log Stream did not terminate"
- self._log(msg, job=True, executor=True)
+ self._log(msg)
self._streamers_stop = False
def _process_result_for_localhost(self, result, is_task=True):