summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-17 14:39:27 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-17 17:54:06 -0700
commit3a7802d51c3a34f1efafb97b80deceab98ec8b09 (patch)
tree792ac17bcbb0f37fddd42a6594b4406fba9f7a67 /kafka/coordinator/consumer.py
parent436b2b20117ea60f9cdcad1f6f8ad46cb439c1ed (diff)
downloadkafka-python-3a7802d51c3a34f1efafb97b80deceab98ec8b09.tar.gz
Add base coordinator metrics
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py11
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 517f66a..d6ad9e6 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
- super(ConsumerCoordinator, self).__init__(client, **configs)
+ super(ConsumerCoordinator, self).__init__(client,
+ metrics, metric_group_prefix,
+ **configs)
+
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
self._auto_commit_task.reschedule()
- self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
- self._subscription)
+ self.consumer_sensors = ConsumerCoordinatorMetrics(
+ metrics, metric_group_prefix, self._subscription)
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
@@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_offset_commit_response(self, offsets, future, send_time, response):
# TODO look at adding request_latency_ms to response (like java kafka)
- self._sensors.commit_latency.record((time.time() - send_time) * 1000)
+ self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics: