diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
commit | 828377377da43749af0d27ee256ef31bf714cf17 (patch) | |
tree | fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /test/test_consumer_integration.py | |
parent | 71e7568fcb8132899f366b37c32645fd5a40dc4b (diff) | |
parent | 9a8af1499ca425366d934487469d9977fae7fe5f (diff) | |
download | kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz |
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 102 |
1 files changed, 57 insertions, 45 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index ef9a886..5a578d4 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -7,8 +7,8 @@ from kafka import ( KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message ) from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, - OffsetOutOfRangeError + ProduceRequestPayload, ConsumerFetchSizeTooSmall, + OffsetOutOfRangeError, TopicPartition ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -25,8 +25,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): return cls.zk = ZookeeperFixture.instance() - cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) + chroot = random_string(10) + cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, chroot) + cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port, chroot) cls.server = cls.server1 # Bootstrapping server @@ -41,7 +42,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def send_messages(self, partition, messages): messages = [ create_message(self.msg(str(msg))) for msg in messages ] - produce = ProduceRequest(self.bytes_topic, partition, messages = messages) + produce = ProduceRequestPayload(self.topic, partition, messages = messages) resp, = self.client.send_produce_request([produce]) self.assertEqual(resp.error, 0) @@ -60,10 +61,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): kwargs['group'] = None kwargs['auto_commit'] = False else: - kwargs.setdefault('auto_commit', True) + kwargs.setdefault('group', None) + kwargs.setdefault('auto_commit', False) consumer_class = kwargs.pop('consumer', SimpleConsumer) - group = kwargs.pop('group', self.id().encode('utf-8')) + group = kwargs.pop('group', None) topic = kwargs.pop('topic', self.topic) if consumer_class in [SimpleConsumer, MultiProcessConsumer]: @@ -134,7 +136,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(100, 200)) # Create 1st consumer and change offsets - consumer = self.consumer() + consumer = self.consumer(group='test_simple_consumer_load_initial_offsets') self.assertEqual(consumer.offsets, {0: 0, 1: 0}) consumer.offsets.update({0:51, 1:101}) # Update counter after manual offsets update @@ -142,7 +144,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.commit() # Create 2nd consumer and check initial offsets - consumer = self.consumer(auto_commit=False) + consumer = self.consumer(group='test_simple_consumer_load_initial_offsets', + auto_commit=False) self.assertEqual(consumer.offsets, {0: 51, 1: 101}) def test_simple_consumer__seek(self): @@ -184,13 +187,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 0) self.assertGreaterEqual(t.interval, 1) - self.send_messages(0, range(0, 10)) + self.send_messages(0, range(0, 5)) + self.send_messages(1, range(5, 10)) # Ask for 5 messages, 10 in queue. Get 5 back, no blocking with Timer() as t: - messages = consumer.get_messages(count=5, block=True, timeout=5) + messages = consumer.get_messages(count=5, block=True, timeout=3) self.assert_message_count(messages, 5) - self.assertLessEqual(t.interval, 1) + self.assertLess(t.interval, 3) # Ask for 10 messages, get 5 back, block 1 second with Timer() as t: @@ -200,7 +204,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 # second, get 5 back, no blocking - self.send_messages(0, range(0, 5)) + self.send_messages(0, range(0, 3)) + self.send_messages(1, range(3, 5)) with Timer() as t: messages = consumer.get_messages(count=10, block=1, timeout=1) self.assert_message_count(messages, 5) @@ -304,7 +309,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(10, 20)) # Create 1st consumer and change offsets - consumer = self.consumer() + consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets') self.assertEqual(consumer.offsets, {0: 0, 1: 0}) consumer.offsets.update({0:5, 1:15}) # Update counter after manual offsets update @@ -313,6 +318,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Create 2nd consumer and check initial offsets consumer = self.consumer(consumer = MultiProcessConsumer, + group='test_multi_process_consumer_load_initial_offsets', auto_commit=False) self.assertEqual(consumer.offsets, {0: 5, 1: 15}) @@ -369,6 +375,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Start a consumer consumer1 = self.consumer( + group='test_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -379,6 +387,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # The total offset across both partitions should be at 180 consumer2 = self.consumer( + group='test_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -397,6 +407,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Start a consumer consumer1 = self.consumer( consumer=MultiProcessConsumer, + group='test_multi_process_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -414,6 +426,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # The total offset across both partitions should be at 180 consumer2 = self.consumer( consumer=MultiProcessConsumer, + group='test_multi_process_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -447,11 +461,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(100, 200)) # Start a consumer - consumer = self.kafka_consumer(auto_offset_reset='smallest', - consumer_timeout_ms=5000) + consumer = self.kafka_consumer(auto_offset_reset='earliest') n = 0 messages = {0: set(), 1: set()} - logging.debug("kafka consumer offsets: %s" % consumer.offsets()) for m in consumer: logging.debug("Consumed message %s" % repr(m)) n += 1 @@ -464,13 +476,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 - consumer = self.kafka_consumer(auto_offset_reset='smallest', + consumer = self.kafka_consumer(auto_offset_reset='earliest', consumer_timeout_ms=TIMEOUT_MS) + # Manual assignment avoids overhead of consumer group mgmt + consumer.unsubscribe() + consumer.assign([TopicPartition(self.topic, 0)]) + # Ask for 5 messages, nothing in queue, block 500ms with Timer() as t: - with self.assertRaises(ConsumerTimeout): - msg = consumer.next() + with self.assertRaises(StopIteration): + msg = next(consumer) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) self.send_messages(0, range(0, 10)) @@ -479,7 +495,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = set() with Timer() as t: for i in range(5): - msg = consumer.next() + msg = next(consumer) messages.add((msg.partition, msg.offset)) self.assertEqual(len(messages), 5) self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) @@ -487,52 +503,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Ask for 10 messages, get 5 back, block 500ms messages = set() with Timer() as t: - with self.assertRaises(ConsumerTimeout): + with self.assertRaises(StopIteration): for i in range(10): - msg = consumer.next() + msg = next(consumer) messages.add((msg.partition, msg.offset)) self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) @kafka_versions('>=0.8.1') def test_kafka_consumer__offset_commit_resume(self): - GROUP_ID = random_string(10).encode('utf-8') + GROUP_ID = random_string(10) self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) # Start a consumer consumer1 = self.kafka_consumer( - group_id = GROUP_ID, - auto_commit_enable = True, - auto_commit_interval_ms = None, - auto_commit_interval_messages = 20, - auto_offset_reset='smallest', + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', ) - # Grab the first 195 messages + # Grab the first 180 messages output_msgs1 = [] - for _ in xrange(195): - m = consumer1.next() + for _ in xrange(180): + m = next(consumer1) output_msgs1.append(m) - consumer1.task_done(m) - self.assert_message_count(output_msgs1, 195) + self.assert_message_count(output_msgs1, 180) + consumer1.close() # The total offset across both partitions should be at 180 consumer2 = self.kafka_consumer( - group_id = GROUP_ID, - auto_commit_enable = True, - auto_commit_interval_ms = None, - auto_commit_interval_messages = 20, - consumer_timeout_ms = 100, - auto_offset_reset='smallest', + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', ) # 181-200 output_msgs2 = [] - with self.assertRaises(ConsumerTimeout): - while True: - m = consumer2.next() - output_msgs2.append(m) + for _ in xrange(20): + m = next(consumer2) + output_msgs2.append(m) self.assert_message_count(output_msgs2, 20) - self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15) + self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200) |