diff options
-rw-r--r-- | doc/source/developer/model-changelog.rst | 7 | ||||
-rw-r--r-- | tests/unit/test_model_upgrade.py | 87 | ||||
-rw-r--r-- | zuul/configloader.py | 8 | ||||
-rw-r--r-- | zuul/model.py | 2 | ||||
-rw-r--r-- | zuul/zk/semaphore.py | 89 |
5 files changed, 148 insertions, 45 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index 50e55645c..5c45d4c62 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -31,3 +31,10 @@ Version 1 :Prior Zuul version: 4.11.0 :Description: No change since Version 0. This explicitly records the component versions in ZooKeeper. + +Version 2 +--------- + +:Prior Zuul version: 5.0.0 +:Description: Changes the semaphore handle format from `<item_uuid>-<job_name>` + to a dictionary with buildset path and job name. diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index b8364cb07..985e883aa 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -63,3 +63,90 @@ class TestModelUpgrade(ZuulTestCase): for _ in iterate_timeout(30, "model api to update"): if component_registry.model_api == 1: break + + +class TestSemaphoreModelUpgrade(ZuulTestCase): + tenant_config_file = 'config/semaphore/main.yaml' + + @model_version(1) + def test_semaphore_handler_cleanup(self): + self.executor_server.hold_jobs_in_build = True + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + semaphore_holders = tenant.semaphore_handler.semaphoreHolders( + "test-semaphore") + self.assertEqual(len(semaphore_holders), 1) + # Assert that we are still using the old-style handler format + self.assertTrue(all(isinstance(h, str) for h in semaphore_holders)) + + # Save some variables for later use while the job is running + check_pipeline = tenant.layout.pipelines['check'] + item = check_pipeline.getAllItems()[0] + job = item.getJob('semaphore-one-test1') + + tenant.semaphore_handler.cleanupLeaks() + # Nothing has leaked; our handle 
should be present. + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + # Make sure the semaphore is released normally + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + + # Use our previously saved data to simulate a leaked semaphore + # with the OLD handler format. + tenant.semaphore_handler.acquire(item, job, False) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + tenant.semaphore_handler.cleanupLeaks() + # Make sure the semaphore is NOT cleaned up as the model version + # is still < 2 + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + # Upgrade our component + self.model_test_component_info.model_api = 2 + + tenant.semaphore_handler.cleanupLeaks() + # Make sure we are not touching old-style handlers during cleanup. + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) + + # Try to release the old-style semaphore after the model API upgrade. + tenant.semaphore_handler.release(item, job) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + + # Use our previously saved data to simulate a leaked semaphore + # with the NEW handler format. 
+ tenant.semaphore_handler.acquire(item, job, False) + semaphore_holders = tenant.semaphore_handler.semaphoreHolders( + "test-semaphore") + self.assertEqual(len(semaphore_holders), 1) + # Assert that we are now using the new-style handler format + self.assertTrue(all(isinstance(h, dict) for h in semaphore_holders)) + + tenant.semaphore_handler.cleanupLeaks() + # Make sure the leaked semaphore is cleaned up + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) diff --git a/zuul/configloader.py b/zuul/configloader.py index 58650b1da..7ed296d83 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -1638,9 +1638,11 @@ class TenantParser(object): tenant.layout = self._parseLayout( tenant, parsed_config, loading_errors, layout_uuid) - tenant.semaphore_handler = SemaphoreHandler( - self.zk_client, self.statsd, tenant.name, tenant.layout - ) + if self.scheduler: + tenant.semaphore_handler = SemaphoreHandler( + self.zk_client, self.statsd, tenant.name, tenant.layout, + self.scheduler.component_registry + ) return tenant diff --git a/zuul/model.py b/zuul/model.py index 778b6bd68..1a8b72442 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -49,7 +49,7 @@ from zuul.zk.change_cache import ChangeKey # When making ZK schema changes, increment this and add a record to # docs/developer/model-changelog.rst -MODEL_API = 1 +MODEL_API = 2 MERGER_MERGE = 1 # "git merge" MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve" diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 548d2756a..9e72cf896 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -38,10 +38,12 @@ class SemaphoreHandler(ZooKeeperSimpleBase): semaphore_root = "/zuul/semaphores" - def __init__(self, client, statsd, tenant_name, layout): + def __init__(self, client, statsd, tenant_name, layout, + component_registry): super().__init__(client) self.layout = layout self.statsd = statsd + self.component_registry = component_registry self.tenant_name = 
tenant_name self.tenant_root = f"{self.semaphore_root}/{tenant_name}" @@ -92,17 +94,26 @@ class SemaphoreHandler(ZooKeeperSimpleBase): semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" - semaphore_handle = f"{item.uuid}-{job.name}" + semaphore_handle = { + "buildset_path": item.current_build_set.getPath(), + "job_name": job.name, + } + legacy_handle = f"{item.uuid}-{job.name}" self.kazoo_client.ensure_path(semaphore_path) semaphore_holders, zstat = self.getHolders(semaphore_path) - if semaphore_handle in semaphore_holders: + if (semaphore_handle in semaphore_holders + or legacy_handle in semaphore_holders): return True # semaphore is there, check max while len(semaphore_holders) < self._max_count(semaphore.name): - semaphore_holders.append(semaphore_handle) + # MODEL_API: >1 + if self.component_registry.model_api > 1: + semaphore_holders.append(semaphore_handle) + else: + semaphore_holders.append(legacy_handle) try: self.kazoo_client.set(semaphore_path, @@ -133,11 +144,17 @@ class SemaphoreHandler(ZooKeeperSimpleBase): except NoNodeError: return [] - def _release(self, log, semaphore_path, semaphore_handle, quiet): + def _release(self, log, semaphore_path, semaphore_handle, quiet, + legacy_handle=None): while True: try: semaphore_holders, zstat = self.getHolders(semaphore_path) - semaphore_holders.remove(semaphore_handle) + try: + semaphore_holders.remove(semaphore_handle) + except ValueError: + if legacy_handle is None: + raise + semaphore_holders.remove(legacy_handle) except (ValueError, NoNodeError): if not quiet: log.error("Semaphore %s can not be released for %s " @@ -172,9 +189,13 @@ class SemaphoreHandler(ZooKeeperSimpleBase): def _release_one(self, log, item, job, semaphore, quiet): semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" - semaphore_handle = f"{item.uuid}-{job.name}" - - self._release(log, semaphore_path, semaphore_handle, quiet) + semaphore_handle = { 
+ "buildset_path": item.current_build_set.getPath(), + "job_name": job.name, + } + legacy_handle = f"{item.uuid}-{job.name}" + self._release(log, semaphore_path, semaphore_handle, quiet, + legacy_handle) def semaphoreHolders(self, semaphore_name): semaphore_key = quote_plus(semaphore_name) @@ -190,38 +211,24 @@ class SemaphoreHandler(ZooKeeperSimpleBase): return 1 if semaphore is None else semaphore.max def cleanupLeaks(self): - # This is designed to account for jobs starting and stopping - # while this runs, and should therefore be safe to run outside - # of the scheduler main loop (and accross multiple - # schedulers). - - first_semaphores_by_holder = {} - for semaphore in self.getSemaphores(): - for holder in self.semaphoreHolders(semaphore): - first_semaphores_by_holder[holder] = semaphore - first_holders = set(first_semaphores_by_holder.keys()) - - running_handles = set() - for pipeline in self.layout.pipelines.values(): - for item in pipeline.getAllItems(include_old=True): - for job in item.getJobs(): - running_handles.add(f"{item.uuid}-{job.name}") - - second_semaphores_by_holder = {} - for semaphore in self.getSemaphores(): - for holder in self.semaphoreHolders(semaphore): - second_semaphores_by_holder[holder] = semaphore - second_holders = set(second_semaphores_by_holder.keys()) - - # The stable set of holders; avoids race conditions with - # scheduler(s) starting jobs. 
- holders = first_holders.intersection(second_holders) - semaphores_by_holder = first_semaphores_by_holder - semaphores_by_holder.update(second_semaphores_by_holder) - - for holder in holders: - if holder not in running_handles: - semaphore_name = semaphores_by_holder[holder] + # MODEL_API: >1 + if self.component_registry.model_api < 2: + self.log.warning("Skipping semaphore cleanup since minimum model " + "API is %s (needs >= 2)", + self.component_registry.model_api) + return + + for semaphore_name in self.getSemaphores(): + for holder in self.semaphoreHolders(semaphore_name): + if isinstance(holder, str): + self.log.warning( + "Ignoring legacy semaphore holder %s for semaphore %s", + holder, semaphore_name) + continue + if (self.kazoo_client.exists(holder["buildset_path"]) + is not None): + continue + semaphore_key = quote_plus(semaphore_name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" self.log.error("Releasing leaked semaphore %s held by %s", |