 Dockerfile                                                    |   2 +-
 releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml |  11 +++++
 tests/fixtures/fake_kubectl.sh                                |   7 +++
 tests/unit/test_v3.py                                         |   4 ++
 zuul/ansible/base/callback/zuul_stream.py                     |  22 +++---
 zuul/executor/server.py                                       | 100 +++++++++-
 6 files changed, 133 insertions(+), 13 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index e1f305184..a1d463634 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -44,7 +44,7 @@ FROM opendevorg/python-base as zuul
COPY --from=builder /output/ /output
RUN echo "deb http://ftp.debian.org/debian stretch-backports main" >> /etc/apt/sources.list \
&& apt-get update \
- && apt-get install -t stretch-backports -y bubblewrap \
+ && apt-get install -t stretch-backports -y bubblewrap socat \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN /output/install-from-bindep \
diff --git a/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml b/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml
new file mode 100644
index 000000000..f59ac3c9d
--- /dev/null
+++ b/releasenotes/notes/kubectl-port-forward-93de41c7c0b107dc.yaml
@@ -0,0 +1,11 @@
+---
+upgrade:
+ - |
+ Kubectl and socat must now be installed on Zuul executors if using
+ Kubernetes or OpenShift `pod` resources from Nodepool. Additionally,
+ Nodepool version 3.12.0 or later is required.
+fixes:
+ - |
+ Previously, no output from shell or command tasks on pods was placed
+ in the job output; that has been corrected and streaming output is
+ now available.
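
The upgrade note above implies a hard runtime dependency: if either binary is missing from the executor, streaming from pods is skipped (see the zuul_stream.py and server.py hunks below). As a rough illustration only — this helper is not part of the change — an operator could sanity-check an executor host like so:

    import shutil

    def check_pod_streaming_prereqs():
        # Hypothetical pre-flight check: Zuul shells out to `kubectl
        # port-forward` and, per the release note above, also needs
        # socat on the executor for streaming output from pods.
        missing = [c for c in ('kubectl', 'socat')
                   if shutil.which(c) is None]
        if missing:
            raise RuntimeError('missing on executor: %s'
                               % ', '.join(missing))

    check_pod_streaming_prereqs()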
diff --git a/tests/fixtures/fake_kubectl.sh b/tests/fixtures/fake_kubectl.sh
new file mode 100755
index 000000000..7395a582d
--- /dev/null
+++ b/tests/fixtures/fake_kubectl.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+echo "Forwarding from 127.0.0.1:1234 -> 19885"
+
+while true; do
+ sleep 5
+done
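
The fixture's first line deliberately mimics real `kubectl port-forward` output, because the executor discovers the forwarded local port by parsing that line (see KubeFwd.start() in zuul/executor/server.py below); the endless sleep loop keeps the fake forwarder alive for the duration of the test. A stand-alone check of that contract, for illustration:

    import re

    # The fixture's output must satisfy the pattern KubeFwd.start()
    # applies to the first line of kubectl output; it prints port 1234.
    line = 'Forwarding from 127.0.0.1:1234 -> 19885\n'
    m = re.match(r'^Forwarding from 127\.0\.0\.1:(\d+) -> 19885', line)
    assert m is not None and m.group(1) == '1234'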
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index b5e62e239..8cffbc815 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -5507,6 +5507,10 @@ class TestContainerJobs(AnsibleZuulTestCase):
tenant_config_file = "config/container-build-resources/main.yaml"
def test_container_jobs(self):
+ self.patch(zuul.executor.server.KubeFwd,
+ 'kubectl_command',
+ os.path.join(FIXTURE_DIR, 'fake_kubectl.sh'))
+
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index 5b2206e14..ca34dc2ab 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -129,12 +129,12 @@ class CallbackModule(default.CallbackModule):
else:
self._display.display(msg)
- def _read_log(self, host, ip, log_id, task_name, hosts):
+ def _read_log(self, host, ip, port, log_id, task_name, hosts):
self._log("[%s] Starting to log %s for task %s"
% (host, log_id, task_name), job=False, executor=True)
while True:
try:
- s = socket.create_connection((ip, LOG_STREAM_PORT), 5)
+ s = socket.create_connection((ip, port), 5)
# Disable the socket timeout after we have successfully
# connected to accommodate the fact that jobs may not be writing
# logs continuously. Without this we can easily trip the 5
@@ -144,12 +144,12 @@ class CallbackModule(default.CallbackModule):
self._log(
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
- % (ip, LOG_STREAM_PORT), executor=True)
+ % (ip, port), executor=True)
self._log_streamline(
"localhost",
"Timeout exception waiting for the logger. "
"Please check connectivity to [%s:%s]"
- % (ip, LOG_STREAM_PORT))
+ % (ip, port))
return
except Exception:
self._log("[%s] Waiting on logger" % host,
@@ -254,6 +254,7 @@ class CallbackModule(default.CallbackModule):
hosts = self._get_task_hosts(task)
for host, inventory_hostname in hosts:
+ port = LOG_STREAM_PORT
if host in ('localhost', '127.0.0.1'):
# Don't try to stream from localhost
continue
@@ -267,14 +268,21 @@ class CallbackModule(default.CallbackModule):
# Don't try to stream from loops
continue
if play_vars[host].get('ansible_connection') in ('kubectl', ):
- # Don't try to stream from kubectl connection
- continue
+ # Stream from the forwarded port on kubectl conns
+ port = play_vars[host]['zuul']['resources'][
+ inventory_hostname].get('stream_port')
+ if port is None:
+ self._log("[Zuul] Kubectl and socat must be installed "
+ "on the Zuul executor for streaming output "
+ "from pods")
+ continue
+ ip = '127.0.0.1'
log_id = "%s-%s" % (
task._uuid, paths._sanitize_filename(inventory_hostname))
streamer = threading.Thread(
target=self._read_log, args=(
- host, ip, log_id, task_name, hosts))
+ host, ip, port, log_id, task_name, hosts))
streamer.daemon = True
streamer.start()
self._streamers.append(streamer)
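
Taken together, the callback changes reduce to a per-host choice of stream endpoint. A hypothetical distillation — the helper name and signature are illustrative, not Zuul API:

    LOG_STREAM_PORT = 19885  # Zuul's default console stream port

    def stream_endpoint(connection, resources, inventory_hostname, ip):
        # kubectl hosts are reached through the executor-local port
        # forward; all other hosts directly on the default port.
        if connection == 'kubectl':
            port = resources[inventory_hostname].get('stream_port')
            if port is None:
                return None  # kubectl/socat missing; no streaming
            return ('127.0.0.1', port)
        return (ip, LOG_STREAM_PORT)

    # usage: socket.create_connection(endpoint, 5), as in _read_log()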
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index e8f5717f6..d0ce36a31 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -29,6 +29,7 @@ import threading
import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
+import re
import git
from urllib.parse import urlsplit
@@ -311,6 +312,70 @@ class SshAgent(object):
return result
+class KubeFwd(object):
+ kubectl_command = 'kubectl'
+
+ def __init__(self, zuul_event_id, build, kubeconfig, context,
+ namespace, pod):
+ self.port = None
+ self.fwd = None
+ self.log = get_annotated_logger(
+ logging.getLogger("zuul.ExecutorServer"),
+ zuul_event_id, build=build)
+ self.kubeconfig = kubeconfig
+ self.context = context
+ self.namespace = namespace
+ self.pod = pod
+
+ def start(self):
+ if self.fwd:
+ return
+ with open('/dev/null', 'r+') as devnull:
+ fwd = subprocess.Popen(
+ [self.kubectl_command, '--kubeconfig=%s' % self.kubeconfig,
+ '--context=%s' % self.context,
+ '-n', self.namespace,
+ 'port-forward',
+ '--address', '127.0.0.1',
+ 'pod/%s' % self.pod, ':19885'],
+ close_fds=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ stdin=devnull)
+ line = fwd.stdout.readline().decode('utf8')
+ m = re.match(r'^Forwarding from 127\.0\.0\.1:(\d+) -> 19885', line)
+ if m:
+ self.port = m.group(1)
+ else:
+ try:
+ fwd.kill()
+ except Exception:
+ pass
+ raise Exception("Unable to start kubectl port forward")
+ self.fwd = fwd
+ self.log.info('Started Kubectl port forward on port {}'.format(
+ self.port))
+
+ def stop(self):
+ try:
+ if self.fwd:
+ self.fwd.kill()
+ self.fwd.wait()
+ self.fwd = None
+ except Exception:
+ self.log.exception('Unable to stop kubectl port-forward:')
+
+ def __del__(self):
+ try:
+ self.stop()
+ except Exception:
+ self.log.exception('Exception in KubeFwd destructor')
+ try:
+ super().__del__()
+ except AttributeError:
+ pass
+
+
class JobDirPlaybook(object):
def __init__(self, root):
self.root = root
@@ -369,6 +434,8 @@ class JobDir(object):
# work (mounted in bwrap read-write)
# .ssh
# known_hosts
+ # .kube
+ # config
# src
# <git.example.com>
# <project>
@@ -403,6 +470,9 @@ class JobDir(object):
os.makedirs(self.untrusted_root)
ssh_dir = os.path.join(self.work_root, '.ssh')
os.mkdir(ssh_dir, 0o700)
+ kube_dir = os.path.join(self.work_root, ".kube")
+ os.makedirs(kube_dir)
+ self.kubeconfig = os.path.join(kube_dir, "config")
# Create ansible cache directory
self.ansible_cache_root = os.path.join(self.root, '.ansible')
self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
@@ -758,7 +828,7 @@ class AnsibleJob(object):
'winrm_read_timeout_sec')
self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
build=self.job.unique)
-
+ self.port_forwards = []
self.executor_variables_file = None
self.cpu_times = {'user': 0, 'system': 0,
@@ -873,6 +943,11 @@ class AnsibleJob(object):
self.ssh_agent.stop()
except Exception:
self.log.exception("Error stopping SSH agent:")
+ for fwd in self.port_forwards:
+ try:
+ fwd.stop()
+ except Exception:
+ self.log.exception("Error stopping port forward:")
try:
self.executor_server.finishJob(self.job.unique)
except Exception:
@@ -1780,12 +1855,11 @@ class AnsibleJob(object):
self.log.debug("Adding role path %s", role_path)
jobdir_playbook.roles_path.append(role_path)
- def prepareKubeConfig(self, data):
- kube_cfg_path = os.path.join(self.jobdir.work_root, ".kube", "config")
+ def prepareKubeConfig(self, jobdir, data):
+ kube_cfg_path = jobdir.kubeconfig
if os.path.exists(kube_cfg_path):
kube_cfg = yaml.safe_load(open(kube_cfg_path))
else:
- os.makedirs(os.path.dirname(kube_cfg_path), exist_ok=True)
kube_cfg = {
'apiVersion': 'v1',
'kind': 'Config',
@@ -1854,7 +1928,7 @@ class AnsibleJob(object):
# TODO: decrypt resource data using scheduler key
data = node['connection_port']
# Setup kube/config file
- self.prepareKubeConfig(data)
+ self.prepareKubeConfig(self.jobdir, data)
# Convert connection_port in kubectl connection parameters
node['connection_port'] = None
node['kubectl_namespace'] = data['namespace']
@@ -1871,6 +1945,22 @@ class AnsibleJob(object):
# Add the real pod name to the resources_var
all_vars['zuul']['resources'][
node['name'][0]]['pod'] = data['pod']
+ fwd = KubeFwd(zuul_event_id=self.zuul_event_id,
+ build=self.job.unique,
+ kubeconfig=self.jobdir.kubeconfig,
+ context=data['context_name'],
+ namespace=data['namespace'],
+ pod=data['pod'])
+ try:
+ fwd.start()
+ self.port_forwards.append(fwd)
+ all_vars['zuul']['resources'][
+ node['name'][0]]['stream_port'] = fwd.port
+ except Exception:
+ self.log.exception("Unable to start port forward:")
+ self.log.error("Kubectl and socat are required for "
+ "streaming logs")
+
# Remove resource node from nodes list
for node in resources_nodes:
args['nodes'].remove(node)
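
For reference, the mechanism KubeFwd wraps can be exercised stand-alone with nothing but the stdlib: spawn kubectl with ':19885' so it picks a free local port, recover that port from the first line of output, and kill the child when finished. A minimal sketch under those assumptions (kubectl on PATH, reachable cluster; names are illustrative):

    import re
    import subprocess

    def start_port_forward(kubeconfig, context, namespace, pod,
                           remote_port=19885):
        # Mirrors KubeFwd.start() above: ':<remote_port>' lets kubectl
        # choose the local port, parsed from its first line of output.
        proc = subprocess.Popen(
            ['kubectl', '--kubeconfig=%s' % kubeconfig,
             '--context=%s' % context, '-n', namespace,
             'port-forward', '--address', '127.0.0.1',
             'pod/%s' % pod, ':%d' % remote_port],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL)
        line = proc.stdout.readline().decode('utf8')
        m = re.match(r'^Forwarding from 127\.0\.0\.1:(\d+) -> %d'
                     % remote_port, line)
        if not m:
            proc.kill()
            raise RuntimeError('port-forward failed: %s' % line.strip())
        return proc, int(m.group(1))  # caller kill()s proc when done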