summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_rabbit.py
diff options
context:
space:
mode:
authorhamza alqtaishat <hamalq@verizonmedia.com>2022-02-25 22:08:51 +0000
committerhamza alqtaishat <hamalq@verizonmedia.com>2022-04-06 19:46:40 +0000
commit8932ad237bea44e42297ea7acbbd67f6664fa005 (patch)
treeba0c15f37e071e7608a8d5e10ec1c23d6a5001c2 /oslo_messaging/_drivers/impl_rabbit.py
parent2d090b5d6b78d5a144a656cebfbaf4609dc2e4fa (diff)
downloadoslo-messaging-8932ad237bea44e42297ea7acbbd67f6664fa005.tar.gz
Add quorum queue control configurations
the quorum queue type add features that did not exist before or not handled in rabbitmq the following link shows some of them https://blog.rabbitmq.com/posts/2020/04/rabbitmq-gets-an-ha-upgrade/ the options below control the quorum queue and ensure the stability of the quorum system x-max-in-memory-length x-max-in-memory-bytes x-delivery-limit which control the memory usage and handle message poisoning Closes-Bug: #1962348 Change-Id: I570227d6102681f4f9d8813ed0d7693a1160c21d
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py78
1 files changed, 71 insertions, 7 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index ae26f3d..6f737c4 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import errno
import functools
@@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
+
+# The QuorumMemConfig will hold the quorum queue memory configurations
+QuorumMemConfig = collections.namedtuple('QuorumMemConfig',
+ 'delivery_limit'
+ ' max_memory_length'
+ ' max_memory_bytes')
+
# NOTE(sileht): don't exist in py2 socket module
TCP_USER_TIMEOUT = 18
@@ -147,6 +155,30 @@ rabbit_opts = [
'in other words the HA queues should be disabled, quorum '
'queues durable by default so the amqp_durable_queues '
'opion is ignored when this option enabled.'),
+ cfg.IntOpt('rabbit_quorum_delivery_limit',
+ default=0,
+ help='Each time a message is redelivered to a consumer, '
+ 'a counter is incremented. Once the redelivery count '
+ 'exceeds the delivery limit the message gets dropped '
+ 'or dead-lettered (if a DLX exchange has been configured) '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_length',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of messages in the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_bytes',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of memory bytes used by the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@@ -190,7 +222,8 @@ LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
- rabbit_quorum_queue):
+ rabbit_quorum_queue,
+ rabbit_quorum_queue_config):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@@ -224,6 +257,14 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
RabbitMQ 3.8.0. If set this option will conflict with
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues should be disabled.
+
+ rabbit_quorum_queue_config:
+ Quorum queues provides three options to handle message poisoning
+ and limit the resources the qourum queue can use
+ x-delivery-limit number of times the queue will try to deliver
+ a message before it decide to discard it
+ x-max-in-memory-length, x-max-in-memory-bytes control the size
+ of memory used by quorum queue
"""
args = {}
@@ -237,6 +278,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
if rabbit_quorum_queue:
args['x-queue-type'] = 'quorum'
+ if rabbit_quorum_queue_config.delivery_limit:
+ args['x-delivery-limit'] = \
+ rabbit_quorum_queue_config.delivery_limit
+ if rabbit_quorum_queue_config.max_memory_length:
+ args['x-max-in-memory-length'] = \
+ rabbit_quorum_queue_config.max_memory_length
+ if rabbit_quorum_queue_config.max_memory_bytes:
+ args['x-max-in-memory-bytes'] = \
+ rabbit_quorum_queue_config.max_memory_bytes
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@@ -266,7 +316,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
- enable_cancel_on_failover=False, rabbit_quorum_queue=False):
+ enable_cancel_on_failover=False, rabbit_quorum_queue=False,
+ rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -279,9 +330,10 @@ class Consumer(object):
self.callback = callback
self.type = type
self.nowait = nowait
- self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
- rabbit_queue_ttl,
- rabbit_quorum_queue)
+ rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
+ self.queue_arguments = _get_queue_arguments(
+ rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
+ rabbit_quorum_queue_config)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -496,6 +548,8 @@ class Connection(object):
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
+ self.rabbit_quorum_queue_config = self._get_quorum_configurations(
+ driver_conf)
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@@ -709,6 +763,14 @@ class Connection(object):
except KeyError:
raise RuntimeError("Invalid SSL version : %s" % version)
+ def _get_quorum_configurations(self, driver_conf):
+ """Get the quorum queue configurations"""
+ delivery_limit = driver_conf.rabbit_quorum_delivery_limit
+ max_memory_length = driver_conf.rabbit_quroum_max_memory_length
+ max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes
+ return QuorumMemConfig(delivery_limit, max_memory_length,
+ max_memory_bytes)
+
# NOTE(moguimar): default_password in this function's context is just
# a fallback option, not a hardcoded password.
def _transform_transport_url(self, url, host, default_username='', # nosec
@@ -1202,7 +1264,8 @@ class Connection(object):
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover,
- rabbit_quorum_queue=self.rabbit_quorum_queue)
+ rabbit_quorum_queue=self.rabbit_quorum_queue,
+ rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@@ -1317,7 +1380,8 @@ class Connection(object):
queue_arguments=_get_queue_arguments(
self.rabbit_ha_queues,
0,
- self.rabbit_quorum_queue))
+ self.rabbit_quorum_queue,
+ self.rabbit_quorum_queue_config))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '