diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 369dc3e..682fd7c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -321,14 +321,15 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() - def maybe_connect(self, node_id): + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id): self._connecting.add(node_id) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return True return False @@ -499,7 +500,7 @@ class KafkaClient(object): return False return conn.connected() and conn.can_send_more() - def send(self, node_id, request): + def send(self, node_id, request, wakeup=True): """Send a request to a specific node. Bytes are placed on an internal per-connection send-queue. Actual network I/O will be triggered in a subsequent call to .poll() @@ -507,6 +508,7 @@ class KafkaClient(object): Arguments: node_id (int): destination node request (Struct): request object (not-encoded) + wakeup (bool): optional flag to disable thread-wakeup Raises: AssertionError: if node_id is not in current cluster metadata @@ -526,7 +528,8 @@ class KafkaClient(object): # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return future |