summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ceilometer/agent/plugin_base.py2
-rw-r--r--ceilometer/collector.py116
-rw-r--r--ceilometer/dispatcher/database.py3
-rwxr-xr-xceilometer/dispatcher/http.py3
-rw-r--r--ceilometer/event/endpoint.py2
-rw-r--r--ceilometer/notification.py18
-rw-r--r--ceilometer/publisher/direct.py3
-rw-r--r--ceilometer/publisher/messaging.py10
-rw-r--r--ceilometer/publisher/udp.py3
-rw-r--r--ceilometer/publisher/utils.py6
-rw-r--r--ceilometer/tests/api/v2/test_acl_scenarios.py3
-rw-r--r--ceilometer/tests/api/v2/test_complex_query_scenarios.py3
-rw-r--r--ceilometer/tests/api/v2/test_list_events_scenarios.py6
-rw-r--r--ceilometer/tests/api/v2/test_list_meters_scenarios.py3
-rw-r--r--ceilometer/tests/api/v2/test_list_resources_scenarios.py56
-rw-r--r--ceilometer/tests/api/v2/test_statistics_scenarios.py27
-rw-r--r--ceilometer/tests/dispatcher/test_db.py9
-rw-r--r--ceilometer/tests/dispatcher/test_file.py6
-rwxr-xr-xceilometer/tests/dispatcher/test_http.py12
-rw-r--r--ceilometer/tests/publisher/test_udp.py2
-rw-r--r--ceilometer/tests/storage/test_storage_scenarios.py16
-rw-r--r--ceilometer/tests/test_collector.py52
-rw-r--r--ceilometer/utils.py9
-rwxr-xr-xdoc/source/install/manual.rst16
-rwxr-xr-xtools/make_test_data.py3
25 files changed, 220 insertions, 169 deletions
diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py
index 1ec89a68..0409e906 100644
--- a/ceilometer/agent/plugin_base.py
+++ b/ceilometer/agent/plugin_base.py
@@ -175,7 +175,7 @@ class NotificationBase(PluginBase):
if self.requeue:
meters = [
utils.meter_message_from_counter(
- sample, cfg.CONF.publisher.metering_secret)
+ sample, cfg.CONF.publisher.telemetry_secret)
for sample in self.process_notification(notification)
]
for notifier in self.transporter:
diff --git a/ceilometer/collector.py b/ceilometer/collector.py
index 46517ccc..6b022cd2 100644
--- a/ceilometer/collector.py
+++ b/ceilometer/collector.py
@@ -18,13 +18,16 @@ import socket
import msgpack
import oslo.messaging
from oslo_config import cfg
+from oslo_utils import timeutils
from oslo_utils import units
from ceilometer import dispatcher
+from ceilometer.event.storage import models
from ceilometer import messaging
from ceilometer.i18n import _, _LE
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
+from ceilometer import utils
OPTS = [
cfg.StrOpt('udp_address',
@@ -39,13 +42,21 @@ OPTS = [
help='Requeue the sample on the collector sample queue '
'when the collector fails to dispatch it. This is only valid '
'if the sample come from the notifier publisher.'),
+ cfg.BoolOpt('requeue_event_on_dispatcher_error',
+ default=False,
+ help='Requeue the event on the collector event queue '
+ 'when the collector fails to dispatch it.'),
]
cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
- group="publisher_rpc")
+ group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
- group="publisher_notifier")
+ group='publisher_notifier')
+cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
+ group='publisher_notifier')
+cfg.CONF.import_opt('store_events', 'ceilometer.notification',
+ group='notification')
LOG = log.getLogger(__name__)
@@ -58,26 +69,38 @@ class CollectorService(os_service.Service):
# ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager()
self.rpc_server = None
- self.notification_server = None
+ self.sample_listener = None
+ self.event_listener = None
super(CollectorService, self).start()
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
- allow_requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
transport = messaging.get_transport(optional=True)
if transport:
self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self)
- target = oslo.messaging.Target(
+ sample_target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic)
- self.notification_server = messaging.get_notification_listener(
- transport, [target], [self],
- allow_requeue=allow_requeue)
+ self.sample_listener = messaging.get_notification_listener(
+ transport, [sample_target],
+ [SampleEndpoint(self.dispatcher_manager)],
+ allow_requeue=(cfg.CONF.collector.
+ requeue_sample_on_dispatcher_error))
+
+ if cfg.CONF.notification.store_events:
+ event_target = oslo.messaging.Target(
+ topic=cfg.CONF.publisher_notifier.event_topic)
+ self.event_listener = messaging.get_notification_listener(
+ transport, [event_target],
+ [EventEndpoint(self.dispatcher_manager)],
+ allow_requeue=(cfg.CONF.collector.
+ requeue_event_on_dispatcher_error))
+ self.event_listener.start()
self.rpc_server.start()
- self.notification_server.start()
+ self.sample_listener.start()
if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working
@@ -110,31 +133,78 @@ class CollectorService(os_service.Service):
self.udp_run = False
if self.rpc_server:
self.rpc_server.stop()
- if self.notification_server:
- self.notification_server.stop()
+ if self.sample_listener:
+ utils.kill_listeners([self.sample_listener])
+ if self.event_listener:
+ utils.kill_listeners([self.event_listener])
super(CollectorService, self).stop()
+ def record_metering_data(self, context, data):
+ """RPC endpoint for messages we send to ourselves.
+
+ When the notification messages are re-published through the
+ RPC publisher, this method receives them for processing.
+ """
+ self.dispatcher_manager.map_method('record_metering_data', data=data)
+
+
+class CollectorEndpoint(object):
+ def __init__(self, dispatcher_manager, requeue_on_error):
+ self.dispatcher_manager = dispatcher_manager
+ self.requeue_on_error = requeue_on_error
+
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it.
-
"""
try:
- self.dispatcher_manager.map_method('record_metering_data',
- data=payload)
+ self.dispatcher_manager.map_method(self.method, payload)
except Exception:
- if cfg.CONF.collector.requeue_sample_on_dispatcher_error:
- LOG.exception(_LE("Dispatcher failed to handle the sample, "
- "requeue it."))
+ if self.requeue_on_error:
+ LOG.exception(_LE("Dispatcher failed to handle the %s, "
+ "requeue it."), self.ep_type)
return oslo.messaging.NotificationResult.REQUEUE
raise
- def record_metering_data(self, context, data):
- """RPC endpoint for messages we send to ourselves.
- When the notification messages are re-published through the
- RPC publisher, this method receives them for processing.
- """
- self.dispatcher_manager.map_method('record_metering_data', data=data)
+class SampleEndpoint(CollectorEndpoint):
+ method = 'record_metering_data'
+ ep_type = 'sample'
+
+ def __init__(self, dispatcher_manager):
+ super(SampleEndpoint, self).__init__(
+ dispatcher_manager,
+ cfg.CONF.collector.requeue_sample_on_dispatcher_error)
+
+
+class EventEndpoint(CollectorEndpoint):
+ method = 'record_events'
+ ep_type = 'event'
+
+ def __init__(self, dispatcher_manager):
+ super(EventEndpoint, self).__init__(
+ dispatcher_manager,
+ cfg.CONF.collector.requeue_event_on_dispatcher_error)
+
+ def sample(self, ctxt, publisher_id, event_type, payload, metadata):
+ events = []
+ for ev in payload:
+ try:
+ events.append(
+ models.Event(
+ message_id=ev['message_id'],
+ event_type=ev['event_type'],
+ generated=timeutils.normalize_time(
+ timeutils.parse_isotime(ev['generated'])),
+ traits=[models.Trait(
+ name, dtype,
+ models.Trait.convert_value(dtype, value))
+ for name, dtype, value in ev['traits']])
+ )
+ except Exception:
+ LOG.exception(_LE("Error processing event and it will be "
+ "dropped: %s"), ev)
+ return super(EventEndpoint, self).sample(
+ ctxt, publisher_id, event_type, events, metadata)
diff --git a/ceilometer/dispatcher/database.py b/ceilometer/dispatcher/database.py
index ea555765..2a80bb8d 100644
--- a/ceilometer/dispatcher/database.py
+++ b/ceilometer/dispatcher/database.py
@@ -78,8 +78,7 @@ class DatabaseDispatcher(dispatcher.Base):
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
'counter_volume': meter['counter_volume']}))
if publisher_utils.verify_signature(
- meter,
- self.conf.publisher.metering_secret):
+ meter, self.conf.publisher.telemetry_secret):
try:
# Convert the timestamp to a datetime instance.
# Storage engines are responsible for converting
diff --git a/ceilometer/dispatcher/http.py b/ceilometer/dispatcher/http.py
index 430c6b4b..3edcac90 100755
--- a/ceilometer/dispatcher/http.py
+++ b/ceilometer/dispatcher/http.py
@@ -85,8 +85,7 @@ class HttpDispatcher(dispatcher.Base):
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
'counter_volume': meter['counter_volume']}))
if publisher_utils.verify_signature(
- meter,
- self.conf.publisher.metering_secret):
+ meter, self.conf.publisher.telemetry_secret):
try:
if self.cadf_only:
# Only cadf messages are being wanted.
diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py
index 65cc7f6a..422e36db 100644
--- a/ceilometer/event/endpoint.py
+++ b/ceilometer/event/endpoint.py
@@ -68,7 +68,7 @@ class EventsNotificationEndpoint(object):
self.ctxt.to_dict(),
event_type='pipeline.event',
payload=[utils.message_from_event(
- event, cfg.CONF.publisher.metering_secret)])
+ event, cfg.CONF.publisher.telemetry_secret)])
else:
with self.transporter.publisher(self.ctxt) as p:
p(event)
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 09fa8382..2b96455e 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -25,6 +25,7 @@ from ceilometer import messaging
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
+from ceilometer import utils
LOG = log.getLogger(__name__)
@@ -52,7 +53,7 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS, group="notification")
-cfg.CONF.import_opt('metering_driver', 'ceilometer.publisher.messaging',
+cfg.CONF.import_opt('telemetry_driver', 'ceilometer.publisher.messaging',
group='publisher_notifier')
@@ -80,7 +81,7 @@ class NotificationService(os_service.Service):
def _get_notifier(self, transport, pipe):
return oslo.messaging.Notifier(
transport,
- driver=cfg.CONF.publisher_notifier.metering_driver,
+ driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification',
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))
@@ -174,17 +175,8 @@ class NotificationService(os_service.Service):
listener.start()
self.listeners.append(listener)
- @staticmethod
- def _kill_listeners(listeners):
- # NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
- # which stops new messages, and wait(), which processes remaining
- # messages and closes connection
- for listener in listeners:
- listener.stop()
- listener.wait()
-
def _refresh_agent(self, event):
- self._kill_listeners(self.pipeline_listeners)
+ utils.kill_listeners(self.pipeline_listeners)
self._configure_pipeline_listeners()
def _configure_pipeline_listeners(self):
@@ -210,5 +202,5 @@ class NotificationService(os_service.Service):
def stop(self):
self.partition_coordinator.leave_group(self.group_id)
- self._kill_listeners(self.listeners + self.pipeline_listeners)
+ utils.kill_listeners(self.listeners + self.pipeline_listeners)
super(NotificationService, self).stop()
diff --git a/ceilometer/publisher/direct.py b/ceilometer/publisher/direct.py
index 7b3d071b..14180ae7 100644
--- a/ceilometer/publisher/direct.py
+++ b/ceilometer/publisher/direct.py
@@ -42,8 +42,7 @@ class DirectPublisher(publisher.PublisherBase):
# Transform the Sample objects into a list of dicts
meters = [
utils.meter_message_from_counter(
- sample,
- cfg.CONF.publisher.metering_secret)
+ sample, cfg.CONF.publisher.telemetry_secret)
for sample in samples
]
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index 2e4d69fc..0ccbb69b 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -52,10 +52,11 @@ NOTIFIER_OPTS = [
help='The topic that ceilometer uses for event '
'notifications.',
),
- cfg.StrOpt('metering_driver',
+ cfg.StrOpt('telemetry_driver',
default='messagingv2',
help='The driver that ceilometer uses for metering '
'notifications.',
+ deprecated_name='metering_driver',
)
]
@@ -102,8 +103,7 @@ class MessagingPublisher(publisher.PublisherBase):
meters = [
utils.meter_message_from_counter(
- sample,
- cfg.CONF.publisher.metering_secret)
+ sample, cfg.CONF.publisher.telemetry_secret)
for sample in samples
]
topic = cfg.CONF.publisher_rpc.metering_topic
@@ -171,7 +171,7 @@ class MessagingPublisher(publisher.PublisherBase):
:param events: events from pipeline after transformation
"""
ev_list = [utils.message_from_event(
- event, cfg.CONF.publisher.metering_secret) for event in events]
+ event, cfg.CONF.publisher.telemetry_secret) for event in events]
topic = cfg.CONF.publisher_notifier.event_topic
self.local_queue.append((context, topic, ev_list))
@@ -204,7 +204,7 @@ class NotifierPublisher(MessagingPublisher):
super(NotifierPublisher, self).__init__(parsed_url)
self.notifier = oslo.messaging.Notifier(
messaging.get_transport(),
- driver=cfg.CONF.publisher_notifier.metering_driver,
+ driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
topic=topic,
retry=self.retry
diff --git a/ceilometer/publisher/udp.py b/ceilometer/publisher/udp.py
index 4da2de8e..718161f0 100644
--- a/ceilometer/publisher/udp.py
+++ b/ceilometer/publisher/udp.py
@@ -50,8 +50,7 @@ class UDPPublisher(publisher.PublisherBase):
for sample in samples:
msg = utils.meter_message_from_counter(
- sample,
- cfg.CONF.publisher.metering_secret)
+ sample, cfg.CONF.publisher.telemetry_secret)
host = self.host
port = self.port
LOG.debug(_("Publishing sample %(msg)s over UDP to "
diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py
index 21ddbd86..824a3b21 100644
--- a/ceilometer/publisher/utils.py
+++ b/ceilometer/publisher/utils.py
@@ -24,14 +24,16 @@ import six
from ceilometer import utils
OPTS = [
- cfg.StrOpt('metering_secret',
+ cfg.StrOpt('telemetry_secret',
secret=True,
default='change this or be hacked',
help='Secret value for signing metering messages.',
deprecated_opts=[cfg.DeprecatedOpt("metering_secret",
"DEFAULT"),
cfg.DeprecatedOpt("metering_secret",
- "publisher_rpc")]
+ "publisher_rpc"),
+ cfg.DeprecatedOpt("metering_secret",
+ "publisher")]
),
]
cfg.CONF.register_opts(OPTS, group="publisher")
diff --git a/ceilometer/tests/api/v2/test_acl_scenarios.py b/ceilometer/tests/api/v2/test_acl_scenarios.py
index 551f5045..b00ec7b1 100644
--- a/ceilometer/tests/api/v2/test_acl_scenarios.py
+++ b/ceilometer/tests/api/v2/test_acl_scenarios.py
@@ -101,8 +101,7 @@ class TestAPIACL(v2.FunctionalTest,
'tag': 'self.sample4'},
source='test_source')]:
msg = utils.meter_message_from_counter(
- cnt,
- self.CONF.publisher.metering_secret)
+ cnt, self.CONF.publisher.telemetry_secret)
self.conn.record_metering_data(msg)
def get_json(self, path, expect_errors=False, headers=None,
diff --git a/ceilometer/tests/api/v2/test_complex_query_scenarios.py b/ceilometer/tests/api/v2/test_complex_query_scenarios.py
index 444ac4b3..32bd13b2 100644
--- a/ceilometer/tests/api/v2/test_complex_query_scenarios.py
+++ b/ceilometer/tests/api/v2/test_complex_query_scenarios.py
@@ -89,8 +89,7 @@ class TestQueryMetersController(tests_api.FunctionalTest,
source='test_source')]:
msg = utils.meter_message_from_counter(
- cnt,
- self.CONF.publisher.metering_secret)
+ cnt, self.CONF.publisher.telemetry_secret)
self.conn.record_metering_data(msg)
def test_query_fields_are_optional(self):
diff --git a/ceilometer/tests/api/v2/test_list_events_scenarios.py b/ceilometer/tests/api/v2/test_list_events_scenarios.py
index a3253d6f..5aff5db0 100644
--- a/ceilometer/tests/api/v2/test_list_events_scenarios.py
+++ b/ceilometer/tests/api/v2/test_list_events_scenarios.py
@@ -54,8 +54,7 @@ class TestListEvents(v2.FunctionalTest,
source='test_source',
)
msg = utils.meter_message_from_counter(
- self.sample1,
- self.CONF.publisher.metering_secret,
+ self.sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -74,8 +73,7 @@ class TestListEvents(v2.FunctionalTest,
source='source2',
)
msg2 = utils.meter_message_from_counter(
- self.sample2,
- self.CONF.publisher.metering_secret,
+ self.sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
diff --git a/ceilometer/tests/api/v2/test_list_meters_scenarios.py b/ceilometer/tests/api/v2/test_list_meters_scenarios.py
index fbc5016c..37b2c788 100644
--- a/ceilometer/tests/api/v2/test_list_meters_scenarios.py
+++ b/ceilometer/tests/api/v2/test_list_meters_scenarios.py
@@ -177,8 +177,7 @@ class TestListMeters(v2.FunctionalTest,
resource_metadata={},
source='test_source1')]:
msg = utils.meter_message_from_counter(
- cnt,
- self.CONF.publisher.metering_secret)
+ cnt, self.CONF.publisher.telemetry_secret)
self.messages.append(msg)
self.conn.record_metering_data(msg)
diff --git a/ceilometer/tests/api/v2/test_list_resources_scenarios.py b/ceilometer/tests/api/v2/test_list_resources_scenarios.py
index cf95205b..eef10ba1 100644
--- a/ceilometer/tests/api/v2/test_list_resources_scenarios.py
+++ b/ceilometer/tests/api/v2/test_list_resources_scenarios.py
@@ -61,8 +61,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -90,8 +89,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -110,8 +108,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -144,7 +141,7 @@ class TestListResources(v2.FunctionalTest,
)
msg = utils.meter_message_from_counter(
datapoint,
- self.CONF.publisher.metering_secret,
+ self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -169,8 +166,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -189,8 +185,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -213,8 +208,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -233,8 +227,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -262,8 +255,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -282,8 +274,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -315,8 +306,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -335,8 +325,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -362,8 +351,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -382,8 +370,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test',
)
msg2 = utils.meter_message_from_counter(
- sample2,
- self.CONF.publisher.metering_secret,
+ sample2, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -409,8 +396,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test',
)
msg2 = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -436,8 +422,7 @@ class TestListResources(v2.FunctionalTest,
source='not-test',
)
msg2 = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg2)
@@ -465,8 +450,7 @@ class TestListResources(v2.FunctionalTest,
source='test',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -494,8 +478,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -526,8 +509,7 @@ class TestListResources(v2.FunctionalTest,
source='test_list_resources',
)
msg = utils.meter_message_from_counter(
- sample1,
- self.CONF.publisher.metering_secret,
+ sample1, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
diff --git a/ceilometer/tests/api/v2/test_statistics_scenarios.py b/ceilometer/tests/api/v2/test_statistics_scenarios.py
index d36fd462..aa1e53e7 100644
--- a/ceilometer/tests/api/v2/test_statistics_scenarios.py
+++ b/ceilometer/tests/api/v2/test_statistics_scenarios.py
@@ -45,8 +45,7 @@ class TestMaxProjectVolume(v2.FunctionalTest,
source='source1',
)
msg = utils.meter_message_from_counter(
- s,
- self.CONF.publisher.metering_secret,
+ s, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -143,8 +142,7 @@ class TestMaxResourceVolume(v2.FunctionalTest,
source='source1',
)
msg = utils.meter_message_from_counter(
- s,
- self.CONF.publisher.metering_secret,
+ s, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -273,8 +271,7 @@ class TestSumProjectVolume(v2.FunctionalTest,
source='source1',
)
msg = utils.meter_message_from_counter(
- s,
- self.CONF.publisher.metering_secret,
+ s, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -373,8 +370,7 @@ class TestSumResourceVolume(v2.FunctionalTest,
source='source1',
)
msg = utils.meter_message_from_counter(
- s,
- self.CONF.publisher.metering_secret,
+ s, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -530,8 +526,7 @@ class TestGroupByInstance(v2.FunctionalTest,
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -1281,8 +1276,7 @@ class TestGroupBySource(v2.FunctionalTest,
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -1383,8 +1377,7 @@ class TestSelectableAggregates(v2.FunctionalTest,
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -1482,8 +1475,7 @@ class TestSelectableAggregates(v2.FunctionalTest,
source='source',
)
msg = utils.meter_message_from_counter(
- s,
- self.CONF.publisher.metering_secret,
+ s, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -1635,8 +1627,7 @@ class TestUnparameterizedAggregates(v2.FunctionalTest,
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
diff --git a/ceilometer/tests/dispatcher/test_db.py b/ceilometer/tests/dispatcher/test_db.py
index 906cf513..95a603f2 100644
--- a/ceilometer/tests/dispatcher/test_db.py
+++ b/ceilometer/tests/dispatcher/test_db.py
@@ -48,8 +48,7 @@ class TestDispatcherDB(base.BaseTestCase):
'counter_volume': 1,
}
msg['message_signature'] = utils.compute_signature(
- msg,
- self.CONF.publisher.metering_secret,
+ msg, self.CONF.publisher.telemetry_secret,
)
with mock.patch.object(self.dispatcher.meter_conn,
@@ -85,8 +84,7 @@ class TestDispatcherDB(base.BaseTestCase):
'timestamp': '2012-07-02T13:53:40Z',
}
msg['message_signature'] = utils.compute_signature(
- msg,
- self.CONF.publisher.metering_secret,
+ msg, self.CONF.publisher.telemetry_secret,
)
expected = msg.copy()
@@ -105,8 +103,7 @@ class TestDispatcherDB(base.BaseTestCase):
'timestamp': '2012-09-30T15:31:50.262-08:00',
}
msg['message_signature'] = utils.compute_signature(
- msg,
- self.CONF.publisher.metering_secret,
+ msg, self.CONF.publisher.telemetry_secret,
)
expected = msg.copy()
diff --git a/ceilometer/tests/dispatcher/test_file.py b/ceilometer/tests/dispatcher/test_file.py
index df3b5edb..68617301 100644
--- a/ceilometer/tests/dispatcher/test_file.py
+++ b/ceilometer/tests/dispatcher/test_file.py
@@ -52,8 +52,7 @@ class TestDispatcherFile(base.BaseTestCase):
'counter_volume': 1,
}
msg['message_signature'] = utils.compute_signature(
- msg,
- self.CONF.publisher.metering_secret,
+ msg, self.CONF.publisher.telemetry_secret,
)
# The record_metering_data method should exist and not produce errors.
@@ -84,8 +83,7 @@ class TestDispatcherFile(base.BaseTestCase):
'counter_volume': 1,
}
msg['message_signature'] = utils.compute_signature(
- msg,
- self.CONF.publisher.metering_secret,
+ msg, self.CONF.publisher.telemetry_secret,
)
# The record_metering_data method should exist and not produce errors.
diff --git a/ceilometer/tests/dispatcher/test_http.py b/ceilometer/tests/dispatcher/test_http.py
index 10ced3e6..5d728dac 100755
--- a/ceilometer/tests/dispatcher/test_http.py
+++ b/ceilometer/tests/dispatcher/test_http.py
@@ -32,8 +32,7 @@ class TestDispatcherHttp(base.BaseTestCase):
'counter_volume': 1,
}
self.msg['message_signature'] = utils.compute_signature(
- self.msg,
- self.CONF.publisher.metering_secret,
+ self.msg, self.CONF.publisher.telemetry_secret,
)
def test_http_dispatcher_config_options(self):
@@ -78,8 +77,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'request': {'NONE_CADF_EVENT': {
'q1': 'v1', 'q2': 'v2'}, }, }
self.msg['message_signature'] = utils.compute_signature(
- self.msg,
- self.CONF.publisher.metering_secret,
+ self.msg, self.CONF.publisher.telemetry_secret,
)
with mock.patch.object(requests, 'post') as post:
@@ -97,8 +95,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'request': {'CADF_EVENT': {
'q1': 'v1', 'q2': 'v2'}, }, }
self.msg['message_signature'] = utils.compute_signature(
- self.msg,
- self.CONF.publisher.metering_secret,
+ self.msg, self.CONF.publisher.telemetry_secret,
)
with mock.patch.object(requests, 'post') as post:
@@ -114,8 +111,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.msg['resource_metadata'] = {'any': {'thing1': 'v1',
'thing2': 'v2', }, }
self.msg['message_signature'] = utils.compute_signature(
- self.msg,
- self.CONF.publisher.metering_secret,
+ self.msg, self.CONF.publisher.telemetry_secret,
)
with mock.patch.object(requests, 'post') as post:
diff --git a/ceilometer/tests/publisher/test_udp.py b/ceilometer/tests/publisher/test_udp.py
index aa764943..0f6ad059 100644
--- a/ceilometer/tests/publisher/test_udp.py
+++ b/ceilometer/tests/publisher/test_udp.py
@@ -110,7 +110,7 @@ class TestUDPPublisher(base.BaseTestCase):
def setUp(self):
super(TestUDPPublisher, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
- self.CONF.publisher.metering_secret = 'not-so-secret'
+ self.CONF.publisher.telemetry_secret = 'not-so-secret'
def test_published(self):
self.data_sent = []
diff --git a/ceilometer/tests/storage/test_storage_scenarios.py b/ceilometer/tests/storage/test_storage_scenarios.py
index 9637cfc2..1776f4d9 100644
--- a/ceilometer/tests/storage/test_storage_scenarios.py
+++ b/ceilometer/tests/storage/test_storage_scenarios.py
@@ -51,7 +51,7 @@ class DBTestBase(tests_db.TestBase):
resource_metadata=metadata, source=source
)
msg = utils.meter_message_from_counter(
- s, self.CONF.publisher.metering_secret
+ s, self.CONF.publisher.telemetry_secret
)
self.conn.record_metering_data(msg)
return msg
@@ -1409,8 +1409,7 @@ class StatisticsGroupByTest(DBTestBase,
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -2608,8 +2607,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1',
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -2627,8 +2625,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1',
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -2645,8 +2642,7 @@ class CounterDataTypeTest(DBTestBase,
source='test-1',
)
msg = utils.meter_message_from_counter(
- c,
- self.CONF.publisher.metering_secret,
+ c, self.CONF.publisher.telemetry_secret,
)
self.conn.record_metering_data(msg)
@@ -3492,7 +3488,7 @@ class BigIntegerTest(tests_db.TestBase,
timestamp=datetime.datetime.utcnow(),
resource_metadata=metadata)
msg = utils.meter_message_from_counter(
- s, self.CONF.publisher.metering_secret)
+ s, self.CONF.publisher.telemetry_secret)
self.conn.record_metering_data(msg)
diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py
index b6ff66fb..15178bb4 100644
--- a/ceilometer/tests/test_collector.py
+++ b/ceilometer/tests/test_collector.py
@@ -47,7 +47,7 @@ class TestCollector(tests_base.BaseTestCase):
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.import_opt("connection", "oslo.db.options", group="database")
self.CONF.set_override("connection", "log://", group='database')
- self.CONF.set_override('metering_secret', 'not-so-secret',
+ self.CONF.set_override('telemetry_secret', 'not-so-secret',
group='publisher')
self._setup_messaging()
@@ -230,25 +230,53 @@ class TestCollector(tests_base.BaseTestCase):
mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1')
- @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
- @mock.patch.object(collector.CollectorService, 'start_udp')
- def test_collector_requeue(self, udp_start, rpc_start):
- self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
- group='collector')
+ def _test_collector_requeue(self, listener):
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=Exception('boom')):
- ret = self.srv.sample({}, 'pub_id', 'event', {}, {})
+ endp = getattr(self.srv, listener).dispatcher.endpoints[0]
+ ret = endp.sample({}, 'pub_id', 'event', {}, {})
self.assertEqual(oslo.messaging.NotificationResult.REQUEUE,
ret)
- @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
- @mock.patch.object(collector.CollectorService, 'start_udp')
- def test_collector_no_requeue(self, udp_start, rpc_start):
- self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
+ mock.Mock())
+ @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
+ def test_collector_sample_requeue(self):
+ self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
group='collector')
+ self._test_collector_requeue('sample_listener')
+
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
+ mock.Mock())
+ @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
+ def test_collector_event_requeue(self):
+ self.CONF.set_override('requeue_event_on_dispatcher_error', True,
+ group='collector')
+ self.CONF.set_override('store_events', True, group='notification')
+ self._test_collector_requeue('event_listener')
+
+ def _test_collector_no_requeue(self, listener):
self.srv.start()
with mock.patch.object(self.srv.dispatcher_manager, 'map_method',
side_effect=FakeException('boom')):
- self.assertRaises(FakeException, self.srv.sample, {}, 'pub_id',
+ endp = getattr(self.srv, listener).dispatcher.endpoints[0]
+ self.assertRaises(FakeException, endp.sample, {}, 'pub_id',
'event', {}, {})
+
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
+ mock.Mock())
+ @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
+ def test_collector_sample_no_requeue(self):
+ self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
+ group='collector')
+ self._test_collector_no_requeue('sample_listener')
+
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
+ mock.Mock())
+ @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
+ def test_collector_event_no_requeue(self):
+ self.CONF.set_override('requeue_event_on_dispatcher_error', False,
+ group='collector')
+ self.CONF.set_override('store_events', True, group='notification')
+ self._test_collector_no_requeue('event_listener')
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index 8701eb96..237cae41 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -249,3 +249,12 @@ class HashRing(object):
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]
+
+
+def kill_listeners(listeners):
+ # NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
+ # which stops new messages, and wait(), which processes remaining
+ # messages and closes connection
+ for listener in listeners:
+ listener.stop()
+ listener.wait()
diff --git a/doc/source/install/manual.rst b/doc/source/install/manual.rst
index f0c665d7..3ee5a138 100755
--- a/doc/source/install/manual.rst
+++ b/doc/source/install/manual.rst
@@ -217,9 +217,9 @@ Installing the notification agent
not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now.
- 2. Set the ``metering_secret`` value.
+ 2. Set the ``telemetry_secret`` value.
- Set the ``metering_secret`` value to a large, random, value. Use
+ Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be
validated.
@@ -289,9 +289,9 @@ Installing the collector
not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now.
- 2. Set the ``metering_secret`` value.
+ 2. Set the ``telemetry_secret`` value.
- Set the ``metering_secret`` value to a large, random, value. Use
+ Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be
validated.
@@ -372,9 +372,9 @@ Installing the Compute Agent
not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now.
- 2. Set the ``metering_secret`` value.
+ 2. Set the ``telemetry_secret`` value.
- Set the ``metering_secret`` value to a large, random, value. Use
+ Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be
validated.
@@ -446,9 +446,9 @@ Installing the Central Agent
not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now.
- 2. Set the ``metering_secret`` value.
+ 2. Set the ``telemetry_secret`` value.
- Set the ``metering_secret`` value to a large, random, value. Use
+ Set the ``telemetry_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be
validated.
diff --git a/tools/make_test_data.py b/tools/make_test_data.py
index 347eba1c..55042fb5 100755
--- a/tools/make_test_data.py
+++ b/tools/make_test_data.py
@@ -81,8 +81,7 @@ def make_test_data(name, meter_type, unit, volume, random_min,
source=source,
)
data = utils.meter_message_from_counter(
- c,
- cfg.CONF.publisher.metering_secret)
+ c, cfg.CONF.publisher.telemetry_secret)
yield data
n += 1