diff options
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/ansible/base/callback/zuul_stream.py | 103 | ||||
-rwxr-xr-x | zuul/ansible/base/library/zuul_console.py | 50 |
2 files changed, 121 insertions, 32 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index 720261cb2..f31983ed6 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -48,6 +48,7 @@ from zuul.ansible import paths from zuul.ansible import logconfig LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885)) +LOG_STREAM_VERSION = 0 def zuul_filter_result(result): @@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule): self._items_done = False self._deferred_result = None self._playbook_name = None + self._zuul_console_version = 0 def configure_logger(self): # ansible appends timestamp, user and pid to the log lines emitted @@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule): else: self._display.display(msg) - def _read_log(self, host, ip, port, log_id, task_name, hosts): - self._log("[%s] Starting to log %s for task %s" - % (host, log_id, task_name), job=False, executor=True) + def _read_log_connect(self, host, ip, port): logger_retries = 0 while True: try: @@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule): # logs continously. Without this we can easily trip the 5 # second timeout. s.settimeout(None) + return s except socket.timeout: self._log( "Timeout exception waiting for the logger. " @@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule): "Timeout exception waiting for the logger. " "Please check connectivity to [%s:%s]" % (ip, port)) - return + return None except Exception: if logger_retries % 10 == 0: self._log("[%s] Waiting on logger" % host, @@ -159,31 +160,77 @@ class CallbackModule(default.CallbackModule): logger_retries += 1 time.sleep(0.1) continue - msg = "%s\n" % log_id - s.send(msg.encode("utf-8")) - buff = s.recv(4096) - buffering = True - while buffering: - if b'\n' in buff: - (line, buff) = buff.split(b'\n', 1) - # We can potentially get binary data here. In order to - # being able to handle that use the backslashreplace - # error handling method. This decodes unknown utf-8 - # code points to escape sequences which exactly represent - # the correct data without throwing a decoding exception. - done = self._log_streamline( - host, line.decode("utf-8", "backslashreplace")) - if done: - return + + def _read_log(self, host, ip, port, log_id, task_name, hosts): + self._log("[%s] Starting to log %s for task %s" + % (host, log_id, task_name), job=False, executor=True) + + s = self._read_log_connect(host, ip, port) + if s is None: + # Can't connect; _read_log_connect() already logged an + # error for us, just bail + return + + # Find out what version we are running against + s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8')) + buff = s.recv(1024).decode('utf-8').strip() + + # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not + # understand this protocol. They will assume the send + # above is a log request and send back the not found + # message in a loop. So to handle this we disconnect and + # reconnect. When we decide to remove this, we can remove + # anything in the "version 0" path. + if buff == '[Zuul] Log not found': + s.close() + s = self._read_log_connect(host, ip, port) + if s is None: + return + else: + self._zuul_console_version = int(buff) + self._log('[%s] Reports streaming version: %d' % + (host, self._zuul_console_version), + job=False, executor=True) + + if self._zuul_console_version >= 1: + msg = 's:%s\n' % log_id + else: + msg = '%s\n' % log_id + + s.send(msg.encode("utf-8")) + buff = s.recv(4096) + buffering = True + while buffering: + if b'\n' in buff: + (line, buff) = buff.split(b'\n', 1) + # We can potentially get binary data here. In order to + # being able to handle that use the backslashreplace + # error handling method. This decodes unknown utf-8 + # code points to escape sequences which exactly represent + # the correct data without throwing a decoding exception. + done = self._log_streamline( + host, line.decode("utf-8", "backslashreplace")) + if done: + if self._zuul_console_version > 0: + try: + # reestablish connection and tell console to + # clean up + s = self._read_log_connect(host, ip, port) + s.send(f'f:{log_id}\n'.encode('utf-8')) + s.close() + except Exception: + # Don't worry if this fails + pass + return + else: + more = s.recv(4096) + if not more: + buffering = False else: - more = s.recv(4096) - if not more: - buffering = False - else: - buff += more - if buff: - self._log_streamline( - host, buff.decode("utf-8", "backslashreplace")) + buff += more + if buff: + self._log_streamline( + host, buff.decode("utf-8", "backslashreplace")) def _log_streamline(self, host, line): if "[Zuul] Task exit code" in line: diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py index 9dffbbc3a..aa999cac1 100755 --- a/zuul/ansible/base/library/zuul_console.py +++ b/zuul/ansible/base/library/zuul_console.py @@ -24,6 +24,14 @@ import subprocess import threading import time +# This is the version we report to the zuul_stream callback. It is +# expected that this (zuul_console) process can be long-lived, so if +# there are updates this ensures a later streaming callback can still +# talk to us. +ZUUL_CONSOLE_PROTO_VERSION = 1 +# This is the template for the file name of the log-file written out +# by the command.py override command in the executor's Ansible +# install. LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log' LOG_STREAM_PORT = 19885 @@ -162,15 +170,49 @@ class Server(object): ret = buffer.decode('utf-8') x = ret.find('\n') if x > 0: - return ret[:x] + return ret[:x].strip() except UnicodeDecodeError: pass - def handleOneConnection(self, conn): - log_uuid = self.get_command(conn) + def _clean_uuid(self, log_uuid): # use path split to make use the input isn't trying to be clever # and construct some path like /tmp/console-/../../something - log_uuid = os.path.split(log_uuid.rstrip())[-1] + return os.path.split(log_uuid)[-1] + + def handleOneConnection(self, conn): + # V1 protocol + # ----------- + # v:<ver> get version number, <ver> is remote version + # s:<uuid> send logs for <uuid> + # f:<uuid> finalise/cleanup <uuid> + while True: + command = self.get_command(conn) + if command.startswith('v:'): + # NOTE(ianw) : remote sends its version. We currently + # don't have anything to do with this value, so ignore + # for now. + cmd = '%s\n' % (ZUUL_CONSOLE_PROTO_VERSION) + conn.send(cmd.encode('utf-8')) + continue + elif command.startswith('f:'): + log_uuid = self._clean_uuid(command[2:]) + try: + os.unlink(self.path.format(log_uuid=log_uuid)) + except Exception: + # something might have cleaned /tmp + pass + continue + elif command.startswith('s:'): + log_uuid = self._clean_uuid(command[2:]) + break + else: + # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming + # side would just send a raw uuid and nothing else; so by + # default assume that is what is coming in here. We can + # remove this fallback when we decide it is no longer + # necessary. + log_uuid = self._clean_uuid(command) + break # FIXME: this won't notice disconnects until it tries to send console = None |