diff options
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 167 |
1 files changed, 153 insertions, 14 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 0519ca1..c61393c 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import errno import functools @@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool from oslo_messaging import _utils from oslo_messaging import exceptions + +# The QuorumMemConfig will hold the quorum queue memory configurations +QuorumMemConfig = collections.namedtuple('QuorumMemConfig', + 'delivery_limit' + ' max_memory_length' + ' max_memory_bytes') + # NOTE(sileht): don't exist in py2 socket module TCP_USER_TIMEOUT = 18 @@ -74,8 +82,17 @@ rabbit_opts = [ deprecated_name='kombu_ssl_ca_certs', help='SSL certification authority file ' '(valid only if SSL enabled).'), + cfg.BoolOpt('ssl_enforce_fips_mode', + default=False, + help='Global toggle for enforcing the OpenSSL FIPS mode. ' + 'This feature requires Python support. ' + 'This is available in Python 3.9 in all ' + 'environments and may have been backported to older ' + 'Python versions on select environments. If the Python ' + 'executable used does not support OpenSSL FIPS mode, ' + 'an exception will be raised.'), cfg.BoolOpt('heartbeat_in_pthread', - default=True, + default=False, help="Run the health check heartbeat thread " "through a native python thread by default. If this " "option is equal to False then the health check " @@ -83,7 +100,9 @@ rabbit_opts = [ "from the parent process. For " "example if the parent process has monkey patched the " "stdlib by using eventlet/greenlet then the heartbeat " - "will be run through a green thread.", + "will be run through a green thread. " + "This option should be set to True only for the " + "wsgi services.", ), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, @@ -108,7 +127,7 @@ rabbit_opts = [ 'unavailable. Takes effect only if more than one ' 'RabbitMQ node is provided in config.'), cfg.StrOpt('rabbit_login_method', - choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'), + choices=('PLAIN', 'AMQPLAIN', 'EXTERNAL', 'RABBIT-CR-DEMO'), default='AMQPLAIN', deprecated_group='DEFAULT', help='The RabbitMQ login method.'), @@ -136,6 +155,41 @@ rabbit_opts = [ 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """ """'{"ha-mode": "all"}' \""""), + cfg.BoolOpt('rabbit_quorum_queue', + default=False, + help='Use quorum queues in RabbitMQ (x-queue-type: quorum). ' + 'The quorum queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will conflict with ' + 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' + 'in other words the HA queues should be disabled, quorum ' + 'queues durable by default so the amqp_durable_queues ' + 'opion is ignored when this option enabled.'), + cfg.IntOpt('rabbit_quorum_delivery_limit', + default=0, + help='Each time a message is redelivered to a consumer, ' + 'a counter is incremented. Once the redelivery count ' + 'exceeds the delivery limit the message gets dropped ' + 'or dead-lettered (if a DLX exchange has been configured) ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_length', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of messages in the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_bytes', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of memory bytes used by the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -178,7 +232,9 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, + rabbit_quorum_queue, + rabbit_quorum_queue_config): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -201,12 +257,48 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): Setting a queue TTL causes the queue to be automatically deleted if it is unused for the TTL duration. This is a helpful safeguard to prevent queues with zero consumers from growing without bound. + + If the rabbit_quorum_queue option is set, we try to declare a mirrored + queue as described here: + + https://www.rabbitmq.com/quorum-queues.html + + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will conflict with + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues should be disabled. + + rabbit_quorum_queue_config: + Quorum queues provides three options to handle message poisoning + and limit the resources the qourum queue can use + x-delivery-limit number of times the queue will try to deliver + a message before it decide to discard it + x-max-in-memory-length, x-max-in-memory-bytes control the size + of memory used by quorum queue """ args = {} + if rabbit_quorum_queue and rabbit_ha_queues: + raise RuntimeError('Configuration Error: rabbit_quorum_queue ' + 'and rabbit_ha_queues both enabled, queue ' + 'type is quorum or HA (mirrored) not both') + if rabbit_ha_queues: args['x-ha-policy'] = 'all' + if rabbit_quorum_queue: + args['x-queue-type'] = 'quorum' + if rabbit_quorum_queue_config.delivery_limit: + args['x-delivery-limit'] = \ + rabbit_quorum_queue_config.delivery_limit + if rabbit_quorum_queue_config.max_memory_length: + args['x-max-in-memory-length'] = \ + rabbit_quorum_queue_config.max_memory_length + if rabbit_quorum_queue_config.max_memory_bytes: + args['x-max-in-memory-bytes'] = \ + rabbit_quorum_queue_config.max_memory_bytes + if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -235,7 +327,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False): + enable_cancel_on_failover=False, rabbit_quorum_queue=False, + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -248,8 +341,10 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl) + rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} + self.queue_arguments = _get_queue_arguments( + rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, + rabbit_quorum_queue_config) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -496,16 +591,20 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue + self.rabbit_quorum_queue_config = self._get_quorum_configurations( + driver_conf) self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -521,6 +620,7 @@ class Connection(object): self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread + self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover if self.heartbeat_in_pthread: @@ -558,6 +658,19 @@ class Connection(object): self.ssl_cert_file = driver_conf.ssl_cert_file self.ssl_ca_file = driver_conf.ssl_ca_file + if self.ssl_enforce_fips_mode: + if hasattr(ssl, 'FIPS_mode'): + LOG.info("Enforcing the use of the OpenSSL FIPS mode") + ssl.FIPS_mode_set(1) + else: + raise exceptions.ConfigurationError( + "OpenSSL FIPS mode is not supported by your Python " + "version. You must either change the Python " + "executable used to a version with FIPS mode " + "support or disable FIPS mode by setting the " + "'[oslo_messaging_rabbit] ssl_enforce_fips_mode' " + "configuration option to 'False'.") + self._url = '' if url.hosts: if url.transport.startswith('kombu+'): @@ -705,6 +818,12 @@ class Connection(object): except AttributeError: pass + @property + def durable(self): + # Quorum queues are durable by default, durable option should + # be enabled by default with quorum queues + return self.amqp_durable_queues or self.rabbit_quorum_queue + @classmethod def validate_ssl_version(cls, version): key = version.lower() @@ -713,6 +832,14 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _get_quorum_configurations(self, driver_conf): + """Get the quorum queue configurations""" + delivery_limit = driver_conf.rabbit_quorum_delivery_limit + max_memory_length = driver_conf.rabbit_quroum_max_memory_length + max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes + return QuorumMemConfig(delivery_limit, max_memory_length, + max_memory_bytes) + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -772,7 +899,13 @@ class Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() @@ -1194,12 +1327,14 @@ class Connection(object): queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, exchange_auto_delete=self.amqp_auto_delete, queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, - enable_cancel_on_failover=self.enable_cancel_on_failover) + enable_cancel_on_failover=self.enable_cancel_on_failover, + rabbit_quorum_queue=self.rabbit_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1324,7 +1459,11 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) + queue_arguments=_get_queue_arguments( + self.rabbit_ha_queues, + 0, + self.rabbit_quorum_queue, + self.rabbit_quorum_queue_config)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' @@ -1380,7 +1519,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, @@ -1402,7 +1541,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, |