summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-08 08:01:48 -0800
committerGitHub <noreply@github.com>2019-03-08 08:01:48 -0800
commit8c0792581d8a38822c01b40f5d3926c659b0c439 (patch)
tree8b39433a185984b71984a9301d3ed991bdf0fbe3 /kafka/conn.py
parent7a99013668b798aaa0acffcf382a7e48e7bd41c1 (diff)
downloadkafka-python-8c0792581d8a38822c01b40f5d3926c659b0c439.tar.gz
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends * Send network requests during KafkaClient.poll() rather than in KafkaClient.send() * Dont acquire lock during KafkaClient.send if node is connected / ready * Move all network connection IO into KafkaClient.poll()
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py49
1 files changed, 30 insertions, 19 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 7dfc8bd..6b5aff9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -733,11 +733,8 @@ class BrokerConnection(object):
future.failure(error)
self.config['state_change_callback'](self)
- def send(self, request):
- """send request, return Future()
-
- Can block on network if request is larger than send_buffer_bytes
- """
+ def send(self, request, blocking=True):
+ """Queue request for async network send, return Future()"""
future = Future()
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
@@ -745,35 +742,49 @@ class BrokerConnection(object):
return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
- return self._send(request)
+ return self._send(request, blocking=blocking)
- def _send(self, request):
+ def _send(self, request, blocking=True):
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
future = Future()
correlation_id = self._protocol.send_request(request)
+
+ # Attempt to replicate behavior from prior to introduction of
+ # send_pending_requests() / async sends
+ if blocking:
+ error = self.send_pending_requests()
+ if isinstance(error, Exception):
+ future.failure(error)
+ return future
+
+ log.debug('%s Request %d: %s', self, correlation_id, request)
+ if request.expect_response():
+ sent_time = time.time()
+ ifr = (correlation_id, future, sent_time)
+ self.in_flight_requests.append(ifr)
+ else:
+ future.success(None)
+ return future
+
+ def send_pending_requests(self):
+ """Can block on network if request is larger than send_buffer_bytes"""
+ if self.state not in (ConnectionStates.AUTHENTICATING,
+ ConnectionStates.CONNECTED):
+ return Errors.NodeNotReadyError(str(self))
data = self._protocol.send_bytes()
try:
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
- sent_time = time.time()
total_bytes = self._send_bytes_blocking(data)
if self._sensors:
self._sensors.bytes_sent.record(total_bytes)
+ return total_bytes
except ConnectionError as e:
- log.exception("Error sending %s to %s", request, self)
+ log.exception("Error sending request data to %s", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
- return future.failure(error)
- log.debug('%s Request %d: %s', self, correlation_id, request)
-
- if request.expect_response():
- ifr = (correlation_id, future, sent_time)
- self.in_flight_requests.append(ifr)
- else:
- future.success(None)
-
- return future
+ return error
def can_send_more(self):
"""Return True unless there are max_in_flight_requests_per_connection."""