diff options
author | Ilya Tyaptin <ityaptin@mirantis.com> | 2016-03-10 16:28:40 +0300 |
---|---|---|
committer | Nadya Shakhat <nprivalova@mirantis.com> | 2016-04-07 16:16:59 +0300 |
commit | 95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa (patch) | |
tree | 8712fa823e8f2f923c9cde4ad878c925ba5c8d49 | |
parent | bbe5213ea714f06c497a06a204b7ae7e8ea693fc (diff) | |
download | oslo-messaging-95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa.tar.gz |
Use only unique topics for the Kafka driver
Consumer in Kafka driver should use only unique topic,
otherwise a FetchDuplicate exception will be raised.
Change-Id: I569ce446eaf05dbc3a7fd0b41a2307e940ab87fb
Closes-bug: #1555081
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 4 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_kafka.py | 20 |
2 files changed, 22 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 49e39d1..a9aa58c 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -351,9 +351,9 @@ class KafkaDriver(base.BaseDriver): :type pool: string """ conn = self._get_connection(purpose=PURPOSE_LISTEN) - topics = [] + topics = set() for target, priority in targets_and_priorities: - topics.append(target_to_topic(target, priority)) + topics.add(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 7f1d5e3..057ec1e 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -208,6 +208,26 @@ class TestKafkaListener(test_utils.BaseTestCase): @mock.patch.object(kafka_driver.Connection, '_ensure_connection') @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') + def test_converting_targets_to_topics(self, fake_consumer, + fake_ensure_connection): + fake_targets_and_priorities = [ + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test2"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'error'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test3"), 'error'), + ] + self.driver.listen_for_notifications(fake_targets_and_priorities) + self.assertEqual(1, len(fake_consumer.mock_calls)) + fake_consumer.assert_called_once_with(set(['fake_topic.error', + 'fake_topic.info']), + None) + + @mock.patch.object(kafka_driver.Connection, '_ensure_connection') + @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') def test_stop_listener(self, fake_consumer, fake_client): fake_target = oslo_messaging.Target(topic='fake_topic') fake_targets_and_priorities = [(fake_target, 'info')] |