summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/admin/kafka.rst7
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py2
-rw-r--r--oslo_messaging/_drivers/kafka_driver/kafka_options.py7
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py1
-rw-r--r--oslo_messaging/tests/functional/test_functional.py24
5 files changed, 41 insertions, 0 deletions
diff --git a/doc/source/admin/kafka.rst b/doc/source/admin/kafka.rst
index 3fc3864..c581fb0 100644
--- a/doc/source/admin/kafka.rst
+++ b/doc/source/admin/kafka.rst
@@ -166,6 +166,13 @@ Notifier Options
- :oslo.config:option:`oslo_messaging_kafka.producer_batch_timeout`
- :oslo.config:option:`oslo_messaging_kafka.producer_batch_size`
+compression_codec
+ The compression codec for all data generated by the producer, valid values
+ are: none, gzip, snappy, lz4, zstd. Note that the legal option of this
+ depends on the kafka version, please refer to `kafka documentation`_.
+
+.. _kafka documentation: https://kafka.apache.org/documentation/
+
Security Options
^^^^^^^^^^^^^^^^
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index dc4fe09..07b473d 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -255,6 +255,7 @@ class ProducerConnection(Connection):
super(ProducerConnection, self).__init__(conf, url)
self.batch_size = self.driver_conf.producer_batch_size
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
+ self.compression_codec = self.driver_conf.compression_codec
self.producer = None
self.producer_lock = threading.Lock()
@@ -317,6 +318,7 @@ class ProducerConnection(Connection):
'bootstrap.servers': ",".join(self.hostaddrs),
'linger.ms': self.linger_ms,
'batch.num.messages': self.batch_size,
+ 'compression.codec': self.compression_codec,
'security.protocol': self.security_protocol,
'sasl.mechanism': self.sasl_mechanism,
'sasl.username': self.username,
diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
index 5fbe7b2..42c990c 100644
--- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
@@ -48,6 +48,13 @@ KAFKA_OPTS = [
cfg.IntOpt('producer_batch_size', default=16384,
help='Size of batch for the producer async send'),
+ cfg.StrOpt('compression_codec', default='none',
+ choices=['none', 'gzip', 'snappy', 'lz4', 'zstd'],
+ help='The compression codec for all data generated by the '
+ 'producer. Valid values are: gzip, snappy, lz4, zstd. If '
+ 'not set, compression will not be used. Note that the '
+ 'legal option of this depends on the kafka version'),
+
cfg.BoolOpt('enable_auto_commit',
default=False,
help='Enable asynchronous consumer commits'),
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index 80af576..0af8c05 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -108,6 +108,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
'bootstrap.servers': '',
'linger.ms': mock.ANY,
'batch.num.messages': mock.ANY,
+ 'compression.codec': 'none',
'security.protocol': 'PLAINTEXT',
'sasl.mechanism': 'PLAIN',
'sasl.username': mock.ANY,
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index 6800f59..384b372 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -539,3 +539,27 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1]))
self.assertEqual(5, len(events[2][1]))
+
+ def test_compression(self):
+ get_timeout = 1
+ if self.url.startswith("amqp:"):
+ self.conf.set_override('kombu_compression', 'gzip',
+ group='oslo_messaging_rabbit')
+ if self.url.startswith("kafka://"):
+ get_timeout = 5
+ self.conf.set_override('compression_codec', 'gzip',
+ group='oslo_messaging_kafka')
+ self.conf.set_override('consumer_group', 'test_compression',
+ group='oslo_messaging_kafka')
+
+ listener = self.useFixture(
+ utils.NotificationFixture(self.conf, self.url,
+ ['test_compression']))
+ notifier = listener.notifier('abc')
+
+ notifier.info({}, 'test', 'Hello World!')
+ event = listener.events.get(timeout=get_timeout)
+ self.assertEqual('info', event[0])
+ self.assertEqual('test', event[1])
+ self.assertEqual('Hello World!', event[2])
+ self.assertEqual('abc', event[3])