diff options
author | Tincu Gabriel <gabi@aiven.io> | 2020-09-08 01:11:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 16:11:18 -0700 |
commit | a27ab881726ed1a2d952867a1fa266573165d6aa (patch) | |
tree | 918d904583960e760dfaeaf89d813e789782cdb2 /kafka/producer/kafka.py | |
parent | 08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff) | |
download | kafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz |
Add support for `zstd` compression (#2021)
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 9509ab9..dba1801 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -12,7 +12,7 @@ from kafka.vendor import six import kafka.errors as Errors from kafka.client_async import KafkaClient, selectors -from kafka.codec import has_gzip, has_snappy, has_lz4 +from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from kafka.metrics import MetricConfig, Metrics from kafka.partitioner.default import DefaultPartitioner from kafka.producer.future import FutureRecordMetadata, FutureProduceResult @@ -119,7 +119,7 @@ class KafkaProducer(object): available guarantee. If unset, defaults to acks=1. compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. + the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None. @@ -339,6 +339,7 @@ class KafkaProducer(object): 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4), + 'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD), None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE), } @@ -388,6 +389,9 @@ class KafkaProducer(object): if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' + if self.config['compression_type'] == 'zstd': + assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers' + # Check compression_type for library support ct = self.config['compression_type'] if ct not in self._COMPRESSORS: |