diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a83d5da..cbfd720 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -585,12 +585,11 @@ class KafkaConsumer(six.Iterator): dict: Map of topic to list of records (may be empty). """ if self._use_consumer_group(): - self._coordinator.ensure_coordinator_known() self._coordinator.ensure_active_group() # 0.8.2 brokers support kafka-backed offset storage via group coordinator elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_coordinator_ready() # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -835,6 +834,8 @@ class KafkaConsumer(six.Iterator): Returns: set: {topic, ...} """ + if self._subscription.subscription is None: + return None return self._subscription.subscription.copy() def unsubscribe(self): @@ -988,26 +989,34 @@ class KafkaConsumer(six.Iterator): NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined. """ - if (self.config['api_version'] >= (0, 8, 1) and - self.config['group_id'] is not None): + # Lookup any positions for partitions which are awaiting reset (which may be the + # case if the user called seek_to_beginning or seek_to_end). We do this check first to + # avoid an unnecessary lookup of committed offsets (which typically occurs when + # the user is manually assigning partitions and managing their own offsets). 
+ self._fetcher.reset_offsets_if_needed(partitions) - # Refresh commits for all assigned partitions - self._coordinator.refresh_committed_offsets_if_needed() + if not self._subscription.has_all_fetch_positions(): + # if we still don't have offsets for all partitions, then we should either seek + # to the last committed position or reset using the auto reset policy + if (self.config['api_version'] >= (0, 8, 1) and + self.config['group_id'] is not None): + # first refresh commits for all assigned partitions + self._coordinator.refresh_committed_offsets_if_needed() - # Then, do any offset lookups in case some positions are not known - self._fetcher.update_fetch_positions(partitions) + # Then, do any offset lookups in case some positions are not known + self._fetcher.update_fetch_positions(partitions) def _message_generator(self): assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' while time.time() < self._consumer_timeout: if self._use_consumer_group(): - self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_coordinator_ready() self._coordinator.ensure_active_group() # 0.8.2 brokers support kafka-backed offset storage via group coordinator elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_coordinator_ready() # Fetch offsets for any subscribed partitions that we arent tracking yet if not self._subscription.has_all_fetch_positions(): |