summaryrefslogtreecommitdiff
path: root/kafka/producer/sender.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 13:04:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-06 14:48:13 -0700
commit3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 (patch)
tree169a0e740992d6bff7b4e46dbf047d14429b5d82 /kafka/producer/sender.py
parent331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (diff)
downloadkafka-python-kafka-2136.tar.gz
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)kafka-2136
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--kafka/producer/sender.py8
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 2201261..bf7c163 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -27,6 +27,7 @@ class Sender(threading.Thread):
'retries': 0,
'request_timeout_ms': 30000,
'client_id': 'kafka-python-' + __version__,
+ 'api_version': (0, 8, 0),
}
def __init__(self, client, metadata, accumulator, **configs):
@@ -232,7 +233,7 @@ class Sender(threading.Thread):
collated: {node_id: [RecordBatch]}
Returns:
- dict: {node_id: ProduceRequest}
+ dict: {node_id: ProduceRequest} (version depends on api_version)
"""
requests = {}
for node_id, batches in six.iteritems(collated):
@@ -245,7 +246,7 @@ class Sender(threading.Thread):
"""Create a produce request from the given record batches.
Returns:
- ProduceRequest
+ ProduceRequest (version depends on api_version)
"""
produce_records_by_partition = collections.defaultdict(dict)
for batch in batches:
@@ -256,7 +257,8 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- return ProduceRequest[0](
+ version = 1 if self.config['api_version'] >= (0, 9) else 0
+ return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
topics=[(topic, list(partition_info.items()))