From ba7afd9bc9362055ec0bedcf53eb6f8909dc22d2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 15 Aug 2017 13:00:02 -0700 Subject: BrokerConnection receive bytes pipe (#1032) --- kafka/client_async.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 4e4e835..80e8494 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -599,25 +599,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - - # Accumulate as many responses as the connection has pending - while conn.in_flight_requests: - response = conn.recv() # Note: conn.recv runs callbacks / errbacks - - # Incomplete responses are buffered internally - # while conn.in_flight_requests retains the request - if not response: - break - responses.append(response) + responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - response = conn.recv() - if response: - responses.append(response) + responses.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -629,6 +618,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses -- cgit v1.2.1