From 45d26b6d32d1b4382c2a1ce0194111ac8051e124 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 15:42:26 -0800 Subject: Check delayed task timeout in client.poll() --- kafka/client_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 30d4d6f..1838aed 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -302,7 +302,7 @@ class KafkaClient(object): self._finish_connect(node_id) # Send a metadata request if needed - metadata_timeout = self._maybe_refresh_metadata() + metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks for task, task_future in self._delayed_tasks.pop_ready(): @@ -314,7 +314,9 @@ class KafkaClient(object): else: task_future.success(result) - timeout = min(timeout_ms, metadata_timeout, + task_timeout_ms = max(0, 1000 * ( + self._delayed_tasks.next_at() - time.time())) + timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms, self.config['request_timeout_ms']) timeout /= 1000.0 -- cgit v1.2.1