summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py13
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/producer.py6
3 files changed, 15 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
# Send the request, recv the response
conn.send(requestId, request)
+
+ if decoder_fn is None:
+ continue
+
response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
# Order the accumulated responses by the original key order
- return (acc[k] for k in original_keys)
+ return (acc[k] for k in original_keys) if acc else ()
#################
# Public API #
@@ -201,7 +205,12 @@ class KafkaClient(object):
encoder = partial(KafkaProtocol.encode_produce_request,
acks=acks, timeout=timeout)
- decoder = KafkaProtocol.decode_produce_response
+
+ if acks == 0:
+ decoder = None
+ else:
+ decoder = KafkaProtocol.decode_produce_response
+
resps = self._send_broker_aware_request(payloads, encoder, decoder)
out = []
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent != None:
raise RuntimeError("Kafka went away")
- self.data = self._consume_response()
def recv(self, requestId):
"Get a response from Kafka"
log.debug("Reading response %d from Kafka" % requestId)
+ self.data = self._consume_response()
return self.data
def close(self):
diff --git a/kafka/producer.py b/kafka/producer.py
index 2412d6d..9ed0056 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -30,7 +30,7 @@ class Producer(object):
DEFAULT_ACK_TIMEOUT = 1000
- def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED,
+ def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT):
self.client = client
self.async = async
@@ -86,7 +86,7 @@ class SimpleProducer(Producer):
for an acknowledgement
"""
def __init__(self, client, topic, async=False,
- req_acks=Producer.ACK_NOT_REQUIRED,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
@@ -116,7 +116,7 @@ class KeyedProducer(Producer):
for an acknowledgement
"""
def __init__(self, client, topic, partitioner=None, async=False,
- req_acks=Producer.ACK_NOT_REQUIRED,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)