diff options
author | Zuul <zuul@review.opendev.org> | 2019-07-11 16:45:55 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2019-07-11 16:45:55 +0000 |
commit | f7eb82a1e478029ca89c95854142b35789c771c6 (patch) | |
tree | 58126dfb7baff2e41084f0110d8ffe9a3bafd6d1 | |
parent | 1c31abc7bc7b1b7dacd8c482857ab8829ed05eaf (diff) | |
parent | c50076b4efb79cef46d618d6d80eecbcebb72898 (diff) | |
download | oslo-messaging-f7eb82a1e478029ca89c95854142b35789c771c6.tar.gz |
Merge "Implement mandatory flag for RabbitMQ driver"9.8.0
-rw-r--r-- | lower-constraints.txt | 2 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 12 | ||||
-rw-r--r-- | oslo_messaging/exceptions.py | 13 | ||||
-rw-r--r-- | oslo_messaging/tests/functional/test_functional.py | 34 | ||||
-rw-r--r-- | oslo_messaging/tests/functional/utils.py | 9 | ||||
-rw-r--r-- | requirements.txt | 2 |
6 files changed, 66 insertions, 6 deletions
diff --git a/lower-constraints.txt b/lower-constraints.txt index 3df8f1a..338c12a 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -28,7 +28,7 @@ imagesize==0.7.1 iso8601==0.1.11 Jinja2==2.10 keystoneauth1==3.4.0 -kombu==4.0.0 +kombu==4.6.1 linecache2==1.0.0 MarkupSafe==1.0 mccabe==0.2.1 diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 1726fec..124e7ef 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -756,6 +756,10 @@ class Connection(object): # NOTE(sileht): we must reraise this without # trigger error_callback raise + except exceptions.MessageUndeliverable: + # NOTE(gsantomaggio): we must reraise this without + # trigger error_callback + raise except Exception as exc: error_callback and error_callback(exc) self._set_current_channel(None) @@ -769,6 +773,11 @@ class Connection(object): LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) + @staticmethod + def on_return(exception, exchange, routing_key, message): + raise exceptions.MessageUndeliverable(exception, exchange, routing_key, + message) + def _set_current_channel(self, new_channel): """Change the channel to use. @@ -787,7 +796,8 @@ class Connection(object): if new_channel is not None: if self.purpose == rpc_common.PURPOSE_LISTEN: self._set_qos(new_channel) - self._producer = kombu.messaging.Producer(new_channel) + self._producer = kombu.messaging.Producer(new_channel, + on_return=self.on_return) for consumer in self._consumers: consumer.declare(self) diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py index cfe6a7e..f6ba20a 100644 --- a/oslo_messaging/exceptions.py +++ b/oslo_messaging/exceptions.py @@ -16,7 +16,7 @@ import six __all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure', - 'InvalidTarget'] + 'InvalidTarget', 'MessageUndeliverable'] class MessagingException(Exception): @@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError): msg = msg + ":" + six.text_type(target) super(InvalidTarget, self).__init__(msg) self.target = target + + +class MessageUndeliverable(Exception): + """Raised if message is not routed with mandatory flag""" + + def __init__(self, exception, exchange, routing_key, message): + super(MessageUndeliverable, self).__init__() + self.exception = exception + self.exchange = exchange + self.routing_key = routing_key + self.message = message diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 4fa8b48..6800f59 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(10, server.endpoint.ival) + def test_mandatory_call(self): + if not self.url.startswith("rabbit://"): + self.skipTest("backend does not support call monitoring") + + transport = self.useFixture(utils.RPCTransportFixture(self.conf, + self.url)) + target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()), + server='server_' + str(uuid.uuid4())) + + # test for mandatory flag using transport-options, see: + # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options + # first test with `at_least_once=False` raises a "MessagingTimeout" + # error since there is no control if the queue actually exists. + # (Default behavior) + options = oslo_messaging.TransportOptions(at_least_once=False) + client1 = utils.ClientStub(transport.transport, target, + cast=False, timeout=1, + transport_options=options) + + self.assertRaises(oslo_messaging.MessagingTimeout, + client1.delay) + + # second test with `at_least_once=True` raises a "MessageUndeliverable" + # caused by mandatory flag. + # the MessageUndeliverable error is raised immediately without waiting + # any timeout + options2 = oslo_messaging.TransportOptions(at_least_once=True) + client2 = utils.ClientStub(transport.transport, target, + cast=False, timeout=60, + transport_options=options2) + + self.assertRaises(oslo_messaging.MessageUndeliverable, + client2.delay) + def test_monitor_long_call(self): if not (self.url.startswith("rabbit://") or self.url.startswith("amqp://")): diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 4d403a0..700c162 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -226,10 +226,15 @@ class RpcCast(RpcCall): class ClientStub(object): - def __init__(self, transport, target, cast=False, name=None, **kwargs): + def __init__(self, transport, target, cast=False, name=None, + transport_options=None, **kwargs): self.name = name or "functional-tests" self.cast = cast - self.client = oslo_messaging.RPCClient(transport, target, **kwargs) + self.client = oslo_messaging.RPCClient( + transport=transport, + target=target, + transport_options=transport_options, + **kwargs) def __getattr__(self, name): context = {"application": self.name} diff --git a/requirements.txt b/requirements.txt index 2e05118..7ed7394 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT # rabbit driver is the default # we set the amqp version to ensure heartbeat works amqp>=2.4.1 # BSD -kombu!=4.0.2,>=4.0.0 # BSD +kombu!=4.0.2,>=4.6.1 # BSD # middleware oslo.middleware>=3.31.0 # Apache-2.0 |