diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-20 14:58:33 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-20 14:58:33 +0530 |
commit | c2c555798ed915a5560b49a2c3c2be2c3ca5c2d4 (patch) | |
tree | 077bcd32f3a7259cf2ead209bb3df598e80c5944 | |
parent | 604d78bf58e00cd8fc926b5db5206396c63e4286 (diff) | |
download | kafka-python-c2c555798ed915a5560b49a2c3c2be2c3ca5c2d4.tar.gz |
Add ack support for synchronous producer
Add support for two options in the producer - req_acks and ack_timeout
The acks, if any, are passed to the caller directly
-rw-r--r-- | kafka/producer.py | 49 |
1 files changed, 39 insertions, 10 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 413628b..9c4f9a0 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -18,10 +18,24 @@ class Producer(object): topic - The topic for sending messages to async - If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these + req_acks - A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout - Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement """ - def __init__(self, client, async=False): + + ACK_NOT_REQUIRED = 0 # No ack is required + ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log + ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed + + DEFAULT_ACK_TIMEOUT = 1000 + + def __init__(self, client, async=False, req_acks=ACK_NOT_REQUIRED, + ack_timeout=DEFAULT_ACK_TIMEOUT): self.client = client self.async = async + self.req_acks = req_acks + self.ack_timeout = ack_timeout if self.async: self.queue = Queue() # Messages are sent through this queue @@ -35,17 +49,21 @@ class Producer(object): """ while True: req = queue.get() - self.client.send_produce_request([req])[0] + # Ignore any acks in the async mode + self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) def send_request(self, req): """ Helper method to send produce requests """ + resp = [] if self.async: self.queue.put(req) else: - resp = self.client.send_produce_request([req])[0] - assert resp.error == 0 + resp = self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) + return resp def stop(self): if self.async: @@ -62,17 +80,24 @@ class SimpleProducer(Producer): topic - The topic for sending messages to async - If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these + req_acks - A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout - Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement """ - def __init__(self, client, topic, async=False): + def __init__(self, client, topic, async=False, req_acks=ACK_NOT_REQUIRED, + ack_timeout=DEFAULT_ACK_TIMEOUT): self.topic = topic client._load_metadata_for_topics(topic) self.next_partition = cycle(client.topic_partitions[topic]) - super(SimpleProducer, self).__init__(client, async) + + super(SimpleProducer, self).__init__(client, async, + req_acks, ack_timeout) def send_messages(self, *msg): req = ProduceRequest(self.topic, self.next_partition.next(), messages=[create_message(m) for m in msg]) - self.send_request(req) + return self.send_request(req) class KeyedProducer(Producer): @@ -86,8 +111,11 @@ class KeyedProducer(Producer): to send the message to. Must be derived from Partitioner async - If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these + ack_timeout - Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement """ - def __init__(self, client, topic, partitioner=None, async=False): + def __init__(self, client, topic, partitioner=None, async=False, + req_acks=ACK_NOT_REQUIRED, ack_timeout=DEFAULT_ACK_TIMEOUT): self.topic = topic client._load_metadata_for_topics(topic) @@ -96,7 +124,8 @@ class KeyedProducer(Producer): self.partitioner = partitioner(client.topic_partitions[topic]) - super(KeyedProducer, self).__init__(client, async) + super(KeyedProducer, self).__init__(client, async, + req_acks, ack_timeout) def send(self, key, msg): partitions = self.client.topic_partitions[self.topic] @@ -105,4 +134,4 @@ class KeyedProducer(Producer): req = ProduceRequest(self.topic, partition, messages=[create_message(msg)]) - self.send_request(req) + return self.send_request(req) |