diff options
Diffstat (limited to 'ceilometer/publisher')
-rw-r--r-- | ceilometer/publisher/messaging.py | 50 |
1 files changed, 35 insertions, 15 deletions
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 122ac24d..2e4d69fc 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -24,7 +24,6 @@ from oslo_config import cfg import six import six.moves.urllib.parse as urlparse -import ceilometer from ceilometer.i18n import _ from ceilometer import messaging from ceilometer.openstack.common import log @@ -48,6 +47,11 @@ NOTIFIER_OPTS = [ help='The topic that ceilometer uses for metering ' 'notifications.', ), + cfg.StrOpt('event_topic', + default='event', + help='The topic that ceilometer uses for event ' + 'notifications.', + ), cfg.StrOpt('metering_driver', default='messagingv2', help='The driver that ceilometer uses for metering ' @@ -102,7 +106,6 @@ class MessagingPublisher(publisher.PublisherBase): cfg.CONF.publisher.metering_secret) for sample in samples ] - topic = cfg.CONF.publisher_rpc.metering_topic self.local_queue.append((context, topic, meters)) @@ -142,18 +145,18 @@ class MessagingPublisher(publisher.PublisherBase): def _process_queue(self, queue, policy): while queue: - context, topic, meters = queue[0] + context, topic, data = queue[0] try: - self._send(context, topic, meters) + self._send(context, topic, data) except oslo.messaging.MessageDeliveryFailure: - samples = sum([len(m) for __, __, m in queue]) + data = sum([len(m) for __, __, m in queue]) if policy == 'queue': - LOG.warn(_("Failed to publish %d samples, queue them"), - samples) + LOG.warn(_("Failed to publish %d datapoints, queue them"), + data) return queue elif policy == 'drop': - LOG.warn(_("Failed to publish %d samples, dropping them"), - samples) + LOG.warn(_("Failed to publish %d datapoints, " + "dropping them"), data) return [] # default, occur only if rabbit_max_retries > 0 raise @@ -167,7 +170,12 @@ class MessagingPublisher(publisher.PublisherBase): :param context: Execution context from the service or RPC call :param events: events from pipeline after transformation """ - raise ceilometer.NotImplementedError + ev_list = [utils.message_from_event( + event, cfg.CONF.publisher.metering_secret) for event in events] + + topic = cfg.CONF.publisher_notifier.event_topic + self.local_queue.append((context, topic, ev_list)) + self.flush() @abc.abstractmethod def _send(self, context, topic, meters): @@ -192,16 +200,28 @@ class RPCPublisher(MessagingPublisher): class NotifierPublisher(MessagingPublisher): - def __init__(self, parsed_url): + def __init__(self, parsed_url, topic): super(NotifierPublisher, self).__init__(parsed_url) self.notifier = oslo.messaging.Notifier( messaging.get_transport(), driver=cfg.CONF.publisher_notifier.metering_driver, - publisher_id='metering.publisher.%s' % cfg.CONF.host, - topic=cfg.CONF.publisher_notifier.metering_topic, + publisher_id='telemetry.publisher.%s' % cfg.CONF.host, + topic=topic, retry=self.retry ) - def _send(self, context, event_type, meters): + def _send(self, context, event_type, data): self.notifier.sample(context.to_dict(), event_type=event_type, - payload=meters) + payload=data) + + +class SampleNotifierPublisher(NotifierPublisher): + def __init__(self, parsed_url): + super(SampleNotifierPublisher, self).__init__( + parsed_url, cfg.CONF.publisher_notifier.metering_topic) + + +class EventNotifierPublisher(NotifierPublisher): + def __init__(self, parsed_url): + super(EventNotifierPublisher, self).__init__( + parsed_url, cfg.CONF.publisher_notifier.event_topic) |