summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-17 08:37:15 -0700
committerGitHub <noreply@github.com>2016-07-17 08:37:15 -0700
commit31a29ecea000ad8e95b0ecb1b8e11f9600029135 (patch)
treea530f29e741526a1b26dc3926eba018f7f82fbc1 /kafka/coordinator/consumer.py
parent506d023978e7273bd323c0750e3f77af259d257b (diff)
downloadkafka-python-31a29ecea000ad8e95b0ecb1b8e11f9600029135.tar.gz
KAFKA-2832: Add a consumer config option to exclude internal topics (#765)
Use exclude_internal_topics config in KafkaConsumer to avoid subscribe patterns matching internal topics Raise error during rebalance if subscribed topics are not authorized
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..2543238 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
+ 'exclude_internal_topics': True,
}
def __init__(self, client, subscription, metrics, metric_group_prefix,
@@ -70,6 +71,10 @@ class ConsumerCoordinator(BaseCoordinator):
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -131,13 +136,12 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_metadata_update(self, cluster):
# if we encounter any unauthorized topics, raise an exception
- # TODO
- #if self._cluster.unauthorized_topics:
- # raise TopicAuthorizationError(self._cluster.unauthorized_topics)
+ if cluster.unauthorized_topics:
+ raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)
if self._subscription.subscribed_pattern:
topics = []
- for topic in cluster.topics():
+ for topic in cluster.topics(self.config['exclude_internal_topics']):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)