diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-20 19:19:08 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-20 19:19:08 +0530 |
commit | e690b75590837f24d456bb5e5c766dac8ac093ad (patch) | |
tree | 14e626c39d97a502835fcda0becae94375364531 /kafka/client.py | |
parent | 612dea17c3fca1cb9283c6c60f6def7e76710d4c (diff) | |
download | kafka-python-e690b75590837f24d456bb5e5c766dac8ac093ad.tar.gz |
Make the default case as 'ack on local write'
Also, ensure that the case of 'no-acks' works fine
In conn.send(), do not wait for the response. Wait for it only on
conn.recv(). This behaviour is fine now since the connection is not
shared among consumer threads etc.
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..b3f8667 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,12 +161,16 @@ class KafkaClient(object): # Send the request, recv the response conn.send(requestId, request) + + if decoder_fn is None: + continue + response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response # Order the accumulated responses by the original key order - return (acc[k] for k in original_keys) + return (acc[k] for k in original_keys) if acc else () ################# # Public API # @@ -201,7 +205,12 @@ class KafkaClient(object): encoder = partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) - decoder = KafkaProtocol.decode_produce_response + + if acks == 0: + decoder = None + else: + decoder = KafkaProtocol.decode_produce_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] |