summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorTincu Gabriel <gabi@aiven.io>2020-09-08 01:11:18 +0200
committerGitHub <noreply@github.com>2020-09-07 16:11:18 -0700
commita27ab881726ed1a2d952867a1fa266573165d6aa (patch)
tree918d904583960e760dfaeaf89d813e789782cdb2 /kafka/protocol/message.py
parent08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff)
downloadkafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz
Add support for `zstd` compression (#2021)
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 31527bf..4c5c031 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -3,8 +3,8 @@ from __future__ import absolute_import
import io
import time
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
- gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+ gzip_decode, snappy_decode, zstd_decode,
lz4_decode, lz4_decode_old_kafka)
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
+ CODEC_ZSTD = 0x04
TIMESTAMP_TYPE_MASK = 0x08
HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
@@ -119,7 +120,7 @@ class Message(Struct):
def decompress(self):
codec = self.attributes & self.CODEC_MASK
- assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
if codec == self.CODEC_GZIP:
assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@ class Message(Struct):
raw_bytes = lz4_decode_old_kafka(self.value)
else:
raw_bytes = lz4_decode(self.value)
+ elif codec == self.CODEC_ZSTD:
+ assert has_zstd(), "ZSTD decompression unsupported"
+ raw_bytes = zstd_decode(self.value)
else:
raise Exception('This should be impossible')