author     Viktor Shlapakov <vshlapakov@gmail.com>    2015-02-25 15:04:55 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>    2015-06-03 11:22:47 +0300
commit     b82f94f5a409a237222a6486a870751fa17da254 (patch)
tree       e61e22baa87cf055cdb41d829f607989243c456f /kafka
parent     67424a22869b1906f7a02e2d895f68170f6d0f1d (diff)
download   kafka-python-b82f94f5a409a237222a6486a870751fa17da254.tar.gz
Retries for async batching
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/common.py           12
-rw-r--r--  kafka/producer/base.py    31
-rw-r--r--  kafka/producer/keyed.py   11
-rw-r--r--  kafka/producer/simple.py  11
4 files changed, 50 insertions(+), 15 deletions(-)
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..b3380d7 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -6,6 +6,7 @@ from collections import namedtuple
# Structs #
###############
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
["topics"])
@@ -14,8 +15,15 @@ MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-ProduceRequest = namedtuple("ProduceRequest",
- ["topic", "partition", "messages"])
+_ProduceRequest = namedtuple("ProduceRequest",
+ ["topic", "partition", "messages", "retries"])
+
+
+class ProduceRequest(_ProduceRequest):
+ def __new__(cls, topic, partition, messages, retries=0):
+ return super(ProduceRequest, cls).__new__(
+ cls, topic, partition, messages, retries)
+
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
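The change to common.py gives ProduceRequest a retries counter without breaking existing call sites: the raw namedtuple is renamed to _ProduceRequest and wrapped in a thin subclass whose __new__ defaults retries to 0. A minimal sketch of the pattern; only the two definitions come from the patch itself, and the topic name and payload below are placeholders:

    from collections import namedtuple

    _ProduceRequest = namedtuple("ProduceRequest",
                                 ["topic", "partition", "messages", "retries"])


    class ProduceRequest(_ProduceRequest):
        def __new__(cls, topic, partition, messages, retries=0):
            return super(ProduceRequest, cls).__new__(
                cls, topic, partition, messages, retries)


    # Old three-argument call sites keep working because retries defaults to 0.
    req = ProduceRequest("example-topic", 0, ["payload"])
    assert req.retries == 0

    # _replace() returns a copy with the counter bumped; this is how the
    # async send loop tracks how many times a request has been retried.
    assert req._replace(retries=req.retries + 1).retries == 1

Because _replace() produces a new tuple rather than mutating the original, a failed request can be re-queued with an updated counter while staying immutable.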
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..a5af3d6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -24,22 +24,26 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
+BATCH_RETRY_BACKOFF_MS = 300
+BATCH_RETRIES_LIMIT = 5
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, stop_event):
+ req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
stop = False
+ reqs = []
+ client.reinit()
while not stop_event.is_set():
timeout = batch_time
- count = batch_size
+ count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)
@@ -48,7 +52,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)
-
except Empty:
break
@@ -63,7 +66,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))
# Send collected requests upstream
- reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
@@ -75,8 +77,19 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except Exception:
- log.exception("Unable to send message")
+ except FailedPayloadsError as ex:
+ log.exception("Failed payloads count %s" % len(ex.message))
+ if retries_limit is None:
+ reqs = ex.message
+ continue
+ for req in ex.message:
+ if retries_limit and req.retries < retries_limit:
+ reqs.append(req._replace(retries=req.retries+1))
+ except Exception as ex:
+ log.exception("Unable to send message: %s" % type(ex))
+
+ if reqs and retry_backoff:
+ time.sleep(float(retry_backoff) / 1000)
class Producer(object):
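The core of the patch is this _send_upstream hunk: reqs now survives across loop iterations, payloads that fail with FailedPayloadsError are re-queued with their retries counter incremented (or wholesale when retries_limit is None), and the thread sleeps retry_backoff milliseconds before the next pass. A simplified, self-contained sketch of that retry behaviour, using a stand-in exception class and a generic send callable in place of client.send_produce_request:

    import time


    class FailedPayloadsError(Exception):
        # Stand-in for kafka.common.FailedPayloadsError; the patch reads the
        # failed ProduceRequests back from ex.message (a Python 2 idiom).
        def __init__(self, failed_reqs):
            super(FailedPayloadsError, self).__init__(failed_reqs)
            self.message = failed_reqs


    def send_with_retries(send, reqs, retry_backoff_ms, retries_limit):
        # Requests that fail are kept for the next pass with their retry
        # counter bumped, until retries_limit attempts are used up.
        # retries_limit=None retries indefinitely, mirroring the loop above.
        pending = list(reqs)
        while pending:
            batch, pending = pending, []
            try:
                send(batch)
            except FailedPayloadsError as ex:
                for req in ex.message:
                    if retries_limit is None or req.retries < retries_limit:
                        pending.append(req._replace(retries=req.retries + 1))
            if pending and retry_backoff_ms:
                time.sleep(retry_backoff_ms / 1000.0)

Note that the backoff is a flat sleep rather than an exponential one, and that a request which exhausts retries_limit is dropped after being logged, not surfaced to the caller.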
@@ -111,7 +124,9 @@ class Producer(object):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+ batch_retries_limit=BATCH_RETRIES_LIMIT):
if batch_send:
async = True
@@ -148,6 +163,8 @@ class Producer(object):
batch_send_every_n,
self.req_acks,
self.ack_timeout,
+ batch_retry_backoff_ms,
+ batch_retries_limit,
self.thread_stop_event))
# Thread will die if main thread exits
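In Producer.__init__ the new knobs are ordinary keyword arguments with module-level defaults (300 ms backoff, 5 retries) that are forwarded straight into the background send thread's argument list. A hypothetical construction, assuming the 2015-era API in which async was still a legal argument name and KafkaClient takes a broker address (the address below is a placeholder):

    from kafka import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient("localhost:9092")   # placeholder broker

    # Retry a failed batch up to 3 times, waiting 500 ms between attempts.
    producer = Producer(client, async=True,
                        batch_retry_backoff_ms=500,
                        batch_retries_limit=3)

    # batch_retries_limit=None re-queues every failed payload indefinitely
    # instead of counting attempts.
    unbounded = Producer(client, async=True, batch_retries_limit=None)

Most callers would go through SimpleProducer or KeyedProducer instead, which the next two diffs teach to accept and forward the same two keyword arguments.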
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 333b6c0..aa569b3 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT,
+ BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
)
log = logging.getLogger("kafka")
@@ -37,7 +38,9 @@ class KeyedProducer(Producer):
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+ batch_retries_limit=BATCH_RETRIES_LIMIT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
@@ -46,7 +49,9 @@ class KeyedProducer(Producer):
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ batch_retry_backoff_ms,
+ batch_retries_limit)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
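KeyedProducer only needs to accept the two new keyword arguments and pass them down to the base class. A hypothetical usage sketch under the same assumptions as above (broker address, topic, and key are placeholders):

    from kafka import KafkaClient, KeyedProducer

    client = KafkaClient("localhost:9092")   # placeholder broker
    producer = KeyedProducer(client, async=True,
                             batch_retry_backoff_ms=300,
                             batch_retries_limit=5)
    producer.send_messages(b"example-topic", b"key-1", b"hello")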
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..7391be0 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT
+ BATCH_SEND_MSG_COUNT,
+ BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
)
log = logging.getLogger("kafka")
@@ -45,13 +46,17 @@ class SimpleProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True):
+ random_start=True,
+ batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+ batch_retries_limit=BATCH_RETRIES_LIMIT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
- batch_send_every_t)
+ batch_send_every_t,
+ batch_retry_backoff_ms,
+ batch_retries_limit)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
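SimpleProducer gets the identical treatment, with the retry options appended after the existing batching parameters. A hypothetical end-to-end sketch, again with a placeholder broker and topic:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")   # placeholder broker
    producer = SimpleProducer(client,
                              batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=20,
                              batch_retry_backoff_ms=300,
                              batch_retries_limit=5)

    # Queued here, sent (and retried on failure) by the background thread.
    producer.send_messages(b"example-topic", b"hello", b"world")

With batch_send=True the messages are flushed by the background thread, which now retries a failed batch up to batch_retries_limit times with batch_retry_backoff_ms between attempts.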