summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ceilometer/messaging.py9
-rw-r--r--ceilometer/notification.py18
-rw-r--r--ceilometer/tests/test_notification.py16
3 files changed, 33 insertions, 10 deletions
diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py
index cd0c874e..9d739e7d 100644
--- a/ceilometer/messaging.py
+++ b/ceilometer/messaging.py
@@ -63,11 +63,16 @@ def get_rpc_client(**kwargs):
return oslo.messaging.RPCClient(TRANSPORT, target)
-def get_notification_listener(targets, endpoints):
+def get_notification_listener(targets, endpoints, url=None):
"""Return a configured oslo.messaging notification listener."""
global TRANSPORT
+ if url:
+ transport = oslo.messaging.get_transport(cfg.CONF, url,
+ _ALIASES)
+ else:
+ transport = TRANSPORT
return oslo.messaging.get_notification_listener(
- TRANSPORT, targets, endpoints, executor='eventlet')
+ transport, targets, endpoints, executor='eventlet')
def get_notifier(publisher_id):
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 509df13c..e1465a2f 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -39,6 +39,12 @@ OPTS = [
deprecated_group='collector',
default=False,
help='Save event details.'),
+ cfg.MultiStrOpt('messaging_urls',
+ default=[],
+ help="Messaging URLs to listen for notifications. "
+ "Example: transport://user:pass@host1:port"
+ "[,hostN:portN]/virtual_host "
+ "(DEFAULT/transport_url is used if empty)"),
]
cfg.CONF.register_opts(OPTS, group="notification")
@@ -83,12 +89,18 @@ class NotificationService(os_service.Service):
targets.extend(handler.get_targets(cfg.CONF))
endpoints.append(handler)
- self.listener = messaging.get_notification_listener(targets, endpoints)
- self.listener.start()
+ urls = cfg.CONF.notification.messaging_urls or [None]
+ self.listeners = []
+ for url in urls:
+ listener = messaging.get_notification_listener(targets,
+ endpoints,
+ url)
+ listener.start()
+ self.listeners.append(listener)
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
- self.listener.stop()
+ map(lambda x: x.stop(), self.listeners)
super(NotificationService, self).stop()
diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py
index 71bfe3c3..6db40bf8 100644
--- a/ceilometer/tests/test_notification.py
+++ b/ceilometer/tests/test_notification.py
@@ -114,6 +114,12 @@ class TestNotification(tests_base.BaseTestCase):
self.srv.start()
self.fake_event_endpoint = fake_event_endpoint_class.return_value
+ def test_start_multiple_listeners(self):
+ urls = ["fake://vhost1", "fake://vhost2"]
+ self.CONF.set_override("messaging_urls", urls, group="notification")
+ self._do_process_notification_manager_start()
+ self.assertEqual(2, len(self.srv.listeners))
+
def test_process_notification(self):
self._do_process_notification_manager_start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
@@ -122,18 +128,18 @@ class TestNotification(tests_base.BaseTestCase):
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
- self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
+ self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertTrue(self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
self._do_process_notification_manager_start()
- self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints))
+ self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertNotEqual(self.fake_event_endpoint,
- self.srv.listener.dispatcher.endpoints[0])
+ self.srv.listeners[0].dispatcher.endpoints[0])
def test_process_notification_with_events(self):
self.CONF.set_override("store_events", True, group="notification")
self._do_process_notification_manager_start()
- self.assertEqual(2, len(self.srv.listener.dispatcher.endpoints))
+ self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertEqual(self.fake_event_endpoint,
- self.srv.listener.dispatcher.endpoints[0])
+ self.srv.listeners[0].dispatcher.endpoints[0])