diff options
author | Taras <voyn1991@gmail.com> | 2018-03-18 15:56:47 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-04-18 13:51:07 -0700 |
commit | 908ac8f8d253b20d70e36ce4bae1aefb51769221 (patch) | |
tree | 844aca7543b3b57e17b12d5f55f6cffa0e89f73b /test/test_consumer_integration.py | |
parent | d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (diff) | |
download | kafka-python-908ac8f8d253b20d70e36ce4bae1aefb51769221.tar.gz |
Add codec validators to record parser and builder for all formats (#1447)
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index cc036cc..e6f1405 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,6 +1,9 @@ import logging import os import time +from mock import patch +import pytest +import kafka.codec import pytest from six.moves import xrange @@ -14,7 +17,7 @@ from kafka import ( from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, - KafkaTimeoutError + KafkaTimeoutError, UnsupportedCodecError ) from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp @@ -27,6 +30,7 @@ from test.testutil import ( send_messages ) + @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): """Test KafkaConsumer @@ -50,6 +54,24 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): kafka_consumer.close() +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +def test_kafka_consumer_unsupported_encoding( + topic, kafka_producer_factory, kafka_consumer_factory): + # Send a compressed message + producer = kafka_producer_factory(compression_type="gzip") + fut = producer.send(topic, b"simple message" * 200) + fut.get(timeout=5) + producer.close() + + # Consume, but with the related compression codec not available + with patch.object(kafka.codec, "has_gzip") as mocked: + mocked.return_value = False + consumer = kafka_consumer_factory(auto_offset_reset='earliest') + error_msg = "Libraries for gzip compression codec not found" + with pytest.raises(UnsupportedCodecError, match=error_msg): + consumer.poll(timeout_ms=2000) + + class TestConsumerIntegration(KafkaIntegrationTestCase): maxDiff = None |