diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..b3f8667 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,12 +161,16 @@ class KafkaClient(object): # Send the request, recv the response conn.send(requestId, request) + + if decoder_fn is None: + continue + response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response # Order the accumulated responses by the original key order - return (acc[k] for k in original_keys) + return (acc[k] for k in original_keys) if acc else () ################# # Public API # @@ -201,7 +205,12 @@ class KafkaClient(object): encoder = partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) - decoder = KafkaProtocol.decode_produce_response + + if acks == 0: + decoder = None + else: + decoder = KafkaProtocol.decode_produce_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] |