diff options
author | Ilya Tyaptin <ityaptin@mirantis.com> | 2016-03-21 11:59:12 +0300 |
---|---|---|
committer | Nadya Shakhat <nprivalova@mirantis.com> | 2016-04-07 16:17:22 +0300 |
commit | ba18a7b4cc11fe2469d201cee5e44e45094375d6 (patch) | |
tree | 4a9b509e642873e253677ac3046db753eaad30ae | |
parent | 95505f2e6bcc66f9eb5f8ba036e8a7942787ccfa (diff) | |
download | oslo-messaging-ba18a7b4cc11fe2469d201cee5e44e45094375d6.tar.gz |
[Kafka] Ensure a topics before consume messages
Currently we trying ot fetch messages from the topics even
they have bot been created yet. This behaviour causes a
KafkaConfigurationError which are raised in the kafka driver.
Change-Id: I78cfd5ac24fbf37be5649232d0bc825319cf6402
Closes-bug: #1557521
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index a9aa58c..dfd0ed0 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -226,6 +226,9 @@ class Connection(object): self.kafka_client = None def declare_topic_consumer(self, topics, group=None): + self._ensure_connection() + for topic in topics: + self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))], |