summaryrefslogtreecommitdiff
path: root/kafka/consumer/multiprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/multiprocess.py')
-rw-r--r--kafka/consumer/multiprocess.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index d03eb95..046271b 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -226,10 +226,12 @@ class MultiProcessConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages is
+ fetched. If None, it will block forever.
"""
messages = []
@@ -252,8 +254,10 @@ class MultiProcessConsumer(Consumer):
if self.queue.empty():
self.events.start.set()
+ block_next_call = block is True or block > len(messages)
try:
- partition, message = self.queue.get(block, timeout)
+ partition, message = self.queue.get(block_next_call,
+ timeout)
except Empty:
break