diff options
author | Angus Salkeld <asalkeld@mirantis.com> | 2014-12-18 10:52:16 +1000 |
---|---|---|
committer | Angus Salkeld <asalkeld@mirantis.com> | 2014-12-19 14:11:15 +1000 |
commit | 0191ce10c208c5f6453d9671eb6a3ae08b88a171 (patch) | |
tree | 193c326c65f8297fe084f55220c60bdd475c2777 | |
parent | 9f736de7576ca10e6c00c92de2a77312b9e306f3 (diff) | |
download | heat-0191ce10c208c5f6453d9671eb6a3ae08b88a171.tar.gz |
Separate StackWatch out into it's own module
In the interest in small modules that have a matching unit test file.
Part of blueprint decouple-nested
Change-Id: Ia40310007113b1111883a20db99cb7f754ae43bf
-rw-r--r-- | heat/engine/service.py | 87 | ||||
-rw-r--r-- | heat/engine/service_stack_watch.py | 107 | ||||
-rw-r--r-- | heat/tests/test_engine_service.py | 78 | ||||
-rw-r--r-- | heat/tests/test_engine_service_stack_watch.py | 110 |
4 files changed, 222 insertions, 160 deletions
diff --git a/heat/engine/service.py b/heat/engine/service.py index 5c0781444..33b582558 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -20,7 +20,6 @@ import eventlet from oslo.config import cfg from oslo import messaging from oslo.serialization import jsonutils -from oslo.utils import timeutils from osprofiler import profiler import requests import six @@ -44,6 +43,7 @@ from heat.engine import event as evt from heat.engine import parameter_groups from heat.engine import properties from heat.engine import resources +from heat.engine import service_stack_watch from heat.engine import stack as parser from heat.engine import stack_lock from heat.engine import template as templatem @@ -216,88 +216,6 @@ class ThreadGroupManager(object): event.send(message) -class StackWatch(object): - def __init__(self, thread_group_mgr): - self.thread_group_mgr = thread_group_mgr - - def start_watch_task(self, stack_id, cnxt): - - def stack_has_a_watchrule(sid): - wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid) - - now = timeutils.utcnow() - start_watch_thread = False - for wr in wrs: - # reset the last_evaluated so we don't fire off alarms when - # the engine has not been running. - db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now}) - - if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED: - start_watch_thread = True - - children = db_api.stack_get_all_by_owner_id(cnxt, sid) - for child in children: - if stack_has_a_watchrule(child.id): - start_watch_thread = True - - return start_watch_thread - - if stack_has_a_watchrule(stack_id): - self.thread_group_mgr.add_timer( - stack_id, - self.periodic_watcher_task, - sid=stack_id) - - def check_stack_watches(self, sid): - # Retrieve the stored credentials & create context - # Require tenant_safe=False to the stack_get to defeat tenant - # scoping otherwise we fail to retrieve the stack - LOG.debug("Periodic watcher task for stack %s" % sid) - admin_context = context.get_admin_context() - db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False, - eager_load=True) - if not db_stack: - LOG.error(_LE("Unable to retrieve stack %s for periodic task"), - sid) - return - stack = parser.Stack.load(admin_context, stack=db_stack, - use_stored_context=True) - - # recurse into any nested stacks. - children = db_api.stack_get_all_by_owner_id(admin_context, sid) - for child in children: - self.check_stack_watches(child.id) - - # Get all watchrules for this stack and evaluate them - try: - wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid) - except Exception as ex: - LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'), - ex) - return - - def run_alarm_action(stack, actions, details): - for action in actions: - action(details=details) - for res in stack.itervalues(): - res.metadata_update() - - for wr in wrs: - rule = watchrule.WatchRule.load(stack.context, watch=wr) - actions = rule.evaluate() - if actions: - self.thread_group_mgr.start(sid, run_alarm_action, stack, - actions, rule.get_details()) - - def periodic_watcher_task(self, sid): - """ - Periodic task, created for each stack, triggers watch-rule - evaluation for all rules defined for the stack - sid = stack ID - """ - self.check_stack_watches(sid) - - @profiler.trace_cls("rpc") class EngineListener(service.Service): ''' @@ -384,7 +302,8 @@ class EngineService(service.Service): # so we need to create a ThreadGroupManager here for the periodic tasks if self.thread_group_mgr is None: self.thread_group_mgr = ThreadGroupManager() - self.stack_watch = StackWatch(self.thread_group_mgr) + self.stack_watch = service_stack_watch.StackWatch( + self.thread_group_mgr) # Create a periodic_watcher_task per-stack admin_context = context.get_admin_context() diff --git a/heat/engine/service_stack_watch.py b/heat/engine/service_stack_watch.py new file mode 100644 index 000000000..ece76b957 --- /dev/null +++ b/heat/engine/service_stack_watch.py @@ -0,0 +1,107 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.utils import timeutils + +from heat.common import context +from heat.common.i18n import _LE +from heat.common.i18n import _LW +from heat.db import api as db_api +from heat.engine import stack +from heat.engine import watchrule +from heat.openstack.common import log as logging +from heat.rpc import api as rpc_api + +LOG = logging.getLogger(__name__) + + +class StackWatch(object): + def __init__(self, thread_group_mgr): + self.thread_group_mgr = thread_group_mgr + + def start_watch_task(self, stack_id, cnxt): + + def stack_has_a_watchrule(sid): + wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid) + + now = timeutils.utcnow() + start_watch_thread = False + for wr in wrs: + # reset the last_evaluated so we don't fire off alarms when + # the engine has not been running. + db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now}) + + if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED: + start_watch_thread = True + + children = db_api.stack_get_all_by_owner_id(cnxt, sid) + for child in children: + if stack_has_a_watchrule(child.id): + start_watch_thread = True + + return start_watch_thread + + if stack_has_a_watchrule(stack_id): + self.thread_group_mgr.add_timer( + stack_id, + self.periodic_watcher_task, + sid=stack_id) + + def check_stack_watches(self, sid): + # Retrieve the stored credentials & create context + # Require tenant_safe=False to the stack_get to defeat tenant + # scoping otherwise we fail to retrieve the stack + LOG.debug("Periodic watcher task for stack %s" % sid) + admin_context = context.get_admin_context() + db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False, + eager_load=True) + if not db_stack: + LOG.error(_LE("Unable to retrieve stack %s for periodic task"), + sid) + return + stk = stack.Stack.load(admin_context, stack=db_stack, + use_stored_context=True) + + # recurse into any nested stacks. + children = db_api.stack_get_all_by_owner_id(admin_context, sid) + for child in children: + self.check_stack_watches(child.id) + + # Get all watchrules for this stack and evaluate them + try: + wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid) + except Exception as ex: + LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'), + ex) + return + + def run_alarm_action(stk, actions, details): + for action in actions: + action(details=details) + for res in stk.itervalues(): + res.metadata_update() + + for wr in wrs: + rule = watchrule.WatchRule.load(stk.context, watch=wr) + actions = rule.evaluate() + if actions: + self.thread_group_mgr.start(sid, run_alarm_action, stk, + actions, rule.get_details()) + + def periodic_watcher_task(self, sid): + """ + Periodic task, created for each stack, triggers watch-rule + evaluation for all rules defined for the stack + sid = stack ID + """ + self.check_stack_watches(sid) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index c0db2c32c..143db8082 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -31,7 +31,6 @@ import six from heat.common import exception from heat.common import identifier from heat.common import template_format -from heat.common import urlfetch from heat.db import api as db_api from heat.engine.clients.os import glance from heat.engine.clients.os import keystone @@ -42,6 +41,7 @@ from heat.engine import properties from heat.engine import resource as res from heat.engine.resources import instance as instances from heat.engine import service +from heat.engine import service_stack_watch from heat.engine import stack as parser from heat.engine import stack_lock from heat.engine import template as templatem @@ -106,38 +106,6 @@ wp_template_no_default = ''' } ''' -nested_alarm_template = ''' -HeatTemplateFormatVersion: '2012-12-12' -Resources: - the_nested: - Type: AWS::CloudFormation::Stack - Properties: - TemplateURL: https://server.test/alarm.template -''' - -alarm_template = ''' -{ - "AWSTemplateFormatVersion" : "2010-09-09", - "Description" : "alarming", - "Resources" : { - "service_alarm": { - "Type": "AWS::CloudWatch::Alarm", - "Properties": { - "EvaluationPeriods": "1", - "AlarmActions": [], - "AlarmDescription": "do the thing", - "Namespace": "dev/null", - "Period": "300", - "ComparisonOperator": "GreaterThanThreshold", - "Statistic": "SampleCount", - "Threshold": "2", - "MetricName": "ServiceFailure" - } - } - } -} -''' - policy_template = ''' { "AWSTemplateFormatVersion" : "2010-09-09", @@ -1633,7 +1601,7 @@ class StackServiceTest(common.HeatTestCase): res._register_class('ResourceWithPropsType', generic_rsrc.ResourceWithProps) - @mock.patch.object(service.StackWatch, 'start_watch_task') + @mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task') @mock.patch.object(service.db_api, 'stack_get_all') @mock.patch.object(service.service.Service, 'start') def test_start_watches_all_stacks(self, mock_super_start, mock_get_all, @@ -2633,48 +2601,6 @@ class StackServiceTest(common.HeatTestCase): self.m.VerifyAll() - @stack_context('periodic_watch_task_not_created') - def test_periodic_watch_task_not_created(self): - self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup() - self.eng.stack_watch.start_watch_task(self.stack.id, self.ctx) - self.assertEqual( - [], self.eng.thread_group_mgr.groups[self.stack.id].threads) - - def test_periodic_watch_task_created(self): - stack = get_stack('period_watch_task_created', - utils.dummy_context(), - alarm_template) - self.stack = stack - self.m.ReplayAll() - stack.store() - stack.create() - self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup() - self.eng.stack_watch.start_watch_task(stack.id, self.ctx) - expected = [self.eng.stack_watch.periodic_watcher_task] - observed = self.eng.thread_group_mgr.groups[stack.id].threads - self.assertEqual(expected, observed) - self.stack.delete() - - def test_periodic_watch_task_created_nested(self): - self.m.StubOutWithMock(urlfetch, 'get') - urlfetch.get('https://server.test/alarm.template').MultipleTimes().\ - AndReturn(alarm_template) - self.m.ReplayAll() - - stack = get_stack('period_watch_task_created_nested', - utils.dummy_context(), - nested_alarm_template) - setup_keystone_mocks(self.m, stack) - self.stack = stack - self.m.ReplayAll() - stack.store() - stack.create() - self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup() - self.eng.stack_watch.start_watch_task(stack.id, self.ctx) - self.assertEqual([self.eng.stack_watch.periodic_watcher_task], - self.eng.thread_group_mgr.groups[stack.id].threads) - self.stack.delete() - @stack_context('service_show_watch_test_stack', False) def test_show_watch(self): # Insert two dummy watch rules into the DB diff --git a/heat/tests/test_engine_service_stack_watch.py b/heat/tests/test_engine_service_stack_watch.py new file mode 100644 index 000000000..739e12b3f --- /dev/null +++ b/heat/tests/test_engine_service_stack_watch.py @@ -0,0 +1,110 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from heat.engine import service_stack_watch +from heat.rpc import api as rpc_api +from heat.tests import common +from heat.tests import utils + + +class StackServiceWatcherTest(common.HeatTestCase): + + def setUp(self): + super(StackServiceWatcherTest, self).setUp() + + self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant') + self.patch('heat.engine.service.warnings') + + @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id') + @mock.patch.object(service_stack_watch.db_api, + 'watch_rule_get_all_by_stack') + @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update') + def test_periodic_watch_task_not_created(self, watch_rule_update, + watch_rule_get_all_by_stack, + stack_get_all_by_owner_id): + """If there is no cloud watch lite alarm, then don't create + a periodic task for it. + """ + stack_id = 83 + watch_rule_get_all_by_stack.return_value = [] + stack_get_all_by_owner_id.return_value = [] + tg = mock.Mock() + sw = service_stack_watch.StackWatch(tg) + sw.start_watch_task(stack_id, self.ctx) + + # assert that add_timer is NOT called. + self.assertEqual([], tg.add_timer.call_args_list) + + @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id') + @mock.patch.object(service_stack_watch.db_api, + 'watch_rule_get_all_by_stack') + @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update') + def test_periodic_watch_task_created(self, watch_rule_update, + watch_rule_get_all_by_stack, + stack_get_all_by_owner_id): + """If there is no cloud watch lite alarm, then DO create + a periodic task for it. + """ + stack_id = 86 + wr1 = mock.Mock() + wr1.id = 4 + wr1.state = rpc_api.WATCH_STATE_NODATA + + watch_rule_get_all_by_stack.return_value = [wr1] + stack_get_all_by_owner_id.return_value = [] + tg = mock.Mock() + sw = service_stack_watch.StackWatch(tg) + sw.start_watch_task(stack_id, self.ctx) + + # assert that add_timer IS called. + self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task, + sid=stack_id)], + tg.add_timer.call_args_list) + + @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id') + @mock.patch.object(service_stack_watch.db_api, + 'watch_rule_get_all_by_stack') + @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update') + def test_periodic_watch_task_created_nested(self, watch_rule_update, + watch_rule_get_all_by_stack, + stack_get_all_by_owner_id): + stack_id = 90 + + def my_wr_get(cnxt, sid): + if sid == stack_id: + return [] + wr1 = mock.Mock() + wr1.id = 4 + wr1.state = rpc_api.WATCH_STATE_NODATA + return [wr1] + + watch_rule_get_all_by_stack.side_effect = my_wr_get + + def my_nested_get(cnxt, sid): + if sid == stack_id: + nested_stack = mock.Mock() + nested_stack.id = 55 + return [nested_stack] + return [] + + stack_get_all_by_owner_id.side_effect = my_nested_get + tg = mock.Mock() + sw = service_stack_watch.StackWatch(tg) + sw.start_watch_task(stack_id, self.ctx) + + # assert that add_timer IS called. + self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task, + sid=stack_id)], + tg.add_timer.call_args_list) |