summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-08-12 08:04:35 +0000
committerGerrit Code Review <review@openstack.org>2022-08-12 08:04:35 +0000
commit73a7d9c1459468298ca22b4ad8096893669c71ec (patch)
tree28f6cee0494b24b8f83e80696a297d0d169e408c
parentd35b9f8c73df7fd84251d225f15f3cd22388690c (diff)
parentc1b2fa55988b3f22b392086fe003bde2444ebbdc (diff)
downloadzuul-73a7d9c1459468298ca22b4ad8096893669c71ec.tar.gz
Merge "zuul-stream: implement a protocol and version"
-rw-r--r--playbooks/zuul-stream/functional.yaml6
-rw-r--r--tests/remote/test_remote_zuul_stream.py2
-rw-r--r--zuul/ansible/base/callback/zuul_stream.py93
-rwxr-xr-xzuul/ansible/base/library/zuul_console.py37
4 files changed, 106 insertions, 32 deletions
diff --git a/playbooks/zuul-stream/functional.yaml b/playbooks/zuul-stream/functional.yaml
index 3b79e7b40..2f2e9a059 100644
--- a/playbooks/zuul-stream/functional.yaml
+++ b/playbooks/zuul-stream/functional.yaml
@@ -19,12 +19,18 @@
ZUUL_JOB_LOG_CONFIG: "{{ ansible_user_dir}}/logging.json"
ZUUL_JOBDIR: "{{ ansible_user_dir}}"
PYTHONPATH: "{{ python_path }}"
+ register: _success_output
- name: Save output
shell: |
mv job-output.txt job-output-success-19887.txt
mv job-output.json job-output-success-19887.json
+ - name: Check protocol version
+ assert:
+ that:
+ - "'[node1] Reports streaming version: 1' in _success_output.stdout"
+
# NOTE(ianw) 2022-07 : we deliberatly have this second step to run
# against the console setup by the infrastructure executor in the
# job pre playbooks as a backwards compatability sanity check.
diff --git a/tests/remote/test_remote_zuul_stream.py b/tests/remote/test_remote_zuul_stream.py
index 1f6b7fff7..1c705127e 100644
--- a/tests/remote/test_remote_zuul_stream.py
+++ b/tests/remote/test_remote_zuul_stream.py
@@ -29,7 +29,7 @@ class FunctionalZuulStreamMixIn:
self.log_console_port = 19000 + int(
self.ansible_core_version.split('.')[1])
self.executor_server.log_console_port = self.log_console_port
- self.wait_timeout = 120
+ self.wait_timeout = 180
self.fake_nodepool.remote_ansible = True
ansible_remote = os.environ.get('ZUUL_REMOTE_IPV4')
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index 720261cb2..d33b4dda0 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -48,6 +48,7 @@ from zuul.ansible import paths
from zuul.ansible import logconfig
LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885))
+LOG_STREAM_VERSION = 0
def zuul_filter_result(result):
@@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule):
self._items_done = False
self._deferred_result = None
self._playbook_name = None
+ self._zuul_console_version = 0
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
- def _read_log(self, host, ip, port, log_id, task_name, hosts):
- self._log("[%s] Starting to log %s for task %s"
- % (host, log_id, task_name), job=False, executor=True)
+ def _read_log_connect(self, host, ip, port):
logger_retries = 0
while True:
try:
@@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule):
# logs continously. Without this we can easily trip the 5
# second timeout.
s.settimeout(None)
+ return s
except socket.timeout:
self._log(
"Timeout exception waiting for the logger. "
@@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule):
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, port))
- return
+ return None
except Exception:
if logger_retries % 10 == 0:
self._log("[%s] Waiting on logger" % host,
@@ -159,31 +160,67 @@ class CallbackModule(default.CallbackModule):
logger_retries += 1
time.sleep(0.1)
continue
- msg = "%s\n" % log_id
- s.send(msg.encode("utf-8"))
- buff = s.recv(4096)
- buffering = True
- while buffering:
- if b'\n' in buff:
- (line, buff) = buff.split(b'\n', 1)
- # We can potentially get binary data here. In order to
- # being able to handle that use the backslashreplace
- # error handling method. This decodes unknown utf-8
- # code points to escape sequences which exactly represent
- # the correct data without throwing a decoding exception.
- done = self._log_streamline(
- host, line.decode("utf-8", "backslashreplace"))
- if done:
- return
+
+ def _read_log(self, host, ip, port, log_id, task_name, hosts):
+ self._log("[%s] Starting to log %s for task %s"
+ % (host, log_id, task_name), job=False, executor=True)
+
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ # Can't connect; _read_log_connect() already logged an
+ # error for us, just bail
+ return
+
+ # Find out what version we are running against
+ s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8'))
+ buff = s.recv(1024).decode('utf-8').strip()
+
+ # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not
+ # understand this protocol. They will assume the send
+ # above is a log request and send back the not found
+ # message in a loop. So to handle this we disconnect and
+ # reconnect. When we decide to remove this, we can remove
+ # anything in the "version 0" path.
+ if buff == '[Zuul] Log not found':
+ s.close()
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ return
+ else:
+ self._zuul_console_version = int(buff)
+ self._log('[%s] Reports streaming version: %d' %
+ (host, self._zuul_console_version),
+ job=False, executor=True)
+
+ if self._zuul_console_version >= 1:
+ msg = 's:%s\n' % log_id
+ else:
+ msg = '%s\n' % log_id
+
+ s.send(msg.encode("utf-8"))
+ buff = s.recv(4096)
+ buffering = True
+ while buffering:
+ if b'\n' in buff:
+ (line, buff) = buff.split(b'\n', 1)
+ # We can potentially get binary data here. In order to
+ # being able to handle that use the backslashreplace
+ # error handling method. This decodes unknown utf-8
+ # code points to escape sequences which exactly represent
+ # the correct data without throwing a decoding exception.
+ done = self._log_streamline(
+ host, line.decode("utf-8", "backslashreplace"))
+ if done:
+ return
+ else:
+ more = s.recv(4096)
+ if not more:
+ buffering = False
else:
- more = s.recv(4096)
- if not more:
- buffering = False
- else:
- buff += more
- if buff:
- self._log_streamline(
- host, buff.decode("utf-8", "backslashreplace"))
+ buff += more
+ if buff:
+ self._log_streamline(
+ host, buff.decode("utf-8", "backslashreplace"))
def _log_streamline(self, host, line):
if "[Zuul] Task exit code" in line:
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py
index 9dffbbc3a..bd23d5f49 100755
--- a/zuul/ansible/base/library/zuul_console.py
+++ b/zuul/ansible/base/library/zuul_console.py
@@ -24,6 +24,14 @@ import subprocess
import threading
import time
+# This is the version we report to the zuul_stream callback. It is
+# expected that this (zuul_console) process can be long-lived, so if
+# there are updates this ensures a later streaming callback can still
+# talk to us.
+ZUUL_CONSOLE_PROTO_VERSION = 1
+# This is the template for the file name of the log-file written out
+# by the command.py override command in the executor's Ansible
+# install.
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885
@@ -162,15 +170,38 @@ class Server(object):
ret = buffer.decode('utf-8')
x = ret.find('\n')
if x > 0:
- return ret[:x]
+ return ret[:x].strip()
except UnicodeDecodeError:
pass
def handleOneConnection(self, conn):
- log_uuid = self.get_command(conn)
+ # V1 protocol
+ # -----------
+ # v:<ver> get version number, <ver> is remote version
+ # s:<uuid> send logs for <uuid>
+ while True:
+ command = self.get_command(conn)
+ if command.startswith('v:'):
+ # NOTE(ianw) : remote sends its version. We currently
+ # don't have anything to do with this value, so ignore
+ # for now.
+ conn.send(f'{ZUUL_CONSOLE_PROTO_VERSION}\n'.encode('utf-8'))
+ continue
+ elif command.startswith('s:'):
+ log_uuid = command[2:]
+ break
+ else:
+ # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming
+ # side would just send a raw uuid and nothing else; so by
+ # default assume that is what is coming in here. We can
+ # remove this fallback when we decide it is no longer
+ # necessary.
+ log_uuid = command
+ break
+
# use path split to make use the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
- log_uuid = os.path.split(log_uuid.rstrip())[-1]
+ log_uuid = os.path.split(log_uuid)[-1]
# FIXME: this won't notice disconnects until it tries to send
console = None