summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-04-12 01:51:33 +0000
committerGerrit Code Review <review@openstack.org>2022-04-12 01:51:33 +0000
commitf0bf27521428699fb8c17cc9db4378b9cb65028b (patch)
tree6f094726a03792408522f2687d718088cd8bcb4a
parent421df5935ec04034c8295251c1ec744354242bab (diff)
parent5415c40989ee7be509c6ec2bd2759aa8b9cd9fa4 (diff)
downloadzuul-f0bf27521428699fb8c17cc9db4378b9cb65028b.tar.gz
Merge "Add a blob store and store large secrets in it"
-rw-r--r--doc/source/developer/model-changelog.rst9
-rw-r--r--doc/source/developer/zookeeper.rst12
-rw-r--r--tests/unit/test_model_upgrade.py4
-rw-r--r--tests/unit/test_v3.py35
-rw-r--r--tests/unit/test_zk.py35
-rw-r--r--zuul/executor/server.py9
-rw-r--r--zuul/model.py51
-rw-r--r--zuul/model_api.py2
-rw-r--r--zuul/scheduler.py36
-rw-r--r--zuul/zk/blob_store.py201
-rw-r--r--zuul/zk/zkobject.py17
11 files changed, 402 insertions, 9 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index 7562d9239..efdf50e3d 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -70,3 +70,12 @@ Version 6
:Prior Zuul version: 5.2.0
:Description: Stores the complete layout min_ltimes in /zuul/layout-data.
This only affects schedulers.
+
+Version 7
+---------
+
+:Prior Zuul version: 5.2.2
+:Description: Adds the blob store and stores large secrets in it.
+ Playbook secret references are now either an integer
+ index into the job secret list, or a dict with a blob
+ store key. This affects schedulers and executors.
diff --git a/doc/source/developer/zookeeper.rst b/doc/source/developer/zookeeper.rst
index 3bd4c7d45..c14047ad6 100644
--- a/doc/source/developer/zookeeper.rst
+++ b/doc/source/developer/zookeeper.rst
@@ -149,6 +149,18 @@ This is a reference for object layout in Zookeeper.
These are sharded JSON blobs of the change data.
+.. path:: zuul/cache/blob/data
+
+ Data for the blob store. These nodes are identified by a
+ sha256sum of the secret content.
+
+ These are sharded blobs of data.
+
+.. path:: zuul/cache/blob/lock
+
+ Side-channel lock directory for the blob store. The store locks
+ by key id under this znode when writing.
+
.. path:: zuul/cleanup
This node holds locks for the cleanup routines to make sure that
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index 83fdb692e..536c4f91d 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -177,6 +177,10 @@ class TestModelUpgrade(ZuulTestCase):
self.assertEqual(first.sched.local_layout_state.get("tenant-one"),
second.sched.local_layout_state.get("tenant-one"))
+ # No test for model version 7 (secrets in blob store): old and new
+ # code paths are exercised in existing tests since small secrets
+ # don't use the blob store.
+
class TestSemaphoreModelUpgrade(ZuulTestCase):
tenant_config_file = 'config/semaphore/main.yaml'
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index cd031fcda..5e21cd57f 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -25,12 +25,14 @@ from time import sleep
from unittest import skip, skipIf
from zuul.lib import yamlutil
+import fixtures
import git
import paramiko
import zuul.configloader
from zuul.lib import yamlutil as yaml
from zuul.model import MergeRequest
+from zuul.zk.blob_store import BlobStore
from tests.base import (
AnsibleZuulTestCase,
@@ -5664,6 +5666,39 @@ class TestSecrets(ZuulTestCase):
self._getSecrets('project2-complex', 'playbooks'),
[secret])
+ def test_blobstore_secret(self):
+ # Test the large secret blob store
+ self.executor_server.hold_jobs_in_build = True
+ self.useFixture(fixtures.MonkeyPatch(
+ 'zuul.model.Job.SECRET_BLOB_SIZE',
+ 1))
+
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ context = self.scheds.first.sched.createZKContext(None, self.log)
+ bs = BlobStore(context)
+ self.assertEqual(len(bs), 1)
+
+ self.scheds.first.sched._runBlobStoreCleanup()
+ self.assertEqual(len(bs), 1)
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1, "A should report success")
+ self.assertHistory([
+ dict(name='project1-secret', result='SUCCESS', changes='1,1'),
+ ])
+ self.assertEqual(
+ [{'secret_name': self.secret}],
+ self._getSecrets('project1-secret', 'playbooks'))
+
+ self.scheds.first.sched._runBlobStoreCleanup()
+ self.assertEqual(len(bs), 0)
+
class TestSecretInheritance(ZuulTestCase):
tenant_config_file = 'config/secret-inheritance/main.yaml'
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index c8b8c74a8..e2893e9df 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -25,6 +25,7 @@ from zuul import model
from zuul.lib import yamlutil as yaml
from zuul.model import BuildRequest, HoldRequest, MergeRequest
from zuul.zk import ZooKeeperClient
+from zuul.zk.blob_store import BlobStore
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
@@ -1980,3 +1981,37 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
self.assertEqual(el1.errors[0], e1)
self.assertNotEqual(e1, e2)
self.assertEqual([e1, e2], [e1, e2])
+
+
+class TestBlobStore(ZooKeeperBaseTestCase):
+ def test_blob_store(self):
+ stop_event = threading.Event()
+ self.zk_client.client.create('/zuul/pipeline', makepath=True)
+ # Create a new object
+ tenant_name = 'fake_tenant'
+
+ start_ltime = self.zk_client.getCurrentLtime()
+ with tenant_write_lock(self.zk_client, tenant_name) as lock:
+ context = ZKContext(self.zk_client, lock, stop_event, self.log)
+ bs = BlobStore(context)
+ with testtools.ExpectedException(KeyError):
+ bs.get('nope')
+
+ path = bs.put(b'something')
+
+ self.assertEqual(bs.get(path), b'something')
+ self.assertEqual([x for x in bs], [path])
+ self.assertEqual(len(bs), 1)
+
+ self.assertTrue(path in bs)
+ self.assertFalse('nope' in bs)
+ self.assertTrue(bs._checkKey(path))
+ self.assertFalse(bs._checkKey('nope'))
+
+ cur_ltime = self.zk_client.getCurrentLtime()
+ self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {path})
+ self.assertEqual(bs.getKeysLastUsedBefore(start_ltime), set())
+ bs.delete(path, cur_ltime)
+
+ with testtools.ExpectedException(KeyError):
+ bs.get(path)
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 158042be4..4a7e144bd 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -74,6 +74,7 @@ import zuul.model
from zuul.nodepool import Nodepool
from zuul.version import get_version_string
from zuul.zk.event_queues import PipelineResultEventQueue
+from zuul.zk.blob_store import BlobStore
from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.executor import ExecutorApi
@@ -2121,8 +2122,14 @@ class AnsibleJob(object):
"""
ret = {}
+ blobstore = BlobStore(self.executor_server.zk_context)
for secret_name, secret_index in secrets.items():
- frozen_secret = self.job.secrets[secret_index]
+ if isinstance(secret_index, dict):
+ key = secret_index['blob']
+ data = blobstore.get(key)
+ frozen_secret = json.loads(data.decode('utf-8'))
+ else:
+ frozen_secret = self.job.secrets[secret_index]
secret = zuul.model.Secret(secret_name, None)
secret.secret_data = yaml.encrypted_load(
frozen_secret['encrypted_data'])
diff --git a/zuul/model.py b/zuul/model.py
index c8e3dd955..0bd5cb88b 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -46,6 +46,7 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.lib.capabilities import capabilities_registry
from zuul.lib.jsonutil import json_dumps
from zuul.zk import zkobject
+from zuul.zk.blob_store import BlobStore
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
@@ -2280,6 +2281,12 @@ class FrozenJob(zkobject.ZKObject):
_artifact_data=self._makeJobData(
context, 'artifact_data', artifact_data))
+ @property
+ def all_playbooks(self):
+ for k in ('pre_run', 'run', 'post_run', 'cleanup_run'):
+ playbooks = getattr(self, k)
+ yield from playbooks
+
class Job(ConfigObject):
"""A Job represents the defintion of actions to perform.
@@ -2298,6 +2305,8 @@ class Job(ConfigObject):
empty_nodeset = NodeSet()
BASE_JOB_MARKER = object()
+ # Secrets larger than this size will be put in the blob store
+ SECRET_BLOB_SIZE = 10 * 1024
def isBase(self):
return self.parent is self.BASE_JOB_MARKER
@@ -2490,17 +2499,35 @@ class Job(ConfigObject):
role['project'] = role_project.name
return d
- def _deduplicateSecrets(self, secrets, playbook):
+ def _deduplicateSecrets(self, context, secrets, playbook):
# secrets is a list of secrets accumulated so far
# playbook is a frozen playbook from _freezePlaybook
+ # At the end of this method, the values in the playbook
+ # secrets dictionary will be mutated to either be an integer
+ # (which is an index into the job's secret list) or a dict
+ # (which contains a pointer to a key in the global blob
+ # store).
+
+ blobstore = BlobStore(context)
+
# Cast to list so we can modify in place
for secret_key, secret_value in list(playbook['secrets'].items()):
- if secret_value in secrets:
- playbook['secrets'][secret_key] = secrets.index(secret_value)
+ secret_serialized = json_dumps(
+ secret_value, sort_keys=True).encode("utf8")
+ if (COMPONENT_REGISTRY.model_api >= 6 and
+ len(secret_serialized) > self.SECRET_BLOB_SIZE):
+ # If the secret is large, store it in the blob store
+ # and store the key in the playbook secrets dict.
+ blob_key = blobstore.put(secret_serialized)
+ playbook['secrets'][secret_key] = {'blob': blob_key}
else:
- secrets.append(secret_value)
- playbook['secrets'][secret_key] = len(secrets) - 1
+ if secret_value in secrets:
+ playbook['secrets'][secret_key] =\
+ secrets.index(secret_value)
+ else:
+ secrets.append(secret_value)
+ playbook['secrets'][secret_key] = len(secrets) - 1
def freezeJob(self, context, tenant, layout, item,
redact_secrets_and_keys):
@@ -2534,7 +2561,7 @@ class Job(ConfigObject):
# it's clear that the value ("REDACTED") is
# redacted.
for pb in v:
- self._deduplicateSecrets(secrets, pb)
+ self._deduplicateSecrets(context, secrets, pb)
kw[k] = v
kw['secrets'] = secrets
kw['affected_projects'] = self._getAffectedProjects(tenant)
@@ -5193,6 +5220,18 @@ class QueueItem(zkobject.ZKObject):
return True # This job's configuration has changed
return False
+ def getBlobKeys(self):
+ # Return a set of blob keys used by this item
+ # for each job in the frozen job graph
+ keys = set()
+ job_graph = self.current_build_set.job_graph
+ for job in job_graph.getJobs():
+ for pb in job.all_playbooks:
+ for secret in pb['secrets'].values():
+ if isinstance(secret, dict) and 'blob' in secret:
+ keys.add(secret['blob'])
+ return keys
+
class Bundle:
"""Identifies a collection of changes that must be treated as one unit."""
diff --git a/zuul/model_api.py b/zuul/model_api.py
index 93e34634a..05286dad5 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
-MODEL_API = 6
+MODEL_API = 7
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 04b6b0f45..0654c3d5d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -74,6 +74,7 @@ from zuul.model import (
)
from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
+from zuul.zk.blob_store import BlobStore
from zuul.zk.cleanup import (
SemaphoreCleanupLock,
BuildRequestCleanupLock,
@@ -679,6 +680,7 @@ class Scheduler(threading.Thread):
self._runExecutorApiCleanup()
self._runMergerApiCleanup()
self._runLayoutDataCleanup()
+ self._runBlobStoreCleanup()
self.maintainConnectionCache()
except Exception:
self.log.exception("Error in general cleanup:")
@@ -721,6 +723,40 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in layout data cleanup:")
+ def _runBlobStoreCleanup(self):
+ self.log.debug("Starting blob store cleanup")
+ try:
+ live_blobs = set()
+ with self.layout_lock:
+ # get the start ltime so that we can filter out any
+ # blobs used since this point
+ start_ltime = self.zk_client.getCurrentLtime()
+ # lock and refresh the pipeline
+ for tenant in self.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ with pipeline_lock(
+ self.zk_client, tenant.name, pipeline.name,
+ ) as lock:
+ ctx = self.createZKContext(lock, self.log)
+ pipeline.state.refresh(ctx)
+ # add any blobstore references
+ for item in pipeline.getAllItems(include_old=True):
+ live_blobs.update(item.getBlobKeys())
+ ctx = self.createZKContext(None, self.log)
+ blobstore = BlobStore(ctx)
+ # get the set of blob keys unused since the start time
+ # (ie, we have already filtered any newly added keys)
+ unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
+ # remove the current refences
+ unused_blobs -= live_blobs
+ # delete what's left
+ for key in unused_blobs:
+ self.log.debug("Deleting unused blob: %s", key)
+ blobstore.delete(key, start_ltime)
+ self.log.debug("Finished blob store cleanup")
+ except Exception:
+ self.log.exception("Error in blob store cleanup:")
+
def _runBuildRequestCleanup(self):
# If someone else is running the cleanup, skip it.
if self.build_request_cleanup_lock.acquire(blocking=False):
diff --git a/zuul/zk/blob_store.py b/zuul/zk/blob_store.py
new file mode 100644
index 000000000..5547faff6
--- /dev/null
+++ b/zuul/zk/blob_store.py
@@ -0,0 +1,201 @@
+# Copyright 2020 BMW Group
+# Copyright 2022 Acme Gating, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import hashlib
+
+from kazoo.exceptions import NoNodeError
+from kazoo.retry import KazooRetry
+
+from zuul.zk.locks import locked, SessionAwareLock
+from zuul.zk.zkobject import LocalZKContext, ZKContext
+from zuul.zk import sharding
+
+
+class BlobStore:
+ _retry_interval = 5
+ data_root = "/zuul/cache/blob/data"
+ lock_root = "/zuul/cache/blob/lock"
+
+ def __init__(self, context):
+ self.context = context
+
+ def _getRootPath(self, key):
+ return f"{self.data_root}/{key[0:2]}/{key}"
+
+ def _getPath(self, key):
+ root = self._getRootPath(key)
+ return f"{root}/data"
+
+ def _getFlagPath(self, key):
+ root = self._getRootPath(key)
+ return f"{root}/complete"
+
+ def _retry(self, context, func, *args, max_tries=-1, **kw):
+ kazoo_retry = KazooRetry(max_tries=max_tries,
+ interrupt=context.sessionIsInvalid,
+ delay=self._retry_interval, backoff=0,
+ ignore_expire=False)
+ try:
+ return kazoo_retry(func, *args, **kw)
+ except InterruptedError:
+ pass
+
+ @staticmethod
+ def _retryableLoad(context, key, path, flag):
+ if not context.client.exists(flag):
+ raise KeyError(key)
+ with sharding.BufferedShardReader(context.client, path) as stream:
+ data = stream.read()
+ compressed_size = stream.compressed_bytes_read
+ context.cumulative_read_time += stream.cumulative_read_time
+ context.cumulative_read_objects += 1
+ context.cumulative_read_znodes += stream.znodes_read
+ context.cumulative_read_bytes += compressed_size
+ return data, compressed_size
+
+ def get(self, key):
+ path = self._getPath(key)
+ flag = self._getFlagPath(key)
+
+ if self.context.sessionIsInvalid():
+ raise Exception("ZooKeeper session or lock not valid")
+
+ data, compressed_size = self._retry(self.context, self._retryableLoad,
+ self.context, key, path, flag)
+ return data
+
+ def _checkKey(self, key):
+ # This returns whether the key is in the store. If it is in
+ # the store, it also touches the flag file so that the cleanup
+ # routine can know the last time an entry was used.
+ flag = self._getFlagPath(key)
+
+ if self.context.sessionIsInvalid():
+ raise Exception("ZooKeeper session or lock not valid")
+
+ ret = self._retry(self.context, self.context.client.exists,
+ flag)
+ if not ret:
+ return False
+ self._retry(self.context, self.context.client.set,
+ flag, b'')
+ return True
+
+ @staticmethod
+ def _retryableSave(context, path, flag, data):
+ with sharding.BufferedShardWriter(context.client, path) as stream:
+ stream.truncate(0)
+ stream.write(data)
+ stream.flush()
+ context.client.ensure_path(flag)
+ compressed_size = stream.compressed_bytes_written
+ context.cumulative_write_time += stream.cumulative_write_time
+ context.cumulative_write_objects += 1
+ context.cumulative_write_znodes += stream.znodes_written
+ context.cumulative_write_bytes += compressed_size
+ return compressed_size
+
+ def put(self, data):
+ if isinstance(self.context, LocalZKContext):
+ return None
+
+ if self.context.sessionIsInvalid():
+ raise Exception("ZooKeeper session or lock not valid")
+
+ hasher = hashlib.sha256()
+ hasher.update(data)
+ key = hasher.hexdigest()
+
+ path = self._getPath(key)
+ flag = self._getFlagPath(key)
+
+ if self._checkKey(key):
+ return key
+
+ with locked(
+ SessionAwareLock(
+ self.context.client,
+ f"{self.lock_root}/{key}"),
+ blocking=True
+ ) as lock:
+ if self._checkKey(key):
+ return key
+
+ # make a new context based on the old one
+ locked_context = ZKContext(self.context.client, lock,
+ self.context.stop_event,
+ self.context.log)
+
+ self._retry(
+ locked_context,
+ self._retryableSave,
+ locked_context, path, flag, data)
+ self.context.updateStatsFromOtherContext(locked_context)
+ return key
+
+ def delete(self, key, ltime):
+ path = self._getRootPath(key)
+ flag = self._getFlagPath(key)
+ if self.context.sessionIsInvalid():
+ raise Exception("ZooKeeper session or lock not valid")
+ try:
+ with locked(
+ SessionAwareLock(
+ self.context.client,
+ f"{self.lock_root}/{key}"),
+ blocking=True
+ ) as lock:
+ # make a new context based on the old one
+ locked_context = ZKContext(self.context.client, lock,
+ self.context.stop_event,
+ self.context.log)
+
+ # Double check that it hasn't been used since we
+ # decided to delete it
+ data, zstat = self._retry(locked_context,
+ self.context.client.get,
+ flag)
+ if zstat.last_modified_transaction_id < ltime:
+ self._retry(locked_context, self.context.client.delete,
+ path, recursive=True)
+ except NoNodeError:
+ raise KeyError(key)
+
+ def __iter__(self):
+ try:
+ hashdirs = self.context.client.get_children(self.data_root)
+ except NoNodeError:
+ return
+
+ for hashdir in hashdirs:
+ try:
+ for key in self.context.client.get_children(
+ f'{self.data_root}/{hashdir}'):
+ yield key
+ except NoNodeError:
+ pass
+
+ def __len__(self):
+ return len([x for x in self])
+
+ def getKeysLastUsedBefore(self, ltime):
+ ret = set()
+ for key in self:
+ flag = self._getFlagPath(key)
+ data, zstat = self._retry(self.context, self.context.client.get,
+ flag)
+ if zstat.last_modified_transaction_id < ltime:
+ ret.add(key)
+ return ret
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 3905e254c..aa32b8b9b 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -22,11 +22,16 @@ from kazoo.exceptions import NodeExistsError, NoNodeError
from kazoo.retry import KazooRetry
from zuul.zk import sharding
+from zuul.zk import ZooKeeperClient
class ZKContext:
def __init__(self, zk_client, lock, stop_event, log):
- self.client = zk_client.client
+ if isinstance(zk_client, ZooKeeperClient):
+ client = zk_client.client
+ else:
+ client = zk_client
+ self.client = client
self.lock = lock
self.stop_event = stop_event
self.log = log
@@ -46,6 +51,16 @@ class ZKContext:
def sessionIsInvalid(self):
return not self.sessionIsValid()
+ def updateStatsFromOtherContext(self, other):
+ self.cumulative_read_time += other.cumulative_read_time
+ self.cumulative_write_time += other.cumulative_write_time
+ self.cumulative_read_objects += other.cumulative_read_objects
+ self.cumulative_write_objects += other.cumulative_write_objects
+ self.cumulative_read_znodes += other.cumulative_read_znodes
+ self.cumulative_write_znodes += other.cumulative_write_znodes
+ self.cumulative_read_bytes += other.cumulative_read_bytes
+ self.cumulative_write_bytes += other.cumulative_write_bytes
+
class LocalZKContext:
"""A Local ZKContext that means don't actually write anything to ZK"""