summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--releasenotes/notes/wait-for-init-934370422b22b442.yaml8
-rw-r--r--tests/base.py21
-rw-r--r--tests/unit/test_scheduler.py17
-rwxr-xr-xzuul/cmd/scheduler.py7
-rw-r--r--zuul/scheduler.py7
5 files changed, 49 insertions, 11 deletions
diff --git a/releasenotes/notes/wait-for-init-934370422b22b442.yaml b/releasenotes/notes/wait-for-init-934370422b22b442.yaml
new file mode 100644
index 000000000..9033131f1
--- /dev/null
+++ b/releasenotes/notes/wait-for-init-934370422b22b442.yaml
@@ -0,0 +1,8 @@
+---
+features:
+ - |
+ The scheduler now accepts an argument `--wait-for-init` which will
+ cause it to wait until all tenants have been initialized before it
+ begins processing pipelines. This may help large systems with
+ excess scheduler capacity perform a rolling restart of schedulers
+ more quickly.
diff --git a/tests/base.py b/tests/base.py
index 504a9718d..2076fc2ad 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -4379,11 +4379,12 @@ class SchedulerTestApp:
def __init__(self, log, config, changes, additional_event_queues,
upstream_root, poller_events,
git_url_with_auth, add_cleanup, validate_tenants,
- instance_id):
+ wait_for_init, instance_id):
self.log = log
self.config = config
self.changes = changes
self.validate_tenants = validate_tenants
+ self.wait_for_init = wait_for_init
# Register connections from the config using fakes
self.connections = TestConnectionRegistry(
@@ -4397,7 +4398,8 @@ class SchedulerTestApp:
)
self.connections.configure(self.config)
- self.sched = TestScheduler(self.config, self.connections, self)
+ self.sched = TestScheduler(self.config, self.connections, self,
+ wait_for_init)
self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}")
self.sched._stats_interval = 1
@@ -4463,13 +4465,12 @@ class SchedulerTestApp:
class SchedulerTestManager:
- def __init__(self, validate_tenants):
+ def __init__(self, validate_tenants, wait_for_init):
self.instances = []
- self.validate_tenants = validate_tenants
def create(self, log, config, changes, additional_event_queues,
- upstream_root, poller_events,
- git_url_with_auth, add_cleanup, validate_tenants):
+ upstream_root, poller_events, git_url_with_auth,
+ add_cleanup, validate_tenants, wait_for_init):
# Since the config contains a regex we cannot use copy.deepcopy()
# as this will raise an exception with Python <3.7
config_data = StringIO()
@@ -4490,7 +4491,7 @@ class SchedulerTestManager:
additional_event_queues, upstream_root,
poller_events,
git_url_with_auth, add_cleanup,
- validate_tenants, instance_id)
+ validate_tenants, wait_for_init, instance_id)
self.instances.append(app)
return app
@@ -4593,6 +4594,7 @@ class ZuulTestCase(BaseTestCase):
git_url_with_auth: bool = False
log_console_port: int = 19885
validate_tenants = None
+ wait_for_init = None
scheduler_count = SCHEDULER_COUNT
def __getattr__(self, name):
@@ -4757,7 +4759,8 @@ class ZuulTestCase(BaseTestCase):
self.history = self.executor_server.build_history
self.builds = self.executor_server.running_builds
- self.scheds = SchedulerTestManager(self.validate_tenants)
+ self.scheds = SchedulerTestManager(self.validate_tenants,
+ self.wait_for_init)
for _ in range(self.scheduler_count):
self.createScheduler()
@@ -4776,7 +4779,7 @@ class ZuulTestCase(BaseTestCase):
self.log, self.config, self.changes,
self.additional_event_queues, self.upstream_root,
self.poller_events, self.git_url_with_auth,
- self.addCleanup, self.validate_tenants)
+ self.addCleanup, self.validate_tenants, self.wait_for_init)
def createZKContext(self, lock=None):
if lock is None:
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 5cc70b9cf..f3ee0d28e 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -8793,3 +8793,20 @@ class TestEventProcessing(ZuulTestCase):
dict(name='tagjob', result='SUCCESS'),
dict(name='checkjob', result='SUCCESS', changes='1,1'),
], ordered=False)
+
+
+class TestWaitForInit(ZuulTestCase):
+ tenant_config_file = 'config/single-tenant/main.yaml'
+ wait_for_init = True
+
+ def setUp(self):
+ with self.assertLogs('zuul.Scheduler-0', level='DEBUG') as full_logs:
+ super().setUp()
+ self.assertRegexInList('Waiting for tenant initialization',
+ full_logs.output)
+
+ def test_wait_for_init(self):
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 3ab93670f..7ed30b68e 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -39,6 +39,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
'listed, all tenants will be validated. '
'Note: this requires ZooKeeper and '
'will distribute work to mergers.')
+ parser.add_argument('--wait-for-init', dest='wait_for_init',
+ action='store_true',
+ help='Wait until all tenants are fully loaded '
+ 'before beginning to process events.')
self.addSubCommands(parser, zuul.scheduler.COMMANDS)
return parser
@@ -82,7 +86,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.configure_connections(require_sql=True)
self.sched = zuul.scheduler.Scheduler(self.config,
- self.connections, self)
+ self.connections, self,
+ self.args.wait_for_init)
if self.args.validate_tenants is None:
self.connections.registerScheduler(self.sched)
self.connections.load(self.sched.zk_client,
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2b1ac8c77..5c1808d26 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -184,9 +184,11 @@ class Scheduler(threading.Thread):
_merger_client_class = MergeClient
_executor_client_class = ExecutorClient
- def __init__(self, config, connections, app, testonly=False):
+ def __init__(self, config, connections, app, wait_for_init,
+ testonly=False):
threading.Thread.__init__(self)
self.daemon = True
+ self.wait_for_init = wait_for_init
self.hostname = socket.getfqdn()
self.primed_event = threading.Event()
# Wake up the main run loop
@@ -1901,6 +1903,9 @@ class Scheduler(threading.Thread):
self.log.debug("Statsd enabled")
else:
self.log.debug("Statsd not configured")
+ if self.wait_for_init:
+ self.log.debug("Waiting for tenant initialization")
+ self.primed_event.wait()
while True:
self.log.debug("Run handler sleeping")
self.wake_event.wait()