summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-08-12 08:04:35 +0000
committerGerrit Code Review <review@openstack.org>2022-08-12 08:04:35 +0000
commit73a7d9c1459468298ca22b4ad8096893669c71ec (patch)
tree28f6cee0494b24b8f83e80696a297d0d169e408c /zuul
parentd35b9f8c73df7fd84251d225f15f3cd22388690c (diff)
parentc1b2fa55988b3f22b392086fe003bde2444ebbdc (diff)
downloadzuul-73a7d9c1459468298ca22b4ad8096893669c71ec.tar.gz
Merge "zuul-stream: implement a protocol and version"
Diffstat (limited to 'zuul')
-rw-r--r--zuul/ansible/base/callback/zuul_stream.py93
-rwxr-xr-xzuul/ansible/base/library/zuul_console.py37
2 files changed, 99 insertions, 31 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index 720261cb2..d33b4dda0 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -48,6 +48,7 @@ from zuul.ansible import paths
from zuul.ansible import logconfig
LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885))
+LOG_STREAM_VERSION = 0
def zuul_filter_result(result):
@@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule):
self._items_done = False
self._deferred_result = None
self._playbook_name = None
+ self._zuul_console_version = 0
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
- def _read_log(self, host, ip, port, log_id, task_name, hosts):
- self._log("[%s] Starting to log %s for task %s"
- % (host, log_id, task_name), job=False, executor=True)
+ def _read_log_connect(self, host, ip, port):
logger_retries = 0
while True:
try:
@@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule):
# logs continously. Without this we can easily trip the 5
# second timeout.
s.settimeout(None)
+ return s
except socket.timeout:
self._log(
"Timeout exception waiting for the logger. "
@@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule):
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, port))
- return
+ return None
except Exception:
if logger_retries % 10 == 0:
self._log("[%s] Waiting on logger" % host,
@@ -159,31 +160,67 @@ class CallbackModule(default.CallbackModule):
logger_retries += 1
time.sleep(0.1)
continue
- msg = "%s\n" % log_id
- s.send(msg.encode("utf-8"))
- buff = s.recv(4096)
- buffering = True
- while buffering:
- if b'\n' in buff:
- (line, buff) = buff.split(b'\n', 1)
- # We can potentially get binary data here. In order to
- # being able to handle that use the backslashreplace
- # error handling method. This decodes unknown utf-8
- # code points to escape sequences which exactly represent
- # the correct data without throwing a decoding exception.
- done = self._log_streamline(
- host, line.decode("utf-8", "backslashreplace"))
- if done:
- return
+
+ def _read_log(self, host, ip, port, log_id, task_name, hosts):
+ self._log("[%s] Starting to log %s for task %s"
+ % (host, log_id, task_name), job=False, executor=True)
+
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ # Can't connect; _read_log_connect() already logged an
+ # error for us, just bail
+ return
+
+ # Find out what version we are running against
+ s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8'))
+ buff = s.recv(1024).decode('utf-8').strip()
+
+ # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not
+ # understand this protocol. They will assume the send
+ # above is a log request and send back the not found
+ # message in a loop. So to handle this we disconnect and
+ # reconnect. When we decide to remove this, we can remove
+ # anything in the "version 0" path.
+ if buff == '[Zuul] Log not found':
+ s.close()
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ return
+ else:
+ self._zuul_console_version = int(buff)
+ self._log('[%s] Reports streaming version: %d' %
+ (host, self._zuul_console_version),
+ job=False, executor=True)
+
+ if self._zuul_console_version >= 1:
+ msg = 's:%s\n' % log_id
+ else:
+ msg = '%s\n' % log_id
+
+ s.send(msg.encode("utf-8"))
+ buff = s.recv(4096)
+ buffering = True
+ while buffering:
+ if b'\n' in buff:
+ (line, buff) = buff.split(b'\n', 1)
+ # We can potentially get binary data here. In order to
+ # being able to handle that use the backslashreplace
+ # error handling method. This decodes unknown utf-8
+ # code points to escape sequences which exactly represent
+ # the correct data without throwing a decoding exception.
+ done = self._log_streamline(
+ host, line.decode("utf-8", "backslashreplace"))
+ if done:
+ return
+ else:
+ more = s.recv(4096)
+ if not more:
+ buffering = False
else:
- more = s.recv(4096)
- if not more:
- buffering = False
- else:
- buff += more
- if buff:
- self._log_streamline(
- host, buff.decode("utf-8", "backslashreplace"))
+ buff += more
+ if buff:
+ self._log_streamline(
+ host, buff.decode("utf-8", "backslashreplace"))
def _log_streamline(self, host, line):
if "[Zuul] Task exit code" in line:
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py
index 9dffbbc3a..bd23d5f49 100755
--- a/zuul/ansible/base/library/zuul_console.py
+++ b/zuul/ansible/base/library/zuul_console.py
@@ -24,6 +24,14 @@ import subprocess
import threading
import time
+# This is the version we report to the zuul_stream callback. It is
+# expected that this (zuul_console) process can be long-lived, so if
+# there are updates this ensures a later streaming callback can still
+# talk to us.
+ZUUL_CONSOLE_PROTO_VERSION = 1
+# This is the template for the file name of the log-file written out
+# by the command.py override command in the executor's Ansible
+# install.
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885
@@ -162,15 +170,38 @@ class Server(object):
ret = buffer.decode('utf-8')
x = ret.find('\n')
if x > 0:
- return ret[:x]
+ return ret[:x].strip()
except UnicodeDecodeError:
pass
def handleOneConnection(self, conn):
- log_uuid = self.get_command(conn)
+ # V1 protocol
+ # -----------
+ # v:<ver> get version number, <ver> is remote version
+ # s:<uuid> send logs for <uuid>
+ while True:
+ command = self.get_command(conn)
+ if command.startswith('v:'):
+ # NOTE(ianw) : remote sends its version. We currently
+ # don't have anything to do with this value, so ignore
+ # for now.
+ conn.send(f'{ZUUL_CONSOLE_PROTO_VERSION}\n'.encode('utf-8'))
+ continue
+ elif command.startswith('s:'):
+ log_uuid = command[2:]
+ break
+ else:
+ # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming
+ # side would just send a raw uuid and nothing else; so by
+ # default assume that is what is coming in here. We can
+ # remove this fallback when we decide it is no longer
+ # necessary.
+ log_uuid = command
+ break
+
# use path split to make use the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
- log_uuid = os.path.split(log_uuid.rstrip())[-1]
+ log_uuid = os.path.split(log_uuid)[-1]
# FIXME: this won't notice disconnects until it tries to send
console = None