summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-07-30 15:42:27 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-07 09:34:08 +0000
commit39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree2b94ed93bec5ae4f072360c5072cc22b0685f8a1 /kafka/consumer/group.py
parentda25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
downloadkafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for offsets_for_times API. Still needs to group by nodes and send in parallel.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py42
1 files changed, 41 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6adb154..f9b8f16 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,7 +6,7 @@ import socket
import sys
import time
-from kafka.errors import KafkaConfigurationError
+from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.vendor import six
@@ -861,6 +861,46 @@ class KafkaConsumer(six.Iterator):
metrics[k.group][k.name] = v.value()
return metrics
+ def offsets_for_times(self, timestamps):
+ """
+ Look up the offsets for the given partitions by timestamp. The returned
+ offset for each partition is the earliest offset whose timestamp is
+ greater than or equal to the given timestamp in the corresponding
+ partition.
+
+ This is a blocking call. The consumer does not have to be assigned the
+ partitions.
+
+ If the message format version in a partition is before 0.10.0, i.e.
+ the messages do not have timestamps, ``None`` will be returned for that
+ partition.
+
+ Note:
+ Notice that this method may block indefinitely if the partition
+ does not exist.
+
+ Arguments:
+ timestamps (dict): ``{TopicPartition: int}`` mapping from partition
+ to the timestamp to look up.
+
+ Raises:
+ ValueError: if the target timestamp is negative
+ UnsupportedVersionError: if the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: if fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ for tp, ts in timestamps.items():
+ if ts < 0:
+ raise ValueError(
+ "The target time for partition {} is {}. The target time "
+ "cannot be negative.".format(tp, ts))
+ return self._fetcher.get_offsets_by_times(
+ timestamps, self.config['request_timeout_ms'])
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):