summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-01-23 13:38:54 +0000
committerGerrit Code Review <review@openstack.org>2014-01-23 13:38:54 +0000
commit2862f502a6aabd89f0d532e0968ac9c4fba30111 (patch)
treeaec5cbf4797c5b3d079af3fb326adb1df2aa4cbb
parent258299d3767c15cc3a7819e21c5c5461189d4076 (diff)
parent30c933430652a7b366824d17a59793ca64e48ddf (diff)
downloadoslo-messaging-1.3.0a5.tar.gz
Merge "Routing notifier"1.3.0a5
-rw-r--r--etc/routing_notifier.yaml.sample29
-rw-r--r--oslo/messaging/notify/_impl_routing.py136
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg1
-rw-r--r--tests/test_notifier.py186
5 files changed, 355 insertions, 1 deletions
diff --git a/etc/routing_notifier.yaml.sample b/etc/routing_notifier.yaml.sample
new file mode 100644
index 0000000..26fb731
--- /dev/null
+++ b/etc/routing_notifier.yaml.sample
@@ -0,0 +1,29 @@
+# Setting a priority AND an event means both have to be satisfied.
+#
+# However, defining different sets for the same driver allows you
+# to do OR operations.
+#
+# See how this logic is modeled below:
+#
+# if (priority in info, warn or error) or
+# (event == compute.scheduler.run_instance)
+# send to messaging driver ...
+#
+# if priority == 'poll' and
+# event == 'bandwidth.*'
+# send to poll driver
+
+group_1:
+ messaging:
+ accepted_priorities: ['info', 'warn', 'error']
+
+ poll:
+ accepted_priorities: ['poll']
+ accepted_events: ['bandwidth.*']
+
+ log:
+ accepted_events: ['compute.instance.exists']
+
+group_2:
+ messaging:⋅
+ accepted_events: ['compute.scheduler.run_instance.*']
diff --git a/oslo/messaging/notify/_impl_routing.py b/oslo/messaging/notify/_impl_routing.py
new file mode 100644
index 0000000..1e8368f
--- /dev/null
+++ b/oslo/messaging/notify/_impl_routing.py
@@ -0,0 +1,136 @@
+# Copyright 2014 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import fnmatch
+import logging
+
+from oslo.config import cfg
+import six
+from stevedore import dispatch
+import yaml
+
+from oslo.messaging.notify import notifier
+from oslo.messaging.openstack.common.gettextutils import _ # noqa
+
+
+LOG = logging.getLogger(__name__)
+
+router_config = cfg.StrOpt('routing_notifier_config', default='',
+ help='RoutingNotifier configuration file location')
+
+CONF = cfg.CONF
+CONF.register_opt(router_config)
+
+
+class RoutingDriver(notifier._Driver):
+ NOTIFIER_PLUGIN_NAMESPACE = 'oslo.messaging.notify.drivers'
+
+ plugin_manager = None
+ routing_groups = None # The routing groups from the config file.
+ used_drivers = None # Used driver names, extracted from config file.
+
+ def _should_load_plugin(self, ext, *args, **kwargs):
+ # Hack to keep stevedore from circular importing since these
+ # endpoints are used for different purposes.
+ if ext.name == 'routing':
+ return False
+ return ext.name in self.used_drivers
+
+ def _get_notifier_config_file(self, filename):
+ """Broken out for testing."""
+ return file(filename, 'r')
+
+ def _load_notifiers(self):
+ """One-time load of notifier config file."""
+ self.routing_groups = {}
+ self.used_drivers = set()
+ filename = CONF.routing_notifier_config
+ if not filename:
+ return
+
+ # Infer which drivers are used from the config file.
+ self.routing_groups = yaml.load(
+ self._get_notifier_config_file(filename))
+ if not self.routing_groups:
+ self.routing_groups = {} # In case we got None from load()
+ return
+
+ for group in self.routing_groups.values():
+ self.used_drivers.update(group.keys())
+
+ LOG.debug(_('loading notifiers from %(namespace)s') %
+ {'namespace': self.NOTIFIER_PLUGIN_NAMESPACE})
+ self.plugin_manager = dispatch.DispatchExtensionManager(
+ namespace=self.NOTIFIER_PLUGIN_NAMESPACE,
+ check_func=self._should_load_plugin,
+ invoke_on_load=True,
+ invoke_args=None)
+ if not list(self.plugin_manager):
+ LOG.warning(_("Failed to load any notifiers "
+ "for %(namespace)s") %
+ {'namespace': self.NOTIFIER_PLUGIN_NAMESPACE})
+
+ def _get_drivers_for_message(self, group, event_type, priority):
+ """Which drivers should be called for this event_type
+ or priority.
+ """
+ accepted_drivers = set()
+
+ for driver, rules in six.iteritems(group):
+ checks = []
+ for key, patterns in six.iteritems(rules):
+ if key == 'accepted_events':
+ c = [fnmatch.fnmatch(event_type, p)
+ for p in patterns]
+ checks.append(any(c))
+ if key == 'accepted_priorities':
+ c = [fnmatch.fnmatch(priority, p.lower())
+ for p in patterns]
+ checks.append(any(c))
+ if all(checks):
+ accepted_drivers.add(driver)
+
+ return list(accepted_drivers)
+
+ def _filter_func(self, ext, context, message, accepted_drivers):
+ """True/False if the driver should be called for this message.
+ """
+ # context is unused here, but passed in by map()
+ return ext.name in accepted_drivers
+
+ def _call_notify(self, ext, context, message, accepted_drivers):
+ """Emit the notification.
+ """
+ # accepted_drivers is passed in as a result of the map() function
+ LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
+ {'event': message.get('event_type'), 'driver': ext.name})
+ ext.obj.notify(context, message)
+
+ def notify(self, context, message):
+ if not self.plugin_manager:
+ self._load_notifiers()
+
+ # Fail if these aren't present ...
+ event_type = message['event_type']
+ priority = message['priority'].lower()
+
+ accepted_drivers = set()
+ for group in self.routing_groups.values():
+ accepted_drivers.update(self._get_drivers_for_message(group,
+ event_type,
+ priority))
+
+ self.plugin_manager.map(self._filter_func, self._call_notify, context,
+ message, list(accepted_drivers))
diff --git a/requirements.txt b/requirements.txt
index 5fe5fdf..1bdf72f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,7 +9,11 @@ six>=1.4.1
# FIXME(markmc): remove this when the drivers no longer
# import eventlet
+
eventlet>=0.13.0
# used by openstack/common/gettextutils.py
Babel>=1.3
+
+# for the routing notifier
+PyYAML>=3.1.0
diff --git a/setup.cfg b/setup.cfg
index a1e6715..ed81aab 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -48,6 +48,7 @@ oslo.messaging.notify.drivers =
log = oslo.messaging.notify._impl_log:LogDriver
test = oslo.messaging.notify._impl_test:TestDriver
noop = oslo.messaging.notify._impl_noop:NoOpDriver
+ routing = oslo.messaging.notify._impl_routing:RoutingDriver
[build_sphinx]
source-dir = doc/source
diff --git a/tests/test_notifier.py b/tests/test_notifier.py
index 0f8b3cf..14174c7 100644
--- a/tests/test_notifier.py
+++ b/tests/test_notifier.py
@@ -20,11 +20,15 @@ import uuid
import fixtures
import mock
+from stevedore import extension
+from stevedore.tests import manager as test_manager
import testscenarios
+import yaml
from oslo import messaging
from oslo.messaging.notify import _impl_log
from oslo.messaging.notify import _impl_messaging
+from oslo.messaging.notify import _impl_routing as routing
from oslo.messaging.notify import _impl_test
from oslo.messaging.notify import notifier as msg_notifier
from oslo.messaging.openstack.common import jsonutils
@@ -188,7 +192,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
target = messaging.Target(topic='%s.%s' % (topic,
self.priority))
transport._send_notification(target, self.ctxt, message,
- **send_kwargs)
+ **send_kwargs).InAnyOrder()
self.mox.ReplayAll()
@@ -302,3 +306,183 @@ class TestLogNotifier(test_utils.BaseTestCase):
msg = {'event_type': 'foo'}
driver.notify(None, msg, "sample")
+
+
+class TestRoutingNotifier(test_utils.BaseTestCase):
+ def setUp(self):
+ super(TestRoutingNotifier, self).setUp()
+ self.router = routing.RoutingDriver(None, None, None)
+
+ def _fake_extension_manager(self, ext):
+ return test_manager.TestExtensionManager(
+ [extension.Extension('test', None, None, ext), ])
+
+ def _empty_extension_manager(self):
+ return test_manager.TestExtensionManager([])
+
+ def test_should_load_plugin(self):
+ self.router.used_drivers = set(["zoo", "blah"])
+ ext = mock.MagicMock()
+ ext.name = "foo"
+ self.assertFalse(self.router._should_load_plugin(ext))
+ ext.name = "zoo"
+ self.assertTrue(self.router._should_load_plugin(ext))
+
+ def test_load_notifiers_no_config(self):
+ # default routing_notifier_config=""
+ self.router._load_notifiers()
+ self.assertEqual(self.router.routing_groups, {})
+ self.assertEqual(0, len(self.router.used_drivers))
+
+ def test_load_notifiers_no_extensions(self):
+ self.config(routing_notifier_config="routing_notifier.yaml")
+ routing_config = r""
+ config_file = mock.MagicMock()
+ config_file.return_value = routing_config
+
+ with mock.patch.object(self.router, '_get_notifier_config_file',
+ config_file):
+ with mock.patch('stevedore.dispatch.DispatchExtensionManager',
+ return_value=self._empty_extension_manager()):
+ with mock.patch('oslo.messaging.notify.'
+ '_impl_routing.LOG') as mylog:
+ self.router._load_notifiers()
+ self.assertFalse(mylog.debug.called)
+ self.assertEqual(self.router.routing_groups, {})
+
+ def test_load_notifiers_config(self):
+ self.config(routing_notifier_config="routing_notifier.yaml")
+ routing_config = r"""
+group_1:
+ rpc : foo
+group_2:
+ rpc : blah
+ """
+
+ config_file = mock.MagicMock()
+ config_file.return_value = routing_config
+
+ with mock.patch.object(self.router, '_get_notifier_config_file',
+ config_file):
+ with mock.patch('stevedore.dispatch.DispatchExtensionManager',
+ return_value=self._fake_extension_manager(
+ mock.MagicMock())):
+ self.router._load_notifiers()
+ groups = self.router.routing_groups.keys()
+ groups.sort()
+ self.assertEqual(['group_1', 'group_2'], groups)
+
+ def test_get_drivers_for_message_accepted_events(self):
+ config = r"""
+group_1:
+ rpc:
+ accepted_events:
+ - foo.*
+ - blah.zoo.*
+ - zip
+ """
+ groups = yaml.load(config)
+ group = groups['group_1']
+
+ # No matching event ...
+ self.assertEqual([],
+ self.router._get_drivers_for_message(
+ group, "unknown", None))
+
+ # Child of foo ...
+ self.assertEqual(['rpc'],
+ self.router._get_drivers_for_message(
+ group, "foo.1", None))
+
+ # Foo itself ...
+ self.assertEqual([],
+ self.router._get_drivers_for_message(
+ group, "foo", None))
+
+ # Child of blah.zoo
+ self.assertEqual(['rpc'],
+ self.router._get_drivers_for_message(
+ group, "blah.zoo.zing", None))
+
+ def test_get_drivers_for_message_accepted_priorities(self):
+ config = r"""
+group_1:
+ rpc:
+ accepted_priorities:
+ - info
+ - error
+ """
+ groups = yaml.load(config)
+ group = groups['group_1']
+
+ # No matching priority
+ self.assertEqual([],
+ self.router._get_drivers_for_message(
+ group, None, "unknown"))
+
+ # Info ...
+ self.assertEqual(['rpc'],
+ self.router._get_drivers_for_message(
+ group, None, "info"))
+
+ # Error (to make sure the list is getting processed) ...
+ self.assertEqual(['rpc'],
+ self.router._get_drivers_for_message(
+ group, None, "error"))
+
+ def test_get_drivers_for_message_both(self):
+ config = r"""
+group_1:
+ rpc:
+ accepted_priorities:
+ - info
+ accepted_events:
+ - foo.*
+ driver_1:
+ accepted_priorities:
+ - info
+ driver_2:
+ accepted_events:
+ - foo.*
+ """
+ groups = yaml.load(config)
+ group = groups['group_1']
+
+ # Valid event, but no matching priority
+ self.assertEqual(['driver_2'],
+ self.router._get_drivers_for_message(
+ group, 'foo.blah', "unknown"))
+
+ # Valid priority, but no matching event
+ self.assertEqual(['driver_1'],
+ self.router._get_drivers_for_message(
+ group, 'unknown', "info"))
+
+ # Happy day ...
+ x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
+ x.sort()
+ self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
+
+ def test_filter_func(self):
+ ext = mock.MagicMock()
+ ext.name = "rpc"
+
+ # Good ...
+ self.assertTrue(self.router._filter_func(ext, {}, {},
+ ['foo', 'rpc']))
+
+ # Bad
+ self.assertFalse(self.router._filter_func(ext, {}, {}, ['foo']))
+
+ def test_notify(self):
+ self.router.routing_groups = {'group_1': None, 'group_2': None}
+ message = {'event_type': 'my_event', 'priority': 'my_priority'}
+
+ drivers_mock = mock.MagicMock()
+ drivers_mock.side_effect = [['rpc'], ['foo']]
+
+ with mock.patch.object(self.router, 'plugin_manager') as pm:
+ with mock.patch.object(self.router, '_get_drivers_for_message',
+ drivers_mock):
+ self.router.notify({}, message)
+ self.assertEqual(pm.map.call_args[0][4], ['rpc', 'foo'])