summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorAndre Araujo <asdaraujo@gmail.com>2017-11-15 06:08:29 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-02-21 13:30:12 -0800
commita1869c4be5f47b4f6433610249aaf29af4ec95e5 (patch)
treec18b155f5a3b812ed69a2f3a7d0499628cd87694 /test/test_consumer_integration.py
parent0f5d35fa3489fa36000c05a891d375cc30672e23 (diff)
downloadkafka-python-a1869c4be5f47b4f6433610249aaf29af4ec95e5.tar.gz
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py45
1 files changed, 24 insertions, 21 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec14..fe4e454 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@ from kafka.structs import (
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+ KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+ send_messages
)
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+ """Test KafkaConsumer
+ """
+ kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+ send_messages(simple_client, topic, 0, range(0, 100))
+ send_messages(simple_client, topic, 1, range(100, 200))
+
+ cnt = 0
+ messages = {0: set(), 1: set()}
+ for message in kafka_consumer:
+ logging.debug("Consumed message %s", repr(message))
+ cnt += 1
+ messages[message.partition].add(message.offset)
+ if cnt >= 200:
+ break
+
+ assert len(messages[0]) == 100
+ assert len(messages[1]) == 100
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None
@@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
cls.zk = ZookeeperFixture.instance()
chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+ cls.server1 = KafkaFixture.instance(0, cls.zk,
zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+ cls.server2 = KafkaFixture.instance(1, cls.zk,
zk_chroot=chroot)
cls.server = cls.server1 # Bootstrapping server
@@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- def test_kafka_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='earliest')
- n = 0
- messages = {0: set(), 1: set()}
- for m in consumer:
- logging.debug("Consumed message %s" % repr(m))
- n += 1
- messages[m.partition].add(m.offset)
- if n >= 200:
- break
-
- self.assertEqual(len(messages[0]), 100)
- self.assertEqual(len(messages[1]), 100)
-
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',