diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f552038..52ebb4b 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -133,6 +133,18 @@ class Fetcher(six.Iterator): self._clean_done_fetch_futures() return futures + def reset_offsets_if_needed(self, partitions): + """Lookup and set offsets for any partitions which are awaiting an + explicit reset. + + Arguments: + partitions (set of TopicPartitions): the partitions to reset + """ + for tp in partitions: + # TODO: If there are several offsets to reset, we could submit offset requests in parallel + if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp): + self._reset_offset(tp) + def _clean_done_fetch_futures(self): while True: if not self._fetch_futures: @@ -167,9 +179,6 @@ class Fetcher(six.Iterator): " update", tp) continue - # TODO: If there are several offsets to reset, - # we could submit offset requests in parallel - # for now, each call to _reset_offset will block if self._subscriptions.is_offset_reset_needed(tp): self._reset_offset(tp) elif self._subscriptions.assignment[tp].committed is None: |