summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py19
1 files changed, 19 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c1eb03e..36e269f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -439,6 +439,14 @@ class Fetcher(six.Iterator):
try:
batch = records.next_batch()
while batch is not None:
+
+ # LegacyRecordBatch cannot access either base_offset or last_offset_delta
+ try:
+ self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
+ batch.last_offset_delta
+ except AttributeError:
+ pass
+
for record in batch:
key_size = len(record.key) if record.key is not None else -1
value_size = len(record.value) if record.value is not None else -1
@@ -643,6 +651,17 @@ class Fetcher(six.Iterator):
for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
+
+ # advance position for any deleted compacted messages if required
+ if self._subscriptions.assignment[partition].last_offset_from_message_batch:
+ next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
+ if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
+ log.debug(
+ "Advance position for partition %s from %s to %s (last message batch location plus one)"
+ " to correct for deleted compacted messages",
+ partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
+ self._subscriptions.assignment[partition].position = next_offset_from_batch_header
+
position = self._subscriptions.assignment[partition].position
# fetch if there is a leader and no in-flight requests