summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-09-13 16:39:10 +0000
committerGerrit Code Review <review@openstack.org>2021-09-13 16:39:10 +0000
commitef0b31f1120ab10d83d1d56030ccd71494b2289a (patch)
tree60f272550b4d24cab06221d230523096ade65fa0
parent230404240a468908313e60e4bbded161f9b16434 (diff)
parentbdcf915e788bb368774e5462ccc15e6f5b7223d7 (diff)
downloadoslo-messaging-ef0b31f1120ab10d83d1d56030ccd71494b2289a.tar.gz
Merge "limit maximum timeout in the poll loop"
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py4
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py27
2 files changed, 29 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index cdc21c5..589baf5 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener):
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
LOG.debug("AMQPListener connection timeout")
- self._current_timeout = max(self._current_timeout * 2,
+ self._current_timeout = min(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
@@ -490,7 +490,7 @@ class ReplyWaiter(object):
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
- current_timeout = max(current_timeout * 2,
+ current_timeout = min(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception("Failed to process incoming message, retrying..")
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index e035150..f8882f2 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase):
t2 = self._thread(lock, 1)
self.assertAlmostEqual(1, t1(), places=0)
self.assertAlmostEqual(2, t2(), places=0)
+
+
+class TestPollTimeoutLimit(test_utils.BaseTestCase):
+ def test_poll_timeout_limit(self):
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ driver = transport._driver
+ target = oslo_messaging.Target(topic='testtopic')
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ thread = threading.Thread(target=listener.poll)
+ thread.daemon = True
+ thread.start()
+ time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2)
+
+ try:
+ # timeout should not grow past the maximum
+ self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX,
+ listener._current_timeout)
+
+ finally:
+ # gracefully stop waiting
+ driver.send(target,
+ {},
+ {'tx_id': 'test'})
+ thread.join()