diff options
author | Zuul <zuul@review.opendev.org> | 2022-12-21 07:46:22 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-12-21 07:46:22 +0000 |
commit | 9f710ce6cd955eaf6d7f9b474efa21df3d531be0 (patch) | |
tree | 60b6996b831030f15b2c736a7bd2c1702d7eed58 /oslo_messaging/tests/drivers/test_impl_kafka.py | |
parent | bd73f14fd2b8fb7e5587888af126fc59867e4a36 (diff) | |
parent | 43f2224aacb668aa51de3d1274ff8939d8aa73ae (diff) | |
download | oslo-messaging-9f710ce6cd955eaf6d7f9b474efa21df3d531be0.tar.gz |
Merge "Remove logging from ProducerConnection._produce_message"14.1.0
Diffstat (limited to 'oslo_messaging/tests/drivers/test_impl_kafka.py')
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_kafka.py | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 77b2ed6..5e78369 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -15,6 +15,8 @@ import testscenarios from unittest import mock +from confluent_kafka import KafkaException + import oslo_messaging from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_messaging.tests import utils as test_utils @@ -120,6 +122,36 @@ class TestKafkaDriver(test_utils.BaseTestCase): 'ssl.key.password': '', }) + def test_send_notification_retries_on_buffer_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[BufferError, BufferError, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 3 + + def test_send_notification_stops_on_kafka_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[KafkaException, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 1 + def test_listen(self): target = oslo_messaging.Target(topic="topic_test") self.assertRaises(NotImplementedError, self.driver.listen, target, |