summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_rabbit.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py167
1 files changed, 153 insertions, 14 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 0519ca1..c61393c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import errno
import functools
@@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool
from oslo_messaging import _utils
from oslo_messaging import exceptions
+
+# The QuorumMemConfig will hold the quorum queue memory configurations
+QuorumMemConfig = collections.namedtuple('QuorumMemConfig',
+ 'delivery_limit'
+ ' max_memory_length'
+ ' max_memory_bytes')
+
# NOTE(sileht): don't exist in py2 socket module
TCP_USER_TIMEOUT = 18
@@ -74,8 +82,17 @@ rabbit_opts = [
deprecated_name='kombu_ssl_ca_certs',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
+ cfg.BoolOpt('ssl_enforce_fips_mode',
+ default=False,
+ help='Global toggle for enforcing the OpenSSL FIPS mode. '
+ 'This feature requires Python support. '
+ 'This is available in Python 3.9 in all '
+ 'environments and may have been backported to older '
+ 'Python versions on select environments. If the Python '
+ 'executable used does not support OpenSSL FIPS mode, '
+ 'an exception will be raised.'),
cfg.BoolOpt('heartbeat_in_pthread',
- default=True,
+ default=False,
help="Run the health check heartbeat thread "
"through a native python thread by default. If this "
"option is equal to False then the health check "
@@ -83,7 +100,9 @@ rabbit_opts = [
"from the parent process. For "
"example if the parent process has monkey patched the "
"stdlib by using eventlet/greenlet then the heartbeat "
- "will be run through a green thread.",
+ "will be run through a green thread. "
+ "This option should be set to True only for the "
+ "wsgi services.",
),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
@@ -108,7 +127,7 @@ rabbit_opts = [
'unavailable. Takes effect only if more than one '
'RabbitMQ node is provided in config.'),
cfg.StrOpt('rabbit_login_method',
- choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'),
+ choices=('PLAIN', 'AMQPLAIN', 'EXTERNAL', 'RABBIT-CR-DEMO'),
default='AMQPLAIN',
deprecated_group='DEFAULT',
help='The RabbitMQ login method.'),
@@ -136,6 +155,41 @@ rabbit_opts = [
'nodes, run: '
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
"""'{"ha-mode": "all"}' \""""),
+ cfg.BoolOpt('rabbit_quorum_queue',
+ default=False,
+ help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
+ 'The quorum queue is a modern queue type for RabbitMQ '
+ 'implementing a durable, replicated FIFO queue based on the '
+ 'Raft consensus algorithm. It is available as of '
+ 'RabbitMQ 3.8.0. If set this option will conflict with '
+ 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
+ 'in other words the HA queues should be disabled, quorum '
+ 'queues durable by default so the amqp_durable_queues '
+ 'opion is ignored when this option enabled.'),
+ cfg.IntOpt('rabbit_quorum_delivery_limit',
+ default=0,
+ help='Each time a message is redelivered to a consumer, '
+ 'a counter is incremented. Once the redelivery count '
+ 'exceeds the delivery limit the message gets dropped '
+ 'or dead-lettered (if a DLX exchange has been configured) '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_length',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of messages in the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
+ cfg.IntOpt('rabbit_quroum_max_memory_bytes',
+ default=0,
+ help='By default all messages are maintained in memory '
+ 'if a quorum queue grows in length it can put memory '
+ 'pressure on a cluster. This option can limit the number '
+ 'of memory bytes used by the quorum queue. '
+ 'Used only when rabbit_quorum_queue is enabled, '
+ 'Default 0 which means dont set a limit.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@@ -178,7 +232,9 @@ rabbit_opts = [
LOG = logging.getLogger(__name__)
-def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
+def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
+ rabbit_quorum_queue,
+ rabbit_quorum_queue_config):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@@ -201,12 +257,48 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
+
+ If the rabbit_quorum_queue option is set, we try to declare a mirrored
+ queue as described here:
+
+ https://www.rabbitmq.com/quorum-queues.html
+
+ Setting x-queue-type to quorum means that replicated FIFO queue based on
+ the Raft consensus algorithm will be used. It is available as of
+ RabbitMQ 3.8.0. If set this option will conflict with
+ the HA queues (``rabbit_ha_queues``) aka mirrored queues,
+ in other words HA queues should be disabled.
+
+ rabbit_quorum_queue_config:
+ Quorum queues provides three options to handle message poisoning
+ and limit the resources the qourum queue can use
+ x-delivery-limit number of times the queue will try to deliver
+ a message before it decide to discard it
+ x-max-in-memory-length, x-max-in-memory-bytes control the size
+ of memory used by quorum queue
"""
args = {}
+ if rabbit_quorum_queue and rabbit_ha_queues:
+ raise RuntimeError('Configuration Error: rabbit_quorum_queue '
+ 'and rabbit_ha_queues both enabled, queue '
+ 'type is quorum or HA (mirrored) not both')
+
if rabbit_ha_queues:
args['x-ha-policy'] = 'all'
+ if rabbit_quorum_queue:
+ args['x-queue-type'] = 'quorum'
+ if rabbit_quorum_queue_config.delivery_limit:
+ args['x-delivery-limit'] = \
+ rabbit_quorum_queue_config.delivery_limit
+ if rabbit_quorum_queue_config.max_memory_length:
+ args['x-max-in-memory-length'] = \
+ rabbit_quorum_queue_config.max_memory_length
+ if rabbit_quorum_queue_config.max_memory_bytes:
+ args['x-max-in-memory-bytes'] = \
+ rabbit_quorum_queue_config.max_memory_bytes
+
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@@ -235,7 +327,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
- enable_cancel_on_failover=False):
+ enable_cancel_on_failover=False, rabbit_quorum_queue=False,
+ rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -248,8 +341,10 @@ class Consumer(object):
self.callback = callback
self.type = type
self.nowait = nowait
- self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
- rabbit_queue_ttl)
+ rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
+ self.queue_arguments = _get_queue_arguments(
+ rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
+ rabbit_quorum_queue_config)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@@ -496,16 +591,20 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
+ self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
+ self.rabbit_quorum_queue_config = self._get_quorum_configurations(
+ driver_conf)
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@@ -521,6 +620,7 @@ class Connection(object):
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
+ self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
if self.heartbeat_in_pthread:
@@ -558,6 +658,19 @@ class Connection(object):
self.ssl_cert_file = driver_conf.ssl_cert_file
self.ssl_ca_file = driver_conf.ssl_ca_file
+ if self.ssl_enforce_fips_mode:
+ if hasattr(ssl, 'FIPS_mode'):
+ LOG.info("Enforcing the use of the OpenSSL FIPS mode")
+ ssl.FIPS_mode_set(1)
+ else:
+ raise exceptions.ConfigurationError(
+ "OpenSSL FIPS mode is not supported by your Python "
+ "version. You must either change the Python "
+ "executable used to a version with FIPS mode "
+ "support or disable FIPS mode by setting the "
+ "'[oslo_messaging_rabbit] ssl_enforce_fips_mode' "
+ "configuration option to 'False'.")
+
self._url = ''
if url.hosts:
if url.transport.startswith('kombu+'):
@@ -705,6 +818,12 @@ class Connection(object):
except AttributeError:
pass
+ @property
+ def durable(self):
+ # Quorum queues are durable by default, durable option should
+ # be enabled by default with quorum queues
+ return self.amqp_durable_queues or self.rabbit_quorum_queue
+
@classmethod
def validate_ssl_version(cls, version):
key = version.lower()
@@ -713,6 +832,14 @@ class Connection(object):
except KeyError:
raise RuntimeError("Invalid SSL version : %s" % version)
+ def _get_quorum_configurations(self, driver_conf):
+ """Get the quorum queue configurations"""
+ delivery_limit = driver_conf.rabbit_quorum_delivery_limit
+ max_memory_length = driver_conf.rabbit_quroum_max_memory_length
+ max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes
+ return QuorumMemConfig(delivery_limit, max_memory_length,
+ max_memory_bytes)
+
# NOTE(moguimar): default_password in this function's context is just
# a fallback option, not a hardcoded password.
def _transform_transport_url(self, url, host, default_username='', # nosec
@@ -772,7 +899,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
@@ -1194,12 +1327,14 @@ class Connection(object):
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
- enable_cancel_on_failover=self.enable_cancel_on_failover)
+ enable_cancel_on_failover=self.enable_cancel_on_failover,
+ rabbit_quorum_queue=self.rabbit_quorum_queue,
+ rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@@ -1324,7 +1459,11 @@ class Connection(object):
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
- queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
+ queue_arguments=_get_queue_arguments(
+ self.rabbit_ha_queues,
+ 0,
+ self.rabbit_quorum_queue,
+ self.rabbit_quorum_queue_config))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
@@ -1380,7 +1519,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
@@ -1402,7 +1541,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
- durable=self.amqp_durable_queues,
+ durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,