summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKomei Shimamura <komei.t.f@gmail.com>2015-06-05 23:05:29 -0700
committerOleksii Zamiatin <ozamiatin@mirantis.com>2015-12-01 14:20:33 +0000
commit67c63031f5cb1a675686fc2648ce27f6e36ee254 (patch)
tree4671a119f589f9ea67743f9a64e02d8a3d9ef070
parent33c1010c3281804456a22b769c4bac5ac6a7cca1 (diff)
downloadoslo-messaging-67c63031f5cb1a675686fc2648ce27f6e36ee254.tar.gz
Add a driver for Apache Kafka
Adding a driver for Apache Kafka connection, supporting notification via Kafka. This driver is experimental until having functional and integration tests Change-Id: I7a5d8e3259b21d5e29ed3b795d04952e1d13ad08 Implements: blueprint adding-kafka-support
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py363
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py288
-rw-r--r--setup.cfg3
-rw-r--r--test-requirements.txt3
4 files changed, 657 insertions, 0 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
new file mode 100644
index 0000000..9be417b
--- /dev/null
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -0,0 +1,363 @@
+# Copyright (C) 2015 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import threading
+
+from oslo_messaging._drivers import base
+from oslo_messaging._drivers import common as driver_common
+from oslo_messaging._drivers import pool as driver_pool
+from oslo_messaging._i18n import _LE
+from oslo_messaging._i18n import _LW
+from oslo_serialization import jsonutils
+
+import kafka
+from kafka.common import KafkaError
+from oslo_config import cfg
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+PURPOSE_SEND = 'send'
+PURPOSE_LISTEN = 'listen'
+
+kafka_opts = [
+ cfg.StrOpt('kafka_default_host', default='localhost',
+ help='Default Kafka broker Host'),
+
+ cfg.IntOpt('kafka_default_port', default=9092,
+ help='Default Kafka broker Port'),
+
+ cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
+ help='Max fetch bytes of Kafka consumer'),
+
+ cfg.IntOpt('kafka_consumer_timeout', default=1.0,
+ help='Default timeout(s) for Kafka consumers'),
+
+ cfg.IntOpt('pool_size', default=10,
+ help='Pool Size for Kafka Consumers'),
+]
+
+CONF = cfg.CONF
+
+
+def pack_context_with_message(ctxt, msg):
+ """Pack context into msg."""
+ if isinstance(ctxt, dict):
+ context_d = ctxt
+ else:
+ context_d = ctxt.to_dict()
+
+ return {'message': msg, 'context': context_d}
+
+
+def target_to_topic(target):
+ """Convert target into topic string
+
+ :param target: Message destination target
+ :type target: oslo_messaging.Target
+ """
+ if target.exchange is None:
+ return target.topic
+ return "%s_%s" % (target.exchange, target.topic)
+
+
+class Connection(object):
+
+ def __init__(self, conf, url, purpose):
+
+ driver_conf = conf.oslo_messaging_kafka
+
+ self.conf = conf
+ self.kafka_client = None
+ self.producer = None
+ self.consumer = None
+ self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes
+ self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
+ self.url = url
+ self._parse_url()
+ # TODO(Support for manual/auto_commit functionality)
+ # When auto_commit is False, consumer can manually notify
+ # the completion of the subscription.
+ # Currently we don't support for non auto commit option
+ self.auto_commit = True
+ self._consume_loop_stopped = False
+
+ def _parse_url(self):
+ driver_conf = self.conf.oslo_messaging_kafka
+ try:
+ self.host = self.url.hosts[0].hostname
+ except (NameError, IndexError):
+ self.host = driver_conf.kafka_default_host
+
+ try:
+ self.port = self.url.hosts[0].port
+ except (NameError, IndexError):
+ self.port = driver_conf.kafka_default_port
+
+ if self.host is None:
+ self.host = driver_conf.kafka_default_host
+
+ if self.port is None:
+ self.port = driver_conf.kafka_default_port
+
+ def notify_send(self, topic, ctxt, msg, retry):
+ """Send messages to Kafka broker.
+
+ :param topic: String of the topic
+ :param ctxt: context for the messages
+ :param msg: messages for publishing
+ :param retry: the number of retry
+ """
+ message = pack_context_with_message(ctxt, msg)
+ self._ensure_connection()
+ self._send_and_retry(message, topic, retry)
+
+ def _send_and_retry(self, message, topic, retry):
+ current_retry = 0
+ if not isinstance(message, str):
+ message = jsonutils.dumps(message)
+ while message is not None:
+ try:
+ self._send(message, topic)
+ message = None
+ except Exception:
+ LOG.warn(_LW("Failed to publish a message of topic %s"), topic)
+ current_retry += 1
+ if retry is not None and current_retry >= retry:
+ LOG.exception(_LE("Failed to retry to send data "
+ "with max retry times"))
+ message = None
+
+ def _send(self, message, topic):
+ self.producer.send_messages(topic, message)
+
+ def consume(self, timeout=None):
+ """recieve messages as many as max_fetch_messages.
+
+ In this functions, there are no while loop to subscribe.
+ This would be helpful when we wants to control the velocity of
+ subscription.
+ """
+ duration = (self.consumer_timeout if timeout is None else timeout)
+ timer = driver_common.DecayingTimer(duration=duration)
+ timer.start()
+
+ def _raise_timeout():
+ LOG.debug('Timed out waiting for Kafka response')
+ raise driver_common.Timeout()
+
+ poll_timeout = (self.consumer_timeout if timeout is None
+ else min(timeout, self.consumer_timeout))
+
+ while True:
+ if self._consume_loop_stopped:
+ return
+ try:
+ next_timeout = poll_timeout * 1000.0
+ # TODO(use configure() method instead)
+ # Currently KafkaConsumer does not support for
+ # the case of updating only fetch_max_wait_ms parameter
+ self.consumer._config['fetch_max_wait_ms'] = next_timeout
+ messages = list(self.consumer.fetch_messages())
+ except Exception as e:
+ LOG.exception(_LE("Failed to consume messages: %s"), e)
+ messages = None
+
+ if not messages:
+ poll_timeout = timer.check_return(
+ _raise_timeout, maximum=self.consumer_timeout)
+ continue
+
+ return messages
+
+ def stop_consuming(self):
+ self._consume_loop_stopped = True
+
+ def reset(self):
+ """Reset a connection so it can be used again."""
+ if self.kafka_client:
+ self.kafka_client.close()
+ self.kafka_client = None
+ if self.producer:
+ self.producer.stop()
+ self.producer = None
+ self.consumer = None
+
+ def close(self):
+ if self.kafka_client:
+ self.kafka_client.close()
+ self.kafka_client = None
+ if self.producer:
+ self.producer.stop()
+ self.consumer = None
+
+ def commit(self):
+ """Commit is used by subscribers belonging to the same group.
+ After subscribing messages, commit is called to prevent
+ the other subscribers which belong to the same group
+ from re-subscribing the same messages.
+
+ Currently self.auto_commit option is always True,
+ so we don't need to call this function.
+ """
+ self.consumer.commit()
+
+ def _ensure_connection(self):
+ if self.kafka_client:
+ return
+ try:
+ self.kafka_client = kafka.KafkaClient(
+ "%s:%s" % (self.host, str(self.port)))
+ self.producer = kafka.SimpleProducer(self.kafka_client)
+ except KafkaError as e:
+ LOG.exception(_LE("Kafka Connection is not available: %s"), e)
+ self.kafka_client = None
+
+ def declare_topic_consumer(self, topics, group=None):
+ self.consumer = kafka.KafkaConsumer(
+ *topics, group_id=group,
+ metadata_broker_list=["%s:%s" % (self.host, str(self.port))],
+ # auto_commit_enable=self.auto_commit,
+ fetch_message_max_bytes=self.fetch_messages_max_bytes)
+
+
+class OsloKafkaMessage(base.IncomingMessage):
+
+ def __init__(self, listener, ctxt, message):
+ super(OsloKafkaMessage, self).__init__(listener, ctxt, message)
+
+ def requeue(self):
+ LOG.warn(_LW("requeue is not supported"))
+
+ def reply(self, reply=None, failure=None, log_failure=True):
+ LOG.warn(_LW("reply is not supported"))
+
+
+class KafkaListener(base.Listener):
+
+ def __init__(self, driver, conn):
+ super(KafkaListener, self).__init__(driver)
+ self._stopped = threading.Event()
+ self.conn = conn
+ self.incoming_queue = []
+
+ def poll(self, timeout=None):
+ while not self._stopped.is_set():
+ if self.incoming_queue:
+ return self.incoming_queue.pop(0)
+ try:
+ messages = self.conn.consume(timeout=timeout)
+ for msg in messages:
+ message = msg.value
+ message = jsonutils.loads(message)
+ self.incoming_queue.append(OsloKafkaMessage(
+ listener=self, ctxt=message['context'],
+ message=message['message']))
+ except driver_common.Timeout:
+ return None
+
+ def stop(self):
+ self._stopped.set()
+ self.conn.stop_consuming()
+
+ def cleanup(self):
+ self.conn.close()
+
+ def commit(self):
+ # TODO(Support for manually/auto commit functionality)
+ # It's better to allow users to commit manually and support for
+ # self.auto_commit = False option. For now, this commit function
+ # is meaningless since user couldn't call this function and
+ # auto_commit option is always True.
+ self.conn.commit()
+
+
+class KafkaDriver(base.BaseDriver):
+ """Note: Current implementation of this driver is experimental.
+ We will have functional and/or integrated testing enabled for this driver.
+ """
+
+ def __init__(self, conf, url, default_exchange=None,
+ allowed_remote_exmods=None):
+
+ opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
+ title='Kafka driver options')
+ conf.register_group(opt_group)
+ conf.register_opts(kafka_opts, group=opt_group)
+
+ super(KafkaDriver, self).__init__(
+ conf, url, default_exchange, allowed_remote_exmods)
+
+ self.connection_pool = driver_pool.ConnectionPool(
+ self.conf, self.conf.oslo_messaging_kafka.pool_size,
+ self._url, Connection)
+ self.listeners = []
+
+ def cleanup(self):
+ for c in self.listeners:
+ c.close()
+ self.listeners = []
+
+ def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
+ retry=None):
+ raise NotImplementedError(
+ 'The RPC implementation for Kafka is not implemented')
+
+ def send_notification(self, target, ctxt, message, version, retry=None):
+ """Send notification to Kafka brokers
+
+ :param target: Message destination target
+ :type target: oslo_messaging.Target
+ :param ctxt: Message context
+ :type ctxt: dict
+ :param message: Message payload to pass
+ :type message: dict
+ :param version: Messaging API version (currently not used)
+ :type version: str
+ :param retry: an optional default kafka consumer retries configuration
+ None means to retry forever
+ 0 means no retry
+ N means N retries
+ :type retry: int
+ """
+ with self._get_connection(purpose=PURPOSE_SEND) as conn:
+ conn.notify_send(target_to_topic(target), ctxt, message, retry)
+
+ def listen(self, target):
+ raise NotImplementedError(
+ 'The RPC implementation for Kafka is not implemented')
+
+ def listen_for_notifications(self, targets_and_priorities, pool=None):
+ """Listen to a specified list of targets on Kafka brokers
+
+ :param targets_and_priorities: List of pairs (target, priority)
+ priority is not used for kafka driver
+ target.exchange_target.topic is used as
+ a kafka topic
+ :type targets_and_priorities: list
+ :param pool: consumer group of Kafka consumers
+ :type pool: string
+ """
+ conn = self._get_connection(purpose=PURPOSE_LISTEN)
+ topics = []
+ for target, priority in targets_and_priorities:
+ topics.append(target_to_topic(target))
+
+ conn.declare_topic_consumer(topics, pool)
+
+ listener = KafkaListener(self, conn)
+ return listener
+
+ def _get_connection(self, purpose):
+ return driver_common.ConnectionContext(self.connection_pool, purpose)
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
new file mode 100644
index 0000000..6f25b2c
--- /dev/null
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -0,0 +1,288 @@
+# Copyright (C) 2015 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import json
+import kafka
+from kafka.common import KafkaError
+import mock
+import testscenarios
+from testtools.testcase import unittest
+import time
+
+import oslo_messaging
+from oslo_messaging._drivers import common as driver_common
+from oslo_messaging._drivers import impl_kafka as kafka_driver
+from oslo_messaging.tests import utils as test_utils
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+KAFKA_BROKER = 'localhost:9092'
+KAFKA_BROKER_URL = 'kafka://localhost:9092'
+
+
+def _is_kafka_service_running():
+ """Checks whether the Kafka service is running or not"""
+ kafka_running = True
+ try:
+ broker = KAFKA_BROKER
+ kafka.KafkaClient(broker)
+ except KafkaError:
+ # Kafka service is not running.
+ kafka_running = False
+ return kafka_running
+
+
+class TestKafkaDriverLoad(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestKafkaDriverLoad, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+
+ def test_driver_load(self):
+ transport = oslo_messaging.get_transport(self.conf)
+ self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
+
+
+class TestKafkaTransportURL(test_utils.BaseTestCase):
+
+ scenarios = [
+ ('none', dict(url=None,
+ expected=[dict(host='localhost', port=9092)])),
+ ('empty', dict(url='kafka:///',
+ expected=[dict(host='localhost', port=9092)])),
+ ('host', dict(url='kafka://127.0.0.1',
+ expected=[dict(host='127.0.0.1', port=9092)])),
+ ('port', dict(url='kafka://localhost:1234',
+ expected=[dict(host='localhost', port=1234)])),
+ ]
+
+ def setUp(self):
+ super(TestKafkaTransportURL, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+
+ def test_transport_url(self):
+ transport = oslo_messaging.get_transport(self.conf, self.url)
+ self.addCleanup(transport.cleanup)
+ driver = transport._driver
+
+ conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
+ self.assertEqual(self.expected[0]['host'], conn.host)
+ self.assertEqual(self.expected[0]['port'], conn.port)
+
+
+class TestKafkaDriver(test_utils.BaseTestCase):
+ """Unit Test cases to test the kafka driver
+ """
+
+ def setUp(self):
+ super(TestKafkaDriver, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+ transport = oslo_messaging.get_transport(self.conf)
+ self.driver = transport._driver
+
+ def test_send(self):
+ target = oslo_messaging.Target(topic="topic_test")
+ self.assertRaises(NotImplementedError,
+ self.driver.send, target, {}, {})
+
+ def test_send_notification(self):
+ target = oslo_messaging.Target(topic="topic_test")
+
+ with mock.patch.object(
+ kafka_driver.Connection, 'notify_send') as fake_send:
+ self.driver.send_notification(target, {}, {}, None)
+ self.assertEquals(1, len(fake_send.mock_calls))
+
+ def test_listen(self):
+ target = oslo_messaging.Target(topic="topic_test")
+ self.assertRaises(NotImplementedError, self.driver.listen, target)
+
+
+class TestKafkaConnection(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestKafkaConnection, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+ transport = oslo_messaging.get_transport(self.conf)
+ self.driver = transport._driver
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_send')
+ def test_notify(self, fake_send, fake_ensure_connection):
+ conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
+ conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
+ {"fake_text": "fake_message_1"}, 10)
+ self.assertEqual(1, len(fake_send.mock_calls))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_send')
+ def test_notify_with_retry(self, fake_send, fake_ensure_connection):
+ conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
+ fake_send.side_effect = KafkaError("fake_exception")
+ conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
+ {"fake_text": "fake_message_2"}, 10)
+ self.assertEqual(10, len(fake_send.mock_calls))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_parse_url')
+ def test_consume(self, fake_parse_url, fake_ensure_connection):
+ fake_message = {
+ "context": {"fake": "fake_context_1"},
+ "message": {"fake": "fake_message_1"}}
+
+ conn = kafka_driver.Connection(
+ self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+ conn.consumer = mock.MagicMock()
+ conn.consumer.fetch_messages = mock.MagicMock(
+ return_value=iter([json.dumps(fake_message)]))
+
+ self.assertEqual(fake_message, json.loads(conn.consume()[0]))
+ self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_parse_url')
+ def test_consume_timeout(self, fake_parse_url, fake_ensure_connection):
+ deadline = time.time() + 3
+ conn = kafka_driver.Connection(
+ self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+ conn.consumer = mock.MagicMock()
+ conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
+
+ self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
+ self.assertEqual(0, int(deadline - time.time()))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_parse_url')
+ def test_consume_with_default_timeout(
+ self, fake_parse_url, fake_ensure_connection):
+ deadline = time.time() + 1
+ conn = kafka_driver.Connection(
+ self.conf, '', kafka_driver.PURPOSE_LISTEN)
+
+ conn.consumer = mock.MagicMock()
+ conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
+
+ self.assertRaises(driver_common.Timeout, conn.consume)
+ self.assertEqual(0, int(deadline - time.time()))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, '_parse_url')
+ def test_consume_timeout_without_consumers(
+ self, fake_parse_url, fake_ensure_connection):
+ deadline = time.time() + 3
+ conn = kafka_driver.Connection(
+ self.conf, '', kafka_driver.PURPOSE_LISTEN)
+ conn.consumer = mock.MagicMock(return_value=None)
+
+ self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
+ self.assertEqual(0, int(deadline - time.time()))
+
+
+class TestKafkaListener(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestKafkaListener, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+ transport = oslo_messaging.get_transport(self.conf)
+ self.driver = transport._driver
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
+ def test_create_listener(self, fake_consumer, fake_ensure_connection):
+ fake_target = oslo_messaging.Target(topic='fake_topic')
+ fake_targets_and_priorities = [(fake_target, 'info')]
+ listener = self.driver.listen_for_notifications(
+ fake_targets_and_priorities)
+ self.assertEqual(1, len(fake_consumer.mock_calls))
+
+ @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
+ @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
+ def test_stop_listener(self, fake_consumer, fake_client):
+ fake_target = oslo_messaging.Target(topic='fake_topic')
+ fake_targets_and_priorities = [(fake_target, 'info')]
+ listener = self.driver.listen_for_notifications(
+ fake_targets_and_priorities)
+ listener.conn.consume = mock.MagicMock()
+ listener.conn.consume.return_value = (
+ iter([kafka.common.KafkaMessage(
+ topic='fake_topic', partition=0, offset=0,
+ key=None, value='{"message": {"fake": "fake_message_1"},'
+ '"context": {"fake": "fake_context_1"}}')]))
+ listener.poll()
+ self.assertEqual(1, len(listener.conn.consume.mock_calls))
+ listener.conn.stop_consuming = mock.MagicMock()
+ listener.stop()
+ fake_response = listener.poll()
+ self.assertEqual(1, len(listener.conn.consume.mock_calls))
+ self.assertEqual(fake_response, None)
+
+
+class TestWithRealKafkaBroker(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestWithRealKafkaBroker, self).setUp()
+ self.messaging_conf.transport_driver = 'kafka'
+ transport = oslo_messaging.get_transport(self.conf, KAFKA_BROKER_URL)
+ self.driver = transport._driver
+
+ @unittest.skipUnless(
+ _is_kafka_service_running(), "Kafka service is not available")
+ def test_send_and_recieve_message(self):
+ target = oslo_messaging.Target(
+ topic="fake_topic", exchange='fake_exchange')
+ targets_and_priorities = [(target, 'fake_info')]
+
+ listener = self.driver.listen_for_notifications(
+ targets_and_priorities)
+ fake_context = {"fake_context_key": "fake_context_value"}
+ fake_message = {"fake_message_key": "fake_message_value"}
+ self.driver.send_notification(
+ target, fake_context, fake_message, None)
+
+ received_message = listener.poll()
+ self.assertEqual(fake_context, received_message.ctxt)
+ self.assertEqual(fake_message, received_message.message)
+
+ @unittest.skipUnless(
+ _is_kafka_service_running(), "Kafka service is not available")
+ def test_send_and_recieve_message_without_exchange(self):
+ target = oslo_messaging.Target(topic="fake_no_exchange_topic")
+ targets_and_priorities = [(target, 'fake_info')]
+
+ listener = self.driver.listen_for_notifications(
+ targets_and_priorities)
+ fake_context = {"fake_context_key": "fake_context_value"}
+ fake_message = {"fake_message_key": "fake_message_value"}
+ self.driver.send_notification(
+ target, fake_context, fake_message, None)
+
+ received_message = listener.poll()
+ self.assertEqual(fake_context, received_message.ctxt)
+ self.assertEqual(fake_message, received_message.message)
+
+ @unittest.skipUnless(
+ _is_kafka_service_running(), "Kafka service is not available")
+ def test_recieve_message_from_empty_topic_with_timeout(self):
+ target = oslo_messaging.Target(
+ topic="fake_empty_topic", exchange='fake_empty_exchange')
+ targets_and_priorities = [(target, 'fake_info')]
+
+ listener = self.driver.listen_for_notifications(
+ targets_and_priorities)
+
+ deadline = time.time() + 3
+ received_message = listener.poll(timeout=3)
+ self.assertEqual(0, int(deadline - time.time()))
+ self.assertEqual(None, received_message)
diff --git a/setup.cfg b/setup.cfg
index cbed377..b45466c 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -29,6 +29,9 @@ oslo.messaging.drivers =
zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
amqp = oslo_messaging._drivers.protocols.amqp.driver:ProtonDriver
+ # This driver is supporting for only notification usage
+ kafka = oslo_messaging._drivers.impl_kafka:KafkaDriver
+
# To avoid confusion
kombu = oslo_messaging._drivers.impl_rabbit:RabbitDriver
diff --git a/test-requirements.txt b/test-requirements.txt
index 89cda42..1387e1a 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -21,6 +21,9 @@ redis>=2.10.0
# for test_impl_zmq
pyzmq>=14.3.1 # LGPL+BSD
+# for test_impl_kafka
+kafka-python>=0.9.2 # Apache-2.0
+
# when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover]
# deps = {[testenv]deps} coverage