summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorTincu Gabriel <gabi@aiven.io>2020-09-08 01:11:18 +0200
committerGitHub <noreply@github.com>2020-09-07 16:11:18 -0700
commita27ab881726ed1a2d952867a1fa266573165d6aa (patch)
tree918d904583960e760dfaeaf89d813e789782cdb2 /test
parent08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff)
downloadkafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz
Add support for `zstd` compression (#2021)
Diffstat (limited to 'test')
-rw-r--r--test/test_codec.py11
-rw-r--r--test/test_producer.py20
2 files changed, 20 insertions, 11 deletions
diff --git a/test/test_codec.py b/test/test_codec.py
index 9eff888..e057074 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -7,11 +7,12 @@ import pytest
from kafka.vendor.six.moves import range
from kafka.codec import (
- has_snappy, has_lz4,
+ has_snappy, has_lz4, has_zstd,
gzip_encode, gzip_decode,
snappy_encode, snappy_decode,
lz4_encode, lz4_decode,
lz4_encode_old_kafka, lz4_decode_old_kafka,
+ zstd_encode, zstd_decode,
)
from test.testutil import random_string
@@ -113,3 +114,11 @@ def test_lz4_incremental():
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
assert b1 == b2
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+ for _ in range(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = zstd_decode(zstd_encode(b1))
+ assert b1 == b2
diff --git a/test/test_producer.py b/test/test_producer.py
index 9605adf..7263130 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -23,16 +23,16 @@ def test_buffer_pool():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
-
if compression == 'lz4':
- # LZ4 requires 0.8.2
if env_kafka_version() < (0, 8, 2):
- return
- # python-lz4 crashes on older versions of pypy
+ pytest.skip('LZ4 requires 0.8.2')
elif platform.python_implementation() == 'PyPy':
- return
+ pytest.skip('python-lz4 crashes on older versions of pypy')
+
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires kafka 2.1.0 or newer')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires 2.1.0 or more')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
@@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if headers:
assert record.serialized_header_size == 22
- # generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
- return
-
+ pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
send_time = time.time() * 1000
future = producer.send(
topic,