summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 9605adf..7263130 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -23,16 +23,16 @@ def test_buffer_pool():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
-
if compression == 'lz4':
- # LZ4 requires 0.8.2
if env_kafka_version() < (0, 8, 2):
- return
- # python-lz4 crashes on older versions of pypy
+ pytest.skip('LZ4 requires 0.8.2')
elif platform.python_implementation() == 'PyPy':
- return
+ pytest.skip('python-lz4 crashes on older versions of pypy')
+
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires kafka 2.1.0 or newer')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires 2.1.0 or more')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
@@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if headers:
assert record.serialized_header_size == 22
- # generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
- return
-
+ pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
send_time = time.time() * 1000
future = producer.send(
topic,