diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9a7790e..9f76f7f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -25,20 +25,21 @@ from kafka.structs import ( from test.conftest import version from test.fixtures import ZookeeperFixture, KafkaFixture, random_string -from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, Timer, - send_messages -) +from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): - """Test KafkaConsumer - """ +def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): + """Test KafkaConsumer""" kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') - send_messages(simple_client, topic, 0, range(0, 100)) - send_messages(simple_client, topic, 1, range(100, 200)) + # TODO replace this with a `send_messages()` pytest fixture + # as we will likely need this elsewhere + for i in range(0, 100): + kafka_producer.send(topic, partition=0, value=str(i).encode()) + for i in range(100, 200): + kafka_producer.send(topic, partition=1, value=str(i).encode()) + kafka_producer.flush() cnt = 0 messages = {0: set(), 1: set()} |