path: root/tools
diff options
authorJames E. Blair <>2022-01-05 16:56:35 -0800
committerJames E. Blair <>2022-01-26 12:59:39 -0800
commit1f3f724bbbba616b6eb3dba808d1c1c43953e64f (patch)
tree28e685aee5136cfacce0b4d9aeaa7324cd457a0c /tools
parent02efa8fb28af77c63990722f9b21241132a7de60 (diff)
Add some ZK debug scripts
These may be useful for zuul developers to understand issues with the ZK data storage. zk-dump will dump an approximation of the contents of ZK to the filesystem for manual examination. zk-analyze will perform some analysis on the tree to identify objects which may be execessively large. Change-Id: I1a90cce42da719eee0a5e50242034390722d518e
Diffstat (limited to 'tools')
2 files changed, 539 insertions, 0 deletions
diff --git a/tools/ b/tools/
new file mode 100644
index 000000000..581dc4799
--- /dev/null
+++ b/tools/
@@ -0,0 +1,471 @@
+# Copyright 2022 Acme Gating, LLC
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# Analyze the contents of the ZK tree (whether in ZK or a dump on the
+# local filesystem) to identify large objects.
+import argparse
+import json
+import os
+import sys
+import zlib
+import kazoo.client
+KB = 1024
+MB = 1024**2
+GB = 1024**3
+def convert_human(size):
+ if size >= GB:
+ return f'{int(size/GB)}G'
+ if size >= MB:
+ return f'{int(size/MB)}M'
+ if size >= KB:
+ return f'{int(size/KB)}K'
+ if size > 0:
+ return f'{size}B'
+ return '0'
+def convert_null(size):
+ return size
+def unconvert_human(size):
+ suffix = size[-1]
+ val = size[:-1]
+ if suffix in ['G', 'g']:
+ return int(val) * GB
+ if suffix in ['M', 'm']:
+ return int(val) * MB
+ if suffix in ['K', 'k']:
+ return int(val) * KB
+ return int(size)
+class SummaryLine:
+ def __init__(self, kind, path, size=0, zk_size=0):
+ self.kind = kind
+ self.path = path
+ self.size = size
+ self.zk_size = zk_size
+ self.attrs = {}
+ self.children = []
+ @property
+ def tree_size(self):
+ return sum([x.tree_size for x in self.children] + [self.size])
+ @property
+ def zk_tree_size(self):
+ return sum([x.zk_tree_size for x in self.children] + [self.zk_size])
+ def add(self, child):
+ self.children.append(child)
+ def __str__(self):
+ indent = 0
+ return self.toStr(indent)
+ def matchesLimit(self, limit, zk):
+ if not limit:
+ return True
+ if zk:
+ size = self.zk_size
+ else:
+ size = self.size
+ if size >= limit:
+ return True
+ for child in self.children:
+ if child.matchesLimit(limit, zk):
+ return True
+ return False
+ def toStr(self, indent, depth=None, conv=convert_null, limit=0, zk=False):
+ """Convert this item and its children to a str representation
+ :param indent int: How many levels to indent
+ :param depth int: How many levels deep to display
+ :param conv func: A function to convert sizes to text
+ :param limit int: Don't display items smaller than this
+ :param zk bool: Whether to use the data size (False)
+ or ZK storage size (True)
+ """
+ if depth and indent >= depth:
+ return ''
+ if self.matchesLimit(limit, zk):
+ attrs = ' '.join([f'{k}={conv(v)}' for k, v in self.attrs.items()])
+ if attrs:
+ attrs = ' ' + attrs
+ if zk:
+ size = conv(self.zk_size)
+ tree_size = conv(self.zk_tree_size)
+ else:
+ size = conv(self.size)
+ tree_size = conv(self.tree_size)
+ ret = (' ' * indent + f"{self.kind} {self.path} "
+ f"size={size} tree={tree_size}{attrs}\n")
+ for child in self.children:
+ ret += child.toStr(indent + 1, depth, conv, limit, zk)
+ else:
+ ret = ''
+ return ret
+class Data:
+ def __init__(self, path, raw, zk_size=None, failed=False):
+ self.path = path
+ self.raw = raw
+ self.failed = failed
+ self.zk_size = zk_size or len(raw)
+ if not failed:
+ = json.loads(raw)
+ else:
+ print(f"!!! {path} failed to load data")
+ = {}
+ @property
+ def size(self):
+ return len(self.raw)
+class Tree:
+ def getNode(self, path):
+ pass
+ def listChildren(self, path):
+ pass
+ def listConnections(self):
+ return self.listChildren('/zuul/cache/connection')
+ def getBranchCache(self, connection):
+ return self.getShardedNode(f'/zuul/cache/connection/{connection}'
+ '/branches/data')
+ def listCacheKeys(self, connection):
+ return self.listChildren(f'/zuul/cache/connection/{connection}/cache')
+ def getCacheKey(self, connection, key):
+ return self.getNode(f'/zuul/cache/connection/{connection}/cache/{key}')
+ def listCacheData(self, connection):
+ return self.listChildren(f'/zuul/cache/connection/{connection}/data')
+ def getCacheData(self, connection, key):
+ return self.getShardedNode(f'/zuul/cache/connection/{connection}'
+ f'/data/{key}')
+ def listTenants(self):
+ return self.listChildren('/zuul/tenant')
+ def listPipelines(self, tenant):
+ return self.listChildren(f'/zuul/tenant/{tenant}/pipeline')
+ def getPipeline(self, tenant, pipeline):
+ return self.getNode(f'/zuul/tenant/{tenant}/pipeline/{pipeline}')
+ def getItems(self, tenant, pipeline):
+ pdata = self.getPipeline(tenant, pipeline)
+ for queue in'queues', []):
+ qdata = self.getNode(queue)
+ for item in'queue', []):
+ idata = self.getNode(item)
+ yield idata
+ def listBuildsets(self, item):
+ return self.listChildren(f'{item}/buildset')
+ def getBuildset(self, item, buildset):
+ return self.getNode(f'{item}/buildset/{buildset}')
+ def listJobs(self, buildset):
+ return self.listChildren(f'{buildset}/job')
+ def getJob(self, buildset, job_name):
+ return self.getNode(f'{buildset}/job/{job_name}')
+ def listBuilds(self, buildset, job_name):
+ return self.listChildren(f'{buildset}/job/{job_name}/build')
+ def getBuild(self, buildset, job_name, build):
+ return self.getNode(f'{buildset}/job/{job_name}/build/{build}')
+class FilesystemTree(Tree):
+ def __init__(self, root):
+ self.root = root
+ def getNode(self, path):
+ path = path.lstrip('/')
+ fullpath = os.path.join(self.root, path)
+ if not os.path.exists(fullpath):
+ return Data(path, '', failed=True)
+ try:
+ with open(os.path.join(fullpath, 'ZKDATA'), 'rb') as f:
+ zk_data =
+ data = zk_data
+ try:
+ data = zlib.decompress(zk_data)
+ except Exception:
+ pass
+ return Data(path, data, zk_size=len(zk_data))
+ except Exception:
+ return Data(path, '', failed=True)
+ def getShardedNode(self, path):
+ path = path.lstrip('/')
+ fullpath = os.path.join(self.root, path)
+ if not os.path.exists(fullpath):
+ return Data(path, '', failed=True)
+ shards = sorted([x for x in os.listdir(fullpath)
+ if x != 'ZKDATA'])
+ data = b''
+ compressed_data_len = 0
+ try:
+ for shard in shards:
+ with open(os.path.join(fullpath, shard, 'ZKDATA'), 'rb') as f:
+ compressed_data =
+ compressed_data_len += len(compressed_data)
+ data += zlib.decompress(compressed_data)
+ return Data(path, data, zk_size=compressed_data_len)
+ except Exception:
+ return Data(path, data, failed=True)
+ def listChildren(self, path):
+ path = path.lstrip('/')
+ fullpath = os.path.join(self.root, path)
+ if not os.path.exists(fullpath):
+ return []
+ return [x for x in os.listdir(fullpath)
+ if x != 'ZKDATA']
+class ZKTree(Tree):
+ def __init__(self, host, cert, key, ca):
+ kwargs = {}
+ if cert:
+ kwargs['use_ssl'] = True
+ kwargs['keyfile'] = key
+ kwargs['certfile'] = cert
+ kwargs['ca'] = ca
+ self.client = kazoo.client.KazooClient(host, **kwargs)
+ self.client.start()
+ def getNode(self, path):
+ path = path.lstrip('/')
+ if not self.client.exists(path):
+ return Data(path, '', failed=True)
+ try:
+ zk_data, _ = self.client.get(path)
+ data = zk_data
+ try:
+ data = zlib.decompress(zk_data)
+ except Exception:
+ pass
+ return Data(path, data, zk_size=len(zk_data))
+ except Exception:
+ return Data(path, '', failed=True)
+ def getShardedNode(self, path):
+ path = path.lstrip('/')
+ if not self.client.exists(path):
+ return Data(path, '', failed=True)
+ shards = sorted(self.listChildren(path))
+ data = b''
+ compressed_data_len = 0
+ try:
+ for shard in shards:
+ compressed_data, _ = self.client.get(os.path.join(path, shard))
+ compressed_data_len += len(compressed_data)
+ data += zlib.decompress(compressed_data)
+ return Data(path, data, zk_size=compressed_data_len)
+ except Exception:
+ return Data(path, data, failed=True)
+ def listChildren(self, path):
+ path = path.lstrip('/')
+ try:
+ return self.client.get_children(path)
+ except kazoo.client.NoNodeError:
+ return []
+class Analyzer:
+ def __init__(self, args):
+ if args.path:
+ self.tree = FilesystemTree(args.path)
+ else:
+ self.tree = ZKTree(, args.cert, args.key,
+ if args.depth is not None:
+ self.depth = int(args.depth)
+ else:
+ self.depth = None
+ if args.human:
+ self.conv = convert_human
+ else:
+ self.conv = convert_null
+ if args.limit:
+ self.limit = unconvert_human(args.limit)
+ else:
+ self.limit = 0
+ self.use_zk_size = args.zk_size
+ def summarizeItem(self, item):
+ # Start with an item
+ item_summary = SummaryLine('Item', item.path, item.size, item.zk_size)
+ buildsets = self.tree.listBuildsets(item.path)
+ for bs_i, bs_id in enumerate(buildsets):
+ # Add each buildset
+ buildset = self.tree.getBuildset(item.path, bs_id)
+ buildset_summary = SummaryLine(
+ 'Buildset', buildset.path,
+ buildset.size, buildset.zk_size)
+ item_summary.add(buildset_summary)
+ # Some attributes are offloaded, gather them and include
+ # the size.
+ for x in ['merge_repo_state', 'extra_repo_state', 'files',
+ 'config_errors']:
+ if
+ node = self.tree.getShardedNode(
+ buildset_summary.attrs[x] = \
+ self.use_zk_size and node.zk_size or node.size
+ buildset_summary.size += node.size
+ buildset_summary.zk_size += node.zk_size
+ jobs = self.tree.listJobs(buildset.path)
+ for job_i, job_name in enumerate(jobs):
+ # Add each job
+ job = self.tree.getJob(buildset.path, job_name)
+ job_summary = SummaryLine('Job', job.path,
+ job.size, job.zk_size)
+ buildset_summary.add(job_summary)
+ # Handle offloaded job data
+ for job_attr in ('artifact_data',
+ 'extra_variables',
+ 'group_variables',
+ 'host_variables',
+ 'secret_parent_data',
+ 'variables',
+ 'parent_data',
+ 'secrets'):
+ job_data =, None)
+ if job_data and job_data['storage'] == 'offload':
+ node = self.tree.getShardedNode(job_data['path'])
+ job_summary.attrs[job_attr] = \
+ self.use_zk_size and node.zk_size or node.size
+ job_summary.size += node.size
+ job_summary.zk_size += node.zk_size
+ builds = self.tree.listBuilds(buildset.path, job_name)
+ for build_i, build_id in enumerate(builds):
+ # Add each build
+ build = self.tree.getBuild(
+ buildset.path, job_name, build_id)
+ build_summary = SummaryLine(
+ 'Build', build.path, build.size, build.zk_size)
+ job_summary.add(build_summary)
+ # Add the offloaded build attributes
+ result_len = 0
+ result_zk_len = 0
+ if'_result_data'):
+ result_data = self.tree.getShardedNode(
+ result_len += result_data.size
+ result_zk_len += result_data.zk_size
+ if'_secret_result_data'):
+ secret_result_data = self.tree.getShardedNode(
+ result_len += secret_result_data.size
+ result_zk_len += secret_result_data.zk_size
+ build_summary.attrs['results'] = \
+ self.use_zk_size and result_zk_len or result_len
+ build_summary.size += result_len
+ build_summary.zk_size += result_zk_len
+ sys.stdout.write(item_summary.toStr(0, self.depth, self.conv,
+ self.limit, self.use_zk_size))
+ def summarizePipelines(self):
+ for tenant_name in self.tree.listTenants():
+ for pipeline_name in self.tree.listPipelines(tenant_name):
+ for item in self.tree.getItems(tenant_name, pipeline_name):
+ self.summarizeItem(item)
+ def summarizeConnectionCache(self, connection_name):
+ connection_summary = SummaryLine('Connection', connection_name, 0, 0)
+ branch_cache = self.tree.getBranchCache(connection_name)
+ branch_summary = SummaryLine(
+ 'Branch Cache', connection_name,
+ branch_cache.size, branch_cache.zk_size)
+ connection_summary.add(branch_summary)
+ cache_key_summary = SummaryLine(
+ 'Change Cache Keys', connection_name, 0, 0)
+ cache_key_summary.attrs['count'] = 0
+ connection_summary.add(cache_key_summary)
+ for key in self.tree.listCacheKeys(connection_name):
+ cache_key = self.tree.getCacheKey(connection_name, key)
+ cache_key_summary.size += cache_key.size
+ cache_key_summary.zk_size += cache_key.zk_size
+ cache_key_summary.attrs['count'] += 1
+ cache_data_summary = SummaryLine(
+ 'Change Cache Data', connection_name, 0, 0)
+ cache_data_summary.attrs['count'] = 0
+ connection_summary.add(cache_data_summary)
+ for key in self.tree.listCacheData(connection_name):
+ cache_data = self.tree.getCacheData(connection_name, key)
+ cache_data_summary.size += cache_data.size
+ cache_data_summary.zk_size += cache_data.zk_size
+ cache_data_summary.attrs['count'] += 1
+ sys.stdout.write(connection_summary.toStr(
+ 0, self.depth, self.conv, self.limit, self.use_zk_size))
+ def summarizeConnections(self):
+ for connection_name in self.tree.listConnections():
+ self.summarizeConnectionCache(connection_name)
+ def summarize(self):
+ self.summarizeConnections()
+ self.summarizePipelines()
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--path',
+ help='Filesystem path for previously dumped data')
+ parser.add_argument('--host',
+ help='ZK host string (exclusive with --path)')
+ parser.add_argument('--cert', help='Path to TLS certificate')
+ parser.add_argument('--key', help='Path to TLS key')
+ parser.add_argument('--ca', help='Path to TLS CA cert')
+ parser.add_argument('-d', '--depth', help='Limit depth when printing')
+ parser.add_argument('-H', '--human', dest='human', action='store_true',
+ help='Use human-readable sizes')
+ parser.add_argument('-l', '--limit', dest='limit',
+ help='Only print nodes greater than limit')
+ parser.add_argument('-Z', '--zksize', dest='zk_size', action='store_true',
+ help='Use the possibly compressed ZK storage size '
+ 'instead of plain data size')
+ args = parser.parse_args()
+ az = Analyzer(args)
+ az.summarize()
diff --git a/tools/ b/tools/
new file mode 100644
index 000000000..91969584d
--- /dev/null
+++ b/tools/
@@ -0,0 +1,68 @@
+# Copyright 2022 Acme Gating, LLC
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# Dump the data in ZK to the local filesystem.
+import argparse
+import os
+import zlib
+import kazoo.client
+def getTree(client, root, path, decompress=False):
+ try:
+ data, zstat = client.get(path)
+ except kazoo.exceptions.NoNodeError:
+ print(f"No node at {path}")
+ return
+ if decompress:
+ try:
+ data = zlib.decompress(data)
+ except Exception:
+ pass
+ os.makedirs(root + path)
+ with open(root + path + '/ZKDATA', 'wb') as f:
+ f.write(data)
+ for child in client.get_children(path):
+ getTree(client, root, path + '/' + child, decompress)
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('host', help='ZK host string')
+ parser.add_argument('path', help='Filesystem output path for data dump')
+ parser.add_argument('--cert', help='Path to TLS certificate')
+ parser.add_argument('--key', help='Path to TLS key')
+ parser.add_argument('--ca', help='Path to TLS CA cert')
+ parser.add_argument('--decompress', action='store_true',
+ help='Decompress data')
+ args = parser.parse_args()
+ kwargs = {}
+ if args.cert:
+ kwargs['use_ssl'] = True
+ kwargs['keyfile'] = args.key
+ kwargs['certfile'] = args.cert
+ kwargs['ca'] =
+ client = kazoo.client.KazooClient(, **kwargs)
+ client.start()
+ getTree(client, args.path, '/zuul', args.decompress)
+if __name__ == '__main__':
+ main()