diff options
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 0a8c02c..59d2ff7 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -194,18 +194,27 @@ class Connection(object): :param timeout: poll timeout in seconds """ - if self._consume_loop_stopped: - return None - timeout = timeout if timeout >= 0 else self.consumer_timeout - try: - messages = self._poll_messages(timeout) - except kafka.errors.ConsumerTimeout as e: - raise driver_common.Timeout(e.message) - except Exception: - LOG.exception(_LE("Failed to consume messages")) - messages = None - return messages + def _raise_timeout(exc): + raise driver_common.Timeout(exc.message) + + timer = driver_common.DecayingTimer(duration=timeout) + timer.start() + + poll_timeout = (self.consumer_timeout if timeout is None + else min(timeout, self.consumer_timeout)) + + while True: + if self._consume_loop_stopped: + return + try: + return self._poll_messages(poll_timeout) + except kafka.errors.ConsumerTimeout as exc: + poll_timeout = timer.check_return( + _raise_timeout, exc, maximum=self.consumer_timeout) + except Exception: + LOG.exception(_LE("Failed to consume messages")) + return def stop_consuming(self): self._consume_loop_stopped = True |