summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-20 19:19:08 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-20 19:19:08 +0530
commite690b75590837f24d456bb5e5c766dac8ac093ad (patch)
tree14e626c39d97a502835fcda0becae94375364531 /kafka
parent612dea17c3fca1cb9283c6c60f6def7e76710d4c (diff)
downloadkafka-python-e690b75590837f24d456bb5e5c766dac8ac093ad.tar.gz
Make the default case as 'ack on local write'
Also, ensure that the case of 'no-acks' works fine In conn.send(), do not wait for the response. Wait for it only on conn.recv(). This behaviour is fine now since the connection is not shared among consumer threads etc.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py13
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/producer.py6
3 files changed, 15 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
# Send the request, recv the response
conn.send(requestId, request)
+
+ if decoder_fn is None:
+ continue
+
response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
# Order the accumulated responses by the original key order
- return (acc[k] for k in original_keys)
+ return (acc[k] for k in original_keys) if acc else ()
#################
# Public API #
@@ -201,7 +205,12 @@ class KafkaClient(object):
encoder = partial(KafkaProtocol.encode_produce_request,
acks=acks, timeout=timeout)
- decoder = KafkaProtocol.decode_produce_response
+
+ if acks == 0:
+ decoder = None
+ else:
+ decoder = KafkaProtocol.decode_produce_response
+
resps = self._send_broker_aware_request(payloads, encoder, decoder)
out = []
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent != None:
raise RuntimeError("Kafka went away")
- self.data = self._consume_response()
def recv(self, requestId):
"Get a response from Kafka"
log.debug("Reading response %d from Kafka" % requestId)
+ self.data = self._consume_response()
return self.data
def close(self):
diff --git a/kafka/producer.py b/kafka/producer.py
index 2412d6d..9ed0056 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -30,7 +30,7 @@ class Producer(object):
DEFAULT_ACK_TIMEOUT = 1000
- def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED,
+ def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT):
self.client = client
self.async = async
@@ -86,7 +86,7 @@ class SimpleProducer(Producer):
for an acknowledgement
"""
def __init__(self, client, topic, async=False,
- req_acks=Producer.ACK_NOT_REQUIRED,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
@@ -116,7 +116,7 @@ class KeyedProducer(Producer):
for an acknowledgement
"""
def __init__(self, client, topic, partitioner=None, async=False,
- req_acks=Producer.ACK_NOT_REQUIRED,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)