summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-29 17:04:17 -0700
committerGitHub <noreply@github.com>2019-09-29 17:04:17 -0700
commit392d674be6641078717a4d87e471916c9a4bbb22 (patch)
treebaee4165315b4c99939a944a6a7b893f0c17e36b /kafka/client_async.py
parent9de12d3f03236988a60e6cd79a50ffa5165cf735 (diff)
downloadkafka-python-392d674be6641078717a4d87e471916c9a4bbb22.tar.gz
Send socket data via non-blocking IO with send buffer (#1912)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py29
1 files changed, 27 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ac2d364..9b9cb8f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -207,6 +207,7 @@ class KafkaClient(object):
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
+ self._sending = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
@@ -532,6 +533,7 @@ class KafkaClient(object):
# we will need to call send_pending_requests()
# to trigger network I/O
future = conn.send(request, blocking=False)
+ self._sending.add(conn)
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
@@ -604,14 +606,23 @@ class KafkaClient(object):
return responses
+ def _register_send_sockets(self):
+ while self._sending:
+ conn = self._sending.pop()
+ try:
+ key = self._selector.get_key(conn._sock)
+ events = key.events | selectors.EVENT_WRITE
+ self._selector.modify(key.fileobj, events, key.data)
+ except KeyError:
+ self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)
+
def _poll(self, timeout):
# This needs to be locked, but since it is only called from within the
# locked section of poll(), there is no additional lock acquisition here
processed = set()
# Send pending requests first, before polling for responses
- for conn in six.itervalues(self._conns):
- conn.send_pending_requests()
+ self._register_send_sockets()
start_select = time.time()
ready = self._selector.select(timeout)
@@ -623,10 +634,24 @@ class KafkaClient(object):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
+
+ # Send pending requests if socket is ready to write
if events & selectors.EVENT_WRITE:
conn = key.data
if conn.connecting():
conn.connect()
+ else:
+ if conn.send_pending_requests_v2():
+ # If send is complete, we dont need to track write readiness
+ # for this socket anymore
+ if key.events ^ selectors.EVENT_WRITE:
+ self._selector.modify(
+ key.fileobj,
+ key.events ^ selectors.EVENT_WRITE,
+ key.data)
+ else:
+ self._selector.unregister(key.fileobj)
+
if not (events & selectors.EVENT_READ):
continue
conn = key.data