diff options
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r-- | kafka/consumer/subscription_state.py | 5 |
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 4b0b275..ef50166 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -382,6 +382,9 @@ class TopicPartitionState(object): self._position = None # offset exposed to the user self.highwater = None self.drop_pending_message_set = False + # The last message offset hint available from a message batch with + # magic=2 which includes deleted compacted messages + self.last_offset_from_message_batch = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -396,6 +399,7 @@ class TopicPartitionState(object): self.awaiting_reset = True self.reset_strategy = strategy self._position = None + self.last_offset_from_message_batch = None self.has_valid_position = False def seek(self, offset): @@ -404,6 +408,7 @@ class TopicPartitionState(object): self.reset_strategy = None self.has_valid_position = True self.drop_pending_message_set = True + self.last_offset_from_message_batch = None def pause(self): self.paused = True |