summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/base.py2
1 files changed, 1 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index a0bf47c..0fd742d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -73,7 +73,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
request_tries = {}
client.reinit()
- while not stop_event.is_set():
+ while not (stop_event.is_set() and queue.empty() and not request_tries):
timeout = batch_time
count = batch_size
send_at = time.time() + timeout