summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-04-22 12:14:11 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:48 +0300
commit5119bb605acc4b24e091778656b229a36f9cac11 (patch)
tree82c2aa8108c36570c3e8e8c9fd022a5ab18e86f3
parent7da48f62975385e15e4115df70986688837058b8 (diff)
downloadkafka-python-5119bb605acc4b24e091778656b229a36f9cac11.tar.gz
Fix names for async retries opts, add timeout for put
-rw-r--r--kafka/common.py4
-rw-r--r--kafka/producer/base.py28
-rw-r--r--kafka/producer/keyed.py11
-rw-r--r--kafka/producer/simple.py11
4 files changed, 34 insertions, 20 deletions
diff --git a/kafka/common.py b/kafka/common.py
index e327d02..87c29f0 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -78,12 +78,12 @@ TopicAndPartition = namedtuple("TopicAndPartition",
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+# Define retry policy for async producer
+# Limit corner values: None - infinite retries, 0 - no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
-
-
#################
# Exceptions #
#################
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index fffea94..0b31d18 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -29,11 +29,13 @@ log = logging.getLogger("kafka")
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20
-BATCH_RETRY_OPTIONS = RetryOptions(
- limit=0, backoff_ms=300, retry_on_timeouts=False)
# unlimited
ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_OPTIONS = RetryOptions(
+ limit=0, backoff_ms=0, retry_on_timeouts=False)
STOP_ASYNC_PRODUCER = -1
@@ -108,7 +110,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
finally:
reqs = []
- if not reqs_to_retry:
+ if not reqs_to_retry or retry_options.limit == 0:
continue
# doing backoff before next retry
@@ -120,11 +122,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
if error_type in RETRY_REFRESH_ERROR_TYPES:
client.load_metadata_for_topics()
- reqs = reqs_to_retry
- # filter reqs_to_retry if there's a retry limit
- if retry_options.limit and retry_options.limit > 0:
- reqs = [req._replace(retries=req.retries+1)
- for req in reqs if req.retries < retry_options.limit]
+ reqs = [req._replace(retries=req.retries+1)
+ for req in reqs_to_retry
+ if not retry_options.limit or
+ (retry_options.limit and req.retries < retry_options.limit)]
class Producer(object):
@@ -160,8 +161,9 @@ class Producer(object):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- batch_retry_options=BATCH_RETRY_OPTIONS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE):
+ async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if batch_send:
async = True
@@ -188,6 +190,7 @@ class Producer(object):
if self.async:
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
+ self.async_queue_put_timeout = async_queue_put_timeout
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
@@ -197,7 +200,7 @@ class Producer(object):
batch_send_every_n,
self.req_acks,
self.ack_timeout,
- batch_retry_options,
+ async_retry_options,
self.thread_stop_event))
# Thread will die if main thread exits
@@ -249,10 +252,11 @@ class Producer(object):
raise TypeError("the key must be type bytes")
if self.async:
+ put_timeout = self.async_queue_put_timeout
for m in msg:
try:
item = (TopicAndPartition(topic, partition), m, key)
- self.queue.put_nowait(item)
+ self.queue.put(item, bool(put_timeout), put_timeout)
except Full:
raise AsyncProducerQueueFull(
'Producer async queue overfilled. '
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 7bcc629..0fdccd5 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,7 +7,8 @@ from kafka.util import kafka_bytestring
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
+ BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
+ ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
)
log = logging.getLogger("kafka")
@@ -38,7 +39,9 @@ class KeyedProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- batch_retry_options=BATCH_RETRY_OPTIONS):
+ async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
@@ -48,7 +51,9 @@ class KeyedProducer(Producer):
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- batch_retry_options)
+ async_retry_options,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index b869683..f7dfc46 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,7 +10,8 @@ from six.moves import xrange
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS
+ BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
+ ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
)
log = logging.getLogger("kafka")
@@ -46,14 +47,18 @@ class SimpleProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True,
- batch_retry_options=BATCH_RETRY_OPTIONS):
+ async_retry_options=ASYNC_RETRY_OPTIONS,
+ async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+ async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t,
- batch_retry_options)
+ async_retry_options,
+ async_queue_maxsize,
+ async_queue_put_timeout)
def _next_partition(self, topic):
if topic not in self.partition_cycles: