summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ceilometer/notification.py129
-rw-r--r--ceilometer/pipeline/base.py77
-rw-r--r--ceilometer/pipeline/event.py58
-rw-r--r--ceilometer/pipeline/sample.py56
-rw-r--r--ceilometer/tests/unit/pipeline_base.py46
-rw-r--r--ceilometer/tests/unit/test_event_pipeline.py40
-rw-r--r--ceilometer/tests/unit/test_notification.py279
-rw-r--r--ceilometer/transformer/__init__.py4
-rw-r--r--ceilometer/transformer/accumulator.py2
-rw-r--r--ceilometer/transformer/arithmetic.py2
-rw-r--r--ceilometer/transformer/conversions.py2
-rw-r--r--devstack/plugin.sh1
-rw-r--r--doc/source/contributor/plugins.rst15
-rw-r--r--releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml4
14 files changed, 25 insertions, 690 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index cebd09db..af228f07 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -1,5 +1,5 @@
#
-# Copyright 2017 Red Hat, Inc.
+# Copyright 2017-2018 Red Hat, Inc.
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -13,45 +13,25 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import itertools
-import threading
import time
-import uuid
-from concurrent import futures
import cotyledon
-from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from stevedore import named
-from tooz import coordination
from ceilometer.i18n import _
from ceilometer import messaging
-from ceilometer import utils
LOG = log.getLogger(__name__)
OPTS = [
- cfg.IntOpt('pipeline_processing_queues',
- deprecated_for_removal=True,
- default=10,
- min=1,
- help='Number of queues to parallelize workload across. This '
- 'value should be larger than the number of active '
- 'notification agents for optimal results. WARNING: '
- 'Once set, lowering this value may result in lost data.'),
cfg.BoolOpt('ack_on_event_error',
default=True,
help='Acknowledge message when event persistence fails.'),
- cfg.BoolOpt('workload_partitioning',
- deprecated_for_removal=True,
- default=False,
- help='Enable workload partitioning, allowing multiple '
- 'notification agents to be run simultaneously.'),
cfg.MultiStrOpt('messaging_urls',
default=[],
secret=True,
@@ -68,10 +48,6 @@ OPTS = [
help='Number of notification messages to wait before '
'publishing them. Batching is advised when transformations are '
'applied in pipeline.'),
- cfg.IntOpt('batch_timeout',
- default=5,
- help='Number of seconds to wait before publishing samples '
- 'when batch_size is not reached (None means indefinitely)'),
cfg.IntOpt('workers',
default=1,
min=1,
@@ -114,25 +90,11 @@ class NotificationService(cotyledon.Service):
self.startup_delay = worker_id
self.conf = conf
- self.periodic = None
- self.shutdown = False
self.listeners = []
# NOTE(kbespalov): for the pipeline queues used a single amqp host
# hence only one listener is required
self.pipeline_listener = None
- if self.conf.notification.workload_partitioning:
- # XXX uuid4().bytes ought to work, but it requires ascii for now
- coordination_id = (coordination_id or
- str(uuid.uuid4()).encode('ascii'))
- self.partition_coordinator = coordination.get_coordinator(
- self.conf.coordination.backend_url, coordination_id)
- self.partition_set = list(range(
- self.conf.notification.pipeline_processing_queues))
- self.group_state = None
- else:
- self.partition_coordinator = None
-
def get_targets(self):
"""Return a sequence of oslo_messaging.Target
@@ -154,49 +116,22 @@ class NotificationService(cotyledon.Service):
time.sleep(self.startup_delay)
super(NotificationService, self).run()
- self.coord_lock = threading.Lock()
self.managers = [ext.obj for ext in named.NamedExtensionManager(
namespace='ceilometer.notification.pipeline',
names=self.conf.notification.pipelines, invoke_on_load=True,
on_missing_entrypoints_callback=self._log_missing_pipeline,
- invoke_args=(self.conf,
- self.conf.notification.workload_partitioning))]
+ invoke_args=(self.conf,))]
self.transport = messaging.get_transport(self.conf)
- if self.conf.notification.workload_partitioning:
- self.partition_coordinator.start(start_heart=True)
- else:
- # FIXME(sileht): endpoint uses the notification_topics option
- # and it should not because this is an oslo_messaging option
- # not a ceilometer. Until we have something to get the
- # notification_topics in another way, we must create a transport
- # to ensure the option has been registered by oslo_messaging.
- messaging.get_notifier(self.transport, '')
-
- self._configure_main_queue_listeners()
-
- if self.conf.notification.workload_partitioning:
- # join group after all manager set up is configured
- self.hashring = self.partition_coordinator.join_partitioned_group(
- self.NOTIFICATION_NAMESPACE)
-
- @periodics.periodic(spacing=self.conf.coordination.check_watchers,
- run_immediately=True)
- def run_watchers():
- self.partition_coordinator.run_watchers()
- if self.group_state != self.hashring.ring.nodes:
- self.group_state = self.hashring.ring.nodes.copy()
- self._refresh_agent()
+ # FIXME(sileht): endpoint uses the notification_topics option
+ # and it should not because this is an oslo_messaging option
+ # not a ceilometer. Until we have something to get the
+ # notification_topics in another way, we must create a transport
+ # to ensure the option has been registered by oslo_messaging.
+ messaging.get_notifier(self.transport, '')
- self.periodic = periodics.PeriodicWorker.create(
- [], executor_factory=lambda:
- futures.ThreadPoolExecutor(max_workers=10))
- self.periodic.add(run_watchers)
- utils.spawn_thread(self.periodic.start)
-
- def _configure_main_queue_listeners(self):
endpoints = []
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_main_endpoints())
@@ -214,41 +149,6 @@ class NotificationService(cotyledon.Service):
)
self.listeners.append(listener)
- def _refresh_agent(self):
- with self.coord_lock:
- if self.shutdown:
- # NOTE(sileht): We are going to shutdown we everything will be
- # stopped, we should not restart them
- return
- self._configure_pipeline_listener()
-
- def _configure_pipeline_listener(self):
- partitioned = list(filter(
- self.hashring.belongs_to_self, self.partition_set))
-
- endpoints = []
- for pipe_mgr in self.managers:
- endpoints.extend(pipe_mgr.get_interim_endpoints())
-
- targets = []
- for mgr, hash_id in itertools.product(self.managers, partitioned):
- topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)])
- LOG.debug('Listening to queue: %s', topic)
- targets.append(oslo_messaging.Target(topic=topic))
-
- if self.pipeline_listener:
- self.kill_listeners([self.pipeline_listener])
-
- self.pipeline_listener = messaging.get_batch_notification_listener(
- self.transport, targets, endpoints, allow_requeue=True,
- batch_size=self.conf.notification.batch_size,
- batch_timeout=self.conf.notification.batch_timeout)
- # NOTE(gordc): set single thread to process data sequentially
- # if batching enabled.
- batch = (1 if self.conf.notification.batch_size > 1
- else self.conf.max_parallel_requests)
- self.pipeline_listener.start(override_pool_size=batch)
-
@staticmethod
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
@@ -259,15 +159,8 @@ class NotificationService(cotyledon.Service):
listener.wait()
def terminate(self):
- self.shutdown = True
- if self.periodic:
- self.periodic.stop()
- self.periodic.wait()
- if self.partition_coordinator:
- self.partition_coordinator.stop()
- with self.coord_lock:
- if self.pipeline_listener:
- self.kill_listeners([self.pipeline_listener])
- self.kill_listeners(self.listeners)
+ if self.pipeline_listener:
+ self.kill_listeners([self.pipeline_listener])
+ self.kill_listeners(self.listeners)
super(NotificationService, self).terminate()
diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py
index 99a7b709..73114b1d 100644
--- a/ceilometer/pipeline/base.py
+++ b/ceilometer/pipeline/base.py
@@ -22,7 +22,6 @@ import oslo_messaging
import six
from ceilometer import agent
-from ceilometer import messaging
from ceilometer import publisher
OPTS = [
@@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg)
-class InterimPublishContext(object):
- """Publisher to hash/shard data to pipelines"""
-
- def __init__(self, conf, mgr):
- self.conf = conf
- self.mgr = mgr
- self.notifiers = self._get_notifiers(messaging.get_transport(conf))
-
- def _get_notifiers(self, transport):
- notifiers = []
- for x in range(self.conf.notification.pipeline_processing_queues):
- notifiers.append(oslo_messaging.Notifier(
- transport,
- driver=self.conf.publisher_notifier.telemetry_driver,
- topics=['-'.join(
- [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
- return notifiers
-
- @staticmethod
- def hash_grouping(datapoint, grouping_keys):
- # FIXME(gordc): this logic only supports a single grouping_key. we
- # need to change to support pipeline with multiple transformers and
- # different grouping_keys
- value = ''
- for key in grouping_keys or []:
- value += datapoint.get(key) if datapoint.get(key) else ''
- return hash(value)
-
- def __enter__(self):
- def p(data):
- data = [data] if not isinstance(data, list) else data
- for datapoint in data:
- for pipe in self.mgr.pipelines:
- if pipe.supported(datapoint):
- serialized_data = pipe.serializer(datapoint)
- key = (self.hash_grouping(serialized_data,
- pipe.get_grouping_key())
- % len(self.notifiers))
- self.notifiers[key].sample({}, event_type=pipe.name,
- payload=[serialized_data])
- return p
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
-
class PublishContext(object):
def __init__(self, pipelines):
self.pipelines = pipelines or []
@@ -239,24 +192,10 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
- @abc.abstractproperty
- def default_grouping_key(self):
- """Attribute to hash data on. Pass if no partitioning."""
-
@abc.abstractmethod
def supported(self, data):
"""Attribute to filter on. Pass if no partitioning."""
- @abc.abstractmethod
- def serializer(self, data):
- """Serialize data for interim transport. Pass if no partitioning."""
-
- def get_grouping_key(self):
- keys = []
- for transformer in self.sink.transformers:
- keys += transformer.grouping_keys
- return list(set(keys)) or self.default_grouping_key
-
class PublisherManager(object):
def __init__(self, conf, purpose):
@@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
- def __init__(self, conf, cfg_file, transformer_manager, partition):
+ def __init__(self, conf, cfg_file, transformer_manager):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
- self.partition = partition
@abc.abstractproperty
def pm_type(self):
@@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase):
"""Build publisher for pipeline publishing."""
return PublishContext(self.pipelines)
- def interim_publisher(self):
- """Build publishing context for IPC."""
- return InterimPublishContext(self.conf, self)
-
- def get_main_publisher(self):
- """Return the publishing context to use"""
- return (self.interim_publisher() if self.partition else
- self.publisher())
-
def get_main_endpoints(self):
"""Return endpoints for main queue."""
pass
- def get_interim_endpoints(self):
- """Return endpoints for interim pipeline queues."""
- pass
-
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 1243d706..4b3f0b64 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -11,18 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from itertools import chain
-
from oslo_log import log
import oslo_messaging
-from oslo_utils import timeutils
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
-from ceilometer.event import models
from ceilometer.pipeline import base
-from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__)
@@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint):
return oslo_messaging.NotificationResult.HANDLED
-class InterimEventEndpoint(base.NotificationEndpoint):
- def __init__(self, conf, publisher, pipe_name):
- self.event_types = [pipe_name]
- super(InterimEventEndpoint, self).__init__(conf, publisher)
-
- def sample(self, notifications):
- return self.process_notifications('sample', notifications)
-
- def process_notifications(self, priority, notifications):
- events = chain.from_iterable(m["payload"] for m in notifications)
- events = [
- models.Event(
- message_id=ev['message_id'],
- event_type=ev['event_type'],
- generated=timeutils.normalize_time(
- timeutils.parse_isotime(ev['generated'])),
- traits=[models.Trait(name, dtype,
- models.Trait.convert_value(dtype, value))
- for name, dtype, value in ev['traits']],
- raw=ev.get('raw', {}))
- for ev in events if publisher_utils.verify_signature(
- ev, self.conf.publisher.telemetry_secret)
- ]
- try:
- with self.publisher as p:
- p(events)
- except Exception:
- if not self.conf.notification.ack_on_event_error:
- return oslo_messaging.NotificationResult.REQUEUE
- raise
- return oslo_messaging.NotificationResult.HANDLED
-
-
class EventSource(base.PipelineSource):
"""Represents a source of events.
@@ -140,8 +102,6 @@ class EventSink(base.Sink):
class EventPipeline(base.Pipeline):
"""Represents a pipeline for Events."""
- default_grouping_key = ['event_type']
-
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
@@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline):
supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported)
- def serializer(self, event):
- return publisher_utils.message_from_event(
- event, self.conf.publisher.telemetry_secret)
-
def supported(self, event):
return self.source.support_event(event.event_type)
@@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager):
pm_source = EventSource
pm_sink = EventSink
- def __init__(self, conf, partition=False):
+ def __init__(self, conf):
super(EventPipelineManager, self).__init__(
- conf, conf.event_pipeline_cfg_file, {}, partition)
+ conf, conf.event_pipeline_cfg_file, {})
def get_main_endpoints(self):
- return [EventEndpoint(self.conf, self.get_main_publisher())]
-
- def get_interim_endpoints(self):
- # FIXME(gordc): change this so we shard data rather than per
- # pipeline. this will allow us to use self.publisher and less
- # queues.
- return [InterimEventEndpoint(
- self.conf, base.PublishContext([pipe]), pipe.name)
- for pipe in self.pipelines]
+ return [EventEndpoint(self.conf, self.publisher())]
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index 3e3db8fa..f036f1d2 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -10,15 +10,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from itertools import chain
-
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer.pipeline import base
-from ceilometer.publisher import utils as publisher_utils
-from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
@@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint):
pass
-class InterimSampleEndpoint(base.NotificationEndpoint):
- def __init__(self, conf, publisher, pipe_name):
- self.event_types = [pipe_name]
- super(InterimSampleEndpoint, self).__init__(conf, publisher)
-
- def sample(self, notifications):
- return self.process_notifications('sample', notifications)
-
- def process_notifications(self, priority, notifications):
- samples = chain.from_iterable(m["payload"] for m in notifications)
- samples = [
- sample_util.Sample(name=s['counter_name'],
- type=s['counter_type'],
- unit=s['counter_unit'],
- volume=s['counter_volume'],
- user_id=s['user_id'],
- project_id=s['project_id'],
- resource_id=s['resource_id'],
- timestamp=s['timestamp'],
- resource_metadata=s['resource_metadata'],
- source=s.get('source'),
- # NOTE(sileht): May come from an older node,
- # Put None in this case.
- monotonic_time=s.get('monotonic_time'))
- for s in samples if publisher_utils.verify_signature(
- s, self.conf.publisher.telemetry_secret)
- ]
- with self.publisher as p:
- p(samples)
-
-
class SampleSource(base.PipelineSource):
"""Represents a source of samples.
@@ -181,8 +146,6 @@ class SampleSink(base.Sink):
class SamplePipeline(base.Pipeline):
"""Represents a pipeline for Samples."""
- default_grouping_key = ['resource_id']
-
def _validate_volume(self, s):
volume = s.volume
if volume is None:
@@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline):
and self._validate_volume(s)]
self.sink.publish_samples(supported)
- def serializer(self, sample):
- return publisher_utils.meter_message_from_counter(
- sample, self.conf.publisher.telemetry_secret)
-
def supported(self, sample):
return self.source.support_meter(sample.name)
@@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager):
pm_source = SampleSource
pm_sink = SampleSink
- def __init__(self, conf, partition=False):
+ def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
- conf, conf.pipeline_cfg_file, self.get_transform_manager(),
- partition)
+ conf, conf.pipeline_cfg_file, self.get_transform_manager())
@staticmethod
def get_transform_manager():
@@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager):
exts = extension.ExtensionManager(
namespace='ceilometer.sample.endpoint',
invoke_on_load=True,
- invoke_args=(self.conf, self.get_main_publisher()))
+ invoke_args=(self.conf, self.publisher()))
return [ext.obj for ext in exts]
-
- def get_interim_endpoints(self):
- # FIXME(gordc): change this so we shard data rather than per
- # pipeline. this will allow us to use self.publisher and less
- # queues.
- return [InterimSampleEndpoint(
- self.conf, base.PublishContext([pipe]), pipe.name)
- for pipe in self.pipelines]
diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py
index 345e6930..10b212be 100644
--- a/ceilometer/tests/unit/pipeline_base.py
+++ b/ceilometer/tests/unit/pipeline_base.py
@@ -75,7 +75,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClass(transformer.TransformerBase):
samples = []
- grouping_keys = ['counter_name']
def __init__(self, append_name='_update'):
self.__class__.samples = []
@@ -102,7 +101,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClassDrop(transformer.TransformerBase):
samples = []
- grouping_keys = ['resource_id']
def __init__(self):
self.__class__.samples = []
@@ -111,7 +109,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples.append(counter)
class TransformerClassException(object):
- grouping_keys = ['resource_id']
@staticmethod
def handle_sample(counter):
@@ -2171,46 +2168,3 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
-
- def test_get_pipeline_grouping_key(self):
- transformer_cfg = [
- {
- 'name': 'update',
- 'parameters': {}
- },
- {
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_mins',
- 'unit': 'min',
- 'scale': 'volume'},
- }
- },
- {
- 'name': 'update',
- 'parameters': {}
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- self.assertEqual(set(['resource_id', 'counter_name']),
- set(pipeline_manager.pipelines[0].get_grouping_key()))
-
- def test_get_pipeline_duplicate_grouping_key(self):
- transformer_cfg = [
- {
- 'name': 'update',
- 'parameters': {}
- },
- {
- 'name': 'update',
- 'parameters': {}
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- self.assertEqual(['counter_name'],
- pipeline_manager.pipelines[0].get_grouping_key())
diff --git a/ceilometer/tests/unit/test_event_pipeline.py b/ceilometer/tests/unit/test_event_pipeline.py
index 3c5dd548..edbfc38f 100644
--- a/ceilometer/tests/unit/test_event_pipeline.py
+++ b/ceilometer/tests/unit/test_event_pipeline.py
@@ -16,15 +16,12 @@ import traceback
import uuid
import fixtures
-import mock
-import oslo_messaging
from ceilometer.event import models
from ceilometer.pipeline import base as pipeline
from ceilometer.pipeline import event
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
-from ceilometer.publisher import utils
from ceilometer import service
from ceilometer.tests import base
@@ -357,40 +354,3 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
-
- def test_event_pipeline_endpoint_requeue_on_failure(self):
- self.CONF.set_override("ack_on_event_error", False,
- group="notification")
- self.CONF.set_override("telemetry_secret", "not-so-secret",
- group="publisher")
- test_data = {
- 'message_id': uuid.uuid4(),
- 'event_type': 'a',
- 'generated': '2013-08-08 21:06:37.803826',
- 'traits': [
- {'name': 't_text',
- 'value': 1,
- 'dtype': 'text_trait'
- }
- ],
- 'raw': {'status': 'started'}
- }
- message_sign = utils.compute_signature(test_data, 'not-so-secret')
- test_data['message_signature'] = message_sign
-
- fake_publisher = mock.Mock()
- self.useFixture(fixtures.MockPatch(
- 'ceilometer.publisher.test.TestPublisher',
- return_value=fake_publisher))
-
- self._build_and_set_new_pipeline()
- pipeline_manager = event.EventPipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
- event_pipeline_endpoint = event.InterimEventEndpoint(
- self.CONF, pipeline.PublishContext([pipe]), pipe.name)
-
- fake_publisher.publish_events.side_effect = Exception
- ret = event_pipeline_endpoint.sample([
- {'ctxt': {}, 'publisher_id': 'compute.vagrant-precise',
- 'event_type': 'a', 'payload': [test_data], 'metadata': {}}])
- self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py
index cfe3bcd4..e7d5fca7 100644
--- a/ceilometer/tests/unit/test_notification.py
+++ b/ceilometer/tests/unit/test_notification.py
@@ -17,7 +17,6 @@
import time
import mock
-import oslo_messaging
from oslo_utils import fileutils
import six
import yaml
@@ -84,14 +83,6 @@ class BaseNotificationTest(tests_base.BaseTestCase):
def run_service(self, srv):
srv.run()
self.addCleanup(srv.terminate)
- if srv.conf.notification.workload_partitioning:
- start = time.time()
- while time.time() - start < 10:
- if srv.group_state and srv.pipeline_listener:
- break # ensure pipeline is set if HA
- time.sleep(0.1)
- else:
- self.fail('Did not start pipeline queues')
class TestNotification(BaseNotificationTest):
@@ -242,273 +233,3 @@ class TestRealNotification(BaseRealNotification):
if len(self.publisher.events) >= self.expected_events:
break
self.assertEqual(self.expected_events, len(self.publisher.events))
-
-
-class TestRealNotificationHA(BaseRealNotification):
-
- def setUp(self):
- super(TestRealNotificationHA, self).setUp()
- self.CONF.set_override('workload_partitioning', True,
- group='notification')
- self.CONF.set_override("backend_url", "zake://", group="coordination")
- self.srv = notification.NotificationService(0, self.CONF)
-
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_notification_service(self, fake_publisher_cls):
- fake_publisher_cls.return_value = self.publisher
- self._check_notification_service()
-
- @mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
- @mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
- @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
- def test_notification_threads(self, m_listener, m_wait, m_stop):
- self.CONF.set_override('batch_size', 1, group='notification')
- self.srv.run()
- m_listener.assert_called_with(
- override_pool_size=self.CONF.max_parallel_requests)
- m_listener.reset_mock()
- self.CONF.set_override('batch_size', 2, group='notification')
- self.srv._refresh_agent()
- m_listener.assert_called_with(override_pool_size=1)
-
- @mock.patch('oslo_messaging.get_batch_notification_listener')
- def test_reset_listener_on_refresh(self, mock_listener):
- mock_listener.side_effect = [
- mock.MagicMock(), # main listener
- mock.MagicMock(), # pipeline listener
- mock.MagicMock(), # refresh pipeline listener
- ]
- self.run_service(self.srv)
- listener = self.srv.pipeline_listener
- self.srv._refresh_agent()
- self.assertIsNot(listener, self.srv.pipeline_listener)
-
- def test_hashring_targets(self):
- maybe = {"maybe": 0}
-
- def _once_over_five(item):
- maybe["maybe"] += 1
- return maybe["maybe"] % 5 == 0
-
- hashring = mock.MagicMock()
- hashring.belongs_to_self = _once_over_five
- self.srv.partition_coordinator = pc = mock.MagicMock()
- pc.join_partitioned_group.return_value = hashring
- self.run_service(self.srv)
- topics = [target.topic for target in
- self.srv.pipeline_listener.targets]
- self.assertEqual(4, len(topics))
- self.assertEqual(
- {'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9',
- 'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'},
- set(topics))
-
- @mock.patch('oslo_messaging.get_batch_notification_listener')
- def test_notify_to_relevant_endpoint(self, mock_listener):
- self.run_service(self.srv)
-
- targets = mock_listener.call_args[0][1]
- self.assertIsNotEmpty(targets)
-
- pipe_list = []
- for mgr in self.srv.managers:
- for pipe in mgr.pipelines:
- pipe_list.append(pipe.name)
-
- for pipe in pipe_list:
- for endpoint in mock_listener.call_args[0][2]:
- self.assertTrue(hasattr(endpoint, 'filter_rule'))
- if endpoint.filter_rule.match(None, None, pipe, None, None):
- break
- else:
- self.fail('%s not handled by any endpoint' % pipe)
-
- @mock.patch('oslo_messaging.Notifier.sample')
- def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
- self.run_service(self.srv)
- for endpoint in self.srv.listeners[0].dispatcher.endpoints:
- if (hasattr(endpoint, 'filter_rule') and
- not endpoint.filter_rule.match(None, None, 'nonmatching.end',
- None, None)):
- continue
- endpoint.info([{
- 'ctxt': TEST_NOTICE_CTXT,
- 'publisher_id': 'compute.vagrant-precise',
- 'event_type': 'nonmatching.end',
- 'payload': TEST_NOTICE_PAYLOAD,
- 'metadata': TEST_NOTICE_METADATA}])
- self.assertFalse(mock_notifier.called)
- for endpoint in self.srv.listeners[0].dispatcher.endpoints:
- if (hasattr(endpoint, 'filter_rule') and
- not endpoint.filter_rule.match(None, None,
- 'compute.instance.create.end',
- None, None)):
- continue
- endpoint.info([{
- 'ctxt': TEST_NOTICE_CTXT,
- 'publisher_id': 'compute.vagrant-precise',
- 'event_type': 'compute.instance.create.end',
- 'payload': TEST_NOTICE_PAYLOAD,
- 'metadata': TEST_NOTICE_METADATA}])
-
- self.assertTrue(mock_notifier.called)
- self.assertEqual(3, mock_notifier.call_count)
- self.assertEqual(1, len([i for i in mock_notifier.call_args_list
- if 'event_type' in i[1]['payload'][0]]))
- self.assertEqual(2, len([i for i in mock_notifier.call_args_list
- if 'counter_name' in i[1]['payload'][0]]))
-
-
-class TestRealNotificationMultipleAgents(BaseNotificationTest):
- def setup_pipeline(self, transformers):
- pipeline = yaml.dump({
- 'sources': [{
- 'name': 'test_pipeline',
- 'interval': 5,
- 'meters': ['vcpus', 'memory'],
- 'sinks': ['test_sink']
- }],
- 'sinks': [{
- 'name': 'test_sink',
- 'transformers': transformers,
- 'publishers': ['test://']
- }]
- })
- if six.PY3:
- pipeline = pipeline.encode('utf-8')
-
- pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
- prefix="pipeline",
- suffix="yaml")
- return pipeline_cfg_file
-
- def setup_event_pipeline(self):
- pipeline = yaml.dump({
- 'sources': [],
- 'sinks': []
- })
- if six.PY3:
- pipeline = pipeline.encode('utf-8')
-
- pipeline_cfg_file = fileutils.write_to_tempfile(
- content=pipeline, prefix="event_pipeline", suffix="yaml")
- return pipeline_cfg_file
-
- def setUp(self):
- super(TestRealNotificationMultipleAgents, self).setUp()
- self.CONF = service.prepare_service([], [])
- self.setup_messaging(self.CONF, 'nova')
-
- pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
- event_pipeline_cfg_file = self.setup_event_pipeline()
- self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
- self.CONF.set_override("event_pipeline_cfg_file",
- event_pipeline_cfg_file)
- self.CONF.set_override("backend_url", "zake://", group="coordination")
- self.CONF.set_override('workload_partitioning', True,
- group='notification')
- self.CONF.set_override('pipeline_processing_queues', 2,
- group='notification')
- self.CONF.set_override('check_watchers', 1, group='coordination')
- self.publisher = test_publisher.TestPublisher(self.CONF, "")
- self.publisher2 = test_publisher.TestPublisher(self.CONF, "")
-
- def _check_notifications(self, fake_publisher_cls):
- fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
-
- maybe = {"srv": 0, "srv2": -1}
-
- def _sometimes_srv(item):
- maybe["srv"] += 1
- return (maybe["srv"] % 2) == 0
-
- self.srv = notification.NotificationService(0, self.CONF)
- self.srv.partition_coordinator = pc = mock.MagicMock()
- hashring_srv1 = mock.MagicMock()
- hashring_srv1.belongs_to_self = _sometimes_srv
- hashring_srv1.ring.nodes = {'id1': mock.Mock()}
- pc.join_partitioned_group.return_value = hashring_srv1
- self.run_service(self.srv)
-
- def _sometimes_srv2(item):
- maybe["srv2"] += 1
- return (maybe["srv2"] % 2) == 0
-
- self.srv2 = notification.NotificationService(0, self.CONF)
- self.srv2.partition_coordinator = pc = mock.MagicMock()
- hashring = mock.MagicMock()
- hashring.belongs_to_self = _sometimes_srv2
- hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
- self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
- pc.join_partitioned_group.return_value = hashring
- self.run_service(self.srv2)
-
- notifier = messaging.get_notifier(self.transport,
- "compute.vagrant-precise")
- payload1 = TEST_NOTICE_PAYLOAD.copy()
- payload1['instance_id'] = '0'
- notifier.info({}, 'compute.instance.create.end', payload1)
- payload2 = TEST_NOTICE_PAYLOAD.copy()
- payload2['instance_id'] = '1'
- notifier.info({}, 'compute.instance.create.end', payload2)
- self.expected_samples = 4
- with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
- start = time.time()
- while time.time() - start < 10:
- if (len(self.publisher.samples + self.publisher2.samples) >=
- self.expected_samples and
- len(self.srv.group_state) == 2):
- break
- time.sleep(0.1)
-
- self.assertEqual(2, len(self.publisher.samples))
- self.assertEqual(2, len(self.publisher2.samples))
- self.assertEqual(1, len(set(
- s.resource_id for s in self.publisher.samples)))
- self.assertEqual(1, len(set(
- s.resource_id for s in self.publisher2.samples)))
- self.assertEqual(2, len(self.srv.group_state))
-
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_multiple_agents_no_transform(self, fake_publisher_cls):
- pipeline_cfg_file = self.setup_pipeline([])
- self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
- self._check_notifications(fake_publisher_cls)
-
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_multiple_agents_transform(self, fake_publisher_cls):
- pipeline_cfg_file = self.setup_pipeline(
- [{
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_mins',
- 'unit': 'min',
- 'scale': 'volume'},
- }
- }])
- self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
- self._check_notifications(fake_publisher_cls)
-
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
- pipeline_cfg_file = self.setup_pipeline(
- [{
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_mins',
- 'unit': 'min',
- 'scale': 'volume'},
- }
- }, {
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_mins',
- 'unit': 'min',
- 'scale': 'volume'},
- }
- }])
- self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
- self._check_notifications(fake_publisher_cls)
diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py
index 48d78b4d..3afffee6 100644
--- a/ceilometer/transformer/__init__.py
+++ b/ceilometer/transformer/__init__.py
@@ -42,10 +42,6 @@ class TransformerBase(object):
:param sample: A sample.
"""
- @abc.abstractproperty
- def grouping_keys(self):
- """Keys used to group transformer."""
-
@staticmethod
def flush():
"""Flush samples cached previously."""
diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py
index 1e14497c..db750076 100644
--- a/ceilometer/transformer/accumulator.py
+++ b/ceilometer/transformer/accumulator.py
@@ -22,8 +22,6 @@ class TransformerAccumulator(transformer.TransformerBase):
And then flushes them out into the wild.
"""
- grouping_keys = ['resource_id']
-
def __init__(self, size=1, **kwargs):
if size >= 1:
self.samples = []
diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py
index 9d688ccd..6039d22a 100644
--- a/ceilometer/transformer/arithmetic.py
+++ b/ceilometer/transformer/arithmetic.py
@@ -36,8 +36,6 @@ class ArithmeticTransformer(transformer.TransformerBase):
over one or more meters and/or their metadata.
"""
- grouping_keys = ['resource_id']
-
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):
diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
index 4614528b..5c3b809f 100644
--- a/ceilometer/transformer/conversions.py
+++ b/ceilometer/transformer/conversions.py
@@ -30,8 +30,6 @@ LOG = log.getLogger(__name__)
class BaseConversionTransformer(transformer.TransformerBase):
"""Transformer to derive conversion."""
- grouping_keys = ['resource_id']
-
def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters.
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 02101d9f..9031b258 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -262,7 +262,6 @@ function configure_ceilometer {
if [[ -n "$CEILOMETER_COORDINATION_URL" ]]; then
iniset $CEILOMETER_CONF coordination backend_url $CEILOMETER_COORDINATION_URL
- iniset $CEILOMETER_CONF notification workload_partitioning True
iniset $CEILOMETER_CONF notification workers $API_WORKERS
fi
diff --git a/doc/source/contributor/plugins.rst b/doc/source/contributor/plugins.rst
index 6a7c50a9..9b198077 100644
--- a/doc/source/contributor/plugins.rst
+++ b/doc/source/contributor/plugins.rst
@@ -94,18 +94,9 @@ Additionally, it must set ``get_main_endpoints`` which provides endpoints to be
added to the main queue listener in the notification agent. This main queue
endpoint inherits :class:`ceilometer.pipeline.base.MainNotificationEndpoint`
and defines which notification priorities to listen, normalises the data,
-and redirects the data for pipeline processing or requeuing depending on
-`workload_partitioning` configuration.
-
-If a pipeline is configured to support `workload_partitioning`, data from the
-main queue endpoints are shared and requeued in internal queues. The
-notification agent configures a second notification consumer to handle these
-internal queues and pushes data to endpoints defined by
-``get_interim_endpoints`` in the pipeline manager. These interim endpoints
-define how to handle the shared, normalised data models for pipeline
-processing
-
-Both main queue and interim queue notification endpoints should implement:
+and redirects the data for pipeline processing.
+
+Notification endpoints should implement:
``event_types``
A sequence of strings defining the event types the endpoint should handle
diff --git a/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml b/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml
new file mode 100644
index 00000000..d72f1f43
--- /dev/null
+++ b/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml
@@ -0,0 +1,4 @@
+---
+upgrade:
+ - |
+ The deprecated workload partitioning for notification agent has been removed.