1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for plugins.
"""
import abc
import collections
import fnmatch
import oslo.messaging
import six
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
LOG = log.getLogger(__name__)
ExchangeTopics = collections.namedtuple('ExchangeTopics',
['exchange', 'topics'])
class PluginBase(object):
"""Base class for all plugins."""
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, pipeline_manager):
super(NotificationBase, self).__init__()
self.pipeline_manager = pipeline_manager
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings.
Strings are defining the event types to be given to this plugin.
"""
def get_targets(self, conf):
"""Return a sequence of oslo.messaging.Target.
Sequence is defining the exchange and topics to be connected for this
plugin.
:param conf: Configuration.
"""
# TODO(sileht): Backwards compatibility, remove in J+1
if hasattr(self, 'get_exchange_topics'):
LOG.warn(_('get_exchange_topics API of NotificationPlugin is'
'deprecated, implements get_targets instead.'))
targets = []
for exchange, topics in self.get_exchange_topics(conf):
targets.extend(oslo.messaging.Target(topic=topic,
exchange=exchange)
for topic in topics)
return targets
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.
:param message: Message to process.
"""
@staticmethod
def _handle_event_type(event_type, event_type_to_handle):
"""Check whether event_type should be handled.
It is according to event_type_to_handle.
"""
return any(map(lambda e: fnmatch.fnmatch(event_type, e),
event_type_to_handle))
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
:param ctxt: oslo.messaging context
:param publisher_id: publisher of the notification
:param event_type: type of notification
:param payload: notification payload
:param metadata: metadata about the notification
"""
notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata)
self.to_samples_and_publish(context.get_admin_context(), notification)
def to_samples_and_publish(self, context, notification):
"""Return samples produced by *process_notification*.
Samples produced for the given notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
# TODO(sileht): this will be moved into oslo.messaging
# see oslo.messaging bp notification-dispatcher-filter
if not self._handle_event_type(notification['event_type'],
self.event_types):
return
with self.pipeline_manager.publisher(context) as p:
p(list(self.process_notification(notification)))
@six.add_metaclass(abc.ABCMeta)
class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API."""
@abc.abstractmethod
def get_samples(self, manager, cache, resources=None):
"""Return a sequence of Counter instances from polling the resources.
:param manager: The service manager class invoking the plugin.
:param cache: A dictionary to allow pollsters to pass data
between themselves when recomputing it would be
expensive (e.g., asking another service for a
list of objects).
:param resources: A list of the endpoints the pollster will get data
from. It's up to the specific pollster to decide
how to use it.
"""
@six.add_metaclass(abc.ABCMeta)
class DiscoveryBase(object):
@abc.abstractmethod
def discover(self, param=None):
"""Discover resources to monitor.
:param param: an optional parameter to guide the discovery
"""
|