summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-06-13 17:14:16 +0000
committerGerrit Code Review <review@openstack.org>2022-06-13 17:14:16 +0000
commit41863867489c8a84823f2b3d73cab5585f1fb4ae (patch)
tree34fbb05d7b886fd2b20e9e8568091000e9639834
parentf611370555c994dd685b680a4ef09dbd57b697b2 (diff)
parent8932ad237bea44e42297ea7acbbd67f6664fa005 (diff)
downloadoslo-messaging-41863867489c8a84823f2b3d73cab5585f1fb4ae.tar.gz
Merge "Add quorum queue control configurations"13.0.0
-rw-r--r--doc/source/admin/rabbit.rst3
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py78
-rw-r--r--releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml6
3 files changed, 80 insertions, 7 deletions
diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst
index 4fa159f..b9433a5 100644
--- a/doc/source/admin/rabbit.rst
+++ b/doc/source/admin/rabbit.rst
@@ -241,6 +241,9 @@ Consuming Options
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_delivery_limit`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_length`
+- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_bytes`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`
Connection Options
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 2b424d7..ed2642c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import errno
import functools
@@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
+
+# The QuorumMemConfig will hold the quorum queue memory configurations
+QuorumMemConfig = collections.namedtuple('QuorumMemConfig',
+ 'delivery_limit'
+ ' max_memory_length'
+ ' max_memory_bytes')
+
# NOTE(sileht): don't exist in py2 socket module
TCP_USER_TIMEOUT = 18
@@ -156,6 +164,30 @@ rabbit_opts = [
'in other words the HA queues should be disabled, quorum '
'queues durable by default so the amqp_durable_queues '
'opion is ignored when this option enabled.'),
+ cfg.IntOpt('rabbit_quorum_delivery_limit',
+ default=0,
+ help='Each time a message is redelivered to a consumer, '
+ 'a counter is incremented. Once the redelivery count '
+ 'exceeds the delivery limit the message gets dropped '
+ 'or dead-lettered (if a DLX exchange has been configured) '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_length',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of messages in the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_bytes',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of memory bytes used by the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@@ -199,7 +231,8 @@ LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
- rabbit_quorum_queue):
+ rabbit_quorum_queue,
+ rabbit_quorum_queue_config):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@@ -233,6 +266,14 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
RabbitMQ 3.8.0. If set this option will conflict with
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues should be disabled.
+
+ rabbit_quorum_queue_config:
+ Quorum queues provides three options to handle message poisoning
+ and limit the resources the qourum queue can use
+ x-delivery-limit number of times the queue will try to deliver
+ a message before it decide to discard it
+ x-max-in-memory-length, x-max-in-memory-bytes control the size
+ of memory used by quorum queue
"""
args = {}
@@ -246,6 +287,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
if rabbit_quorum_queue:
args['x-queue-type'] = 'quorum'
+ if rabbit_quorum_queue_config.delivery_limit:
+ args['x-delivery-limit'] = \
+ rabbit_quorum_queue_config.delivery_limit
+ if rabbit_quorum_queue_config.max_memory_length:
+ args['x-max-in-memory-length'] = \
+ rabbit_quorum_queue_config.max_memory_length
+ if rabbit_quorum_queue_config.max_memory_bytes:
+ args['x-max-in-memory-bytes'] = \
+ rabbit_quorum_queue_config.max_memory_bytes
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@@ -275,7 +325,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
- enable_cancel_on_failover=False, rabbit_quorum_queue=False):
+ enable_cancel_on_failover=False, rabbit_quorum_queue=False,
+ rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -288,9 +339,10 @@ class Consumer(object):
self.callback = callback
self.type = type
self.nowait = nowait
- self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
- rabbit_queue_ttl,
- rabbit_quorum_queue)
+ rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
+ self.queue_arguments = _get_queue_arguments(
+ rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
+ rabbit_quorum_queue_config)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -505,6 +557,8 @@ class Connection(object):
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
+ self.rabbit_quorum_queue_config = self._get_quorum_configurations(
+ driver_conf)
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@@ -732,6 +786,14 @@ class Connection(object):
except KeyError:
raise RuntimeError("Invalid SSL version : %s" % version)
+ def _get_quorum_configurations(self, driver_conf):
+ """Get the quorum queue configurations"""
+ delivery_limit = driver_conf.rabbit_quorum_delivery_limit
+ max_memory_length = driver_conf.rabbit_quroum_max_memory_length
+ max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes
+ return QuorumMemConfig(delivery_limit, max_memory_length,
+ max_memory_bytes)
+
# NOTE(moguimar): default_password in this function's context is just
# a fallback option, not a hardcoded password.
def _transform_transport_url(self, url, host, default_username='', # nosec
@@ -1225,7 +1287,8 @@ class Connection(object):
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover,
- rabbit_quorum_queue=self.rabbit_quorum_queue)
+ rabbit_quorum_queue=self.rabbit_quorum_queue,
+ rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@@ -1340,7 +1403,8 @@ class Connection(object):
queue_arguments=_get_queue_arguments(
self.rabbit_ha_queues,
0,
- self.rabbit_quorum_queue))
+ self.rabbit_quorum_queue,
+ self.rabbit_quorum_queue_config))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
diff --git a/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml
new file mode 100644
index 0000000..42fdfbf
--- /dev/null
+++ b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml
@@ -0,0 +1,6 @@
+---
+features:
+ - |
+ Add quorum configuration x-max-in-memory-length,
+ x-max-in-memory-bytes, x-delivery-limit which control the quorum
+ queue memory usage and handle the message poisoning problem