diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-06-11 11:23:21 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-06-11 11:23:21 +0000 |
commit | 433a27f52e0811cf759b8c0bdafb4e30e7a77ae9 (patch) | |
tree | 426931665c98b6d0093223746eccc85c21fafe74 /heat | |
parent | 9d9c6f59a0326c0fced499d860932da2617eb316 (diff) | |
parent | 507555a585d22a6dc276344b7b97fa9a015e81e5 (diff) | |
download | heat-433a27f52e0811cf759b8c0bdafb4e30e7a77ae9.tar.gz |
Merge "Move Engine initialization into service start()"
Diffstat (limited to 'heat')
-rw-r--r-- | heat/engine/service.py | 43 | ||||
-rw-r--r-- | heat/tests/test_engine_service.py | 57 | ||||
-rw-r--r-- | heat/tests/test_metadata_refresh.py | 4 | ||||
-rw-r--r-- | heat/tests/test_validate.py | 146 |
4 files changed, 48 insertions, 202 deletions
diff --git a/heat/engine/service.py b/heat/engine/service.py index 3a6704b16..e63b6abc9 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -13,6 +13,7 @@ import functools import json +import os from oslo.config import cfg import six @@ -101,7 +102,7 @@ class ThreadGroupManager(object): :param cnxt: RPC context :param stack: Stack to be operated on :type stack: heat.engine.parser.Stack - :param engine_id: The UUID of the engine acquiring the lock + :param engine_id: The UUID of the engine/worker acquiring the lock :param func: Callable to be invoked in sub-thread :type func: function or instancemethod :param args: Args to be passed to func @@ -278,17 +279,22 @@ class EngineService(service.Service): def __init__(self, host, topic, manager=None): super(EngineService, self).__init__(host, topic) resources.initialise() - - self.engine_id = stack_lock.StackLock.generate_engine_id() - self.thread_group_mgr = ThreadGroupManager() + self.host = host + + # The following are initialized here, but assigned in start() which + # happens after the fork when spawning multiple worker processes + self.stack_watch = None + self.listener = None + self.engine_id = None + self.thread_group_mgr = None + + def create_periodic_tasks(self): + LOG.debug("Starting periodic watch tasks pid=%s" % os.getpid()) + # Note with multiple workers, the parent process hasn't called start() + # so we need to create a ThreadGroupManager here for the periodic tasks + if self.thread_group_mgr is None: + self.thread_group_mgr = ThreadGroupManager() self.stack_watch = StackWatch(self.thread_group_mgr) - self.listener = EngineListener(host, self.engine_id, - self.thread_group_mgr) - LOG.debug("Starting listener for engine %s" % self.engine_id) - self.listener.start() - - def start(self): - super(EngineService, self).start() # Create a periodic_watcher_task per-stack admin_context = context.get_admin_context() @@ -296,6 +302,16 @@ class EngineService(service.Service): for s in stacks: self.stack_watch.start_watch_task(s.id, admin_context) + def start(self): + self.thread_group_mgr = ThreadGroupManager() + self.engine_id = stack_lock.StackLock.generate_engine_id() + self.listener = EngineListener(self.host, self.engine_id, + self.thread_group_mgr) + LOG.debug("Starting listener for engine %s pid=%s, ppid=%s" % + (self.engine_id, os.getpid(), os.getppid())) + self.listener.start() + super(EngineService, self).start() + def stop(self): # Stop rpc connection at first for preventing new requests LOG.info(_("Attempting to stop engine service...")) @@ -524,8 +540,9 @@ class EngineService(service.Service): if (stack.action in (stack.CREATE, stack.ADOPT) and stack.status == stack.COMPLETE): - # Schedule a periodic watcher task for this stack - self.stack_watch.start_watch_task(stack.id, cnxt) + if self.stack_watch: + # Schedule a periodic watcher task for this stack + self.stack_watch.start_watch_task(stack.id, cnxt) else: LOG.warning(_("Stack create failed, status %s") % stack.status) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index c48cbb577..9bda718e6 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -309,6 +309,9 @@ class DummyThreadGroup(object): *args, **kwargs): self.threads.append(callback) + def stop_timers(self): + pass + def add_thread(self, callback, *args, **kwargs): self.threads.append(callback) return self.pool.spawn(callback, *args, **kwargs) @@ -419,11 +422,8 @@ class StackServiceCreateUpdateDeleteTest(HeatTestCase): def setUp(self): super(StackServiceCreateUpdateDeleteTest, self).setUp() self.ctx = utils.dummy_context() - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() self.man = service.EngineService('a-host', 'a-topic') + self.man.create_periodic_tasks() def _test_stack_create(self, stack_name): params = {'foo': 'bar'} @@ -1227,10 +1227,6 @@ class StackServiceUpdateSuspendedNotSupportedTest(HeatTestCase): def setUp(self): super(StackServiceUpdateSuspendedNotSupportedTest, self).setUp() self.ctx = utils.dummy_context() - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() self.man = service.EngineService('a-host', 'a-topic') def test_stack_update_suspended(self): @@ -1264,11 +1260,8 @@ class StackServiceSuspendResumeTest(HeatTestCase): def setUp(self): super(StackServiceSuspendResumeTest, self).setUp() self.ctx = utils.dummy_context() - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() self.man = service.EngineService('a-host', 'a-topic') + self.man.create_periodic_tasks() def test_stack_suspend(self): stack_name = 'service_suspend_test_stack' @@ -1340,11 +1333,8 @@ class StackServiceAuthorizeTest(HeatTestCase): super(StackServiceAuthorizeTest, self).setUp() self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - self.eng = service.EngineService('a-host', 'a-topic') + self.eng.engine_id = 'engine-fake-uuid' cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role') res._register_class('ResourceWithPropsType', generic_rsrc.ResourceWithProps) @@ -1437,36 +1427,29 @@ class StackServiceTest(HeatTestCase): super(StackServiceTest, self).setUp() self.ctx = utils.dummy_context(tenant_id='stack_service_test_tenant') - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - self.eng = service.EngineService('a-host', 'a-topic') + self.eng.create_periodic_tasks() + self.eng.engine_id = 'engine-fake-uuid' cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role') res._register_class('ResourceWithPropsType', generic_rsrc.ResourceWithProps) + @mock.patch.object(service.StackWatch, 'start_watch_task') @mock.patch.object(service.db_api, 'stack_get_all') @mock.patch.object(service.service.Service, 'start') - def test_start_gets_all_stacks(self, mock_super_start, mock_stack_get_all): - mock_stack_get_all.return_value = [] - - self.eng.start() - mock_stack_get_all.assert_called_once_with(mock.ANY, tenant_safe=False) - - @mock.patch.object(service.db_api, 'stack_get_all') - @mock.patch.object(service.service.Service, 'start') - def test_start_watches_all_stacks(self, mock_super_start, mock_get_all): + def test_start_watches_all_stacks(self, mock_super_start, mock_get_all, + start_watch_task): s1 = mock.Mock(id=1) s2 = mock.Mock(id=2) mock_get_all.return_value = [s1, s2] - mock_watch = mock.Mock() - self.eng.stack_watch.start_watch_task = mock_watch + start_watch_task.return_value = None + + self.eng.thread_group_mgr = None + self.eng.create_periodic_tasks() - self.eng.start() - calls = mock_watch.call_args_list - self.assertEqual(2, mock_watch.call_count) + mock_get_all.assert_called_once_with(mock.ANY, tenant_safe=False) + calls = start_watch_task.call_args_list + self.assertEqual(2, start_watch_task.call_count) self.assertIn(mock.call(1, mock.ANY), calls) self.assertIn(mock.call(2, mock.ANY), calls) @@ -2628,10 +2611,6 @@ class SoftwareConfigServiceTest(HeatTestCase): def setUp(self): super(SoftwareConfigServiceTest, self).setUp() self.ctx = utils.dummy_context() - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() self.engine = service.EngineService('a-host', 'a-topic') def _create_software_config( diff --git a/heat/tests/test_metadata_refresh.py b/heat/tests/test_metadata_refresh.py index f029b7781..057f858e9 100644 --- a/heat/tests/test_metadata_refresh.py +++ b/heat/tests/test_metadata_refresh.py @@ -199,10 +199,6 @@ class WaitCondMetadataUpdateTest(HeatTestCase): def setUp(self): super(WaitCondMetadataUpdateTest, self).setUp() self.fc = fakes.FakeKeystoneClient() - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() self.man = service.EngineService('a-host', 'a-topic') cfg.CONF.set_default('heat_waitcondition_server_url', 'http://server.test:8000/v1/waitcondition') diff --git a/heat/tests/test_validate.py b/heat/tests/test_validate.py index 51ff90cf5..b850e70b0 100644 --- a/heat/tests/test_validate.py +++ b/heat/tests/test_validate.py @@ -817,31 +817,19 @@ class validateTest(HeatTestCase): def test_validate_ref_valid(self): t = template_format.parse(test_template_ref % 'WikiDatabase') - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual('test.', res['Description']) - self.m.VerifyAll() def test_validate_with_environment(self): test_template = test_template_ref % 'WikiDatabase' test_template = test_template.replace('AWS::EC2::Instance', 'My::Instance') t = template_format.parse(test_template) - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') params = {'resource_registry': {'My::Instance': 'AWS::EC2::Instance'}} res = dict(engine.validate_template(None, t, params)) self.assertEqual('test.', res['Description']) - self.m.VerifyAll() def test_validate_hot_valid(self): t = template_format.parse( @@ -852,58 +840,30 @@ class validateTest(HeatTestCase): my_instance: type: AWS::EC2::Instance """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual('test.', res['Description']) - self.m.VerifyAll() def test_validate_ref_invalid(self): t = template_format.parse(test_template_ref % 'WikiDatabasez') - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertNotEqual(res['Description'], 'Successfully validated') - self.m.VerifyAll() def test_validate_findinmap_valid(self): t = template_format.parse(test_template_findinmap_valid) - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual('test.', res['Description']) - self.m.VerifyAll() def test_validate_findinmap_invalid(self): t = template_format.parse(test_template_findinmap_invalid) - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertNotEqual(res['Description'], 'Successfully validated') - self.m.VerifyAll() def test_validate_parameters(self): t = template_format.parse(test_template_ref % 'WikiDatabase') - - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) # Note: the assertion below does not expect a CFN dict of the parameter @@ -917,7 +877,6 @@ class validateTest(HeatTestCase): 'NoEcho': 'false', 'Label': 'KeyName'}} self.assertEqual(expected, res['Parameters']) - self.m.VerifyAll() def test_validate_hot_empty_parameters_valid(self): t = template_format.parse( @@ -929,21 +888,12 @@ class validateTest(HeatTestCase): my_instance: type: AWS::EC2::Instance """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual({}, res['Parameters']) - self.m.VerifyAll() def test_validate_hot_parameter_label(self): t = template_format.parse(test_template_hot_parameter_label) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) parameters = res['Parameters'] @@ -955,14 +905,9 @@ class validateTest(HeatTestCase): 'NoEcho': 'false', 'Label': 'Nova KeyPair Name'}} self.assertEqual(expected, parameters) - self.m.VerifyAll() def test_validate_hot_no_parameter_label(self): t = template_format.parse(test_template_hot_no_parameter_label) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) parameters = res['Parameters'] @@ -974,14 +919,9 @@ class validateTest(HeatTestCase): 'NoEcho': 'false', 'Label': 'KeyName'}} self.assertEqual(expected, parameters) - self.m.VerifyAll() def test_validate_cfn_parameter_label(self): t = template_format.parse(test_template_cfn_parameter_label) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) parameters = res['Parameters'] @@ -993,7 +933,6 @@ class validateTest(HeatTestCase): 'NoEcho': 'false', 'Label': 'Nova KeyPair Name'}} self.assertEqual(expected, parameters) - self.m.VerifyAll() def test_validate_hot_empty_resources_valid(self): t = template_format.parse( @@ -1002,16 +941,11 @@ class validateTest(HeatTestCase): description: test. resources: """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) expected = {"Description": "test.", "Parameters": {}} self.assertEqual(expected, res) - self.m.VerifyAll() def test_validate_hot_empty_outputs_valid(self): t = template_format.parse( @@ -1020,40 +954,25 @@ class validateTest(HeatTestCase): description: test. outputs: """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) expected = {"Description": "test.", "Parameters": {}} self.assertEqual(expected, res) - self.m.VerifyAll() def test_validate_properties(self): t = template_format.parse(test_template_invalid_property) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual({'Error': 'Unknown Property UnknownProperty'}, res) - self.m.VerifyAll() def test_invalid_resources(self): t = template_format.parse(test_template_invalid_resources) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual({'Error': 'Resources must contain Resource. ' 'Found a [string] instead'}, res) - self.m.VerifyAll() def test_invalid_section_cfn(self): t = template_format.parse( @@ -1069,15 +988,10 @@ class validateTest(HeatTestCase): } """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t)) self.assertEqual({'Error': 'The template section is invalid: Output'}, res) - self.m.VerifyAll() def test_invalid_section_hot(self): t = template_format.parse( @@ -1089,79 +1003,49 @@ class validateTest(HeatTestCase): output: """) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t)) self.assertEqual({'Error': 'The template section is invalid: output'}, res) - self.m.VerifyAll() def test_unimplemented_property(self): t = template_format.parse(test_template_unimplemented_property) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual( {'Error': 'Property SourceDestCheck not implemented yet'}, res) - self.m.VerifyAll() def test_invalid_deletion_policy(self): t = template_format.parse(test_template_invalid_deletion_policy) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual({'Error': 'Invalid DeletionPolicy Destroy'}, res) - self.m.VerifyAll() def test_snapshot_deletion_policy(self): t = template_format.parse(test_template_snapshot_deletion_policy) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual( {'Error': 'Snapshot DeletionPolicy not supported'}, res) - self.m.VerifyAll() @skipIf(try_import('cinderclient.v1.volume_backups') is None, 'unable to import volume_backups') def test_volume_snapshot_deletion_policy(self): t = template_format.parse(test_template_volume_snapshot) - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, t, {})) self.assertEqual({'Description': u'test.', 'Parameters': {}}, res) - self.m.VerifyAll() def test_validate_template_without_resources(self): hot_tpl = template_format.parse(''' heat_template_version: 2013-05-23 ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) expected = {'Description': 'No description', 'Parameters': {}} self.assertEqual(expected, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_type(self): hot_tpl = template_format.parse(''' @@ -1179,15 +1063,10 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"Type" is not a valid keyword ' 'inside a resource definition\''}, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_properties(self): hot_tpl = template_format.parse(''' @@ -1205,15 +1084,10 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"Properties" is not a valid keyword ' 'inside a resource definition\''}, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_matadata(self): hot_tpl = template_format.parse(''' @@ -1231,15 +1105,10 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"Metadata" is not a valid keyword ' 'inside a resource definition\''}, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_depends_on(self): hot_tpl = template_format.parse(''' @@ -1257,15 +1126,10 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"DependsOn" is not a valid keyword ' 'inside a resource definition\''}, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_deletion_polciy(self): hot_tpl = template_format.parse(''' @@ -1283,16 +1147,11 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"DeletionPolicy" is not a valid ' 'keyword inside a resource definition\''}, res) - self.m.VerifyAll() def test_validate_template_with_invalid_resource_update_policy(self): hot_tpl = template_format.parse(''' @@ -1310,16 +1169,11 @@ class validateTest(HeatTestCase): foo: bar ''') - self.m.StubOutWithMock(service.EngineListener, 'start') - service.EngineListener.start().AndReturn(None) - self.m.ReplayAll() - engine = service.EngineService('a', 't') res = dict(engine.validate_template(None, hot_tpl, {})) self.assertEqual({'Error': 'u\'"UpdatePolicy" is not a valid ' 'keyword inside a resource definition\''}, res) - self.m.VerifyAll() def test_unregistered_key(self): t = template_format.parse(test_unregistered_key) |