diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 13:04:24 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-06 14:48:13 -0700 |
commit | 3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 (patch) | |
tree | 169a0e740992d6bff7b4e46dbf047d14429b5d82 /kafka/producer/sender.py | |
parent | 331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (diff) | |
download | kafka-python-kafka-2136.tar.gz |
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)kafka-2136
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r-- | kafka/producer/sender.py | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 2201261..bf7c163 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -27,6 +27,7 @@ class Sender(threading.Thread): 'retries': 0, 'request_timeout_ms': 30000, 'client_id': 'kafka-python-' + __version__, + 'api_version': (0, 8, 0), } def __init__(self, client, metadata, accumulator, **configs): @@ -232,7 +233,7 @@ class Sender(threading.Thread): collated: {node_id: [RecordBatch]} Returns: - dict: {node_id: ProduceRequest} + dict: {node_id: ProduceRequest} (version depends on api_version) """ requests = {} for node_id, batches in six.iteritems(collated): @@ -245,7 +246,7 @@ class Sender(threading.Thread): """Create a produce request from the given record batches. Returns: - ProduceRequest + ProduceRequest (version depends on api_version) """ produce_records_by_partition = collections.defaultdict(dict) for batch in batches: @@ -256,7 +257,8 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - return ProduceRequest[0]( + version = 1 if self.config['api_version'] >= (0, 9) else 0 + return ProduceRequest[version]( required_acks=acks, timeout=timeout, topics=[(topic, list(partition_info.items())) |