diff options
author | Tobias Henkel <tobias.henkel@bmw.de> | 2019-06-12 16:33:28 +0200 |
---|---|---|
committer | Simon Westphahl <simon.westphahl@bmw.de> | 2021-06-10 14:09:37 +0200 |
commit | 5c4e8d7ddddafc1f4964a0498dfcaa7ac28da09d (patch) | |
tree | fe4039affc5d15f9a797e76e5c914e84280678b1 /zuul/lib/fingergw.py | |
parent | 46d0ed8e8f4f6cb33decdd435b965a5088474fec (diff) | |
download | zuul-5c4e8d7ddddafc1f4964a0498dfcaa7ac28da09d.tar.gz |
Route streams to different zones via finger gateway
In some distributed deployments we need to route traffic via single
entry points that need to dispatch the traffic. For this use case make
all components aware of their zone so it is possible to compute if
traffic needs to go via an intermediate finger gateway or not.
Therefore we register the gearman function 'fingergw:info:<zone>' if
the fingergw is zoned. That way the scheduler will be able to route
streams from different zones via finger gateways that are responsible
for their zone.
Change-Id: I655427283205ea02de6f0f271b4aa5092ac05278
Diffstat (limited to 'zuul/lib/fingergw.py')
-rw-r--r-- | zuul/lib/fingergw.py | 115 |
1 files changed, 108 insertions, 7 deletions
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index 022c6c6f3..334c27ffb 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -13,20 +13,26 @@ # under the License. import functools +import json import logging import socket import threading +import time from configparser import ConfigParser from typing import Optional +import gear + import zuul.rpcclient from zuul.lib import streamer_utils from zuul.lib.commandsocket import CommandSocket from zuul.lib.config import get_default +from zuul.lib.gear_utils import getGearmanFunctions +from zuul.lib.gearworker import ZuulGearWorker +from zuul.rpcclient import RPCFailure from zuul.zk import ZooKeeperClient from zuul.zk.components import FingerGatewayComponent - COMMANDS = ['stop'] @@ -38,7 +44,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): log = logging.getLogger("zuul.fingergw") def __init__(self, *args, **kwargs): - self.rpc = kwargs.pop('rpc') + self.fingergw = kwargs.pop('fingergw') + super(RequestHandler, self).__init__(*args, **kwargs) def _fingerClient(self, server, port, build_uuid): @@ -72,7 +79,8 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): port = None try: build_uuid = self.getCommand() - port_location = self.rpc.get_job_log_stream_address(build_uuid) + port_location = self.fingergw.rpc.get_job_log_stream_address( + build_uuid, source_zone=self.fingergw.zone) if not port_location: msg = 'Invalid build UUID %s' % build_uuid @@ -104,6 +112,9 @@ class FingerGateway(object): ''' log = logging.getLogger("zuul.fingergw") + handler_class = RequestHandler + + gearworker: Optional[ZuulGearWorker] def __init__( self, @@ -137,10 +148,12 @@ class FingerGateway(object): self.gear_ssl_ca = gear_ssl_ca host = get_default(config, 'fingergw', 'listen_address', '::') - port = int(get_default(config, 'fingergw', 'port', 79)) + self.port = int(get_default(config, 'fingergw', 'port', 79)) + self.public_port = int(get_default( + config, 'fingergw', 'public_port', self.port)) user = get_default(config, 'fingergw', 'user', None) - self.address = (host, port) + self.address = (host, self.port) self.user = user self.pid_file = pid_file @@ -157,14 +170,40 @@ class FingerGateway(object): stop=self.stop, ) + self.hostname = get_default(config, 'fingergw', 'hostname', + socket.getfqdn()) + self.zone = get_default(config, 'fingergw', 'zone') + + if self.zone is not None: + jobs = { + 'fingergw:info:%s' % self.zone: self.handle_info, + } + self.gearworker = ZuulGearWorker( + 'Finger Gateway', + 'zuul.fingergw', + 'fingergw-gearman-worker', + config, + jobs) + else: + self.gearworker = None + self.zk_client = ZooKeeperClient.fromConfig(config) self.zk_client.connect() - self.hostname = socket.getfqdn() self.component_info = FingerGatewayComponent( self.zk_client, self.hostname ) self.component_info.register() + def handle_info(self, job): + self.log.debug('Got %s job: %s', job.name, job.unique) + info = { + 'server': self.hostname, + 'port': self.public_port, + } + if self.zone: + info['zone'] = self.zone + job.sendWorkComplete(json.dumps(info)) + def _runCommand(self): while self.command_running: try: @@ -194,10 +233,14 @@ class FingerGateway(object): self.server = streamer_utils.CustomThreadingTCPServer( self.address, - functools.partial(RequestHandler, rpc=self.rpc), + functools.partial(self.handler_class, fingergw=self), user=self.user, pid_file=self.pid_file) + # Update port that we really use if we configured a port of 0 + if self.public_port == 0: + self.public_port = self.server.socket.getsockname()[1] + # Start the command processor after the server and privilege drop if self.command_socket_path: self.log.debug("Starting command processor") @@ -215,10 +258,20 @@ class FingerGateway(object): self.server_thread.daemon = True self.server_thread.start() self.component_info.state = self.component_info.RUNNING + + # Register this finger gateway in case we are zoned + if self.gearworker: + self.log.info('Starting gearworker') + self.gearworker.start() + self.log.info("Finger gateway is started") def stop(self): self.component_info.state = self.component_info.STOPPED + + if self.gearworker: + self.gearworker.stop() + if self.server: try: self.server.shutdown() @@ -250,7 +303,55 @@ class FingerGateway(object): ''' Wait on the gateway to shutdown. ''' + self.gearworker.join() self.server_thread.join() if self.command_thread: self.command_thread.join() + + +class FingerClient: + log = logging.getLogger("zuul.FingerClient") + + def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None): + self.log.debug("Connecting to gearman at %s:%s" % (server, port)) + self.gearman = gear.Client() + self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) + self.log.debug("Waiting for gearman") + self.gearman.waitForServer() + + def submitJob(self, name, data): + self.log.debug("Submitting job %s with data %s" % (name, data)) + job = gear.TextJob(name, + json.dumps(data), + unique=str(time.time())) + self.gearman.submitJob(job, timeout=300) + + self.log.debug("Waiting for job completion") + while not job.complete: + time.sleep(0.1) + if job.exception: + raise RPCFailure(job.exception) + self.log.debug("Job complete, success: %s" % (not job.failure)) + return job + + def shutdown(self): + self.gearman.shutdown() + + def get_fingergw_in_zone(self, zone): + job_name = 'fingergw:info:%s' % zone + functions = getGearmanFunctions(self.gearman) + if job_name not in functions: + # There is no matching fingergw + self.log.warning('No fingergw found in zone %s', zone) + return None + + job = self.submitJob(job_name, {}) + if job.failure: + self.log.warning('Failed to get fingergw info from zone %s: ' + '%s', zone, job) + return None + else: + return json.loads(job.data[0]) |