summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f552038..52ebb4b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -133,6 +133,18 @@ class Fetcher(six.Iterator):
self._clean_done_fetch_futures()
return futures
+ def reset_offsets_if_needed(self, partitions):
+ """Lookup and set offsets for any partitions which are awaiting an
+ explicit reset.
+
+ Arguments:
+ partitions (set of TopicPartitions): the partitions to reset
+ """
+ for tp in partitions:
+ # TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
+ self._reset_offset(tp)
+
def _clean_done_fetch_futures(self):
while True:
if not self._fetch_futures:
@@ -167,9 +179,6 @@ class Fetcher(six.Iterator):
" update", tp)
continue
- # TODO: If there are several offsets to reset,
- # we could submit offset requests in parallel
- # for now, each call to _reset_offset will block
if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
elif self._subscriptions.assignment[tp].committed is None: