diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-08 08:01:48 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-08 08:01:48 -0800 |
commit | 8c0792581d8a38822c01b40f5d3926c659b0c439 (patch) | |
tree | 8b39433a185984b71984a9301d3ed991bdf0fbe3 /kafka/conn.py | |
parent | 7a99013668b798aaa0acffcf382a7e48e7bd41c1 (diff) | |
download | kafka-python-8c0792581d8a38822c01b40f5d3926c659b0c439.tar.gz |
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends
* Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
* Dont acquire lock during KafkaClient.send if node is connected / ready
* Move all network connection IO into KafkaClient.poll()
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 7dfc8bd..6b5aff9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -733,11 +733,8 @@ class BrokerConnection(object): future.failure(error) self.config['state_change_callback'](self) - def send(self, request): - """send request, return Future() - - Can block on network if request is larger than send_buffer_bytes - """ + def send(self, request, blocking=True): + """Queue request for async network send, return Future()""" future = Future() if self.connecting(): return future.failure(Errors.NodeNotReadyError(str(self))) @@ -745,35 +742,49 @@ class BrokerConnection(object): return future.failure(Errors.KafkaConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request) + return self._send(request, blocking=blocking) - def _send(self, request): + def _send(self, request, blocking=True): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() correlation_id = self._protocol.send_request(request) + + # Attempt to replicate behavior from prior to introduction of + # send_pending_requests() / async sends + if blocking: + error = self.send_pending_requests() + if isinstance(error, Exception): + future.failure(error) + return future + + log.debug('%s Request %d: %s', self, correlation_id, request) + if request.expect_response(): + sent_time = time.time() + ifr = (correlation_id, future, sent_time) + self.in_flight_requests.append(ifr) + else: + future.success(None) + return future + + def send_pending_requests(self): + """Can block on network if request is larger than send_buffer_bytes""" + if self.state not in (ConnectionStates.AUTHENTICATING, + ConnectionStates.CONNECTED): + return Errors.NodeNotReadyError(str(self)) data = self._protocol.send_bytes() try: # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload - sent_time = time.time() total_bytes = self._send_bytes_blocking(data) if self._sensors: self._sensors.bytes_sent.record(total_bytes) + return total_bytes except ConnectionError as e: - log.exception("Error sending %s to %s", request, self) + log.exception("Error sending request data to %s", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) - return future.failure(error) - log.debug('%s Request %d: %s', self, correlation_id, request) - - if request.expect_response(): - ifr = (correlation_id, future, sent_time) - self.in_flight_requests.append(ifr) - else: - future.success(None) - - return future + return error def can_send_more(self): """Return True unless there are max_in_flight_requests_per_connection.""" |