diff options
author | hamza alqtaishat <hamalq@verizonmedia.com> | 2022-02-25 22:08:51 +0000 |
---|---|---|
committer | hamza alqtaishat <hamalq@verizonmedia.com> | 2022-04-06 19:46:40 +0000 |
commit | 8932ad237bea44e42297ea7acbbd67f6664fa005 (patch) | |
tree | ba0c15f37e071e7608a8d5e10ec1c23d6a5001c2 /oslo_messaging/_drivers/impl_rabbit.py | |
parent | 2d090b5d6b78d5a144a656cebfbaf4609dc2e4fa (diff) | |
download | oslo-messaging-8932ad237bea44e42297ea7acbbd67f6664fa005.tar.gz |
Add quorum queue control configurations
the quorum queue type add features that did not exist before or not
handled in rabbitmq the following link shows some of them
https://blog.rabbitmq.com/posts/2020/04/rabbitmq-gets-an-ha-upgrade/
the options below control the quorum queue and ensure the stability of
the quorum system
x-max-in-memory-length
x-max-in-memory-bytes
x-delivery-limit
which control the memory usage and handle message poisoning
Closes-Bug: #1962348
Change-Id: I570227d6102681f4f9d8813ed0d7693a1160c21d
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 78 |
1 files changed, 71 insertions, 7 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ae26f3d..6f737c4 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import errno import functools @@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool from oslo_messaging import _utils from oslo_messaging import exceptions + +# The QuorumMemConfig will hold the quorum queue memory configurations +QuorumMemConfig = collections.namedtuple('QuorumMemConfig', + 'delivery_limit' + ' max_memory_length' + ' max_memory_bytes') + # NOTE(sileht): don't exist in py2 socket module TCP_USER_TIMEOUT = 18 @@ -147,6 +155,30 @@ rabbit_opts = [ 'in other words the HA queues should be disabled, quorum ' 'queues durable by default so the amqp_durable_queues ' 'opion is ignored when this option enabled.'), + cfg.IntOpt('rabbit_quorum_delivery_limit', + default=0, + help='Each time a message is redelivered to a consumer, ' + 'a counter is incremented. Once the redelivery count ' + 'exceeds the delivery limit the message gets dropped ' + 'or dead-lettered (if a DLX exchange has been configured) ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_length', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of messages in the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_bytes', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of memory bytes used by the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -190,7 +222,8 @@ LOG = logging.getLogger(__name__) def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, - rabbit_quorum_queue): + rabbit_quorum_queue, + rabbit_quorum_queue_config): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -224,6 +257,14 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, RabbitMQ 3.8.0. If set this option will conflict with the HA queues (``rabbit_ha_queues``) aka mirrored queues, in other words HA queues should be disabled. + + rabbit_quorum_queue_config: + Quorum queues provides three options to handle message poisoning + and limit the resources the qourum queue can use + x-delivery-limit number of times the queue will try to deliver + a message before it decide to discard it + x-max-in-memory-length, x-max-in-memory-bytes control the size + of memory used by quorum queue """ args = {} @@ -237,6 +278,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, if rabbit_quorum_queue: args['x-queue-type'] = 'quorum' + if rabbit_quorum_queue_config.delivery_limit: + args['x-delivery-limit'] = \ + rabbit_quorum_queue_config.delivery_limit + if rabbit_quorum_queue_config.max_memory_length: + args['x-max-in-memory-length'] = \ + rabbit_quorum_queue_config.max_memory_length + if rabbit_quorum_queue_config.max_memory_bytes: + args['x-max-in-memory-bytes'] = \ + rabbit_quorum_queue_config.max_memory_bytes if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -266,7 +316,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False, rabbit_quorum_queue=False): + enable_cancel_on_failover=False, rabbit_quorum_queue=False, + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -279,9 +330,10 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl, - rabbit_quorum_queue) + rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} + self.queue_arguments = _get_queue_arguments( + rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, + rabbit_quorum_queue_config) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -496,6 +548,8 @@ class Connection(object): self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue + self.rabbit_quorum_queue_config = self._get_quorum_configurations( + driver_conf) self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -709,6 +763,14 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _get_quorum_configurations(self, driver_conf): + """Get the quorum queue configurations""" + delivery_limit = driver_conf.rabbit_quorum_delivery_limit + max_memory_length = driver_conf.rabbit_quroum_max_memory_length + max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes + return QuorumMemConfig(delivery_limit, max_memory_length, + max_memory_bytes) + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -1202,7 +1264,8 @@ class Connection(object): callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, enable_cancel_on_failover=self.enable_cancel_on_failover, - rabbit_quorum_queue=self.rabbit_quorum_queue) + rabbit_quorum_queue=self.rabbit_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1317,7 +1380,8 @@ class Connection(object): queue_arguments=_get_queue_arguments( self.rabbit_ha_queues, 0, - self.rabbit_quorum_queue)) + self.rabbit_quorum_queue, + self.rabbit_quorum_queue_config)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' |