From a1869c4be5f47b4f6433610249aaf29af4ec95e5 Mon Sep 17 00:00:00 2001 From: Andre Araujo Date: Wed, 15 Nov 2017 06:08:29 -0800 Subject: Introduce new fixtures to prepare for migration to pytest. This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration. --- test/test_consumer_integration.py | 45 +++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 21 deletions(-) (limited to 'test/test_consumer_integration.py') diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 40eec14..fe4e454 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -21,9 +21,30 @@ from kafka.structs import ( from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, random_string, Timer + KafkaIntegrationTestCase, kafka_versions, random_string, Timer, + send_messages ) +def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): + """Test KafkaConsumer + """ + kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') + + send_messages(simple_client, topic, 0, range(0, 100)) + send_messages(simple_client, topic, 1, range(100, 200)) + + cnt = 0 + messages = {0: set(), 1: set()} + for message in kafka_consumer: + logging.debug("Consumed message %s", repr(message)) + cnt += 1 + messages[message.partition].add(message.offset) + if cnt >= 200: + break + + assert len(messages[0]) == 100 + assert len(messages[1]) == 100 + class TestConsumerIntegration(KafkaIntegrationTestCase): maxDiff = None @@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): cls.zk = ZookeeperFixture.instance() chroot = random_string(10) - cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, + cls.server1 = KafkaFixture.instance(0, cls.zk, zk_chroot=chroot) - cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port, + cls.server2 = KafkaFixture.instance(1, cls.zk, zk_chroot=chroot) cls.server = cls.server1 # Bootstrapping server @@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - def test_kafka_consumer(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer = self.kafka_consumer(auto_offset_reset='earliest') - n = 0 - messages = {0: set(), 1: set()} - for m in consumer: - logging.debug("Consumed message %s" % repr(m)) - n += 1 - messages[m.partition].add(m.offset) - if n >= 200: - break - - self.assertEqual(len(messages[0]), 100) - self.assertEqual(len(messages[1]), 100) - def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', -- cgit v1.2.1