Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 257
1 files changed, 257 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
new file mode 100644
index 0000000..a6589b3
--- /dev/null
+++ b/test/test_consumer_integration.py
@@ -0,0 +1,257 @@
+import os
+from datetime import datetime
+
+from kafka import *  # noqa
+from kafka.common import *  # noqa
+from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
+from fixtures import ZookeeperFixture, KafkaFixture
+from testutil import *
+
+class TestConsumerIntegration(KafkaIntegrationTestCase):
+    @classmethod
+    def setUpClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.zk = ZookeeperFixture.instance()
+        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
+
+        cls.server = cls.server1  # Bootstrapping server
+
+    @classmethod
+    def tearDownClass(cls):
+        if not os.environ.get('KAFKA_VERSION'):
+            return
+
+        cls.server1.close()
+        cls.server2.close()
+        cls.zk.close()
+
+    def send_messages(self, partition, messages):
+        messages = [ create_message(self.msg(str(msg))) for msg in messages ]
+        produce = ProduceRequest(self.topic, partition, messages = messages)
+        resp, = self.client.send_produce_request([produce])
+        self.assertEquals(resp.error, 0)
+
+        return [ x.value for x in messages ]
+
+    def assert_message_count(self, messages, num_messages):
+        # Make sure we got them all
+        self.assertEquals(len(messages), num_messages)
+
+        # Make sure there are no duplicates
+        self.assertEquals(len(set(messages)), num_messages)
+
+    @kafka_versions("all")
+    def test_simple_consumer(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer = self.consumer()
+
+        self.assert_message_count([ message for message in consumer ], 200)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer__seek(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer()
+
+        # Rewind 10 messages from the end
+        consumer.seek(-10, 2)
+        self.assert_message_count([ message for message in consumer ], 10)
+
+        # Rewind 13 messages from the end
+        consumer.seek(-13, 2)
+        self.assert_message_count([ message for message in consumer ], 13)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer_blocking(self):
+        consumer = self.consumer()
+
+        # Ask for 5 messages, nothing in queue, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(block=True, timeout=5)
+            self.assert_message_count(messages, 0)
+        self.assertGreaterEqual(t.interval, 5)
+
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+        with Timer() as t:
+            messages = consumer.get_messages(count=5, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertLessEqual(t.interval, 1)
+
+        # Ask for 10 messages, get 5 back, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertGreaterEqual(t.interval, 5)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_simple_consumer_pending(self):
+        # Produce 10 messages to partitions 0 and 1
+        self.send_messages(0, range(0, 10))
+        self.send_messages(1, range(10, 20))
+
+        consumer = self.consumer()
+
+        self.assertEquals(consumer.pending(), 20)
+        self.assertEquals(consumer.pending(partitions=[0]), 10)
+        self.assertEquals(consumer.pending(partitions=[1]), 10)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_process_consumer(self):
+        # Produce 100 messages to partitions 0 and 1
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer(consumer = MultiProcessConsumer)
+
+        self.assert_message_count([ message for message in consumer ], 200)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_process_consumer_blocking(self):
+        consumer = self.consumer(consumer = MultiProcessConsumer)
+
+        # Ask for 5 messages, no messages in queue, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(block=True, timeout=5)
+            self.assert_message_count(messages, 0)
+
+        self.assertGreaterEqual(t.interval, 5)
+
+        # Send 10 messages
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 messages in queue, block 0 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=5, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertLessEqual(t.interval, 1)
+
+        # Ask for 10 messages, 5 in queue, block 5 seconds
+        with Timer() as t:
+            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            self.assert_message_count(messages, 5)
+        self.assertGreaterEqual(t.interval, 5)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_multi_proc_pending(self):
+        self.send_messages(0, range(0, 10))
+        self.send_messages(1, range(10, 20))
+
+        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
+
+        self.assertEquals(consumer.pending(), 20)
+        self.assertEquals(consumer.pending(partitions=[0]), 10)
+        self.assertEquals(consumer.pending(partitions=[1]), 10)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_large_messages(self):
+        # Produce 10 "normal" size messages
+        small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
+
+        # Produce 10 messages that are large (bigger than default fetch size)
+        large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
+
+        # Consumer should still get all of them
+        consumer = self.consumer()
+
+        expected_messages = set(small_messages + large_messages)
+        actual_messages = set([ x.message.value for x in consumer ])
+        self.assertEqual(expected_messages, actual_messages)
+
+        consumer.stop()
+
+    @kafka_versions("all")
+    def test_huge_messages(self):
+        huge_message, = self.send_messages(0, [
+            create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
+        ])
+
+        # Create a consumer with the default buffer size
+        consumer = self.consumer()
+
+        # This consumer fails to get the message
+        with self.assertRaises(ConsumerFetchSizeTooSmall):
+            consumer.get_message(False, 0.1)
+
+        consumer.stop()
+
+        # Create a consumer with no fetch size limit
+        big_consumer = self.consumer(
+            max_buffer_size = None,
+            partitions = [0],
+        )
+
+        # Seek to the last message
+        big_consumer.seek(-1, 2)
+
+        # Consume giant message successfully
+        message = big_consumer.get_message(block=False, timeout=10)
+        self.assertIsNotNone(message)
+        self.assertEquals(message.message.value, huge_message)
+
+        big_consumer.stop()
+
+    @kafka_versions("0.8.1")
+    def test_offset_behavior__resuming_behavior(self):
+        msgs1 = self.send_messages(0, range(0, 100))
+        msgs2 = self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer1 = self.consumer(
+            auto_commit_every_t = None,
+            auto_commit_every_n = 20,
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total offset across both partitions should be at 180
+        consumer2 = self.consumer(
+            auto_commit_every_t = None,
+            auto_commit_every_n = 20,
+        )
+
+        # 181-200
+        self.assert_message_count([ message for message in consumer2 ], 20)
+
+        consumer1.stop()
+        consumer2.stop()
+
+    def consumer(self, **kwargs):
+        if os.environ['KAFKA_VERSION'] == "0.8.0":
+            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+            kwargs['auto_commit'] = False
+        else:
+            kwargs.setdefault('auto_commit', True)
+
+        consumer_class = kwargs.pop('consumer', SimpleConsumer)
+        group = kwargs.pop('group', self.id())
+        topic = kwargs.pop('topic', self.topic)
+
+        if consumer_class == SimpleConsumer:
+            kwargs.setdefault('iter_timeout', 0)
+
+        return consumer_class(self.client, group, topic, **kwargs)
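
For context, a minimal sketch (not part of this commit) of the consumer pattern these tests exercise, assuming a kafka-python version of this era where KafkaClient accepts a bootstrap "host:port" string and a broker is already running on localhost:9092; the group and topic names below are hypothetical:

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")

    # Mirrors the consumer() helper above: auto_commit off (required against
    # 0.8.0 brokers, which lack offset requests) and iter_timeout=0 so plain
    # iteration over the consumer stops once the queue is drained.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              auto_commit=False,
                              iter_timeout=0)

    # The get_messages(count, block, timeout) contract the blocking tests
    # time: return up to 5 messages, waiting at most 5 seconds.
    for msg in consumer.get_messages(count=5, block=True, timeout=5):
        print(msg.message.value)

    consumer.stop()
    client.close()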