summaryrefslogtreecommitdiff
path: root/kafka/producer/simple.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-02-25 15:04:55 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:47 +0300
commitb82f94f5a409a237222a6486a870751fa17da254 (patch)
treee61e22baa87cf055cdb41d829f607989243c456f /kafka/producer/simple.py
parent67424a22869b1906f7a02e2d895f68170f6d0f1d (diff)
downloadkafka-python-b82f94f5a409a237222a6486a870751fa17da254.tar.gz
Retries for async batching
Diffstat (limited to 'kafka/producer/simple.py')
-rw-r--r--kafka/producer/simple.py11
1 files changed, 8 insertions, 3 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..7391be0 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT,
+ BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
)
log = logging.getLogger("kafka")
@@ -45,13 +46,17 @@ class SimpleProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True):
+ random_start=True,
+ batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+ batch_retries_limit=BATCH_RETRIES_LIMIT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ batch_retry_backoff_ms,
+ batch_retries_limit)
def _next_partition(self, topic):
if topic not in self.partition_cycles: