diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-24 21:24:21 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-24 21:24:21 -0800 |
commit | 2c7b7452a8ca761672e70ee56b3779e4a96c1997 (patch) | |
tree | 0c81eb7336ac4a8d8bc70704993b3b7d9738a5d4 /kafka/coordinator/consumer.py | |
parent | 077dc4742ffa82584946379790424faf4c6ba47f (diff) | |
parent | c02b2711f1b18bba85155f8bf402b5b9824b6502 (diff) | |
download | kafka-python-2c7b7452a8ca761672e70ee56b3779e4a96c1997.tar.gz |
Merge pull request #516 from dpkp/group_id_none
Support group_id=None to disable offset commits and group membership
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 7390ab3..263dac0 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -75,18 +75,24 @@ class ConsumerCoordinator(BaseCoordinator): if key in configs: self.config[key] = configs[key] - self._cluster = client.cluster + if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None: + assert self.config['assignors'], 'Coordinator requires assignors' + self._subscription = subscription self._partitions_per_topic = {} - self._auto_commit_task = None - if self.config['api_version'] >= (0, 9): - assert self.config['assignors'], 'Coordinator require assignors' - + self._cluster = client.cluster self._cluster.request_update() self._cluster.add_listener(self._handle_metadata_update) - if self.config['api_version'] >= (0, 8, 1): - if self.config['enable_auto_commit']: + self._auto_commit_task = None + if self.config['enable_auto_commit']: + if self.config['api_version'] < (0, 8, 1): + log.warning('Broker version (%s) does not support offset' + ' commits; disabling auto-commit.', + self.config['api_version']) + elif self.config['group_id'] is None: + log.warning('group_id is None: disabling auto-commit.') + else: interval = self.config['auto_commit_interval_ms'] / 1000.0 self._auto_commit_task = AutoCommitTask(self, interval) @@ -127,7 +133,10 @@ class ConsumerCoordinator(BaseCoordinator): # check if there are any changes to the metadata which should trigger # a rebalance if self._subscription_metadata_changed(): - if self.config['api_version'] >= (0, 9): + + if (self.config['api_version'] >= (0, 9) + and self.config['group_id'] is not None): + self._subscription.mark_for_reassignment() # If we haven't got group coordinator support, |