summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorZack Dever <zdever@pandora.com>2016-04-07 17:46:55 -0700
committerZack Dever <zdever@pandora.com>2016-04-13 17:26:39 -0700
commite010669b602ffdfddde6fa2a381dad6c3be1f05d (patch)
tree453f771664ac89d18a1962891580c1a1d7b19b3f /kafka/coordinator/consumer.py
parentcaf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (diff)
downloadkafka-python-e010669b602ffdfddde6fa2a381dad6c3be1f05d.tar.gz
Beginnings of metrics instrumentation in kafka consumer.
This adds the parent metrics instance to kafka consumer, which will eventually be used to instrument everything under consumer. To start I ported the java consumer coordinator metrics.
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py71
1 files changed, 30 insertions, 41 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cd3d48a..50d2806 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):
'api_version': (0, 9),
}
- def __init__(self, client, subscription, **configs):
+ def __init__(self, client, subscription, metrics, metric_group_prefix,
+ **configs):
"""Initialize the coordination manager.
Keyword Arguments:
@@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator):
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- # metrics=None,
- # metric_group_prefix=None,
- # metric_tags=None,
- # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
+ self._subscription)
def __del__(self):
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
@@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future()
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_commit_response, offsets, future)
+ _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
_f.add_errback(self._failed_request, node_id, request, future)
return future
- def _handle_offset_commit_response(self, offsets, future, response):
- #self.sensors.commit_latency.record(response.requestLatencyMs())
+ def _handle_offset_commit_response(self, offsets, future, send_time, response):
+ # TODO look at adding request_latency_ms to response (like java kafka)
+ self._sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
@@ -720,38 +722,25 @@ class AutoCommitTask(object):
self._reschedule(next_at)
-# TODO
-"""
class ConsumerCoordinatorMetrics(object):
- def __init__(self, metrics, prefix, tags):
+ def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
- self.group_name = prefix + "-coordinator-metrics"
-
- self.commit_latency = metrics.sensor("commit-latency")
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-avg", self.group_name,
- "The average time taken for a commit request",
- tags), metrics.Avg())
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-max", self.group_name,
- "The max time taken for a commit request",
- tags), metrics.Max())
- self.commit_latency.add(metrics.MetricName(
- "commit-rate", self.group_name,
- "The number of commit calls per second",
- tags), metrics.Rate(metrics.Count()))
-
- '''
- def _num_partitions(config, now):
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- };
- metrics.addMetric(new MetricName("assigned-partitions",
- this.metricGrpName,
- "The number of partitions currently assigned to this consumer",
- tags),
- numParts);
- '''
-"""
+ self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+
+ self.commit_latency = metrics.sensor('commit-latency')
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-avg', self.metric_group_name,
+ 'The average time taken for a commit request'), Avg())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-max', self.metric_group_name,
+ 'The max time taken for a commit request'), Max())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-rate', self.metric_group_name,
+ 'The number of commit calls per second'), Rate(sampled_stat=Count()))
+
+ num_parts = AnonMeasurable(lambda config, now:
+ len(subscription.assigned_partitions()))
+ metrics.add_metric(metrics.metric_name(
+ 'assigned-partitions', self.metric_group_name,
+ 'The number of partitions currently assigned to this consumer'),
+ num_parts)