author     Angus Salkeld <asalkeld@mirantis.com>  2014-12-18 10:52:16 +1000
committer  Angus Salkeld <asalkeld@mirantis.com>  2014-12-19 14:11:15 +1000
commit     0191ce10c208c5f6453d9671eb6a3ae08b88a171 (patch)
tree       193c326c65f8297fe084f55220c60bdd475c2777
parent     9f736de7576ca10e6c00c92de2a77312b9e306f3 (diff)
download   heat-0191ce10c208c5f6453d9671eb6a3ae08b88a171.tar.gz
Separate StackWatch out into its own module

In the interest of small modules that have a matching unit test file.

Part of blueprint decouple-nested

Change-Id: Ia40310007113b1111883a20db99cb7f754ae43bf
-rw-r--r--  heat/engine/service.py                          87
-rw-r--r--  heat/engine/service_stack_watch.py             107
-rw-r--r--  heat/tests/test_engine_service.py               78
-rw-r--r--  heat/tests/test_engine_service_stack_watch.py  110
4 files changed, 222 insertions(+), 160 deletions(-)
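
This change is a straight move: the StackWatch class leaves heat/engine/service.py and lands, behaviour unchanged, in heat/engine/service_stack_watch.py, so callers only swap the import path. A minimal sketch of the before/after, assuming only that the caller has some thread-group-manager object (the new unit tests below drive it with a bare mock.Mock()):

    import mock

    from heat.engine import service_stack_watch

    # Before this commit:  from heat.engine import service
    #                      watch = service.StackWatch(thread_group_mgr)
    thread_group_mgr = mock.Mock()  # stand-in for the engine's ThreadGroupManager
    watch = service_stack_watch.StackWatch(thread_group_mgr)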
diff --git a/heat/engine/service.py b/heat/engine/service.py
index 5c0781444..33b582558 100644
--- a/heat/engine/service.py
+++ b/heat/engine/service.py
@@ -20,7 +20,6 @@ import eventlet
 from oslo.config import cfg
 from oslo import messaging
 from oslo.serialization import jsonutils
-from oslo.utils import timeutils
 from osprofiler import profiler
 import requests
 import six
@@ -44,6 +43,7 @@ from heat.engine import event as evt
 from heat.engine import parameter_groups
 from heat.engine import properties
 from heat.engine import resources
+from heat.engine import service_stack_watch
 from heat.engine import stack as parser
 from heat.engine import stack_lock
 from heat.engine import template as templatem
@@ -216,88 +216,6 @@ class ThreadGroupManager(object):
             event.send(message)
 
 
-class StackWatch(object):
-    def __init__(self, thread_group_mgr):
-        self.thread_group_mgr = thread_group_mgr
-
-    def start_watch_task(self, stack_id, cnxt):
-
-        def stack_has_a_watchrule(sid):
-            wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
-
-            now = timeutils.utcnow()
-            start_watch_thread = False
-            for wr in wrs:
-                # reset the last_evaluated so we don't fire off alarms when
-                # the engine has not been running.
-                db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
-
-                if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
-                    start_watch_thread = True
-
-            children = db_api.stack_get_all_by_owner_id(cnxt, sid)
-            for child in children:
-                if stack_has_a_watchrule(child.id):
-                    start_watch_thread = True
-
-            return start_watch_thread
-
-        if stack_has_a_watchrule(stack_id):
-            self.thread_group_mgr.add_timer(
-                stack_id,
-                self.periodic_watcher_task,
-                sid=stack_id)
-
-    def check_stack_watches(self, sid):
-        # Retrieve the stored credentials & create context.
-        # Pass tenant_safe=False to stack_get to defeat tenant scoping,
-        # otherwise we fail to retrieve the stack.
-        LOG.debug("Periodic watcher task for stack %s" % sid)
-        admin_context = context.get_admin_context()
-        db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
-                                    eager_load=True)
-        if not db_stack:
-            LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
-                      sid)
-            return
-        stack = parser.Stack.load(admin_context, stack=db_stack,
-                                  use_stored_context=True)
-
-        # recurse into any nested stacks.
-        children = db_api.stack_get_all_by_owner_id(admin_context, sid)
-        for child in children:
-            self.check_stack_watches(child.id)
-
-        # Get all watchrules for this stack and evaluate them
-        try:
-            wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
-        except Exception as ex:
-            LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
-                     {'ex': ex})
-            return
-
-        def run_alarm_action(stack, actions, details):
-            for action in actions:
-                action(details=details)
-            for res in stack.itervalues():
-                res.metadata_update()
-
-        for wr in wrs:
-            rule = watchrule.WatchRule.load(stack.context, watch=wr)
-            actions = rule.evaluate()
-            if actions:
-                self.thread_group_mgr.start(sid, run_alarm_action, stack,
-                                            actions, rule.get_details())
-
-    def periodic_watcher_task(self, sid):
-        """
-        Periodic task, created for each stack, triggers watch-rule
-        evaluation for all rules defined for the stack
-        sid = stack ID
-        """
-        self.check_stack_watches(sid)
-
-
 @profiler.trace_cls("rpc")
 class EngineListener(service.Service):
     '''
@@ -384,7 +302,8 @@ class EngineService(service.Service):
         # so we need to create a ThreadGroupManager here for the periodic tasks
         if self.thread_group_mgr is None:
             self.thread_group_mgr = ThreadGroupManager()
-            self.stack_watch = StackWatch(self.thread_group_mgr)
+            self.stack_watch = service_stack_watch.StackWatch(
+                self.thread_group_mgr)
 
         # Create a periodic_watcher_task per-stack
         admin_context = context.get_admin_context()
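
The code immediately after this hunk in EngineService.start() (exercised by test_start_watches_all_stacks below) then creates one periodic_watcher_task per existing stack. A rough sketch of that wiring, with the stack listing reduced to a plain argument so the snippet stands alone:

    from heat.common import context
    from heat.engine import service_stack_watch

    def start_watch_tasks(thread_group_mgr, stacks):
        # One StackWatch per engine; start_watch_task() only adds a timer for
        # stacks that carry at least one engine-evaluated watch rule.
        admin_context = context.get_admin_context()
        stack_watch = service_stack_watch.StackWatch(thread_group_mgr)
        for s in stacks:
            stack_watch.start_watch_task(s.id, admin_context)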
diff --git a/heat/engine/service_stack_watch.py b/heat/engine/service_stack_watch.py
new file mode 100644
index 000000000..ece76b957
--- /dev/null
+++ b/heat/engine/service_stack_watch.py
@@ -0,0 +1,107 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.utils import timeutils
+
+from heat.common import context
+from heat.common.i18n import _LE
+from heat.common.i18n import _LW
+from heat.db import api as db_api
+from heat.engine import stack
+from heat.engine import watchrule
+from heat.openstack.common import log as logging
+from heat.rpc import api as rpc_api
+
+LOG = logging.getLogger(__name__)
+
+
+class StackWatch(object):
+    def __init__(self, thread_group_mgr):
+        self.thread_group_mgr = thread_group_mgr
+
+    def start_watch_task(self, stack_id, cnxt):
+
+        def stack_has_a_watchrule(sid):
+            wrs = db_api.watch_rule_get_all_by_stack(cnxt, sid)
+
+            now = timeutils.utcnow()
+            start_watch_thread = False
+            for wr in wrs:
+                # reset the last_evaluated so we don't fire off alarms when
+                # the engine has not been running.
+                db_api.watch_rule_update(cnxt, wr.id, {'last_evaluated': now})
+
+                if wr.state != rpc_api.WATCH_STATE_CEILOMETER_CONTROLLED:
+                    start_watch_thread = True
+
+            children = db_api.stack_get_all_by_owner_id(cnxt, sid)
+            for child in children:
+                if stack_has_a_watchrule(child.id):
+                    start_watch_thread = True
+
+            return start_watch_thread
+
+        if stack_has_a_watchrule(stack_id):
+            self.thread_group_mgr.add_timer(
+                stack_id,
+                self.periodic_watcher_task,
+                sid=stack_id)
+
+    def check_stack_watches(self, sid):
+        # Retrieve the stored credentials & create context.
+        # Pass tenant_safe=False to stack_get to defeat tenant scoping,
+        # otherwise we fail to retrieve the stack.
+        LOG.debug("Periodic watcher task for stack %s" % sid)
+        admin_context = context.get_admin_context()
+        db_stack = db_api.stack_get(admin_context, sid, tenant_safe=False,
+                                    eager_load=True)
+        if not db_stack:
+            LOG.error(_LE("Unable to retrieve stack %s for periodic task"),
+                      sid)
+            return
+        stk = stack.Stack.load(admin_context, stack=db_stack,
+                               use_stored_context=True)
+
+        # recurse into any nested stacks.
+        children = db_api.stack_get_all_by_owner_id(admin_context, sid)
+        for child in children:
+            self.check_stack_watches(child.id)
+
+        # Get all watchrules for this stack and evaluate them
+        try:
+            wrs = db_api.watch_rule_get_all_by_stack(admin_context, sid)
+        except Exception as ex:
+            LOG.warn(_LW('periodic_task db error watch rule removed? %(ex)s'),
+                     {'ex': ex})
+            return
+
+        def run_alarm_action(stk, actions, details):
+            for action in actions:
+                action(details=details)
+            for res in stk.itervalues():
+                res.metadata_update()
+
+        for wr in wrs:
+            rule = watchrule.WatchRule.load(stk.context, watch=wr)
+            actions = rule.evaluate()
+            if actions:
+                self.thread_group_mgr.start(sid, run_alarm_action, stk,
+                                            actions, rule.get_details())
+
+    def periodic_watcher_task(self, sid):
+        """
+        Periodic task, created for each stack, triggers watch-rule
+        evaluation for all rules defined for the stack
+        sid = stack ID
+        """
+        self.check_stack_watches(sid)
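
start_watch_task() registers a timer only when the stack, or any nested stack beneath it, has at least one watch rule that is not WATCH_STATE_CEILOMETER_CONTROLLED; rules owned by Ceilometer never start an engine-side evaluation thread. A minimal sketch of driving the class in isolation, stubbing the db_api calls the same way the new test module below does (the stack id is arbitrary):

    import mock

    from heat.engine import service_stack_watch
    from heat.rpc import api as rpc_api
    from heat.tests import utils

    wr = mock.Mock()
    wr.state = rpc_api.WATCH_STATE_NODATA  # engine-evaluated, not Ceilometer's

    with mock.patch.object(service_stack_watch.db_api,
                           'watch_rule_get_all_by_stack', return_value=[wr]), \
            mock.patch.object(service_stack_watch.db_api,
                              'watch_rule_update'), \
            mock.patch.object(service_stack_watch.db_api,
                              'stack_get_all_by_owner_id', return_value=[]):
        tg = mock.Mock()
        sw = service_stack_watch.StackWatch(tg)
        sw.start_watch_task('a-stack-id', utils.dummy_context())
        assert tg.add_timer.called  # a periodic_watcher_task timer was queued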
diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py
index c0db2c32c..143db8082 100644
--- a/heat/tests/test_engine_service.py
+++ b/heat/tests/test_engine_service.py
@@ -31,7 +31,6 @@ import six
 from heat.common import exception
 from heat.common import identifier
 from heat.common import template_format
-from heat.common import urlfetch
 from heat.db import api as db_api
 from heat.engine.clients.os import glance
 from heat.engine.clients.os import keystone
@@ -42,6 +41,7 @@ from heat.engine import properties
 from heat.engine import resource as res
 from heat.engine.resources import instance as instances
 from heat.engine import service
+from heat.engine import service_stack_watch
 from heat.engine import stack as parser
 from heat.engine import stack_lock
 from heat.engine import template as templatem
@@ -106,38 +106,6 @@ wp_template_no_default = '''
 }
 '''
 
-nested_alarm_template = '''
-HeatTemplateFormatVersion: '2012-12-12'
-Resources:
-  the_nested:
-    Type: AWS::CloudFormation::Stack
-    Properties:
-      TemplateURL: https://server.test/alarm.template
-'''
-
-alarm_template = '''
-{
-  "AWSTemplateFormatVersion" : "2010-09-09",
-  "Description" : "alarming",
-  "Resources" : {
-    "service_alarm": {
-      "Type": "AWS::CloudWatch::Alarm",
-      "Properties": {
-        "EvaluationPeriods": "1",
-        "AlarmActions": [],
-        "AlarmDescription": "do the thing",
-        "Namespace": "dev/null",
-        "Period": "300",
-        "ComparisonOperator": "GreaterThanThreshold",
-        "Statistic": "SampleCount",
-        "Threshold": "2",
-        "MetricName": "ServiceFailure"
-      }
-    }
-  }
-}
-'''
-
 policy_template = '''
 {
   "AWSTemplateFormatVersion" : "2010-09-09",
@@ -1633,7 +1601,7 @@ class StackServiceTest(common.HeatTestCase):
         res._register_class('ResourceWithPropsType',
                             generic_rsrc.ResourceWithProps)
 
-    @mock.patch.object(service.StackWatch, 'start_watch_task')
+    @mock.patch.object(service_stack_watch.StackWatch, 'start_watch_task')
     @mock.patch.object(service.db_api, 'stack_get_all')
     @mock.patch.object(service.service.Service, 'start')
     def test_start_watches_all_stacks(self, mock_super_start, mock_get_all,
@@ -2633,48 +2601,6 @@ class StackServiceTest(common.HeatTestCase):
 
         self.m.VerifyAll()
 
-    @stack_context('periodic_watch_task_not_created')
-    def test_periodic_watch_task_not_created(self):
-        self.eng.thread_group_mgr.groups[self.stack.id] = DummyThreadGroup()
-        self.eng.stack_watch.start_watch_task(self.stack.id, self.ctx)
-        self.assertEqual(
-            [], self.eng.thread_group_mgr.groups[self.stack.id].threads)
-
-    def test_periodic_watch_task_created(self):
-        stack = get_stack('period_watch_task_created',
-                          utils.dummy_context(),
-                          alarm_template)
-        self.stack = stack
-        self.m.ReplayAll()
-        stack.store()
-        stack.create()
-        self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
-        self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
-        expected = [self.eng.stack_watch.periodic_watcher_task]
-        observed = self.eng.thread_group_mgr.groups[stack.id].threads
-        self.assertEqual(expected, observed)
-        self.stack.delete()
-
-    def test_periodic_watch_task_created_nested(self):
-        self.m.StubOutWithMock(urlfetch, 'get')
-        urlfetch.get('https://server.test/alarm.template').MultipleTimes().\
-            AndReturn(alarm_template)
-        self.m.ReplayAll()
-
-        stack = get_stack('period_watch_task_created_nested',
-                          utils.dummy_context(),
-                          nested_alarm_template)
-        setup_keystone_mocks(self.m, stack)
-        self.stack = stack
-        self.m.ReplayAll()
-        stack.store()
-        stack.create()
-        self.eng.thread_group_mgr.groups[stack.id] = DummyThreadGroup()
-        self.eng.stack_watch.start_watch_task(stack.id, self.ctx)
-        self.assertEqual([self.eng.stack_watch.periodic_watcher_task],
-                         self.eng.thread_group_mgr.groups[stack.id].threads)
-        self.stack.delete()
-
     @stack_context('service_show_watch_test_stack', False)
     def test_show_watch(self):
         # Insert two dummy watch rules into the DB
diff --git a/heat/tests/test_engine_service_stack_watch.py b/heat/tests/test_engine_service_stack_watch.py
new file mode 100644
index 000000000..739e12b3f
--- /dev/null
+++ b/heat/tests/test_engine_service_stack_watch.py
@@ -0,0 +1,110 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from heat.engine import service_stack_watch
+from heat.rpc import api as rpc_api
+from heat.tests import common
+from heat.tests import utils
+
+
+class StackServiceWatcherTest(common.HeatTestCase):
+
+    def setUp(self):
+        super(StackServiceWatcherTest, self).setUp()
+
+        self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant')
+        self.patch('heat.engine.service.warnings')
+
+    @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
+    @mock.patch.object(service_stack_watch.db_api,
+                       'watch_rule_get_all_by_stack')
+    @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
+    def test_periodic_watch_task_not_created(self, watch_rule_update,
+                                             watch_rule_get_all_by_stack,
+                                             stack_get_all_by_owner_id):
+        """If there is no cloud watch lite alarm, then don't create
+        a periodic task for it.
+        """
+        stack_id = 83
+        watch_rule_get_all_by_stack.return_value = []
+        stack_get_all_by_owner_id.return_value = []
+        tg = mock.Mock()
+        sw = service_stack_watch.StackWatch(tg)
+        sw.start_watch_task(stack_id, self.ctx)
+
+        # assert that add_timer is NOT called.
+        self.assertEqual([], tg.add_timer.call_args_list)
+
+    @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
+    @mock.patch.object(service_stack_watch.db_api,
+                       'watch_rule_get_all_by_stack')
+    @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
+    def test_periodic_watch_task_created(self, watch_rule_update,
+                                         watch_rule_get_all_by_stack,
+                                         stack_get_all_by_owner_id):
+ """If there is no cloud watch lite alarm, then DO create
+ a periodic task for it.
+ """
+        stack_id = 86
+        wr1 = mock.Mock()
+        wr1.id = 4
+        wr1.state = rpc_api.WATCH_STATE_NODATA
+
+        watch_rule_get_all_by_stack.return_value = [wr1]
+        stack_get_all_by_owner_id.return_value = []
+        tg = mock.Mock()
+        sw = service_stack_watch.StackWatch(tg)
+        sw.start_watch_task(stack_id, self.ctx)
+
+        # assert that add_timer IS called.
+        self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
+                                    sid=stack_id)],
+                         tg.add_timer.call_args_list)
+
+    @mock.patch.object(service_stack_watch.db_api, 'stack_get_all_by_owner_id')
+    @mock.patch.object(service_stack_watch.db_api,
+                       'watch_rule_get_all_by_stack')
+    @mock.patch.object(service_stack_watch.db_api, 'watch_rule_update')
+    def test_periodic_watch_task_created_nested(self, watch_rule_update,
+                                                watch_rule_get_all_by_stack,
+                                                stack_get_all_by_owner_id):
+        stack_id = 90
+
+        def my_wr_get(cnxt, sid):
+            if sid == stack_id:
+                return []
+            wr1 = mock.Mock()
+            wr1.id = 4
+            wr1.state = rpc_api.WATCH_STATE_NODATA
+            return [wr1]
+
+        watch_rule_get_all_by_stack.side_effect = my_wr_get
+
+        def my_nested_get(cnxt, sid):
+            if sid == stack_id:
+                nested_stack = mock.Mock()
+                nested_stack.id = 55
+                return [nested_stack]
+            return []
+
+        stack_get_all_by_owner_id.side_effect = my_nested_get
+        tg = mock.Mock()
+        sw = service_stack_watch.StackWatch(tg)
+        sw.start_watch_task(stack_id, self.ctx)
+
+        # assert that add_timer IS called.
+        self.assertEqual([mock.call(stack_id, sw.periodic_watcher_task,
+                                    sid=stack_id)],
+                         tg.add_timer.call_args_list)