Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--   kafka/producer/base.py   206
1 file changed, 206 insertions, 0 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
new file mode 100644
index 0000000..6c91364
--- /dev/null
+++ b/kafka/producer/base.py
@@ -0,0 +1,206 @@
+from __future__ import absolute_import
+
+import logging
+import time
+
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
+from collections import defaultdict
+from multiprocessing import Queue, Process
+
+import six
+
+from kafka.common import (
+    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+)
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+
+log = logging.getLogger("kafka")
+
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
+
+STOP_ASYNC_PRODUCER = -1
+
+
+def _send_upstream(queue, client, codec, batch_time, batch_size,
+                   req_acks, ack_timeout):
+ """
+ Listen on the queue for a specified number of messages or till
+ a specified timeout and send them upstream to the brokers in one
+ request
+
+ NOTE: Ideally, this should have been a method inside the Producer
+ class. However, multiprocessing module has issues in windows. The
+ functionality breaks unless this function is kept outside of a class
+ """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = time.time() + timeout
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                topic_partition, msg = queue.get(timeout=timeout)
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if topic_partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = send_at - time.time()
+            msgset[topic_partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for topic_partition, msg_list in msgset.items():
+            messages = create_message_set(msg_list, codec)
+            req = ProduceRequest(topic_partition.topic,
+                                 topic_partition.partition,
+                                 messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception:
+            log.exception("Unable to send message")
+
+class Producer(object):
+ """
+ Base class to be used by producers
+
+ Params:
+ client - The Kafka client instance to use
+ async - If set to true, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ WARNING!!! current implementation of async producer does not
+ guarantee message delivery. Use at your own risk! Or help us
+ improve with a PR!
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
+ """
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 codec=None,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
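+            # Without batching, the upstream loop flushes each message as
+            # soon as it arrives; the long timeout only bounds an idle wait.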
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
+        self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+
+        if codec is None:
+            codec = CODEC_NONE
+        elif codec not in ALL_CODECS:
+            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
+        self.codec = codec
+
+        if self.async:
+            log.warning("async producer does not guarantee message delivery!")
+            log.warning("Current implementation does not retry failed messages")
+            log.warning("Use at your own risk! (or help improve with a PR!)")
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=_send_upstream,
+                                args=(self.queue,
+                                      self.client.copy(),
+                                      self.codec,
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
+            self.proc.start()
+
+    def send_messages(self, topic, partition, *msg):
+        """
+        Helper method to send produce requests
+        @param: topic, name of topic for produce request -- type str
+        @param: partition, partition number for produce request -- type int
+        @param: *msg, one or more message payloads -- type bytes
+        @returns: list of ProduceResponse objects returned by the server
+                  (an empty list when async)
+        raises on error
+
+        Note that msg payloads *must* be encoded to bytes by the user.
+        Passing a unicode message will not work; for example, you should
+        encode before calling send_messages via something like
+        `unicode_message.encode('utf-8')`
+
+        All messages produced via this method will set the message 'key'
+        to None
+        """
+
+        # Guarantee that msg is actually a list or tuple (should always be true)
+        if not isinstance(msg, (list, tuple)):
+            raise TypeError("msg is not a list or tuple!")
+
+        # Raise TypeError if any message is not encoded as bytes
+        if any(not isinstance(m, six.binary_type) for m in msg):
+            raise TypeError("all produce message payloads must be type bytes")
+
+        if self.async:
+            for m in msg:
+                self.queue.put((TopicAndPartition(topic, partition), m))
+            resp = []
+        else:
+            messages = create_message_set(msg, self.codec)
+            req = ProduceRequest(topic, partition, messages)
+            try:
+                resp = self.client.send_produce_request([req],
+                                                        acks=self.req_acks,
+                                                        timeout=self.ack_timeout)
+            except Exception:
+                log.exception("Unable to send messages")
+                raise
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
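
A minimal usage sketch for the Producer class introduced above. The
KafkaClient construction, broker address, topic name, and partition number
are illustrative assumptions, not part of the commit:

    from kafka.client import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient("localhost:9092")  # assumed broker address

    # Synchronous send: blocks until the broker responds according to
    # req_acks and returns the list of ProduceResponse objects.
    producer = Producer(client)
    resp = producer.send_messages("my-topic", 0, b"payload-1", b"payload-2")

    # Asynchronous batched send: messages are queued to a background
    # process and flushed every 50 messages or every 10 seconds,
    # whichever comes first; send_messages() returns [] immediately.
    batch_producer = Producer(client, batch_send=True,
                              batch_send_every_n=50,
                              batch_send_every_t=10)
    batch_producer.send_messages("my-topic", 0, b"payload-3")

    # Ask the background process to stop, waiting up to 5 seconds
    # before it is forcefully terminated.
    batch_producer.stop(timeout=5)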