summaryrefslogtreecommitdiff
path: root/zuul/lib/fingergw.py
diff options
context:
space:
mode:
authorTobias Henkel <tobias.henkel@bmw.de>2019-06-12 16:33:28 +0200
committerSimon Westphahl <simon.westphahl@bmw.de>2021-06-10 14:09:37 +0200
commit5c4e8d7ddddafc1f4964a0498dfcaa7ac28da09d (patch)
treefe4039affc5d15f9a797e76e5c914e84280678b1 /zuul/lib/fingergw.py
parent46d0ed8e8f4f6cb33decdd435b965a5088474fec (diff)
downloadzuul-5c4e8d7ddddafc1f4964a0498dfcaa7ac28da09d.tar.gz
Route streams to different zones via finger gateway
In some distributed deployments we need to route traffic via single entry points that need to dispatch the traffic. For this use case make all components aware of their zone so it is possible to compute if traffic needs to go via an intermediate finger gateway or not. Therefore we register the gearman function 'fingergw:info:<zone>' if the fingergw is zoned. That way the scheduler will be able to route streams from different zones via finger gateways that are responsible for their zone. Change-Id: I655427283205ea02de6f0f271b4aa5092ac05278
Diffstat (limited to 'zuul/lib/fingergw.py')
-rw-r--r--zuul/lib/fingergw.py115
1 files changed, 108 insertions, 7 deletions
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py
index 022c6c6f3..334c27ffb 100644
--- a/zuul/lib/fingergw.py
+++ b/zuul/lib/fingergw.py
@@ -13,20 +13,26 @@
# under the License.
import functools
+import json
import logging
import socket
import threading
+import time
from configparser import ConfigParser
from typing import Optional
+import gear
+
import zuul.rpcclient
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.lib.config import get_default
+from zuul.lib.gear_utils import getGearmanFunctions
+from zuul.lib.gearworker import ZuulGearWorker
+from zuul.rpcclient import RPCFailure
from zuul.zk import ZooKeeperClient
from zuul.zk.components import FingerGatewayComponent
-
COMMANDS = ['stop']
@@ -38,7 +44,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
log = logging.getLogger("zuul.fingergw")
def __init__(self, *args, **kwargs):
- self.rpc = kwargs.pop('rpc')
+ self.fingergw = kwargs.pop('fingergw')
+
super(RequestHandler, self).__init__(*args, **kwargs)
def _fingerClient(self, server, port, build_uuid):
@@ -72,7 +79,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
port = None
try:
build_uuid = self.getCommand()
- port_location = self.rpc.get_job_log_stream_address(build_uuid)
+ port_location = self.fingergw.rpc.get_job_log_stream_address(
+ build_uuid, source_zone=self.fingergw.zone)
if not port_location:
msg = 'Invalid build UUID %s' % build_uuid
@@ -104,6 +112,9 @@ class FingerGateway(object):
'''
log = logging.getLogger("zuul.fingergw")
+ handler_class = RequestHandler
+
+ gearworker: Optional[ZuulGearWorker]
def __init__(
self,
@@ -137,10 +148,12 @@ class FingerGateway(object):
self.gear_ssl_ca = gear_ssl_ca
host = get_default(config, 'fingergw', 'listen_address', '::')
- port = int(get_default(config, 'fingergw', 'port', 79))
+ self.port = int(get_default(config, 'fingergw', 'port', 79))
+ self.public_port = int(get_default(
+ config, 'fingergw', 'public_port', self.port))
user = get_default(config, 'fingergw', 'user', None)
- self.address = (host, port)
+ self.address = (host, self.port)
self.user = user
self.pid_file = pid_file
@@ -157,14 +170,40 @@ class FingerGateway(object):
stop=self.stop,
)
+ self.hostname = get_default(config, 'fingergw', 'hostname',
+ socket.getfqdn())
+ self.zone = get_default(config, 'fingergw', 'zone')
+
+ if self.zone is not None:
+ jobs = {
+ 'fingergw:info:%s' % self.zone: self.handle_info,
+ }
+ self.gearworker = ZuulGearWorker(
+ 'Finger Gateway',
+ 'zuul.fingergw',
+ 'fingergw-gearman-worker',
+ config,
+ jobs)
+ else:
+ self.gearworker = None
+
self.zk_client = ZooKeeperClient.fromConfig(config)
self.zk_client.connect()
- self.hostname = socket.getfqdn()
self.component_info = FingerGatewayComponent(
self.zk_client, self.hostname
)
self.component_info.register()
+ def handle_info(self, job):
+ self.log.debug('Got %s job: %s', job.name, job.unique)
+ info = {
+ 'server': self.hostname,
+ 'port': self.public_port,
+ }
+ if self.zone:
+ info['zone'] = self.zone
+ job.sendWorkComplete(json.dumps(info))
+
def _runCommand(self):
while self.command_running:
try:
@@ -194,10 +233,14 @@ class FingerGateway(object):
self.server = streamer_utils.CustomThreadingTCPServer(
self.address,
- functools.partial(RequestHandler, rpc=self.rpc),
+ functools.partial(self.handler_class, fingergw=self),
user=self.user,
pid_file=self.pid_file)
+ # Update port that we really use if we configured a port of 0
+ if self.public_port == 0:
+ self.public_port = self.server.socket.getsockname()[1]
+
# Start the command processor after the server and privilege drop
if self.command_socket_path:
self.log.debug("Starting command processor")
@@ -215,10 +258,20 @@ class FingerGateway(object):
self.server_thread.daemon = True
self.server_thread.start()
self.component_info.state = self.component_info.RUNNING
+
+ # Register this finger gateway in case we are zoned
+ if self.gearworker:
+ self.log.info('Starting gearworker')
+ self.gearworker.start()
+
self.log.info("Finger gateway is started")
def stop(self):
self.component_info.state = self.component_info.STOPPED
+
+ if self.gearworker:
+ self.gearworker.stop()
+
if self.server:
try:
self.server.shutdown()
@@ -250,7 +303,55 @@ class FingerGateway(object):
'''
Wait on the gateway to shutdown.
'''
+ self.gearworker.join()
self.server_thread.join()
if self.command_thread:
self.command_thread.join()
+
+
+class FingerClient:
+ log = logging.getLogger("zuul.FingerClient")
+
+ def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
+ self.log.debug("Connecting to gearman at %s:%s" % (server, port))
+ self.gearman = gear.Client()
+ self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
+ keepalive=True, tcp_keepidle=60,
+ tcp_keepintvl=30, tcp_keepcnt=5)
+ self.log.debug("Waiting for gearman")
+ self.gearman.waitForServer()
+
+ def submitJob(self, name, data):
+ self.log.debug("Submitting job %s with data %s" % (name, data))
+ job = gear.TextJob(name,
+ json.dumps(data),
+ unique=str(time.time()))
+ self.gearman.submitJob(job, timeout=300)
+
+ self.log.debug("Waiting for job completion")
+ while not job.complete:
+ time.sleep(0.1)
+ if job.exception:
+ raise RPCFailure(job.exception)
+ self.log.debug("Job complete, success: %s" % (not job.failure))
+ return job
+
+ def shutdown(self):
+ self.gearman.shutdown()
+
+ def get_fingergw_in_zone(self, zone):
+ job_name = 'fingergw:info:%s' % zone
+ functions = getGearmanFunctions(self.gearman)
+ if job_name not in functions:
+ # There is no matching fingergw
+ self.log.warning('No fingergw found in zone %s', zone)
+ return None
+
+ job = self.submitJob(job_name, {})
+ if job.failure:
+ self.log.warning('Failed to get fingergw info from zone %s: '
+ '%s', zone, job)
+ return None
+ else:
+ return json.loads(job.data[0])