summaryrefslogtreecommitdiff
path: root/kafka/producer/sender.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-10-25 07:28:35 +0900
committerGitHub <noreply@github.com>2017-10-25 07:28:35 +0900
commit8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch)
tree91fe16e3c9aff44ca93633824b96da4b8ff19384 /kafka/producer/sender.py
parent4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff)
downloadkafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--kafka/producer/sender.py9
1 files changed, 7 insertions, 2 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 72a15bb..ffc67f8 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -291,7 +291,11 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- if self.config['api_version'] >= (0, 10):
+ kwargs = {}
+ if self.config['api_version'] >= (0, 11):
+ version = 3
+ kwargs = dict(transactional_id=None)
+ elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
@@ -302,7 +306,8 @@ class Sender(threading.Thread):
timeout=timeout,
topics=[(topic, list(partition_info.items()))
for topic, partition_info
- in six.iteritems(produce_records_by_partition)]
+ in six.iteritems(produce_records_by_partition)],
+ **kwargs
)
def wakeup(self):