diff options
author | Zuul <zuul@review.opendev.org> | 2021-09-13 16:39:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-09-13 16:39:10 +0000 |
commit | ef0b31f1120ab10d83d1d56030ccd71494b2289a (patch) | |
tree | 60f272550b4d24cab06221d230523096ade65fa0 | |
parent | 230404240a468908313e60e4bbded161f9b16434 (diff) | |
parent | bdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff) | |
download | oslo-messaging-ef0b31f1120ab10d83d1d56030ccd71494b2289a.tar.gz |
Merge "limit maximum timeout in the poll loop"
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 4 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 27 |
2 files changed, 29 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index cdc21c5..589baf5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener): self.conn.consume(timeout=min(self._current_timeout, left)) except rpc_common.Timeout: LOG.debug("AMQPListener connection timeout") - self._current_timeout = max(self._current_timeout * 2, + self._current_timeout = min(self._current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN @@ -490,7 +490,7 @@ class ReplyWaiter(object): # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds self.conn.consume(timeout=current_timeout) except rpc_common.Timeout: - current_timeout = max(current_timeout * 2, + current_timeout = min(current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) except Exception: LOG.exception("Failed to process incoming message, retrying..") diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index e035150..f8882f2 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase): t2 = self._thread(lock, 1) self.assertAlmostEqual(1, t1(), places=0) self.assertAlmostEqual(2, t2(), places=0) + + +class TestPollTimeoutLimit(test_utils.BaseTestCase): + def test_poll_timeout_limit(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + driver = transport._driver + target = oslo_messaging.Target(topic='testtopic') + listener = driver.listen(target, None, None)._poll_style_listener + + thread = threading.Thread(target=listener.poll) + thread.daemon = True + thread.start() + time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2) + + try: + # timeout should not grow past the maximum + self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX, + listener._current_timeout) + + finally: + # gracefully stop waiting + driver.send(target, + {}, + {'tx_id': 'test'}) + thread.join() |