diff options
author | James E. Blair <jim@acmegating.com> | 2022-01-25 16:33:40 -0800 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2022-02-08 14:14:17 -0800 |
commit | a160484a8685298b8661a64ca6a4b9142a31dc97 (patch) | |
tree | b8375acd345fe260487601edaac8ee86c50db438 | |
parent | 1d4a6e0b7163b92c50721c6e79ed0b1fe2839440 (diff) | |
download | zuul-a160484a8685298b8661a64ca6a4b9142a31dc97.tar.gz |
Add zuul-scheduler tenant-reconfigure
This is a new reconfiguration command which behaves like full-reconfigure
but only for a single tenant. This can be useful after connection issues
with code hosting systems, or potentially with Zuul cache bugs.
Because this is the first command-socket command with an argument, some
command-socket infrastructure changes are necessary. Additionally, this
includes some minor changes to make the services more consistent around
socket commands.
Change-Id: Ib695ab8e7ae54790a0a0e4ac04fdad96d60ee0c9
-rw-r--r-- | doc/source/operation.rst | 6 | ||||
-rw-r--r-- | releasenotes/notes/tenant-reconfigure-0c67cca5f64f7b51.yaml | 7 | ||||
-rw-r--r-- | tests/base.py | 8 | ||||
-rw-r--r-- | tests/unit/test_scheduler.py | 29 | ||||
-rwxr-xr-x | zuul/cmd/__init__.py | 40 | ||||
-rwxr-xr-x | zuul/cmd/executor.py | 11 | ||||
-rw-r--r-- | zuul/cmd/fingergw.py | 22 | ||||
-rwxr-xr-x | zuul/cmd/merger.py | 19 | ||||
-rwxr-xr-x | zuul/cmd/scheduler.py | 23 | ||||
-rwxr-xr-x | zuul/cmd/web.py | 14 | ||||
-rw-r--r-- | zuul/executor/server.py | 72 | ||||
-rw-r--r-- | zuul/lib/commandsocket.py | 57 | ||||
-rw-r--r-- | zuul/lib/fingergw.py | 20 | ||||
-rw-r--r-- | zuul/merger/server.py | 24 | ||||
-rw-r--r-- | zuul/model.py | 9 | ||||
-rw-r--r-- | zuul/scheduler.py | 71 | ||||
-rwxr-xr-x | zuul/web/__init__.py | 20 |
17 files changed, 322 insertions, 130 deletions
diff --git a/doc/source/operation.rst b/doc/source/operation.rst index 7debecaad..c81f2e8d7 100644 --- a/doc/source/operation.rst +++ b/doc/source/operation.rst @@ -40,7 +40,11 @@ not read from a git repository. Zuul supports two kinds of reconfigurations. The full reconfiguration refetches and reloads the configuration of all tenants. To do so, run ``zuul-scheduler full-reconfigure``. For example this can be used to fix eventual configuration inconsistencies -after connection problems to Gerrit/Github. +after connection problems with the code hosting system. + +To perform the same actions as a full reconfiguration but for a single +tenant, use ``zuul-scheduler tenant-reconfigure TENANT`` (where +``TENANT`` is the name of the tenant to reconfigure). The smart reconfiguration reloads only the tenants that changed their configuration in the tenant config file. To do so, run diff --git a/releasenotes/notes/tenant-reconfigure-0c67cca5f64f7b51.yaml b/releasenotes/notes/tenant-reconfigure-0c67cca5f64f7b51.yaml new file mode 100644 index 000000000..8bed0829f --- /dev/null +++ b/releasenotes/notes/tenant-reconfigure-0c67cca5f64f7b51.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + A new command ``zuul-scheduler tenant-reconfigure`` has been + added. It allows an operator to perform a reconfiguration of a + single tenant. This may be helpful in clearing up issues after + connection problems with the code hosting system. diff --git a/tests/base.py b/tests/base.py index 44dc48952..da7b66538 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,6 +1,6 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2016 Red Hat, Inc. -# Copyright 2021 Acme Gating, LLC +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -4271,6 +4271,12 @@ class SchedulerTestApp: except Exception: self.log.exception("Reconfiguration failed:") + def tenantReconfigure(self, tenants): + try: + self.sched.reconfigure(self.config, smart=False, tenants=tenants) + except Exception: + self.log.exception("Reconfiguration failed:") + class SchedulerTestManager: def __init__(self, validate_tenants): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index d24dfc06f..67c8c49a0 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -1,4 +1,5 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -3797,6 +3798,34 @@ class TestScheduler(ZuulTestCase): else: time.sleep(0) + def test_tenant_reconfiguration_command_socket(self): + "Test that single-tenant reconfiguration via command socket works" + + # record previous tenant reconfiguration state, which may not be set + old = self.scheds.first.sched.tenant_layout_state.get( + 'tenant-one', EMPTY_LAYOUT_STATE) + self.waitUntilSettled() + + command_socket = self.scheds.first.config.get( + 'scheduler', 'command_socket') + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.connect(command_socket) + s.sendall('tenant-reconfigure ["tenant-one"]\n'.encode('utf8')) + + # Wait for full reconfiguration. Note that waitUntilSettled is not + # reliable here because the reconfigure event may arrive in the + # event queue after waitUntilSettled. + start = time.time() + while True: + if time.time() - start > 15: + raise Exception("Timeout waiting for full reconfiguration") + new = self.scheds.first.sched.tenant_layout_state.get( + 'tenant-one', EMPTY_LAYOUT_STATE) + if old < new: + break + else: + time.sleep(0) + def test_double_live_reconfiguration_shared_queue(self): # This was a real-world regression. A change is added to # gate; a reconfigure happens, a second change which depends diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py index 045882084..6df796bca 100755 --- a/zuul/cmd/__init__.py +++ b/zuul/cmd/__init__.py @@ -1,5 +1,6 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013 OpenStack Foundation +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,6 +20,7 @@ import configparser import daemon import extras import io +import json import logging import logging.config import os @@ -100,6 +102,7 @@ class ZuulApp(object): self.args = None self.config = None self.connections = {} + self.commands = {} def _get_version(self): from zuul.version import version_info as zuul_version_info @@ -116,14 +119,47 @@ class ZuulApp(object): help='show zuul version') return parser + def addSubCommands(self, parser, commands): + # Add a list of commandsocket.Command items to the parser + subparsers = parser.add_subparsers( + title='Online commands', + description=('The following commands may be used to affect ' + 'the running process.'), + ) + for command in commands: + self.commands[command.name] = command + cmd = subparsers.add_parser( + command.name, help=command.help) + cmd.set_defaults(command=command.name) + for arg in command.args: + cmd.add_argument(arg.name, + help=arg.help, + default=arg.default) + + def handleCommands(self): + command_name = getattr(self.args, 'command', None) + if command_name in self.commands: + command = self.commands[self.args.command] + command_str = command.name + command_args = [getattr(self.args, arg.name) + for arg in command.args] + if command_args: + command_str += ' ' + json.dumps(command_args) + self.sendCommand(command_str) + sys.exit(0) + def parseArguments(self, args=None): parser = self.createParser() self.args = parser.parse_args(args) - if hasattr(self.args, 'foreground') and self.args.foreground: + if getattr(self.args, 'foreground', False): self.args.nodaemon = True else: self.args.nodaemon = False + + if getattr(self.args, 'command', None): + self.args.nodaemon = True + return parser def readConfig(self): @@ -219,7 +255,7 @@ class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta): with daemon.DaemonContext(pidfile=pid, umask=0o022): self.run() - def send_command(self, cmd): + def sendCommand(self, cmd): command_socket = get_default( self.config, self.app_name, 'command_socket', '/var/lib/zuul/%s.socket' % self.app_name) diff --git a/zuul/cmd/executor.py b/zuul/cmd/executor.py index 7e3c0cb1a..3c98ecd4a 100755 --- a/zuul/cmd/executor.py +++ b/zuul/cmd/executor.py @@ -1,5 +1,6 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013-2014 OpenStack Foundation +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -33,15 +34,11 @@ class Executor(zuul.cmd.ZuulDaemonApp): parser.add_argument('--keep-jobdir', dest='keep_jobdir', action='store_true', help='keep local jobdirs after run completes') - parser.add_argument('command', - choices=zuul.executor.server.COMMANDS, - nargs='?') + self.addSubCommands(parser, zuul.executor.server.COMMANDS) return parser def parseArguments(self, args=None): super(Executor, self).parseArguments() - if self.args.command: - self.args.nodaemon = True def exit_handler(self, signum, frame): if self.config.has_option('executor', 'sigterm_method'): @@ -79,9 +76,7 @@ class Executor(zuul.cmd.ZuulDaemonApp): self.log_streamer_pid = child_pid def run(self): - if self.args.command in zuul.executor.server.COMMANDS: - self.send_command(self.args.command) - sys.exit(0) + self.handleCommands() self.configure_connections(source_only=True) diff --git a/zuul/cmd/fingergw.py b/zuul/cmd/fingergw.py index 9497d7e85..04e0d5cc8 100644 --- a/zuul/cmd/fingergw.py +++ b/zuul/cmd/fingergw.py @@ -1,4 +1,5 @@ # Copyright 2017 Red Hat, Inc. +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,12 +15,10 @@ import logging import signal -import sys -from typing import Optional import zuul.cmd from zuul.lib.config import get_default -from zuul.lib.fingergw import COMMANDS, FingerGateway +from zuul.lib import fingergw class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): @@ -32,29 +31,20 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): def __init__(self): super(FingerGatewayApp, self).__init__() - self.gateway: Optional[FingerGateway] = None + self.gateway = None def createParser(self): parser = super(FingerGatewayApp, self).createParser() - parser.add_argument('command', - choices=COMMANDS, - nargs='?') + self.addSubCommands(parser, fingergw.COMMANDS) return parser - def parseArguments(self, args=None): - super(FingerGatewayApp, self).parseArguments() - if self.args.command: - self.args.nodaemon = True - def run(self): ''' Main entry point for the FingerGatewayApp. Called by the main() method of the parent class. ''' - if self.args.command in COMMANDS: - self.send_command(self.args.command) - sys.exit(0) + self.handleCommands() self.setup_logging('fingergw', 'log_config') self.log = logging.getLogger('zuul.fingergw') @@ -63,7 +53,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): self.config, 'fingergw', 'command_socket', '/var/lib/zuul/%s.socket' % self.app_name) - self.gateway = FingerGateway( + self.gateway = fingergw.FingerGateway( self.config, cmdsock, self.getPidFile(), diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py index e5f412826..659399c85 100755 --- a/zuul/cmd/merger.py +++ b/zuul/cmd/merger.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013-2014 OpenStack Foundation +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -18,7 +19,7 @@ import signal import sys import zuul.cmd -from zuul.merger.server import COMMANDS, MergeServer +import zuul.merger.server class Merger(zuul.cmd.ZuulDaemonApp): @@ -27,31 +28,23 @@ class Merger(zuul.cmd.ZuulDaemonApp): def createParser(self): parser = super(Merger, self).createParser() - parser.add_argument('command', - choices=COMMANDS, - nargs='?') + self.addSubCommands(parser, zuul.merger.server.COMMANDS) return parser - def parseArguments(self, args=None): - super(Merger, self).parseArguments() - if self.args.command: - self.args.nodaemon = True - def exit_handler(self, signum, frame): self.merger.stop() self.merger.join() sys.exit(0) def run(self): - if self.args.command in COMMANDS: - self.send_command(self.args.command) - sys.exit(0) + self.handleCommands() self.configure_connections(source_only=True) self.setup_logging('merger', 'log_config') - self.merger = MergeServer(self.config, self.connections) + self.merger = zuul.merger.server.MergeServer( + self.config, self.connections) self.merger.start() if self.args.nodaemon: diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 1c320b76c..176466cfe 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -1,5 +1,6 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013 OpenStack Foundation +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -38,16 +39,9 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): 'listed, all tenants will be validated. ' 'Note: this requires ZooKeeper and ' 'will distribute work to mergers.') - parser.add_argument('command', - choices=zuul.scheduler.COMMANDS, - nargs='?') + self.addSubCommands(parser, zuul.scheduler.COMMANDS) return parser - def parseArguments(self, args=None): - super(Scheduler, self).parseArguments() - if self.args.command: - self.args.nodaemon = True - def fullReconfigure(self): self.log.debug("Reconfiguration triggered") self.readConfig() @@ -66,15 +60,22 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): except Exception: self.log.exception("Reconfiguration failed:") + def tenantReconfigure(self, tenants): + self.log.debug("Tenant reconfiguration triggered") + self.readConfig() + self.setup_logging('scheduler', 'log_config') + try: + self.sched.reconfigure(self.config, smart=False, tenants=tenants) + except Exception: + self.log.exception("Reconfiguration failed:") + def exit_handler(self, signum, frame): self.sched.stop() self.sched.join() sys.exit(0) def run(self): - if self.args.command in zuul.scheduler.COMMANDS: - self.send_command(self.args.command) - sys.exit(0) + self.handleCommands() self.setup_logging('scheduler', 'log_config') self.log = logging.getLogger("zuul.Scheduler") diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py index 1eb6bc8c2..bb9348c71 100755 --- a/zuul/cmd/web.py +++ b/zuul/cmd/web.py @@ -1,4 +1,5 @@ # Copyright 2017 Red Hat, Inc. +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -30,16 +31,9 @@ class WebServer(zuul.cmd.ZuulDaemonApp): def createParser(self): parser = super().createParser() - parser.add_argument('command', - choices=zuul.web.COMMANDS, - nargs='?') + self.addSubCommands(parser, zuul.web.COMMANDS) return parser - def parseArguments(self, args=None): - super().parseArguments() - if self.args.command: - self.args.nodaemon = True - def exit_handler(self, signum, frame): self.web.stop() @@ -81,9 +75,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp): self.authenticators.configure(self.config) def run(self): - if self.args.command in zuul.web.COMMANDS: - self.send_command(self.args.command) - sys.exit(0) + self.handleCommands() self.setup_logging('web', 'log_config') self.log = logging.getLogger("zuul.WebServer") diff --git a/zuul/executor/server.py b/zuul/executor/server.py index d43e38b6b..3330e9641 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -1,5 +1,5 @@ # Copyright 2014 OpenStack Foundation -# Copyright 2021 Acme Gating, LLC +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -80,8 +80,6 @@ from zuul.zk.system import ZuulSystem from zuul.zk.zkobject import ZKContext BUFFER_LINES_FOR_SYNTAX = 200 -COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose', - 'unverbose', 'keep', 'nokeep', 'repl', 'norepl'] DEFAULT_FINGER_PORT = 7900 DEFAULT_STREAM_PORT = 19885 BLACKLISTED_ANSIBLE_CONNECTION_TYPES = [ @@ -95,6 +93,40 @@ BLACKLISTED_VARS = dict( ) +class VerboseCommand(commandsocket.Command): + name = 'verbose' + help = 'Enable Ansible verbose mode' + + +class UnVerboseCommand(commandsocket.Command): + name = 'unverbose' + help = 'Disable Ansible verbose mode' + + +class KeepCommand(commandsocket.Command): + name = 'keep' + help = 'Keep build directories after completion' + + +class NoKeepCommand(commandsocket.Command): + name = 'nokeep' + help = 'Remove build directories after completion' + + +COMMANDS = [ + commandsocket.StopCommand, + commandsocket.PauseCommand, + commandsocket.UnPauseCommand, + commandsocket.GracefulCommand, + VerboseCommand, + UnVerboseCommand, + KeepCommand, + NoKeepCommand, + commandsocket.ReplCommand, + commandsocket.NoReplCommand, +] + + class NodeRequestError(Exception): pass @@ -3173,18 +3205,18 @@ class ExecutorServer(BaseMergeServer): self.governor_lock = threading.Lock() self.run_lock = threading.Lock() self.verbose = False - self.command_map = dict( - stop=self.stop, - pause=self.pause, - unpause=self.unpause, - graceful=self.graceful, - verbose=self.verboseOn, - unverbose=self.verboseOff, - keep=self.keep, - nokeep=self.nokeep, - repl=self.start_repl, - norepl=self.stop_repl, - ) + self.command_map = { + commandsocket.StopCommand.name: self.stop, + commandsocket.PauseCommand.name: self.pause, + commandsocket.UnPauseCommand.name: self.unpause, + commandsocket.GracefulCommand.name: self.graceful, + VerboseCommand.name: self.verboseOn, + UnVerboseCommand.name: self.verboseOff, + KeepCommand.name: self.keep, + NoKeepCommand.name: self.nokeep, + commandsocket.ReplCommand.name: self.startRepl, + commandsocket.NoReplCommand.name: self.stopRepl, + } self.log_console_port = log_console_port self.repl = None @@ -3456,7 +3488,7 @@ class ExecutorServer(BaseMergeServer): # ZooKeeper. We do this as one of the last steps to ensure # that all ZK related components can be stopped first. super().stop() - self.stop_repl() + self.stopRepl() self.monitoring_server.stop() self.log.debug("Stopped") @@ -3510,13 +3542,13 @@ class ExecutorServer(BaseMergeServer): def nokeep(self): self.keep_jobdir = False - def start_repl(self): + def startRepl(self): if self.repl: return self.repl = zuul.lib.repl.REPLServer(self) self.repl.start() - def stop_repl(self): + def stopRepl(self): if not self.repl: # not running return @@ -3526,9 +3558,9 @@ class ExecutorServer(BaseMergeServer): def runCommand(self): while self._command_running: try: - command = self.command_socket.get().decode('utf8') + command, args = self.command_socket.get() if command != '_stop': - self.command_map[command]() + self.command_map[command](*args) except Exception: self.log.exception("Exception while processing command") diff --git a/zuul/lib/commandsocket.py b/zuul/lib/commandsocket.py index 087173b1b..7311bca85 100644 --- a/zuul/lib/commandsocket.py +++ b/zuul/lib/commandsocket.py @@ -1,6 +1,7 @@ # Copyright 2014 OpenStack Foundation # Copyright 2014 Hewlett-Packard Development Company, L.P. # Copyright 2016 Red Hat +# Copyright 2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import logging import os import socket @@ -22,6 +24,49 @@ import threading from zuul.lib.queue import NamedQueue +class Command: + name = None + help = None + args = [] + + +class Argument: + name = None + help = None + required = None + default = None + + +class StopCommand(Command): + name = 'stop' + help = 'Stop the running process' + + +class GracefulCommand(Command): + name = 'graceful' + help = 'Stop after completing existing work' + + +class PauseCommand(Command): + name = 'pause' + help = 'Stop accepting new work' + + +class UnPauseCommand(Command): + name = 'unpause' + help = 'Resume accepting new work' + + +class ReplCommand(Command): + name = 'repl' + help = 'Enable the REPL for debugging' + + +class NoReplCommand(Command): + name = 'norepl' + help = 'Disable the REPL' + + class CommandSocket(object): log = logging.getLogger("zuul.CommandSocket") @@ -54,7 +99,7 @@ class CommandSocket(object): # either handle '_stop' or just ignore the unknown command and # then check to see if they should continue to run before # re-entering their loop. - self.queue.put(b'_stop') + self.queue.put(('_stop', [])) self.socket_thread.join() def _socketListener(self): @@ -70,11 +115,17 @@ class CommandSocket(object): buf = buf.strip() self.log.debug("Received %s from socket" % (buf,)) s.close() + + buf = buf.decode('utf8') + parts = buf.split(' ', 1) # Because we use '_stop' internally to wake up a # waiting thread, don't allow it to actually be # injected externally. - if buf != b'_stop': - self.queue.put(buf) + args = parts[1:] + if args: + args = json.loads(args[0]) + if parts[0] != '_stop': + self.queue.put((parts[0], args)) except Exception: self.log.exception("Exception in socket handler") diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index 867b07732..dd2218b26 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -1,4 +1,5 @@ # Copyright 2017 Red Hat, Inc. +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -22,7 +23,7 @@ from typing import Optional from zuul.exceptions import StreamingError from zuul.lib import streamer_utils -from zuul.lib.commandsocket import CommandSocket +from zuul.lib import commandsocket from zuul.lib.config import get_default from zuul.lib.monitoring import MonitoringServer from zuul.version import get_version_string @@ -30,7 +31,9 @@ from zuul.zk import ZooKeeperClient from zuul.zk.components import ComponentRegistry, FingerGatewayComponent from zuul.zk.executor import ExecutorApi -COMMANDS = ['stop'] +COMMANDS = [ + commandsocket.StopCommand, +] class RequestHandler(streamer_utils.BaseFingerRequestHandler): @@ -168,9 +171,9 @@ class FingerGateway(object): else: self.tls_listen = False - self.command_map = dict( - stop=self.stop, - ) + self.command_map = { + commandsocket.StopCommand.name: self.stop, + } self.hostname = get_default(config, 'fingergw', 'hostname', socket.getfqdn()) @@ -199,9 +202,9 @@ class FingerGateway(object): def _runCommand(self): while self.command_running: try: - command = self.command_socket.get().decode('utf8') + command, args = self.command_socket.get() if command != '_stop': - self.command_map[command]() + self.command_map[command](*args) else: return except Exception: @@ -239,7 +242,8 @@ class FingerGateway(object): # Start the command processor after the server and privilege drop if self.command_socket_path: self.log.debug("Starting command processor") - self.command_socket = CommandSocket(self.command_socket_path) + self.command_socket = commandsocket.CommandSocket( + self.command_socket_path) self.command_socket.start() self.command_running = True self.command_thread = threading.Thread( diff --git a/zuul/merger/server.py b/zuul/merger/server.py index a036e4a85..96dc72137 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -1,4 +1,5 @@ # Copyright 2014 OpenStack Foundation +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -39,7 +40,12 @@ from zuul.zk.components import MergerComponent from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.merger import MergerApi -COMMANDS = ['stop', 'pause', 'unpause', 'graceful'] +COMMANDS = [ + commandsocket.StopCommand, + commandsocket.PauseCommand, + commandsocket.UnPauseCommand, + commandsocket.GracefulCommand +] class BaseRepoLocks(metaclass=ABCMeta): @@ -475,14 +481,14 @@ class MergeServer(BaseMergeServer): self.component_info) self.monitoring_server.start() - self.command_map = dict( - stop=self.stop, + self.command_map = { + commandsocket.StopCommand.name: self.stop, # Stop for the mergers is always graceful. We add this alias # to make it clearer to users that they can gracefully stop. - graceful=self.stop, - pause=self.pause, - unpause=self.unpause, - ) + commandsocket.GracefulCommand.name: self.stop, + commandsocket.PauseCommand.name: self.pause, + commandsocket.UnPauseCommand.name: self.unpause, + } command_socket = get_default( self.config, 'merger', 'command_socket', '/var/lib/zuul/merger.socket') @@ -527,8 +533,8 @@ class MergeServer(BaseMergeServer): def runCommand(self): while self._command_running: try: - command = self.command_socket.get().decode('utf8') + command, args = self.command_socket.get() if command != '_stop': - self.command_map[command]() + self.command_map[command](*args) except Exception: self.log.exception("Exception while processing command") diff --git a/zuul/model.py b/zuul/model.py index 778b6bd68..b671bd130 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1,5 +1,5 @@ # Copyright 2012 Hewlett-Packard Development Company, L.P. -# Copyright 2021 Acme Gating, LLC +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -5555,18 +5555,21 @@ class ReconfigureEvent(ManagementEvent): """Reconfigure the scheduler. The layout will be (re-)loaded from the path specified in the configuration.""" - def __init__(self, smart=False): + def __init__(self, smart=False, tenants=None): super(ReconfigureEvent, self).__init__() self.smart = smart + self.tenants = tenants def toDict(self): d = super().toDict() d["smart"] = self.smart + d["tenants"] = self.tenants return d @classmethod def fromDict(cls, data): - event = cls(data.get("smart", False)) + event = cls(data.get("smart", False), + data.get("tenants", None)) event.updateFromDict(data) return event diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e0fd7f82d..e8b525c7a 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2,6 +2,7 @@ # Copyright 2013 OpenStack Foundation # Copyright 2013 Antoine "hashar" Musso # Copyright 2013 Wikimedia Foundation Inc. +# Copyright 2021-2022 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -105,7 +106,36 @@ from zuul.zk.system import ZuulSystem from zuul.zk.zkobject import ZKContext from zuul.zk.election import SessionAwareElection -COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl'] + +class FullReconfigureCommand(commandsocket.Command): + name = 'full-reconfigure' + help = 'Perform a reconfiguration of all tenants' + + +class SmartReconfigureCommand(commandsocket.Command): + name = 'smart-reconfigure' + help = 'Perform a reconfiguration of updated tenants' + + +class TenantArgument(commandsocket.Argument): + name = 'tenant' + help = 'The name of the tenant' + + +class TenantReconfigureCommand(commandsocket.Command): + name = 'tenant-reconfigure' + help = 'Perform a reconfiguration of a specific tenant' + args = [TenantArgument] + + +COMMANDS = [ + FullReconfigureCommand, + SmartReconfigureCommand, + TenantReconfigureCommand, + commandsocket.StopCommand, + commandsocket.ReplCommand, + commandsocket.NoReplCommand, +] class SchedulerStatsElection(SessionAwareElection): @@ -163,11 +193,13 @@ class Scheduler(threading.Thread): # Only used by tests in order to quiesce the main run loop self.run_handler_lock = threading.Lock() self.command_map = { - 'stop': self.stop, - 'full-reconfigure': self.fullReconfigureCommandHandler, - 'smart-reconfigure': self.smartReconfigureCommandHandler, - 'repl': self.start_repl, - 'norepl': self.stop_repl, + FullReconfigureCommand.name: self.fullReconfigureCommandHandler, + SmartReconfigureCommand.name: self.smartReconfigureCommandHandler, + TenantReconfigureCommand.name: + self.tenantReconfigureCommandHandler, + commandsocket.StopCommand.name: self.stop, + commandsocket.ReplCommand.name: self.startRepl, + commandsocket.NoReplCommand.name: self.stopRepl, } self._stopped = False @@ -330,7 +362,7 @@ class Scheduler(threading.Thread): self.log.debug("Waiting for layout update thread") self.layout_update_event.set() self.layout_update_thread.join() - self.stop_repl() + self.stopRepl() self._command_running = False self.log.debug("Stopping command socket") self.command_socket.stop() @@ -347,9 +379,9 @@ class Scheduler(threading.Thread): def runCommand(self): while self._command_running: try: - command = self.command_socket.get().decode('utf8') + command, args = self.command_socket.get() if command != '_stop': - self.command_map[command]() + self.command_map[command](*args) except Exception: self.log.exception("Exception while processing command") @@ -777,13 +809,16 @@ class Scheduler(threading.Thread): def smartReconfigureCommandHandler(self): self._zuul_app.smartReconfigure() - def start_repl(self): + def tenantReconfigureCommandHandler(self, tenant_name): + self._zuul_app.tenantReconfigure([tenant_name]) + + def startRepl(self): if self.repl: return self.repl = zuul.lib.repl.REPLServer(self) self.repl.start() - def stop_repl(self): + def stopRepl(self): if not self.repl: return self.repl.stop() @@ -872,10 +907,10 @@ class Scheduler(threading.Thread): self.wake_event.set() self.component_info.state = self.component_info.RUNNING - def reconfigure(self, config, smart=False): + def reconfigure(self, config, smart=False, tenants=None): self.log.debug("Submitting reconfiguration event") - event = ReconfigureEvent(smart=smart) + event = ReconfigureEvent(smart=smart, tenants=tenants) event.zuul_event_ltime = self.zk_client.getCurrentLtime() event.ack_ref = threading.Event() self.reconfigure_event_queue.put(event) @@ -1174,7 +1209,8 @@ class Scheduler(threading.Thread): # a request reconfigured_tenants = [] with self.layout_lock: - self.log.info("Reconfiguration beginning (smart=%s)", event.smart) + self.log.info("Reconfiguration beginning (smart=%s, tenants=%s)", + event.smart, event.tenants) start = time.monotonic() # Update runtime related system attributes from config @@ -1213,6 +1249,8 @@ class Scheduler(threading.Thread): new_tenant = self.unparsed_abide.tenants.get(tenant_name) if old_tenant == new_tenant: continue + if event.tenants and tenant_name not in event.tenants: + continue old_tenant = self.abide.tenants.get(tenant_name) if event.smart: @@ -1240,8 +1278,9 @@ class Scheduler(threading.Thread): self._reconfigureDeleteTenant(ctx, old_tenant) duration = round(time.monotonic() - start, 3) - self.log.info("Reconfiguration complete (smart: %s, " - "duration: %s seconds)", event.smart, duration) + self.log.info("Reconfiguration complete (smart: %s, tenants: %s, " + "duration: %s seconds)", event.smart, event.tenants, + duration) if event.smart: self.log.info("Reconfigured tenants: %s", reconfigured_tenants) diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index cf0e29578..bb69480e7 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -83,7 +83,11 @@ from zuul.web.logutil import ZuulCherrypyLogManager STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') cherrypy.tools.websocket = WebSocketTool() -COMMANDS = ['stop', 'repl', 'norepl'] +COMMANDS = [ + commandsocket.StopCommand, + commandsocket.ReplCommand, + commandsocket.NoReplCommand, +] def get_request_logger(logger=None): @@ -1772,9 +1776,9 @@ class ZuulWeb(object): self.repl = None self.command_map = { - 'stop': self.stop, - 'repl': self.start_repl, - 'norepl': self.stop_repl, + commandsocket.StopCommand.name: self.stop, + commandsocket.ReplCommand.name: self.startRepl, + commandsocket.NoReplCommand.name: self.stopRepl, } self.finger_tls_key = get_default( @@ -1973,7 +1977,7 @@ class ZuulWeb(object): self.system_config_cache_wake_event.set() self.system_config_thread.join() self.zk_client.disconnect() - self.stop_repl() + self.stopRepl() self._command_running = False self.command_socket.stop() self.monitoring_server.stop() @@ -1985,19 +1989,19 @@ class ZuulWeb(object): def runCommand(self): while self._command_running: try: - command = self.command_socket.get().decode('utf8') + command, args = self.command_socket.get() if command != '_stop': self.command_map[command]() except Exception: self.log.exception("Exception while processing command") - def start_repl(self): + def startRepl(self): if self.repl: return self.repl = zuul.lib.repl.REPLServer(self) self.repl.start() - def stop_repl(self): + def stopRepl(self): if not self.repl: return self.repl.stop() |