Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--  kafka/producer/base.py | 86 ++++++++++++++++++++++++++++-------
 1 file changed, 65 insertions(+), 21 deletions(-)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 9bfe98b..ebeb82d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -16,7 +16,10 @@ import six

 from kafka.common import (
     ProduceRequest, TopicAndPartition,
-    UnsupportedCodecError, FailedPayloadsError
+    UnsupportedCodecError, FailedPayloadsError, RetryOptions,
+    RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError,
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError,
+    InvalidMessageError, MessageSizeTooLargeError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 from kafka.util import kafka_bytestring
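Eight error classes join the import list. For orientation, the handlers added to _send_upstream further down fall into a handful of strategies; the mapping below is illustrative commentary on the hunks that follow (the STRATEGIES name is mine, not part of the patch):

    # Illustrative summary of how _send_upstream reacts to each newly
    # imported error class; not code from the patch itself.
    STRATEGIES = {
        'RequestTimedOutError':         'retry as-is, only if retry_on_timeouts',
        'KafkaUnavailableError':        'backoff, then retry',
        'NotLeaderForPartitionError':   'refresh metadata, then retry',
        'UnknownTopicOrPartitionError': 'refresh metadata, then retry',
        'LeaderNotAvailableError':      'backoff, refresh metadata, then retry',
        'ConnectionError':              'backoff, refresh metadata, then retry',
        'FailedPayloadsError':          'backoff, retry only the failed payloads',
        'InvalidMessageError':          'drop, never retried',
        'MessageSizeTooLargeError':     'drop, never retried',
    }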
@@ -25,20 +28,19 @@ log = logging.getLogger("kafka")

 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
-BATCH_RETRY_BACKOFF_MS = 300
-BATCH_RETRIES_LIMIT = 0
+BATCH_RETRY_OPTIONS = RetryOptions(
+    limit=0, backoff_ms=300, retry_on_timeouts=True)

 STOP_ASYNC_PRODUCER = -1


 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
+                   req_acks, ack_timeout, retry_options, stop_event):
     """
     Listen on the queue for a specified number of messages or till
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    stop = False
     reqs = []
     client.reinit()
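The two scalar knobs (BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT) collapse into a single RetryOptions value imported from kafka.common. Judging from the fields used in this patch, RetryOptions is presumably a namedtuple along these lines (a sketch of the assumed shape, not the actual kafka.common definition):

    from collections import namedtuple

    # Presumed shape of kafka.common.RetryOptions, inferred from this patch.
    RetryOptions = namedtuple(
        'RetryOptions', ['limit', 'backoff_ms', 'retry_on_timeouts'])

    # The new default mirrors the old constants: 300ms backoff, and limit=0,
    # which makes get_requests_for_retry (below) return an empty list, i.e.
    # no counted retries; limit=None would retry failed requests forever.
    BATCH_RETRY_OPTIONS = RetryOptions(
        limit=0, backoff_ms=300, retry_on_timeouts=True)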
@@ -85,28 +87,71 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             client.send_produce_request(reqs,
                                         acks=req_acks,
                                         timeout=ack_timeout)
+
+        except RequestTimedOutError as ex:
+            # should retry only if user is fine with duplicates
+            if retry_options.retry_on_timeouts:
+                reqs_to_retry = reqs
+
+        except KafkaUnavailableError as ex:
+            # backoff + retry
+            do_backoff(retry_options)
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+        except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex:
+            # refresh + retry
+            client.load_metadata_for_topics()
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
+        except (LeaderNotAvailableError, ConnectionError) as ex:
+            # backoff + refresh + retry
+            do_backoff(retry_options)
+            client.load_metadata_for_topics()
+            reqs_to_retry = get_requests_for_retry(reqs, retry_options)
+
         except FailedPayloadsError as ex:
+            # retry only failed messages with backoff
             failed_reqs = ex.failed_payloads
-            log.exception("Failed payloads count %s" % len(failed_reqs))
-
-            # if no limit, retry all failed messages until success
-            if retries_limit is None:
-                reqs_to_retry = failed_reqs
-            # makes sense to check failed reqs only if we have a limit > 0
-            elif retries_limit > 0:
-                for req in failed_reqs:
-                    if retries_limit and req.retries < retries_limit:
-                        updated_req = req._replace(retries=req.retries+1)
-                        reqs_to_retry.append(updated_req)
+            do_backoff(retry_options)
+            reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options)
+
+        except (InvalidMessageError, MessageSizeTooLargeError) as ex:
+            # "bad" messages, doesn't make sense to retry
+            log.exception("Message error when sending: %s" % type(ex))
+
         except Exception as ex:
             log.exception("Unable to send message: %s" % type(ex))
+
         finally:
             reqs = []
-            if reqs_to_retry and retry_backoff:
+            if reqs_to_retry:
                 reqs = reqs_to_retry
-                log.warning("%s requests will be retried next call." % len(reqs))
-                time.sleep(float(retry_backoff) / 1000)
+
+
+def get_requests_for_retry(requests, retry_options):
+    log.exception("Failed payloads count %s" % len(requests))
+
+    # if no limit, retry all failed messages until success
+    if retry_options.limit is None:
+        return requests
+
+    # makes sense to check failed reqs only if we have a limit > 0
+    reqs_to_retry = []
+    if retry_options.limit > 0:
+        for req in requests:
+            if req.retries < retry_options.limit:
+                updated_req = req._replace(retries=req.retries+1)
+                reqs_to_retry.append(updated_req)
+
+    return reqs_to_retry
+
+
+def do_backoff(retry_options):
+    if retry_options.backoff_ms:
+        log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+        time.sleep(float(retry_options.backoff_ms) / 1000)
+
+
 class Producer(object):
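get_requests_for_retry relies on each request being a namedtuple that carries a retries counter: the code calls req._replace(retries=req.retries+1), so ProduceRequest must have such a field. A minimal standalone sketch of that bookkeeping, with a hypothetical Req tuple standing in for ProduceRequest:

    from collections import namedtuple

    # Hypothetical stand-ins for the kafka.common types, illustration only.
    Req = namedtuple('Req', ['topic', 'partition', 'messages', 'retries'])
    RetryOptions = namedtuple(
        'RetryOptions', ['limit', 'backoff_ms', 'retry_on_timeouts'])

    opts = RetryOptions(limit=3, backoff_ms=300, retry_on_timeouts=True)
    reqs = [Req('topic', 0, ['msg'], retries=0),
            Req('topic', 1, ['msg'], retries=3)]

    # Requests already at the limit are dropped; survivors come back with
    # their counter bumped, exactly as in get_requests_for_retry above.
    retryable = [r._replace(retries=r.retries + 1)
                 for r in reqs if r.retries < opts.limit]
    assert retryable == [Req('topic', 0, ['msg'], retries=1)]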
@@ -142,8 +187,7 @@ class Producer(object):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
-                 batch_retries_limit=BATCH_RETRIES_LIMIT):
+                 batch_retry_options=BATCH_RETRY_OPTIONS):

         if batch_send:
             async = True
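After this change, callers configure batch retries through one object instead of two keyword arguments. A usage sketch, assuming the Python 2-era kafka-python API (where async was still a valid keyword) and a placeholder broker address:

    from kafka import KafkaClient
    from kafka.common import RetryOptions
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')    # placeholder broker address
    producer = Producer(
        client,
        async=True,                  # route sends through _send_upstream
        batch_send_every_n=20,
        batch_retry_options=RetryOptions(
            limit=5,                 # drop a request after 5 retries
            backoff_ms=300,          # sleep 0.3s before each retry round
            retry_on_timeouts=False  # avoid duplicates on ack timeout
        ))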