summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- README.md 6
-rw-r--r-- kafka/producer.py 65
2 files changed, 63 insertions, 8 deletions
diff --git a/README.md b/README.md
index 8b57172..380e0ae 100644
--- a/README.md
+++ b/README.md
@@ -30,10 +30,16 @@ from kafka.producer import SimpleProducer
kafka = KafkaClient("localhost", 9092)
+# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
+# To send messages asynchronously
+producer = SimpleProducer(kafka, "my-topic", async=True)
+producer.send_messages("async message")
+
+# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
print(message)
diff --git a/kafka/producer.py b/kafka/producer.py
index 589eb11..9a8ba7a 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,4 +1,5 @@
from itertools import cycle
+from multiprocessing import Queue, Process
import logging
from kafka.common import ProduceRequest
@@ -7,19 +8,67 @@ from kafka.protocol import create_message
log = logging.getLogger("kafka")
-class SimpleProducer(object):
+class Producer(object):
"""
- A simple, round-robbin producer. Each message goes to exactly one partition
+ Base class to be used by producers
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If set to true, the messages are sent asynchronously via a
+ separate process. We will not wait for a response to these messages
"""
- def __init__(self, client, topic):
+ def __init__(self, client, async=False):
self.client = client
+ self.async = async
+
+ if self.async:
+ self.queue = Queue() # Messages are sent through this queue
+ self.proc = Process(target=self._send_upstream, args=(self.queue,))
+ self.proc.daemon = True # Process will die if main thread exits
+ self.proc.start()
+
+ def _send_upstream(self, queue):
+ """
+ Listen on the queue for messages and send them upstream to the brokers
+ """
+ while True:
+ req = queue.get()
+ self.client.send_produce_request([req])[0]
+
+ def send_request(self, req):
+ """
+ Helper method to send produce requests
+ """
+ if self.async:
+ self.queue.put(req)
+ else:
+ resp = self.client.send_produce_request([req])[0]
+ assert resp.error == 0
+
+ def stop(self):
+ if self.async:
+ self.proc.terminate()
+ self.proc.join()
+
+
+class SimpleProducer(Producer):
+ """
+ A simple, round-robin producer. Each message goes to exactly one partition
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If True, the messages are sent asynchronously via a
+ separate process. We will not wait for a response to these messages
+ """
+ def __init__(self, client, topic, async=False):
self.topic = topic
- self.client._load_metadata_for_topics(topic)
- self.next_partition = cycle(self.client.topic_partitions[topic])
+ client._load_metadata_for_topics(topic)
+ self.next_partition = cycle(client.topic_partitions[topic])
+ super(SimpleProducer, self).__init__(client, async)
def send_messages(self, *msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[create_message(m) for m in msg])
-
- resp = self.client.send_produce_request([req])[0]
- assert resp.error == 0
+ self.send_request(req)