summaryrefslogtreecommitdiff
path: root/ceilometer/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'ceilometer/publisher')
-rw-r--r--ceilometer/publisher/messaging.py50
1 files changed, 35 insertions, 15 deletions
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index 122ac24d..2e4d69fc 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -24,7 +24,6 @@ from oslo_config import cfg
import six
import six.moves.urllib.parse as urlparse
-import ceilometer
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.openstack.common import log
@@ -48,6 +47,11 @@ NOTIFIER_OPTS = [
help='The topic that ceilometer uses for metering '
'notifications.',
),
+ cfg.StrOpt('event_topic',
+ default='event',
+ help='The topic that ceilometer uses for event '
+ 'notifications.',
+ ),
cfg.StrOpt('metering_driver',
default='messagingv2',
help='The driver that ceilometer uses for metering '
@@ -102,7 +106,6 @@ class MessagingPublisher(publisher.PublisherBase):
cfg.CONF.publisher.metering_secret)
for sample in samples
]
-
topic = cfg.CONF.publisher_rpc.metering_topic
self.local_queue.append((context, topic, meters))
@@ -142,18 +145,18 @@ class MessagingPublisher(publisher.PublisherBase):
def _process_queue(self, queue, policy):
while queue:
- context, topic, meters = queue[0]
+ context, topic, data = queue[0]
try:
- self._send(context, topic, meters)
+ self._send(context, topic, data)
except oslo.messaging.MessageDeliveryFailure:
- samples = sum([len(m) for __, __, m in queue])
+ data = sum([len(m) for __, __, m in queue])
if policy == 'queue':
- LOG.warn(_("Failed to publish %d samples, queue them"),
- samples)
+ LOG.warn(_("Failed to publish %d datapoints, queue them"),
+ data)
return queue
elif policy == 'drop':
- LOG.warn(_("Failed to publish %d samples, dropping them"),
- samples)
+ LOG.warn(_("Failed to publish %d datapoints, "
+ "dropping them"), data)
return []
# default, occur only if rabbit_max_retries > 0
raise
@@ -167,7 +170,12 @@ class MessagingPublisher(publisher.PublisherBase):
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
- raise ceilometer.NotImplementedError
+ ev_list = [utils.message_from_event(
+ event, cfg.CONF.publisher.metering_secret) for event in events]
+
+ topic = cfg.CONF.publisher_notifier.event_topic
+ self.local_queue.append((context, topic, ev_list))
+ self.flush()
@abc.abstractmethod
def _send(self, context, topic, meters):
@@ -192,16 +200,28 @@ class RPCPublisher(MessagingPublisher):
class NotifierPublisher(MessagingPublisher):
- def __init__(self, parsed_url):
+ def __init__(self, parsed_url, topic):
super(NotifierPublisher, self).__init__(parsed_url)
self.notifier = oslo.messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.metering_driver,
- publisher_id='metering.publisher.%s' % cfg.CONF.host,
- topic=cfg.CONF.publisher_notifier.metering_topic,
+ publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
+ topic=topic,
retry=self.retry
)
- def _send(self, context, event_type, meters):
+ def _send(self, context, event_type, data):
self.notifier.sample(context.to_dict(), event_type=event_type,
- payload=meters)
+ payload=data)
+
+
+class SampleNotifierPublisher(NotifierPublisher):
+ def __init__(self, parsed_url):
+ super(SampleNotifierPublisher, self).__init__(
+ parsed_url, cfg.CONF.publisher_notifier.metering_topic)
+
+
+class EventNotifierPublisher(NotifierPublisher):
+ def __init__(self, parsed_url):
+ super(EventNotifierPublisher, self).__init__(
+ parsed_url, cfg.CONF.publisher_notifier.event_topic)