summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py13
1 files changed, 11 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
# Send the request, recv the response
conn.send(requestId, request)
+
+ if decoder_fn is None:
+ continue
+
response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
# Order the accumulated responses by the original key order
- return (acc[k] for k in original_keys)
+ return (acc[k] for k in original_keys) if acc else ()
#################
# Public API #
@@ -201,7 +205,12 @@ class KafkaClient(object):
encoder = partial(KafkaProtocol.encode_produce_request,
acks=acks, timeout=timeout)
- decoder = KafkaProtocol.decode_produce_response
+
+ if acks == 0:
+ decoder = None
+ else:
+ decoder = KafkaProtocol.decode_produce_response
+
resps = self._send_broker_aware_request(payloads, encoder, decoder)
out = []