diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-13 17:51:26 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-13 17:51:26 +0530 |
commit | 337127a77dc749224bd7645893950ed90f76f134 (patch) | |
tree | 1356494238270b7f23df2faa8dc7a10ba75eda18 /kafka/producer.py | |
parent | 77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff) | |
download | kafka-python-337127a77dc749224bd7645893950ed90f76f134.tar.gz |
Support for async producer
The Java/Scala Kafka client supports a mechanism for sending
messages asynchronously by using a queue and a thread.
Messages are put on the queue and the worker thread keeps sending
them to the broker.
This ticket implements this feature in Python.
We use multiprocessing instead of threads to send the messages.
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 65 |
1 files changed, 57 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 589eb11..9a8ba7a 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,4 +1,5 @@ from itertools import cycle +from multiprocessing import Queue, Process import logging from kafka.common import ProduceRequest @@ -7,19 +8,67 @@ from kafka.protocol import create_message log = logging.getLogger("kafka") -class SimpleProducer(object): +class Producer(object): """ - A simple, round-robbin producer. Each message goes to exactly one partition + Base class to be used by producers + + Params: + client - The Kafka client instance to use + topic - The topic for sending messages to + async - If set to true, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these """ - def __init__(self, client, topic): + def __init__(self, client, async=False): self.client = client + self.async = async + + if self.async: + self.queue = Queue() # Messages are sent through this queue + self.proc = Process(target=self._send_upstream, args=(self.queue,)) + self.proc.daemon = True # Process will die if main thread exits + self.proc.start() + + def _send_upstream(self, queue): + """ + Listen on the queue for messages and send them upstream to the brokers + """ + while True: + req = queue.get() + self.client.send_produce_request([req])[0] + + def send_request(self, req): + """ + Helper method to send produce requests + """ + if self.async: + self.queue.put(req) + else: + resp = self.client.send_produce_request([req])[0] + assert resp.error == 0 + + def stop(self): + if self.async: + self.proc.terminate() + self.proc.join() + + +class SimpleProducer(Producer): + """ + A simple, round-robbin producer. Each message goes to exactly one partition + + Params: + client - The Kafka client instance to use + topic - The topic for sending messages to + async - If True, the messages are sent asynchronously via another + thread (process). 
We will not wait for a response to these + """ + def __init__(self, client, topic, async=False): self.topic = topic - self.client._load_metadata_for_topics(topic) - self.next_partition = cycle(self.client.topic_partitions[topic]) + client._load_metadata_for_topics(topic) + self.next_partition = cycle(client.topic_partitions[topic]) + super(SimpleProducer, self).__init__(client, async) def send_messages(self, *msg): req = ProduceRequest(self.topic, self.next_partition.next(), messages=[create_message(m) for m in msg]) - - resp = self.client.send_produce_request([req])[0] - assert resp.error == 0 + self.send_request(req) |