summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorTaras <voyn1991@gmail.com>2018-03-18 15:56:47 +0200
committerDana Powers <dana.powers@gmail.com>2018-04-18 13:51:07 -0700
commit908ac8f8d253b20d70e36ce4bae1aefb51769221 (patch)
tree844aca7543b3b57e17b12d5f55f6cffa0e89f73b /test/test_consumer_integration.py
parentd9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (diff)
downloadkafka-python-908ac8f8d253b20d70e36ce4bae1aefb51769221.tar.gz
Add codec validators to record parser and builder for all formats (#1447)
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index cc036cc..e6f1405 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,6 +1,9 @@
import logging
import os
import time
+from mock import patch
+import pytest
+import kafka.codec
import pytest
from six.moves import xrange
@@ -14,7 +17,7 @@ from kafka import (
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
- KafkaTimeoutError
+ KafkaTimeoutError, UnsupportedCodecError
)
from kafka.structs import (
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -27,6 +30,7 @@ from test.testutil import (
send_messages
)
+
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
@@ -50,6 +54,24 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
kafka_consumer.close()
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_kafka_consumer_unsupported_encoding(
+ topic, kafka_producer_factory, kafka_consumer_factory):
+ # Send a compressed message
+ producer = kafka_producer_factory(compression_type="gzip")
+ fut = producer.send(topic, b"simple message" * 200)
+ fut.get(timeout=5)
+ producer.close()
+
+ # Consume, but with the related compression codec not available
+ with patch.object(kafka.codec, "has_gzip") as mocked:
+ mocked.return_value = False
+ consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+ error_msg = "Libraries for gzip compression codec not found"
+ with pytest.raises(UnsupportedCodecError, match=error_msg):
+ consumer.poll(timeout_ms=2000)
+
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None