diff options
-rw-r--r-- | kafka/consumer/new.py | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index e7d38de..04696af 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -534,11 +534,11 @@ class KafkaConsumer(object): LATEST = -1 EARLIEST = -2 - RequestTime = None + request_time_ms = None if self._config['auto_offset_reset'] == 'largest': - RequestTime = LATEST + request_time_ms = LATEST elif self._config['auto_offset_reset'] == 'smallest': - RequestTime = EARLIEST + request_time_ms = EARLIEST else: # Let's raise an reasonable exception type if user calls @@ -553,12 +553,30 @@ class KafkaConsumer(object): # the request that triggered it, and we do not want to drop that raise - (offset, ) = self.get_partition_offsets(topic, partition, RequestTime, - num_offsets=1) + (offset, ) = self.get_partition_offsets(topic, partition, + request_time_ms, max_num_offsets=1) return offset - def get_partition_offsets(self, topic, partition, request_time, num_offsets): - reqs = [OffsetRequest(topic, partition, request_time, num_offsets)] + def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets): + """ + Request available fetch offsets for a single topic/partition + + @param topic (str) + @param partition (int) + @param request_time_ms (int) -- Used to ask for all messages before a + certain time (ms). There are two special + values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming + message) and -2 to receive the earliest + available offset. Note that because offsets + are pulled in descending order, asking for + the earliest offset will always return you + a single element. + @param max_num_offsets (int) + + @return offsets (list) + """ + reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) |