From 4915942b07236ca28731dab2fab80c0e93c14bf6 Mon Sep 17 00:00:00 2001 From: Zack Dever Date: Thu, 17 Mar 2016 12:17:03 -0700 Subject: catch all errors thrown by _get_leader_for_partition in SimpleClient --- kafka/client.py | 3 ++- test/test_client.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 4b5a043..11f54eb 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -169,7 +169,8 @@ class SimpleClient(object): for payload in payloads: try: leader = self._get_leader_for_partition(payload.topic, payload.partition) - except KafkaUnavailableError: + except (KafkaUnavailableError, LeaderNotAvailableError, + UnknownTopicOrPartitionError): leader = None payloads_by_broker[leader].append(payload) return dict(payloads_by_broker) diff --git a/test/test_client.py b/test/test_client.py index 5a35c83..a53fce1 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -11,7 +11,7 @@ from kafka.common import ( BrokerMetadata, TopicPartition, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - KafkaTimeoutError, ConnectionError + KafkaTimeoutError, ConnectionError, FailedPayloadsError ) from kafka.conn import KafkaConnection from kafka.future import Future @@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase): "topic_noleader", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(LeaderNotAvailableError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) @patch('kafka.SimpleClient._get_conn') @@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase): "topic_doesnt_exist", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(UnknownTopicOrPartitionError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) def test_timeout(self): -- cgit v1.2.1