summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-24 08:44:46 -0700
committerGitHub <noreply@github.com>2019-03-24 08:44:46 -0700
commitd388b48951327955a9a9031a229f02880e2c6f05 (patch)
treea0f7fb764114f9573a24d7e91a1aa4ba103be928 /kafka/client_async.py
parentce9c1d2e2b8d85b2f6c3b2a2ebd280246cfea07f (diff)
downloadkafka-python-d388b48951327955a9a9031a229f02880e2c6f05.tar.gz
Dont do client wakeup when sending from sender thread (#1761)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py11
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 369dc3e..682fd7c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -321,14 +321,15 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
- def maybe_connect(self, node_id):
+ def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):
self._connecting.add(node_id)
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return True
return False
@@ -499,7 +500,7 @@ class KafkaClient(object):
return False
return conn.connected() and conn.can_send_more()
- def send(self, node_id, request):
+ def send(self, node_id, request, wakeup=True):
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
@@ -507,6 +508,7 @@ class KafkaClient(object):
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
+ wakeup (bool): optional flag to disable thread-wakeup
Raises:
AssertionError: if node_id is not in current cluster metadata
@@ -526,7 +528,8 @@ class KafkaClient(object):
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
- self.wakeup()
+ if wakeup:
+ self.wakeup()
return future