diff options
author | Oskari Saarenmaa <os@ohmu.fi> | 2015-09-18 14:06:51 +0300 |
---|---|---|
committer | Oskari Saarenmaa <os@ohmu.fi> | 2015-09-18 14:06:51 +0300 |
commit | e74a8ba4942891c62ef35f70472f10ee067f89b6 (patch) | |
tree | 187e24f4eb26a8a7b6933f4c9d39a7b5aa4a21ae /test/test_consumer_integration.py | |
parent | b525e1a8d63e4fcb0ede43c05739bc84c85cc79c (diff) | |
download | kafka-python-e74a8ba4942891c62ef35f70472f10ee067f89b6.tar.gz |
Consumers get_messages: allow blocking until some messages are received
Modified MultiProcessConsumer's and SimpleConsumer's `block` argument to
allow integer value which defines the number of messages to block for.
This allows callers to ask for a high number of messages and block only
until some of them are received. Otherwise callers would have to request
messages one by one or block for some time.
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 52b3e85..fee53f5 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get 5 back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + self.assert_message_count(messages, 5) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") @@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get at least one back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + received_message_count = len(messages) + self.assertGreaterEqual(received_message_count, 1) + self.assert_message_count(messages, received_message_count) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") |