author     Dana Powers <dana.powers@rd.io>   2016-01-07 18:51:14 -0800
committer  Dana Powers <dana.powers@rd.io>   2016-01-07 18:51:14 -0800
commit     828377377da43749af0d27ee256ef31bf714cf17 (patch)
tree       fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /test/test_consumer_integration.py
parent     71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent     9a8af1499ca425366d934487469d9977fae7fe5f (diff)
download   kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'
Conflicts:
    kafka/codec.py
    kafka/version.py
    test/test_producer.py
    test/test_producer_integration.py
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--  test/test_consumer_integration.py  102
1 file changed, 57 insertions(+), 45 deletions(-)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index ef9a886..5a578d4 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -7,8 +7,8 @@ from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
)
from kafka.common import (
- ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout,
- OffsetOutOfRangeError
+ ProduceRequestPayload, ConsumerFetchSizeTooSmall,
+ OffsetOutOfRangeError, TopicPartition
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -25,8 +25,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
- cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
+ chroot = random_string(10)
+ cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port, chroot)
+ cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port, chroot)
cls.server = cls.server1 # Bootstrapping server
@@ -41,7 +42,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def send_messages(self, partition, messages):
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
- produce = ProduceRequest(self.bytes_topic, partition, messages = messages)
+ produce = ProduceRequestPayload(self.topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
self.assertEqual(resp.error, 0)
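
Note: the hunk above tracks the 0.9 branch's payload rename -- ProduceRequest becomes ProduceRequestPayload and takes the plain-str topic instead of the encoded bytes_topic. A minimal sketch of the new call shape, assuming a local broker and this era's KafkaClient; the broker address and topic name are placeholders, not part of the diff:

    from kafka import KafkaClient, create_message
    from kafka.common import ProduceRequestPayload

    client = KafkaClient('localhost:9092')  # assumed local broker
    payload = ProduceRequestPayload('my-topic', 0,
                                    messages=[create_message(b'msg-0')])
    resp, = client.send_produce_request([payload])
    assert resp.error == 0  # error code 0 means no broker-side error
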
@@ -60,10 +61,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
kwargs['group'] = None
kwargs['auto_commit'] = False
else:
- kwargs.setdefault('auto_commit', True)
+ kwargs.setdefault('group', None)
+ kwargs.setdefault('auto_commit', False)
consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', self.id().encode('utf-8'))
+ group = kwargs.pop('group', None)
topic = kwargs.pop('topic', self.topic)
if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
@@ -134,7 +136,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(100, 200))
# Create 1st consumer and change offsets
- consumer = self.consumer()
+ consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:51, 1:101})
# Update counter after manual offsets update
@@ -142,7 +144,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.commit()
# Create 2nd consumer and check initial offsets
- consumer = self.consumer(auto_commit=False)
+ consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
+ auto_commit=False)
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
def test_simple_consumer__seek(self):
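
The group= additions above matter because SimpleConsumer only loads and commits offsets when given a non-None group, and the fixture's new defaults are group=None / auto_commit=False. A rough sketch of the load-initial-offsets pattern being tested, assuming a connected KafkaClient as client and a two-partition topic:

    from kafka import SimpleConsumer

    # First consumer: explicit group, manual offset bump, then commit
    consumer = SimpleConsumer(client, 'my-group', 'my-topic', auto_commit=False)
    consumer.offsets.update({0: 51, 1: 101})
    consumer.count_since_commit += 1  # mark dirty so commit() actually sends
    consumer.commit()

    # Second consumer in the same group picks up the committed offsets
    resumed = SimpleConsumer(client, 'my-group', 'my-topic', auto_commit=False)
    assert resumed.offsets == {0: 51, 1: 101}
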
@@ -184,13 +187,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 0)
self.assertGreaterEqual(t.interval, 1)
- self.send_messages(0, range(0, 10))
+ self.send_messages(0, range(0, 5))
+ self.send_messages(1, range(5, 10))
# Ask for 5 messages, 10 in queue. Get 5 back, no blocking
with Timer() as t:
- messages = consumer.get_messages(count=5, block=True, timeout=5)
+ messages = consumer.get_messages(count=5, block=True, timeout=3)
self.assert_message_count(messages, 5)
- self.assertLessEqual(t.interval, 1)
+ self.assertLess(t.interval, 3)
# Ask for 10 messages, get 5 back, block 1 second
with Timer() as t:
@@ -200,7 +204,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
# second, get 5 back, no blocking
- self.send_messages(0, range(0, 5))
+ self.send_messages(0, range(0, 3))
+ self.send_messages(1, range(3, 5))
with Timer() as t:
messages = consumer.get_messages(count=10, block=1, timeout=1)
self.assert_message_count(messages, 5)
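
The retimed assertions above exercise SimpleConsumer.get_messages blocking rules: block=True waits up to timeout seconds for count messages, while an integer block returns as soon as that many messages have arrived. Schematically, with a consumer built as in this fixture:

    # Wait until 5 messages arrive or 3 seconds elapse, whichever is first
    messages = consumer.get_messages(count=5, block=True, timeout=3)

    # block=1: return as soon as at least one message is available
    messages = consumer.get_messages(count=10, block=1, timeout=1)
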
@@ -304,7 +309,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(10, 20))
# Create 1st consumer and change offsets
- consumer = self.consumer()
+ consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets')
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:5, 1:15})
# Update counter after manual offsets update
@@ -313,6 +318,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Create 2nd consumer and check initial offsets
consumer = self.consumer(consumer = MultiProcessConsumer,
+ group='test_multi_process_consumer_load_initial_offsets',
auto_commit=False)
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
@@ -369,6 +375,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Start a consumer
consumer1 = self.consumer(
+ group='test_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -379,6 +387,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# The total offset across both partitions should be at 180
consumer2 = self.consumer(
+ group='test_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -397,6 +407,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Start a consumer
consumer1 = self.consumer(
consumer=MultiProcessConsumer,
+ group='test_multi_process_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -414,6 +426,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# The total offset across both partitions should be at 180
consumer2 = self.consumer(
consumer=MultiProcessConsumer,
+ group='test_multi_process_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -447,11 +461,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(100, 200))
# Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='smallest',
- consumer_timeout_ms=5000)
+ consumer = self.kafka_consumer(auto_offset_reset='earliest')
n = 0
messages = {0: set(), 1: set()}
- logging.debug("kafka consumer offsets: %s" % consumer.offsets())
for m in consumer:
logging.debug("Consumed message %s" % repr(m))
n += 1
@@ -464,13 +476,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
- consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer = self.kafka_consumer(auto_offset_reset='earliest',
consumer_timeout_ms=TIMEOUT_MS)
+ # Manual assignment avoids overhead of consumer group mgmt
+ consumer.unsubscribe()
+ consumer.assign([TopicPartition(self.topic, 0)])
+
# Ask for 5 messages, nothing in queue, block 500ms
with Timer() as t:
- with self.assertRaises(ConsumerTimeout):
- msg = consumer.next()
+ with self.assertRaises(StopIteration):
+ msg = next(consumer)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
self.send_messages(0, range(0, 10))
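
Two 0.9-consumer idioms appear in the hunk above: assign() pins partitions without joining a consumer group, and the iterator now signals consumer_timeout_ms expiry with the standard StopIteration rather than the removed ConsumerTimeout. A hedged sketch, with broker address and topic as placeholders:

    from kafka import KafkaConsumer
    from kafka.common import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=500)
    consumer.assign([TopicPartition('my-topic', 0)])  # no group join needed

    try:
        msg = next(consumer)  # blocks for up to 500ms
    except StopIteration:
        print('no messages within the timeout')
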
@@ -479,7 +495,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = set()
with Timer() as t:
for i in range(5):
- msg = consumer.next()
+ msg = next(consumer)
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
@@ -487,52 +503,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Ask for 10 messages, get 5 back, block 500ms
messages = set()
with Timer() as t:
- with self.assertRaises(ConsumerTimeout):
+ with self.assertRaises(StopIteration):
for i in range(10):
- msg = consumer.next()
+ msg = next(consumer)
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
@kafka_versions('>=0.8.1')
def test_kafka_consumer__offset_commit_resume(self):
- GROUP_ID = random_string(10).encode('utf-8')
+ GROUP_ID = random_string(10)
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
# Start a consumer
consumer1 = self.kafka_consumer(
- group_id = GROUP_ID,
- auto_commit_enable = True,
- auto_commit_interval_ms = None,
- auto_commit_interval_messages = 20,
- auto_offset_reset='smallest',
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
)
- # Grab the first 195 messages
+ # Grab the first 180 messages
output_msgs1 = []
- for _ in xrange(195):
- m = consumer1.next()
+ for _ in xrange(180):
+ m = next(consumer1)
output_msgs1.append(m)
- consumer1.task_done(m)
- self.assert_message_count(output_msgs1, 195)
+ self.assert_message_count(output_msgs1, 180)
+ consumer1.close()
# The total offset across both partitions should be at 180
consumer2 = self.kafka_consumer(
- group_id = GROUP_ID,
- auto_commit_enable = True,
- auto_commit_interval_ms = None,
- auto_commit_interval_messages = 20,
- consumer_timeout_ms = 100,
- auto_offset_reset='smallest',
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
)
# 181-200
output_msgs2 = []
- with self.assertRaises(ConsumerTimeout):
- while True:
- m = consumer2.next()
- output_msgs2.append(m)
+ for _ in xrange(20):
+ m = next(consumer2)
+ output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
- self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
+ self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
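
End-to-end, the final hunk migrates the resume test to the 0.9 config names (group_id / enable_auto_commit / auto_commit_interval_ms) and verifies full coverage with a set union rather than an exact-overlap count. Roughly, under the same placeholder assumptions as the sketches above:

    from kafka import KafkaConsumer

    GROUP_ID = 'resume-demo'  # illustrative; the test uses a random string
    kwargs = dict(bootstrap_servers='localhost:9092', group_id=GROUP_ID,
                  enable_auto_commit=True, auto_commit_interval_ms=100,
                  auto_offset_reset='earliest')

    consumer1 = KafkaConsumer('my-topic', **kwargs)
    first = [next(consumer1) for _ in range(180)]
    consumer1.close()  # close() flushes a final offset commit

    consumer2 = KafkaConsumer('my-topic', **kwargs)
    rest = [next(consumer2) for _ in range(20)]

    # Between them the two consumers should cover all 200 messages exactly once
    seen = {(m.partition, m.offset) for m in first + rest}
    assert len(seen) == 200
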