diff options
author | Gabriele <gsantomaggio@suse.com> | 2019-06-27 13:13:47 +0200 |
---|---|---|
committer | Gabriele <gsantomaggio@suse.com> | 2019-07-01 21:38:32 +0200 |
commit | c50076b4efb79cef46d618d6d80eecbcebb72898 (patch) | |
tree | 462f06f480100c69e192feb45a1a6780eea416bd | |
parent | 6cdd4cb00779acd5b5825b5f00635af08f92a5df (diff) | |
download | oslo-messaging-c50076b4efb79cef46d618d6d80eecbcebb72898.tar.gz |
Implement mandatory flag for RabbitMQ driver
With this feature it is possible to use the mandatory RabbitMQ
mandatory flag.
Implements: blueprint transport-options (point 3)
The blueprint link is [1]
Please follow the link [2] to use and test the feature.
1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
2- https://github.com/Gsantomaggio/rabbitmq-utils/
tree/master/openstack/mandatory_test
Change-Id: Ie269fc08ba80c4b94a24a8207c1e86c19c3b3fcb
-rw-r--r-- | lower-constraints.txt | 2 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 12 | ||||
-rw-r--r-- | oslo_messaging/exceptions.py | 13 | ||||
-rw-r--r-- | oslo_messaging/tests/functional/test_functional.py | 34 | ||||
-rw-r--r-- | oslo_messaging/tests/functional/utils.py | 9 | ||||
-rw-r--r-- | requirements.txt | 2 |
6 files changed, 66 insertions, 6 deletions
diff --git a/lower-constraints.txt b/lower-constraints.txt index 3df8f1a..338c12a 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -28,7 +28,7 @@ imagesize==0.7.1 iso8601==0.1.11 Jinja2==2.10 keystoneauth1==3.4.0 -kombu==4.0.0 +kombu==4.6.1 linecache2==1.0.0 MarkupSafe==1.0 mccabe==0.2.1 diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 1726fec..124e7ef 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -756,6 +756,10 @@ class Connection(object): # NOTE(sileht): we must reraise this without # trigger error_callback raise + except exceptions.MessageUndeliverable: + # NOTE(gsantomaggio): we must reraise this without + # trigger error_callback + raise except Exception as exc: error_callback and error_callback(exc) self._set_current_channel(None) @@ -769,6 +773,11 @@ class Connection(object): LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) + @staticmethod + def on_return(exception, exchange, routing_key, message): + raise exceptions.MessageUndeliverable(exception, exchange, routing_key, + message) + def _set_current_channel(self, new_channel): """Change the channel to use. @@ -787,7 +796,8 @@ class Connection(object): if new_channel is not None: if self.purpose == rpc_common.PURPOSE_LISTEN: self._set_qos(new_channel) - self._producer = kombu.messaging.Producer(new_channel) + self._producer = kombu.messaging.Producer(new_channel, + on_return=self.on_return) for consumer in self._consumers: consumer.declare(self) diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py index cfe6a7e..f6ba20a 100644 --- a/oslo_messaging/exceptions.py +++ b/oslo_messaging/exceptions.py @@ -16,7 +16,7 @@ import six __all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure', - 'InvalidTarget'] + 'InvalidTarget', 'MessageUndeliverable'] class MessagingException(Exception): @@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError): msg = msg + ":" + six.text_type(target) super(InvalidTarget, self).__init__(msg) self.target = target + + +class MessageUndeliverable(Exception): + """Raised if message is not routed with mandatory flag""" + + def __init__(self, exception, exchange, routing_key, message): + super(MessageUndeliverable, self).__init__() + self.exception = exception + self.exchange = exchange + self.routing_key = routing_key + self.message = message diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 4fa8b48..6800f59 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(10, server.endpoint.ival) + def test_mandatory_call(self): + if not self.url.startswith("rabbit://"): + self.skipTest("backend does not support call monitoring") + + transport = self.useFixture(utils.RPCTransportFixture(self.conf, + self.url)) + target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), + server='server_' + str(uuid.uuid4())) + + # test for mandatory flag using transport-options, see: + # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options + # first test with `at_least_once=False` raises a "MessagingTimeout" + # error since there is no control if the queue actually exists. + # (Default behavior) + options = oslo_messaging.TransportOptions(at_least_once=False) + client1 = utils.ClientStub(transport.transport, target, + cast=False, timeout=1, + transport_options=options) + + self.assertRaises(oslo_messaging.MessagingTimeout, + client1.delay) + + # second test with `at_least_once=True` raises a "MessageUndeliverable" + # caused by mandatory flag. + # the MessageUndeliverable error is raised immediately without waiting + # any timeout + options2 = oslo_messaging.TransportOptions(at_least_once=True) + client2 = utils.ClientStub(transport.transport, target, + cast=False, timeout=60, + transport_options=options2) + + self.assertRaises(oslo_messaging.MessageUndeliverable, + client2.delay) + def test_monitor_long_call(self): if not (self.url.startswith("rabbit://") or self.url.startswith("amqp://")): diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 4d403a0..700c162 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -226,10 +226,15 @@ class RpcCast(RpcCall): class ClientStub(object): - def __init__(self, transport, target, cast=False, name=None, **kwargs): + def __init__(self, transport, target, cast=False, name=None, + transport_options=None, **kwargs): self.name = name or "functional-tests" self.cast = cast - self.client = oslo_messaging.RPCClient(transport, target, **kwargs) + self.client = oslo_messaging.RPCClient( + transport=transport, + target=target, + transport_options=transport_options, + **kwargs) def __getattr__(self, name): context = {"application": self.name} diff --git a/requirements.txt b/requirements.txt index 2e05118..7ed7394 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT # rabbit driver is the default # we set the amqp version to ensure heartbeat works amqp>=2.4.1 # BSD -kombu!=4.0.2,>=4.0.0 # BSD +kombu!=4.0.2,>=4.6.1 # BSD # middleware oslo.middleware>=3.31.0 # Apache-2.0 |