diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 6 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 |
2 files changed, 6 insertions, 6 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c00681d..f5d44b1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -42,11 +42,11 @@ class Fetcher(six.Iterator): 'check_crcs': True, 'skip_double_compressed_messages': False, 'iterator_refetch_records': 1, # undocumented -- interface may change + 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), } - def __init__(self, client, subscriptions, metrics, metric_group_prefix, - **configs): + def __init__(self, client, subscriptions, metrics, **configs): """Initialize a Kafka Message Fetcher. Keyword Arguments: @@ -94,7 +94,7 @@ class Fetcher(six.Iterator): self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None self._fetch_futures = collections.deque() - self._sensors = FetchManagerMetrics(metrics, metric_group_prefix) + self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) def init_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7dde29a..d4e0ff3 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator): 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, + 'metric_group_prefix': 'consumer', 'selector': selectors.DefaultSelector, 'exclude_internal_topics': True, 'sasl_mechanism': None, @@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator): tags=metrics_tags) reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - metric_group_prefix = 'consumer' # TODO _metrics likely needs to be passed to KafkaClient, etc. # api_version was previously a str. accept old format for now @@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator): self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( - self._client, self._subscription, self._metrics, metric_group_prefix, **self.config) + self._client, self._subscription, self._metrics, **self.config) self._coordinator = ConsumerCoordinator( - self._client, self._subscription, self._metrics, metric_group_prefix, + self._client, self._subscription, self._metrics, assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False |