diff options
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | kafka/producer.py | 65 |
2 files changed, 63 insertions, 8 deletions
@@ -30,10 +30,16 @@ from kafka.producer import SimpleProducer kafka = KafkaClient("localhost", 9092) +# To send messages synchronously producer = SimpleProducer(kafka, "my-topic") producer.send_messages("some message") producer.send_messages("this method", "is variadic") +# To send messages asynchronously +producer = SimpleProducer(kafka, "my-topic", async=True) +producer.send_messages("async message") + +# To consume messages consumer = SimpleConsumer(kafka, "my-group", "my-topic") for message in consumer: print(message) diff --git a/kafka/producer.py b/kafka/producer.py index 589eb11..9a8ba7a 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,4 +1,5 @@ from itertools import cycle +from multiprocessing import Queue, Process import logging from kafka.common import ProduceRequest @@ -7,19 +8,67 @@ from kafka.protocol import create_message log = logging.getLogger("kafka") -class SimpleProducer(object): +class Producer(object): """ - A simple, round-robbin producer. Each message goes to exactly one partition + Base class to be used by producers + + Params: + client - The Kafka client instance to use + topic - The topic for sending messages to + async - If set to true, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these """ - def __init__(self, client, topic): + def __init__(self, client, async=False): self.client = client + self.async = async + + if self.async: + self.queue = Queue() # Messages are sent through this queue + self.proc = Process(target=self._send_upstream, args=(self.queue,)) + self.proc.daemon = True # Process will die if main thread exits + self.proc.start() + + def _send_upstream(self, queue): + """ + Listen on the queue for messages and send them upstream to the brokers + """ + while True: + req = queue.get() + self.client.send_produce_request([req])[0] + + def send_request(self, req): + """ + Helper method to send produce requests + """ + if self.async: + self.queue.put(req) + else: + resp = self.client.send_produce_request([req])[0] + assert resp.error == 0 + + def stop(self): + if self.async: + self.proc.terminate() + self.proc.join() + + +class SimpleProducer(Producer): + """ + A simple, round-robbin producer. Each message goes to exactly one partition + + Params: + client - The Kafka client instance to use + topic - The topic for sending messages to + async - If True, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these + """ + def __init__(self, client, topic, async=False): self.topic = topic - self.client._load_metadata_for_topics(topic) - self.next_partition = cycle(self.client.topic_partitions[topic]) + client._load_metadata_for_topics(topic) + self.next_partition = cycle(client.topic_partitions[topic]) + super(SimpleProducer, self).__init__(client, async) def send_messages(self, *msg): req = ProduceRequest(self.topic, self.next_partition.next(), messages=[create_message(m) for m in msg]) - - resp = self.client.send_produce_request([req])[0] - assert resp.error == 0 + self.send_request(req) |