summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-02-18 20:23:23 -0800
committerJames E. Blair <jim@acmegating.com>2022-02-20 16:55:34 -0800
commitc522bfa46077f0e030f03e0cd4585d5c100ed241 (patch)
tree117a8e09a740a62fa2f154473227c3c3825a4979
parenta1bfee1f9a37f490dbd923175f77a0f62b151c01 (diff)
downloadzuul-c522bfa46077f0e030f03e0cd4585d5c100ed241.tar.gz
Add pipeline timing metrics
This adds several metrics for different phases of processing an item in a pipeline: * How long we wait for a response from mergers * How long it takes to get or compute a layout * How long it takes to freeze jobs * How long we wait for node requests to complete * How long we wait for an executor to start running a job after the request And finally, the total amount of time from the original event until the first job starts. We already report that at the tenant level, this duplicates that for a pipeline-specific metric. Several of these would also make sense as job metrics, but since they are mainly intended to diagnose Zuul system performance and not individual jobs, that would be a waste of storage space due to the extremely high cardinality. Additionally, two other timing metrics are added: the cumulative time spent reading and writing ZKObject data to ZK during pipeline processing. These can help determine whether more effort should be spent optimizing ZK data transfer. In preparing this change, I noticed that python statsd emits floating point values for timing. It's not clear whether this strictly matches the statsd spec, but since it does emit values with that precision, I have removed several int() casts in order to maintain the precision through to the statsd client. I also noticed a place where we were writing a monotonic timestamp value in a JSON serialized string to ZK. I do not believe this value is currently being used, therefore there is no further error to correct, however, we should not use time.monotonic() for values that are serialized since the reference clock will be different on different systems. Several new attributes are added to the QueueItem and Build classes, but are done so in a way that is backwards compatible, so no model api schema upgrade is needed. The code sites where they are used protect against the null values which will occur in a mixed-version cluster (the components will just not emit these stats in those cases). Change-Id: Iaacbef7fa2ed93bfc398a118c5e8cfbc0a67b846
-rw-r--r--doc/source/monitoring.rst73
-rw-r--r--releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml16
-rw-r--r--tests/base.py4
-rw-r--r--tests/fixtures/zuul-connections-gerrit-and-github.conf5
-rw-r--r--tests/fixtures/zuul-sql-driver-mysql.conf5
-rw-r--r--tests/fixtures/zuul-sql-driver-postgres.conf5
-rw-r--r--tests/unit/test_scheduler.py22
-rw-r--r--tests/unit/test_v3.py12
-rw-r--r--zuul/manager/__init__.py56
-rw-r--r--zuul/model.py14
-rw-r--r--zuul/nodepool.py2
-rw-r--r--zuul/scheduler.py9
-rw-r--r--zuul/zk/event_queues.py2
-rw-r--r--zuul/zk/sharding.py20
-rw-r--r--zuul/zk/zkobject.py12
15 files changed, 235 insertions, 22 deletions
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 1caddf708..3c0b2a1ff 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -79,11 +79,66 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Holds metrics specific to jobs. This hierarchy includes:
- .. stat:: <pipeline name>
+ .. stat:: <pipeline>
A set of metrics for each pipeline named as defined in the Zuul
config.
+ .. stat:: event_enqueue_time
+ :type: timer
+
+ The time elapsed from when a trigger event was received from
+ the remote system to when the corresponding item is enqueued
+ in a pipeline.
+
+ .. stat:: merge_request_time
+ :type: timer
+
+ The amount of time spent waiting for the initial merge
+ operation(s). This will always include a request to a Zuul
+ merger to speculatively merge the change, but it may also
+ include a second request submitted in parallel to identify
+ the files altered by the change.
+
+ .. stat:: layout_generation_time
+ :type: timer
+
+ The amount of time spent generating a dynamic configuration layout.
+
+ .. stat:: job_freeze_time
+ :type: timer
+
+ The amount of time spent freezing the inheritance hierarchy
+ and parameters of a job.
+
+ .. stat:: repo_state_time
+ :type: timer
+
+ The amount of time waiting for a secondary Zuul merger
+ operation to collect additional information about the repo
+ state of required projects.
+
+ .. stat:: node_request_time
+ :type: timer
+
+ The amount of time spent waiting for each node request to be
+ fulfilled.
+
+ .. stat:: job_wait_time
+ :type: timer
+
+ How long a job waited for an executor to start running it
+ after the build was requested.
+
+ .. stat:: event_job_time
+ :type: timer
+
+ The total amount of time elapsed from when a trigger event
+ was received from the remote system until the item's first
+ job is run. This is only emitted once per queue item, even
+ if its buildset is reset due to a speculative execution
+ failure.
+
.. stat:: all_jobs
:type: counter
@@ -153,8 +208,8 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
.. stat:: wait_time
:type: timer
- How long each item spent in the pipeline before its first job
- started.
+ How long the job waited for an executor to
+ start running it after the build was requested.
.. stat:: current_changes
:type: gauge
@@ -174,6 +229,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The number of changes for this project processed by the
pipeline since Zuul started.
+ .. stat:: read_time
+ :type: timer
+
+ The time spent reading data from ZooKeeper during a single
+ pipeline processing run.
+
.. stat:: refresh
:type: timer
@@ -206,6 +267,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the pipeline's management event queue.
+ .. stat:: write_time
+ :type: timer
+
+ The time spent writing data to ZooKeeper during a single
+ pipeline processing run.
+
.. stat:: zuul.executor.<executor>
Holds metrics emitted by individual executors. The ``<executor>``
diff --git a/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
new file mode 100644
index 000000000..cab32e146
--- /dev/null
+++ b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
@@ -0,0 +1,16 @@
+---
+features:
+ - |
+ The following new statsd metrics are available in order to monitor
+ Zuul system performance:
+
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_enqueue_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merge_request_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.layout_generation_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_freeze_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.repo_state_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.node_request_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.job_wait_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
+ * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
diff --git a/tests/base.py b/tests/base.py
index 64859c5b3..17d6e948f 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -5544,6 +5544,10 @@ class ZuulTestCase(BaseTestCase):
return s_value
time.sleep(0.1)
+ stats = list(itertools.chain.from_iterable(
+ [s.decode('utf-8').split('\n') for s in self.statsd.stats]))
+ for stat in stats:
+ self.log.debug("Stat: %s", stat)
raise StatException("Key %s not found in reported stats" % key)
def assertUnReportedStat(self, key, value=None, kind=None):
diff --git a/tests/fixtures/zuul-connections-gerrit-and-github.conf b/tests/fixtures/zuul-connections-gerrit-and-github.conf
index 601de4b92..d71d4f584 100644
--- a/tests/fixtures/zuul-connections-gerrit-and-github.conf
+++ b/tests/fixtures/zuul-connections-gerrit-and-github.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
[scheduler]
tenant_config=config/multi-driver/main.yaml
diff --git a/tests/fixtures/zuul-sql-driver-mysql.conf b/tests/fixtures/zuul-sql-driver-mysql.conf
index 094efdd63..4d0ee8c61 100644
--- a/tests/fixtures/zuul-sql-driver-mysql.conf
+++ b/tests/fixtures/zuul-sql-driver-mysql.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
[scheduler]
tenant_config=main.yaml
diff --git a/tests/fixtures/zuul-sql-driver-postgres.conf b/tests/fixtures/zuul-sql-driver-postgres.conf
index 436d16de4..0d21c9aed 100644
--- a/tests/fixtures/zuul-sql-driver-postgres.conf
+++ b/tests/fixtures/zuul-sql-driver-postgres.conf
@@ -1,3 +1,8 @@
+[statsd]
+# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
+# see: https://github.com/jsocol/pystatsd/issues/61
+server=127.0.0.1
+
[scheduler]
tenant_config=main.yaml
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 703eece1f..773900a41 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -440,13 +440,21 @@ class TestScheduler(ZuulTestCase):
value='0', kind='g')
# Catch time / monotonic errors
- val = self.assertReportedStat(
- 'zuul.tenant.tenant-one.event_enqueue_processing_time',
- kind='ms')
- self.assertTrue(0.0 < float(val) < 60000.0)
- val = self.assertReportedStat(
- 'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
- self.assertTrue(0.0 < float(val) < 60000.0)
+ for key in [
+ 'zuul.tenant.tenant-one.event_enqueue_processing_time',
+ 'zuul.tenant.tenant-one.event_enqueue_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.merge_request_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.node_request_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.job_wait_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.event_job_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.resident_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.read_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.write_time',
+ ]:
+ val = self.assertReportedStat(key, kind='ms')
+ self.assertTrue(0.0 < float(val) < 60000.0)
self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
'data_size_compressed',
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index c905896e1..0a3ad2818 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1240,6 +1240,12 @@ class TestInRepoConfig(ZuulTestCase):
dict(name='project-test3', result='SUCCESS', changes='2,1'),
], ordered=False)
+ # Catch time / monotonic errors
+ val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
+ 'tenant-one-gate.layout_generation_time',
+ kind='ms')
+ self.assertTrue(0.0 < float(val) < 60000.0)
+
def test_dynamic_template(self):
# Tests that a project can't update a template in another
# project.
@@ -7196,6 +7202,12 @@ class TestProvidesRequiresMysql(ZuulTestCase):
}
}])
+ # Catch time / monotonic errors
+ val = self.assertReportedStat('zuul.tenant.tenant-one.pipeline.'
+ 'gate.repo_state_time',
+ kind='ms')
+ self.assertTrue(0.0 < float(val) < 60000.0)
+
@simple_layout('layouts/provides-requires-unshared.yaml')
def test_provides_requires_unshared_queue(self):
self.executor_server.hold_jobs_in_build = True
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 070065ba1..2e805e300 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -1120,7 +1120,10 @@ class PipelineManager(metaclass=ABCMeta):
return self.getFallbackLayout(item)
log.debug("Preparing dynamic layout for: %s" % item.change)
- return self._loadDynamicLayout(item)
+ start = time.time()
+ layout = self._loadDynamicLayout(item)
+ self.reportPipelineTiming('layout_generation_time', start)
+ return layout
def _branchesForRepoState(self, projects, tenant, items=None):
items = items or []
@@ -1253,6 +1256,7 @@ class PipelineManager(metaclass=ABCMeta):
branches=branches)
item.current_build_set.updateAttributes(
self.current_context,
+ repo_state_request_time=time.time(),
repo_state_state=item.current_build_set.PENDING)
return True
@@ -1341,10 +1345,12 @@ class PipelineManager(metaclass=ABCMeta):
if not item.current_build_set.job_graph:
try:
log.debug("Freezing job graph for %s" % (item,))
+ start = time.time()
item.freezeJobGraph(self.getLayout(item),
self.current_context,
skip_file_matcher=False,
redact_secrets_and_keys=False)
+ self.reportPipelineTiming('job_freeze_time', start)
except Exception as e:
# TODOv3(jeblair): nicify this exception as it will be reported
log.exception("Error freezing job graph for %s" % (item,))
@@ -1564,6 +1570,17 @@ class PipelineManager(metaclass=ABCMeta):
log = get_annotated_logger(self.log, build.zuul_event_id)
log.debug("Build %s started", build)
self.sql.reportBuildStart(build)
+ self.reportPipelineTiming('job_wait_time',
+ build.execute_time, build.start_time)
+ if not build.build_set.item.first_job_start_time:
+ # Only report this for the first job in a queue item so
+ # that we don't include gate resets.
+ build.build_set.item.updateAttributes(
+ self.current_context,
+ first_job_start_time=build.start_time)
+ self.reportPipelineTiming('event_job_time',
+ build.build_set.item.event.timestamp,
+ build.start_time)
return True
def onBuildPaused(self, build):
@@ -1664,14 +1681,25 @@ class PipelineManager(metaclass=ABCMeta):
source.setChangeAttributes(item.change, files=event.files)
build_set.updateAttributes(self.current_context,
files_state=build_set.COMPLETE)
+ if build_set.merge_state == build_set.COMPLETE:
+ # We're the second of the files/merger pair, report the stat
+ self.reportPipelineTiming('merge_request_time',
+ build_set.configured_time)
def onMergeCompleted(self, event, build_set):
if build_set.merge_state == build_set.COMPLETE:
self._onGlobalRepoStateCompleted(event, build_set)
+ self.reportPipelineTiming('repo_state_time',
+ build_set.repo_state_request_time)
else:
self._onMergeCompleted(event, build_set)
+ if build_set.files_state == build_set.COMPLETE:
+ # We're the second of the files/merger pair, report the stat
+ self.reportPipelineTiming('merge_request_time',
+ build_set.configured_time)
def _onMergeCompleted(self, event, build_set):
+
item = build_set.item
source = self.sched.connections.getSource(
item.change.project.connection_name)
@@ -1702,12 +1730,14 @@ class PipelineManager(metaclass=ABCMeta):
item.setUnableToMerge()
def _onGlobalRepoStateCompleted(self, event, build_set):
+ item = build_set.item
if not event.updated:
- item = build_set.item
self.log.info("Unable to get global repo state for change %s"
% item.change)
item.setUnableToMerge()
else:
+ self.log.info("Received global repo state for change %s"
+ % item.change)
with build_set.activeContext(self.current_context):
build_set.setExtraRepoState(event.repo_state)
build_set.repo_state_state = build_set.COMPLETE
@@ -1716,6 +1746,7 @@ class PipelineManager(metaclass=ABCMeta):
# TODOv3(jeblair): handle provisioning failure here
log = get_annotated_logger(self.log, request.event_id)
+ self.reportPipelineTiming('node_request_time', request.created_time)
if nodeset is not None:
build_set.jobNodeRequestComplete(request.job_name, nodeset)
if not request.fulfilled:
@@ -1878,7 +1909,7 @@ class PipelineManager(metaclass=ABCMeta):
# Update the gauge on enqueue and dequeue, but timers only
# when dequeing.
if item.dequeue_time:
- dt = int((item.dequeue_time - item.enqueue_time) * 1000)
+ dt = (item.dequeue_time - item.enqueue_time) * 1000
else:
dt = None
items = len(self.pipeline.getAllItems())
@@ -1913,12 +1944,27 @@ class PipelineManager(metaclass=ABCMeta):
if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
now = time.time()
arrived = item.event.arrived_at_scheduler_timestamp
- processing = int((now - arrived) * 1000)
- elapsed = int((now - item.event.timestamp) * 1000)
+ processing = (now - arrived) * 1000
+ elapsed = (now - item.event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
+ self.reportPipelineTiming('event_enqueue_time',
+ item.event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
+
+ def reportPipelineTiming(self, key, start, end=None):
+ if not self.sched.statsd:
+ return
+ if not start:
+ return
+ if end is None:
+ end = time.time()
+ pipeline = self.pipeline
+ tenant = pipeline.tenant
+ stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
+ dt = (end - start) * 1000
+ self.sched.statsd.timing(f'{stats_key}.{key}', dt)
diff --git a/zuul/model.py b/zuul/model.py
index 17d2c1814..c47175916 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1485,6 +1485,7 @@ class NodeRequest(object):
if 'tenant_name' in data:
self.tenant_name = data['tenant_name']
self.nodes = data.get('nodes', [])
+ self.created_time = data.get('created_time')
@classmethod
def fromDict(cls, data):
@@ -3624,6 +3625,9 @@ class BuildSet(zkobject.ZKObject):
files_state=self.NEW,
repo_state_state=self.NEW,
configured=False,
+ configured_time=None, # When setConfigured was called
+ start_time=None, # When the buildset reported start
+ repo_state_request_time=None, # When the refstate job was called
fail_fast=False,
job_graph=None,
jobs={},
@@ -3727,6 +3731,9 @@ class BuildSet(zkobject.ZKObject):
"fail_fast": self.fail_fast,
"job_graph": (self.job_graph.toDict()
if self.job_graph else None),
+ "configured_time": self.configured_time,
+ "start_time": self.start_time,
+ "repo_state_request_time": self.repo_state_request_time,
# jobs (serialize as separate objects)
}
return json.dumps(data, sort_keys=True).encode("utf8")
@@ -3831,8 +3838,8 @@ class BuildSet(zkobject.ZKObject):
"builds": builds,
"retry_builds": retry_builds,
# These are local cache objects only valid for one pipeline run
- '_old_job_graph': None,
- '_old_jobs': {},
+ "_old_job_graph": None,
+ "_old_jobs": {},
})
return data
@@ -3868,6 +3875,7 @@ class BuildSet(zkobject.ZKObject):
self.dependent_changes = [i.change.toDict() for i in items]
self.merger_items = [i.makeMergerItem() for i in items]
self.configured = True
+ self.configured_time = time.time()
def getStateName(self, state_num):
return self.states_map.get(
@@ -4016,6 +4024,7 @@ class QueueItem(zkobject.ZKObject):
enqueue_time=None,
report_time=None,
dequeue_time=None,
+ first_job_start_time=None,
reported=False,
reported_start=False,
quiet=False,
@@ -4088,6 +4097,7 @@ class QueueItem(zkobject.ZKObject):
"dynamic_state": self.dynamic_state,
"bundle": self.bundle and self.bundle.serialize(),
"dequeued_bundle_failing": self.dequeued_bundle_failing,
+ "first_job_start_time": self.first_job_start_time,
}
return json.dumps(data, sort_keys=True).encode("utf8")
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 8dd27bfe6..d30c846f7 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -146,7 +146,7 @@ class Nodepool(object):
if request.canceled:
state = 'canceled'
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
- dt = int((request.state_time - request.requested_time) * 1000)
+ dt = (request.state_time - request.requested_time) * 1000
key = 'zuul.nodepool.requests.%s' % state
pipe.incr(key + ".total")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 39193f103..57b212774 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -795,14 +795,14 @@ class Scheduler(threading.Thread):
'RETRY' if build.result is None else build.result
)
if build.result in ['SUCCESS', 'FAILURE'] and build.start_time:
- dt = int((build.end_time - build.start_time) * 1000)
+ dt = (build.end_time - build.start_time) * 1000
self.statsd.timing(key, dt)
self.statsd.incr(key)
# zuul.tenant.<tenant>.pipeline.<pipeline>.project.
# <host>.<project>.<branch>.job.<job>.wait_time
if build.start_time:
key = '%s.wait_time' % jobkey
- dt = int((build.start_time - build.execute_time) * 1000)
+ dt = (build.start_time - build.execute_time) * 1000
self.statsd.timing(key, dt)
except Exception:
self.log.exception("Exception reporting runtime stats")
@@ -1891,6 +1891,11 @@ class Scheduler(threading.Thread):
self._process_pipeline(tenant, pipeline)
# Update pipeline summary for zuul-web
pipeline.summary.update(ctx, self.globals)
+ if self.statsd:
+ self.statsd.timing(f'{stats_key}.read_time',
+ ctx.cumulative_read_time * 1000)
+ self.statsd.timing(f'{stats_key}.write_time',
+ ctx.cumulative_write_time * 1000)
except LockException:
self.log.debug("Skipping locked pipeline %s in tenant %s",
pipeline.name, tenant.name)
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index fdc99442f..e35e05ecb 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -590,7 +590,7 @@ class ManagementEventQueue(ZooKeeperEventQueue):
return
result_data = {"traceback": event.traceback,
- "timestamp": time.monotonic()}
+ "timestamp": time.time()}
try:
self.kazoo_client.set(
event.result_ref,
diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py
index a1278a8a2..5ca1158d5 100644
--- a/zuul/zk/sharding.py
+++ b/zuul/zk/sharding.py
@@ -14,6 +14,7 @@
import io
from contextlib import suppress
+import time
import zlib
from kazoo.exceptions import NoNodeError
@@ -30,6 +31,8 @@ class RawShardIO(io.RawIOBase):
self.shard_base = path
self.compressed_bytes_read = 0
self.compressed_bytes_written = 0
+ self.cumulative_read_time = 0.0
+ self.cumulative_write_time = 0.0
def readable(self):
return True
@@ -46,12 +49,17 @@ class RawShardIO(io.RawIOBase):
@property
def _shards(self):
try:
- return self.client.get_children(self.shard_base)
+ start = time.perf_counter()
+ ret = self.client.get_children(self.shard_base)
+ self.cumulative_read_time += time.perf_counter() - start
+ return ret
except NoNodeError:
return []
def _getData(self, path):
+ start = time.perf_counter()
data, _ = self.client.get(path)
+ self.cumulative_read_time += time.perf_counter() - start
self.compressed_bytes_read += len(data)
return zlib.decompress(data)
@@ -69,12 +77,14 @@ class RawShardIO(io.RawIOBase):
shard_bytes = zlib.compress(shard_bytes)
if not (len(shard_bytes) < NODE_BYTE_SIZE_LIMIT):
raise RuntimeError("Shard too large")
+ start = time.perf_counter()
self.client.create(
"{}/".format(self.shard_base),
shard_bytes,
sequence=True,
makepath=True,
)
+ self.cumulative_write_time += time.perf_counter() - start
self.compressed_bytes_written += len(shard_bytes)
return min(byte_count, NODE_BYTE_SIZE_LIMIT)
@@ -88,6 +98,10 @@ class BufferedShardWriter(io.BufferedWriter):
def compressed_bytes_written(self):
return self.__raw.compressed_bytes_written
+ @property
+ def cumulative_write_time(self):
+ return self.__raw.cumulative_write_time
+
class BufferedShardReader(io.BufferedReader):
def __init__(self, client, path):
@@ -97,3 +111,7 @@ class BufferedShardReader(io.BufferedReader):
@property
def compressed_bytes_read(self):
return self.__raw.compressed_bytes_read
+
+ @property
+ def cumulative_read_time(self):
+ return self.__raw.cumulative_read_time
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 45eb2b8e2..e837bf3ea 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -31,6 +31,8 @@ class ZKContext:
self.lock = lock
self.stop_event = stop_event
self.log = log
+ self.cumulative_read_time = 0.0
+ self.cumulative_write_time = 0.0
def sessionIsValid(self):
return ((not self.lock or self.lock.is_still_valid()) and
@@ -237,7 +239,10 @@ class ZKObject:
path = self.getPath()
while context.sessionIsValid():
try:
+ start = time.perf_counter()
compressed_data, zstat = context.client.get(path)
+ context.cumulative_read_time += time.perf_counter() - start
+
self._set(_zkobject_hash=None)
try:
data = zlib.decompress(compressed_data)
@@ -278,6 +283,7 @@ class ZKObject:
while context.sessionIsValid():
try:
compressed_data = zlib.compress(data)
+ start = time.perf_counter()
if create:
real_path, zstat = context.client.create(
path, compressed_data, makepath=True,
@@ -285,6 +291,7 @@ class ZKObject:
else:
zstat = context.client.set(path, compressed_data,
version=self._zstat.version)
+ context.cumulative_write_time += time.perf_counter() - start
self._set(_zstat=zstat,
_zkobject_hash=hash(data),
_zkobject_compressed_size=len(compressed_data),
@@ -336,6 +343,8 @@ class ShardedZKObject(ZKObject):
context.client, path) as stream:
data = stream.read()
compressed_size = stream.compressed_bytes_read
+ context.cumulative_read_time += \
+ stream.cumulative_read_time
if not data and context.client.exists(path) is None:
raise NoNodeError
self._set(**self.deserialize(data, context))
@@ -382,6 +391,9 @@ class ShardedZKObject(ZKObject):
stream.write(data)
stream.flush()
compressed_size = stream.compressed_bytes_written
+ context.cumulative_write_time += \
+ stream.cumulative_write_time
+
self._set(_zkobject_hash=hash(data),
_zkobject_compressed_size=compressed_size,
_zkobject_uncompressed_size=len(data),