summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py29
1 files changed, 19 insertions, 10 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a83d5da..cbfd720 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -585,12 +585,11 @@ class KafkaConsumer(six.Iterator):
dict: Map of topic to list of records (may be empty).
"""
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -835,6 +834,8 @@ class KafkaConsumer(six.Iterator):
Returns:
set: {topic, ...}, or None if there is no current subscription.
"""
+ if self._subscription.subscription is None:
+ return None
return self._subscription.subscription.copy()
def unsubscribe(self):
@@ -988,26 +989,34 @@ class KafkaConsumer(six.Iterator):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
- if (self.config['api_version'] >= (0, 8, 1) and
- self.config['group_id'] is not None):
+ # Look up any positions for partitions which are awaiting reset (which may be the
+ # case if the user called seek_to_beginning or seek_to_end). We do this check first to
+ # avoid an unnecessary lookup of committed offsets (which typically occurs when
+ # the user is manually assigning partitions and managing their own offsets).
+ self._fetcher.reset_offsets_if_needed(partitions)
- # Refresh commits for all assigned partitions
- self._coordinator.refresh_committed_offsets_if_needed()
+ if not self._subscription.has_all_fetch_positions():
+ # if we still don't have offsets for all partitions, then we should either seek
+ # to the last committed position or reset using the auto reset policy
+ if (self.config['api_version'] >= (0, 8, 1) and
+ self.config['group_id'] is not None):
+ # first refresh commits for all assigned partitions
+ self._coordinator.refresh_committed_offsets_if_needed()
- # Then, do any offset lookups in case some positions are not known
- self._fetcher.update_fetch_positions(partitions)
+ # Then, do any offset lookups in case some positions are not known
+ self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_coordinator_ready()
# Fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():