summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/developer/model-changelog.rst7
-rw-r--r--tests/unit/test_model_upgrade.py87
-rw-r--r--zuul/configloader.py8
-rw-r--r--zuul/model.py2
-rw-r--r--zuul/zk/semaphore.py89
5 files changed, 148 insertions, 45 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index 50e55645c..5c45d4c62 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -31,3 +31,10 @@ Version 1
:Prior Zuul version: 4.11.0
:Description: No change since Version 0. This explicitly records the
component versions in ZooKeeper.
+
+Version 2
+---------
+
+:Prior Zuul version: 5.0.0
+:Description: Changes the semaphore handle format from `<item_uuid>-<job_name>`
+ to a dictionary with buildset path and job name.
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index b8364cb07..985e883aa 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -63,3 +63,90 @@ class TestModelUpgrade(ZuulTestCase):
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 1:
break
+
+
+class TestSemaphoreModelUpgrade(ZuulTestCase):
+ tenant_config_file = 'config/semaphore/main.yaml'
+
+ @model_version(1)
+ def test_semaphore_handler_cleanup(self):
+ self.executor_server.hold_jobs_in_build = True
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 0)
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ semaphore_holders = tenant.semaphore_handler.semaphoreHolders(
+ "test-semaphore")
+ self.assertEqual(len(semaphore_holders), 1)
+ # Assert that we are still using the old-style handler format
+ self.assertTrue(all(isinstance(h, str) for h in semaphore_holders))
+
+ # Save some variables for later use while the job is running
+ check_pipeline = tenant.layout.pipelines['check']
+ item = check_pipeline.getAllItems()[0]
+ job = item.getJob('semaphore-one-test1')
+
+ tenant.semaphore_handler.cleanupLeaks()
+ # Nothing has leaked; our handle should be present.
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 1)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # Make sure the semaphore is released normally
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 0)
+
+ # Use our previously saved data to simulate a leaked semaphore
+ # with the OLD handler format.
+ tenant.semaphore_handler.acquire(item, job, False)
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 1)
+
+ tenant.semaphore_handler.cleanupLeaks()
+ # Make sure the semaphore is NOT cleaned up as the model version
+ # is still < 2
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 2
+
+ tenant.semaphore_handler.cleanupLeaks()
+ # Make sure we are not touching old-style handlers during cleanup.
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 1)
+
+ # Try to release the old-style semaphore after the model API upgrade.
+ tenant.semaphore_handler.release(item, job)
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 0)
+
+ # Use our previously saved data to simulate a leaked semaphore
+ # with the NEW handler format.
+ tenant.semaphore_handler.acquire(item, job, False)
+ semaphore_holders = tenant.semaphore_handler.semaphoreHolders(
+ "test-semaphore")
+ self.assertEqual(len(semaphore_holders), 1)
+ # Assert that we are now using the new-style handler format
+ self.assertTrue(all(isinstance(h, dict) for h in semaphore_holders))
+
+ tenant.semaphore_handler.cleanupLeaks()
+ # Make sure the leaked semaphore is cleaned up
+ self.assertEqual(
+ len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
+ 0)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 58650b1da..7ed296d83 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1638,9 +1638,11 @@ class TenantParser(object):
tenant.layout = self._parseLayout(
tenant, parsed_config, loading_errors, layout_uuid)
- tenant.semaphore_handler = SemaphoreHandler(
- self.zk_client, self.statsd, tenant.name, tenant.layout
- )
+ if self.scheduler:
+ tenant.semaphore_handler = SemaphoreHandler(
+ self.zk_client, self.statsd, tenant.name, tenant.layout,
+ self.scheduler.component_registry
+ )
return tenant
diff --git a/zuul/model.py b/zuul/model.py
index 778b6bd68..1a8b72442 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -49,7 +49,7 @@ from zuul.zk.change_cache import ChangeKey
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
-MODEL_API = 1
+MODEL_API = 2
MERGER_MERGE = 1 # "git merge"
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py
index 548d2756a..9e72cf896 100644
--- a/zuul/zk/semaphore.py
+++ b/zuul/zk/semaphore.py
@@ -38,10 +38,12 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
semaphore_root = "/zuul/semaphores"
- def __init__(self, client, statsd, tenant_name, layout):
+ def __init__(self, client, statsd, tenant_name, layout,
+ component_registry):
super().__init__(client)
self.layout = layout
self.statsd = statsd
+ self.component_registry = component_registry
self.tenant_name = tenant_name
self.tenant_root = f"{self.semaphore_root}/{tenant_name}"
@@ -92,17 +94,26 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
- semaphore_handle = f"{item.uuid}-{job.name}"
+ semaphore_handle = {
+ "buildset_path": item.current_build_set.getPath(),
+ "job_name": job.name,
+ }
+ legacy_handle = f"{item.uuid}-{job.name}"
self.kazoo_client.ensure_path(semaphore_path)
semaphore_holders, zstat = self.getHolders(semaphore_path)
- if semaphore_handle in semaphore_holders:
+ if (semaphore_handle in semaphore_holders
+ or legacy_handle in semaphore_holders):
return True
# semaphore is there, check max
while len(semaphore_holders) < self._max_count(semaphore.name):
- semaphore_holders.append(semaphore_handle)
+ # MODEL_API: >1
+ if self.component_registry.model_api > 1:
+ semaphore_holders.append(semaphore_handle)
+ else:
+ semaphore_holders.append(legacy_handle)
try:
self.kazoo_client.set(semaphore_path,
@@ -133,11 +144,17 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
except NoNodeError:
return []
- def _release(self, log, semaphore_path, semaphore_handle, quiet):
+ def _release(self, log, semaphore_path, semaphore_handle, quiet,
+ legacy_handle=None):
while True:
try:
semaphore_holders, zstat = self.getHolders(semaphore_path)
- semaphore_holders.remove(semaphore_handle)
+ try:
+ semaphore_holders.remove(semaphore_handle)
+ except ValueError:
+ if legacy_handle is None:
+ raise
+ semaphore_holders.remove(legacy_handle)
except (ValueError, NoNodeError):
if not quiet:
log.error("Semaphore %s can not be released for %s "
@@ -172,9 +189,13 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
def _release_one(self, log, item, job, semaphore, quiet):
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
- semaphore_handle = f"{item.uuid}-{job.name}"
-
- self._release(log, semaphore_path, semaphore_handle, quiet)
+ semaphore_handle = {
+ "buildset_path": item.current_build_set.getPath(),
+ "job_name": job.name,
+ }
+ legacy_handle = f"{item.uuid}-{job.name}"
+ self._release(log, semaphore_path, semaphore_handle, quiet,
+ legacy_handle)
def semaphoreHolders(self, semaphore_name):
semaphore_key = quote_plus(semaphore_name)
@@ -190,38 +211,24 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
return 1 if semaphore is None else semaphore.max
def cleanupLeaks(self):
- # This is designed to account for jobs starting and stopping
- # while this runs, and should therefore be safe to run outside
- # of the scheduler main loop (and accross multiple
- # schedulers).
-
- first_semaphores_by_holder = {}
- for semaphore in self.getSemaphores():
- for holder in self.semaphoreHolders(semaphore):
- first_semaphores_by_holder[holder] = semaphore
- first_holders = set(first_semaphores_by_holder.keys())
-
- running_handles = set()
- for pipeline in self.layout.pipelines.values():
- for item in pipeline.getAllItems(include_old=True):
- for job in item.getJobs():
- running_handles.add(f"{item.uuid}-{job.name}")
-
- second_semaphores_by_holder = {}
- for semaphore in self.getSemaphores():
- for holder in self.semaphoreHolders(semaphore):
- second_semaphores_by_holder[holder] = semaphore
- second_holders = set(second_semaphores_by_holder.keys())
-
- # The stable set of holders; avoids race conditions with
- # scheduler(s) starting jobs.
- holders = first_holders.intersection(second_holders)
- semaphores_by_holder = first_semaphores_by_holder
- semaphores_by_holder.update(second_semaphores_by_holder)
-
- for holder in holders:
- if holder not in running_handles:
- semaphore_name = semaphores_by_holder[holder]
+ # MODEL_API: >1
+ if self.component_registry.model_api < 2:
+ self.log.warning("Skipping semaphore cleanup since minimum model "
+ "API is %s (needs >= 2)",
+ self.component_registry.model_api)
+ return
+
+ for semaphore_name in self.getSemaphores():
+ for holder in self.semaphoreHolders(semaphore_name):
+ if isinstance(holder, str):
+ self.log.warning(
+ "Ignoring legacy semaphore holder %s for semaphore %s",
+ holder, semaphore_name)
+ continue
+ if (self.kazoo_client.exists(holder["buildset_path"])
+ is not None):
+ continue
+
semaphore_key = quote_plus(semaphore_name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
self.log.error("Releasing leaked semaphore %s held by %s",