diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-05 17:19:54 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:09 +0000 |
commit | 1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch) | |
tree | b7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 /kafka/consumer/fetcher.py | |
parent | efc03d083d323e35a2d32bcbdbccc053f737836e (diff) | |
download | kafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz |
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1a3dfd5..6a7b794 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -193,6 +193,21 @@ class Fetcher(six.Iterator): offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -222,10 +237,10 @@ class Fetcher(six.Iterator): self._subscriptions.seek(partition, offset) def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): - """ Fetch offset for each partition passed in ``timestamps`` map. + """Fetch offset for each partition passed in ``timestamps`` map. Blocks until offsets are obtained, a non-retriable exception is raised - or ``timeout_ms`` passed (if it's not ``None``). + or ``timeout_ms`` passed. Arguments: timestamps: {TopicPartition: int} dict with timestamps to fetch @@ -268,7 +283,7 @@ class Fetcher(six.Iterator): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by times in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -613,7 +628,7 @@ class Fetcher(six.Iterator): return f(bytes_) def _send_offset_requests(self, timestamps): - """ Fetch offsets for each partition in timestamps dict. This may send + """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. Arguments: |