diff options
-rw-r--r-- | ceilometer/messaging.py | 9 | ||||
-rw-r--r-- | ceilometer/notification.py | 18 | ||||
-rw-r--r-- | ceilometer/tests/test_notification.py | 16 |
3 files changed, 33 insertions, 10 deletions
diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index cd0c874e..9d739e7d 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -63,11 +63,16 @@ def get_rpc_client(**kwargs): return oslo.messaging.RPCClient(TRANSPORT, target) -def get_notification_listener(targets, endpoints): +def get_notification_listener(targets, endpoints, url=None): """Return a configured oslo.messaging notification listener.""" global TRANSPORT + if url: + transport = oslo.messaging.get_transport(cfg.CONF, url, + _ALIASES) + else: + transport = TRANSPORT return oslo.messaging.get_notification_listener( - TRANSPORT, targets, endpoints, executor='eventlet') + transport, targets, endpoints, executor='eventlet') def get_notifier(publisher_id): diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 509df13c..e1465a2f 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -39,6 +39,12 @@ OPTS = [ deprecated_group='collector', default=False, help='Save event details.'), + cfg.MultiStrOpt('messaging_urls', + default=[], + help="Messaging URLs to listen for notifications. " + "Example: transport://user:pass@host1:port" + "[,hostN:portN]/virtual_host " + "(DEFAULT/transport_url is used if empty)"), ] cfg.CONF.register_opts(OPTS, group="notification") @@ -83,12 +89,18 @@ class NotificationService(os_service.Service): targets.extend(handler.get_targets(cfg.CONF)) endpoints.append(handler) - self.listener = messaging.get_notification_listener(targets, endpoints) - self.listener.start() + urls = cfg.CONF.notification.messaging_urls or [None] + self.listeners = [] + for url in urls: + listener = messaging.get_notification_listener(targets, + endpoints, + url) + listener.start() + self.listeners.append(listener) # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) def stop(self): - self.listener.stop() + map(lambda x: x.stop(), self.listeners) super(NotificationService, self).stop() diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index 71bfe3c3..6db40bf8 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -114,6 +114,12 @@ class TestNotification(tests_base.BaseTestCase): self.srv.start() self.fake_event_endpoint = fake_event_endpoint_class.return_value + def test_start_multiple_listeners(self): + urls = ["fake://vhost1", "fake://vhost2"] + self.CONF.set_override("messaging_urls", urls, group="notification") + self._do_process_notification_manager_start() + self.assertEqual(2, len(self.srv.listeners)) + def test_process_notification(self): self._do_process_notification_manager_start() self.srv.pipeline_manager.pipelines[0] = mock.MagicMock() @@ -122,18 +128,18 @@ class TestNotification(tests_base.BaseTestCase): 'compute.instance.create.end', TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) - self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints)) + self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertTrue(self.srv.pipeline_manager.publisher.called) def test_process_notification_no_events(self): self._do_process_notification_manager_start() - self.assertEqual(1, len(self.srv.listener.dispatcher.endpoints)) + self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertNotEqual(self.fake_event_endpoint, - self.srv.listener.dispatcher.endpoints[0]) + self.srv.listeners[0].dispatcher.endpoints[0]) def test_process_notification_with_events(self): self.CONF.set_override("store_events", True, group="notification") self._do_process_notification_manager_start() - self.assertEqual(2, len(self.srv.listener.dispatcher.endpoints)) + self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertEqual(self.fake_event_endpoint, - self.srv.listener.dispatcher.endpoints[0]) + self.srv.listeners[0].dispatcher.endpoints[0]) |