diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/multiprocess.py | 14 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 15 |
2 files changed, 18 insertions, 11 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index d03eb95..046271b 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -226,10 +226,12 @@ class MultiProcessConsumer(Consumer): Keyword Arguments: count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. """ messages = [] @@ -252,8 +254,10 @@ class MultiProcessConsumer(Consumer): if self.queue.empty(): self.events.start.set() + block_next_call = block is True or block > len(messages) try: - partition, message = self.queue.get(block, timeout) + partition, message = self.queue.get(block_next_call, + timeout) except Empty: break diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 733baa8..6e18290 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -272,10 +272,12 @@ class SimpleConsumer(Consumer): Keyword Arguments: count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + block: If True, the API will block till all messages are fetched. + If block is a positive integer the API will block until that + many messages are fetched. + timeout: When blocking is requested the function will block for + the specified time (in seconds) until count messages is + fetched. If None, it will block forever. """ messages = [] if timeout is not None: @@ -286,12 +288,13 @@ class SimpleConsumer(Consumer): while len(messages) < count: block_time = timeout - time.time() log.debug('calling _get_message block=%s timeout=%s', block, block_time) - result = self._get_message(block, block_time, + block_next_call = block is True or block > len(messages) + result = self._get_message(block_next_call, block_time, get_partition_info=True, update_offset=False) log.debug('got %s from _get_messages', result) if not result: - if block and (timeout is None or time.time() <= timeout): + if block_next_call and (timeout is None or time.time() <= timeout): continue break |