diff options
author | Zack Dever <zdever@pandora.com> | 2016-04-07 17:46:55 -0700 |
---|---|---|
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:39 -0700 |
commit | e010669b602ffdfddde6fa2a381dad6c3be1f05d (patch) | |
tree | 453f771664ac89d18a1962891580c1a1d7b19b3f /kafka/coordinator/consumer.py | |
parent | caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (diff) | |
download | kafka-python-e010669b602ffdfddde6fa2a381dad6c3be1f05d.tar.gz |
Beginnings of metrics instrumentation in kafka consumer.
This adds the parent metrics instance to kafka consumer, which will
eventually be used to instrument everything under consumer. To start
I ported the java consumer coordinator metrics.
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 71 |
1 files changed, 30 insertions, 41 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cd3d48a..50d2806 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol from .. import errors as Errors from ..future import Future +from ..metrics import AnonMeasurable +from ..metrics.stats import Avg, Count, Max, Rate from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest from ..structs import OffsetAndMetadata, TopicPartition from ..util import WeakMethod @@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator): 'api_version': (0, 9), } - def __init__(self, client, subscription, **configs): + def __init__(self, client, subscription, metrics, metric_group_prefix, + **configs): """Initialize the coordination manager. Keyword Arguments: @@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator): interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) - # metrics=None, - # metric_group_prefix=None, - # metric_tags=None, - # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, + self._subscription) def __del__(self): if hasattr(self, '_auto_commit_task') and self._auto_commit_task: @@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator): future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_commit_response, offsets, future) + _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) _f.add_errback(self._failed_request, node_id, request, future) return future - def _handle_offset_commit_response(self, offsets, future, response): - #self.sensors.commit_latency.record(response.requestLatencyMs()) + def _handle_offset_commit_response(self, offsets, future, send_time, response): + # TODO look at adding request_latency_ms to response (like java kafka) + self._sensors.commit_latency.record((time.time() - send_time) * 1000) unauthorized_topics = set() for topic, partitions in response.topics: @@ -720,38 +722,25 @@ class AutoCommitTask(object): self._reschedule(next_at) -# TODO -""" class ConsumerCoordinatorMetrics(object): - def __init__(self, metrics, prefix, tags): + def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.group_name = prefix + "-coordinator-metrics" - - self.commit_latency = metrics.sensor("commit-latency") - self.commit_latency.add(metrics.MetricName( - "commit-latency-avg", self.group_name, - "The average time taken for a commit request", - tags), metrics.Avg()) - self.commit_latency.add(metrics.MetricName( - "commit-latency-max", self.group_name, - "The max time taken for a commit request", - tags), metrics.Max()) - self.commit_latency.add(metrics.MetricName( - "commit-rate", self.group_name, - "The number of commit calls per second", - tags), metrics.Rate(metrics.Count())) - - ''' - def _num_partitions(config, now): - new Measurable() { - public double measure(MetricConfig config, long now) { - return subscriptions.assignedPartitions().size(); - } - }; - metrics.addMetric(new MetricName("assigned-partitions", - this.metricGrpName, - "The number of partitions currently assigned to this consumer", - tags), - numParts); - ''' -""" + self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix + + self.commit_latency = metrics.sensor('commit-latency') + self.commit_latency.add(metrics.metric_name( + 'commit-latency-avg', self.metric_group_name, + 'The average time taken for a commit request'), Avg()) + self.commit_latency.add(metrics.metric_name( + 'commit-latency-max', self.metric_group_name, + 'The max time taken for a commit request'), Max()) + self.commit_latency.add(metrics.metric_name( + 'commit-rate', self.metric_group_name, + 'The number of commit calls per second'), Rate(sampled_stat=Count())) + + num_parts = AnonMeasurable(lambda config, now: + len(subscription.assigned_partitions())) + metrics.add_metric(metrics.metric_name( + 'assigned-partitions', self.metric_group_name, + 'The number of partitions currently assigned to this consumer'), + num_parts) |