author     Viktor Shlapakov <vshlapakov@gmail.com>   2015-04-22 12:14:11 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>   2015-06-03 11:22:48 +0300
commit     5119bb605acc4b24e091778656b229a36f9cac11 (patch)
tree       82c2aa8108c36570c3e8e8c9fd022a5ab18e86f3
parent     7da48f62975385e15e4115df70986688837058b8 (diff)
download   kafka-python-5119bb605acc4b24e091778656b229a36f9cac11.tar.gz
Fix names for async retries opts, add timeout for put
-rw-r--r--  kafka/common.py           4
-rw-r--r--  kafka/producer/base.py   28
-rw-r--r--  kafka/producer/keyed.py  11
-rw-r--r--  kafka/producer/simple.py 11
4 files changed, 34 insertions, 20 deletions
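The rename makes clear that the retry options govern the whole async send path, not just batching, and the new `async_queue_put_timeout` lets `send_messages` block for a bounded time instead of raising `AsyncProducerQueueFull` as soon as the internal queue is full. A minimal sketch of how a caller might pass the renamed options after this change (the broker address is a placeholder; `async` was a legal argument name in the Python versions this code targets):

```python
from kafka import KafkaClient, SimpleProducer
from kafka.common import RetryOptions

client = KafkaClient('localhost:9092')  # placeholder broker

# Retry each failed request up to 3 times with a 100ms backoff;
# limit=None would retry forever, limit=0 disables retries.
retry_options = RetryOptions(limit=3, backoff_ms=100,
                             retry_on_timeouts=True)

producer = SimpleProducer(client,
                          async=True,
                          async_retry_options=retry_options,
                          async_queue_maxsize=1000,   # bound the in-memory queue
                          async_queue_put_timeout=5)  # block up to 5s on put
producer.send_messages(b'my-topic', b'hello')
```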
diff --git a/kafka/common.py b/kafka/common.py
index e327d02..87c29f0 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -78,12 +78,12 @@ TopicAndPartition = namedtuple("TopicAndPartition",
 
 KafkaMessage = namedtuple("KafkaMessage",
     ["topic", "partition", "offset", "key", "value"])
 
+# Define retry policy for async producer
+# Limit corner values: None - infinite retries, 0 - no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
-
-
 
 #################
 #   Exceptions  #
 #################
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index fffea94..0b31d18 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -29,11 +29,13 @@ log = logging.getLogger("kafka")
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
-BATCH_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=300, retry_on_timeouts=False)
 
 # unlimited
 ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_OPTIONS = RetryOptions(
+    limit=0, backoff_ms=0, retry_on_timeouts=False)
 
 STOP_ASYNC_PRODUCER = -1
 
@@ -108,7 +110,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             finally:
                 reqs = []
 
-        if not reqs_to_retry:
+        if not reqs_to_retry or retry_options.limit == 0:
             continue
 
         # doing backoff before next retry
@@ -120,11 +122,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()
 
-        reqs = reqs_to_retry
-        # filter reqs_to_retry if there's a retry limit
-        if retry_options.limit and retry_options.limit > 0:
-            reqs = [req._replace(retries=req.retries+1)
-                    for req in reqs if req.retries < retry_options.limit]
+        reqs = [req._replace(retries=req.retries+1)
+                for req in reqs_to_retry
+                if not retry_options.limit or
+                (retry_options.limit and req.retries < retry_options.limit)]
 
 
 class Producer(object):
@@ -160,8 +161,9 @@ class Producer(object):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_retry_options=BATCH_RETRY_OPTIONS,
-                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE):
+                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
 
         if batch_send:
             async = True
@@ -188,6 +190,7 @@ class Producer(object):
         if self.async:
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
+            self.async_queue_put_timeout = async_queue_put_timeout
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -197,7 +200,7 @@ class Producer(object):
                                        batch_send_every_n,
                                        self.req_acks,
                                        self.ack_timeout,
-                                       batch_retry_options,
+                                       async_retry_options,
                                        self.thread_stop_event))
 
             # Thread will die if main thread exits
@@ -249,10 +252,11 @@ class Producer(object):
             raise TypeError("the key must be type bytes")
 
         if self.async:
+            put_timeout = self.async_queue_put_timeout
             for m in msg:
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
-                    self.queue.put_nowait(item)
+                    self.queue.put(item, bool(put_timeout), put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
                         'Producer async queue overfilled. '
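The rewritten loop in `_send_upstream` now covers all three `limit` cases: `limit == 0` short-circuits via the `continue` above (no retries at all), a falsy `limit` of `None` keeps every failed request, and a positive `limit` drops requests whose counter has reached it (the `retry_options.limit and` inside the parentheses is redundant, since that branch only runs once `not retry_options.limit` is false). A standalone sketch of the same filter, with a hypothetical `Req` namedtuple standing in for the producer's real request objects:

```python
from collections import namedtuple

RetryOptions = namedtuple("RetryOptions",
    ["limit", "backoff_ms", "retry_on_timeouts"])

# Hypothetical stand-in: the real requests in base.py carry a retries field.
Req = namedtuple("Req", ["payload", "retries"])

def filter_retries(reqs_to_retry, retry_options):
    # Same predicate as the new comprehension, minus the redundant term:
    # keep a request if there is no limit, or it is still under the limit.
    return [req._replace(retries=req.retries + 1)
            for req in reqs_to_retry
            if not retry_options.limit or req.retries < retry_options.limit]

reqs = [Req("a", 0), Req("b", 2)]
capped = RetryOptions(limit=2, backoff_ms=300, retry_on_timeouts=False)
unlimited = RetryOptions(limit=None, backoff_ms=300, retry_on_timeouts=False)

print(filter_retries(reqs, capped))     # only "a" survives, retries bumped to 1
print(filter_retries(reqs, unlimited))  # both survive: None means retry forever
```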
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 7bcc629..0fdccd5 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
+    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
+    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
 )
 
 log = logging.getLogger("kafka")
@@ -38,7 +39,9 @@ class KeyedProducer(Producer):
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_retry_options=BATCH_RETRY_OPTIONS):
+                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         if not partitioner:
             partitioner = HashedPartitioner
         self.partitioner_class = partitioner
@@ -48,7 +51,9 @@ class KeyedProducer(Producer):
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t,
-                                            batch_retry_options)
+                                            async_retry_options,
+                                            async_queue_maxsize,
+                                            async_queue_put_timeout)
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index b869683..f7dfc46 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
+    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
+    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
 )
 
 log = logging.getLogger("kafka")
@@ -46,14 +47,18 @@ class SimpleProducer(Producer):
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=True,
-                 batch_retry_options=BATCH_RETRY_OPTIONS):
+                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
         self.random_start = random_start
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
-                                             batch_retry_options)
+                                             async_retry_options,
+                                             async_queue_maxsize,
+                                             async_queue_put_timeout)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
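The `queue.put(item, bool(put_timeout), put_timeout)` call in base.py maps the single config value onto the standard library's `Queue.put(item, block, timeout)`: the default of 0 is falsy, so it behaves exactly like the old `put_nowait` and raises `Full` immediately, while any positive value blocks for up to that many seconds first. A quick standalone illustration (using `six.moves` for the py2/py3 queue module, as the rest of the codebase does):

```python
from six.moves.queue import Full, Queue

q = Queue(maxsize=1)
q.put('first')  # fills the queue

for put_timeout in (0, 0.5):
    try:
        # bool(0) is False -> non-blocking, same as put_nowait;
        # bool(0.5) is True -> block up to 0.5s for free space.
        q.put('second', bool(put_timeout), put_timeout)
    except Full:
        print('queue still full with put_timeout=%s' % put_timeout)
```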