summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- kafka/producer/base.py 29
1 file changed, 26 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0fd742d..18af342 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -39,11 +39,13 @@ ASYNC_RETRY_ON_TIMEOUTS = True
ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
+ASYNC_STOP_TIMEOUT_SECS = 30
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
- log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+ log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
"""Private method to manage producing messages asynchronously
Listens on the queue for a specified number of messages or until
@@ -69,11 +71,23 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
log_messages_on_error (bool, optional): log stringified message-contents
on any produce error, otherwise only log a hash() of the contents,
defaults to True.
+ stop_timeout (int or float, optional): number of seconds to continue
+ retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
client.reinit()
+ stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
+
+ # Handle stop_timeout
+ if stop_event.is_set():
+ if not stop_at:
+ stop_at = stop_timeout + time.time()
+ if time.time() > stop_at:
+ log.debug('Async producer stopping due to stop_timeout')
+ break
+
timeout = batch_time
count = batch_size
send_at = time.time() + timeout
@@ -181,6 +195,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
orig_req.messages if log_messages_on_error
else hash(orig_req.messages))
+ if request_tries or not queue.empty():
+ log.error('Stopped producer with {0} unsent messages'
+ .format(len(request_tries) + queue.qsize()))
+
class Producer(object):
"""
@@ -219,6 +237,9 @@ class Producer(object):
requests, defaults to True (log full messages). Hash logging
will not allow you to identify the specific message that failed,
but it will allow you to match failures with retries.
+ async_stop_timeout (int or float, optional): seconds to continue
+ attempting to send queued messages after producer.stop(),
+ defaults to 30.
Deprecated Arguments:
batch_send (bool, optional): If True, messages are sent by a background
@@ -242,7 +263,8 @@ class Producer(object):
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
- async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+ async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
if async:
assert batch_send_every_n > 0
@@ -277,7 +299,8 @@ class Producer(object):
batch_send_every_t, batch_send_every_n,
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
- kwargs={'log_messages_on_error': async_log_messages_on_error}
+ kwargs={'log_messages_on_error': async_log_messages_on_error,
+ 'stop_timeout': async_stop_timeout}
)
# Thread will die if main thread exits