diff options
author | Tincu Gabriel <gabi@aiven.io> | 2020-09-08 01:11:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 16:11:18 -0700 |
commit | a27ab881726ed1a2d952867a1fa266573165d6aa (patch) | |
tree | 918d904583960e760dfaeaf89d813e789782cdb2 /kafka/protocol/message.py | |
parent | 08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff) | |
download | kafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz |
Add support for `zstd` compression (#2021)
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 31527bf..4c5c031 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -3,8 +3,8 @@ from __future__ import absolute_import import io import time -from kafka.codec import (has_gzip, has_snappy, has_lz4, - gzip_decode, snappy_decode, +from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd, + gzip_decode, snappy_decode, zstd_decode, lz4_decode, lz4_decode_old_kafka) from kafka.protocol.frame import KafkaBytes from kafka.protocol.struct import Struct @@ -35,6 +35,7 @@ class Message(Struct): CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) @@ -119,7 +120,7 @@ class Message(Struct): def decompress(self): codec = self.attributes & self.CODEC_MASK - assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4) + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD) if codec == self.CODEC_GZIP: assert has_gzip(), 'Gzip decompression unsupported' raw_bytes = gzip_decode(self.value) @@ -132,6 +133,9 @@ class Message(Struct): raw_bytes = lz4_decode_old_kafka(self.value) else: raw_bytes = lz4_decode(self.value) + elif codec == self.CODEC_ZSTD: + assert has_zstd(), "ZSTD decompression unsupported" + raw_bytes = zstd_decode(self.value) else: raise Exception('This should be impossible') |