summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Tyaptin <ityaptin@mirantis.com>2016-03-21 11:59:12 +0300
committerNadya Shakhat <nprivalova@mirantis.com>2016-04-07 16:17:22 +0300
commitba18a7b4cc11fe2469d201cee5e44e45094375d6 (patch)
tree4a9b509e642873e253677ac3046db753eaad30ae
parent95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa (diff)
downloadoslo-messaging-ba18a7b4cc11fe2469d201cee5e44e45094375d6.tar.gz
[Kafka] Ensure a topics before consume messages
Currently we trying ot fetch messages from the topics even they have bot been created yet. This behaviour causes a KafkaConfigurationError which are raised in the kafka driver. Change-Id: I78cfd5ac24fbf37be5649232d0bc825319cf6402 Closes-bug: #1557521
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py3
1 files changed, 3 insertions, 0 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index a9aa58c..dfd0ed0 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -226,6 +226,9 @@ class Connection(object):
self.kafka_client = None
def declare_topic_consumer(self, topics, group=None):
+ self._ensure_connection()
+ for topic in topics:
+ self.kafka_client.ensure_topic_exists(topic)
self.consumer = kafka.KafkaConsumer(
*topics, group_id=group,
bootstrap_servers=["%s:%s" % (self.host, str(self.port))],