summaryrefslogtreecommitdiff
path: root/designate/sink/service.py
blob: 71580b49f0e21b8e6db1b8bff30e62f73231f859 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Copyright 2012 Managed I.T.
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging

from designate import notification_handler
from designate import rpc
from designate import service


# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)


class Service(service.Service):
    """The designate "sink" service.

    Loads the notification handlers enabled in the ``service:sink``
    configuration group, subscribes to the exchange/topic pairs those
    handlers report, and offers each incoming notification to every handler
    that declares the notification's event type.
    """

    def __init__(self):
        worker_threads = cfg.CONF['service:sink'].threads
        super(Service, self).__init__(
            self.service_name, threads=worker_threads
        )

        # The notification listener is only created in start(), and only
        # when there is at least one target to subscribe to.
        self._server = None

        # Initialize extensions, then index them by event type.
        self.handlers = self._init_extensions()
        self.subscribers = self._get_subscribers()

    @property
    def service_name(self):
        """Name under which this service registers itself."""
        return 'sink'

    @staticmethod
    def _init_extensions():
        """Load the enabled notification handlers.

        The handler names are read from
        ``[service:sink] enabled_notification_handlers``. A warning is
        logged when the resulting collection is empty.

        :returns: whatever ``notification_handler.get_notification_handlers``
                  returns for the enabled names.
        """
        enabled_names = cfg.CONF['service:sink'].enabled_notification_handlers
        loaded = notification_handler.get_notification_handlers(enabled_names)

        if not len(loaded):
            LOG.warning('No designate-sink handlers enabled or loaded')

        return loaded

    def _get_subscribers(self):
        """Build a mapping of event type -> list of interested handlers.

        Handlers appear in each list in the order they occur in
        ``self.handlers``.
        """
        by_event_type = {}
        for handler in self.handlers:
            for event_type in handler.get_event_types():
                by_event_type.setdefault(event_type, []).append(handler)
        return by_event_type

    def start(self):
        """Start the base service, then the notification listener (if any).

        No listener is created when none of the handlers yields a target.
        """
        super(Service, self).start()

        # Setup notification subscriptions and start consuming
        targets = self._get_targets()
        if not targets:
            return

        # TODO(ekarlso): Change this to endpoint objects rather than
        # ourselves?
        listener = rpc.get_notification_listener(
            targets, [self],
            pool=cfg.CONF['service:sink'].listener_pool_name
        )
        self._server = listener
        listener.start()

    def stop(self, graceful=True):
        """Stop the notification listener, then the base service.

        :param graceful: passed through unchanged to the parent ``stop``.
        """
        # Shutting the listener down is best effort: we are going away
        # regardless, so any failure is logged and otherwise ignored.
        try:
            listener = self._server
            if listener:
                listener.stop()
        except Exception as exc:
            LOG.warning(
                'Unable to gracefully stop the notification listener: %s', exc
            )

        super(Service, self).stop(graceful)

    def _get_targets(self):
        """Collect one messaging target per exchange/topic pair.

        Every handler is asked for its ``(exchange, topics)`` pair and a
        ``messaging.Target`` is created for each of its topics.

        :returns: list of ``messaging.Target`` objects (possibly empty).
        """
        targets = []
        for handler in self.handlers:
            exchange, topics = handler.get_exchange_topics()
            targets.extend(
                messaging.Target(exchange=exchange, topic=topic)
                for topic in topics
            )
        return targets

    def _get_handler_event_types(self):
        """Return the subscribers dict; its keys are the handled event types.
        """
        return self.subscribers

    def info(self, context, publisher_id, event_type, payload, metadata):
        """Offer an incoming notification to every matching handler.

        ``publisher_id`` and ``metadata`` are accepted but not used here.
        NOTE(review): presumably invoked by the notification listener, since
        this object is passed to it as an endpoint in start() - confirm.
        """
        # NOTE(zykes): Bail out early when nobody subscribed to this event
        # type; this skips processing and logging of things like
        # compute.exists etc.
        if event_type not in self._get_handler_event_types():
            return

        for handler in self.handlers:
            if event_type not in handler.get_event_types():
                continue
            LOG.debug('Found handler for: %s', event_type)
            handler.process_notification(context, event_type, payload)