summaryrefslogtreecommitdiff
path: root/kafka/producer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--kafka/producer/base.py62
1 files changed, 40 insertions, 22 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index cd14ab6..9f4942b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -166,32 +166,54 @@ class Producer(object):
Base class to be used by producers
Arguments:
- client: The Kafka client instance to use
- async: If set to true, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- WARNING!!! current implementation of async producer does not
- guarantee message delivery. Use at your own risk! Or help us
- improve with a PR!
- req_acks: A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
+ client (KafkaClient): instance to use for broker communications.
+ codec (kafka.protocol.ALL_CODECS): compression codec to use.
+ req_acks (int, optional): A value indicating the acknowledgements that
+ the server must receive before responding to the request,
+ defaults to 1 (local ack).
+ ack_timeout (int, optional): millisecond timeout to wait for the
+ configured req_acks, defaults to 1000.
+ async (bool, optional): send message using a background thread,
+ defaults to False.
+ batch_send_every_n (int, optional): If async is True, messages are
+ sent in batches of this size, defaults to 20.
+ batch_send_every_t (int or float, optional): If async is True,
+ messages are sent immediately after this timeout in seconds, even
+ if there are fewer than batch_send_every_n, defaults to 20.
+ async_retry_limit (int, optional): number of retries for failed messages
+ or None for unlimited, defaults to None / unlimited.
+ async_retry_backoff_ms (int, optional): milliseconds to backoff on
+ failed messages, defaults to 100.
+ async_retry_on_timeouts (bool, optional): whether to retry on
+ RequestTimeoutError, defaults to True.
+ async_queue_maxsize (int, optional): limit to the size of the
+ internal message queue in number of messages (not size), defaults
+ to 0 (no limit).
+ async_queue_put_timeout (int or float, optional): timeout seconds
+ for queue.put in send_messages for async producers -- will only
+ apply if async_queue_maxsize > 0 and the queue is Full,
+ defaults to 0 (fail immediately on full queue).
+ async_log_messages_on_error (bool, optional): set to False and the
+ async producer will only log hash() contents on failed produce
+ requests, defaults to True (log full messages). Hash logging
+ will not allow you to identify the specific message that failed,
+ but it will allow you to match failures with retries.
+
+ Deprecated Arguments:
+ batch_send (bool, optional): If True, messages are sent by a background
+ thread in batches, defaults to False. Deprecated, use 'async'
"""
-
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
-
DEFAULT_ACK_TIMEOUT = 1000
- def __init__(self, client, async=False,
+ def __init__(self, client,
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
- batch_send=False,
+ async=False,
+ batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -201,14 +223,10 @@ class Producer(object):
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
- if batch_send:
- async = True
+ if async:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
- else:
- batch_send_every_n = 1
- batch_send_every_t = 3600
self.client = client
self.async = async