summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-13 17:51:26 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-13 17:51:26 +0530
commit337127a77dc749224bd7645893950ed90f76f134 (patch)
tree1356494238270b7f23df2faa8dc7a10ba75eda18 /kafka/producer.py
parent77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff)
downloadkafka-python-337127a77dc749224bd7645893950ed90f76f134.tar.gz
Support for async producer
The Java/Scala Kafka client supports a mechanism for sending messages asynchronously by using a queue and a thread. Messages are put on the queue and the worker thread keeps sending them to the broker. This ticket implements this feature in Python. We use multiprocessing instead of threads to send the messages.
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py65
1 files changed, 57 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 589eb11..9a8ba7a 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,4 +1,5 @@
from itertools import cycle
+from multiprocessing import Queue, Process
import logging
from kafka.common import ProduceRequest
@@ -7,19 +8,67 @@ from kafka.protocol import create_message
log = logging.getLogger("kafka")
-class SimpleProducer(object):
+class Producer(object):
"""
- A simple, round-robbin producer. Each message goes to exactly one partition
+ Base class to be used by producers
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If set to true, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
"""
- def __init__(self, client, topic):
+ def __init__(self, client, async=False):
self.client = client
+ self.async = async
+
+ if self.async:
+ self.queue = Queue() # Messages are sent through this queue
+ self.proc = Process(target=self._send_upstream, args=(self.queue,))
+ self.proc.daemon = True # Process will die if main thread exits
+ self.proc.start()
+
+ def _send_upstream(self, queue):
+ """
+ Listen on the queue for messages and send them upstream to the brokers
+ """
+ while True:
+ req = queue.get()
+ self.client.send_produce_request([req])[0]
+
+ def send_request(self, req):
+ """
+ Helper method to send produce requests
+ """
+ if self.async:
+ self.queue.put(req)
+ else:
+ resp = self.client.send_produce_request([req])[0]
+ assert resp.error == 0
+
+ def stop(self):
+ if self.async:
+ self.proc.terminate()
+ self.proc.join()
+
+
+class SimpleProducer(Producer):
+ """
+ A simple, round-robbin producer. Each message goes to exactly one partition
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If True, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ """
+ def __init__(self, client, topic, async=False):
self.topic = topic
- self.client._load_metadata_for_topics(topic)
- self.next_partition = cycle(self.client.topic_partitions[topic])
+ client._load_metadata_for_topics(topic)
+ self.next_partition = cycle(client.topic_partitions[topic])
+ super(SimpleProducer, self).__init__(client, async)
def send_messages(self, *msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[create_message(m) for m in msg])
-
- resp = self.client.send_produce_request([req])[0]
- assert resp.error == 0
+ self.send_request(req)