summaryrefslogtreecommitdiff
path: root/kafka/coordinator/base.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-08 08:01:48 -0800
committerGitHub <noreply@github.com>2019-03-08 08:01:48 -0800
commit8c0792581d8a38822c01b40f5d3926c659b0c439 (patch)
tree8b39433a185984b71984a9301d3ed991bdf0fbe3 /kafka/coordinator/base.py
parent7a99013668b798aaa0acffcf382a7e48e7bd41c1 (diff)
downloadkafka-python-8c0792581d8a38822c01b40f5d3926c659b0c439.tar.gz
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends * Send network requests during KafkaClient.poll() rather than in KafkaClient.send() * Dont acquire lock during KafkaClient.send if node is connected / ready * Move all network connection IO into KafkaClient.poll()
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--kafka/coordinator/base.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 1435183..664e8d2 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -252,7 +252,7 @@ class BaseCoordinator(object):
if self.config['api_version'] < (0, 8, 2):
self.coordinator_id = self._client.least_loaded_node()
if self.coordinator_id is not None:
- self._client.ready(self.coordinator_id)
+ self._client.maybe_connect(self.coordinator_id)
continue
future = self.lookup_coordinator()
@@ -686,7 +686,7 @@ class BaseCoordinator(object):
self.coordinator_id = response.coordinator_id
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
- self._client.ready(self.coordinator_id)
+ self._client.maybe_connect(self.coordinator_id)
self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)