summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_rabbit.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py69
1 files changed, 64 insertions, 5 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index ed2642c..c61393c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -92,7 +92,7 @@ rabbit_opts = [
'executable used does not support OpenSSL FIPS mode, '
'an exception will be raised.'),
cfg.BoolOpt('heartbeat_in_pthread',
- default=True,
+ default=False,
help="Run the health check heartbeat thread "
"through a native python thread by default. If this "
"option is equal to False then the health check "
@@ -100,7 +100,9 @@ rabbit_opts = [
"from the parent process. For "
"example if the parent process has monkey patched the "
"stdlib by using eventlet/greenlet then the heartbeat "
- "will be run through a green thread.",
+ "will be run through a green thread. "
+ "This option should be set to True only for the "
+ "wsgi services.",
),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
@@ -347,11 +349,44 @@ class Consumer(object):
self._declared_on = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
- type=type,
+ type=self.type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
+ def _declare_fallback(self, err, conn, consumer_arguments):
+ """Fallback by declaring a non durable queue.
+
+ When a control exchange is shared between services it is possible
+ that some service created first a non durable control exchange and
+ then after that an other service can try to create the same control
+ exchange but as a durable control exchange. In this case RabbitMQ
+ will raise an exception (PreconditionFailed), and then it will stop
+ our execution and our service will fail entirly. In this case we want
+ to fallback by creating a non durable queue to match the default
+ config.
+ """
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err):
+ LOG.info(
+ "[%s] Retrying to declare the exchange (%s) as "
+ "non durable", conn.connection_id, self.exchange_name)
+ self.exchange = kombu.entity.Exchange(
+ name=self.exchange_name,
+ type=self.type,
+ durable=False,
+ auto_delete=self.queue_auto_delete)
+ self.queue = kombu.entity.Queue(
+ name=self.queue_name,
+ channel=conn.channel,
+ exchange=self.exchange,
+ durable=False,
+ auto_delete=self.queue_auto_delete,
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments,
+ consumer_arguments=consumer_arguments
+ )
+ self.queue.declare()
+
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
@@ -374,7 +409,18 @@ class Consumer(object):
try:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
- self.queue.declare()
+ try:
+ self.queue.declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ LOG.warning(err)
+ self._declare_fallback(err, conn, consumer_arguments)
+
except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race
# condition. Simply retrying will solve the error most of the time
@@ -1352,7 +1398,20 @@ class Connection(object):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
- exchange(self.channel).declare()
+ try:
+ exchange(self.channel).declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" \
+ in str(err):
+ LOG.warning("Force creating a non durable exchange.")
+ exchange.durable = False
+ exchange(self.channel).declare()
self._declared_exchanges.add(exchange.name)
log_info = {'msg': msg,