diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-08 08:01:48 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-08 08:01:48 -0800 |
commit | 8c0792581d8a38822c01b40f5d3926c659b0c439 (patch) | |
tree | 8b39433a185984b71984a9301d3ed991bdf0fbe3 /kafka/client_async.py | |
parent | 7a99013668b798aaa0acffcf382a7e48e7bd41c1 (diff) | |
download | kafka-python-8c0792581d8a38822c01b40f5d3926c659b0c439.tar.gz |
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends
* Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
* Dont acquire lock during KafkaClient.send if node is connected / ready
* Move all network connection IO into KafkaClient.poll()
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e2bdda9..d608e6a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -304,7 +304,10 @@ class KafkaClient(object): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) + try: self._selector.register(conn._sock, selectors.EVENT_WRITE) + except KeyError: + self._selector.modify(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -312,10 +315,10 @@ class KafkaClient(object): self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.modify(conn._sock, selectors.EVENT_READ, conn) except KeyError: - pass - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if self._sensors: self._sensors.connection_created.record() @@ -336,6 +339,7 @@ class KafkaClient(object): self._selector.unregister(conn._sock) except KeyError: pass + if self._sensors: self._sensors.connection_closed.record() @@ -348,6 +352,17 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() + def maybe_connect(self, node_id): + """Queues a node for asynchronous connection during the next .poll()""" + if self._can_connect(node_id): + self._connecting.add(node_id) + # Wakeup signal is useful in case another thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + return True + return False + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: @@ -397,7 +412,7 @@ class KafkaClient(object): Returns: bool: True if we are ready to send to the given node """ - self._maybe_connect(node_id) + self.maybe_connect(node_id) return self.is_ready(node_id, metadata_priority=metadata_priority) def connected(self, node_id): @@ -499,14 +514,15 @@ class KafkaClient(object): return True def _can_send_request(self, node_id): - with self._lock: - if node_id not in self._conns: - return False - conn = self._conns[node_id] - return conn.connected() and conn.can_send_more() + conn = self._conns.get(node_id) + if not conn: + return False + return conn.connected() and conn.can_send_more() def send(self, node_id, request): - """Send a request to a specific node. + """Send a request to a specific node. Bytes are placed on an + internal per-connection send-queue. Actual network I/O will be + triggered in a subsequent call to .poll() Arguments: node_id (int): destination node @@ -518,11 +534,21 @@ class KafkaClient(object): Returns: Future: resolves to Response struct or Error """ - with self._lock: - if not self._maybe_connect(node_id): - return Future().failure(Errors.NodeNotReadyError(node_id)) + if not self._can_send_request(node_id): + self.maybe_connect(node_id) + return Future().failure(Errors.NodeNotReadyError(node_id)) + + # conn.send will queue the request internally + # we will need to call send_pending_requests() + # to trigger network I/O + future = self._conns[node_id].send(request, blocking=False) - return self._conns[node_id].send(request) + # Wakeup signal is useful in case another thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + + return future def poll(self, timeout_ms=None, future=None): """Try to read and write to sockets. @@ -640,6 +666,8 @@ class KafkaClient(object): conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % conn.config['request_timeout_ms'])) + else: + conn.send_pending_requests() if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) @@ -801,9 +829,8 @@ class KafkaClient(object): # have such application level configuration, using request timeout instead. return self.config['request_timeout_ms'] - if self._can_connect(node_id): + if self.maybe_connect(node_id): log.debug("Initializing connection to node %s for metadata request", node_id) - self._maybe_connect(node_id) return self.config['reconnect_backoff_ms'] # connected but can't send more, OR connecting |