summaryrefslogtreecommitdiff
path: root/oslo/messaging/_drivers/impl_rabbit.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo/messaging/_drivers/impl_rabbit.py')
-rw-r--r--oslo/messaging/_drivers/impl_rabbit.py30
1 files changed, 28 insertions, 2 deletions
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 0114951..0182800 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -354,7 +354,8 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
- 'exclusive': False}
+ 'exclusive': False,
+ 'passive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
@@ -370,6 +371,7 @@ class TopicPublisher(Publisher):
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
+
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(channel,
@@ -760,7 +762,31 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
- self.publisher_send(DirectPublisher, msg_id, msg)
+
+ timer = rpc_common.DecayingTimer(duration=60)
+ timer.start()
+ # NOTE(sileht): retry at least 60sec, after we have a good change
+ # that the caller is really dead too...
+
+ while True:
+ try:
+ self.publisher_send(DirectPublisher, msg_id, msg)
+ except self.connection.channel_errors as exc:
+ # NOTE(noelbk/sileht):
+ # If rabbit dies, the consumer can be disconnected before the
+ # publisher sends, and if the consumer hasn't declared the
+ # queue, the publisher's will send a message to an exchange
+ # that's not bound to a queue, and the message wll be lost.
+ # So we set passive=True to the publisher exchange and catch
+ # the 404 kombu ChannelError and retry until the exchange
+ # appears
+ if exc.code == 404 and timer.check_return() > 0:
+ LOG.info(_("The exchange to reply to %s doesn't "
+ "exist yet, retrying...") % msg_id)
+ time.sleep(1)
+ continue
+ raise
+ return
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message."""