summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-29 17:04:17 -0700
committerGitHub <noreply@github.com>2019-09-29 17:04:17 -0700
commit392d674be6641078717a4d87e471916c9a4bbb22 (patch)
treebaee4165315b4c99939a944a6a7b893f0c17e36b /kafka/conn.py
parent9de12d3f03236988a60e6cd79a50ffa5165cf735 (diff)
downloadkafka-python-392d674be6641078717a4d87e471916c9a4bbb22.tar.gz
Send socket data via non-blocking IO with send buffer (#1912)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py80
1 files changed, 72 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 5ea5436..815065b 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -289,6 +289,7 @@ class BrokerConnection(object):
self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
self._sock = None
+ self._send_buffer = b''
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
@@ -557,6 +558,32 @@ class BrokerConnection(object):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))
+ def _send_bytes(self, data):
+ """Send some data via non-blocking IO
+
+ Note: this method is not synchronized internally; you should
+ always hold the _lock before calling
+
+ Returns: number of bytes
+ Raises: socket exception
+ """
+ total_sent = 0
+ while total_sent < len(data):
+ try:
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ except (SSLWantReadError, SSLWantWriteError):
+ break
+ except (ConnectionError, TimeoutError) as e:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
+ break
+ raise
+ except BlockingIOError:
+ if six.PY3:
+ break
+ raise
+ return total_sent
+
def _send_bytes_blocking(self, data):
self._sock.settimeout(self.config['request_timeout_ms'] / 1000)
total_sent = 0
@@ -839,6 +866,7 @@ class BrokerConnection(object):
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
+ self._send_buffer = b''
if error is None:
error = Errors.Cancelled(str(self))
ifrs = list(self.in_flight_requests.items())
@@ -901,24 +929,60 @@ class BrokerConnection(object):
return future
def send_pending_requests(self):
- """Can block on network if request is larger than send_buffer_bytes"""
+ """Attempts to send pending requests messages via blocking IO
+ If all requests have been sent, return True
+ Otherwise, if the socket is blocked and there are more bytes to send,
+ return False.
+ """
try:
with self._lock:
if not self._can_send_recv():
- return Errors.NodeNotReadyError(str(self))
- # In the future we might manage an internal write buffer
- # and send bytes asynchronously. For now, just block
- # sending each request payload
+ return False
data = self._protocol.send_bytes()
total_bytes = self._send_bytes_blocking(data)
+
if self._sensors:
self._sensors.bytes_sent.record(total_bytes)
- return total_bytes
+ return True
+
except (ConnectionError, TimeoutError) as e:
log.exception("Error sending request data to %s", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
- return error
+ return False
+
+ def send_pending_requests_v2(self):
+ """Attempts to send pending requests messages via non-blocking IO
+ If all requests have been sent, return True
+ Otherwise, if the socket is blocked and there are more bytes to send,
+ return False.
+ """
+ try:
+ with self._lock:
+ if not self._can_send_recv():
+ return False
+
+ # _protocol.send_bytes returns encoded requests to send
+ # we send them via _send_bytes()
+ # and hold leftover bytes in _send_buffer
+ if not self._send_buffer:
+ self._send_buffer = self._protocol.send_bytes()
+
+ total_bytes = 0
+ if self._send_buffer:
+ total_bytes = self._send_bytes(self._send_buffer)
+ self._send_buffer = self._send_buffer[total_bytes:]
+
+ if self._sensors:
+ self._sensors.bytes_sent.record(total_bytes)
+ # Return True iff send buffer is empty
+ return len(self._send_buffer) == 0
+
+ except (ConnectionError, TimeoutError, Exception) as e:
+ log.exception("Error sending request data to %s", self)
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ self.close(error=error)
+ return False
def can_send_more(self):
"""Return True unless there are max_in_flight_requests_per_connection."""
@@ -979,7 +1043,7 @@ class BrokerConnection(object):
else:
recvd.append(data)
- except SSLWantReadError:
+ except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK: