summaryrefslogtreecommitdiff
path: root/kafka/consumer/subscription_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r--kafka/consumer/subscription_state.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 4b0b275..ef50166 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -382,6 +382,9 @@ class TopicPartitionState(object):
self._position = None # offset exposed to the user
self.highwater = None
self.drop_pending_message_set = False
+ # The last message offset hint available from a message batch with
+ # magic=2 which includes deleted compacted messages
+ self.last_offset_from_message_batch = None
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
@@ -396,6 +399,7 @@ class TopicPartitionState(object):
self.awaiting_reset = True
self.reset_strategy = strategy
self._position = None
+ self.last_offset_from_message_batch = None
self.has_valid_position = False
def seek(self, offset):
@@ -404,6 +408,7 @@ class TopicPartitionState(object):
self.reset_strategy = None
self.has_valid_position = True
self.drop_pending_message_set = True
+ self.last_offset_from_message_batch = None
def pause(self):
self.paused = True