summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-01-12 15:29:41 +0000
committerGerrit Code Review <review@openstack.org>2022-01-12 15:29:41 +0000
commit5d165cc713a98dbd650e9e6295d7966ce2919935 (patch)
tree76dd1a861889d047608ffee7ca19ac918c6c8977
parent5eeccdd425a7ddbeffe2ee9a97458da65bb1d82c (diff)
parent7b3968d9b012e873a9b393fcefa578c46fca18c6 (diff)
downloadoslo-messaging-5d165cc713a98dbd650e9e6295d7966ce2919935.tar.gz
Merge "[rabbit] use retry parameters during notification sending"12.12.0
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py7
-rw-r--r--oslo_messaging/_drivers/common.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py11
-rw-r--r--oslo_messaging/_drivers/pool.py10
-rw-r--r--oslo_messaging/notify/messaging.py23
-rw-r--r--oslo_messaging/tests/drivers/test_pool.py4
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py19
-rw-r--r--releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml8
8 files changed, 60 insertions, 26 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 24fdbc7..991bf46 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+ def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
- purpose=purpose)
+ purpose=purpose,
+ retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
- with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
+ with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 54c6f7f..b6c3adb 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, purpose):
+ def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
- self.connection = connection_pool.get()
+ self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 20e30ad..3ba5418 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -452,13 +452,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@@ -728,7 +729,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 8090e8d..9e5288d 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
- def get(self):
+ def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
- return self.create()
+ return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
- def create(self):
+ def create(self, retry=None):
"""Construct a new item."""
@@ -130,9 +130,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
- def create(self, purpose=common.PURPOSE_SEND):
+ def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url, purpose)
+ return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():
diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py
index 61c7357..da633d8 100644
--- a/oslo_messaging/notify/messaging.py
+++ b/oslo_messaging/notify/messaging.py
@@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification
listeners.
-The driver will block the notifier's thread until the notification message has
-been passed to the messaging transport. There is no guarantee that the
-notification message will be consumed by a notification listener.
+In case of the rabbit backend the driver will block the notifier's thread
+until the notification message has been passed to the messaging transport.
+There is no guarantee that the notification message will be consumed by a
+notification listener.
+
+In case of the kafka backend the driver will not block the notifier's thread
+but return immediately. The driver will try to deliver the message in the
+background.
Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated.
If the connection to the messaging service is not active when a notification is
-sent this driver will block waiting for the connection to complete. If the
-connection fails to complete, the driver will try to re-establish that
+sent the rabbit backend will block waiting for the connection to complete.
+If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification
-send fail with a MessageDeliveryFailure after the given number of retries.
+send fail. In this case an error is logged and the notifier's thread is resumed
+without any error.
+
+If the connection to the messaging service is not active when a notification is
+sent the kafka backend will return immediately and the backend tries to
+establish the connection and deliver the messages in the background.
+
"""
import logging
diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py
index d5c6420..82a10e1 100644
--- a/oslo_messaging/tests/drivers/test_pool.py
+++ b/oslo_messaging/tests/drivers/test_pool.py
@@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool):
- def create(self):
+ def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
@@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs)
if self.create_error:
- def create_error():
+ def create_error(retry=None):
raise RuntimeError
orig_create = p.create
self.useFixture(fixtures.MockPatchObject(
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index d0a8eca..330bdab 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
topics=["test-retry"],
retry=2,
group="oslo_messaging_notifications")
+ self.config(
+ # just to speed up the test execution
+ rabbit_retry_backoff=0,
+ group="oslo_messaging_rabbit")
transport = oslo_messaging.get_notification_transport(
self.conf, url='rabbit://')
notifier = oslo_messaging.Notifier(transport)
@@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
'kombu.connection.Connection._establish_connection',
new=wrapped_establish_connection
):
- # FIXME(gibi) This is bug 1917645 as the driver does not stop
- # retrying the connection after two retries only our test fixture
- # stops the retry by raising TestingException
- self.assertRaises(
- self.TestingException,
- notifier.info, {}, "test", {})
+ with mock.patch(
+ 'oslo_messaging.notify.messaging.LOG.exception'
+ ) as mock_log:
+ notifier.info({}, "test", {})
+
+ # one normal call plus two retries
+ self.assertEqual(3, len(calls))
+ # the error was caught and logged
+ mock_log.assert_called_once()
def test_notifier_retry_connection_fails_kafka(self):
"""This test sets a small retry number for notification sending and
diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
new file mode 100644
index 0000000..d3d62cb
--- /dev/null
+++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
@@ -0,0 +1,8 @@
+---
+fixes:
+ - |
+ As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
+ backend is changed to use the ``[oslo_messaging_notifications]retry``
+ parameter when driver tries to connect to the message bus during
+ notification sending. Before this fix the rabbit backend retried the
+ connection forever blocking the caller thread.