summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/fetcher.py6
-rw-r--r--kafka/consumer/group.py6
2 files changed, 6 insertions, 6 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c00681d..f5d44b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
- def __init__(self, client, subscriptions, metrics, metric_group_prefix,
- **configs):
+ def __init__(self, client, subscriptions, metrics, **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
- self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+ self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7dde29a..d4e0ff3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
+ 'metric_group_prefix': 'consumer',
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
@@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
# api_version was previously a str. accept old format for now
@@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
+ self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription, self._metrics, metric_group_prefix,
+ self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False