diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 19 |
1 files changed, 19 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c1eb03e..36e269f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -439,6 +439,14 @@ class Fetcher(six.Iterator): try: batch = records.next_batch() while batch is not None: + + # LegacyRecordBatch cannot access either base_offset or last_offset_delta + try: + self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \ + batch.last_offset_delta + except AttributeError: + pass + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1 @@ -643,6 +651,17 @@ class Fetcher(six.Iterator): for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) + + # advance position for any deleted compacted messages if required + if self._subscriptions.assignment[partition].last_offset_from_message_batch: + next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1 + if next_offset_from_batch_header > self._subscriptions.assignment[partition].position: + log.debug( + "Advance position for partition %s from %s to %s (last message batch location plus one)" + " to correct for deleted compacted messages", + partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header) + self._subscriptions.assignment[partition].position = next_offset_from_batch_header + position = self._subscriptions.assignment[partition].position # fetch if there is a leader and no in-flight requests |