diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-07-30 15:42:27 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:08 +0000 |
commit | 39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch) | |
tree | 2b94ed93bec5ae4f072360c5072cc22b0685f8a1 /kafka/consumer/group.py | |
parent | da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff) | |
download | kafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz |
Added basic support for offsets_for_times API. Still needs to group by nodes and send in parallel.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 42 |
1 files changed, 41 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6adb154..f9b8f16 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,7 +6,7 @@ import socket import sys import time -from kafka.errors import KafkaConfigurationError +from kafka.errors import KafkaConfigurationError, UnsupportedVersionError from kafka.vendor import six @@ -861,6 +861,46 @@ class KafkaConsumer(six.Iterator): metrics[k.group][k.name] = v.value() return metrics + def offsets_for_times(self, timestamps): + """ + Look up the offsets for the given partitions by timestamp. The returned + offset for each partition is the earliest offset whose timestamp is + greater than or equal to the given timestamp in the corresponding + partition. + + This is a blocking call. The consumer does not have to be assigned the + partitions. + + If the message format version in a partition is before 0.10.0, i.e. + the messages do not have timestamps, ``None`` will be returned for that + partition. + + Note: + Notice that this method may block indefinitely if the partition + does not exist. + + Arguments: + timestamps (dict): ``{TopicPartition: int}`` mapping from partition + to the timestamp to look up. + + Raises: + ValueError: if the target timestamp is negative + UnsupportedVersionError: if the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: if fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + for tp, ts in timestamps.items(): + if ts < 0: + raise ValueError( + "The target time for partition {} is {}. The target time " + "cannot be negative.".format(tp, ts)) + return self._fetcher.get_offsets_by_times( + timestamps, self.config['request_timeout_ms']) + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): |