summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-01-06 08:01:16 +0000
committerGerrit Code Review <review@openstack.org>2017-01-06 08:01:16 +0000
commit234e920b6f57f2569e57f08f14a7e9987ae3a0d6 (patch)
treeb1be8ae5aa85b9547d104a13c71d652fe5351e14
parenta4c28cc577bde66c6631f34152832aed23e655ca (diff)
parent488594936a52145c778c89fc88adca722ae8bd72 (diff)
downloadoslo-messaging-234e920b6f57f2569e57f08f14a7e9987ae3a0d6.tar.gz
Merge "kafka: return to poller when timeout is reach"
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py31
1 files changed, 20 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 0a8c02c..59d2ff7 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -194,18 +194,27 @@ class Connection(object):
:param timeout: poll timeout in seconds
"""
- if self._consume_loop_stopped:
- return None
- timeout = timeout if timeout >= 0 else self.consumer_timeout
- try:
- messages = self._poll_messages(timeout)
- except kafka.errors.ConsumerTimeout as e:
- raise driver_common.Timeout(e.message)
- except Exception:
- LOG.exception(_LE("Failed to consume messages"))
- messages = None
- return messages
+ def _raise_timeout(exc):
+ raise driver_common.Timeout(exc.message)
+
+ timer = driver_common.DecayingTimer(duration=timeout)
+ timer.start()
+
+ poll_timeout = (self.consumer_timeout if timeout is None
+ else min(timeout, self.consumer_timeout))
+
+ while True:
+ if self._consume_loop_stopped:
+ return
+ try:
+ return self._poll_messages(poll_timeout)
+ except kafka.errors.ConsumerTimeout as exc:
+ poll_timeout = timer.check_return(
+ _raise_timeout, exc, maximum=self.consumer_timeout)
+ except Exception:
+ LOG.exception(_LE("Failed to consume messages"))
+ return
def stop_consuming(self):
self._consume_loop_stopped = True