diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-09-29 17:04:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-29 17:04:17 -0700 |
commit | 392d674be6641078717a4d87e471916c9a4bbb22 (patch) | |
tree | baee4165315b4c99939a944a6a7b893f0c17e36b /kafka/conn.py | |
parent | 9de12d3f03236988a60e6cd79a50ffa5165cf735 (diff) | |
download | kafka-python-392d674be6641078717a4d87e471916c9a4bbb22.tar.gz |
Send socket data via non-blocking IO with send buffer (#1912)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 80 |
1 files changed, 72 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 5ea5436..815065b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -289,6 +289,7 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() self._sock = None + self._send_buffer = b'' self._ssl_context = None if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] @@ -557,6 +558,32 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _send_bytes(self, data): + """Send some data via non-blocking IO + + Note: this method is not synchronized internally; you should + always hold the _lock before calling + + Returns: number of bytes + Raises: socket exception + """ + total_sent = 0 + while total_sent < len(data): + try: + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + except (SSLWantReadError, SSLWantWriteError): + break + except (ConnectionError, TimeoutError) as e: + if six.PY2 and e.errno == errno.EWOULDBLOCK: + break + raise + except BlockingIOError: + if six.PY3: + break + raise + return total_sent + def _send_bytes_blocking(self, data): self._sock.settimeout(self.config['request_timeout_ms'] / 1000) total_sent = 0 @@ -839,6 +866,7 @@ class BrokerConnection(object): self._protocol = KafkaProtocol( client_id=self.config['client_id'], api_version=self.config['api_version']) + self._send_buffer = b'' if error is None: error = Errors.Cancelled(str(self)) ifrs = list(self.in_flight_requests.items()) @@ -901,24 +929,60 @@ class BrokerConnection(object): return future def send_pending_requests(self): - """Can block on network if request is larger than send_buffer_bytes""" + """Attempts to send pending requests messages via blocking IO + If all requests have been sent, return True + Otherwise, if the socket is blocked and there are more bytes to send, + return False. + """ try: with self._lock: if not self._can_send_recv(): - return Errors.NodeNotReadyError(str(self)) - # In the future we might manage an internal write buffer - # and send bytes asynchronously. For now, just block - # sending each request payload + return False data = self._protocol.send_bytes() total_bytes = self._send_bytes_blocking(data) + if self._sensors: self._sensors.bytes_sent.record(total_bytes) - return total_bytes + return True + except (ConnectionError, TimeoutError) as e: log.exception("Error sending request data to %s", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) - return error + return False + + def send_pending_requests_v2(self): + """Attempts to send pending requests messages via non-blocking IO + If all requests have been sent, return True + Otherwise, if the socket is blocked and there are more bytes to send, + return False. + """ + try: + with self._lock: + if not self._can_send_recv(): + return False + + # _protocol.send_bytes returns encoded requests to send + # we send them via _send_bytes() + # and hold leftover bytes in _send_buffer + if not self._send_buffer: + self._send_buffer = self._protocol.send_bytes() + + total_bytes = 0 + if self._send_buffer: + total_bytes = self._send_bytes(self._send_buffer) + self._send_buffer = self._send_buffer[total_bytes:] + + if self._sensors: + self._sensors.bytes_sent.record(total_bytes) + # Return True iff send buffer is empty + return len(self._send_buffer) == 0 + + except (ConnectionError, TimeoutError, Exception) as e: + log.exception("Error sending request data to %s", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return False def can_send_more(self): """Return True unless there are max_in_flight_requests_per_connection.""" @@ -979,7 +1043,7 @@ class BrokerConnection(object): else: recvd.append(data) - except SSLWantReadError: + except (SSLWantReadError, SSLWantWriteError): break except (ConnectionError, TimeoutError) as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: |