diff options
author | Andre Araujo <asdaraujo@gmail.com> | 2017-11-15 06:08:29 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-02-21 13:30:12 -0800 |
commit | a1869c4be5f47b4f6433610249aaf29af4ec95e5 (patch) | |
tree | c18b155f5a3b812ed69a2f3a7d0499628cd87694 /test/test_consumer_integration.py | |
parent | 0f5d35fa3489fa36000c05a891d375cc30672e23 (diff) | |
download | kafka-python-a1869c4be5f47b4f6433610249aaf29af4ec95e5.tar.gz |
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the
migration of unittest.TestCases to pytest test cases. The handling
of temporary dir creation was also changed so that we can use
the pytest tmpdir fixture after the migration.
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 40eec14..fe4e454 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -21,9 +21,30 @@ from kafka.structs import ( from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, random_string, Timer + KafkaIntegrationTestCase, kafka_versions, random_string, Timer, + send_messages ) +def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): + """Test KafkaConsumer + """ + kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') + + send_messages(simple_client, topic, 0, range(0, 100)) + send_messages(simple_client, topic, 1, range(100, 200)) + + cnt = 0 + messages = {0: set(), 1: set()} + for message in kafka_consumer: + logging.debug("Consumed message %s", repr(message)) + cnt += 1 + messages[message.partition].add(message.offset) + if cnt >= 200: + break + + assert len(messages[0]) == 100 + assert len(messages[1]) == 100 + class TestConsumerIntegration(KafkaIntegrationTestCase): maxDiff = None @@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): cls.zk = ZookeeperFixture.instance() chroot = random_string(10) - cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, + cls.server1 = KafkaFixture.instance(0, cls.zk, zk_chroot=chroot) - cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port, + cls.server2 = KafkaFixture.instance(1, cls.zk, zk_chroot=chroot) cls.server = cls.server1 # Bootstrapping server @@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - def test_kafka_consumer(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer = self.kafka_consumer(auto_offset_reset='earliest') - n = 0 - messages = {0: set(), 1: set()} - for m in consumer: - logging.debug("Consumed message %s" % repr(m)) - n += 1 - messages[m.partition].add(m.offset) - if n >= 200: - break - - self.assertEqual(len(messages[0]), 100) - self.assertEqual(len(messages[1]), 100) - def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', |