diff options
-rw-r--r-- | kafka/producer/base.py | 29 |
1 files changed, 26 insertions, 3 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0fd742d..18af342 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -39,11 +39,13 @@
 ASYNC_RETRY_ON_TIMEOUTS = True
 ASYNC_LOG_MESSAGES_ON_ERROR = True
 STOP_ASYNC_PRODUCER = -1
+ASYNC_STOP_TIMEOUT_SECS = 30
 
 
 def _send_upstream(queue, client, codec, batch_time, batch_size,
                    req_acks, ack_timeout, retry_options, stop_event,
-                   log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+                   log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+                   stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
     """Private method to manage producing messages asynchronously
 
     Listens on the queue for a specified number of messages or until
@@ -69,11 +71,23 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         log_messages_on_error (bool, optional): log stringified message-contents
             on any produce error, otherwise only log a hash() of the contents,
             defaults to True.
+        stop_timeout (int or float, optional): number of seconds to continue
+            retrying messages after stop_event is set, defaults to 30.
     """
     request_tries = {}
     client.reinit()
+    stop_at = None
 
     while not (stop_event.is_set() and queue.empty() and not request_tries):
+
+        # Handle stop_timeout
+        if stop_event.is_set():
+            if not stop_at:
+                stop_at = stop_timeout + time.time()
+            if time.time() > stop_at:
+                log.debug('Async producer stopping due to stop_timeout')
+                break
+
         timeout = batch_time
         count = batch_size
         send_at = time.time() + timeout
@@ -181,6 +195,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                           orig_req.messages if log_messages_on_error
                           else hash(orig_req.messages))
 
+    if request_tries or not queue.empty():
+        log.error('Stopped producer with {0} unsent messages'
+                  .format(len(request_tries) + queue.qsize()))
+
 
 class Producer(object):
     """
@@ -219,6 +237,9 @@ class Producer(object):
             requests, defaults to True (log full messages).
             Hash logging will not allow you to identify the specific message
             that failed, but it will allow you to match failures with retries.
+        async_stop_timeout (int or float, optional): seconds to continue
+            attempting to send queued messages after producer.stop(),
+            defaults to 30.
 
     Deprecated Arguments:
         batch_send (bool, optional): If True, messages are sent by a background
@@ -242,7 +263,8 @@ class Producer(object):
                  async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
-                 async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
+                 async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
 
         if async:
             assert batch_send_every_n > 0
@@ -277,7 +299,8 @@ class Producer(object):
                       batch_send_every_t, batch_send_every_n,
                       self.req_acks, self.ack_timeout,
                       async_retry_options, self.thread_stop_event),
-                kwargs={'log_messages_on_error': async_log_messages_on_error}
+                kwargs={'log_messages_on_error': async_log_messages_on_error,
+                        'stop_timeout': async_stop_timeout}
             )
 
             # Thread will die if main thread exits