diff options
-rw-r--r-- | kafka/client.py | 13 | ||||
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/producer.py | 6 |
3 files changed, 15 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..b3f8667 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -161,12 +161,16 @@ class KafkaClient(object): # Send the request, recv the response conn.send(requestId, request) + + if decoder_fn is None: + continue + response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response # Order the accumulated responses by the original key order - return (acc[k] for k in original_keys) + return (acc[k] for k in original_keys) if acc else () ################# # Public API # @@ -201,7 +205,12 @@ class KafkaClient(object): encoder = partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout) - decoder = KafkaProtocol.decode_produce_response + + if acks == 0: + decoder = None + else: + decoder = KafkaProtocol.decode_produce_response + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] diff --git a/kafka/conn.py b/kafka/conn.py index fce1fdc..aba3ada 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -76,11 +76,11 @@ class KafkaConnection(local): sent = self._sock.sendall(payload) if sent != None: raise RuntimeError("Kafka went away") - self.data = self._consume_response() def recv(self, requestId): "Get a response from Kafka" log.debug("Reading response %d from Kafka" % requestId) + self.data = self._consume_response() return self.data def close(self): diff --git a/kafka/producer.py b/kafka/producer.py index 2412d6d..9ed0056 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -30,7 +30,7 @@ class Producer(object): DEFAULT_ACK_TIMEOUT = 1000 - def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED, + def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT): self.client = client self.async = async @@ -86,7 +86,7 @@ class SimpleProducer(Producer): for an acknowledgement """ def __init__(self, client, topic, async=False, - req_acks=Producer.ACK_NOT_REQUIRED, + req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT): self.topic = topic client._load_metadata_for_topics(topic) @@ -116,7 +116,7 @@ class KeyedProducer(Producer): for an acknowledgement """ def __init__(self, client, topic, partitioner=None, async=False, - req_acks=Producer.ACK_NOT_REQUIRED, + req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT): self.topic = topic client._load_metadata_for_topics(topic) |