diff options
author | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-07-11 05:58:54 -0700 |
commit | 5684af438e6cf871540aa8ea8b556737f56e9798 (patch) | |
tree | f9bb1f6046943946236ceae61d9266a8c14bdcfe /kafka/client.py | |
parent | ffdc08aeec040862d522914a480c135626a19e69 (diff) | |
parent | d2df8f54637490b1dbe858066b74710b57186016 (diff) | |
download | kafka-python-5684af438e6cf871540aa8ea8b556737f56e9798.tar.gz |
Merge pull request #33 from mahendra/asyncproducer
Support for async producer
Merged locally, tests pass, +1
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..b3f8667 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,12 +161,16 @@ class KafkaClient(object): # Send the request, recv the response conn.send(requestId, request) + + if decoder_fn is None: + continue + response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response # Order the accumulated responses by the original key order - return (acc[k] for k in original_keys) + return (acc[k] for k in original_keys) if acc else () ################# # Public API # @@ -201,7 +205,12 @@ class KafkaClient(object): encoder = partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) - decoder = KafkaProtocol.decode_produce_response + + if acks == 0: + decoder = None + else: + decoder = KafkaProtocol.decode_produce_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] |