From 1fd596062fba5ce4236623249ffafcf0be985282 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 8 Jan 2016 15:47:37 -0800 Subject: Avoid CPU spinnning when there are no sockets to read --- kafka/client_async.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 1c74c6f..fa498e9 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -334,6 +334,14 @@ class KafkaClient(object): if (conn.state is ConnectionStates.CONNECTED and conn.in_flight_requests)]) if not sockets: + # if sockets are connecting, we can wake when they are writeable + if self._connecting: + sockets = [self._conns[node]._sock for node in self._connecting] + select.select([], sockets, [], timeout) + # otherwise just sleep to prevent CPU spinning + else: + log.debug('Nothing to do in _poll -- sleeping for %s', timeout) + time.sleep(timeout) return [] ready, _, _ = select.select(list(sockets.keys()), [], [], timeout) -- cgit v1.2.1 From 5fa8c88d6f369b3eceae7f34296b56cfd92d1f90 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 19:52:16 -0800 Subject: If a completed future is polled, do not block --- kafka/client_async.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index fa498e9..3a1922e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -314,14 +314,21 @@ class KafkaClient(object): else: task_future.success(result) - timeout = min( - timeout_ms, - metadata_timeout_ms, - self._delayed_tasks.next_at() * 1000, - self.config['request_timeout_ms']) - timeout = max(0, timeout / 1000.0) + # If we got a future that is already done, dont block in _poll + if future and future.is_done: + timeout = 0 + else: + timeout = min( + timeout_ms, + metadata_timeout_ms, + self._delayed_tasks.next_at() * 1000, + self.config['request_timeout_ms']) + timeout = max(0, timeout / 1000.0) # avoid negative timeouts responses.extend(self._poll(timeout)) + + # If all we had was a timeout (future is None) - only do one poll + # If we do have a future, we keep looping until it is done if not future or future.is_done: break -- cgit v1.2.1 From d2f136073cac0c8379f357cd76b0ea163fd22a99 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 19:53:07 -0800 Subject: Receive all available responses in client._poll --- kafka/client_async.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 3a1922e..88b8ec6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -354,11 +354,12 @@ class KafkaClient(object): ready, _, _ = select.select(list(sockets.keys()), [], [], timeout) responses = [] - # list, not iterator, because inline callbacks may add to self._conns for sock in ready: conn = sockets[sock] - response = conn.recv() # Note: conn.recv runs callbacks / errbacks - if response: + while conn.in_flight_requests: + response = conn.recv() # Note: conn.recv runs callbacks / errbacks + if not response: + break responses.append(response) return responses -- cgit v1.2.1