diff options
-rw-r--r-- | kafka/producer/base.py | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 15768be..2f47d87 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -52,13 +52,18 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while not stop_event.is_set(): timeout = batch_time - - # it's a simplification: we're comparing message sets and - # messages: each set can contain [1..batch_size] messages - count = batch_size - len(request_tries) + count = batch_size send_at = time.time() + timeout msgset = defaultdict(list) + # Merging messages will require a bit more work to manage correctly + # for now, dont look for new batches if we have old ones to retry + if request_tries: + count = 0 + log.debug('Skipping new batch collection to handle retries') + else: + log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout)) + # Keep fetching till we gather enough messages or a # timeout is reached while count > 0 and timeout >= 0: |