summaryrefslogtreecommitdiff
path: root/oslo_messaging/notify/_impl_routing.py
blob: 0731039f72840655e384844c20b2835f15781229 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright 2014 Rackspace Hosting
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging

from oslo_config import cfg
from oslo_utils import fnmatch
from stevedore import dispatch
import yaml

from oslo_messaging.notify import notifier


LOG = logging.getLogger(__name__)

router_config = cfg.StrOpt('routing_config', default='',
                           deprecated_group='DEFAULT',
                           deprecated_name='routing_notifier_config',
                           help='RoutingNotifier configuration file location.')

CONF = cfg.CONF
CONF.register_opt(router_config, group='oslo_messaging_notifications')


class RoutingDriver(notifier.Driver):
    NOTIFIER_PLUGIN_NAMESPACE = 'oslo.messaging.notify.drivers'

    plugin_manager = None
    routing_groups = None  # The routing groups from the config file.
    used_drivers = None  # Used driver names, extracted from config file.

    def _should_load_plugin(self, ext, *args, **kwargs):
        # Hack to keep stevedore from circular importing since these
        # endpoints are used for different purposes.
        if ext.name == 'routing':
            return False
        return ext.name in self.used_drivers

    def _get_notifier_config_file(self, filename):
        """Broken out for testing."""
        return open(filename, 'r')

    def _load_notifiers(self):
        """One-time load of notifier config file."""
        self.routing_groups = {}
        self.used_drivers = set()
        filename = CONF.oslo_messaging_notifications.routing_config
        if not filename:
            return

        # Infer which drivers are used from the config file.
        self.routing_groups = yaml.safe_load(
            self._get_notifier_config_file(filename))
        if not self.routing_groups:
            self.routing_groups = {}  # In case we got None from load()
            return

        for group in self.routing_groups.values():
            self.used_drivers.update(group.keys())

        LOG.debug('loading notifiers from %s', self.NOTIFIER_PLUGIN_NAMESPACE)
        self.plugin_manager = dispatch.DispatchExtensionManager(
            namespace=self.NOTIFIER_PLUGIN_NAMESPACE,
            check_func=self._should_load_plugin,
            invoke_on_load=True,
            invoke_args=None)
        if not list(self.plugin_manager):
            LOG.warning("Failed to load any notifiers for %s",
                        self.NOTIFIER_PLUGIN_NAMESPACE)

    def _get_drivers_for_message(self, group, event_type, priority):
        """Which drivers should be called for this event_type
           or priority.
        """
        accepted_drivers = set()

        for driver, rules in group.items():
            checks = []
            for key, patterns in rules.items():
                if key == 'accepted_events':
                    c = [fnmatch.fnmatch(event_type, p)
                         for p in patterns]
                    checks.append(any(c))
                if key == 'accepted_priorities':
                    c = [fnmatch.fnmatch(priority, p.lower())
                         for p in patterns]
                    checks.append(any(c))
            if all(checks):
                accepted_drivers.add(driver)

        return list(accepted_drivers)

    def _filter_func(self, ext, context, message, priority, retry,
                     accepted_drivers):
        """True/False if the driver should be called for this message.
        """
        # context is unused here, but passed in by map()
        return ext.name in accepted_drivers

    def _call_notify(self, ext, context, message, priority, retry,
                     accepted_drivers):
        """Emit the notification.
        """
        # accepted_drivers is passed in as a result of the map() function
        LOG.info("Routing '%(event)s' notification to '%(driver)s' "
                 "driver",
                 {'event': message.get('event_type'), 'driver': ext.name})
        ext.obj.notify(context, message, priority, retry)

    def notify(self, context, message, priority, retry):
        if not self.plugin_manager:
            self._load_notifiers()

        # Fail if these aren't present ...
        event_type = message['event_type']

        accepted_drivers = set()
        for group in self.routing_groups.values():
            accepted_drivers.update(
                self._get_drivers_for_message(group, event_type,
                                              priority.lower()))
        self.plugin_manager.map(self._filter_func, self._call_notify, context,
                                message, priority, retry,
                                list(accepted_drivers))