diff options
author | Zuul <zuul@review.opendev.org> | 2022-08-12 08:04:35 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-08-12 08:04:35 +0000 |
commit | 73a7d9c1459468298ca22b4ad8096893669c71ec (patch) | |
tree | 28f6cee0494b24b8f83e80696a297d0d169e408c /zuul | |
parent | d35b9f8c73df7fd84251d225f15f3cd22388690c (diff) | |
parent | c1b2fa55988b3f22b392086fe003bde2444ebbdc (diff) | |
download | zuul-73a7d9c1459468298ca22b4ad8096893669c71ec.tar.gz |
Merge "zuul-stream: implement a protocol and version"
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/ansible/base/callback/zuul_stream.py | 93 | ||||
-rwxr-xr-x | zuul/ansible/base/library/zuul_console.py | 37 |
2 files changed, 99 insertions, 31 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index 720261cb2..d33b4dda0 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -48,6 +48,7 @@ from zuul.ansible import paths from zuul.ansible import logconfig LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885)) +LOG_STREAM_VERSION = 0 def zuul_filter_result(result): @@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule): self._items_done = False self._deferred_result = None self._playbook_name = None + self._zuul_console_version = 0 def configure_logger(self): # ansible appends timestamp, user and pid to the log lines emitted @@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule): else: self._display.display(msg) - def _read_log(self, host, ip, port, log_id, task_name, hosts): - self._log("[%s] Starting to log %s for task %s" - % (host, log_id, task_name), job=False, executor=True) + def _read_log_connect(self, host, ip, port): logger_retries = 0 while True: try: @@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule): # logs continously. Without this we can easily trip the 5 # second timeout. s.settimeout(None) + return s except socket.timeout: self._log( "Timeout exception waiting for the logger. " @@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule): "Timeout exception waiting for the logger. " "Please check connectivity to [%s:%s]" % (ip, port)) - return + return None except Exception: if logger_retries % 10 == 0: self._log("[%s] Waiting on logger" % host, @@ -159,31 +160,67 @@ class CallbackModule(default.CallbackModule): logger_retries += 1 time.sleep(0.1) continue - msg = "%s\n" % log_id - s.send(msg.encode("utf-8")) - buff = s.recv(4096) - buffering = True - while buffering: - if b'\n' in buff: - (line, buff) = buff.split(b'\n', 1) - # We can potentially get binary data here. In order to - # being able to handle that use the backslashreplace - # error handling method. This decodes unknown utf-8 - # code points to escape sequences which exactly represent - # the correct data without throwing a decoding exception. - done = self._log_streamline( - host, line.decode("utf-8", "backslashreplace")) - if done: - return + + def _read_log(self, host, ip, port, log_id, task_name, hosts): + self._log("[%s] Starting to log %s for task %s" + % (host, log_id, task_name), job=False, executor=True) + + s = self._read_log_connect(host, ip, port) + if s is None: + # Can't connect; _read_log_connect() already logged an + # error for us, just bail + return + + # Find out what version we are running against + s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8')) + buff = s.recv(1024).decode('utf-8').strip() + + # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not + # understand this protocol. They will assume the send + # above is a log request and send back the not found + # message in a loop. So to handle this we disconnect and + # reconnect. When we decide to remove this, we can remove + # anything in the "version 0" path. + if buff == '[Zuul] Log not found': + s.close() + s = self._read_log_connect(host, ip, port) + if s is None: + return + else: + self._zuul_console_version = int(buff) + self._log('[%s] Reports streaming version: %d' % + (host, self._zuul_console_version), + job=False, executor=True) + + if self._zuul_console_version >= 1: + msg = 's:%s\n' % log_id + else: + msg = '%s\n' % log_id + + s.send(msg.encode("utf-8")) + buff = s.recv(4096) + buffering = True + while buffering: + if b'\n' in buff: + (line, buff) = buff.split(b'\n', 1) + # We can potentially get binary data here. In order to + # being able to handle that use the backslashreplace + # error handling method. This decodes unknown utf-8 + # code points to escape sequences which exactly represent + # the correct data without throwing a decoding exception. + done = self._log_streamline( + host, line.decode("utf-8", "backslashreplace")) + if done: + return + else: + more = s.recv(4096) + if not more: + buffering = False else: - more = s.recv(4096) - if not more: - buffering = False - else: - buff += more - if buff: - self._log_streamline( - host, buff.decode("utf-8", "backslashreplace")) + buff += more + if buff: + self._log_streamline( + host, buff.decode("utf-8", "backslashreplace")) def _log_streamline(self, host, line): if "[Zuul] Task exit code" in line: diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py index 9dffbbc3a..bd23d5f49 100755 --- a/zuul/ansible/base/library/zuul_console.py +++ b/zuul/ansible/base/library/zuul_console.py @@ -24,6 +24,14 @@ import subprocess import threading import time +# This is the version we report to the zuul_stream callback. It is +# expected that this (zuul_console) process can be long-lived, so if +# there are updates this ensures a later streaming callback can still +# talk to us. +ZUUL_CONSOLE_PROTO_VERSION = 1 +# This is the template for the file name of the log-file written out +# by the command.py override command in the executor's Ansible +# install. LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log' LOG_STREAM_PORT = 19885 @@ -162,15 +170,38 @@ class Server(object): ret = buffer.decode('utf-8') x = ret.find('\n') if x > 0: - return ret[:x] + return ret[:x].strip() except UnicodeDecodeError: pass def handleOneConnection(self, conn): - log_uuid = self.get_command(conn) + # V1 protocol + # ----------- + # v:<ver> get version number, <ver> is remote version + # s:<uuid> send logs for <uuid> + while True: + command = self.get_command(conn) + if command.startswith('v:'): + # NOTE(ianw) : remote sends its version. We currently + # don't have anything to do with this value, so ignore + # for now. + conn.send(f'{ZUUL_CONSOLE_PROTO_VERSION}\n'.encode('utf-8')) + continue + elif command.startswith('s:'): + log_uuid = command[2:] + break + else: + # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming + # side would just send a raw uuid and nothing else; so by + # default assume that is what is coming in here. We can + # remove this fallback when we decide it is no longer + # necessary. + log_uuid = command + break + # use path split to make use the input isn't trying to be clever # and construct some path like /tmp/console-/../../something - log_uuid = os.path.split(log_uuid.rstrip())[-1] + log_uuid = os.path.split(log_uuid)[-1] # FIXME: this won't notice disconnects until it tries to send console = None |