diff options
author | shenjiatong <yshxxsjt715@gmail.com> | 2020-07-03 15:51:21 +0800 |
---|---|---|
committer | ushen <yshxxsjt715@gmail.com> | 2020-07-31 06:05:16 +0800 |
commit | 196fa877a90d7eb0f82ec9e1c194eef3f98fc0b1 (patch) | |
tree | 5a9a9762146c0d13cccd1bab9bf112698a88f551 /oslo_messaging/_drivers/impl_rabbit.py | |
parent | 7e406c312a6514e7ae377edb52b9e02b5bf37a7d (diff) | |
download | oslo-messaging-196fa877a90d7eb0f82ec9e1c194eef3f98fc0b1.tar.gz |
Cancel consumer if queue down
Previously, we have switched to use default exchanges
to avoid excessive amounts of exchange not found messages.
But it does not actually solve the problem because
reply_* queue is already gone and agent will not receive callbacks.
after some debugging, I found under some circumstances
seems rabbitmq consumer does not receive basic cancel
signal when queue is already gone. This might due to
rabbitmq try to restart consumer when queue is down
(for example when split brain). In such cases,
it might be better to fail early.
by reading the code, seems like x-cancel-on-ha-failover
is not dedicated to mirror queues only, https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1894,
https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1926.
By failing early, in my own test setup,
I could solve a certain case of exchange not found problem.
Change-Id: I2ae53340783e4044dab58035bc0992dc08145b53
Related-bug: #1789177
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 83 |
1 files changed, 52 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index c5c3970..679300b 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -172,6 +172,11 @@ rabbit_opts = [ 'for direct send. The direct send is used as reply, ' 'so the MessageUndeliverable exception is raised ' 'in case the client queue does not exist.'), + cfg.BoolOpt('enable_cancel_on_failover', + default=False, + help="Enable x-cancel-on-ha-failover flag so that " + "rabbitmq server will cancel and notify consumers" + "when queue is down") ] LOG = logging.getLogger(__name__) @@ -233,7 +238,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, + enable_cancel_on_failover=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -255,10 +261,16 @@ class Consumer(object): type=type, durable=self.durable, auto_delete=self.exchange_auto_delete) + self.enable_cancel_on_failover = enable_cancel_on_failover def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" + consumer_arguments = None + if self.enable_cancel_on_failover: + consumer_arguments = { + "x-cancel-on-ha-failover": True} + self.queue = kombu.entity.Queue( name=self.queue_name, channel=conn.channel, @@ -266,7 +278,9 @@ class Consumer(object): durable=self.durable, auto_delete=self.queue_auto_delete, routing_key=self.routing_key, - queue_arguments=self.queue_arguments) + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) try: LOG.debug('[%s] Queue.declare: %s', @@ -467,6 +481,7 @@ class Connection(object): self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread + self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover if self.heartbeat_in_pthread: # NOTE(hberaud): Experimental: threading module is in use to run @@ -1116,31 +1131,35 @@ class Connection(object): responses for call/multicall """ - consumer = Consumer(exchange_name='', # using default exchange - queue_name=topic, - routing_key='', - type='direct', - durable=False, - exchange_auto_delete=False, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name='', # using default exchange + queue_name=topic, + routing_key='', + type='direct', + durable=False, + exchange_auto_delete=False, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name or topic, - routing_key=topic, - type='topic', - durable=self.amqp_durable_queues, - exchange_auto_delete=self.amqp_auto_delete, - queue_auto_delete=self.amqp_auto_delete, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + consumer = Consumer( + exchange_name=exchange_name, + queue_name=queue_name or topic, + routing_key=topic, + type='topic', + durable=self.amqp_durable_queues, + exchange_auto_delete=self.amqp_auto_delete, + queue_auto_delete=self.amqp_auto_delete, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1151,16 +1170,18 @@ class Connection(object): exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name, - routing_key=topic, - type='fanout', - durable=False, - exchange_auto_delete=True, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name=exchange_name, + queue_name=queue_name, + routing_key=topic, + type='fanout', + durable=False, + exchange_auto_delete=True, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) |