summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
committerMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
commitf9cf62816ff2c2255d414a2d9f3dd32d8c81418b (patch)
treeb43b90fcdaaef0839329b20a02c79f8229773b26 /kafka/producer.py
parent75de0f00956eb7cf0394fcfabb6a7d63057409fe (diff)
parenteb2c1735f26ce11540fb92ea94817f43b9b3a798 (diff)
downloadkafka-python-f9cf62816ff2c2255d414a2d9f3dd32d8c81418b.tar.gz
Merge branch 'master' into prod-windows
Conflicts: kafka/producer.py
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py11
1 files changed, 8 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index a7bfe28..7ef7896 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -7,6 +7,7 @@ import logging
import sys
from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
@@ -67,7 +68,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
except Exception as exp:
- log.error("Error sending message", exc_info=sys.exc_info())
+ log.exception("Unable to send message")
class Producer(object):
@@ -140,8 +141,12 @@ class Producer(object):
else:
messages = [create_message(m) for m in msg]
req = ProduceRequest(self.topic, partition, messages)
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ try:
+ resp = self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as e:
+ log.exception("Unable to send messages")
+ raise e
return resp
def stop(self, timeout=1):