diff options
-rw-r--r-- | kafka/producer/base.py | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a0bf47c..0fd742d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -73,7 +73,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, request_tries = {} client.reinit() - while not stop_event.is_set(): + while not (stop_event.is_set() and queue.empty() and not request_tries): timeout = batch_time count = batch_size send_at = time.time() + timeout |