summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikita Kalyanov <nikitakalyanov@gmail.com>2021-07-13 00:03:31 +0300
committerNikita Kalyanov <nikitakalyanov@gmail.com>2021-07-13 00:15:38 +0300
commitbdcf915e788bb368774e5462ccc15e6f5b7223d7 (patch)
tree5d968023912ba8cb130b15f64b4729c1e14101e1
parent9ab3f4f30d4bf0be7dab5f0ca0f0a47177681fdf (diff)
downloadoslo-messaging-bdcf915e788bb368774e5462ccc15e6f5b7223d7.tar.gz
limit maximum timeout in the poll loop
We should properly limit the maximum timeout with a 'min' to avoid long delays before message processing. Such delays may happen if the connection to a RabbitMQ server is re-established at the same time when the message arrives (see attached bug for more info). Moreover, this change is in line with the original intent to actually have an upper limit on maximum possible timeout (see comments in code and in the original review). Closes-Bug: #1935864 Change-Id: Iebc8a96e868d938a5d250bf9d66d20746c63d3d5
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py4
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py27
2 files changed, 29 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index cdc21c5..589baf5 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener):
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
LOG.debug("AMQPListener connection timeout")
- self._current_timeout = max(self._current_timeout * 2,
+ self._current_timeout = min(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
@@ -490,7 +490,7 @@ class ReplyWaiter(object):
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
- current_timeout = max(current_timeout * 2,
+ current_timeout = min(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception("Failed to process incoming message, retrying..")
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index e035150..f8882f2 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase):
t2 = self._thread(lock, 1)
self.assertAlmostEqual(1, t1(), places=0)
self.assertAlmostEqual(2, t2(), places=0)
+
+
+class TestPollTimeoutLimit(test_utils.BaseTestCase):
+ def test_poll_timeout_limit(self):
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ driver = transport._driver
+ target = oslo_messaging.Target(topic='testtopic')
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ thread = threading.Thread(target=listener.poll)
+ thread.daemon = True
+ thread.start()
+ time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2)
+
+ try:
+ # timeout should not grow past the maximum
+ self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX,
+ listener._current_timeout)
+
+ finally:
+ # gracefully stop waiting
+ driver.send(target,
+ {},
+ {'tx_id': 'test'})
+ thread.join()