summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
authorTincu Gabriel <gabi@aiven.io>2020-09-08 01:11:18 +0200
committerGitHub <noreply@github.com>2020-09-07 16:11:18 -0700
commita27ab881726ed1a2d952867a1fa266573165d6aa (patch)
tree918d904583960e760dfaeaf89d813e789782cdb2 /kafka/producer/kafka.py
parent08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff)
downloadkafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz
Add support for `zstd` compression (#2021)
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 9509ab9..dba1801 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,7 +12,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
available guarantee.
If unset, defaults to acks=1.
compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+ the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+ 'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
}
@@ -388,6 +389,9 @@ class KafkaProducer(object):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
+ if self.config['compression_type'] == 'zstd':
+ assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
# Check compression_type for library support
ct = self.config['compression_type']
if ct not in self._COMPRESSORS: