diff options
Diffstat (limited to 'tests/notify/test_notifier.py')
-rw-r--r-- | tests/notify/test_notifier.py | 540 |
1 files changed, 0 insertions, 540 deletions
diff --git a/tests/notify/test_notifier.py b/tests/notify/test_notifier.py deleted file mode 100644 index 9cc8ec0..0000000 --- a/tests/notify/test_notifier.py +++ /dev/null @@ -1,540 +0,0 @@ - -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import logging -import sys -import uuid - -import fixtures -from oslo_serialization import jsonutils -from oslo_utils import timeutils -from stevedore import dispatch -from stevedore import extension -import testscenarios -import yaml - -from oslo import messaging -from oslo.messaging.notify import notifier as msg_notifier -from oslo.messaging import serializer as msg_serializer -from oslo_messaging.notify import _impl_log -from oslo_messaging.notify import _impl_messaging -from oslo_messaging.notify import _impl_test -from oslo_messaging.tests import utils as test_utils -from six.moves import mock - -load_tests = testscenarios.load_tests_apply_scenarios - - -class _FakeTransport(object): - - def __init__(self, conf): - self.conf = conf - - def _send_notification(self, target, ctxt, message, version, retry=None): - pass - - -class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture): - - """Record logged exceptions and re-raise in cleanup. - - The notifier just logs notification send errors so, for the sake of - debugging test failures, we record any exceptions logged and re-raise them - during cleanup. - """ - - class FakeLogger(object): - - def __init__(self): - self.exceptions = [] - - def exception(self, msg, *args, **kwargs): - self.exceptions.append(sys.exc_info()[1]) - - def setUp(self): - super(_ReRaiseLoggedExceptionsFixture, self).setUp() - - self.logger = self.FakeLogger() - - def reraise_exceptions(): - for ex in self.logger.exceptions: - raise ex - - self.addCleanup(reraise_exceptions) - - -class TestMessagingNotifier(test_utils.BaseTestCase): - - _v1 = [ - ('v1', dict(v1=True)), - ('not_v1', dict(v1=False)), - ] - - _v2 = [ - ('v2', dict(v2=True)), - ('not_v2', dict(v2=False)), - ] - - _publisher_id = [ - ('ctor_pub_id', dict(ctor_pub_id='test', - expected_pub_id='test')), - ('prep_pub_id', dict(prep_pub_id='test.localhost', - expected_pub_id='test.localhost')), - ('override', dict(ctor_pub_id='test', - prep_pub_id='test.localhost', - expected_pub_id='test.localhost')), - ] - - _topics = [ - ('no_topics', dict(topics=[])), - ('single_topic', dict(topics=['notifications'])), - ('multiple_topic2', dict(topics=['foo', 'bar'])), - ] - - _priority = [ - ('audit', dict(priority='audit')), - ('debug', dict(priority='debug')), - ('info', dict(priority='info')), - ('warn', dict(priority='warn')), - ('error', dict(priority='error')), - ('sample', dict(priority='sample')), - ('critical', dict(priority='critical')), - ] - - _payload = [ - ('payload', dict(payload={'foo': 'bar'})), - ] - - _context = [ - ('ctxt', dict(ctxt={'user': 'bob'})), - ] - - _retry = [ - ('unconfigured', dict()), - ('None', dict(retry=None)), - ('0', dict(retry=0)), - ('5', dict(retry=5)), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._v1, - cls._v2, - cls._publisher_id, - cls._topics, - cls._priority, - cls._payload, - cls._context, - cls._retry) - - def setUp(self): - super(TestMessagingNotifier, self).setUp() - - self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger - self.stubs.Set(_impl_messaging, 'LOG', self.logger) - self.stubs.Set(msg_notifier, '_LOG', self.logger) - - @mock.patch('oslo_utils.timeutils.utcnow') - def test_notifier(self, mock_utcnow): - drivers = [] - if self.v1: - drivers.append('messaging') - if self.v2: - drivers.append('messagingv2') - - self.config(notification_driver=drivers, - notification_topics=self.topics) - - transport = _FakeTransport(self.conf) - - if hasattr(self, 'ctor_pub_id'): - notifier = messaging.Notifier(transport, - publisher_id=self.ctor_pub_id) - else: - notifier = messaging.Notifier(transport) - - prepare_kwds = {} - if hasattr(self, 'retry'): - prepare_kwds['retry'] = self.retry - if hasattr(self, 'prep_pub_id'): - prepare_kwds['publisher_id'] = self.prep_pub_id - if prepare_kwds: - notifier = notifier.prepare(**prepare_kwds) - - self.mox.StubOutWithMock(transport, '_send_notification') - - message_id = uuid.uuid4() - self.mox.StubOutWithMock(uuid, 'uuid4') - uuid.uuid4().AndReturn(message_id) - - mock_utcnow.return_value = datetime.datetime.utcnow() - - message = { - 'message_id': str(message_id), - 'publisher_id': self.expected_pub_id, - 'event_type': 'test.notify', - 'priority': self.priority.upper(), - 'payload': self.payload, - 'timestamp': str(timeutils.utcnow()), - } - - sends = [] - if self.v1: - sends.append(dict(version=1.0)) - if self.v2: - sends.append(dict(version=2.0)) - - for send_kwargs in sends: - for topic in self.topics: - if hasattr(self, 'retry'): - send_kwargs['retry'] = self.retry - else: - send_kwargs['retry'] = None - target = messaging.Target(topic='%s.%s' % (topic, - self.priority)) - transport._send_notification(target, self.ctxt, message, - **send_kwargs).InAnyOrder() - - self.mox.ReplayAll() - - method = getattr(notifier, self.priority) - method(self.ctxt, 'test.notify', self.payload) - - -TestMessagingNotifier.generate_scenarios() - - -class TestSerializer(test_utils.BaseTestCase): - - def setUp(self): - super(TestSerializer, self).setUp() - self.addCleanup(_impl_test.reset) - - @mock.patch('oslo_utils.timeutils.utcnow') - def test_serializer(self, mock_utcnow): - transport = _FakeTransport(self.conf) - - serializer = msg_serializer.NoOpSerializer() - - notifier = messaging.Notifier(transport, - 'test.localhost', - driver='test', - topic='test', - serializer=serializer) - - message_id = uuid.uuid4() - self.mox.StubOutWithMock(uuid, 'uuid4') - uuid.uuid4().AndReturn(message_id) - - mock_utcnow.return_value = datetime.datetime.utcnow() - - self.mox.StubOutWithMock(serializer, 'serialize_context') - self.mox.StubOutWithMock(serializer, 'serialize_entity') - serializer.serialize_context(dict(user='bob')).\ - AndReturn(dict(user='alice')) - serializer.serialize_entity(dict(user='bob'), 'bar').AndReturn('sbar') - - self.mox.ReplayAll() - - notifier.info(dict(user='bob'), 'test.notify', 'bar') - - message = { - 'message_id': str(message_id), - 'publisher_id': 'test.localhost', - 'event_type': 'test.notify', - 'priority': 'INFO', - 'payload': 'sbar', - 'timestamp': str(timeutils.utcnow()), - } - - self.assertEqual([(dict(user='alice'), message, 'INFO', None)], - _impl_test.NOTIFICATIONS) - - -class TestLogNotifier(test_utils.BaseTestCase): - - @mock.patch('oslo_utils.timeutils.utcnow') - def test_notifier(self, mock_utcnow): - self.config(notification_driver=['log']) - - transport = _FakeTransport(self.conf) - - notifier = messaging.Notifier(transport, 'test.localhost') - - message_id = uuid.uuid4() - self.mox.StubOutWithMock(uuid, 'uuid4') - uuid.uuid4().AndReturn(message_id) - - mock_utcnow.return_value = datetime.datetime.utcnow() - - message = { - 'message_id': str(message_id), - 'publisher_id': 'test.localhost', - 'event_type': 'test.notify', - 'priority': 'INFO', - 'payload': 'bar', - 'timestamp': str(timeutils.utcnow()), - } - - logger = self.mox.CreateMockAnything() - - self.mox.StubOutWithMock(logging, 'getLogger') - logging.getLogger('oslo.messaging.notification.test.notify').\ - AndReturn(logger) - - logger.info(jsonutils.dumps(message)) - - self.mox.ReplayAll() - - notifier.info({}, 'test.notify', 'bar') - - def test_sample_priority(self): - # Ensure logger drops sample-level notifications. - driver = _impl_log.LogDriver(None, None, None) - - logger = self.mox.CreateMock( - logging.getLogger('oslo.messaging.notification.foo')) - logger.sample = None - self.mox.StubOutWithMock(logging, 'getLogger') - logging.getLogger('oslo.messaging.notification.foo').\ - AndReturn(logger) - - self.mox.ReplayAll() - - msg = {'event_type': 'foo'} - driver.notify(None, msg, "sample", None) - - -class TestRoutingNotifier(test_utils.BaseTestCase): - def setUp(self): - super(TestRoutingNotifier, self).setUp() - self.config(notification_driver=['routing']) - - transport = _FakeTransport(self.conf) - self.notifier = messaging.Notifier(transport) - self.router = self.notifier._driver_mgr['routing'].obj - - def _fake_extension_manager(self, ext): - return extension.ExtensionManager.make_test_instance( - [extension.Extension('test', None, None, ext), ]) - - def _empty_extension_manager(self): - return extension.ExtensionManager.make_test_instance([]) - - def test_should_load_plugin(self): - self.router.used_drivers = set(["zoo", "blah"]) - ext = mock.MagicMock() - ext.name = "foo" - self.assertFalse(self.router._should_load_plugin(ext)) - ext.name = "zoo" - self.assertTrue(self.router._should_load_plugin(ext)) - - def test_load_notifiers_no_config(self): - # default routing_notifier_config="" - self.router._load_notifiers() - self.assertEqual({}, self.router.routing_groups) - self.assertEqual(0, len(self.router.used_drivers)) - - def test_load_notifiers_no_extensions(self): - self.config(routing_notifier_config="routing_notifier.yaml") - routing_config = r"" - config_file = mock.MagicMock() - config_file.return_value = routing_config - - with mock.patch.object(self.router, '_get_notifier_config_file', - config_file): - with mock.patch('stevedore.dispatch.DispatchExtensionManager', - return_value=self._empty_extension_manager()): - with mock.patch('oslo_messaging.notify.' - '_impl_routing.LOG') as mylog: - self.router._load_notifiers() - self.assertFalse(mylog.debug.called) - self.assertEqual({}, self.router.routing_groups) - - def test_load_notifiers_config(self): - self.config(routing_notifier_config="routing_notifier.yaml") - routing_config = r""" -group_1: - rpc : foo -group_2: - rpc : blah - """ - - config_file = mock.MagicMock() - config_file.return_value = routing_config - - with mock.patch.object(self.router, '_get_notifier_config_file', - config_file): - with mock.patch('stevedore.dispatch.DispatchExtensionManager', - return_value=self._fake_extension_manager( - mock.MagicMock())): - self.router._load_notifiers() - groups = list(self.router.routing_groups.keys()) - groups.sort() - self.assertEqual(['group_1', 'group_2'], groups) - - def test_get_drivers_for_message_accepted_events(self): - config = r""" -group_1: - rpc: - accepted_events: - - foo.* - - blah.zoo.* - - zip - """ - groups = yaml.load(config) - group = groups['group_1'] - - # No matching event ... - self.assertEqual([], - self.router._get_drivers_for_message( - group, "unknown", "info")) - - # Child of foo ... - self.assertEqual(['rpc'], - self.router._get_drivers_for_message( - group, "foo.1", "info")) - - # Foo itself ... - self.assertEqual([], - self.router._get_drivers_for_message( - group, "foo", "info")) - - # Child of blah.zoo - self.assertEqual(['rpc'], - self.router._get_drivers_for_message( - group, "blah.zoo.zing", "info")) - - def test_get_drivers_for_message_accepted_priorities(self): - config = r""" -group_1: - rpc: - accepted_priorities: - - info - - error - """ - groups = yaml.load(config) - group = groups['group_1'] - - # No matching priority - self.assertEqual([], - self.router._get_drivers_for_message( - group, None, "unknown")) - - # Info ... - self.assertEqual(['rpc'], - self.router._get_drivers_for_message( - group, None, "info")) - - # Error (to make sure the list is getting processed) ... - self.assertEqual(['rpc'], - self.router._get_drivers_for_message( - group, None, "error")) - - def test_get_drivers_for_message_both(self): - config = r""" -group_1: - rpc: - accepted_priorities: - - info - accepted_events: - - foo.* - driver_1: - accepted_priorities: - - info - driver_2: - accepted_events: - - foo.* - """ - groups = yaml.load(config) - group = groups['group_1'] - - # Valid event, but no matching priority - self.assertEqual(['driver_2'], - self.router._get_drivers_for_message( - group, 'foo.blah', "unknown")) - - # Valid priority, but no matching event - self.assertEqual(['driver_1'], - self.router._get_drivers_for_message( - group, 'unknown', "info")) - - # Happy day ... - x = self.router._get_drivers_for_message(group, 'foo.blah', "info") - x.sort() - self.assertEqual(['driver_1', 'driver_2', 'rpc'], x) - - def test_filter_func(self): - ext = mock.MagicMock() - ext.name = "rpc" - - # Good ... - self.assertTrue(self.router._filter_func(ext, {}, {}, 'info', - None, ['foo', 'rpc'])) - - # Bad - self.assertFalse(self.router._filter_func(ext, {}, {}, 'info', - None, ['foo'])) - - def test_notify(self): - self.router.routing_groups = {'group_1': None, 'group_2': None} - drivers_mock = mock.MagicMock() - drivers_mock.side_effect = [['rpc'], ['foo']] - - with mock.patch.object(self.router, 'plugin_manager') as pm: - with mock.patch.object(self.router, '_get_drivers_for_message', - drivers_mock): - self.notifier.info({}, 'my_event', {}) - self.assertEqual(sorted(['rpc', 'foo']), - sorted(pm.map.call_args[0][6])) - - def test_notify_filtered(self): - self.config(routing_notifier_config="routing_notifier.yaml") - routing_config = r""" -group_1: - rpc: - accepted_events: - - my_event - rpc2: - accepted_priorities: - - info - bar: - accepted_events: - - nothing - """ - config_file = mock.MagicMock() - config_file.return_value = routing_config - - rpc_driver = mock.Mock() - rpc2_driver = mock.Mock() - bar_driver = mock.Mock() - - pm = dispatch.DispatchExtensionManager.make_test_instance( - [extension.Extension('rpc', None, None, rpc_driver), - extension.Extension('rpc2', None, None, rpc2_driver), - extension.Extension('bar', None, None, bar_driver)], - ) - - with mock.patch.object(self.router, '_get_notifier_config_file', - config_file): - with mock.patch('stevedore.dispatch.DispatchExtensionManager', - return_value=pm): - self.notifier.info({}, 'my_event', {}) - self.assertFalse(bar_driver.info.called) - rpc_driver.notify.assert_called_once_with( - {}, mock.ANY, 'INFO', None) - rpc2_driver.notify.assert_called_once_with( - {}, mock.ANY, 'INFO', None) |