diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-26 17:23:57 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-26 17:23:57 +0530 |
commit | 7f150a21700d6005d735b1383cffd0227652cacc (patch) | |
tree | 21a5c77016eb65dca26226e8fb2eb9af4a083758 | |
parent | 8b25625c7f3c79bc495f78c4d06fe54087a6fe90 (diff) | |
download | kafka-python-7f150a21700d6005d735b1383cffd0227652cacc.tar.gz |
Optimize sending of batch messages
-rw-r--r-- | kafka/producer.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 5eb1bd8..ce71d66 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -104,13 +104,16 @@ class Producer(object): msgset[partition].append(msg) # Send collected requests upstream + reqs = [] for partition, messages in msgset.items(): - try: - req = ProduceRequest(self.topic, partition, messages) - self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) - except Exception as exp: - log.error("Error sending message", exc_info=sys.exc_info()) + req = ProduceRequest(self.topic, partition, messages) + reqs.append(req) + + try: + self.client.send_produce_request(reqs, acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as exp: + log.error("Error sending message", exc_info=sys.exc_info()) def send_messages(self, partition, *msg): """ |