diff options
author | gord chung <gord@live.ca> | 2016-05-31 23:19:15 +0000 |
---|---|---|
committer | gord chung <gord@live.ca> | 2016-09-08 13:21:02 +0000 |
commit | 14827310bf6ce7f55d93e319c444a49514d5c6ef (patch) | |
tree | a2d38dd94c5effdb0efa5a7745ea088d72969d5f | |
parent | ba55669da58873b610a5dfab15bd476c434770ad (diff) | |
download | ceilometer-14827310bf6ce7f55d93e319c444a49514d5c6ef.tar.gz |
refactor service to be less pipeline dependent
first step to make pluggable notification service.
this moves a bit of the pipeline code from service to the pipeline
itself.
Change-Id: Ibcd803c8ffd42c59d1f2e2b53234ae42dc588c16
-rw-r--r-- | ceilometer/pipeline.py | 138 | ||||
-rw-r--r-- | ceilometer/service_base.py | 83 | ||||
-rw-r--r-- | ceilometer/tests/functional/test_notification.py | 8 | ||||
-rw-r--r-- | ceilometer/tests/pipeline_base.py | 215 | ||||
-rw-r--r-- | ceilometer/tests/unit/test_decoupled_pipeline.py | 18 |
5 files changed, 204 insertions, 258 deletions
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index e6954efd..e174608a 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -619,7 +619,60 @@ EVENT_TYPE = {'pipeline': EventPipeline, 'sink': EventSink} -class PipelineManager(object): +class ConfigManagerBase(object): + """Base class for managing configuration file refresh""" + + def __init__(self): + self.cfg_loc = None + + def load_config(self, cfg_info): + """Load a configuration file and set its refresh values.""" + if isinstance(cfg_info, dict): + conf = cfg_info + else: + if not os.path.exists(cfg_info): + cfg_info = cfg.CONF.find_file(cfg_info) + with open(cfg_info) as fap: + data = fap.read() + + conf = yaml.safe_load(data) + self.cfg_loc = cfg_info + self.cfg_mtime = self.get_cfg_mtime() + self.cfg_hash = self.get_cfg_hash() + LOG.info("Config file: %s", conf) + return conf + + def get_cfg_mtime(self): + """Return modification time of cfg file""" + return os.path.getmtime(self.cfg_loc) if self.cfg_loc else None + + def get_cfg_hash(self): + """Return hash of configuration file""" + if not self.cfg_loc: + return None + + with open(self.cfg_loc) as fap: + data = fap.read() + if six.PY3: + data = data.encode('utf-8') + + file_hash = hashlib.md5(data).hexdigest() + return file_hash + + def cfg_changed(self): + """Returns hash of changed cfg else False.""" + mtime = self.get_cfg_mtime() + if mtime > self.cfg_mtime: + LOG.info(_LI('Configuration file has been updated.')) + self.cfg_mtime = mtime + _hash = self.get_cfg_hash() + if _hash != self.cfg_hash: + LOG.info(_LI("Detected change in configuration.")) + return _hash + return False + + +class PipelineManager(ConfigManagerBase): """Pipeline Manager Pipeline manager sets up pipelines according to config file @@ -628,7 +681,7 @@ class PipelineManager(object): """ - def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE): + def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE): """Setup the pipelines according to config. The configuration is supported as follows: @@ -695,6 +748,8 @@ class PipelineManager(object): Publisher's name is plugin name in setup.cfg """ + super(PipelineManager, self).__init__() + cfg = self.load_config(cfg_info) self.pipelines = [] if not ('sources' in cfg and 'sinks' in cfg): raise PipelineException("Both sources & sinks are required", @@ -746,17 +801,19 @@ class PipelineManager(object): return PublishContext(self.pipelines) -class PollingManager(object): +class PollingManager(ConfigManagerBase): """Polling Manager Polling manager sets up polling according to config file. """ - def __init__(self, cfg): + def __init__(self, cfg_info): """Setup the polling according to config. The configuration is the sources half of the Pipeline Config. """ + super(PollingManager, self).__init__() + cfg = self.load_config(cfg_info) self.sources = [] if not ('sources' in cfg and 'sinks' in cfg): raise PipelineException("Both sources & sinks are required", @@ -775,85 +832,26 @@ class PollingManager(object): unique_names.clear() -def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE): - if not os.path.exists(cfg_file): - cfg_file = cfg.CONF.find_file(cfg_file) - - LOG.debug("Pipeline config file: %s", cfg_file) - - with open(cfg_file) as fap: - data = fap.read() - - pipeline_cfg = yaml.safe_load(data) - LOG.info(_LI("Pipeline config: %s"), pipeline_cfg) - - return PipelineManager(pipeline_cfg, - transformer_manager or - extension.ExtensionManager( - 'ceilometer.transformer', - ), p_type) - - -def _setup_polling_manager(cfg_file): - if not os.path.exists(cfg_file): - cfg_file = cfg.CONF.find_file(cfg_file) - - LOG.debug("Polling config file: %s", cfg_file) - - with open(cfg_file) as fap: - data = fap.read() - - pipeline_cfg = yaml.safe_load(data) - LOG.info(_LI("Pipeline config: %s"), pipeline_cfg) - - return PollingManager(pipeline_cfg) - - def setup_event_pipeline(transformer_manager=None): """Setup event pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') cfg_file = cfg.CONF.event_pipeline_cfg_file - return _setup_pipeline_manager(cfg_file, transformer_manager, EVENT_TYPE) + return PipelineManager(cfg_file, transformer_manager or default, + EVENT_TYPE) def setup_pipeline(transformer_manager=None): """Setup pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') cfg_file = cfg.CONF.pipeline_cfg_file - return _setup_pipeline_manager(cfg_file, transformer_manager) - - -def _get_pipeline_cfg_file(p_type=SAMPLE_TYPE): - if p_type == EVENT_TYPE: - cfg_file = cfg.CONF.event_pipeline_cfg_file - else: - cfg_file = cfg.CONF.pipeline_cfg_file - - if not os.path.exists(cfg_file): - cfg_file = cfg.CONF.find_file(cfg_file) - - return cfg_file - - -def get_pipeline_mtime(p_type=SAMPLE_TYPE): - cfg_file = _get_pipeline_cfg_file(p_type) - return os.path.getmtime(cfg_file) - - -def get_pipeline_hash(p_type=SAMPLE_TYPE): - - cfg_file = _get_pipeline_cfg_file(p_type) - with open(cfg_file) as fap: - data = fap.read() - if six.PY3: - data = data.encode('utf-8') - - file_hash = hashlib.md5(data).hexdigest() - return file_hash + return PipelineManager(cfg_file, transformer_manager or default, + SAMPLE_TYPE) def setup_polling(): """Setup polling manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file - return _setup_polling_manager(cfg_file) + return PollingManager(cfg_file) def get_pipeline_grouping_key(pipe): diff --git a/ceilometer/service_base.py b/ceilometer/service_base.py index 0953d237..a622902b 100644 --- a/ceilometer/service_base.py +++ b/ceilometer/service_base.py @@ -20,7 +20,7 @@ from oslo_config import cfg from oslo_log import log import six -from ceilometer.i18n import _LE, _LI +from ceilometer.i18n import _LE from ceilometer import pipeline from ceilometer import utils @@ -37,15 +37,6 @@ class PipelineBasedService(cotyledon.Service): def init_pipeline_refresh(self): """Initializes pipeline refresh state.""" self.clear_pipeline_validation_status() - if cfg.CONF.refresh_pipeline_cfg: - self.set_pipeline_mtime(pipeline.get_pipeline_mtime()) - self.set_pipeline_hash(pipeline.get_pipeline_hash()) - - if cfg.CONF.refresh_event_pipeline_cfg: - self.set_pipeline_mtime(pipeline.get_pipeline_mtime( - pipeline.EVENT_TYPE), pipeline.EVENT_TYPE) - self.set_pipeline_hash(pipeline.get_pipeline_hash( - pipeline.EVENT_TYPE), pipeline.EVENT_TYPE) self.refresh_pipeline_periodic = None if (cfg.CONF.refresh_pipeline_cfg or @@ -60,90 +51,52 @@ class PipelineBasedService(cotyledon.Service): self.refresh_pipeline_periodic.stop() self.refresh_pipeline_periodic.wait() - def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE): - return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else - self.pipeline_mtime) - - def set_pipeline_mtime(self, mtime, p_type=pipeline.SAMPLE_TYPE): - if p_type == pipeline.EVENT_TYPE: - self.event_pipeline_mtime = mtime - else: - self.pipeline_mtime = mtime - - def get_pipeline_hash(self, p_type=pipeline.SAMPLE_TYPE): - return (self.event_pipeline_hash if p_type == pipeline.EVENT_TYPE else - self.pipeline_hash) - - def set_pipeline_hash(self, _hash, p_type=pipeline.SAMPLE_TYPE): - if p_type == pipeline.EVENT_TYPE: - self.event_pipeline_hash = _hash - else: - self.pipeline_hash = _hash - @abc.abstractmethod def reload_pipeline(self): """Reload pipeline in the agents.""" - def pipeline_changed(self, p_type=pipeline.SAMPLE_TYPE): - """Returns hash of changed pipeline else False.""" - - pipeline_mtime = self.get_pipeline_mtime(p_type) - mtime = pipeline.get_pipeline_mtime(p_type) - if mtime > pipeline_mtime: - LOG.info(_LI('Pipeline configuration file has been updated.')) - - self.set_pipeline_mtime(mtime, p_type) - _hash = pipeline.get_pipeline_hash(p_type) - pipeline_hash = self.get_pipeline_hash(p_type) - if _hash != pipeline_hash: - LOG.info(_LI("Detected change in pipeline configuration.")) - return _hash - return False - def refresh_pipeline(self): """Refreshes appropriate pipeline, then delegates to agent.""" if cfg.CONF.refresh_pipeline_cfg: - pipeline_hash = self.pipeline_changed() + manager = None + if hasattr(self, 'pipeline_manager'): + manager = self.pipeline_manager + elif hasattr(self, 'polling_manager'): + manager = self.polling_manager + pipeline_hash = manager.cfg_changed() if manager else None if pipeline_hash: try: + LOG.debug("Pipeline has been refreshed. " + "old hash: %(old)s, new hash: %(new)s", + {'old': manager.cfg_hash, + 'new': pipeline_hash}) # Pipeline in the notification agent. if hasattr(self, 'pipeline_manager'): self.pipeline_manager = pipeline.setup_pipeline() # Polling in the polling agent. elif hasattr(self, 'polling_manager'): self.polling_manager = pipeline.setup_polling() - LOG.debug("Pipeline has been refreshed. " - "old hash: %(old)s, new hash: %(new)s", - {'old': self.pipeline_hash, - 'new': pipeline_hash}) - self.set_pipeline_hash(pipeline_hash) self.pipeline_validated = True except Exception as err: - LOG.debug("Active pipeline config's hash is %s", - self.pipeline_hash) LOG.exception(_LE('Unable to load changed pipeline: %s') % err) if cfg.CONF.refresh_event_pipeline_cfg: - ev_pipeline_hash = self.pipeline_changed(pipeline.EVENT_TYPE) + # Pipeline in the notification agent. + manager = (self.event_pipeline_manager + if hasattr(self, 'event_pipeline_manager') else None) + ev_pipeline_hash = manager.cfg_changed() if ev_pipeline_hash: try: - # Pipeline in the notification agent. - if hasattr(self, 'event_pipeline_manager'): - self.event_pipeline_manager = (pipeline. - setup_event_pipeline()) - LOG.debug("Event Pipeline has been refreshed. " "old hash: %(old)s, new hash: %(new)s", - {'old': self.event_pipeline_hash, + {'old': manager.cfg_hash, 'new': ev_pipeline_hash}) - self.set_pipeline_hash(ev_pipeline_hash, - pipeline.EVENT_TYPE) + self.event_pipeline_manager = (pipeline. + setup_event_pipeline()) self.event_pipeline_validated = True except Exception as err: - LOG.debug("Active event pipeline config's hash is %s", - self.event_pipeline_hash) LOG.exception(_LE('Unable to load changed event pipeline:' ' %s') % err) diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 3aa16363..02452d1a 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -287,7 +287,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): self.srv.run() self.addCleanup(self.srv.terminate) - pipeline = self.srv.pipeline_hash + pipeline = self.srv.pipeline_manager.cfg_hash # Modify the collection targets updated_pipeline_cfg_file = self.setup_pipeline(['vcpus', @@ -297,7 +297,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file) start = time.time() while time.time() - start < 10: - if pipeline != self.srv.pipeline_hash: + if pipeline != self.srv.pipeline_manager.cfg_hash: break else: self.fail("Pipeline failed to reload") @@ -312,7 +312,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): self.srv.run() self.addCleanup(self.srv.terminate) - pipeline = self.srv.event_pipeline_hash + pipeline = self.srv.event_pipeline_manager.cfg_hash # Modify the collection targets updated_ev_pipeline_cfg_file = self.setup_event_pipeline( @@ -323,7 +323,7 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file) start = time.time() while time.time() - start < 10: - if pipeline != self.srv.event_pipeline_hash: + if pipeline != self.srv.event_pipeline_manager.cfg_hash: break else: self.fail("Pipeline failed to reload") diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 41842f6d..08bc5a83 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -17,6 +17,8 @@ import abc import copy import datetime +import os +import tempfile import traceback import mock @@ -25,6 +27,7 @@ from oslotest import base from oslotest import mockpatch import six from stevedore import extension +import yaml from ceilometer import pipeline from ceilometer import publisher @@ -127,6 +130,11 @@ class BasePipelineTestCase(base.BaseTestCase): def handle_sample(counter): raise Exception() + def cfg2file(self, data): + self.tmp_cfg.write(yaml.safe_dump(data)) + self.tmp_cfg.close() + return self.tmp_cfg.name + def setUp(self): super(BasePipelineTestCase, self).setUp() @@ -149,6 +157,7 @@ class BasePipelineTestCase(base.BaseTestCase): self.transformer_manager.__getitem__.side_effect = \ self.fake_tem_get_ext + self.tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False) self._setup_pipeline_cfg() self._reraise_exception = True @@ -156,6 +165,10 @@ class BasePipelineTestCase(base.BaseTestCase): 'ceilometer.pipeline.LOG.exception', side_effect=self._handle_reraise_exception)) + def tearDown(self): + os.unlink(self.tmp_cfg.name) + super(BasePipelineTestCase, self).tearDown() + def _handle_reraise_exception(self, msg): if self._reraise_exception: raise Exception(traceback.format_exc()) @@ -191,7 +204,7 @@ class BasePipelineTestCase(base.BaseTestCase): def _exception_create_pipelinemanager(self): self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, - self.pipeline_cfg, + self.cfg2file(self.pipeline_cfg), self.transformer_manager) def test_no_counters(self): @@ -200,7 +213,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_no_transformers(self): self._unset_pipeline_cfg('transformers') - pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) + pipeline.PipelineManager(self.cfg2file(self.pipeline_cfg), + self.transformer_manager) def test_no_name(self): self._unset_pipeline_cfg('name') @@ -208,9 +222,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_no_interval(self): self._unset_pipeline_cfg('interval') - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) - + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] self.assertEqual(600, pipe.get_interval()) @@ -255,16 +268,14 @@ class BasePipelineTestCase(base.BaseTestCase): self._exception_create_pipelinemanager() def test_get_interval(self): - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) - + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] self.assertEqual(5, pipe.get_interval()) def test_publisher_transformer_invoked(self): - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) - + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -278,8 +289,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_included_counters(self): counter_cfg = ['a', 'b'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -310,8 +321,8 @@ class BasePipelineTestCase(base.BaseTestCase): @mock.patch('ceilometer.pipeline.LOG') def test_none_volume_counter(self, LOG): self._set_pipeline_cfg('counters', ['empty_volume']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) publisher = pipeline_manager.pipelines[0].publishers[0] test_s = sample.Sample( @@ -343,8 +354,8 @@ class BasePipelineTestCase(base.BaseTestCase): @mock.patch('ceilometer.pipeline.LOG') def test_fake_volume_counter(self, LOG): self._set_pipeline_cfg('counters', ['fake_volume']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) publisher = pipeline_manager.pipelines[0].publishers[0] test_s = sample.Sample( @@ -376,8 +387,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_counter_dont_match(self): counter_cfg = ['nomatch'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -388,8 +399,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_wildcard_counter(self): counter_cfg = ['*'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -401,15 +412,15 @@ class BasePipelineTestCase(base.BaseTestCase): def test_wildcard_excluded_counters(self): counter_cfg = ['*', '!a'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -420,8 +431,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_all_excluded_counters_not_excluded(self): counter_cfg = ['!b', '!c'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -435,8 +446,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_all_excluded_counters_is_excluded(self): counter_cfg = ['!a', '!c'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) self.assertTrue(pipeline_manager.pipelines[0].support_meter('b')) self.assertFalse(pipeline_manager.pipelines[0].support_meter('c')) @@ -444,8 +455,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_wildcard_and_excluded_wildcard_counters(self): counter_cfg = ['*', '!disk.*'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0]. support_meter('disk.read.bytes')) self.assertTrue(pipeline_manager.pipelines[0].support_meter('cpu')) @@ -453,8 +464,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_included_counter_and_wildcard_counters(self): counter_cfg = ['cpu', 'disk.*'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertTrue(pipeline_manager.pipelines[0]. support_meter('disk.read.bytes')) self.assertTrue(pipeline_manager.pipelines[0].support_meter('cpu')) @@ -464,8 +475,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_excluded_counter_and_excluded_wildcard_counters(self): counter_cfg = ['!cpu', '!disk.*'] self._set_pipeline_cfg('counters', counter_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0]. support_meter('disk.read.bytes')) self.assertFalse(pipeline_manager.pipelines[0].support_meter('cpu')) @@ -475,8 +486,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_pipeline(self): self._augment_pipeline_cfg() - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -512,8 +523,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_pipeline_exception(self): self._reraise_exception = False self._break_pipeline_cfg() - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -545,8 +556,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_none_transformer_pipeline(self): self._set_pipeline_cfg('transformers', None) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -556,8 +567,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_empty_transformer_pipeline(self): self._set_pipeline_cfg('transformers', []) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) publisher = pipeline_manager.pipelines[0].publishers[0] @@ -577,9 +588,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._set_pipeline_cfg('transformers', transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) - + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -612,8 +622,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._set_pipeline_cfg('transformers', transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -650,8 +660,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._set_pipeline_cfg('transformers', transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -667,9 +677,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_publisher(self): self._set_pipeline_cfg('publishers', ['test://', 'new://']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) - + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -685,8 +694,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_publisher_isolation(self): self._reraise_exception = False self._set_pipeline_cfg('publishers', ['except://', 'new://']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -697,8 +706,8 @@ class BasePipelineTestCase(base.BaseTestCase): def test_multiple_counter_pipeline(self): self._set_pipeline_cfg('counters', ['a', 'b']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter, sample.Sample( @@ -736,8 +745,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._extend_pipeline_cfg('transformers', extra_transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(self.test_counter) @@ -773,8 +782,8 @@ class BasePipelineTestCase(base.BaseTestCase): ] self._extend_pipeline_cfg('transformers', extra_transformer_cfg) self._set_pipeline_cfg('counters', ['a', 'b']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter, sample.Sample( @@ -807,8 +816,8 @@ class BasePipelineTestCase(base.BaseTestCase): 'parameters': {} }] self._extend_pipeline_cfg('transformers', extra_transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] publisher = pipe.publishers[0] @@ -847,9 +856,8 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={} ), ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(counters) @@ -901,7 +909,6 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={} ), ] - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] @@ -991,9 +998,8 @@ class BasePipelineTestCase(base.BaseTestCase): 'user_metadata': um}, ), ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(counters) @@ -1076,9 +1082,8 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={'cpu_number': 4} ), ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(counters) @@ -1104,8 +1109,8 @@ class BasePipelineTestCase(base.BaseTestCase): ] self._set_pipeline_cfg('transformers', transformer_cfg) self._set_pipeline_cfg('counters', ['cpu']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] now = timeutils.utcnow() @@ -1164,14 +1169,14 @@ class BasePipelineTestCase(base.BaseTestCase): def test_resources(self): resources = ['test1://', 'test2://'] self._set_pipeline_cfg('resources', resources) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertEqual(resources, pipeline_manager.pipelines[0].resources) def test_no_resources(self): - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertEqual(0, len(pipeline_manager.pipelines[0].resources)) def _do_test_rate_of_change_mapping(self, pipe, meters, units): @@ -1238,8 +1243,8 @@ class BasePipelineTestCase(base.BaseTestCase): self._set_pipeline_cfg('transformers', transformer_cfg) self._set_pipeline_cfg('counters', ['disk.read.bytes', 'disk.write.requests']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] meters = ('disk.read.bytes', 'disk.write.requests') units = ('B', 'request') @@ -1322,7 +1327,6 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={'version': '3.0'} ), ] - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] @@ -1358,9 +1362,8 @@ class BasePipelineTestCase(base.BaseTestCase): timestamp=timeutils.utcnow().isoformat(), resource_metadata={'version': '1.0'} )) - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(counters) @@ -1554,9 +1557,8 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={'version': '2.0'} ) ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data([counters[0]]) @@ -1592,9 +1594,8 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={'version': '1.0'} ), ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(counters) @@ -1640,9 +1641,8 @@ class BasePipelineTestCase(base.BaseTestCase): resource_metadata={'version': '2.0'} ) ] - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data([counters[0]]) @@ -1787,9 +1787,8 @@ class BasePipelineTestCase(base.BaseTestCase): timestamp=timeutils.utcnow().isoformat(), resource_metadata=s.get('metadata') )) - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] for s in counters: pipe.publish_data(s) @@ -1907,9 +1906,8 @@ class BasePipelineTestCase(base.BaseTestCase): timestamp=timeutils.utcnow().isoformat(), resource_metadata=None ) - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data([counter]) @@ -1938,8 +1936,8 @@ class BasePipelineTestCase(base.BaseTestCase): ] self._set_pipeline_cfg('transformers', transformer_cfg) self._set_pipeline_cfg('counters', ['unrelated-sample']) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) timeutils.advance_time_seconds(200) pipe = pipeline_manager.pipelines[0] pipe.flush() @@ -1958,9 +1956,8 @@ class BasePipelineTestCase(base.BaseTestCase): ] self._set_pipeline_cfg('transformers', transformer_cfg) self._set_pipeline_cfg('counters', ['cpu']) - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[0] pipe.publish_data(data) @@ -2129,8 +2126,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._set_pipeline_cfg('transformers', transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertEqual(set(['resource_id', 'counter_name']), set(pipeline.get_pipeline_grouping_key( pipeline_manager.pipelines[0]))) @@ -2147,8 +2144,8 @@ class BasePipelineTestCase(base.BaseTestCase): }, ] self._set_pipeline_cfg('transformers', transformer_cfg) - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) self.assertEqual(['counter_name'], pipeline.get_pipeline_grouping_key( pipeline_manager.pipelines[0])) diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py index 283144af..d9d5f802 100644 --- a/ceilometer/tests/unit/test_decoupled_pipeline.py +++ b/ceilometer/tests/unit/test_decoupled_pipeline.py @@ -136,9 +136,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): 'publishers': ['new'], }) self.pipeline_cfg['sources'][0]['sinks'].append('second_sink') - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -179,9 +178,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): 'resources': [], 'sinks': ['test_sink'] }) - - pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(self.pipeline_cfg), self.transformer_manager) with pipeline_manager.publisher() as p: p([self.test_counter]) @@ -225,8 +223,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): pipeline_cfg = yaml.safe_load(data) for s in pipeline_cfg['sinks']: s['publishers'] = ['test://'] - pipeline_manager = pipeline.PipelineManager(pipeline_cfg, - self.transformer_manager) + pipeline_manager = pipeline.PipelineManager( + self.cfg2file(pipeline_cfg), self.transformer_manager) pipe = pipeline_manager.pipelines[index] self._do_test_rate_of_change_mapping(pipe, meters, units) @@ -279,7 +277,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): }) self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, - self.pipeline_cfg, + self.cfg2file(self.pipeline_cfg), self.transformer_manager) def test_duplicated_source_names(self): @@ -292,5 +290,5 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): }) self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, - self.pipeline_cfg, + self.cfg2file(self.pipeline_cfg), self.transformer_manager) |