diff options
author | Jim Lim <jim@quixey.com> | 2013-09-27 14:02:10 -0700 |
---|---|---|
committer | Jim Lim <jim@quixey.com> | 2013-10-04 14:49:44 -0700 |
commit | a6c99b287b1cf9c39068be74d72150808588dd43 (patch) | |
tree | 0c036a3d0691c6c7f69166bdc373fbe4e79b2705 /kafka/producer.py | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-a6c99b287b1cf9c39068be74d72150808588dd43.tar.gz |
make changes to be more fault tolerant: clean up connections, brokers, failed_messages
- add integration tests for sync producer
- add integration tests for async producer w. leadership election
- use log.exception
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 5f23285..cceb584 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -7,6 +7,7 @@ import logging import sys from kafka.common import ProduceRequest +from kafka.common import FailedPayloadsException from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner @@ -113,7 +114,7 @@ class Producer(object): self.client.send_produce_request(reqs, acks=self.req_acks, timeout=self.ack_timeout) except Exception: - log.error("Error sending message", exc_info=sys.exc_info()) + log.exception("Unable to send message") def send_messages(self, partition, *msg): """ @@ -126,8 +127,12 @@ class Producer(object): else: messages = [create_message(m) for m in msg] req = ProduceRequest(self.topic, partition, messages) - resp = self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) + try: + resp = self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as e: + log.exception("Unable to send messages") + raise e return resp def stop(self, timeout=1): |