diff options
author | Jenkins <jenkins@review.openstack.org> | 2017-01-06 08:01:16 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2017-01-06 08:01:16 +0000 |
commit | 234e920b6f57f2569e57f08f14a7e9987ae3a0d6 (patch) | |
tree | b1be8ae5aa85b9547d104a13c71d652fe5351e14 | |
parent | a4c28cc577bde66c6631f34152832aed23e655ca (diff) | |
parent | 488594936a52145c778c89fc88adca722ae8bd72 (diff) | |
download | oslo-messaging-234e920b6f57f2569e57f08f14a7e9987ae3a0d6.tar.gz |
Merge "kafka: return to poller when timeout is reach"
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 0a8c02c..59d2ff7 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -194,18 +194,27 @@ class Connection(object): :param timeout: poll timeout in seconds """ - if self._consume_loop_stopped: - return None - timeout = timeout if timeout >= 0 else self.consumer_timeout - try: - messages = self._poll_messages(timeout) - except kafka.errors.ConsumerTimeout as e: - raise driver_common.Timeout(e.message) - except Exception: - LOG.exception(_LE("Failed to consume messages")) - messages = None - return messages + def _raise_timeout(exc): + raise driver_common.Timeout(exc.message) + + timer = driver_common.DecayingTimer(duration=timeout) + timer.start() + + poll_timeout = (self.consumer_timeout if timeout is None + else min(timeout, self.consumer_timeout)) + + while True: + if self._consume_loop_stopped: + return + try: + return self._poll_messages(poll_timeout) + except kafka.errors.ConsumerTimeout as exc: + poll_timeout = timer.check_return( + _raise_timeout, exc, maximum=self.consumer_timeout) + except Exception: + LOG.exception(_LE("Failed to consume messages")) + return def stop_consuming(self): self._consume_loop_stopped = True |