summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 96c0647..ac2d364 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -267,9 +267,9 @@ class KafkaClient(object):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
- self._selector.modify(sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE, conn)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -623,7 +623,11 @@ class KafkaClient(object):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
- elif not (events & selectors.EVENT_READ):
+ if events & selectors.EVENT_WRITE:
+ conn = key.data
+ if conn.connecting():
+ conn.connect()
+ if not (events & selectors.EVENT_READ):
continue
conn = key.data
processed.add(conn)