summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py31
1 files changed, 20 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 0a8c02c..59d2ff7 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -194,18 +194,27 @@ class Connection(object):
:param timeout: poll timeout in seconds
"""
- if self._consume_loop_stopped:
- return None
- timeout = timeout if timeout >= 0 else self.consumer_timeout
- try:
- messages = self._poll_messages(timeout)
- except kafka.errors.ConsumerTimeout as e:
- raise driver_common.Timeout(e.message)
- except Exception:
- LOG.exception(_LE("Failed to consume messages"))
- messages = None
- return messages
+ def _raise_timeout(exc):
+ raise driver_common.Timeout(exc.message)
+
+ timer = driver_common.DecayingTimer(duration=timeout)
+ timer.start()
+
+ poll_timeout = (self.consumer_timeout if timeout is None
+ else min(timeout, self.consumer_timeout))
+
+ while True:
+ if self._consume_loop_stopped:
+ return
+ try:
+ return self._poll_messages(poll_timeout)
+ except kafka.errors.ConsumerTimeout as exc:
+ poll_timeout = timer.check_return(
+ _raise_timeout, exc, maximum=self.consumer_timeout)
+ except Exception:
+ LOG.exception(_LE("Failed to consume messages"))
+ return
def stop_consuming(self):
self._consume_loop_stopped = True