summaryrefslogtreecommitdiff
path: root/zuul/ansible
diff options
context:
space:
mode:
authorIan Wienand <iwienand@redhat.com>2022-07-21 11:04:29 +1000
committerIan Wienand <iwienand@redhat.com>2022-08-09 17:04:40 +1000
commitc1b2fa55988b3f22b392086fe003bde2444ebbdc (patch)
tree49fea428ab058b7b4b12b10f7cb8f05c124dd151 /zuul/ansible
parent0c0ec6e5802259bfc7da1b75a9e5d574c2227002 (diff)
downloadzuul-c1b2fa55988b3f22b392086fe003bde2444ebbdc.tar.gz
zuul-stream: implement a protocol and version
A refresher on how this works, to the best of my knowledge 1 Firstly, Zuul's Ansible has a library task "zuul_console:" which is run against the remote node; this forks a console daemon, listening on a default port. 2 We have a action plugin that runs for each task, and if that task is a command/shell task, assigns it a unique id 3 We then override with library/command.py (which backs command/shell tasks) with a version that forks and runs the process on the target node as usual, but also saves the stdout/stderr output to a temporary file named per the unique uuid from the step above. 4 At the same time we have the callback plugin zuul_stream.py, which Ansible is calling as it moves through starting, running and finishing the tasks. This looks at the task, and if it has a UUID [2], sends a request to the zuul_console [1], which opens the temporary file [3] and starts streaming it back. 5 We loop reading this until the connection is closed by [1], eventually outputting each line. In this way, the console log is effectively streamed and saved into our job output. We have established that we expect the console [1] is updated asynchronously to the command/streaming [3,4] in situation such as static nodes. This poses a problem if we ever want to update either part -- for example we can not change the file-name that the command.py file logs to, because an old zuul_console: will not know to open the new file. You could imagine other fantasy things you might like to do; e.g. negotiate compression etc. that would have similar issues. To provide the flexibility for these types of changes, implement a simple protocol where the zuul_stream and zuul_console sides exchange their respective version numbers before sending the log files. This way they can both decide what operations are compatible both ways. Luckily the extant protocol, which is really just sending a plain uuid, can be adapted to this. When an old zuul_console server gets the protocol request it will just look like an invalid log file, which zuul_stream can handle and thus assume the remote end doesn't know about protocols. This bumps the testing timeout; it seems that the extra calls make for random failures. The failures are random and not in the same place, I've run this separately in 850719 several times and not seen any problems with the higher timeout. This test is already has a settle timeout slightly higher so I think it must have just been on the edge. Change-Id: Ief366c092e05fb88351782f6d9cd280bfae96237
Diffstat (limited to 'zuul/ansible')
-rw-r--r--zuul/ansible/base/callback/zuul_stream.py93
-rwxr-xr-xzuul/ansible/base/library/zuul_console.py37
2 files changed, 99 insertions, 31 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index 720261cb2..d33b4dda0 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -48,6 +48,7 @@ from zuul.ansible import paths
from zuul.ansible import logconfig
LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885))
+LOG_STREAM_VERSION = 0
def zuul_filter_result(result):
@@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule):
self._items_done = False
self._deferred_result = None
self._playbook_name = None
+ self._zuul_console_version = 0
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
- def _read_log(self, host, ip, port, log_id, task_name, hosts):
- self._log("[%s] Starting to log %s for task %s"
- % (host, log_id, task_name), job=False, executor=True)
+ def _read_log_connect(self, host, ip, port):
logger_retries = 0
while True:
try:
@@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule):
# logs continously. Without this we can easily trip the 5
# second timeout.
s.settimeout(None)
+ return s
except socket.timeout:
self._log(
"Timeout exception waiting for the logger. "
@@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule):
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, port))
- return
+ return None
except Exception:
if logger_retries % 10 == 0:
self._log("[%s] Waiting on logger" % host,
@@ -159,31 +160,67 @@ class CallbackModule(default.CallbackModule):
logger_retries += 1
time.sleep(0.1)
continue
- msg = "%s\n" % log_id
- s.send(msg.encode("utf-8"))
- buff = s.recv(4096)
- buffering = True
- while buffering:
- if b'\n' in buff:
- (line, buff) = buff.split(b'\n', 1)
- # We can potentially get binary data here. In order to
- # being able to handle that use the backslashreplace
- # error handling method. This decodes unknown utf-8
- # code points to escape sequences which exactly represent
- # the correct data without throwing a decoding exception.
- done = self._log_streamline(
- host, line.decode("utf-8", "backslashreplace"))
- if done:
- return
+
+ def _read_log(self, host, ip, port, log_id, task_name, hosts):
+ self._log("[%s] Starting to log %s for task %s"
+ % (host, log_id, task_name), job=False, executor=True)
+
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ # Can't connect; _read_log_connect() already logged an
+ # error for us, just bail
+ return
+
+ # Find out what version we are running against
+ s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8'))
+ buff = s.recv(1024).decode('utf-8').strip()
+
+ # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not
+ # understand this protocol. They will assume the send
+ # above is a log request and send back the not found
+ # message in a loop. So to handle this we disconnect and
+ # reconnect. When we decide to remove this, we can remove
+ # anything in the "version 0" path.
+ if buff == '[Zuul] Log not found':
+ s.close()
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ return
+ else:
+ self._zuul_console_version = int(buff)
+ self._log('[%s] Reports streaming version: %d' %
+ (host, self._zuul_console_version),
+ job=False, executor=True)
+
+ if self._zuul_console_version >= 1:
+ msg = 's:%s\n' % log_id
+ else:
+ msg = '%s\n' % log_id
+
+ s.send(msg.encode("utf-8"))
+ buff = s.recv(4096)
+ buffering = True
+ while buffering:
+ if b'\n' in buff:
+ (line, buff) = buff.split(b'\n', 1)
+ # We can potentially get binary data here. In order to
+ # being able to handle that use the backslashreplace
+ # error handling method. This decodes unknown utf-8
+ # code points to escape sequences which exactly represent
+ # the correct data without throwing a decoding exception.
+ done = self._log_streamline(
+ host, line.decode("utf-8", "backslashreplace"))
+ if done:
+ return
+ else:
+ more = s.recv(4096)
+ if not more:
+ buffering = False
else:
- more = s.recv(4096)
- if not more:
- buffering = False
- else:
- buff += more
- if buff:
- self._log_streamline(
- host, buff.decode("utf-8", "backslashreplace"))
+ buff += more
+ if buff:
+ self._log_streamline(
+ host, buff.decode("utf-8", "backslashreplace"))
def _log_streamline(self, host, line):
if "[Zuul] Task exit code" in line:
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py
index 9dffbbc3a..bd23d5f49 100755
--- a/zuul/ansible/base/library/zuul_console.py
+++ b/zuul/ansible/base/library/zuul_console.py
@@ -24,6 +24,14 @@ import subprocess
import threading
import time
+# This is the version we report to the zuul_stream callback. It is
+# expected that this (zuul_console) process can be long-lived, so if
+# there are updates this ensures a later streaming callback can still
+# talk to us.
+ZUUL_CONSOLE_PROTO_VERSION = 1
+# This is the template for the file name of the log-file written out
+# by the command.py override command in the executor's Ansible
+# install.
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885
@@ -162,15 +170,38 @@ class Server(object):
ret = buffer.decode('utf-8')
x = ret.find('\n')
if x > 0:
- return ret[:x]
+ return ret[:x].strip()
except UnicodeDecodeError:
pass
def handleOneConnection(self, conn):
- log_uuid = self.get_command(conn)
+ # V1 protocol
+ # -----------
+ # v:<ver> get version number, <ver> is remote version
+ # s:<uuid> send logs for <uuid>
+ while True:
+ command = self.get_command(conn)
+ if command.startswith('v:'):
+ # NOTE(ianw) : remote sends its version. We currently
+ # don't have anything to do with this value, so ignore
+ # for now.
+ conn.send(f'{ZUUL_CONSOLE_PROTO_VERSION}\n'.encode('utf-8'))
+ continue
+ elif command.startswith('s:'):
+ log_uuid = command[2:]
+ break
+ else:
+ # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming
+ # side would just send a raw uuid and nothing else; so by
+ # default assume that is what is coming in here. We can
+ # remove this fallback when we decide it is no longer
+ # necessary.
+ log_uuid = command
+ break
+
# use path split to make use the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
- log_uuid = os.path.split(log_uuid.rstrip())[-1]
+ log_uuid = os.path.split(log_uuid)[-1]
# FIXME: this won't notice disconnects until it tries to send
console = None