summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-11 17:11:31 -0700
committerGitHub <noreply@github.com>2017-10-11 17:11:31 -0700
commitcfddc6bd179e236874e00a899e9349d5c9a54400 (patch)
tree5b3c851f0d127f53adfb1c58680f6c2e6ff2fa9f /kafka/coordinator/consumer.py
parentf04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
downloadkafka-python-cfddc6bd179e236874e00a899e9349d5c9a54400.tar.gz
KAFKA-4034: Avoid unnecessary consumer coordinator lookup (#1254)
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py28
1 files changed, 24 insertions, 4 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 84c62df..0328837 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -315,7 +315,7 @@ class ConsumerCoordinator(BaseCoordinator):
return {}
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ class ConsumerCoordinator(BaseCoordinator):
response will be either an Exception or a OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
- Returns:
- Future: indicating whether the commit was successful or not
"""
+ if not self.coordinator_unknown():
+ self._do_commit_offsets_async(offsets, callback)
+ else:
+ # we don't know the current coordinator, so try to find it and then
+ # send the commit or fail (we don't want recursive retries which can
+ # cause offset commits to arrive out of order). Note that there may
+ # be multiple offset commits chained to the same coordinator lookup
+ # request. This is fine because the listeners will be invoked in the
+ # same order that they were added. Note also that BaseCoordinator
+ # prevents multiple concurrent coordinator lookup requests.
+ future = self.lookup_coordinator()
+ future.add_callback(self._do_commit_offsets_async, offsets, callback)
+ if callback:
+ future.add_errback(callback)
+
+ # ensure the commit has a chance to be transmitted (without blocking on
+ # its completion). Note that commits are treated as heartbeats by the
+ # coordinator, so there is no need to explicitly allow heartbeats
+ # through delayed task execution.
+ self._client.poll() # no wakeup if we add that feature
+
+ def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ class ConsumerCoordinator(BaseCoordinator):
return
while True:
- self.ensure_coordinator_known()
+ self.ensure_coordinator_ready()
future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)