diff options
-rw-r--r-- | playbooks/zuul-stream/functional.yaml | 5 | ||||
-rw-r--r-- | zuul/ansible/base/callback/zuul_stream.py | 33 |
2 files changed, 20 insertions, 18 deletions
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml index b8a44a87c..cf60d2cf6 100644 --- a/playbooks/zuul-stream/functional.yaml +++ b/playbooks/zuul-stream/functional.yaml @@ -31,11 +31,6 @@ mv job-output.txt job-output-success-19887.txt mv job-output.json job-output-success-19887.json - - name: Check protocol version - assert: - that: - - "'[node1] Reports streaming version: 1' in _success_output.stdout" - # Streamer puts out a line like # [node1] Starting to log 916b2084-4bbb-80e5-248e-000000000016-1-node1 for task TASK: Print binary data # One of the tasks in job-output shows find: results; diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index f31983ed6..39d3aa953 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -121,6 +121,21 @@ class CallbackModule(default.CallbackModule): self._logger = logging.getLogger('zuul.executor.ansible') def _log(self, msg, ts=None, job=True, executor=False, debug=False): + # With the default "linear" strategy (and likely others), + # Ansible will send the on_task_start callback, and then fork + # a worker process to execute that task. Since we spawn a + # thread in the on_task_start callback, we can end up emitting + # a log message in this method while Ansible is forking. If a + # forked process inherits a Python file object (i.e., stdout) + # that is locked by a thread that doesn't exist in the fork + # (i.e., this one), it can deadlock when trying to flush the + # file object. To minimize the chances of that happening, we + # should avoid using _display outside the main thread. + # Therefore: + + # Do not set executor=True from any calls from a thread + # spawned in this callback. + msg = msg.rstrip() if job: now = ts or datetime.datetime.now() @@ -143,10 +158,6 @@ class CallbackModule(default.CallbackModule): s.settimeout(None) return s except socket.timeout: - self._log( - "Timeout exception waiting for the logger. " - "Please check connectivity to [%s:%s]" - % (ip, port), executor=True) self._log_streamline( "localhost", "Timeout exception waiting for the logger. " @@ -155,16 +166,12 @@ class CallbackModule(default.CallbackModule): return None except Exception: if logger_retries % 10 == 0: - self._log("[%s] Waiting on logger" % host, - executor=True, debug=True) + self._log("[%s] Waiting on logger" % host) logger_retries += 1 time.sleep(0.1) continue def _read_log(self, host, ip, port, log_id, task_name, hosts): - self._log("[%s] Starting to log %s for task %s" - % (host, log_id, task_name), job=False, executor=True) - s = self._read_log_connect(host, ip, port) if s is None: # Can't connect; _read_log_connect() already logged an @@ -188,9 +195,6 @@ class CallbackModule(default.CallbackModule): return else: self._zuul_console_version = int(buff) - self._log('[%s] Reports streaming version: %d' % - (host, self._zuul_console_version), - job=False, executor=True) if self._zuul_console_version >= 1: msg = 's:%s\n' % log_id @@ -349,6 +353,9 @@ class CallbackModule(default.CallbackModule): log_id = "%s-%s-%s" % ( self._task._uuid, count, log_host) + self._log("[%s] Starting to log %s for task %s" + % (host, log_id, task_name), + job=False, executor=True) streamer = threading.Thread( target=self._read_log, args=( host, ip, port, log_id, task_name, hosts)) @@ -369,7 +376,7 @@ class CallbackModule(default.CallbackModule): streamer.join(30) if streamer.is_alive(): msg = "[Zuul] Log Stream did not terminate" - self._log(msg, job=True, executor=True) + self._log(msg) self._streamers_stop = False def _process_result_for_localhost(self, result, is_task=True): |