summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriele <gsantomaggio@suse.com>2019-07-22 17:27:21 +0200
committerGabriele <gsantomaggio@suse.com>2019-08-21 14:46:59 +0200
commitb7e9faf6590086b79f9696183b5c296ecc12b7b6 (patch)
treea3ce04c8d50c4de1595d7d741934678cf5bbe0ea
parentb56380654a4b695a8a27827621b185ee3991ce92 (diff)
downloadoslo-messaging-10.2.0.tar.gz
Add the mandatory flag for direct send10.2.0
With this feature, the server will raise and log a Message Undeliverable exception. So it is possible to log immediately an error in case the reply queue does not exist for some reason. This is part of blueprint transport-options The blueprint link is [1] Please follow the link [2] to use and test the feature. 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: Iac7474c06ef425a2afe5bcd912e51510ba1c8fb3
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py13
-rw-r--r--oslo_messaging/rpc/server.py19
2 files changed, 25 insertions, 7 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index ca76aaa..1f25da6 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -168,6 +168,12 @@ rabbit_opts = [
default=2,
help='How often times during the heartbeat_timeout_threshold '
'we check the heartbeat.'),
+ cfg.IntOpt('direct_mandatory_flag',
+ default=True,
+ help='Enable/Disable the RabbitMQ mandatory flag '
+ 'for direct send. The direct send is used as reply,'
+ 'so the MessageUndeliverable exception is raised'
+ ' in case the client queue does not exist.'),
]
LOG = logging.getLogger(__name__)
@@ -492,6 +498,7 @@ class Connection(object):
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading
+ self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@@ -1291,9 +1298,11 @@ class Connection(object):
durable=False,
auto_delete=True,
passive=True)
-
+ options = oslo_messaging.TransportOptions(
+ at_least_once=self.direct_mandatory_flag)
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
- exchange, msg, routing_key=msg_id)
+ exchange, msg, routing_key=msg_id,
+ transport_options=options)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
transport_options=None):
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index 9bb6f63..b16d77f 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -1,4 +1,3 @@
-
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -124,6 +123,7 @@ A simple example of an RPC server with multiple endpoints might be::
import logging
import sys
+from oslo_messaging import exceptions
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_messaging import server as msg_server
from oslo_messaging import transport as msg_transport
@@ -178,13 +178,19 @@ class RPCServer(msg_server.MessageHandlingServer):
message.reply(res)
else:
message.reply(failure=failure)
+ except exceptions.MessageUndeliverable as e:
+ LOG.exception(
+ "MessageUndeliverable error, "
+ "source exception: %s, routing_key: %s, exchange: %s: ",
+ e.exception, e.routing_key, e.exchange
+ )
except Exception:
LOG.exception("Can not send reply for message")
finally:
- # NOTE(dhellmann): Remove circular object reference
- # between the current stack frame and the traceback in
- # exc_info.
- del failure
+ # NOTE(dhellmann): Remove circular object reference
+ # between the current stack frame and the traceback in
+ # exc_info.
+ del failure
def get_rpc_server(transport, target, endpoints,
@@ -222,6 +228,7 @@ def expected_exceptions(*exceptions):
ExpectedException, which is used internally by the RPC sever. The RPC
client will see the original exception type.
"""
+
def outer(func):
def inner(*args, **kwargs):
try:
@@ -234,7 +241,9 @@ def expected_exceptions(*exceptions):
# ignored and thrown as normal.
except exceptions:
raise rpc_dispatcher.ExpectedException()
+
return inner
+
return outer