summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py42
1 files changed, 41 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6adb154..f9b8f16 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,7 +6,7 @@ import socket
import sys
import time
-from kafka.errors import KafkaConfigurationError
+from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.vendor import six
@@ -861,6 +861,46 @@ class KafkaConsumer(six.Iterator):
metrics[k.group][k.name] = v.value()
return metrics
+ def offsets_for_times(self, timestamps):
+ """
+ Look up the offsets for the given partitions by timestamp. The returned
+ offset for each partition is the earliest offset whose timestamp is
+ greater than or equal to the given timestamp in the corresponding
+ partition.
+
+ This is a blocking call. The consumer does not have to be assigned the
+ partitions.
+
+ If the message format version in a partition is before 0.10.0, i.e.
+ the messages do not have timestamps, ``None`` will be returned for that
+ partition.
+
+ Note:
+ Notice that this method may block indefinitely if the partition
+ does not exist.
+
+ Arguments:
+ timestamps (dict): ``{TopicPartition: int}`` mapping from partition
+ to the timestamp to look up.
+
+ Raises:
+ ValueError: if the target timestamp is negative
+ UnsupportedVersionError: if the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: if fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ for tp, ts in timestamps.items():
+ if ts < 0:
+ raise ValueError(
+ "The target time for partition {} is {}. The target time "
+ "cannot be negative.".format(tp, ts))
+ return self._fetcher.get_offsets_by_times(
+ timestamps, self.config['request_timeout_ms'])
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):