summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-25 17:01:32 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-25 17:01:32 +0530
commitb022be28de85634a642cbd4e0ca4ce89d46d21dd (patch)
treea1fa620452eb9b025ea88a71afa6aa8586559057 /kafka/client.py
parent2e38a5273270df8959279973cbac69e5658ec9a9 (diff)
downloadkafka-python-b022be28de85634a642cbd4e0ca4ce89d46d21dd.tar.gz
Implement blocking get_messages for SimpleConsumer
The implementation is done by using simple options to Kafka Fetch Request Also in the SimpleConsumer iterator, update the offset before the message is yielded. This is so that the consumer state is not lost if certain cases. For eg: the message is yielded and consumed by the caller, but the caller does not come back into the generator again. The message will be consumed but the status is not updated in the consumer
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..a1c2133 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -221,15 +221,19 @@ class KafkaClient(object):
return out
def send_fetch_request(self, payloads=[], fail_on_error=True,
- callback=None):
+ callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_fetch_request,
+
+ encoder = partial(KafkaProtocol.encode_fetch_request,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes)
+
+ resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
out = []