summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer.py49
1 files changed, 39 insertions, 10 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 413628b..9c4f9a0 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -18,10 +18,24 @@ class Producer(object):
topic - The topic for sending messages to
async - If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
"""
- def __init__(self, client, async=False):
+
+ ACK_NOT_REQUIRED = 0 # No ack is required
+ ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
+ ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
+
+ DEFAULT_ACK_TIMEOUT = 1000
+
+ def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED,
+ ack_timeout=DEFAULT_ACK_TIMEOUT):
self.client = client
self.async = async
+ self.req_acks = req_acks
+ self.ack_timeout = ack_timeout
if self.async:
self.queue = Queue() # Messages are sent through this queue
@@ -35,17 +49,21 @@ class Producer(object):
"""
while True:
req = queue.get()
- self.client.send_produce_request([req])[0]
+ # Ignore any acks in the async mode
+ self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
def send_request(self, req):
"""
Helper method to send produce requests
"""
+ resp = []
if self.async:
self.queue.put(req)
else:
- resp = self.client.send_produce_request([req])[0]
- assert resp.error == 0
+ resp = self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
+ return resp
def stop(self):
if self.async:
@@ -62,17 +80,24 @@ class SimpleProducer(Producer):
topic - The topic for sending messages to
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
"""
- def __init__(self, client, topic, async=False):
+ def __init__(self, client, topic, async=False, req_acks=ACK_NOT_REQUIRED,
+ ack_timeout=DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
self.next_partition = cycle(client.topic_partitions[topic])
- super(SimpleProducer, self).__init__(client, async)
+
+ super(SimpleProducer, self).__init__(client, async,
+ req_acks, ack_timeout)
def send_messages(self, *msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[create_message(m) for m in msg])
- self.send_request(req)
+ return self.send_request(req)
class KeyedProducer(Producer):
@@ -86,8 +111,11 @@ class KeyedProducer(Producer):
to send the message to. Must be derived from Partitioner
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
"""
- def __init__(self, client, topic, partitioner=None, async=False):
+ def __init__(self, client, topic, partitioner=None, async=False,
+ req_acks=ACK_NOT_REQUIRED, ack_timeout=DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
@@ -96,7 +124,8 @@ class KeyedProducer(Producer):
self.partitioner = partitioner(client.topic_partitions[topic])
- super(KeyedProducer, self).__init__(client, async)
+ super(KeyedProducer, self).__init__(client, async,
+ req_acks, ack_timeout)
def send(self, key, msg):
partitions = self.client.topic_partitions[self.topic]
@@ -105,4 +134,4 @@ class KeyedProducer(Producer):
req = ProduceRequest(self.topic, partition,
messages=[create_message(msg)])
- self.send_request(req)
+ return self.send_request(req)