summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-08-05 17:19:54 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-07 09:34:09 +0000
commit1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch)
treeb7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 /kafka/consumer/fetcher.py
parentefc03d083d323e35a2d32bcbdbccc053f737836e (diff)
downloadkafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py23
1 files changed, 19 insertions, 4 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1a3dfd5..6a7b794 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -193,6 +193,21 @@ class Fetcher(six.Iterator):
offsets[tp] = OffsetAndTimestamp(offset, timestamp)
return offsets
+ def beginning_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
+
+ def end_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.LATEST, timeout_ms)
+
+ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
+ timestamps = dict([(tp, timestamp) for tp in partitions])
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ offsets[tp] = offsets[tp][0]
+ return offsets
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -222,10 +237,10 @@ class Fetcher(six.Iterator):
self._subscriptions.seek(partition, offset)
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
- """ Fetch offset for each partition passed in ``timestamps`` map.
+ """Fetch offset for each partition passed in ``timestamps`` map.
Blocks until offsets are obtained, a non-retriable exception is raised
- or ``timeout_ms`` passed (if it's not ``None``).
+ or ``timeout_ms`` passed.
Arguments:
timestamps: {TopicPartition: int} dict with timestamps to fetch
@@ -268,7 +283,7 @@ class Fetcher(six.Iterator):
remaining_ms = timeout_ms - elapsed_ms
raise Errors.KafkaTimeoutError(
- "Failed to get offsets by times in %s ms" % timeout_ms)
+ "Failed to get offsets by timestamps in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -613,7 +628,7 @@ class Fetcher(six.Iterator):
return f(bytes_)
def _send_offset_requests(self, timestamps):
- """ Fetch offsets for each partition in timestamps dict. This may send
+ """Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.
Arguments: