summaryrefslogtreecommitdiff
path: root/tests/notify/test_notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/notify/test_notifier.py')
-rw-r--r--tests/notify/test_notifier.py540
1 files changed, 0 insertions, 540 deletions
diff --git a/tests/notify/test_notifier.py b/tests/notify/test_notifier.py
deleted file mode 100644
index 9cc8ec0..0000000
--- a/tests/notify/test_notifier.py
+++ /dev/null
@@ -1,540 +0,0 @@
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import logging
-import sys
-import uuid
-
-import fixtures
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from stevedore import dispatch
-from stevedore import extension
-import testscenarios
-import yaml
-
-from oslo import messaging
-from oslo.messaging.notify import notifier as msg_notifier
-from oslo.messaging import serializer as msg_serializer
-from oslo_messaging.notify import _impl_log
-from oslo_messaging.notify import _impl_messaging
-from oslo_messaging.notify import _impl_test
-from oslo_messaging.tests import utils as test_utils
-from six.moves import mock
-
-load_tests = testscenarios.load_tests_apply_scenarios
-
-
-class _FakeTransport(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def _send_notification(self, target, ctxt, message, version, retry=None):
- pass
-
-
-class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
-
- """Record logged exceptions and re-raise in cleanup.
-
- The notifier just logs notification send errors so, for the sake of
- debugging test failures, we record any exceptions logged and re-raise them
- during cleanup.
- """
-
- class FakeLogger(object):
-
- def __init__(self):
- self.exceptions = []
-
- def exception(self, msg, *args, **kwargs):
- self.exceptions.append(sys.exc_info()[1])
-
- def setUp(self):
- super(_ReRaiseLoggedExceptionsFixture, self).setUp()
-
- self.logger = self.FakeLogger()
-
- def reraise_exceptions():
- for ex in self.logger.exceptions:
- raise ex
-
- self.addCleanup(reraise_exceptions)
-
-
-class TestMessagingNotifier(test_utils.BaseTestCase):
-
- _v1 = [
- ('v1', dict(v1=True)),
- ('not_v1', dict(v1=False)),
- ]
-
- _v2 = [
- ('v2', dict(v2=True)),
- ('not_v2', dict(v2=False)),
- ]
-
- _publisher_id = [
- ('ctor_pub_id', dict(ctor_pub_id='test',
- expected_pub_id='test')),
- ('prep_pub_id', dict(prep_pub_id='test.localhost',
- expected_pub_id='test.localhost')),
- ('override', dict(ctor_pub_id='test',
- prep_pub_id='test.localhost',
- expected_pub_id='test.localhost')),
- ]
-
- _topics = [
- ('no_topics', dict(topics=[])),
- ('single_topic', dict(topics=['notifications'])),
- ('multiple_topic2', dict(topics=['foo', 'bar'])),
- ]
-
- _priority = [
- ('audit', dict(priority='audit')),
- ('debug', dict(priority='debug')),
- ('info', dict(priority='info')),
- ('warn', dict(priority='warn')),
- ('error', dict(priority='error')),
- ('sample', dict(priority='sample')),
- ('critical', dict(priority='critical')),
- ]
-
- _payload = [
- ('payload', dict(payload={'foo': 'bar'})),
- ]
-
- _context = [
- ('ctxt', dict(ctxt={'user': 'bob'})),
- ]
-
- _retry = [
- ('unconfigured', dict()),
- ('None', dict(retry=None)),
- ('0', dict(retry=0)),
- ('5', dict(retry=5)),
- ]
-
- @classmethod
- def generate_scenarios(cls):
- cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
- cls._v2,
- cls._publisher_id,
- cls._topics,
- cls._priority,
- cls._payload,
- cls._context,
- cls._retry)
-
- def setUp(self):
- super(TestMessagingNotifier, self).setUp()
-
- self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
- self.stubs.Set(_impl_messaging, 'LOG', self.logger)
- self.stubs.Set(msg_notifier, '_LOG', self.logger)
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_notifier(self, mock_utcnow):
- drivers = []
- if self.v1:
- drivers.append('messaging')
- if self.v2:
- drivers.append('messagingv2')
-
- self.config(notification_driver=drivers,
- notification_topics=self.topics)
-
- transport = _FakeTransport(self.conf)
-
- if hasattr(self, 'ctor_pub_id'):
- notifier = messaging.Notifier(transport,
- publisher_id=self.ctor_pub_id)
- else:
- notifier = messaging.Notifier(transport)
-
- prepare_kwds = {}
- if hasattr(self, 'retry'):
- prepare_kwds['retry'] = self.retry
- if hasattr(self, 'prep_pub_id'):
- prepare_kwds['publisher_id'] = self.prep_pub_id
- if prepare_kwds:
- notifier = notifier.prepare(**prepare_kwds)
-
- self.mox.StubOutWithMock(transport, '_send_notification')
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': self.expected_pub_id,
- 'event_type': 'test.notify',
- 'priority': self.priority.upper(),
- 'payload': self.payload,
- 'timestamp': str(timeutils.utcnow()),
- }
-
- sends = []
- if self.v1:
- sends.append(dict(version=1.0))
- if self.v2:
- sends.append(dict(version=2.0))
-
- for send_kwargs in sends:
- for topic in self.topics:
- if hasattr(self, 'retry'):
- send_kwargs['retry'] = self.retry
- else:
- send_kwargs['retry'] = None
- target = messaging.Target(topic='%s.%s' % (topic,
- self.priority))
- transport._send_notification(target, self.ctxt, message,
- **send_kwargs).InAnyOrder()
-
- self.mox.ReplayAll()
-
- method = getattr(notifier, self.priority)
- method(self.ctxt, 'test.notify', self.payload)
-
-
-TestMessagingNotifier.generate_scenarios()
-
-
-class TestSerializer(test_utils.BaseTestCase):
-
- def setUp(self):
- super(TestSerializer, self).setUp()
- self.addCleanup(_impl_test.reset)
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_serializer(self, mock_utcnow):
- transport = _FakeTransport(self.conf)
-
- serializer = msg_serializer.NoOpSerializer()
-
- notifier = messaging.Notifier(transport,
- 'test.localhost',
- driver='test',
- topic='test',
- serializer=serializer)
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- self.mox.StubOutWithMock(serializer, 'serialize_context')
- self.mox.StubOutWithMock(serializer, 'serialize_entity')
- serializer.serialize_context(dict(user='bob')).\
- AndReturn(dict(user='alice'))
- serializer.serialize_entity(dict(user='bob'), 'bar').AndReturn('sbar')
-
- self.mox.ReplayAll()
-
- notifier.info(dict(user='bob'), 'test.notify', 'bar')
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': 'test.localhost',
- 'event_type': 'test.notify',
- 'priority': 'INFO',
- 'payload': 'sbar',
- 'timestamp': str(timeutils.utcnow()),
- }
-
- self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
- _impl_test.NOTIFICATIONS)
-
-
-class TestLogNotifier(test_utils.BaseTestCase):
-
- @mock.patch('oslo_utils.timeutils.utcnow')
- def test_notifier(self, mock_utcnow):
- self.config(notification_driver=['log'])
-
- transport = _FakeTransport(self.conf)
-
- notifier = messaging.Notifier(transport, 'test.localhost')
-
- message_id = uuid.uuid4()
- self.mox.StubOutWithMock(uuid, 'uuid4')
- uuid.uuid4().AndReturn(message_id)
-
- mock_utcnow.return_value = datetime.datetime.utcnow()
-
- message = {
- 'message_id': str(message_id),
- 'publisher_id': 'test.localhost',
- 'event_type': 'test.notify',
- 'priority': 'INFO',
- 'payload': 'bar',
- 'timestamp': str(timeutils.utcnow()),
- }
-
- logger = self.mox.CreateMockAnything()
-
- self.mox.StubOutWithMock(logging, 'getLogger')
- logging.getLogger('oslo.messaging.notification.test.notify').\
- AndReturn(logger)
-
- logger.info(jsonutils.dumps(message))
-
- self.mox.ReplayAll()
-
- notifier.info({}, 'test.notify', 'bar')
-
- def test_sample_priority(self):
- # Ensure logger drops sample-level notifications.
- driver = _impl_log.LogDriver(None, None, None)
-
- logger = self.mox.CreateMock(
- logging.getLogger('oslo.messaging.notification.foo'))
- logger.sample = None
- self.mox.StubOutWithMock(logging, 'getLogger')
- logging.getLogger('oslo.messaging.notification.foo').\
- AndReturn(logger)
-
- self.mox.ReplayAll()
-
- msg = {'event_type': 'foo'}
- driver.notify(None, msg, "sample", None)
-
-
-class TestRoutingNotifier(test_utils.BaseTestCase):
- def setUp(self):
- super(TestRoutingNotifier, self).setUp()
- self.config(notification_driver=['routing'])
-
- transport = _FakeTransport(self.conf)
- self.notifier = messaging.Notifier(transport)
- self.router = self.notifier._driver_mgr['routing'].obj
-
- def _fake_extension_manager(self, ext):
- return extension.ExtensionManager.make_test_instance(
- [extension.Extension('test', None, None, ext), ])
-
- def _empty_extension_manager(self):
- return extension.ExtensionManager.make_test_instance([])
-
- def test_should_load_plugin(self):
- self.router.used_drivers = set(["zoo", "blah"])
- ext = mock.MagicMock()
- ext.name = "foo"
- self.assertFalse(self.router._should_load_plugin(ext))
- ext.name = "zoo"
- self.assertTrue(self.router._should_load_plugin(ext))
-
- def test_load_notifiers_no_config(self):
- # default routing_notifier_config=""
- self.router._load_notifiers()
- self.assertEqual({}, self.router.routing_groups)
- self.assertEqual(0, len(self.router.used_drivers))
-
- def test_load_notifiers_no_extensions(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r""
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=self._empty_extension_manager()):
- with mock.patch('oslo_messaging.notify.'
- '_impl_routing.LOG') as mylog:
- self.router._load_notifiers()
- self.assertFalse(mylog.debug.called)
- self.assertEqual({}, self.router.routing_groups)
-
- def test_load_notifiers_config(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r"""
-group_1:
- rpc : foo
-group_2:
- rpc : blah
- """
-
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=self._fake_extension_manager(
- mock.MagicMock())):
- self.router._load_notifiers()
- groups = list(self.router.routing_groups.keys())
- groups.sort()
- self.assertEqual(['group_1', 'group_2'], groups)
-
- def test_get_drivers_for_message_accepted_events(self):
- config = r"""
-group_1:
- rpc:
- accepted_events:
- - foo.*
- - blah.zoo.*
- - zip
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # No matching event ...
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, "unknown", "info"))
-
- # Child of foo ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, "foo.1", "info"))
-
- # Foo itself ...
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, "foo", "info"))
-
- # Child of blah.zoo
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, "blah.zoo.zing", "info"))
-
- def test_get_drivers_for_message_accepted_priorities(self):
- config = r"""
-group_1:
- rpc:
- accepted_priorities:
- - info
- - error
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # No matching priority
- self.assertEqual([],
- self.router._get_drivers_for_message(
- group, None, "unknown"))
-
- # Info ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, None, "info"))
-
- # Error (to make sure the list is getting processed) ...
- self.assertEqual(['rpc'],
- self.router._get_drivers_for_message(
- group, None, "error"))
-
- def test_get_drivers_for_message_both(self):
- config = r"""
-group_1:
- rpc:
- accepted_priorities:
- - info
- accepted_events:
- - foo.*
- driver_1:
- accepted_priorities:
- - info
- driver_2:
- accepted_events:
- - foo.*
- """
- groups = yaml.load(config)
- group = groups['group_1']
-
- # Valid event, but no matching priority
- self.assertEqual(['driver_2'],
- self.router._get_drivers_for_message(
- group, 'foo.blah', "unknown"))
-
- # Valid priority, but no matching event
- self.assertEqual(['driver_1'],
- self.router._get_drivers_for_message(
- group, 'unknown', "info"))
-
- # Happy day ...
- x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
- x.sort()
- self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
-
- def test_filter_func(self):
- ext = mock.MagicMock()
- ext.name = "rpc"
-
- # Good ...
- self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
- None, ['foo', 'rpc']))
-
- # Bad
- self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
- None, ['foo']))
-
- def test_notify(self):
- self.router.routing_groups = {'group_1': None, 'group_2': None}
- drivers_mock = mock.MagicMock()
- drivers_mock.side_effect = [['rpc'], ['foo']]
-
- with mock.patch.object(self.router, 'plugin_manager') as pm:
- with mock.patch.object(self.router, '_get_drivers_for_message',
- drivers_mock):
- self.notifier.info({}, 'my_event', {})
- self.assertEqual(sorted(['rpc', 'foo']),
- sorted(pm.map.call_args[0][6]))
-
- def test_notify_filtered(self):
- self.config(routing_notifier_config="routing_notifier.yaml")
- routing_config = r"""
-group_1:
- rpc:
- accepted_events:
- - my_event
- rpc2:
- accepted_priorities:
- - info
- bar:
- accepted_events:
- - nothing
- """
- config_file = mock.MagicMock()
- config_file.return_value = routing_config
-
- rpc_driver = mock.Mock()
- rpc2_driver = mock.Mock()
- bar_driver = mock.Mock()
-
- pm = dispatch.DispatchExtensionManager.make_test_instance(
- [extension.Extension('rpc', None, None, rpc_driver),
- extension.Extension('rpc2', None, None, rpc2_driver),
- extension.Extension('bar', None, None, bar_driver)],
- )
-
- with mock.patch.object(self.router, '_get_notifier_config_file',
- config_file):
- with mock.patch('stevedore.dispatch.DispatchExtensionManager',
- return_value=pm):
- self.notifier.info({}, 'my_event', {})
- self.assertFalse(bar_driver.info.called)
- rpc_driver.notify.assert_called_once_with(
- {}, mock.ANY, 'INFO', None)
- rpc2_driver.notify.assert_called_once_with(
- {}, mock.ANY, 'INFO', None)