summaryrefslogtreecommitdiff
path: root/heat/engine/stack_lock.py
blob: 627083b751bd3a8e111358de520d5b6bdc54c4ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
import uuid

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import excutils

from heat.common import exception
from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.common import messaging as rpc_messaging
from heat.objects import stack_lock as stack_lock_object
from heat.rpc import api as rpc_api

# Make the 'engine_life_check_timeout' option (declared in
# heat.common.config) available on cfg.CONF; StackLock.engine_alive reads
# it as the RPC timeout when probing another engine.
cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')

# Module-level logger used by every StackLock method below.
LOG = logging.getLogger(__name__)


class StackLock(object):
    """Per-engine lock on a stack, backed by the stack_lock object layer.

    The lock is identified by a stack ID and the ID of the engine that
    holds it.  A lock whose owning engine no longer answers an RPC
    liveness probe is treated as stale and may be stolen.
    """

    def __init__(self, context, stack, engine_id):
        """Bind the lock helper to a context, a stack and an engine.

        :param context: request context; passed to the RPC call that probes
                        another engine for liveness.
        :param stack: the stack to lock; its ``id``, ``name`` and ``action``
                      attributes are read by the methods below.
        :param engine_id: ID of the engine on whose behalf the lock is
                          acquired and released.
        """
        self.context = context
        self.stack = stack
        self.engine_id = engine_id
        # Not used by any method of this class; presumably managed by
        # callers -- verify before removing.
        self.listener = None

    @staticmethod
    def engine_alive(context, engine_id):
        """Ask the engine with the given ID whether it is listening.

        Issues a blocking ``listening`` RPC call on the listener topic,
        addressed to ``engine_id``, bounded by the
        ``engine_life_check_timeout`` option.

        :param context: request context forwarded with the RPC call.
        :param engine_id: ID of the engine to probe.
        :returns: whatever the remote ``listening`` call returns, or False
                  if the call times out.
        """
        client = rpc_messaging.get_rpc_client(
            version='1.0', topic=rpc_api.LISTENER_TOPIC,
            server=engine_id)
        client_context = client.prepare(
            timeout=cfg.CONF.engine_life_check_timeout)
        try:
            return client_context.call(context, 'listening')
        except messaging.MessagingTimeout:
            # No answer within the timeout: treat the engine as dead.
            return False

    @staticmethod
    def generate_engine_id():
        """Return a new random (UUID4) engine ID as a string."""
        return str(uuid.uuid4())

    def get_engine_id(self):
        """Return the engine ID recorded for the lock on this stack."""
        return stack_lock_object.StackLock.get_engine_id(self.stack.id)

    def try_acquire(self):
        """Try to acquire a stack lock.

        Unlike acquire(), this neither raises an ActionInProgress exception
        nor tries to steal the lock.

        :returns: the result of the underlying create call; the rest of
                  this class treats None as "lock acquired" and any other
                  value as the ID of the engine currently holding it.
        """
        return stack_lock_object.StackLock.create(self.stack.id,
                                                  self.engine_id)

    def acquire(self, retry=True):
        """Acquire a lock on the stack.

        If the lock is held by an engine that does not respond to a
        liveness probe, an attempt is made to steal it.

        :param retry: When True, retry if lock was released while stealing.
        :type retry: boolean
        :raises ActionInProgress: if the lock is held by this engine or by
            a live engine, or if the steal attempt did not succeed.
        """
        lock_engine_id = stack_lock_object.StackLock.create(self.stack.id,
                                                            self.engine_id)
        if lock_engine_id is None:
            LOG.debug("Engine %(engine)s acquired lock on stack "
                      "%(stack)s", {'engine': self.engine_id,
                                    'stack': self.stack.id})
            return

        # Somebody already holds the lock.  The equality test comes first
        # so that we never send a liveness RPC to ourselves.
        if (lock_engine_id == self.engine_id or
                self.engine_alive(self.context, lock_engine_id)):
            LOG.debug("Lock on stack %(stack)s is owned by engine "
                      "%(engine)s", {'stack': self.stack.id,
                                     'engine': lock_engine_id})
            raise exception.ActionInProgress(stack_name=self.stack.name,
                                             action=self.stack.action)

        LOG.info(_LI("Stale lock detected on stack %(stack)s.  Engine "
                     "%(engine)s will attempt to steal the lock"),
                 {'stack': self.stack.id, 'engine': self.engine_id})

        # steal() result, as interpreted here: None -> stolen; True -> the
        # lock vanished meanwhile; anything else -> ID of the engine that
        # got there first.
        result = stack_lock_object.StackLock.steal(self.stack.id,
                                                   lock_engine_id,
                                                   self.engine_id)

        if result is None:
            LOG.info(_LI("Engine %(engine)s successfully stole the lock "
                         "on stack %(stack)s"),
                     {'engine': self.engine_id,
                      'stack': self.stack.id})
            return

        if result is True:
            if retry:
                LOG.info(_LI("The lock on stack %(stack)s was released "
                             "while engine %(engine)s was stealing it. "
                             "Trying again"), {'stack': self.stack.id,
                                               'engine': self.engine_id})
                # Retry exactly once to avoid unbounded recursion.
                return self.acquire(retry=False)
        else:
            new_lock_engine_id = result
            LOG.info(_LI("Failed to steal lock on stack %(stack)s. "
                         "Engine %(engine)s stole the lock first"),
                     {'stack': self.stack.id,
                      'engine': new_lock_engine_id})

        raise exception.ActionInProgress(
            stack_name=self.stack.name, action=self.stack.action)

    def release(self, stack_id):
        """Release a stack lock.

        :param stack_id: ID of the stack whose lock is released.  Note that
                         this argument is used, not ``self.stack.id``.
        """
        # Only the engine that owns the lock will be releasing it.
        result = stack_lock_object.StackLock.release(stack_id,
                                                     self.engine_id)
        if result is True:
            LOG.warning(_LW("Lock was already released on stack %s!"),
                        stack_id)
        else:
            LOG.debug("Engine %(engine)s released lock on stack "
                      "%(stack)s", {'engine': self.engine_id,
                                    'stack': stack_id})

    @contextlib.contextmanager
    def thread_lock(self, stack_id, retry=True):
        """Acquire a lock and release it only if there is an exception.

        The release method still needs to be scheduled to be run at the
        end of the thread using the Thread.link method.

        :param stack_id: ID of the stack passed to release() on failure.
        :param retry: When True, retry if lock was released while stealing.
        :type retry: boolean
        :raises ActionInProgress: if the lock could not be acquired.
        """
        # Acquire outside the try block: if acquire() itself fails, this
        # call holds no lock, so it must not release one (doing so could
        # drop a lock held by another thread of the same engine).
        self.acquire(retry)
        try:
            yield
        except exception.ActionInProgress:
            # Propagated without releasing, as this method always has.
            raise
        except:  # noqa
            with excutils.save_and_reraise_exception():
                self.release(stack_id)

    @contextlib.contextmanager
    def try_thread_lock(self, stack_id):
        """Similar to thread_lock, but acquire the lock using try_acquire.

        The lock is released only upon an exception raised after a
        successful acquisition.  The value yielded to the ``with`` body is
        the result of try_acquire(): None when the lock was acquired.

        :param stack_id: ID of the stack passed to release() on failure.
        """
        # Acquire outside the try block so that a failure inside
        # try_acquire() is never mistaken for "lock acquired" (None).
        result = self.try_acquire()
        try:
            yield result
        except:  # noqa
            if result is None:  # Lock was successfully acquired
                with excutils.save_and_reraise_exception():
                    self.release(stack_id)
            raise