diff options
| author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-25 09:38:27 +0000 |
|---|---|---|
| committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-10-24 15:12:11 +0000 |
| commit | 3af66bc542efff3f7010bec18b72d844e09488c4 (patch) | |
| tree | a20623631b36230c9425b08ee95b85afbc9a9455 /kafka/producer/sender.py | |
| parent | e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff) | |
| download | kafka-python-v2_records.tar.gz | |
Add DefaultRecordBatch implementation aka V2 message format parser/builder.v2_records
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/producer/sender.py')
| -rw-r--r-- | kafka/producer/sender.py | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 72a15bb..ffc67f8 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -291,7 +291,11 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - if self.config['api_version'] >= (0, 10): + kwargs = {} + if self.config['api_version'] >= (0, 11): + version = 3 + kwargs = dict(transactional_id=None) + elif self.config['api_version'] >= (0, 10): version = 2 elif self.config['api_version'] == (0, 9): version = 1 @@ -302,7 +306,8 @@ class Sender(threading.Thread): timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info - in six.iteritems(produce_records_by_partition)] + in six.iteritems(produce_records_by_partition)], + **kwargs ) def wakeup(self): |
