author     Viktor Shlapakov <vshlapakov@gmail.com>  2015-02-25 15:04:55 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>  2015-06-03 11:22:47 +0300
commit     b82f94f5a409a237222a6486a870751fa17da254 (patch)
tree       e61e22baa87cf055cdb41d829f607989243c456f /kafka
parent     67424a22869b1906f7a02e2d895f68170f6d0f1d (diff)
download   kafka-python-b82f94f5a409a237222a6486a870751fa17da254.tar.gz
Retries for async batching
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/common.py           12
-rw-r--r--  kafka/producer/base.py    31
-rw-r--r--  kafka/producer/keyed.py   11
-rw-r--r--  kafka/producer/simple.py  11
4 files changed, 50 insertions, 15 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 8207bec..b3380d7 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -6,6 +6,7 @@ from collections import namedtuple
 #   Structs   #
 ###############
 
+<<<<<<< HEAD
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
 MetadataRequest = namedtuple("MetadataRequest",
     ["topics"])
@@ -14,8 +15,15 @@ MetadataResponse = namedtuple("MetadataResponse",
     ["brokers", "topics"])
 
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-ProduceRequest = namedtuple("ProduceRequest",
-    ["topic", "partition", "messages"])
+_ProduceRequest = namedtuple("ProduceRequest",
+    ["topic", "partition", "messages", "retries"])
+
+
+class ProduceRequest(_ProduceRequest):
+    def __new__(cls, topic, partition, messages, retries=0):
+        return super(ProduceRequest, cls).__new__(
+            cls, topic, partition, messages, retries)
+
 
 ProduceResponse = namedtuple("ProduceResponse",
     ["topic", "partition", "error", "offset"])
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 4bd3de4..a5af3d6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -24,22 +24,26 @@ log = logging.getLogger("kafka")
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
+BATCH_RETRY_BACKOFF_MS = 300
+BATCH_RETRIES_LIMIT = 5
 
 STOP_ASYNC_PRODUCER = -1
 
 
 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, stop_event):
+                   req_acks, ack_timeout, retry_backoff, retries_limit, stop_event):
     """
     Listen on the queue for a specified number of messages or till
     a specified timeout and send them upstream to the brokers in one
     request
     """
     stop = False
+    reqs = []
+    client.reinit()
 
     while not stop_event.is_set():
         timeout = batch_time
-        count = batch_size
+        count = batch_size - len(reqs)
         send_at = time.time() + timeout
         msgset = defaultdict(list)
 
@@ -48,7 +52,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         while count > 0 and timeout >= 0:
             try:
                 topic_partition, msg, key = queue.get(timeout=timeout)
-
             except Empty:
                 break
 
@@ -63,7 +66,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             msgset[topic_partition].append((msg, key))
 
         # Send collected requests upstream
-        reqs = []
         for topic_partition, msg in msgset.items():
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
@@ -75,8 +77,19 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             client.send_produce_request(reqs,
                                         acks=req_acks,
                                         timeout=ack_timeout)
-        except Exception:
-            log.exception("Unable to send message")
+        except FailedPayloadsError as ex:
+            log.exception("Failed payloads count %s" % len(ex.message))
+            if retries_limit is None:
+                reqs = ex.message
+                continue
+            for req in ex.message:
+                if retries_limit and req.retries < retries_limit:
+                    reqs.append(req._replace(retries=req.retries+1))
+        except Exception as ex:
+            log.exception("Unable to send message: %s" % type(ex))
+
+        if reqs and retry_backoff:
+            time.sleep(float(retry_backoff) / 1000)
 
 
 class Producer(object):
@@ -111,7 +124,9 @@ class Producer(object):
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+                 batch_retries_limit=BATCH_RETRIES_LIMIT):
 
         if batch_send:
             async = True
@@ -148,6 +163,8 @@ class Producer(object):
                                       batch_send_every_n,
                                       self.req_acks,
                                       self.ack_timeout,
+                                      batch_retry_backoff_ms,
+                                      batch_retries_limit,
                                       self.thread_stop_event))
 
             # Thread will die if main thread exits
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 333b6c0..aa569b3 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT
+    BATCH_SEND_MSG_COUNT,
+    BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
 )
 
 log = logging.getLogger("kafka")
@@ -37,7 +38,9 @@ class KeyedProducer(Producer):
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+                 batch_retries_limit=BATCH_RETRIES_LIMIT):
         if not partitioner:
             partitioner = HashedPartitioner
         self.partitioner_class = partitioner
@@ -46,7 +49,9 @@ class KeyedProducer(Producer):
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
-                                            batch_send_every_t)
+                                            batch_send_every_t,
+                                            batch_retry_backoff_ms,
+                                            batch_retries_limit)
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 2699cf2..7391be0 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT
+    BATCH_SEND_MSG_COUNT,
+    BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT
 )
 
 log = logging.getLogger("kafka")
@@ -45,13 +46,17 @@ class SimpleProducer(Producer):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 random_start=True):
+                 random_start=True,
+                 batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS,
+                 batch_retries_limit=BATCH_RETRIES_LIMIT):
        self.partition_cycles = {}
        self.random_start = random_start
        super(SimpleProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
-                                             batch_send_every_t)
+                                             batch_send_every_t,
+                                             batch_retry_backoff_ms,
+                                             batch_retries_limit)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
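The core of the change in kafka/common.py is the namedtuple-with-default trick: ProduceRequest gains a trailing retries field, while a __new__ override keeps existing three-argument call sites working. A minimal, self-contained sketch of the pattern (the topic and message values below are placeholders, not from the commit):

    from collections import namedtuple

    # A namedtuple subclass whose __new__ supplies a default for the
    # trailing "retries" field, so callers that never pass it keep
    # constructing requests unchanged.
    _ProduceRequest = namedtuple("ProduceRequest",
                                 ["topic", "partition", "messages", "retries"])


    class ProduceRequest(_ProduceRequest):
        def __new__(cls, topic, partition, messages, retries=0):
            return super(ProduceRequest, cls).__new__(
                cls, topic, partition, messages, retries)


    req = ProduceRequest("my-topic", 0, ["msg"])    # retries defaults to 0
    bumped = req._replace(retries=req.retries + 1)  # what the retry loop does
    assert (req.retries, bumped.retries) == (0, 1)

Because namedtuples are immutable, _send_upstream re-enqueues a failed request as req._replace(retries=req.retries+1) rather than mutating the counter in place.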
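For callers, the change surfaces as two new keyword arguments on Producer and both subclasses: batch_retry_backoff_ms (pause between retry rounds, default 300) and batch_retries_limit (attempts before a request is dropped, default 5; None retries indefinitely). A hedged usage sketch against the kafka-python API of this era; the broker address and topic are placeholders:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")
    producer = SimpleProducer(client,
                              batch_send=True,             # implies async
                              batch_send_every_n=20,
                              batch_send_every_t=20,
                              batch_retry_backoff_ms=300,  # sleep 300 ms while requests remain
                              batch_retries_limit=5)       # drop a request after 5 retries
    producer.send_messages(b"my-topic", b"some message")

Note the semantics in _send_upstream: with retries_limit=None every failed payload is carried over as-is without bumping its counter, while a non-None limit drops any request whose retries counter has already reached it.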