author     Zuul <zuul@review.openstack.org>           2018-08-03 22:58:08 +0000
committer  Gerrit Code Review <review@openstack.org>  2018-08-03 22:58:08 +0000
commit     68dd40db86c8797b66034c070f127341da069134 (patch)
tree       255c14bd4f41fa6457a07c4c2494f29b32fc8095
parent     53188f7e9278c3d73538b8234f6b430fe6d7c7c7 (diff)
parent     2d2da745931bceeb784984a44d747fcaf6264a30 (diff)
download   heat-68dd40db86c8797b66034c070f127341da069134.tar.gz
Merge "Eliminate client races in legacy operations"
-rw-r--r--  heat/engine/service.py                                          78
-rw-r--r--  heat/engine/stack.py                                            52
-rw-r--r--  heat/tests/engine/service/test_stack_action.py                   8
-rw-r--r--  heat/tests/engine/service/test_stack_events.py                   5
-rw-r--r--  heat/tests/engine/service/test_stack_update.py                  19
-rw-r--r--  releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml    12
6 files changed, 136 insertions, 38 deletions
diff --git a/heat/engine/service.py b/heat/engine/service.py
index cffcbeffa..cf9528bf8 100644
--- a/heat/engine/service.py
+++ b/heat/engine/service.py
@@ -185,6 +185,11 @@ class ThreadGroupManager(object):
stack.ROLLBACK,
stack.UPDATE)):
stack.persist_state_and_release_lock(lock.engine_id)
+
+ notify = kwargs.get('notify')
+ if notify is not None:
+ assert not notify.signalled()
+ notify.signal()
else:
lock.release()
@@ -244,6 +249,38 @@ class ThreadGroupManager(object):
msg_queue.put_nowait(message)
+class NotifyEvent(object):
+    def __init__(self):
+        self._queue = eventlet.queue.LightQueue(1)
+        self._signalled = False
+
+    def signalled(self):
+        return self._signalled
+
+    def signal(self):
+        """Signal the event."""
+        if self._signalled:
+            return
+        self._signalled = True
+
+        self._queue.put(None)
+        # Yield control so that the waiting greenthread will get the message
+        # as soon as possible, so that the API handler can respond to the user.
+        # Another option would be to set the queue length to 0 (which would
+        # cause put() to block until the event has been seen), but many unit
+        # tests run in a single greenthread and would thus deadlock.
+        eventlet.sleep(0)
+
+    def wait(self):
+        """Wait for the event."""
+        try:
+            # There's no timeout argument to eventlet.event.Event available
+            # until eventlet 0.22.1, so use a queue.
+            self._queue.get(timeout=cfg.CONF.rpc_response_timeout)
+        except eventlet.queue.Empty:
+            LOG.warning('Timed out waiting for operation to start')
+
+
@profiler.trace_cls("rpc")
class EngineListener(object):
"""Listen on an AMQP queue named for the engine.
@@ -996,14 +1033,17 @@ class EngineService(service.ServiceBase):
new_stack=updated_stack)
else:
msg_queue = eventlet.queue.LightQueue()
+ stored_event = NotifyEvent()
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id,
current_stack.update,
updated_stack,
- msg_queue=msg_queue)
+ msg_queue=msg_queue,
+ notify=stored_event)
th.link(self.thread_group_mgr.remove_msg_queue,
current_stack.id, msg_queue)
self.thread_group_mgr.add_msg_queue(current_stack.id, msg_queue)
+ stored_event.wait()
return dict(current_stack.identifier())
@context.request_context
@@ -1393,8 +1433,11 @@ class EngineService(service.ServiceBase):
# Successfully acquired lock
if acquire_result is None:
self.thread_group_mgr.stop_timers(stack.id)
+ stored = NotifyEvent()
self.thread_group_mgr.start_with_acquired_lock(stack, lock,
- stack.delete)
+ stack.delete,
+ notify=stored)
+ stored.wait()
return
# Current engine has the lock
@@ -1999,30 +2042,28 @@ class EngineService(service.ServiceBase):
@context.request_context
def stack_suspend(self, cnxt, stack_identity):
"""Handle request to perform suspend action on a stack."""
- def _stack_suspend(stack):
- LOG.debug("suspending stack %s", stack.name)
- stack.suspend()
-
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
+ stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
- _stack_suspend, stack)
+ stack.suspend,
+ notify=stored_event)
+ stored_event.wait()
@context.request_context
def stack_resume(self, cnxt, stack_identity):
"""Handle request to perform a resume action on a stack."""
- def _stack_resume(stack):
- LOG.debug("resuming stack %s", stack.name)
- stack.resume()
-
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
+ stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
- _stack_resume, stack)
+ stack.resume,
+ notify=stored_event)
+ stored_event.wait()
@context.request_context
def stack_snapshot(self, cnxt, stack_identity, name):
@@ -2094,15 +2135,13 @@ class EngineService(service.ServiceBase):
stack = parser.Stack.load(cnxt, stack=s)
LOG.info("Checking stack %s", stack.name)
+ stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
- stack.check)
+ stack.check, notify=stored_event)
+ stored_event.wait()
@context.request_context
def stack_restore(self, cnxt, stack_identity, snapshot_id):
- def _stack_restore(stack, snapshot):
- LOG.debug("restoring stack %s", stack.name)
- stack.restore(snapshot)
-
s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
@@ -2118,8 +2157,11 @@ class EngineService(service.ServiceBase):
action=stack.RESTORE,
new_stack=new_stack)
else:
+ stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(
- cnxt, stack, self.engine_id, _stack_restore, stack, snapshot)
+ cnxt, stack, self.engine_id, stack.restore, snapshot,
+ notify=stored_event)
+ stored_event.wait()
@context.request_context
def stack_list_snapshots(self, cnxt, stack_identity):
diff --git a/heat/engine/stack.py b/heat/engine/stack.py
index 7dab91a7d..66eba3cbe 100644
--- a/heat/engine/stack.py
+++ b/heat/engine/stack.py
@@ -1122,7 +1122,8 @@ class Stack(collections.Mapping):
@scheduler.wrappertask
def stack_task(self, action, reverse=False, post_func=None,
- aggregate_exceptions=False, pre_completion_func=None):
+ aggregate_exceptions=False, pre_completion_func=None,
+ notify=None):
"""A task to perform an action on the stack.
All of the resources are traversed in forward or reverse dependency
@@ -1147,9 +1148,13 @@ class Stack(collections.Mapping):
'Failed stack pre-ops: %s' % six.text_type(e))
if callable(post_func):
post_func()
+ # No need to call notify.signal(), because persistence of the
+ # state is always deferred here.
return
self.state_set(action, self.IN_PROGRESS,
'Stack %s started' % action)
+ if notify is not None:
+ notify.signal()
stack_status = self.COMPLETE
reason = 'Stack %s completed successfully' % action
@@ -1208,12 +1213,13 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.check', hide_args=False)
@reset_state_on_error
- def check(self):
+ def check(self, notify=None):
self.updated_time = oslo_timeutils.utcnow()
checker = scheduler.TaskRunner(
self.stack_task, self.CHECK,
post_func=self.supports_check_action,
- aggregate_exceptions=True)
+ aggregate_exceptions=True,
+ notify=notify)
checker()
def supports_check_action(self):
@@ -1281,7 +1287,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.update', hide_args=False)
@reset_state_on_error
- def update(self, newstack, msg_queue=None):
+ def update(self, newstack, msg_queue=None, notify=None):
"""Update the stack.
Compare the current stack with newstack,
@@ -1296,7 +1302,7 @@ class Stack(collections.Mapping):
"""
self.updated_time = oslo_timeutils.utcnow()
updater = scheduler.TaskRunner(self.update_task, newstack,
- msg_queue=msg_queue)
+ msg_queue=msg_queue, notify=notify)
updater()
@profiler.trace('Stack.converge_stack', hide_args=False)
@@ -1540,11 +1546,14 @@ class Stack(collections.Mapping):
self.state_set(self.action, self.FAILED, six.text_type(reason))
@scheduler.wrappertask
- def update_task(self, newstack, action=UPDATE, msg_queue=None):
+ def update_task(self, newstack, action=UPDATE,
+ msg_queue=None, notify=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
LOG.error("Unexpected action %s passed to update!", action)
self.state_set(self.UPDATE, self.FAILED,
"Invalid action %s" % action)
+ if notify is not None:
+ notify.signal()
return
try:
@@ -1553,6 +1562,8 @@ class Stack(collections.Mapping):
except Exception as e:
self.state_set(action, self.FAILED, e.args[0] if e.args else
'Failed stack pre-ops: %s' % six.text_type(e))
+ if notify is not None:
+ notify.signal()
return
if self.status == self.IN_PROGRESS:
if action == self.ROLLBACK:
@@ -1561,6 +1572,8 @@ class Stack(collections.Mapping):
reason = _('Attempted to %s an IN_PROGRESS '
'stack') % action
self.reset_stack_and_resources_in_progress(reason)
+ if notify is not None:
+ notify.signal()
return
# Save a copy of the new template. To avoid two DB writes
@@ -1574,6 +1587,10 @@ class Stack(collections.Mapping):
self.status_reason = 'Stack %s started' % action
self._send_notification_and_add_event()
self.store()
+ # Notify the caller that the state is stored
+ if notify is not None:
+ notify.signal()
+
if prev_tmpl_id is not None:
raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id)
@@ -1842,7 +1859,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.delete', hide_args=False)
@reset_state_on_error
- def delete(self, action=DELETE, backup=False, abandon=False):
+ def delete(self, action=DELETE, backup=False, abandon=False, notify=None):
"""Delete all of the resources, and then the stack itself.
The action parameter is used to differentiate between a user
@@ -1858,12 +1875,16 @@ class Stack(collections.Mapping):
LOG.error("Unexpected action %s passed to delete!", action)
self.state_set(self.DELETE, self.FAILED,
"Invalid action %s" % action)
+ if notify is not None:
+ notify.signal()
return
stack_status = self.COMPLETE
reason = 'Stack %s completed successfully' % action
self.state_set(action, self.IN_PROGRESS, 'Stack %s started' %
action)
+ if notify is not None:
+ notify.signal()
backup_stack = self._backup_stack(False)
if backup_stack:
@@ -1927,7 +1948,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.suspend', hide_args=False)
@reset_state_on_error
- def suspend(self):
+ def suspend(self, notify=None):
"""Suspend the stack.
Invokes handle_suspend for all stack resources.
@@ -1938,6 +1959,7 @@ class Stack(collections.Mapping):
other than move to SUSPEND_COMPLETE, so the resources must implement
handle_suspend for this to have any effect.
"""
+ LOG.debug("Suspending stack %s", self)
# No need to suspend if the stack has been suspended
if self.state == (self.SUSPEND, self.COMPLETE):
LOG.info('%s is already suspended', self)
@@ -1947,12 +1969,13 @@ class Stack(collections.Mapping):
sus_task = scheduler.TaskRunner(
self.stack_task,
action=self.SUSPEND,
- reverse=True)
+ reverse=True,
+ notify=notify)
sus_task(timeout=self.timeout_secs())
@profiler.trace('Stack.resume', hide_args=False)
@reset_state_on_error
- def resume(self):
+ def resume(self, notify=None):
"""Resume the stack.
Invokes handle_resume for all stack resources.
@@ -1963,6 +1986,7 @@ class Stack(collections.Mapping):
other than move to RESUME_COMPLETE, so the resources must implement
handle_resume for this to have any effect.
"""
+ LOG.debug("Resuming stack %s", self)
# No need to resume if the stack has been resumed
if self.state == (self.RESUME, self.COMPLETE):
LOG.info('%s is already resumed', self)
@@ -1972,7 +1996,8 @@ class Stack(collections.Mapping):
sus_task = scheduler.TaskRunner(
self.stack_task,
action=self.RESUME,
- reverse=False)
+ reverse=False,
+ notify=notify)
sus_task(timeout=self.timeout_secs())
@profiler.trace('Stack.snapshot', hide_args=False)
@@ -2034,16 +2059,17 @@ class Stack(collections.Mapping):
return newstack, template
@reset_state_on_error
- def restore(self, snapshot):
+ def restore(self, snapshot, notify=None):
"""Restore the given snapshot.
Invokes handle_restore on all resources.
"""
+ LOG.debug("Restoring stack %s", self)
self.updated_time = oslo_timeutils.utcnow()
newstack = self.restore_data(snapshot)[0]
updater = scheduler.TaskRunner(self.update_task, newstack,
- action=self.RESTORE)
+ action=self.RESTORE, notify=notify)
updater()
def get_availability_zones(self):
diff --git a/heat/tests/engine/service/test_stack_action.py b/heat/tests/engine/service/test_stack_action.py
index 1f01f3ac4..ccec6bcac 100644
--- a/heat/tests/engine/service/test_stack_action.py
+++ b/heat/tests/engine/service/test_stack_action.py
@@ -44,12 +44,14 @@ class StackServiceActionsTest(common.HeatTestCase):
thread = mock.MagicMock()
mock_link = self.patchobject(thread, 'link')
mock_start.return_value = thread
+ self.patchobject(service, 'NotifyEvent')
result = self.man.stack_suspend(self.ctx, stk.identifier())
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=s)
mock_link.assert_called_once_with(mock.ANY)
- mock_start.assert_called_once_with(stk.id, mock.ANY, stk)
+ mock_start.assert_called_once_with(stk.id, stk.suspend,
+ notify=mock.ANY)
stk.delete()
@@ -64,13 +66,14 @@ class StackServiceActionsTest(common.HeatTestCase):
thread = mock.MagicMock()
mock_link = self.patchobject(thread, 'link')
mock_start.return_value = thread
+ self.patchobject(service, 'NotifyEvent')
result = self.man.stack_resume(self.ctx, stk.identifier())
self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_link.assert_called_once_with(mock.ANY)
- mock_start.assert_called_once_with(stk.id, mock.ANY, stk)
+ mock_start.assert_called_once_with(stk.id, stk.resume, notify=mock.ANY)
stk.delete()
@@ -108,6 +111,7 @@ class StackServiceActionsTest(common.HeatTestCase):
stk = utils.parse_stack(t, stack_name=stack_name)
stk.check = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_load.return_value = stk
mock_start.side_effect = self._mock_thread_start
diff --git a/heat/tests/engine/service/test_stack_events.py b/heat/tests/engine/service/test_stack_events.py
index 1c3bfa1e3..fa0e8e1d1 100644
--- a/heat/tests/engine/service/test_stack_events.py
+++ b/heat/tests/engine/service/test_stack_events.py
@@ -12,6 +12,8 @@
# under the License.
import mock
+from oslo_config import cfg
+from oslo_messaging import conffixture
from heat.engine import resource as res
from heat.engine.resources.aws.ec2 import instance as instances
@@ -94,6 +96,7 @@ class StackEventTest(common.HeatTestCase):
@tools.stack_context('service_event_list_deleted_resource')
@mock.patch.object(instances.Instance, 'handle_delete')
def test_event_list_deleted_resource(self, mock_delete):
+ self.useFixture(conffixture.ConfFixture(cfg.CONF))
mock_delete.return_value = None
res._register_class('GenericResourceType',
@@ -103,7 +106,7 @@ class StackEventTest(common.HeatTestCase):
thread.link = mock.Mock(return_value=None)
def run(stack_id, func, *args, **kwargs):
- func(*args)
+ func(*args, **kwargs)
return thread
self.eng.thread_group_mgr.start = run
diff --git a/heat/tests/engine/service/test_stack_update.py b/heat/tests/engine/service/test_stack_update.py
index 8c71b7ce1..ff4e7943f 100644
--- a/heat/tests/engine/service/test_stack_update.py
+++ b/heat/tests/engine/service/test_stack_update.py
@@ -15,6 +15,7 @@ import uuid
import eventlet.queue
import mock
from oslo_config import cfg
+from oslo_messaging import conffixture
from oslo_messaging.rpc import dispatcher
import six
@@ -44,6 +45,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
def setUp(self):
super(ServiceStackUpdateTest, self).setUp()
+ self.useFixture(conffixture.ConfFixture(cfg.CONF))
self.ctx = utils.dummy_context()
self.man = service.EngineService('a-host', 'a-topic')
self.man.thread_group_mgr = tools.DummyThreadGroupManager()
@@ -69,7 +71,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_validate = self.patchobject(stk, 'validate', return_value=None)
msgq_mock = mock.Mock()
- self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
+ self.patchobject(eventlet.queue, 'LightQueue',
+ side_effect=[msgq_mock, eventlet.queue.LightQueue()])
# do update
api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: True}
@@ -123,7 +126,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
self.patchobject(environment, 'Environment', return_value=stk.env)
self.patchobject(stk, 'validate', return_value=None)
self.patchobject(eventlet.queue, 'LightQueue',
- return_value=mock.Mock())
+ side_effect=[mock.Mock(),
+ eventlet.queue.LightQueue()])
mock_merge = self.patchobject(env_util, 'merge_environments')
@@ -186,7 +190,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_validate = self.patchobject(stk, 'validate', return_value=None)
msgq_mock = mock.Mock()
- self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
+ self.patchobject(eventlet.queue, 'LightQueue',
+ side_effect=[msgq_mock, eventlet.queue.LightQueue()])
# do update
api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: False}
@@ -245,6 +250,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
t['parameters']['newparam'] = {'type': 'number'}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -301,7 +307,8 @@ resources:
rpc_api.PARAM_CONVERGE: False}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
- stk.update = mock.Mock()
+ loaded_stack.update = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = loaded_stack
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -344,6 +351,7 @@ resources:
t['parameters']['newparam'] = {'type': 'number'}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -444,6 +452,7 @@ resources:
'myother.yaml': 'myother'}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -490,6 +499,7 @@ resources:
'resource_registry': {'resources': {}}}
with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock()
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -890,6 +900,7 @@ resources:
stack.status = stack.COMPLETE
with mock.patch('heat.engine.stack.Stack') as mock_stack:
+ self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stack
mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stack.identifier(),
diff --git a/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml b/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml
new file mode 100644
index 000000000..6371f2c09
--- /dev/null
+++ b/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml
@@ -0,0 +1,12 @@
+---
+fixes:
+  - |
+    Previously, the suspend, resume, and check API calls for all stacks, and
+    the update, restore, and delete API calls for non-convergence stacks,
+    returned immediately after starting the stack operation. This meant that
+    a client reading the state immediately when performing the same operation
+    twice in a row could misinterpret a previous state as the latest, unless
+    it made careful reference to the updated_at timestamp.
+    Stacks are now guaranteed to have moved to the ``IN_PROGRESS`` state
+    before any of these APIs return (except in the case of deleting a
+    non-convergence stack where another operation was already in progress).
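The behaviour described in the release note rests on the NotifyEvent hand-off added in heat/engine/service.py: the API handler passes a one-shot event into the worker greenthread and blocks until the worker has stored the IN_PROGRESS state. The following is a minimal, self-contained sketch of that pattern, assuming only that eventlet is installed; worker() and api_handler() are illustrative stand-ins for ThreadGroupManager.start_with_lock() and an EngineService handler such as stack_suspend, not code from the patch itself.

# Minimal sketch (not Heat source) of the NotifyEvent synchronization pattern:
# the handler returns to the client only after the worker has signalled that
# the operation's IN_PROGRESS state is stored.
import eventlet
import eventlet.queue


class NotifyEvent(object):
    """One-shot event backed by a single-slot queue, as in the patch."""

    def __init__(self):
        self._queue = eventlet.queue.LightQueue(1)
        self._signalled = False

    def signal(self):
        if self._signalled:
            return
        self._signalled = True
        self._queue.put(None)
        eventlet.sleep(0)  # yield so the waiter wakes up promptly

    def wait(self, timeout=60):
        # Heat waits for cfg.CONF.rpc_response_timeout; 60s is a stand-in.
        try:
            self._queue.get(timeout=timeout)
        except eventlet.queue.Empty:
            print('Timed out waiting for operation to start')


def worker(notify):
    # ... set the stack state to IN_PROGRESS and persist it ...
    notify.signal()     # unblock the API handler
    eventlet.sleep(1)   # ... then carry on with the long-running operation


def api_handler():
    # Illustrative stand-in for an EngineService handler.
    event = NotifyEvent()
    eventlet.spawn(worker, event)   # stand-in for start_with_lock()
    event.wait()                    # return only once IN_PROGRESS is stored
    return {'status': 'operation started'}


if __name__ == '__main__':
    print(api_handler())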