diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 17:01:32 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 17:01:32 +0530 |
commit | b022be28de85634a642cbd4e0ca4ce89d46d21dd (patch) | |
tree | a1fa620452eb9b025ea88a71afa6aa8586559057 /kafka/client.py | |
parent | 2e38a5273270df8959279973cbac69e5658ec9a9 (diff) | |
download | kafka-python-b022be28de85634a642cbd4e0ca4ce89d46d21dd.tar.gz |
Implement blocking get_messages for SimpleConsumer
The implementation is done by using simple options to
Kafka Fetch Request
Also in the SimpleConsumer iterator, update the offset before the
message is yielded. This is so that the consumer state is not lost
if certain cases.
For eg: the message is yielded and consumed by the caller,
but the caller does not come back into the generator again.
The message will be consumed but the status is not updated in
the consumer
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..a1c2133 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -221,15 +221,19 @@ class KafkaClient(object): return out def send_fetch_request(self, payloads=[], fail_on_error=True, - callback=None): + callback=None, max_wait_time=100, min_bytes=4096): """ Encode and send a FetchRequest Payloads are grouped by topic and partition so they can be pipelined to the same brokers. """ - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_fetch_request, + + encoder = partial(KafkaProtocol.encode_fetch_request, + max_wait_time=max_wait_time, + min_bytes=min_bytes) + + resps = self._send_broker_aware_request(payloads, encoder, KafkaProtocol.decode_fetch_response) out = [] |