diff options
-rw-r--r-- | releasenotes/notes/wait-for-init-934370422b22b442.yaml | 8 | ||||
-rw-r--r-- | tests/base.py | 21 | ||||
-rw-r--r-- | tests/unit/test_scheduler.py | 17 | ||||
-rwxr-xr-x | zuul/cmd/scheduler.py | 7 | ||||
-rw-r--r-- | zuul/scheduler.py | 7 |
5 files changed, 49 insertions, 11 deletions
diff --git a/releasenotes/notes/wait-for-init-934370422b22b442.yaml b/releasenotes/notes/wait-for-init-934370422b22b442.yaml new file mode 100644 index 000000000..9033131f1 --- /dev/null +++ b/releasenotes/notes/wait-for-init-934370422b22b442.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + The scheduler now accepts an argument `--wait-for-init` which will + cause it to wait until all tenants have been initialized before it + begins processing pipelines. This may help large systems with + excess scheduler capacity perform a rolling restart of schedulers + more quickly. diff --git a/tests/base.py b/tests/base.py index 504a9718d..2076fc2ad 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4379,11 +4379,12 @@ class SchedulerTestApp: def __init__(self, log, config, changes, additional_event_queues, upstream_root, poller_events, git_url_with_auth, add_cleanup, validate_tenants, - instance_id): + wait_for_init, instance_id): self.log = log self.config = config self.changes = changes self.validate_tenants = validate_tenants + self.wait_for_init = wait_for_init # Register connections from the config using fakes self.connections = TestConnectionRegistry( @@ -4397,7 +4398,8 @@ class SchedulerTestApp: ) self.connections.configure(self.config) - self.sched = TestScheduler(self.config, self.connections, self) + self.sched = TestScheduler(self.config, self.connections, self, + wait_for_init) self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}") self.sched._stats_interval = 1 @@ -4463,13 +4465,12 @@ class SchedulerTestApp: class SchedulerTestManager: - def __init__(self, validate_tenants): + def __init__(self, validate_tenants, wait_for_init): self.instances = [] - self.validate_tenants = validate_tenants def create(self, log, config, changes, additional_event_queues, - upstream_root, poller_events, - git_url_with_auth, add_cleanup, validate_tenants): + upstream_root, poller_events, git_url_with_auth, + add_cleanup, validate_tenants, wait_for_init): # Since the config contains a regex we cannot use copy.deepcopy() # as this will raise an exception with Python <3.7 config_data = StringIO() @@ -4490,7 +4491,7 @@ class SchedulerTestManager: additional_event_queues, upstream_root, poller_events, git_url_with_auth, add_cleanup, - validate_tenants, instance_id) + validate_tenants, wait_for_init, instance_id) self.instances.append(app) return app @@ -4593,6 +4594,7 @@ class ZuulTestCase(BaseTestCase): git_url_with_auth: bool = False log_console_port: int = 19885 validate_tenants = None + wait_for_init = None scheduler_count = SCHEDULER_COUNT def __getattr__(self, name): @@ -4757,7 +4759,8 @@ class ZuulTestCase(BaseTestCase): self.history = self.executor_server.build_history self.builds = self.executor_server.running_builds - self.scheds = SchedulerTestManager(self.validate_tenants) + self.scheds = SchedulerTestManager(self.validate_tenants, + self.wait_for_init) for _ in range(self.scheduler_count): self.createScheduler() @@ -4776,7 +4779,7 @@ class ZuulTestCase(BaseTestCase): self.log, self.config, self.changes, self.additional_event_queues, self.upstream_root, self.poller_events, self.git_url_with_auth, - self.addCleanup, self.validate_tenants) + self.addCleanup, self.validate_tenants, self.wait_for_init) def createZKContext(self, lock=None): if lock is None: diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 5cc70b9cf..f3ee0d28e 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -8793,3 +8793,20 @@ class TestEventProcessing(ZuulTestCase): dict(name='tagjob', result='SUCCESS'), dict(name='checkjob', result='SUCCESS', changes='1,1'), ], ordered=False) + + +class TestWaitForInit(ZuulTestCase): + tenant_config_file = 'config/single-tenant/main.yaml' + wait_for_init = True + + def setUp(self): + with self.assertLogs('zuul.Scheduler-0', level='DEBUG') as full_logs: + super().setUp() + self.assertRegexInList('Waiting for tenant initialization', + full_logs.output) + + def test_wait_for_init(self): + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 3ab93670f..7ed30b68e 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -39,6 +39,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): 'listed, all tenants will be validated. ' 'Note: this requires ZooKeeper and ' 'will distribute work to mergers.') + parser.add_argument('--wait-for-init', dest='wait_for_init', + action='store_true', + help='Wait until all tenants are fully loaded ' + 'before beginning to process events.') self.addSubCommands(parser, zuul.scheduler.COMMANDS) return parser @@ -82,7 +86,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.configure_connections(require_sql=True) self.sched = zuul.scheduler.Scheduler(self.config, - self.connections, self) + self.connections, self, + self.args.wait_for_init) if self.args.validate_tenants is None: self.connections.registerScheduler(self.sched) self.connections.load(self.sched.zk_client, diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 2b1ac8c77..5c1808d26 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -184,9 +184,11 @@ class Scheduler(threading.Thread): _merger_client_class = MergeClient _executor_client_class = ExecutorClient - def __init__(self, config, connections, app, testonly=False): + def __init__(self, config, connections, app, wait_for_init, + testonly=False): threading.Thread.__init__(self) self.daemon = True + self.wait_for_init = wait_for_init self.hostname = socket.getfqdn() self.primed_event = threading.Event() # Wake up the main run loop @@ -1901,6 +1903,9 @@ class Scheduler(threading.Thread): self.log.debug("Statsd enabled") else: self.log.debug("Statsd not configured") + if self.wait_for_init: + self.log.debug("Waiting for tenant initialization") + self.primed_event.wait() while True: self.log.debug("Run handler sleeping") self.wake_event.wait() |