diff options
author | Tincu Gabriel <gabi@aiven.io> | 2020-09-08 01:11:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 16:11:18 -0700 |
commit | a27ab881726ed1a2d952867a1fa266573165d6aa (patch) | |
tree | 918d904583960e760dfaeaf89d813e789782cdb2 /test | |
parent | 08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff) | |
download | kafka-python-a27ab881726ed1a2d952867a1fa266573165d6aa.tar.gz |
Add support for `zstd` compression (#2021)
Diffstat (limited to 'test')
-rw-r--r-- | test/test_codec.py | 11 | ||||
-rw-r--r-- | test/test_producer.py | 20 |
2 files changed, 20 insertions, 11 deletions
diff --git a/test/test_codec.py b/test/test_codec.py index 9eff888..e057074 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -7,11 +7,12 @@ import pytest from kafka.vendor.six.moves import range from kafka.codec import ( - has_snappy, has_lz4, + has_snappy, has_lz4, has_zstd, gzip_encode, gzip_decode, snappy_encode, snappy_decode, lz4_encode, lz4_decode, lz4_encode_old_kafka, lz4_decode_old_kafka, + zstd_encode, zstd_decode, ) from test.testutil import random_string @@ -113,3 +114,11 @@ def test_lz4_incremental(): b2 = lz4_decode(lz4_encode(b1)) assert len(b1) == len(b2) assert b1 == b2 + + +@pytest.mark.skipif(not has_zstd(), reason="Zstd not available") +def test_zstd(): + for _ in range(1000): + b1 = random_string(100).encode('utf-8') + b2 = zstd_decode(zstd_encode(b1)) + assert b1 == b2 diff --git a/test/test_producer.py b/test/test_producer.py index 9605adf..7263130 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -23,16 +23,16 @@ def test_buffer_pool(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): - if compression == 'lz4': - # LZ4 requires 0.8.2 if env_kafka_version() < (0, 8, 2): - return - # python-lz4 crashes on older versions of pypy + pytest.skip('LZ4 requires 0.8.2') elif platform.python_implementation() == 'PyPy': - return + pytest.skip('python-lz4 crashes on older versions of pypy') + + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + pytest.skip('zstd requires kafka 2.1.0 or newer') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, @@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + pytest.skip('zstd requires 2.1.0 or more') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, @@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if headers: assert record.serialized_header_size == 22 - # generated timestamp case is skipped for broker 0.9 and below if magic == 0: - return - + pytest.skip('generated timestamp case is skipped for broker 0.9 and below') send_time = time.time() * 1000 future = producer.send( topic, |