summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2021-11-29 10:29:01 -0800
committerJames E. Blair <jim@acmegating.com>2022-01-25 06:44:09 -0800
commit3aa546da867225372d6818f5e11236eab989b344 (patch)
tree32526e9fed8b5e4dc4abbca61e83291144f361ae /zuul
parent5bc059ea76bce405b3c01693f4067f6b51c6289c (diff)
downloadzuul-3aa546da867225372d6818f5e11236eab989b344.tar.gz
Remove the rpc client and listener
These are not used any more, remove them from the scheduler and the "zuul" client. Change-Id: I5a3217dde32c5f8fefbb0a7a8357a737494d2956
Diffstat (limited to 'zuul')
-rwxr-xr-xzuul/cmd/client.py18
-rw-r--r--zuul/rpcclient.py148
-rw-r--r--zuul/rpclistener.py238
-rw-r--r--zuul/scheduler.py10
4 files changed, 3 insertions, 411 deletions
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index b3b5ff9c4..264973e47 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -28,7 +28,6 @@ import textwrap
import requests
import urllib.parse
-import zuul.rpcclient
import zuul.cmd
from zuul.lib.config import get_default
from zuul.model import SystemAttributes
@@ -506,18 +505,7 @@ class Client(zuul.cmd.ZuulApp):
self.args.auth_token)
return client
conf_sections = self.config.sections()
- if 'gearman' in conf_sections:
- self.log.debug('gearman section found in config, using RPC client')
- server = self.config.get('gearman', 'server')
- port = get_default(self.config, 'gearman', 'port', 4730)
- ssl_key = get_default(self.config, 'gearman', 'ssl_key')
- ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
- ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
- client = zuul.rpcclient.RPCClient(
- server, port, ssl_key,
- ssl_cert, ssl_ca,
- client_id=self.app_description)
- elif 'webclient' in conf_sections:
+ if 'webclient' in conf_sections:
self.log.debug('web section found in config, using REST client')
server = get_default(self.config, 'webclient', 'url', None)
verify = get_default(self.config, 'webclient', 'verify_ssl',
@@ -525,8 +513,8 @@ class Client(zuul.cmd.ZuulApp):
client = ZuulRESTClient(server, verify,
self.args.auth_token)
else:
- print('Unable to find a way to connect to Zuul, add a "gearman" '
- 'or "web" section to your configuration file')
+ print('Unable to find a way to connect to Zuul, add a '
+ '"web" section to your configuration file')
sys.exit(1)
if server is None:
print('Missing "server" configuration value')
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
deleted file mode 100644
index 0f80376b1..000000000
--- a/zuul/rpcclient.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-import logging
-import time
-
-import gear
-
-
-class RPCFailure(Exception):
- pass
-
-
-class RPCClient(object):
- log = logging.getLogger("zuul.RPCClient")
-
- def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
- client_id='Zuul RPC Client'):
- self.log.debug("Connecting to gearman at %s:%s" % (server, port))
- self.gearman = gear.Client(client_id)
- self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
- keepalive=True, tcp_keepidle=60,
- tcp_keepintvl=30, tcp_keepcnt=5)
- self.log.debug("Waiting for gearman")
- self.gearman.waitForServer()
-
- def submitJob(self, name, data):
- self.log.debug("Submitting job %s with data %s" % (name, data))
- job = gear.TextJob(name,
- json.dumps(data),
- unique=str(time.time()))
- self.gearman.submitJob(job, timeout=300)
-
- self.log.debug("Waiting for job completion")
- while not job.complete:
- time.sleep(0.1)
- if job.exception:
- raise RPCFailure(job.exception)
- self.log.debug("Job complete, success: %s" % (not job.failure))
- return job
-
- def autohold(self, tenant, project, job, change, ref, reason, count,
- node_hold_expiration=None):
- data = {'tenant': tenant,
- 'project': project,
- 'job': job,
- 'change': change,
- 'ref': ref,
- 'reason': reason,
- 'count': count,
- 'node_hold_expiration': node_hold_expiration}
- return not self.submitJob('zuul:autohold', data).failure
-
- def autohold_delete(self, request_id):
- data = {'request_id': request_id}
- return not self.submitJob('zuul:autohold_delete', data).failure
-
- def autohold_info(self, request_id):
- data = {'request_id': request_id}
- job = self.submitJob('zuul:autohold_info', data)
- if job.failure:
- return False
- else:
- return json.loads(job.data[0])
-
- # todo allow filtering per tenant, like in the REST API
- def autohold_list(self, *args, **kwargs):
- data = {}
- job = self.submitJob('zuul:autohold_list', data)
- if job.failure:
- return False
- else:
- return json.loads(job.data[0])
-
- def enqueue(self, tenant, pipeline, project, trigger, change):
- if trigger is not None:
- self.log.info('enqueue: the "trigger" argument is deprecated')
- data = {'tenant': tenant,
- 'pipeline': pipeline,
- 'project': project,
- 'trigger': trigger,
- 'change': change,
- }
- return not self.submitJob('zuul:enqueue', data).failure
-
- def enqueue_ref(
- self, tenant, pipeline, project, trigger, ref, oldrev, newrev):
- if trigger is not None:
- self.log.info('enqueue_ref: the "trigger" argument is deprecated')
- data = {'tenant': tenant,
- 'pipeline': pipeline,
- 'project': project,
- 'trigger': trigger,
- 'ref': ref,
- 'oldrev': oldrev,
- 'newrev': newrev,
- }
- return not self.submitJob('zuul:enqueue_ref', data).failure
-
- def dequeue(self, tenant, pipeline, project, change, ref):
- data = {'tenant': tenant,
- 'pipeline': pipeline,
- 'project': project,
- 'change': change,
- 'ref': ref,
- }
- return not self.submitJob('zuul:dequeue', data).failure
-
- def promote(self, tenant, pipeline, change_ids):
- data = {'tenant': tenant,
- 'pipeline': pipeline,
- 'change_ids': change_ids,
- }
- return not self.submitJob('zuul:promote', data).failure
-
- def get_running_jobs(self):
- data = {}
- job = self.submitJob('zuul:get_running_jobs', data)
- if job.failure:
- return False
- else:
- return json.loads(job.data[0])
-
- def shutdown(self):
- self.gearman.shutdown()
-
- def get_job_log_stream_address(self, uuid, logfile='console.log',
- source_zone=None):
- data = {'uuid': uuid, 'logfile': logfile}
- if source_zone:
- data['source_zone'] = source_zone
- job = self.submitJob('zuul:get_job_log_stream_address', data)
- if job.failure:
- return False
- else:
- return json.loads(job.data[0])
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
deleted file mode 100644
index e011a85a3..000000000
--- a/zuul/rpclistener.py
+++ /dev/null
@@ -1,238 +0,0 @@
-# Copyright 2012 Hewlett-Packard Development Company, L.P.
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-import logging
-from abc import ABCMeta
-from typing import List
-
-from zuul.lib.gearworker import ZuulGearWorker
-
-
-class RPCListenerBase(metaclass=ABCMeta):
- log = logging.getLogger("zuul.RPCListenerBase")
- thread_name = 'zuul-rpc-gearman-worker'
- functions = [] # type: List[str]
-
- def __init__(self, config, sched):
- self.config = config
- self.sched = sched
-
- self.jobs = {}
-
- for func in self.functions:
- f = getattr(self, 'handle_%s' % func)
- self.jobs['zuul:%s' % func] = f
- self.gearworker = ZuulGearWorker(
- 'Zuul RPC Listener',
- self.log.name,
- self.thread_name,
- self.config,
- self.jobs)
-
- def start(self):
- self.gearworker.start()
-
- def stop(self):
- self.log.debug("Stopping")
- self.gearworker.stop()
- self.log.debug("Stopped")
-
- def join(self):
- self.gearworker.join()
-
-
-class RPCListenerSlow(RPCListenerBase):
- log = logging.getLogger("zuul.RPCListenerSlow")
- thread_name = 'zuul-rpc-slow-gearman-worker'
- functions = [
- 'dequeue',
- 'enqueue',
- 'enqueue_ref',
- 'promote',
- ]
-
- def handle_dequeue(self, job):
- args = json.loads(job.arguments)
- tenant_name = args['tenant']
- pipeline_name = args['pipeline']
- project_name = args['project']
- change = args['change']
- ref = args['ref']
- try:
- self.sched.dequeue(
- tenant_name, pipeline_name, project_name, change, ref)
- except Exception as e:
- job.sendWorkException(str(e).encode('utf8'))
- return
- job.sendWorkComplete()
-
- def _common_enqueue(self, job, args):
- tenant_name = args['tenant']
- pipeline_name = args['pipeline']
- project_name = args['project']
- change = args.get('change')
- ref = args.get('ref')
- oldrev = args.get('oldrev')
- newrev = args.get('newrev')
- try:
- self.sched.enqueue(tenant_name, pipeline_name, project_name,
- change, ref, oldrev, newrev)
- except Exception as e:
- job.sendWorkException(str(e).encode('utf8'))
- return
-
- job.sendWorkComplete()
-
- def handle_enqueue(self, job):
- args = json.loads(job.arguments)
- self._common_enqueue(job, args)
-
- def handle_enqueue_ref(self, job):
- args = json.loads(job.arguments)
- oldrev = args['oldrev']
- newrev = args['newrev']
- errors = ''
- try:
- int(oldrev, 16)
- if len(oldrev) != 40:
- errors += f'Old rev must be 40 character sha1: {oldrev}\n'
- except Exception:
- errors += f'Old rev must be base16 hash: {oldrev}\n'
- try:
- int(newrev, 16)
- if len(newrev) != 40:
- errors += f'New rev must be 40 character sha1: {newrev}\n'
- except Exception:
- errors += f'New rev must be base16 hash: {newrev}\n'
-
- if errors:
- job.sendWorkException(errors.encode('utf8'))
- else:
- self._common_enqueue(job, args)
-
- def handle_promote(self, job):
- args = json.loads(job.arguments)
- tenant_name = args['tenant']
- pipeline_name = args['pipeline']
- change_ids = args['change_ids']
- self.sched.promote(tenant_name, pipeline_name, change_ids)
- job.sendWorkComplete()
-
-
-class RPCListener(RPCListenerBase):
- log = logging.getLogger("zuul.RPCListener")
- thread_name = 'zuul-rpc-gearman-worker'
- functions = [
- 'autohold',
- 'autohold_delete',
- 'autohold_info',
- 'autohold_list',
- 'get_running_jobs',
- ]
-
- def start(self):
- self.gearworker.start()
-
- def stop(self):
- self.log.debug("Stopping")
- self.gearworker.stop()
- self.log.debug("Stopped")
-
- def join(self):
- self.gearworker.join()
-
- def handle_autohold_info(self, job):
- args = json.loads(job.arguments)
- request_id = args['request_id']
- try:
- data = self.sched.autohold_info(request_id)
- except Exception as e:
- job.sendWorkException(str(e).encode('utf8'))
- return
- job.sendWorkComplete(json.dumps(data))
-
- def handle_autohold_delete(self, job):
- args = json.loads(job.arguments)
- request_id = args['request_id']
- try:
- self.sched.autohold_delete(request_id)
- except Exception as e:
- job.sendWorkException(str(e).encode('utf8'))
- return
- job.sendWorkComplete()
-
- def handle_autohold_list(self, job):
- data = self.sched.autohold_list()
- job.sendWorkComplete(json.dumps(data))
-
- def handle_autohold(self, job):
- args = json.loads(job.arguments)
- params = {}
-
- tenant = self.sched.abide.tenants.get(args['tenant'])
- if tenant:
- params['tenant_name'] = args['tenant']
- else:
- error = "Invalid tenant: %s" % args['tenant']
- job.sendWorkException(error.encode('utf8'))
- return
-
- (trusted, project) = tenant.getProject(args['project'])
- if project:
- params['project_name'] = project.canonical_name
- else:
- error = "Invalid project: %s" % args['project']
- job.sendWorkException(error.encode('utf8'))
- return
-
- if args['change'] and args['ref']:
- job.sendWorkException("Change and ref can't be both used "
- "for the same request")
-
- if args['change']:
- # Convert change into ref based on zuul connection
- ref_filter = project.source.getRefForChange(args['change'])
- elif args['ref']:
- ref_filter = "%s" % args['ref']
- else:
- ref_filter = ".*"
-
- params['job_name'] = args['job']
- params['ref_filter'] = ref_filter
- params['reason'] = args['reason']
-
- if args['count'] < 0:
- error = "Invalid count: %d" % args['count']
- job.sendWorkException(error.encode('utf8'))
- return
-
- params['count'] = args['count']
- params['node_hold_expiration'] = args['node_hold_expiration']
-
- self.sched.autohold(**params)
- job.sendWorkComplete()
-
- def handle_get_running_jobs(self, job):
- # args = json.loads(job.arguments)
- # TODO: use args to filter by pipeline etc
- running_items = []
- for tenant in self.sched.abide.tenants.values():
- for pipeline_name, pipeline in tenant.layout.pipelines.items():
- for queue in pipeline.queues:
- for item in queue.queue:
- running_items.append(item.formatJSON())
-
- job.sendWorkComplete(json.dumps(running_items))
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7d492a849..3e26625d1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -30,7 +30,6 @@ from apscheduler.triggers.interval import IntervalTrigger
from kazoo.exceptions import NotEmptyError
from zuul import configloader, exceptions
-from zuul import rpclistener
from zuul.lib import commandsocket
from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
@@ -175,8 +174,6 @@ class Scheduler(threading.Thread):
self.sql = self.connections.getSqlReporter(None)
self.statsd = get_statsd(config)
self.times = Times(self.sql, self.statsd)
- self.rpc = rpclistener.RPCListener(config, self)
- self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
self.repl = None
self.stats_thread = threading.Thread(target=self.runStatsElection)
self.stats_thread.daemon = True
@@ -284,8 +281,6 @@ class Scheduler(threading.Thread):
self.command_thread.daemon = True
self.command_thread.start()
- self.rpc.start()
- self.rpc_slow.start()
self.stats_thread.start()
self.apsched.start()
self.times.start()
@@ -329,11 +324,6 @@ class Scheduler(threading.Thread):
self.log.debug("Waiting for layout update thread")
self.layout_update_event.set()
self.layout_update_thread.join()
- self.log.debug("Stopping RPC thread")
- self.rpc.stop()
- self.rpc.join()
- self.rpc_slow.stop()
- self.rpc_slow.join()
self.stop_repl()
self._command_running = False
self.log.debug("Stopping command socket")