summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordukhlov <dukhlov@mirantis.com>2016-02-25 01:16:12 +0300
committerkbespalov <kbespalov@mirantis.com>2016-08-30 12:22:16 +0300
commit34ea4f6edeaaee93d6a694ed7959bb223bc2240e (patch)
tree2158cfb51d0c00288eab27a9564f1fd57d89dbe8
parentf568fe2003a1d4fcc57bf6f9f6a81caa5ad5b712 (diff)
downloadoslo-messaging-34ea4f6edeaaee93d6a694ed7959bb223bc2240e.tar.gz
Adds exchange declaration on sender's side
When you send message to exchange which is not exist current channel is closed and you need to reconnect. This is undesired. Also this patch separate sending fanout message and call/cast messages and don't raise exception if exchange doesn't exist for fanout messages. Change-Id: Ia556d0c1b219387892007925bb437664aaaccb69 (cherry picked from commit 89cc47e)
-rw-r--r--oslo_messaging/_drivers/impl_pika.py94
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_engine.py37
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_message.py22
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py26
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py8
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py28
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py3
7 files changed, 146 insertions, 72 deletions
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index a8b9fae..faa3e3b 100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
@@ -34,7 +34,7 @@ pika_opts = [
help='Maximum number of channels to allow'),
cfg.IntOpt('frame_max', default=None,
help='The maximum byte size for an AMQP frame'),
- cfg.IntOpt('heartbeat_interval', default=1,
+ cfg.IntOpt('heartbeat_interval', default=3,
help="How often to send heartbeats for consumer's connections"),
cfg.BoolOpt('ssl', default=None,
help='Enable SSL'),
@@ -51,7 +51,7 @@ pika_opts = [
]
pika_pool_opts = [
- cfg.IntOpt('pool_max_size', default=10,
+ cfg.IntOpt('pool_max_size', default=30,
help="Maximum number of connections to keep queued."),
cfg.IntOpt('pool_max_overflow', default=0,
help="Maximum number of connections to create above "
@@ -158,6 +158,23 @@ class PikaDriver(base.BaseDriver):
def require_features(self, requeue=False):
pass
+ def _declare_rpc_exchange(self, exchange, timeout):
+ with (self._pika_engine.connection_without_confirmation_pool
+ .acquire(timeout=timeout)) as conn:
+ try:
+ self._pika_engine.declare_exchange_by_channel(
+ conn.channel,
+ self._pika_engine.get_rpc_exchange_name(
+ exchange
+ ), "direct", False
+ )
+ except pika_pool.Timeout as e:
+ raise exceptions.MessagingTimeout(
+ "Timeout for current operation was expired. {}.".format(
+ str(e)
+ )
+ )
+
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
expiration_time = None if timeout is None else time.time() + timeout
@@ -165,9 +182,23 @@ class PikaDriver(base.BaseDriver):
if retry is None:
retry = self._pika_engine.default_rpc_retry_attempts
+ exchange = self._pika_engine.get_rpc_exchange_name(
+ target.exchange
+ )
+
def on_exception(ex):
- if isinstance(ex, (pika_drv_exc.ConnectionException,
- exceptions.MessageDeliveryFailure)):
+ if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
+ # it is desired to create exchange because if we sent to
+ # exchange which is not exists, we get ChannelClosed exception
+ # and need to reconnect
+ try:
+ self._declare_rpc_exchange(exchange,
+ expiration_time - time.time())
+ except pika_drv_exc.ConnectionException as e:
+ LOG.warn("Problem during declaring exchange. %", e)
+ return True
+ elif isinstance(ex, (pika_drv_exc.ConnectionException,
+ exceptions.MessageDeliveryFailure)):
LOG.warn("Problem during message sending. %s", ex)
return True
else:
@@ -182,14 +213,35 @@ class PikaDriver(base.BaseDriver):
)
)
+ if target.fanout:
+ return self.cast_all_servers(
+ exchange, target.topic, ctxt, message, expiration_time,
+ retrier
+ )
+
+ routing_key = self._pika_engine.get_rpc_queue_name(
+ target.topic, target.server, retrier is None
+ )
+
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message,
ctxt)
- reply = msg.send(
- target,
- reply_listener=self._reply_listener if wait_for_reply else None,
- expiration_time=expiration_time,
- retrier=retrier
- )
+ try:
+ reply = msg.send(
+ exchange=exchange,
+ routing_key=routing_key,
+ reply_listener=(
+ self._reply_listener if wait_for_reply else None
+ ),
+ expiration_time=expiration_time,
+ retrier=retrier
+ )
+ except pika_drv_exc.ExchangeNotFoundException as ex:
+ try:
+ self._declare_rpc_exchange(exchange,
+ expiration_time - time.time())
+ except pika_drv_exc.ConnectionException as e:
+ LOG.warn("Problem during declaring exchange. %", e)
+ raise ex
if reply is not None:
if reply.failure is not None:
@@ -197,6 +249,28 @@ class PikaDriver(base.BaseDriver):
return reply.result
+ def cast_all_servers(self, exchange, topic, ctxt, message, expiration_time,
+ retrier=None):
+ msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
+ ctxt)
+ try:
+ msg.send(
+ exchange=exchange,
+ routing_key=self._pika_engine.get_rpc_queue_name(
+ topic, "all_servers", retrier is None
+ ),
+ mandatory=False,
+ expiration_time=expiration_time,
+ retrier=retrier
+ )
+ except pika_drv_exc.ExchangeNotFoundException:
+ try:
+ self._declare_rpc_exchange(
+ exchange, expiration_time - time.time()
+ )
+ except pika_drv_exc.ConnectionException as e:
+ LOG.warn("Problem during declaring exchange. %", e)
+
def _declare_notification_queue_binding(self, target, timeout=None):
if timeout is not None and timeout < 0:
raise exceptions.MessagingTimeout(
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index 7041771..a8c9a71 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -272,7 +272,7 @@ class PikaEngine(object):
except pika_pool.Connection.connectivity_errors as e:
LOG.warn("Can't establish connection to host. %s", e)
except pika_drv_exc.HostConnectionNotAllowedException as e:
- LOG.warn("Connection to host is not Allowed. %s", e)
+ LOG.warn("Connection to host is not allowed. %s", e)
connection_attempts -= 1
pika_next_connection_num += 1
@@ -358,6 +358,28 @@ class PikaEngine(object):
] = cur_time
@staticmethod
+ def declare_exchange_by_channel(channel, exchange, exchange_type, durable):
+ """Declare exchange using already created channel, if they don't exist
+
+ :param channel: Channel for communication with RabbitMQ
+ :param exchange: String, RabbitMQ exchange name
+ :param exchange_type: String ('direct', 'topic' or 'fanout')
+ exchange type for exchange to be declared
+ :param durable: Boolean, creates durable exchange if true
+ """
+ try:
+ channel.exchange_declare(
+ exchange, exchange_type, auto_delete=True, durable=durable
+ )
+ except pika_pool.Connection.connectivity_errors as e:
+ raise pika_drv_exc.ConnectionException(
+ "Connectivity problem detected during declaring exchange: "
+ "exchange:{}, exchange_type: {}, durable: {}. {}".format(
+ exchange, exchange_type, durable, str(e)
+ )
+ )
+
+ @staticmethod
def declare_queue_binding_by_channel(channel, exchange, queue, routing_key,
exchange_type, queue_expiration,
durable):
@@ -397,23 +419,14 @@ class PikaEngine(object):
)
)
- def get_rpc_exchange_name(self, exchange, topic, fanout, no_ack):
+ def get_rpc_exchange_name(self, exchange):
"""Returns RabbitMQ exchange name for given rpc request
:param exchange: String, oslo.messaging target's exchange
- :param topic: String, oslo.messaging target's topic
- :param fanout: Boolean, oslo.messaging target's fanout mode
- :param no_ack: Boolean, use message delivery with acknowledges or not
:return: String, RabbitMQ exchange name
"""
- exchange = (exchange or self.default_rpc_exchange)
-
- if fanout:
- exchange = '{}_fanout_{}_{}'.format(
- exchange, "no_ack" if no_ack else "with_ack", topic
- )
- return exchange
+ return exchange or self.default_rpc_exchange
@staticmethod
def get_rpc_queue_name(topic, server, no_ack):
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index dab422e..4726c48 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -495,11 +495,12 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
self.msg_id = None
self.reply_q = None
- def send(self, target, reply_listener=None, expiration_time=None,
- retrier=None):
+ def send(self, exchange, routing_key, reply_listener=None,
+ expiration_time=None, retrier=None):
"""Send RPC message with configured retrying
- :param target: Target, oslo.messaging target which defines RPC service
+ :param exchange: String, RabbitMQ exchange name for message sending
+ :param routing_key: String, RabbitMQ routing key for message routing
:param reply_listener: RpcReplyPikaListener, listener for waiting
reply. If None - return immediately without reply waiting
:param expiration_time: Float, expiration time in seconds
@@ -507,15 +508,6 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
:param retrier: retrying.Retrier, configured retrier object for sending
message, if None no retrying is performed
"""
-
- exchange = self._pika_engine.get_rpc_exchange_name(
- target.exchange, target.topic, target.fanout, retrier is None
- )
-
- queue = "" if target.fanout else self._pika_engine.get_rpc_queue_name(
- target.topic, target.server, retrier is None
- )
-
msg_dict, msg_props = self._prepare_message_to_send()
if reply_listener:
@@ -531,7 +523,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
self._do_send(
- exchange=exchange, routing_key=queue, msg_dict=msg_dict,
+ exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
msg_props=msg_props, confirm=True, mandatory=True,
persistent=False, expiration_time=expiration_time,
retrier=retrier
@@ -546,8 +538,8 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
raise e
else:
self._do_send(
- exchange=exchange, routing_key=queue, msg_dict=msg_dict,
- msg_props=msg_props, confirm=True, mandatory=not target.fanout,
+ exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
+ msg_props=msg_props, confirm=True, mandatory=True,
persistent=False, expiration_time=expiration_time,
retrier=retrier
)
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index ce2b1c3..cc2d6e9 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -295,13 +295,13 @@ class RpcServicePikaPoller(PikaPoller):
"""
queue_expiration = self._pika_engine.rpc_queue_expiration
+ exchange = self._pika_engine.get_rpc_exchange_name(
+ self._target.exchange
+ )
+
queues_to_consume = []
for no_ack in [True, False]:
- exchange = self._pika_engine.get_rpc_exchange_name(
- self._target.exchange, self._target.topic, False, no_ack
- )
-
queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, None, no_ack
)
@@ -323,21 +323,19 @@ class RpcServicePikaPoller(PikaPoller):
queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration
)
+ all_servers_routing_key = self._pika_engine.get_rpc_queue_name(
+ self._target.topic, "all_servers", no_ack
+ )
+ self._pika_engine.declare_queue_binding_by_channel(
+ channel=self._channel, exchange=exchange, durable=False,
+ queue=server_queue, routing_key=all_servers_routing_key,
+ exchange_type='direct', queue_expiration=queue_expiration
+ )
queues_to_consume.append(
{"queue_name": server_queue, "no_ack": no_ack,
"consumer_tag": None}
)
- fanout_exchange = self._pika_engine.get_rpc_exchange_name(
- self._target.exchange, self._target.topic, True, no_ack
- )
-
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=fanout_exchange,
- queue=server_queue, routing_key="", exchange_type='fanout',
- queue_expiration=queue_expiration, durable=False
- )
-
return queues_to_consume
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
index cf1b1d8..0cc1b86 100644
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -434,8 +434,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
expiration_time = time.time() + expiration
message.send(
- target=oslo_messaging.Target(exchange=self._exchange,
- topic=self._routing_key),
+ exchange=self._exchange,
+ routing_key=self._routing_key,
reply_listener=None,
expiration_time=expiration_time,
retrier=None
@@ -490,8 +490,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
reply_listener.get_reply_qname.return_value = reply_queue_name
res = message.send(
- target=oslo_messaging.Target(exchange=self._exchange,
- topic=self._routing_key),
+ exchange=self._exchange,
+ routing_key=self._routing_key,
reply_listener=reply_listener,
expiration_time=expiration_time,
retrier=None
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
index abb0804..17ba3b7 100644
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ b/oslo_messaging/tests/drivers/pika/test_poller.py
@@ -234,9 +234,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
)
self._pika_engine.get_rpc_exchange_name.side_effect = (
- lambda exchange, topic, fanout, no_ack: "_".join(
- [exchange, topic, str(fanout), str(no_ack)]
- )
+ lambda exchange: exchange
)
self._prefetch_count = 123
@@ -277,7 +275,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
declare_queue_binding_by_channel_mock.assert_has_calls((
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_False_True",
+ exchange="exchange",
exchange_type='direct',
queue="topic_None_True",
queue_expiration=12345,
@@ -285,7 +283,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
),
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_False_True",
+ exchange="exchange",
exchange_type='direct',
queue="topic_server_True",
queue_expiration=12345,
@@ -293,15 +291,15 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
),
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_True_True",
- exchange_type='fanout',
+ exchange="exchange",
+ exchange_type='direct',
queue="topic_server_True",
queue_expiration=12345,
- routing_key=''
+ routing_key="topic_all_servers_True"
),
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_False_False",
+ exchange="exchange",
exchange_type='direct',
queue="topic_None_False",
queue_expiration=12345,
@@ -309,20 +307,20 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
),
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_False_False",
+ exchange="exchange",
exchange_type='direct',
queue="topic_server_False",
queue_expiration=12345,
- routing_key="topic_server_False"
+ routing_key='topic_server_False'
),
mock.call(
channel=self._poller_channel_mock, durable=False,
- exchange="exchange_topic_True_False",
- exchange_type='fanout',
+ exchange="exchange",
+ exchange_type='direct',
queue="topic_server_False",
queue_expiration=12345,
- routing_key=''
- ),
+ routing_key='topic_all_servers_False'
+ )
))
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index bf56288..92e8f64 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -351,8 +351,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
logger = mock.MagicMock()
logger.info = mock.MagicMock()
message = {'password': 'passw0rd', 'event_type': 'foo'}
- json_str = jsonutils.dumps(message)
- mask_str = strutils.mask_password(json_str)
+ mask_str = jsonutils.dumps(strutils.mask_dict_password(message))
with mock.patch.object(logging, 'getLogger') as gl:
gl.return_value = logger