author    gord chung <gord@live.ca>  2016-05-31 23:19:15 +0000
committer gord chung <gord@live.ca>  2016-09-08 13:21:02 +0000
commit    14827310bf6ce7f55d93e319c444a49514d5c6ef (patch)
tree      a2d38dd94c5effdb0efa5a7745ea088d72969d5f
parent    ba55669da58873b610a5dfab15bd476c434770ad (diff)
download  ceilometer-14827310bf6ce7f55d93e319c444a49514d5c6ef.tar.gz
refactor service to be less pipeline dependent
First step toward a pluggable notification service: this moves a bit of the pipeline code from the service into the pipeline itself.

Change-Id: Ibcd803c8ffd42c59d1f2e2b53234ae42dc588c16
-rw-r--r--  ceilometer/pipeline.py                              138
-rw-r--r--  ceilometer/service_base.py                           83
-rw-r--r--  ceilometer/tests/functional/test_notification.py     8
-rw-r--r--  ceilometer/tests/pipeline_base.py                   215
-rw-r--r--  ceilometer/tests/unit/test_decoupled_pipeline.py     18
5 files changed, 204 insertions(+), 258 deletions(-)
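
The heart of the change is a small change-detection helper: load the YAML once, then use a cheap mtime check followed by an md5 content hash to decide whether a reload is needed. A minimal standalone sketch of that pattern (illustrative names only, not the ceilometer class added below; assumes PyYAML is installed):

    # Minimal sketch of the mtime + md5 change-detection pattern introduced
    # by ConfigManagerBase in this patch. Illustrative only.
    import hashlib
    import os

    import yaml


    class ConfigWatcher(object):
        def __init__(self, path):
            self.path = path
            self.conf = self._load()
            self.mtime = os.path.getmtime(path)
            self.hash = self._hash()

        def _load(self):
            with open(self.path) as f:
                return yaml.safe_load(f.read())

        def _hash(self):
            with open(self.path, 'rb') as f:
                return hashlib.md5(f.read()).hexdigest()

        def changed(self):
            """Return the new hash if the file content changed, else False."""
            mtime = os.path.getmtime(self.path)
            if mtime > self.mtime:
                self.mtime = mtime
                new_hash = self._hash()
                if new_hash != self.hash:
                    return new_hash
            return False
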
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index e6954efd..e174608a 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -619,7 +619,60 @@ EVENT_TYPE = {'pipeline': EventPipeline,
'sink': EventSink}
-class PipelineManager(object):
+class ConfigManagerBase(object):
+ """Base class for managing configuration file refresh"""
+
+ def __init__(self):
+ self.cfg_loc = None
+
+ def load_config(self, cfg_info):
+ """Load a configuration file and set its refresh values."""
+ if isinstance(cfg_info, dict):
+ conf = cfg_info
+ else:
+ if not os.path.exists(cfg_info):
+ cfg_info = cfg.CONF.find_file(cfg_info)
+ with open(cfg_info) as fap:
+ data = fap.read()
+
+ conf = yaml.safe_load(data)
+ self.cfg_loc = cfg_info
+ self.cfg_mtime = self.get_cfg_mtime()
+ self.cfg_hash = self.get_cfg_hash()
+ LOG.info("Config file: %s", conf)
+ return conf
+
+ def get_cfg_mtime(self):
+ """Return modification time of cfg file"""
+ return os.path.getmtime(self.cfg_loc) if self.cfg_loc else None
+
+ def get_cfg_hash(self):
+ """Return hash of configuration file"""
+ if not self.cfg_loc:
+ return None
+
+ with open(self.cfg_loc) as fap:
+ data = fap.read()
+ if six.PY3:
+ data = data.encode('utf-8')
+
+ file_hash = hashlib.md5(data).hexdigest()
+ return file_hash
+
+ def cfg_changed(self):
+ """Returns hash of changed cfg else False."""
+ mtime = self.get_cfg_mtime()
+ if mtime > self.cfg_mtime:
+ LOG.info(_LI('Configuration file has been updated.'))
+ self.cfg_mtime = mtime
+ _hash = self.get_cfg_hash()
+ if _hash != self.cfg_hash:
+ LOG.info(_LI("Detected change in configuration."))
+ return _hash
+ return False
+
+
+class PipelineManager(ConfigManagerBase):
"""Pipeline Manager
Pipeline manager sets up pipelines according to config file
@@ -628,7 +681,7 @@ class PipelineManager(object):
"""
- def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
+ def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -695,6 +748,8 @@ class PipelineManager(object):
Publisher's name is plugin name in setup.cfg
"""
+ super(PipelineManager, self).__init__()
+ cfg = self.load_config(cfg_info)
self.pipelines = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
@@ -746,17 +801,19 @@ class PipelineManager(object):
return PublishContext(self.pipelines)
-class PollingManager(object):
+class PollingManager(ConfigManagerBase):
"""Polling Manager
Polling manager sets up polling according to config file.
"""
- def __init__(self, cfg):
+ def __init__(self, cfg_info):
"""Setup the polling according to config.
The configuration is the sources half of the Pipeline Config.
"""
+ super(PollingManager, self).__init__()
+ cfg = self.load_config(cfg_info)
self.sources = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
@@ -775,85 +832,26 @@ class PollingManager(object):
unique_names.clear()
-def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
- if not os.path.exists(cfg_file):
- cfg_file = cfg.CONF.find_file(cfg_file)
-
- LOG.debug("Pipeline config file: %s", cfg_file)
-
- with open(cfg_file) as fap:
- data = fap.read()
-
- pipeline_cfg = yaml.safe_load(data)
- LOG.info(_LI("Pipeline config: %s"), pipeline_cfg)
-
- return PipelineManager(pipeline_cfg,
- transformer_manager or
- extension.ExtensionManager(
- 'ceilometer.transformer',
- ), p_type)
-
-
-def _setup_polling_manager(cfg_file):
- if not os.path.exists(cfg_file):
- cfg_file = cfg.CONF.find_file(cfg_file)
-
- LOG.debug("Polling config file: %s", cfg_file)
-
- with open(cfg_file) as fap:
- data = fap.read()
-
- pipeline_cfg = yaml.safe_load(data)
- LOG.info(_LI("Pipeline config: %s"), pipeline_cfg)
-
- return PollingManager(pipeline_cfg)
-
-
def setup_event_pipeline(transformer_manager=None):
"""Setup event pipeline manager according to yaml config file."""
+ default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = cfg.CONF.event_pipeline_cfg_file
- return _setup_pipeline_manager(cfg_file, transformer_manager, EVENT_TYPE)
+ return PipelineManager(cfg_file, transformer_manager or default,
+ EVENT_TYPE)
def setup_pipeline(transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
+ default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = cfg.CONF.pipeline_cfg_file
- return _setup_pipeline_manager(cfg_file, transformer_manager)
-
-
-def _get_pipeline_cfg_file(p_type=SAMPLE_TYPE):
- if p_type == EVENT_TYPE:
- cfg_file = cfg.CONF.event_pipeline_cfg_file
- else:
- cfg_file = cfg.CONF.pipeline_cfg_file
-
- if not os.path.exists(cfg_file):
- cfg_file = cfg.CONF.find_file(cfg_file)
-
- return cfg_file
-
-
-def get_pipeline_mtime(p_type=SAMPLE_TYPE):
- cfg_file = _get_pipeline_cfg_file(p_type)
- return os.path.getmtime(cfg_file)
-
-
-def get_pipeline_hash(p_type=SAMPLE_TYPE):
-
- cfg_file = _get_pipeline_cfg_file(p_type)
- with open(cfg_file) as fap:
- data = fap.read()
- if six.PY3:
- data = data.encode('utf-8')
-
- file_hash = hashlib.md5(data).hexdigest()
- return file_hash
+ return PipelineManager(cfg_file, transformer_manager or default,
+ SAMPLE_TYPE)
def setup_polling():
"""Setup polling manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
- return _setup_polling_manager(cfg_file)
+ return PollingManager(cfg_file)
def get_pipeline_grouping_key(pipe):
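
With the module-level helpers folded into the managers, callers now hand setup_pipeline()/setup_polling() (or a manager directly) a path to the YAML file, and load_config() resolves it, records its mtime and md5 hash, and parses it. A hedged usage sketch of the reworked entry points (the config path is illustrative; error handling omitted):

    # Usage sketch of the constructors as changed by this patch; the config
    # path is a placeholder, not a guaranteed default.
    from stevedore import extension

    from ceilometer import pipeline

    transformers = extension.ExtensionManager('ceilometer.transformer')

    # The manager loads and parses the YAML itself, remembering the file's
    # mtime and md5 hash so cfg_changed() can report updates later.
    mgr = pipeline.PipelineManager('/etc/ceilometer/pipeline.yaml',
                                   transformers, pipeline.SAMPLE_TYPE)

    new_hash = mgr.cfg_changed()      # False, or the new md5 hash of the file
    if new_hash:
        mgr = pipeline.setup_pipeline()   # rebuild from the refreshed file
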
diff --git a/ceilometer/service_base.py b/ceilometer/service_base.py
index 0953d237..a622902b 100644
--- a/ceilometer/service_base.py
+++ b/ceilometer/service_base.py
@@ -20,7 +20,7 @@ from oslo_config import cfg
from oslo_log import log
import six
-from ceilometer.i18n import _LE, _LI
+from ceilometer.i18n import _LE
from ceilometer import pipeline
from ceilometer import utils
@@ -37,15 +37,6 @@ class PipelineBasedService(cotyledon.Service):
def init_pipeline_refresh(self):
"""Initializes pipeline refresh state."""
self.clear_pipeline_validation_status()
- if cfg.CONF.refresh_pipeline_cfg:
- self.set_pipeline_mtime(pipeline.get_pipeline_mtime())
- self.set_pipeline_hash(pipeline.get_pipeline_hash())
-
- if cfg.CONF.refresh_event_pipeline_cfg:
- self.set_pipeline_mtime(pipeline.get_pipeline_mtime(
- pipeline.EVENT_TYPE), pipeline.EVENT_TYPE)
- self.set_pipeline_hash(pipeline.get_pipeline_hash(
- pipeline.EVENT_TYPE), pipeline.EVENT_TYPE)
self.refresh_pipeline_periodic = None
if (cfg.CONF.refresh_pipeline_cfg or
@@ -60,90 +51,52 @@ class PipelineBasedService(cotyledon.Service):
self.refresh_pipeline_periodic.stop()
self.refresh_pipeline_periodic.wait()
- def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE):
- return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else
- self.pipeline_mtime)
-
- def set_pipeline_mtime(self, mtime, p_type=pipeline.SAMPLE_TYPE):
- if p_type == pipeline.EVENT_TYPE:
- self.event_pipeline_mtime = mtime
- else:
- self.pipeline_mtime = mtime
-
- def get_pipeline_hash(self, p_type=pipeline.SAMPLE_TYPE):
- return (self.event_pipeline_hash if p_type == pipeline.EVENT_TYPE else
- self.pipeline_hash)
-
- def set_pipeline_hash(self, _hash, p_type=pipeline.SAMPLE_TYPE):
- if p_type == pipeline.EVENT_TYPE:
- self.event_pipeline_hash = _hash
- else:
- self.pipeline_hash = _hash
-
@abc.abstractmethod
def reload_pipeline(self):
"""Reload pipeline in the agents."""
- def pipeline_changed(self, p_type=pipeline.SAMPLE_TYPE):
- """Returns hash of changed pipeline else False."""
-
- pipeline_mtime = self.get_pipeline_mtime(p_type)
- mtime = pipeline.get_pipeline_mtime(p_type)
- if mtime > pipeline_mtime:
- LOG.info(_LI('Pipeline configuration file has been updated.'))
-
- self.set_pipeline_mtime(mtime, p_type)
- _hash = pipeline.get_pipeline_hash(p_type)
- pipeline_hash = self.get_pipeline_hash(p_type)
- if _hash != pipeline_hash:
- LOG.info(_LI("Detected change in pipeline configuration."))
- return _hash
- return False
-
def refresh_pipeline(self):
"""Refreshes appropriate pipeline, then delegates to agent."""
if cfg.CONF.refresh_pipeline_cfg:
- pipeline_hash = self.pipeline_changed()
+ manager = None
+ if hasattr(self, 'pipeline_manager'):
+ manager = self.pipeline_manager
+ elif hasattr(self, 'polling_manager'):
+ manager = self.polling_manager
+ pipeline_hash = manager.cfg_changed() if manager else None
if pipeline_hash:
try:
+ LOG.debug("Pipeline has been refreshed. "
+ "old hash: %(old)s, new hash: %(new)s",
+ {'old': manager.cfg_hash,
+ 'new': pipeline_hash})
# Pipeline in the notification agent.
if hasattr(self, 'pipeline_manager'):
self.pipeline_manager = pipeline.setup_pipeline()
# Polling in the polling agent.
elif hasattr(self, 'polling_manager'):
self.polling_manager = pipeline.setup_polling()
- LOG.debug("Pipeline has been refreshed. "
- "old hash: %(old)s, new hash: %(new)s",
- {'old': self.pipeline_hash,
- 'new': pipeline_hash})
- self.set_pipeline_hash(pipeline_hash)
self.pipeline_validated = True
except Exception as err:
- LOG.debug("Active pipeline config's hash is %s",
- self.pipeline_hash)
LOG.exception(_LE('Unable to load changed pipeline: %s')
% err)
if cfg.CONF.refresh_event_pipeline_cfg:
- ev_pipeline_hash = self.pipeline_changed(pipeline.EVENT_TYPE)
+ # Pipeline in the notification agent.
+ manager = (self.event_pipeline_manager
+ if hasattr(self, 'event_pipeline_manager') else None)
+ ev_pipeline_hash = manager.cfg_changed()
if ev_pipeline_hash:
try:
- # Pipeline in the notification agent.
- if hasattr(self, 'event_pipeline_manager'):
- self.event_pipeline_manager = (pipeline.
- setup_event_pipeline())
-
LOG.debug("Event Pipeline has been refreshed. "
"old hash: %(old)s, new hash: %(new)s",
- {'old': self.event_pipeline_hash,
+ {'old': manager.cfg_hash,
'new': ev_pipeline_hash})
- self.set_pipeline_hash(ev_pipeline_hash,
- pipeline.EVENT_TYPE)
+ self.event_pipeline_manager = (pipeline.
+ setup_event_pipeline())
self.event_pipeline_validated = True
except Exception as err:
- LOG.debug("Active event pipeline config's hash is %s",
- self.event_pipeline_hash)
LOG.exception(_LE('Unable to load changed event pipeline:'
' %s') % err)
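
The per-type mtime/hash bookkeeping that PipelineBasedService used to carry is gone; the service simply asks whichever manager it owns. A simplified sketch of the resulting refresh flow (attribute and function names follow the diff above; the event-pipeline branch and logging are omitted):

    # Simplified sketch of the refresh flow after this patch.
    from ceilometer import pipeline


    def refresh_pipeline(service):
        # Pick whichever manager this agent owns (notification vs. polling).
        manager = getattr(service, 'pipeline_manager',
                          getattr(service, 'polling_manager', None))
        if manager and manager.cfg_changed():
            if hasattr(service, 'pipeline_manager'):
                service.pipeline_manager = pipeline.setup_pipeline()
            else:
                service.polling_manager = pipeline.setup_polling()
            service.pipeline_validated = True
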
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index 3aa16363..02452d1a 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -287,7 +287,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
self.srv.run()
self.addCleanup(self.srv.terminate)
- pipeline = self.srv.pipeline_hash
+ pipeline = self.srv.pipeline_manager.cfg_hash
# Modify the collection targets
updated_pipeline_cfg_file = self.setup_pipeline(['vcpus',
@@ -297,7 +297,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
start = time.time()
while time.time() - start < 10:
- if pipeline != self.srv.pipeline_hash:
+ if pipeline != self.srv.pipeline_manager.cfg_hash:
break
else:
self.fail("Pipeline failed to reload")
@@ -312,7 +312,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
self.srv.run()
self.addCleanup(self.srv.terminate)
- pipeline = self.srv.event_pipeline_hash
+ pipeline = self.srv.event_pipeline_manager.cfg_hash
# Modify the collection targets
updated_ev_pipeline_cfg_file = self.setup_event_pipeline(
@@ -323,7 +323,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file)
start = time.time()
while time.time() - start < 10:
- if pipeline != self.srv.event_pipeline_hash:
+ if pipeline != self.srv.event_pipeline_manager.cfg_hash:
break
else:
self.fail("Pipeline failed to reload")
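
The functional tests now watch the manager's cfg_hash directly rather than a hash cached on the service. The wait loop they use amounts to something like this sketch (the 0.1 s sleep is added for illustration; the tests busy-wait):

    # Sketch of the reload-wait pattern used in the tests above.
    import time


    def wait_for_reload(srv, old_hash, timeout=10):
        # Poll the manager's cfg_hash until the periodic refresh picks up
        # the new file or the timeout expires.
        start = time.time()
        while time.time() - start < timeout:
            if srv.pipeline_manager.cfg_hash != old_hash:
                return True
            time.sleep(0.1)
        return False
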
diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
index 41842f6d..08bc5a83 100644
--- a/ceilometer/tests/pipeline_base.py
+++ b/ceilometer/tests/pipeline_base.py
@@ -17,6 +17,8 @@
import abc
import copy
import datetime
+import os
+import tempfile
import traceback
import mock
@@ -25,6 +27,7 @@ from oslotest import base
from oslotest import mockpatch
import six
from stevedore import extension
+import yaml
from ceilometer import pipeline
from ceilometer import publisher
@@ -127,6 +130,11 @@ class BasePipelineTestCase(base.BaseTestCase):
def handle_sample(counter):
raise Exception()
+ def cfg2file(self, data):
+ self.tmp_cfg.write(yaml.safe_dump(data))
+ self.tmp_cfg.close()
+ return self.tmp_cfg.name
+
def setUp(self):
super(BasePipelineTestCase, self).setUp()
@@ -149,6 +157,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager.__getitem__.side_effect = \
self.fake_tem_get_ext
+ self.tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
self._setup_pipeline_cfg()
self._reraise_exception = True
@@ -156,6 +165,10 @@ class BasePipelineTestCase(base.BaseTestCase):
'ceilometer.pipeline.LOG.exception',
side_effect=self._handle_reraise_exception))
+ def tearDown(self):
+ os.unlink(self.tmp_cfg.name)
+ super(BasePipelineTestCase, self).tearDown()
+
def _handle_reraise_exception(self, msg):
if self._reraise_exception:
raise Exception(traceback.format_exc())
@@ -191,7 +204,7 @@ class BasePipelineTestCase(base.BaseTestCase):
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
- self.pipeline_cfg,
+ self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
def test_no_counters(self):
@@ -200,7 +213,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_no_transformers(self):
self._unset_pipeline_cfg('transformers')
- pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager)
+ pipeline.PipelineManager(self.cfg2file(self.pipeline_cfg),
+ self.transformer_manager)
def test_no_name(self):
self._unset_pipeline_cfg('name')
@@ -208,9 +222,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_no_interval(self):
self._unset_pipeline_cfg('interval')
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
-
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(600, pipe.get_interval())
@@ -255,16 +268,14 @@ class BasePipelineTestCase(base.BaseTestCase):
self._exception_create_pipelinemanager()
def test_get_interval(self):
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
-
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertEqual(5, pipe.get_interval())
def test_publisher_transformer_invoked(self):
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
-
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -278,8 +289,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -310,8 +321,8 @@ class BasePipelineTestCase(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.LOG')
def test_none_volume_counter(self, LOG):
self._set_pipeline_cfg('counters', ['empty_volume'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
test_s = sample.Sample(
@@ -343,8 +354,8 @@ class BasePipelineTestCase(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.LOG')
def test_fake_volume_counter(self, LOG):
self._set_pipeline_cfg('counters', ['fake_volume'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
publisher = pipeline_manager.pipelines[0].publishers[0]
test_s = sample.Sample(
@@ -376,8 +387,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_counter_dont_match(self):
counter_cfg = ['nomatch']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -388,8 +399,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_counter(self):
counter_cfg = ['*']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -401,15 +412,15 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@@ -420,8 +431,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -435,8 +446,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_meter('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_meter('c'))
@@ -444,8 +455,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_wildcard_and_excluded_wildcard_counters(self):
counter_cfg = ['*', '!disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
self.assertTrue(pipeline_manager.pipelines[0].support_meter('cpu'))
@@ -453,8 +464,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_included_counter_and_wildcard_counters(self):
counter_cfg = ['cpu', 'disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertTrue(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
self.assertTrue(pipeline_manager.pipelines[0].support_meter('cpu'))
@@ -464,8 +475,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_excluded_counter_and_excluded_wildcard_counters(self):
counter_cfg = ['!cpu', '!disk.*']
self._set_pipeline_cfg('counters', counter_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
support_meter('disk.read.bytes'))
self.assertFalse(pipeline_manager.pipelines[0].support_meter('cpu'))
@@ -475,8 +486,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline(self):
self._augment_pipeline_cfg()
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -512,8 +523,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_pipeline_exception(self):
self._reraise_exception = False
self._break_pipeline_cfg()
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -545,8 +556,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_none_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', None)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@@ -556,8 +567,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_empty_transformer_pipeline(self):
self._set_pipeline_cfg('transformers', [])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@@ -577,9 +588,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
-
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -612,8 +622,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -650,8 +660,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -667,9 +677,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
-
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -685,8 +694,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -697,8 +706,8 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_multiple_counter_pipeline(self):
self._set_pipeline_cfg('counters', ['a', 'b'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter,
sample.Sample(
@@ -736,8 +745,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(self.test_counter)
@@ -773,8 +782,8 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
self._set_pipeline_cfg('counters', ['a', 'b'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter,
sample.Sample(
@@ -807,8 +816,8 @@ class BasePipelineTestCase(base.BaseTestCase):
'parameters': {}
}]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0]
@@ -847,9 +856,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={}
),
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(counters)
@@ -901,7 +909,6 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={}
),
]
-
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@@ -991,9 +998,8 @@ class BasePipelineTestCase(base.BaseTestCase):
'user_metadata': um},
),
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(counters)
@@ -1076,9 +1082,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'cpu_number': 4}
),
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(counters)
@@ -1104,8 +1109,8 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
now = timeutils.utcnow()
@@ -1164,14 +1169,14 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_resources(self):
resources = ['test1://', 'test2://']
self._set_pipeline_cfg('resources', resources)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(resources,
pipeline_manager.pipelines[0].resources)
def test_no_resources(self):
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(0, len(pipeline_manager.pipelines[0].resources))
def _do_test_rate_of_change_mapping(self, pipe, meters, units):
@@ -1238,8 +1243,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['disk.read.bytes',
'disk.write.requests'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
meters = ('disk.read.bytes', 'disk.write.requests')
units = ('B', 'request')
@@ -1322,7 +1327,6 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '3.0'}
),
]
-
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@@ -1358,9 +1362,8 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={'version': '1.0'}
))
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(counters)
@@ -1554,9 +1557,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '2.0'}
)
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data([counters[0]])
@@ -1592,9 +1594,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '1.0'}
),
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(counters)
@@ -1640,9 +1641,8 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata={'version': '2.0'}
)
]
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data([counters[0]])
@@ -1787,9 +1787,8 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=s.get('metadata')
))
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
for s in counters:
pipe.publish_data(s)
@@ -1907,9 +1906,8 @@ class BasePipelineTestCase(base.BaseTestCase):
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=None
)
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data([counter])
@@ -1938,8 +1936,8 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['unrelated-sample'])
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
timeutils.advance_time_seconds(200)
pipe = pipeline_manager.pipelines[0]
pipe.flush()
@@ -1958,9 +1956,8 @@ class BasePipelineTestCase(base.BaseTestCase):
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(data)
@@ -2129,8 +2126,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0])))
@@ -2147,8 +2144,8 @@ class BasePipelineTestCase(base.BaseTestCase):
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
self.assertEqual(['counter_name'],
pipeline.get_pipeline_grouping_key(
pipeline_manager.pipelines[0]))
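
Because PipelineManager now expects a path (or dict) and reads the file itself, the unit tests serialize their in-memory config through the new cfg2file() helper. Its core is roughly this sketch (names illustrative; the real helper lives on the test case and cleans up in tearDown()):

    # Sketch of the cfg2file() pattern: dump the in-memory pipeline config to
    # a temp YAML file and hand its path to the manager. Assumes PyYAML.
    import tempfile

    import yaml


    def cfg_to_file(pipeline_cfg):
        # Caller is responsible for unlinking the file afterwards.
        tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
        tmp.write(yaml.safe_dump(pipeline_cfg))
        tmp.close()
        return tmp.name
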
diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py
index 283144af..d9d5f802 100644
--- a/ceilometer/tests/unit/test_decoupled_pipeline.py
+++ b/ceilometer/tests/unit/test_decoupled_pipeline.py
@@ -136,9 +136,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'publishers': ['new'],
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -179,9 +178,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'resources': [],
'sinks': ['test_sink']
})
-
- pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(self.pipeline_cfg), self.transformer_manager)
with pipeline_manager.publisher() as p:
p([self.test_counter])
@@ -225,8 +223,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
pipeline_cfg = yaml.safe_load(data)
for s in pipeline_cfg['sinks']:
s['publishers'] = ['test://']
- pipeline_manager = pipeline.PipelineManager(pipeline_cfg,
- self.transformer_manager)
+ pipeline_manager = pipeline.PipelineManager(
+ self.cfg2file(pipeline_cfg), self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
@@ -279,7 +277,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
- self.pipeline_cfg,
+ self.cfg2file(self.pipeline_cfg),
self.transformer_manager)
def test_duplicated_source_names(self):
@@ -292,5 +290,5 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
- self.pipeline_cfg,
+ self.cfg2file(self.pipeline_cfg),
self.transformer_manager)