diff options
author | Zuul <zuul@review.opendev.org> | 2022-08-12 08:04:35 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-08-12 08:04:35 +0000 |
commit | 73a7d9c1459468298ca22b4ad8096893669c71ec (patch) | |
tree | 28f6cee0494b24b8f83e80696a297d0d169e408c | |
parent | d35b9f8c73df7fd84251d225f15f3cd22388690c (diff) | |
parent | c1b2fa55988b3f22b392086fe003bde2444ebbdc (diff) | |
download | zuul-73a7d9c1459468298ca22b4ad8096893669c71ec.tar.gz |
Merge "zuul-stream: implement a protocol and version"
-rw-r--r-- | playbooks/zuul-stream/functional.yaml | 6 | ||||
-rw-r--r-- | tests/remote/test_remote_zuul_stream.py | 2 | ||||
-rw-r--r-- | zuul/ansible/base/callback/zuul_stream.py | 93 | ||||
-rwxr-xr-x | zuul/ansible/base/library/zuul_console.py | 37 |
4 files changed, 106 insertions, 32 deletions
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml index 3b79e7b40..2f2e9a059 100644 --- a/playbooks/zuul-stream/functional.yaml +++ b/playbooks/zuul-stream/functional.yaml @@ -19,12 +19,18 @@ ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json" ZUUL_JOBDIR: "{{ ansible_user_dir}}" PYTHONPATH: "{{ python_path }}" + register: _success_output - name: Save output shell: | mv job-output.txt job-output-success-19887.txt mv job-output.json job-output-success-19887.json + - name: Check protocol version + assert: + that: + - "'[node1] Reports streaming version: 1' in _success_output.stdout" + # NOTE(ianw) 2022-07 : we deliberatly have this second step to run # against the console setup by the infrastructure executor in the # job pre playbooks as a backwards compatability sanity check. diff --git a/tests/remote/test_remote_zuul_stream.py b/tests/remote/test_remote_zuul_stream.py index 1f6b7fff7..1c705127e 100644 --- a/tests/remote/test_remote_zuul_stream.py +++ b/tests/remote/test_remote_zuul_stream.py @@ -29,7 +29,7 @@ class FunctionalZuulStreamMixIn: self.log_console_port = 19000 + int( self.ansible_core_version.split('.')[1]) self.executor_server.log_console_port = self.log_console_port - self.wait_timeout = 120 + self.wait_timeout = 180 self.fake_nodepool.remote_ansible = True ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4') diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index 720261cb2..d33b4dda0 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -48,6 +48,7 @@ from zuul.ansible import paths from zuul.ansible import logconfig LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885)) +LOG_STREAM_VERSION = 0 def zuul_filter_result(result): @@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule): self._items_done = False self._deferred_result = None self._playbook_name = None + self._zuul_console_version = 0 def configure_logger(self): # ansible appends timestamp, user and pid to the log lines emitted @@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule): else: self._display.display(msg) - def _read_log(self, host, ip, port, log_id, task_name, hosts): - self._log("[%s] Starting to log %s for task %s" - % (host, log_id, task_name), job=False, executor=True) + def _read_log_connect(self, host, ip, port): logger_retries = 0 while True: try: @@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule): # logs continously. Without this we can easily trip the 5 # second timeout. s.settimeout(None) + return s except socket.timeout: self._log( "Timeout exception waiting for the logger. " @@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule): "Timeout exception waiting for the logger. " "Please check connectivity to [%s:%s]" % (ip, port)) - return + return None except Exception: if logger_retries % 10 == 0: self._log("[%s] Waiting on logger" % host, @@ -159,31 +160,67 @@ class CallbackModule(default.CallbackModule): logger_retries += 1 time.sleep(0.1) continue - msg = "%s\n" % log_id - s.send(msg.encode("utf-8")) - buff = s.recv(4096) - buffering = True - while buffering: - if b'\n' in buff: - (line, buff) = buff.split(b'\n', 1) - # We can potentially get binary data here. In order to - # being able to handle that use the backslashreplace - # error handling method. This decodes unknown utf-8 - # code points to escape sequences which exactly represent - # the correct data without throwing a decoding exception. - done = self._log_streamline( - host, line.decode("utf-8", "backslashreplace")) - if done: - return + + def _read_log(self, host, ip, port, log_id, task_name, hosts): + self._log("[%s] Starting to log %s for task %s" + % (host, log_id, task_name), job=False, executor=True) + + s = self._read_log_connect(host, ip, port) + if s is None: + # Can't connect; _read_log_connect() already logged an + # error for us, just bail + return + + # Find out what version we are running against + s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8')) + buff = s.recv(1024).decode('utf-8').strip() + + # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not + # understand this protocol. They will assume the send + # above is a log request and send back the not found + # message in a loop. So to handle this we disconnect and + # reconnect. When we decide to remove this, we can remove + # anything in the "version 0" path. + if buff == '[Zuul] Log not found': + s.close() + s = self._read_log_connect(host, ip, port) + if s is None: + return + else: + self._zuul_console_version = int(buff) + self._log('[%s] Reports streaming version: %d' % + (host, self._zuul_console_version), + job=False, executor=True) + + if self._zuul_console_version >= 1: + msg = 's:%s\n' % log_id + else: + msg = '%s\n' % log_id + + s.send(msg.encode("utf-8")) + buff = s.recv(4096) + buffering = True + while buffering: + if b'\n' in buff: + (line, buff) = buff.split(b'\n', 1) + # We can potentially get binary data here. In order to + # being able to handle that use the backslashreplace + # error handling method. This decodes unknown utf-8 + # code points to escape sequences which exactly represent + # the correct data without throwing a decoding exception. + done = self._log_streamline( + host, line.decode("utf-8", "backslashreplace")) + if done: + return + else: + more = s.recv(4096) + if not more: + buffering = False else: - more = s.recv(4096) - if not more: - buffering = False - else: - buff += more - if buff: - self._log_streamline( - host, buff.decode("utf-8", "backslashreplace")) + buff += more + if buff: + self._log_streamline( + host, buff.decode("utf-8", "backslashreplace")) def _log_streamline(self, host, line): if "[Zuul] Task exit code" in line: diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py index 9dffbbc3a..bd23d5f49 100755 --- a/zuul/ansible/base/library/zuul_console.py +++ b/zuul/ansible/base/library/zuul_console.py @@ -24,6 +24,14 @@ import subprocess import threading import time +# This is the version we report to the zuul_stream callback. It is +# expected that this (zuul_console) process can be long-lived, so if +# there are updates this ensures a later streaming callback can still +# talk to us. +ZUUL_CONSOLE_PROTO_VERSION = 1 +# This is the template for the file name of the log-file written out +# by the command.py override command in the executor's Ansible +# install. LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log' LOG_STREAM_PORT = 19885 @@ -162,15 +170,38 @@ class Server(object): ret = buffer.decode('utf-8') x = ret.find('\n') if x > 0: - return ret[:x] + return ret[:x].strip() except UnicodeDecodeError: pass def handleOneConnection(self, conn): - log_uuid = self.get_command(conn) + # V1 protocol + # ----------- + # v:<ver> get version number, <ver> is remote version + # s:<uuid> send logs for <uuid> + while True: + command = self.get_command(conn) + if command.startswith('v:'): + # NOTE(ianw) : remote sends its version. We currently + # don't have anything to do with this value, so ignore + # for now. + conn.send(f'{ZUUL_CONSOLE_PROTO_VERSION}\n'.encode('utf-8')) + continue + elif command.startswith('s:'): + log_uuid = command[2:] + break + else: + # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming + # side would just send a raw uuid and nothing else; so by + # default assume that is what is coming in here. We can + # remove this fallback when we decide it is no longer + # necessary. + log_uuid = command + break + # use path split to make use the input isn't trying to be clever # and construct some path like /tmp/console-/../../something - log_uuid = os.path.split(log_uuid.rstrip())[-1] + log_uuid = os.path.split(log_uuid)[-1] # FIXME: this won't notice disconnects until it tries to send console = None |