summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
Diffstat (limited to 'zuul')
-rw-r--r--zuul/ansible/base/callback/zuul_stream.py103
-rwxr-xr-xzuul/ansible/base/library/zuul_console.py50
2 files changed, 121 insertions, 32 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index 720261cb2..f31983ed6 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -48,6 +48,7 @@ from zuul.ansible import paths
from zuul.ansible import logconfig
LOG_STREAM_PORT = int(os.environ.get("ZUUL_CONSOLE_PORT", 19885))
+LOG_STREAM_VERSION = 0
def zuul_filter_result(result):
@@ -103,6 +104,7 @@ class CallbackModule(default.CallbackModule):
self._items_done = False
self._deferred_result = None
self._playbook_name = None
+ self._zuul_console_version = 0
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@@ -129,9 +131,7 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
- def _read_log(self, host, ip, port, log_id, task_name, hosts):
- self._log("[%s] Starting to log %s for task %s"
- % (host, log_id, task_name), job=False, executor=True)
+ def _read_log_connect(self, host, ip, port):
logger_retries = 0
while True:
try:
@@ -141,6 +141,7 @@ class CallbackModule(default.CallbackModule):
# logs continously. Without this we can easily trip the 5
# second timeout.
s.settimeout(None)
+ return s
except socket.timeout:
self._log(
"Timeout exception waiting for the logger. "
@@ -151,7 +152,7 @@ class CallbackModule(default.CallbackModule):
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
% (ip, port))
- return
+ return None
except Exception:
if logger_retries % 10 == 0:
self._log("[%s] Waiting on logger" % host,
@@ -159,31 +160,77 @@ class CallbackModule(default.CallbackModule):
logger_retries += 1
time.sleep(0.1)
continue
- msg = "%s\n" % log_id
- s.send(msg.encode("utf-8"))
- buff = s.recv(4096)
- buffering = True
- while buffering:
- if b'\n' in buff:
- (line, buff) = buff.split(b'\n', 1)
- # We can potentially get binary data here. In order to
- # being able to handle that use the backslashreplace
- # error handling method. This decodes unknown utf-8
- # code points to escape sequences which exactly represent
- # the correct data without throwing a decoding exception.
- done = self._log_streamline(
- host, line.decode("utf-8", "backslashreplace"))
- if done:
- return
+
+ def _read_log(self, host, ip, port, log_id, task_name, hosts):
+ self._log("[%s] Starting to log %s for task %s"
+ % (host, log_id, task_name), job=False, executor=True)
+
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ # Can't connect; _read_log_connect() already logged an
+ # error for us, just bail
+ return
+
+ # Find out what version we are running against
+ s.send(f'v:{LOG_STREAM_VERSION}\n'.encode('utf-8'))
+ buff = s.recv(1024).decode('utf-8').strip()
+
+ # NOTE(ianw) 2022-07-21 : zuul_console from < 6.3.0 do not
+ # understand this protocol. They will assume the send
+ # above is a log request and send back the not found
+ # message in a loop. So to handle this we disconnect and
+ # reconnect. When we decide to remove this, we can remove
+ # anything in the "version 0" path.
+ if buff == '[Zuul] Log not found':
+ s.close()
+ s = self._read_log_connect(host, ip, port)
+ if s is None:
+ return
+ else:
+ self._zuul_console_version = int(buff)
+ self._log('[%s] Reports streaming version: %d' %
+ (host, self._zuul_console_version),
+ job=False, executor=True)
+
+ if self._zuul_console_version >= 1:
+ msg = 's:%s\n' % log_id
+ else:
+ msg = '%s\n' % log_id
+
+ s.send(msg.encode("utf-8"))
+ buff = s.recv(4096)
+ buffering = True
+ while buffering:
+ if b'\n' in buff:
+ (line, buff) = buff.split(b'\n', 1)
+ # We can potentially get binary data here. In order to
+ # being able to handle that use the backslashreplace
+ # error handling method. This decodes unknown utf-8
+ # code points to escape sequences which exactly represent
+ # the correct data without throwing a decoding exception.
+ done = self._log_streamline(
+ host, line.decode("utf-8", "backslashreplace"))
+ if done:
+ if self._zuul_console_version > 0:
+ try:
+ # reestablish connection and tell console to
+ # clean up
+ s = self._read_log_connect(host, ip, port)
+ s.send(f'f:{log_id}\n'.encode('utf-8'))
+ s.close()
+ except Exception:
+ # Don't worry if this fails
+ pass
+ return
+ else:
+ more = s.recv(4096)
+ if not more:
+ buffering = False
else:
- more = s.recv(4096)
- if not more:
- buffering = False
- else:
- buff += more
- if buff:
- self._log_streamline(
- host, buff.decode("utf-8", "backslashreplace"))
+ buff += more
+ if buff:
+ self._log_streamline(
+ host, buff.decode("utf-8", "backslashreplace"))
def _log_streamline(self, host, line):
if "[Zuul] Task exit code" in line:
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py
index 9dffbbc3a..aa999cac1 100755
--- a/zuul/ansible/base/library/zuul_console.py
+++ b/zuul/ansible/base/library/zuul_console.py
@@ -24,6 +24,14 @@ import subprocess
import threading
import time
+# This is the version we report to the zuul_stream callback. It is
+# expected that this (zuul_console) process can be long-lived, so if
+# there are updates this ensures a later streaming callback can still
+# talk to us.
+ZUUL_CONSOLE_PROTO_VERSION = 1
+# This is the template for the file name of the log-file written out
+# by the command.py override command in the executor's Ansible
+# install.
LOG_STREAM_FILE = '/tmp/console-{log_uuid}.log'
LOG_STREAM_PORT = 19885
@@ -162,15 +170,49 @@ class Server(object):
ret = buffer.decode('utf-8')
x = ret.find('\n')
if x > 0:
- return ret[:x]
+ return ret[:x].strip()
except UnicodeDecodeError:
pass
- def handleOneConnection(self, conn):
- log_uuid = self.get_command(conn)
+ def _clean_uuid(self, log_uuid):
# use path split to make use the input isn't trying to be clever
# and construct some path like /tmp/console-/../../something
- log_uuid = os.path.split(log_uuid.rstrip())[-1]
+ return os.path.split(log_uuid)[-1]
+
+ def handleOneConnection(self, conn):
+ # V1 protocol
+ # -----------
+ # v:<ver> get version number, <ver> is remote version
+ # s:<uuid> send logs for <uuid>
+ # f:<uuid> finalise/cleanup <uuid>
+ while True:
+ command = self.get_command(conn)
+ if command.startswith('v:'):
+ # NOTE(ianw) : remote sends its version. We currently
+ # don't have anything to do with this value, so ignore
+ # for now.
+ cmd = '%s\n' % (ZUUL_CONSOLE_PROTO_VERSION)
+ conn.send(cmd.encode('utf-8'))
+ continue
+ elif command.startswith('f:'):
+ log_uuid = self._clean_uuid(command[2:])
+ try:
+ os.unlink(self.path.format(log_uuid=log_uuid))
+ except Exception:
+ # something might have cleaned /tmp
+ pass
+ continue
+ elif command.startswith('s:'):
+ log_uuid = self._clean_uuid(command[2:])
+ break
+ else:
+ # NOTE(ianw): 2022-07-21 In releases < 6.3.0 the streaming
+ # side would just send a raw uuid and nothing else; so by
+ # default assume that is what is coming in here. We can
+ # remove this fallback when we decide it is no longer
+ # necessary.
+ log_uuid = self._clean_uuid(command)
+ break
# FIXME: this won't notice disconnects until it tries to send
console = None