diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-24 08:44:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-24 08:44:46 -0700 |
commit | d388b48951327955a9a9031a229f02880e2c6f05 (patch) | |
tree | a0f7fb764114f9573a24d7e91a1aa4ba103be928 /kafka/client_async.py | |
parent | ce9c1d2e2b8d85b2f6c3b2a2ebd280246cfea07f (diff) | |
download | kafka-python-d388b48951327955a9a9031a229f02880e2c6f05.tar.gz |
Dont do client wakeup when sending from sender thread (#1761)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 369dc3e..682fd7c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -321,14 +321,15 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() - def maybe_connect(self, node_id): + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id): self._connecting.add(node_id) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return True return False @@ -499,7 +500,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +508,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +528,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future |