summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Tyaptin <ityaptin@mirantis.com>2016-03-10 16:28:40 +0300
committerNadya Shakhat <nprivalova@mirantis.com>2016-04-07 16:16:59 +0300
commit95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa (patch)
tree8712fa823e8f2f923c9cde4ad878c925ba5c8d49
parentbbe5213ea714f06c497a06a204b7ae7e8ea693fc (diff)
downloadoslo-messaging-95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa.tar.gz
Use only unique topics for the Kafka driver
Consumer in Kafka driver should use only unique topic, otherwise a FetchDuplicate exception will be raised. Change-Id: I569ce446eaf05dbc3a7fd0b41a2307e940ab87fb Closes-bug: #1555081
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py4
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py20
2 files changed, 22 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 49e39d1..a9aa58c 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -351,9 +351,9 @@ class KafkaDriver(base.BaseDriver):
:type pool: string
"""
conn = self._get_connection(purpose=PURPOSE_LISTEN)
- topics = []
+ topics = set()
for target, priority in targets_and_priorities:
- topics.append(target_to_topic(target, priority))
+ topics.add(target_to_topic(target, priority))
conn.declare_topic_consumer(topics, pool)
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index 7f1d5e3..057ec1e 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -208,6 +208,26 @@ class TestKafkaListener(test_utils.BaseTestCase):
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
+ def test_converting_targets_to_topics(self, fake_consumer,
+ fake_ensure_connection):
+ fake_targets_and_priorities = [
+ (oslo_messaging.Target(topic="fake_topic",
+ exchange="test1"), 'info'),
+ (oslo_messaging.Target(topic="fake_topic",
+ exchange="test2"), 'info'),
+ (oslo_messaging.Target(topic="fake_topic",
+ exchange="test1"), 'error'),
+ (oslo_messaging.Target(topic="fake_topic",
+ exchange="test3"), 'error'),
+ ]
+ self.driver.listen_for_notifications(fake_targets_and_priorities)
+ self.assertEqual(1, len(fake_consumer.mock_calls))
+ fake_consumer.assert_called_once_with(set(['fake_topic.error',
+ 'fake_topic.info']),
+ None)
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
def test_stop_listener(self, fake_consumer, fake_client):
fake_target = oslo_messaging.Target(topic='fake_topic')
fake_targets_and_priorities = [(fake_target, 'info')]