summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-02-08 14:57:32 +0000
committerGerrit Code Review <review@openstack.org>2022-02-08 14:57:32 +0000
commit2d090b5d6b78d5a144a656cebfbaf4609dc2e4fa (patch)
tree772e4de82ee313a0228178d72ee22a1ef95ebd86
parent5d165cc713a98dbd650e9e6295d7966ce2919935 (diff)
parent7e8acbf87070354cd2438f3ab0821d035c2e7d92 (diff)
downloadoslo-messaging-2d090b5d6b78d5a144a656cebfbaf4609dc2e4fa.tar.gz
Merge "Adding support for rabbitmq quorum queues"12.13.0
-rw-r--r--doc/source/admin/rabbit.rst1
-rw-r--r--oslo_messaging/_drivers/amqp.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py59
-rw-r--r--releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml11
4 files changed, 66 insertions, 9 deletions
diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst
index 142bdf7..bf91e47 100644
--- a/doc/source/admin/rabbit.rst
+++ b/doc/source/admin/rabbit.rst
@@ -240,6 +240,7 @@ Consuming Options
^^^^^^^^^^^^^^^^^
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
Connection Options
diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index d4db2c6..b0c9551 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
- help='Use durable queues in AMQP.'),
+ help='Use durable queues in AMQP. If rabbit_quorum_queue '
+ 'is enabled, queues will be durable and this value will '
+ 'be ignored.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 3ba5418..ae26f3d 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -136,6 +136,17 @@ rabbit_opts = [
'nodes, run: '
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
"""'{"ha-mode": "all"}' \""""),
+ cfg.BoolOpt('rabbit_quorum_queue',
+ default=False,
+ help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
+ 'The quorum queue is a modern queue type for RabbitMQ '
+ 'implementing a durable, replicated FIFO queue based on the '
+ 'Raft consensus algorithm. It is available as of '
+ 'RabbitMQ 3.8.0. If set this option will conflict with '
+ 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
+ 'in other words the HA queues should be disabled, quorum '
+ 'queues durable by default so the amqp_durable_queues '
+ 'opion is ignored when this option enabled.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@@ -178,7 +189,8 @@ rabbit_opts = [
LOG = logging.getLogger(__name__)
-def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
+def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
+ rabbit_quorum_queue):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@@ -201,12 +213,31 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
+
+ If the rabbit_quorum_queue option is set, we try to declare a mirrored
+ queue as described here:
+
+ https://www.rabbitmq.com/quorum-queues.html
+
+ Setting x-queue-type to quorum means that replicated FIFO queue based on
+ the Raft consensus algorithm will be used. It is available as of
+ RabbitMQ 3.8.0. If set this option will conflict with
+ the HA queues (``rabbit_ha_queues``) aka mirrored queues,
+ in other words HA queues should be disabled.
"""
args = {}
+ if rabbit_quorum_queue and rabbit_ha_queues:
+ raise RuntimeError('Configuration Error: rabbit_quorum_queue '
+ 'and rabbit_ha_queues both enabled, queue '
+ 'type is quorum or HA (mirrored) not both')
+
if rabbit_ha_queues:
args['x-ha-policy'] = 'all'
+ if rabbit_quorum_queue:
+ args['x-queue-type'] = 'quorum'
+
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@@ -235,7 +266,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
- enable_cancel_on_failover=False):
+ enable_cancel_on_failover=False, rabbit_quorum_queue=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -249,7 +280,8 @@ class Consumer(object):
self.type = type
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
- rabbit_queue_ttl)
+ rabbit_queue_ttl,
+ rabbit_quorum_queue)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -463,6 +495,7 @@ class Connection(object):
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
+ self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@@ -662,6 +695,12 @@ class Connection(object):
except AttributeError:
pass
+ @property
+ def durable(self):
+ # Quorum queues are durable by default, durable option should
+ # be enabled by default with quorum queues
+ return self.amqp_durable_queues or self.rabbit_quorum_queue
+
@classmethod
def validate_ssl_version(cls, version):
key = version.lower()
@@ -1157,12 +1196,13 @@ class Connection(object):
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
- enable_cancel_on_failover=self.enable_cancel_on_failover)
+ enable_cancel_on_failover=self.enable_cancel_on_failover,
+ rabbit_quorum_queue=self.rabbit_quorum_queue)
self.declare_consumer(consumer)
@@ -1274,7 +1314,10 @@ class Connection(object):
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
- queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
+ queue_arguments=_get_queue_arguments(
+ self.rabbit_ha_queues,
+ 0,
+ self.rabbit_quorum_queue))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
@@ -1330,7 +1373,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
@@ -1352,7 +1395,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,
diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml
new file mode 100644
index 0000000..a88c5d8
--- /dev/null
+++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml
@@ -0,0 +1,11 @@
+---
+features:
+ - |
+ Adding support for quorum queues. Quorum queues are enabled if the
+ ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
+ Setting x-queue-type to quorum means that replicated FIFO queue based on
+ the Raft consensus algorithm will be used. It is available as of
+ RabbitMQ 3.8.0. The quorum queues are durable by default
+ (``amqp_durable_queues``) will be ignored.
+ when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues
+ should be disabled since the queue can't be both types at the same time