diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2016-12-08 10:05:17 +0100 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2017-01-02 11:46:48 +0100 |
commit | 488594936a52145c778c89fc88adca722ae8bd72 (patch) | |
tree | 6698bf846bd288ede8c589332377d3a00eecb3db | |
parent | a7044799ad5a0f2b34ca1d8c23a5a31825e891a7 (diff) | |
download | oslo-messaging-488594936a52145c778c89fc88adca722ae8bd72.tar.gz |
kafka: return to poller when timeout is reach
consume() must return only if user timeout is reached and not
when driver consumer_timeout is reached.
Change-Id: I6b2b2a28038a194224e79fa37285436ca6787a0a
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 0a8c02c..59d2ff7 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -194,18 +194,27 @@ class Connection(object): :param timeout: poll timeout in seconds """ - if self._consume_loop_stopped: - return None - timeout = timeout if timeout >= 0 else self.consumer_timeout - try: - messages = self._poll_messages(timeout) - except kafka.errors.ConsumerTimeout as e: - raise driver_common.Timeout(e.message) - except Exception: - LOG.exception(_LE("Failed to consume messages")) - messages = None - return messages + def _raise_timeout(exc): + raise driver_common.Timeout(exc.message) + + timer = driver_common.DecayingTimer(duration=timeout) + timer.start() + + poll_timeout = (self.consumer_timeout if timeout is None + else min(timeout, self.consumer_timeout)) + + while True: + if self._consume_loop_stopped: + return + try: + return self._poll_messages(poll_timeout) + except kafka.errors.ConsumerTimeout as exc: + poll_timeout = timer.check_return( + _raise_timeout, exc, maximum=self.consumer_timeout) + except Exception: + LOG.exception(_LE("Failed to consume messages")) + return def stop_consuming(self): self._consume_loop_stopped = True |