diff options
author | James E. Blair <jim@acmegating.com> | 2021-11-29 10:29:01 -0800 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2022-01-25 06:44:09 -0800 |
commit | 3aa546da867225372d6818f5e11236eab989b344 (patch) | |
tree | 32526e9fed8b5e4dc4abbca61e83291144f361ae /zuul | |
parent | 5bc059ea76bce405b3c01693f4067f6b51c6289c (diff) | |
download | zuul-3aa546da867225372d6818f5e11236eab989b344.tar.gz |
Remove the rpc client and listener
These are not used any more, remove them from the scheduler and
the "zuul" client.
Change-Id: I5a3217dde32c5f8fefbb0a7a8357a737494d2956
Diffstat (limited to 'zuul')
-rwxr-xr-x | zuul/cmd/client.py | 18 | ||||
-rw-r--r-- | zuul/rpcclient.py | 148 | ||||
-rw-r--r-- | zuul/rpclistener.py | 238 | ||||
-rw-r--r-- | zuul/scheduler.py | 10 |
4 files changed, 3 insertions, 411 deletions
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index b3b5ff9c4..264973e47 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -28,7 +28,6 @@ import textwrap import requests import urllib.parse -import zuul.rpcclient import zuul.cmd from zuul.lib.config import get_default from zuul.model import SystemAttributes @@ -506,18 +505,7 @@ class Client(zuul.cmd.ZuulApp): self.args.auth_token) return client conf_sections = self.config.sections() - if 'gearman' in conf_sections: - self.log.debug('gearman section found in config, using RPC client') - server = self.config.get('gearman', 'server') - port = get_default(self.config, 'gearman', 'port', 4730) - ssl_key = get_default(self.config, 'gearman', 'ssl_key') - ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') - ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') - client = zuul.rpcclient.RPCClient( - server, port, ssl_key, - ssl_cert, ssl_ca, - client_id=self.app_description) - elif 'webclient' in conf_sections: + if 'webclient' in conf_sections: self.log.debug('web section found in config, using REST client') server = get_default(self.config, 'webclient', 'url', None) verify = get_default(self.config, 'webclient', 'verify_ssl', @@ -525,8 +513,8 @@ class Client(zuul.cmd.ZuulApp): client = ZuulRESTClient(server, verify, self.args.auth_token) else: - print('Unable to find a way to connect to Zuul, add a "gearman" ' - 'or "web" section to your configuration file') + print('Unable to find a way to connect to Zuul, add a ' + '"web" section to your configuration file') sys.exit(1) if server is None: print('Missing "server" configuration value') diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py deleted file mode 100644 index 0f80376b1..000000000 --- a/zuul/rpcclient.py +++ /dev/null @@ -1,148 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import logging -import time - -import gear - - -class RPCFailure(Exception): - pass - - -class RPCClient(object): - log = logging.getLogger("zuul.RPCClient") - - def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None, - client_id='Zuul RPC Client'): - self.log.debug("Connecting to gearman at %s:%s" % (server, port)) - self.gearman = gear.Client(client_id) - self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, - keepalive=True, tcp_keepidle=60, - tcp_keepintvl=30, tcp_keepcnt=5) - self.log.debug("Waiting for gearman") - self.gearman.waitForServer() - - def submitJob(self, name, data): - self.log.debug("Submitting job %s with data %s" % (name, data)) - job = gear.TextJob(name, - json.dumps(data), - unique=str(time.time())) - self.gearman.submitJob(job, timeout=300) - - self.log.debug("Waiting for job completion") - while not job.complete: - time.sleep(0.1) - if job.exception: - raise RPCFailure(job.exception) - self.log.debug("Job complete, success: %s" % (not job.failure)) - return job - - def autohold(self, tenant, project, job, change, ref, reason, count, - node_hold_expiration=None): - data = {'tenant': tenant, - 'project': project, - 'job': job, - 'change': change, - 'ref': ref, - 'reason': reason, - 'count': count, - 'node_hold_expiration': node_hold_expiration} - return not self.submitJob('zuul:autohold', data).failure - - def autohold_delete(self, request_id): - data = {'request_id': request_id} - return not self.submitJob('zuul:autohold_delete', data).failure - - def autohold_info(self, request_id): - data = {'request_id': request_id} - job = self.submitJob('zuul:autohold_info', data) - if job.failure: - return False - else: - return json.loads(job.data[0]) - - # todo allow filtering per tenant, like in the REST API - def autohold_list(self, *args, **kwargs): - data = {} - job = self.submitJob('zuul:autohold_list', data) - if job.failure: - return False - else: - return json.loads(job.data[0]) - - def enqueue(self, tenant, pipeline, project, trigger, change): - if trigger is not None: - self.log.info('enqueue: the "trigger" argument is deprecated') - data = {'tenant': tenant, - 'pipeline': pipeline, - 'project': project, - 'trigger': trigger, - 'change': change, - } - return not self.submitJob('zuul:enqueue', data).failure - - def enqueue_ref( - self, tenant, pipeline, project, trigger, ref, oldrev, newrev): - if trigger is not None: - self.log.info('enqueue_ref: the "trigger" argument is deprecated') - data = {'tenant': tenant, - 'pipeline': pipeline, - 'project': project, - 'trigger': trigger, - 'ref': ref, - 'oldrev': oldrev, - 'newrev': newrev, - } - return not self.submitJob('zuul:enqueue_ref', data).failure - - def dequeue(self, tenant, pipeline, project, change, ref): - data = {'tenant': tenant, - 'pipeline': pipeline, - 'project': project, - 'change': change, - 'ref': ref, - } - return not self.submitJob('zuul:dequeue', data).failure - - def promote(self, tenant, pipeline, change_ids): - data = {'tenant': tenant, - 'pipeline': pipeline, - 'change_ids': change_ids, - } - return not self.submitJob('zuul:promote', data).failure - - def get_running_jobs(self): - data = {} - job = self.submitJob('zuul:get_running_jobs', data) - if job.failure: - return False - else: - return json.loads(job.data[0]) - - def shutdown(self): - self.gearman.shutdown() - - def get_job_log_stream_address(self, uuid, logfile='console.log', - source_zone=None): - data = {'uuid': uuid, 'logfile': logfile} - if source_zone: - data['source_zone'] = source_zone - job = self.submitJob('zuul:get_job_log_stream_address', data) - if job.failure: - return False - else: - return json.loads(job.data[0]) diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py deleted file mode 100644 index e011a85a3..000000000 --- a/zuul/rpclistener.py +++ /dev/null @@ -1,238 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import logging -from abc import ABCMeta -from typing import List - -from zuul.lib.gearworker import ZuulGearWorker - - -class RPCListenerBase(metaclass=ABCMeta): - log = logging.getLogger("zuul.RPCListenerBase") - thread_name = 'zuul-rpc-gearman-worker' - functions = [] # type: List[str] - - def __init__(self, config, sched): - self.config = config - self.sched = sched - - self.jobs = {} - - for func in self.functions: - f = getattr(self, 'handle_%s' % func) - self.jobs['zuul:%s' % func] = f - self.gearworker = ZuulGearWorker( - 'Zuul RPC Listener', - self.log.name, - self.thread_name, - self.config, - self.jobs) - - def start(self): - self.gearworker.start() - - def stop(self): - self.log.debug("Stopping") - self.gearworker.stop() - self.log.debug("Stopped") - - def join(self): - self.gearworker.join() - - -class RPCListenerSlow(RPCListenerBase): - log = logging.getLogger("zuul.RPCListenerSlow") - thread_name = 'zuul-rpc-slow-gearman-worker' - functions = [ - 'dequeue', - 'enqueue', - 'enqueue_ref', - 'promote', - ] - - def handle_dequeue(self, job): - args = json.loads(job.arguments) - tenant_name = args['tenant'] - pipeline_name = args['pipeline'] - project_name = args['project'] - change = args['change'] - ref = args['ref'] - try: - self.sched.dequeue( - tenant_name, pipeline_name, project_name, change, ref) - except Exception as e: - job.sendWorkException(str(e).encode('utf8')) - return - job.sendWorkComplete() - - def _common_enqueue(self, job, args): - tenant_name = args['tenant'] - pipeline_name = args['pipeline'] - project_name = args['project'] - change = args.get('change') - ref = args.get('ref') - oldrev = args.get('oldrev') - newrev = args.get('newrev') - try: - self.sched.enqueue(tenant_name, pipeline_name, project_name, - change, ref, oldrev, newrev) - except Exception as e: - job.sendWorkException(str(e).encode('utf8')) - return - - job.sendWorkComplete() - - def handle_enqueue(self, job): - args = json.loads(job.arguments) - self._common_enqueue(job, args) - - def handle_enqueue_ref(self, job): - args = json.loads(job.arguments) - oldrev = args['oldrev'] - newrev = args['newrev'] - errors = '' - try: - int(oldrev, 16) - if len(oldrev) != 40: - errors += f'Old rev must be 40 character sha1: {oldrev}\n' - except Exception: - errors += f'Old rev must be base16 hash: {oldrev}\n' - try: - int(newrev, 16) - if len(newrev) != 40: - errors += f'New rev must be 40 character sha1: {newrev}\n' - except Exception: - errors += f'New rev must be base16 hash: {newrev}\n' - - if errors: - job.sendWorkException(errors.encode('utf8')) - else: - self._common_enqueue(job, args) - - def handle_promote(self, job): - args = json.loads(job.arguments) - tenant_name = args['tenant'] - pipeline_name = args['pipeline'] - change_ids = args['change_ids'] - self.sched.promote(tenant_name, pipeline_name, change_ids) - job.sendWorkComplete() - - -class RPCListener(RPCListenerBase): - log = logging.getLogger("zuul.RPCListener") - thread_name = 'zuul-rpc-gearman-worker' - functions = [ - 'autohold', - 'autohold_delete', - 'autohold_info', - 'autohold_list', - 'get_running_jobs', - ] - - def start(self): - self.gearworker.start() - - def stop(self): - self.log.debug("Stopping") - self.gearworker.stop() - self.log.debug("Stopped") - - def join(self): - self.gearworker.join() - - def handle_autohold_info(self, job): - args = json.loads(job.arguments) - request_id = args['request_id'] - try: - data = self.sched.autohold_info(request_id) - except Exception as e: - job.sendWorkException(str(e).encode('utf8')) - return - job.sendWorkComplete(json.dumps(data)) - - def handle_autohold_delete(self, job): - args = json.loads(job.arguments) - request_id = args['request_id'] - try: - self.sched.autohold_delete(request_id) - except Exception as e: - job.sendWorkException(str(e).encode('utf8')) - return - job.sendWorkComplete() - - def handle_autohold_list(self, job): - data = self.sched.autohold_list() - job.sendWorkComplete(json.dumps(data)) - - def handle_autohold(self, job): - args = json.loads(job.arguments) - params = {} - - tenant = self.sched.abide.tenants.get(args['tenant']) - if tenant: - params['tenant_name'] = args['tenant'] - else: - error = "Invalid tenant: %s" % args['tenant'] - job.sendWorkException(error.encode('utf8')) - return - - (trusted, project) = tenant.getProject(args['project']) - if project: - params['project_name'] = project.canonical_name - else: - error = "Invalid project: %s" % args['project'] - job.sendWorkException(error.encode('utf8')) - return - - if args['change'] and args['ref']: - job.sendWorkException("Change and ref can't be both used " - "for the same request") - - if args['change']: - # Convert change into ref based on zuul connection - ref_filter = project.source.getRefForChange(args['change']) - elif args['ref']: - ref_filter = "%s" % args['ref'] - else: - ref_filter = ".*" - - params['job_name'] = args['job'] - params['ref_filter'] = ref_filter - params['reason'] = args['reason'] - - if args['count'] < 0: - error = "Invalid count: %d" % args['count'] - job.sendWorkException(error.encode('utf8')) - return - - params['count'] = args['count'] - params['node_hold_expiration'] = args['node_hold_expiration'] - - self.sched.autohold(**params) - job.sendWorkComplete() - - def handle_get_running_jobs(self, job): - # args = json.loads(job.arguments) - # TODO: use args to filter by pipeline etc - running_items = [] - for tenant in self.sched.abide.tenants.values(): - for pipeline_name, pipeline in tenant.layout.pipelines.items(): - for queue in pipeline.queues: - for item in queue.queue: - running_items.append(item.formatJSON()) - - job.sendWorkComplete(json.dumps(running_items)) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 7d492a849..3e26625d1 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -30,7 +30,6 @@ from apscheduler.triggers.interval import IntervalTrigger from kazoo.exceptions import NotEmptyError from zuul import configloader, exceptions -from zuul import rpclistener from zuul.lib import commandsocket from zuul.lib.ansible import AnsibleManager from zuul.lib.config import get_default @@ -175,8 +174,6 @@ class Scheduler(threading.Thread): self.sql = self.connections.getSqlReporter(None) self.statsd = get_statsd(config) self.times = Times(self.sql, self.statsd) - self.rpc = rpclistener.RPCListener(config, self) - self.rpc_slow = rpclistener.RPCListenerSlow(config, self) self.repl = None self.stats_thread = threading.Thread(target=self.runStatsElection) self.stats_thread.daemon = True @@ -284,8 +281,6 @@ class Scheduler(threading.Thread): self.command_thread.daemon = True self.command_thread.start() - self.rpc.start() - self.rpc_slow.start() self.stats_thread.start() self.apsched.start() self.times.start() @@ -329,11 +324,6 @@ class Scheduler(threading.Thread): self.log.debug("Waiting for layout update thread") self.layout_update_event.set() self.layout_update_thread.join() - self.log.debug("Stopping RPC thread") - self.rpc.stop() - self.rpc.join() - self.rpc_slow.stop() - self.rpc_slow.join() self.stop_repl() self._command_running = False self.log.debug("Stopping command socket") |