diff options
author | David Arthur <mumrah@gmail.com> | 2013-10-07 14:16:13 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-10-07 14:16:13 -0700 |
commit | eb2c1735f26ce11540fb92ea94817f43b9b3a798 (patch) | |
tree | 0c036a3d0691c6c7f69166bdc373fbe4e79b2705 /kafka/producer.py | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
parent | a6c99b287b1cf9c39068be74d72150808588dd43 (diff) | |
download | kafka-python-eb2c1735f26ce11540fb92ea94817f43b9b3a798.tar.gz |
Merge pull request #55 from quixey/fault-tolerance
Improve fault tolerance by handling leadership election and other metadata changes
Thanks, @jimjh!
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 5f23285..cceb584 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -7,6 +7,7 @@ import logging import sys from kafka.common import ProduceRequest +from kafka.common import FailedPayloadsException from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner @@ -113,7 +114,7 @@ class Producer(object): self.client.send_produce_request(reqs, acks=self.req_acks, timeout=self.ack_timeout) except Exception: - log.error("Error sending message", exc_info=sys.exc_info()) + log.exception("Unable to send message") def send_messages(self, partition, *msg): """ @@ -126,8 +127,12 @@ class Producer(object): else: messages = [create_message(m) for m in msg] req = ProduceRequest(self.topic, partition, messages) - resp = self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) + try: + resp = self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as e: + log.exception("Unable to send messages") + raise e return resp def stop(self, timeout=1): |