diff options
author | Zuul <zuul@review.opendev.org> | 2023-04-07 17:46:50 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-04-07 17:46:50 +0000 |
commit | cec8eb12c628a38b2431dac78ed37fc3f1506939 (patch) | |
tree | f613061baee36e32de818128e96f6d5742112585 /heat/db/api.py | |
parent | 1ca04117b9c3bff0b338df97df2c26d6a70929ea (diff) | |
parent | 43a5f3984e433ec28616cfe98cb060d9ff51af58 (diff) | |
download | heat-cec8eb12c628a38b2431dac78ed37fc3f1506939.tar.gz |
Merge "db: Remove layer of indirection"
Diffstat (limited to 'heat/db/api.py')
-rw-r--r-- | heat/db/api.py | 1971 |
1 files changed, 1971 insertions, 0 deletions
diff --git a/heat/db/api.py b/heat/db/api.py new file mode 100644 index 000000000..8b2c99967 --- /dev/null +++ b/heat/db/api.py @@ -0,0 +1,1971 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of SQLAlchemy backend.""" +import datetime +import functools +import itertools +import random + +from oslo_config import cfg +from oslo_db import api as oslo_db_api +from oslo_db import exception as db_exception +from oslo_db import options +from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import utils +from oslo_log import log as logging +from oslo_utils import encodeutils +from oslo_utils import excutils +from oslo_utils import timeutils +import osprofiler.sqlalchemy +import sqlalchemy +from sqlalchemy import and_ +from sqlalchemy import func +from sqlalchemy import or_ +from sqlalchemy import orm +from sqlalchemy.orm import aliased as orm_aliased + +from heat.common import crypt +from heat.common import exception +from heat.common.i18n import _ +from heat.db import filters as db_filters +from heat.db import models +from heat.db import utils as db_utils +from heat.engine import environment as heat_environment +from heat.rpc import api as rpc_api + +CONF = cfg.CONF +CONF.import_opt('hidden_stack_tags', 'heat.common.config') +CONF.import_opt('max_events_per_stack', 'heat.common.config') +CONF.import_group('profiler', 'heat.common.config') +CONF.import_opt('db_max_retries', 'oslo_db.options', group='database') +CONF.import_opt('db_retry_interval', 'oslo_db.options', group='database') +CONF.import_opt( + 'db_inc_retry_interval', 'oslo_db.options', group='database') +CONF.import_opt( + 'db_max_retry_interval', 'oslo_db.options', group='database') + +options.set_defaults(CONF) + +_facade = None +db_context = enginefacade.transaction_context() + +LOG = logging.getLogger(__name__) + + +# TODO(sbaker): fix tests so that sqlite_fk=True can be passed to configure +# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP +# since it's not compatible with SQLAlchemy 2.0 +db_context.configure(__autocommit=True) + + +def get_facade(): + global _facade + if _facade is None: + + # FIXME: get_facade() is called by the test suite startup, + # but will not be called normally for API calls. + # osprofiler / oslo_db / enginefacade currently don't have hooks + # to talk to each other, however one needs to be added to oslo.db + # to allow access to the Engine once constructed. + db_context.configure(**CONF.database) + _facade = db_context.get_legacy_facade() + if CONF.profiler.enabled: + if CONF.profiler.trace_sqlalchemy: + osprofiler.sqlalchemy.add_tracing(sqlalchemy, + _facade.get_engine(), + "db") + return _facade + + +def get_engine(): + return get_facade().get_engine() + + +def get_session(): + return get_facade().get_session() + + +def retry_on_db_error(func): + @functools.wraps(func) + def try_func(context, *args, **kwargs): + if (context.session.transaction is None or + not context.session.autocommit): + wrapped = oslo_db_api.wrap_db_retry( + max_retries=CONF.database.db_max_retries, + retry_on_deadlock=True, + retry_on_disconnect=True, + retry_interval=CONF.database.db_retry_interval, + inc_retry_interval=CONF.database.db_inc_retry_interval, + max_retry_interval=CONF.database.db_max_retry_interval)(func) + return wrapped(context, *args, **kwargs) + else: + try: + return func(context, *args, **kwargs) + except (db_exception.DBDeadlock, db_exception.DBConnectionError): + with excutils.save_and_reraise_exception(): + LOG.debug('Not retrying on DBDeadlock and ' + 'DBConnectionError because ' + 'transaction not closed') + return try_func + + +def update_and_save(context, obj, values): + with context.session.begin(subtransactions=True): + for k, v in values.items(): + setattr(obj, k, v) + + +def delete_softly(context, obj): + """Mark this object as deleted.""" + update_and_save(context, obj, {'deleted_at': timeutils.utcnow()}) + + +def soft_delete_aware_query(context, *args, **kwargs): + """Stack query helper that accounts for context's `show_deleted` field. + + :param show_deleted: if True, overrides context's show_deleted field. + """ + + query = context.session.query(*args) + show_deleted = kwargs.get('show_deleted') or context.show_deleted + + if not show_deleted: + query = query.filter_by(deleted_at=None) + return query + + +def raw_template_get(context, template_id): + result = context.session.query(models.RawTemplate).get(template_id) + + if not result: + raise exception.NotFound(_('raw template with id %s not found') % + template_id) + return result + + +def raw_template_create(context, values): + raw_template_ref = models.RawTemplate() + raw_template_ref.update(values) + raw_template_ref.save(context.session) + return raw_template_ref + + +def raw_template_update(context, template_id, values): + raw_template_ref = raw_template_get(context, template_id) + # get only the changed values + values = dict((k, v) for k, v in values.items() + if getattr(raw_template_ref, k) != v) + + if values: + update_and_save(context, raw_template_ref, values) + + return raw_template_ref + + +def raw_template_delete(context, template_id): + try: + raw_template = raw_template_get(context, template_id) + except exception.NotFound: + # Ignore not found + return + raw_tmpl_files_id = raw_template.files_id + session = context.session + with session.begin(subtransactions=True): + session.delete(raw_template) + if raw_tmpl_files_id is None: + return + # If no other raw_template is referencing the same raw_template_files, + # delete that too + if session.query(models.RawTemplate).filter_by( + files_id=raw_tmpl_files_id).first() is None: + try: + raw_tmpl_files = raw_template_files_get( + context, raw_tmpl_files_id) + except exception.NotFound: + # Ignore not found + return + session.delete(raw_tmpl_files) + + +def raw_template_files_create(context, values): + session = context.session + raw_templ_files_ref = models.RawTemplateFiles() + raw_templ_files_ref.update(values) + with session.begin(): + raw_templ_files_ref.save(session) + return raw_templ_files_ref + + +def raw_template_files_get(context, files_id): + result = context.session.query(models.RawTemplateFiles).get(files_id) + if not result: + raise exception.NotFound( + _("raw_template_files with files_id %d not found") % + files_id) + return result + + +def resource_get(context, resource_id, refresh=False, refresh_data=False, + eager=True): + query = context.session.query(models.Resource) + query = query.options(orm.joinedload("data")) + if eager: + query = query.options(orm.joinedload("rsrc_prop_data")) + + result = query.get(resource_id) + if not result: + raise exception.NotFound(_("resource with id %s not found") % + resource_id) + if refresh: + context.session.refresh(result) + if refresh_data: + # ensure data is loaded (lazy or otherwise) + result.data + + return result + + +def resource_get_by_name_and_stack(context, resource_name, stack_id): + result = context.session.query( + models.Resource + ).filter_by( + name=resource_name + ).filter_by( + stack_id=stack_id + ).options(orm.joinedload("data")).first() + return result + + +def resource_get_all_by_physical_resource_id(context, physical_resource_id): + results = (context.session.query(models.Resource) + .filter_by(physical_resource_id=physical_resource_id) + .all()) + + for result in results: + if context is None or context.is_admin or context.tenant_id in ( + result.stack.tenant, result.stack.stack_user_project_id): + yield result + + +def resource_get_by_physical_resource_id(context, physical_resource_id): + results = resource_get_all_by_physical_resource_id(context, + physical_resource_id) + try: + return next(results) + except StopIteration: + return None + + +def resource_get_all(context): + results = context.session.query(models.Resource).all() + + if not results: + raise exception.NotFound(_('no resources were found')) + return results + + +@retry_on_db_error +def resource_purge_deleted(context, stack_id): + filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} + query = context.session.query(models.Resource) + result = query.filter_by(**filters) + attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None] + with context.session.begin(): + result.delete() + if attr_ids: + context.session.query(models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) + + +def _add_atomic_key_to_values(values, atomic_key): + if atomic_key is None: + values['atomic_key'] = 1 + else: + values['atomic_key'] = atomic_key + 1 + + +@retry_on_db_error +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + return _try_resource_update(context, resource_id, values, atomic_key, + expected_engine_id) + + +def _try_resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + session = context.session + with session.begin(subtransactions=True): + _add_atomic_key_to_values(values, atomic_key) + rows_updated = session.query(models.Resource).filter_by( + id=resource_id, engine_id=expected_engine_id, + atomic_key=atomic_key).update(values) + + return bool(rows_updated) + + +def resource_update_and_save(context, resource_id, values): + resource = context.session.query(models.Resource).get(resource_id) + update_and_save(context, resource, values) + + +def resource_delete(context, resource_id): + session = context.session + with session.begin(subtransactions=True): + resource = session.query(models.Resource).get(resource_id) + if resource: + session.delete(resource) + if resource.attr_data_id is not None: + attr_prop_data = session.query( + models.ResourcePropertiesData).get(resource.attr_data_id) + session.delete(attr_prop_data) + + +def resource_attr_id_set(context, resource_id, atomic_key, attr_id): + session = context.session + with session.begin(): + values = {'attr_data_id': attr_id} + _add_atomic_key_to_values(values, atomic_key) + rows_updated = session.query(models.Resource).filter(and_( + models.Resource.id == resource_id, + models.Resource.atomic_key == atomic_key, + models.Resource.engine_id.is_(None), + or_(models.Resource.attr_data_id == attr_id, + models.Resource.attr_data_id.is_(None)))).update( + values) + if rows_updated > 0: + return True + else: + # Someone else set the attr_id first and/or we have a stale + # view of the resource based on atomic_key, so delete the + # resource_properties_data (attr) DB row. + LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s', + {'rid': resource_id, 'aid': attr_id}) + session.query( + models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.id == attr_id).delete() + return False + + +def resource_attr_data_delete(context, resource_id, attr_id): + session = context.session + with session.begin(): + resource = session.query(models.Resource).get(resource_id) + attr_prop_data = session.query( + models.ResourcePropertiesData).get(attr_id) + if resource: + resource.update({'attr_data_id': None}) + if attr_prop_data: + session.delete(attr_prop_data) + + +def resource_data_get_all(context, resource_id, data=None): + """Looks up resource_data by resource.id. + + If data is encrypted, this method will decrypt the results. + """ + if data is None: + data = (context.session.query(models.ResourceData) + .filter_by(resource_id=resource_id)).all() + + if not data: + raise exception.NotFound(_('no resource data found')) + + ret = {} + + for res in data: + if res.redact: + try: + ret[res.key] = crypt.decrypt(res.decrypt_method, res.value) + continue + except exception.InvalidEncryptionKey: + LOG.exception('Failed to decrypt resource data %(rkey)s ' + 'for %(rid)s, ignoring.', + {'rkey': res.key, 'rid': resource_id}) + ret[res.key] = res.value + return ret + + +def resource_data_get(context, resource_id, key): + """Lookup value of resource's data by key. + + Decrypts resource data if necessary. + """ + result = resource_data_get_by_key(context, + resource_id, + key) + if result.redact: + return crypt.decrypt(result.decrypt_method, result.value) + return result.value + + +def stack_tags_set(context, stack_id, tags): + session = context.session + with session.begin(): + stack_tags_delete(context, stack_id) + result = [] + for tag in tags: + stack_tag = models.StackTag() + stack_tag.tag = tag + stack_tag.stack_id = stack_id + stack_tag.save(session=session) + result.append(stack_tag) + return result or None + + +def stack_tags_delete(context, stack_id): + session = context.session + with session.begin(subtransactions=True): + result = stack_tags_get(context, stack_id) + if result: + for tag in result: + session.delete(tag) + + +def stack_tags_get(context, stack_id): + result = (context.session.query(models.StackTag) + .filter_by(stack_id=stack_id) + .all()) + return result or None + + +def resource_data_get_by_key(context, resource_id, key): + """Looks up resource_data by resource_id and key. + + Does not decrypt resource_data. + """ + result = (context.session.query(models.ResourceData) + .filter_by(resource_id=resource_id) + .filter_by(key=key).first()) + + if not result: + raise exception.NotFound(_('No resource data found')) + return result + + +def resource_data_set(context, resource_id, key, value, redact=False): + """Save resource's key/value pair to database.""" + if redact: + method, value = crypt.encrypt(value) + else: + method = '' + try: + current = resource_data_get_by_key(context, resource_id, key) + except exception.NotFound: + current = models.ResourceData() + current.key = key + current.resource_id = resource_id + current.redact = redact + current.value = value + current.decrypt_method = method + current.save(session=context.session) + return current + + +def resource_exchange_stacks(context, resource_id1, resource_id2): + query = context.session.query(models.Resource) + session = query.session + + with session.begin(): + res1 = query.get(resource_id1) + res2 = query.get(resource_id2) + + res1.stack, res2.stack = res2.stack, res1.stack + + +def resource_data_delete(context, resource_id, key): + result = resource_data_get_by_key(context, resource_id, key) + session = context.session + with session.begin(): + session.delete(result) + + +def resource_create(context, values): + resource_ref = models.Resource() + resource_ref.update(values) + resource_ref.save(context.session) + return resource_ref + + +@retry_on_db_error +def resource_create_replacement(context, + existing_res_id, + new_res_values, + atomic_key, expected_engine_id=None): + session = context.session + try: + with session.begin(): + new_res = resource_create(context, new_res_values) + update_data = {'replaced_by': new_res.id} + if not _try_resource_update(context, + existing_res_id, update_data, + atomic_key, + expected_engine_id=expected_engine_id): + data = {} + if 'name' in new_res_values: + data['resource_name'] = new_res_values['name'] + raise exception.UpdateInProgress(**data) + except db_exception.DBReferenceError as exc: + # New template_id no longer exists + LOG.debug('Not creating replacement resource: %s', exc) + return None + else: + return new_res + + +def resource_get_all_by_stack(context, stack_id, filters=None): + query = context.session.query( + models.Resource + ).filter_by( + stack_id=stack_id + ).options(orm.joinedload("data")).options(orm.joinedload("rsrc_prop_data")) + + query = db_filters.exact_filter(query, models.Resource, filters) + results = query.all() + + return dict((res.name, res) for res in results) + + +def resource_get_all_active_by_stack(context, stack_id): + filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} + subquery = context.session.query(models.Resource.id).filter_by(**filters) + + results = context.session.query(models.Resource).filter_by( + stack_id=stack_id).filter( + models.Resource.id.notin_(subquery.scalar_subquery()) + ).options(orm.joinedload("data")).all() + + return dict((res.id, res) for res in results) + + +def resource_get_all_by_root_stack(context, stack_id, filters=None, + stack_id_only=False): + query = context.session.query( + models.Resource + ).filter_by( + root_stack_id=stack_id + ) + + if stack_id_only: + query = query.options(orm.load_only("id", "stack_id")) + else: + query = query.options(orm.joinedload("data")).options( + orm.joinedload("rsrc_prop_data")) + + query = db_filters.exact_filter(query, models.Resource, filters) + results = query.all() + + return dict((res.id, res) for res in results) + + +def engine_get_all_locked_by_stack(context, stack_id): + query = context.session.query( + func.distinct(models.Resource.engine_id) + ).filter( + models.Resource.stack_id == stack_id, + models.Resource.engine_id.isnot(None)) + return set(i[0] for i in query.all()) + + +def resource_prop_data_create_or_update(context, values, rpd_id=None): + obj_ref = None + if rpd_id is not None: + obj_ref = context.session.query( + models.ResourcePropertiesData).filter_by(id=rpd_id).first() + if obj_ref is None: + obj_ref = models.ResourcePropertiesData() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def resource_prop_data_create(context, values): + return resource_prop_data_create_or_update(context, values) + + +def resource_prop_data_get(context, resource_prop_data_id): + result = context.session.query(models.ResourcePropertiesData).get( + resource_prop_data_id) + if result is None: + raise exception.NotFound( + _('ResourcePropertiesData with id %s not found') % + resource_prop_data_id) + return result + + +def stack_get_by_name_and_owner_id(context, stack_name, owner_id): + query = soft_delete_aware_query( + context, models.Stack + ).options(orm.joinedload("raw_template")).filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id) + ).filter_by(name=stack_name).filter_by(owner_id=owner_id) + return query.first() + + +def stack_get_by_name(context, stack_name): + query = soft_delete_aware_query( + context, models.Stack + ).filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id) + ).filter_by(name=stack_name) + return query.order_by(models.Stack.created_at).first() + + +def stack_get(context, stack_id, show_deleted=False, eager_load=True): + query = context.session.query(models.Stack) + if eager_load: + query = query.options(orm.joinedload("raw_template")) + result = query.get(stack_id) + + deleted_ok = show_deleted or context.show_deleted + if result is None or result.deleted_at is not None and not deleted_ok: + return None + + # One exception to normal project scoping is users created by the + # stacks in the stack_user_project_id (in the heat stack user domain) + if (result is not None + and context is not None and not context.is_admin + and context.tenant_id not in (result.tenant, + result.stack_user_project_id)): + return None + return result + + +def stack_get_status(context, stack_id): + query = context.session.query(models.Stack) + query = query.options( + orm.load_only("action", "status", "status_reason", "updated_at")) + result = query.filter_by(id=stack_id).first() + if result is None: + raise exception.NotFound(_('Stack with id %s not found') % stack_id) + + return (result.action, result.status, result.status_reason, + result.updated_at) + + +def stack_get_all_by_owner_id(context, owner_id): + results = soft_delete_aware_query( + context, models.Stack).filter_by(owner_id=owner_id, + backup=False).all() + return results + + +def stack_get_all_by_root_owner_id(context, owner_id): + for stack in stack_get_all_by_owner_id(context, owner_id): + yield stack + for ch_st in stack_get_all_by_root_owner_id(context, stack.id): + yield ch_st + + +def _get_sort_keys(sort_keys, mapping): + """Returns an array containing only allowed keys + + :param sort_keys: an array of strings + :param mapping: a mapping from keys to DB column names + :returns: filtered list of sort keys + """ + if isinstance(sort_keys, str): + sort_keys = [sort_keys] + return [mapping[key] for key in sort_keys or [] if key in mapping] + + +def _paginate_query(context, query, model, limit=None, sort_keys=None, + marker=None, sort_dir=None): + default_sort_keys = ['created_at'] + if not sort_keys: + sort_keys = default_sort_keys + if not sort_dir: + sort_dir = 'desc' + + # This assures the order of the stacks will always be the same + # even for sort_key values that are not unique in the database + sort_keys = sort_keys + ['id'] + + model_marker = None + if marker: + model_marker = context.session.query(model).get(marker) + try: + query = utils.paginate_query(query, model, limit, sort_keys, + model_marker, sort_dir) + except utils.InvalidSortKey as exc: + err_msg = encodeutils.exception_to_unicode(exc) + raise exception.Invalid(reason=err_msg) + return query + + +def _query_stack_get_all(context, show_deleted=False, + show_nested=False, show_hidden=False, tags=None, + tags_any=None, not_tags=None, not_tags_any=None): + if show_nested: + query = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ).filter_by(backup=False) + else: + query = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ).filter_by(owner_id=None) + + if not context.is_admin: + query = query.filter_by(tenant=context.tenant_id) + + query = query.options(orm.subqueryload("tags")) + if tags: + for tag in tags: + tag_alias = orm_aliased(models.StackTag) + query = query.join(tag_alias, models.Stack.tags) + query = query.filter(tag_alias.tag == tag) + + if tags_any: + query = query.filter( + models.Stack.tags.any( + models.StackTag.tag.in_(tags_any))) + + if not_tags: + subquery = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ) + for tag in not_tags: + tag_alias = orm_aliased(models.StackTag) + subquery = subquery.join(tag_alias, models.Stack.tags) + subquery = subquery.filter(tag_alias.tag == tag) + not_stack_ids = [s.id for s in subquery.all()] + query = query.filter(models.Stack.id.notin_(not_stack_ids)) + + if not_tags_any: + query = query.filter( + ~models.Stack.tags.any( + models.StackTag.tag.in_(not_tags_any))) + + if not show_hidden and cfg.CONF.hidden_stack_tags: + query = query.filter( + ~models.Stack.tags.any( + models.StackTag.tag.in_(cfg.CONF.hidden_stack_tags))) + + return query + + +def stack_get_all(context, limit=None, sort_keys=None, marker=None, + sort_dir=None, filters=None, + show_deleted=False, show_nested=False, show_hidden=False, + tags=None, tags_any=None, not_tags=None, + not_tags_any=None, eager_load=False): + query = _query_stack_get_all(context, + show_deleted=show_deleted, + show_nested=show_nested, + show_hidden=show_hidden, tags=tags, + tags_any=tags_any, not_tags=not_tags, + not_tags_any=not_tags_any) + if eager_load: + query = query.options(orm.joinedload("raw_template")) + return _filter_and_page_query(context, query, limit, sort_keys, + marker, sort_dir, filters).all() + + +def _filter_and_page_query(context, query, limit=None, sort_keys=None, + marker=None, sort_dir=None, filters=None): + if filters is None: + filters = {} + + sort_key_map = {rpc_api.STACK_NAME: models.Stack.name.key, + rpc_api.STACK_STATUS: models.Stack.status.key, + rpc_api.STACK_CREATION_TIME: models.Stack.created_at.key, + rpc_api.STACK_UPDATED_TIME: models.Stack.updated_at.key} + valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) + + query = db_filters.exact_filter(query, models.Stack, filters) + return _paginate_query(context, query, models.Stack, limit, + valid_sort_keys, marker, sort_dir) + + +def stack_count_all(context, filters=None, + show_deleted=False, show_nested=False, show_hidden=False, + tags=None, tags_any=None, not_tags=None, + not_tags_any=None): + query = _query_stack_get_all(context, + show_deleted=show_deleted, + show_nested=show_nested, + show_hidden=show_hidden, tags=tags, + tags_any=tags_any, not_tags=not_tags, + not_tags_any=not_tags_any) + query = db_filters.exact_filter(query, models.Stack, filters) + return query.count() + + +def stack_create(context, values): + stack_ref = models.Stack() + stack_ref.update(values) + stack_name = stack_ref.name + stack_ref.save(context.session) + + # Even though we just created a stack with this name, we may not find + # it again because some unit tests create stacks with deleted_at set. Also + # some backup stacks may not be found, for reasons that are unclear. + earliest = stack_get_by_name(context, stack_name) + if earliest is not None and earliest.id != stack_ref.id: + context.session.query(models.Stack).filter_by(id=stack_ref.id).delete() + raise exception.StackExists(stack_name=stack_name) + + return stack_ref + + +@retry_on_db_error +def stack_update(context, stack_id, values, exp_trvsl=None): + session = context.session + with session.begin(subtransactions=True): + query = (session.query(models.Stack) + .filter(and_(models.Stack.id == stack_id), + (models.Stack.deleted_at.is_(None)))) + if not context.is_admin: + query = query.filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id)) + if exp_trvsl is not None: + query = query.filter(models.Stack.current_traversal == exp_trvsl) + rows_updated = query.update(values, synchronize_session=False) + if not rows_updated: + LOG.debug('Did not set stack state with values ' + '%(vals)s, stack id: %(id)s with ' + 'expected traversal: %(trav)s', + {'id': stack_id, 'vals': str(values), + 'trav': str(exp_trvsl)}) + if not stack_get(context, stack_id, eager_load=False): + raise exception.NotFound( + _('Attempt to update a stack with id: ' + '%(id)s %(msg)s') % { + 'id': stack_id, + 'msg': 'that does not exist'}) + session.expire_all() + return (rows_updated is not None and rows_updated > 0) + + +def stack_delete(context, stack_id): + s = stack_get(context, stack_id, eager_load=False) + if not s: + raise exception.NotFound(_('Attempt to delete a stack with id: ' + '%(id)s %(msg)s') % { + 'id': stack_id, + 'msg': 'that does not exist'}) + session = context.session + with session.begin(): + attr_ids = [] + # normally the resources are deleted already by this point + for r in s.resources: + if r.attr_data_id is not None: + attr_ids.append(r.attr_data_id) + session.delete(r) + if attr_ids: + session.query( + models.ResourcePropertiesData.id).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) + delete_softly(context, s) + + +def _is_duplicate_error(exc): + return isinstance(exc, db_exception.DBDuplicateEntry) + + +@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, + retry_on_disconnect=True, + retry_interval=0.5, + inc_retry_interval=True, + exception_checker=_is_duplicate_error) +def stack_lock_create(context, stack_id, engine_id): + with db_context.writer.independent.using(context) as session: + lock = session.query(models.StackLock).get(stack_id) + if lock is not None: + return lock.engine_id + session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id)) + + +def stack_lock_get_engine_id(context, stack_id): + with db_context.reader.independent.using(context) as session: + lock = session.query(models.StackLock).get(stack_id) + if lock is not None: + return lock.engine_id + + +def persist_state_and_release_lock(context, stack_id, engine_id, values): + session = context.session + with session.begin(): + rows_updated = (session.query(models.Stack) + .filter(models.Stack.id == stack_id) + .update(values, synchronize_session=False)) + rows_affected = None + if rows_updated is not None and rows_updated > 0: + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() + session.expire_all() + if not rows_affected: + return True + + +def stack_lock_steal(context, stack_id, old_engine_id, new_engine_id): + with db_context.writer.independent.using(context) as session: + lock = session.query(models.StackLock).get(stack_id) + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=old_engine_id + ).update({"engine_id": new_engine_id}) + if not rows_affected: + return lock.engine_id if lock is not None else True + + +def stack_lock_release(context, stack_id, engine_id): + with db_context.writer.independent.using(context) as session: + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() + if not rows_affected: + return True + + +def stack_get_root_id(context, stack_id): + s = stack_get(context, stack_id, eager_load=False) + if not s: + return None + while s.owner_id: + s = stack_get(context, s.owner_id, eager_load=False) + return s.id + + +def stack_count_total_resources(context, stack_id): + # count all resources which belong to the root stack + return context.session.query( + func.count(models.Resource.id) + ).filter_by(root_stack_id=stack_id).scalar() + + +def user_creds_create(context): + values = context.to_dict() + user_creds_ref = models.UserCreds() + if values.get('trust_id'): + method, trust_id = crypt.encrypt(values.get('trust_id')) + user_creds_ref.trust_id = trust_id + user_creds_ref.decrypt_method = method + user_creds_ref.trustor_user_id = values.get('trustor_user_id') + user_creds_ref.username = None + user_creds_ref.password = None + user_creds_ref.tenant = values.get('tenant') + user_creds_ref.tenant_id = values.get('tenant_id') + user_creds_ref.auth_url = values.get('auth_url') + user_creds_ref.region_name = values.get('region_name') + else: + user_creds_ref.update(values) + method, password = crypt.encrypt(values['password']) + if len(str(password)) > 255: + raise exception.Error(_("Length of OS_PASSWORD after encryption" + " exceeds Heat limit (255 chars)")) + user_creds_ref.password = password + user_creds_ref.decrypt_method = method + user_creds_ref.save(context.session) + result = dict(user_creds_ref) + + if values.get('trust_id'): + result['trust_id'] = values.get('trust_id') + else: + result['password'] = values.get('password') + + return result + + +def user_creds_get(context, user_creds_id): + db_result = context.session.query(models.UserCreds).get(user_creds_id) + if db_result is None: + return None + # Return a dict copy of DB results, do not decrypt details into db_result + # or it can be committed back to the DB in decrypted form + result = dict(db_result) + del result['decrypt_method'] + result['password'] = crypt.decrypt( + db_result.decrypt_method, result['password']) + result['trust_id'] = crypt.decrypt( + db_result.decrypt_method, result['trust_id']) + return result + + +@db_utils.retry_on_stale_data_error +def user_creds_delete(context, user_creds_id): + creds = context.session.query(models.UserCreds).get(user_creds_id) + if not creds: + raise exception.NotFound( + _('Attempt to delete user creds with id ' + '%(id)s that does not exist') % {'id': user_creds_id}) + with context.session.begin(): + context.session.delete(creds) + + +def event_get_all_by_tenant(context, limit=None, marker=None, + sort_keys=None, sort_dir=None, filters=None): + query = context.session.query(models.Event) + query = db_filters.exact_filter(query, models.Event, filters) + query = query.join( + models.Event.stack + ).filter_by(tenant=context.tenant_id).filter_by(deleted_at=None) + filters = None + return _events_filter_and_page_query(context, query, limit, marker, + sort_keys, sort_dir, filters).all() + + +def _query_all_events_by_stack(context, stack_id): + return context.session.query(models.Event).filter_by(stack_id=stack_id) + + +def event_get_all_by_stack(context, stack_id, limit=None, marker=None, + sort_keys=None, sort_dir=None, filters=None): + query = _query_all_events_by_stack(context, stack_id) + if filters and 'uuid' in filters: + # retrieving a single event, so eager load its rsrc_prop_data detail + query = query.options(orm.joinedload("rsrc_prop_data")) + return _events_filter_and_page_query(context, query, limit, marker, + sort_keys, sort_dir, filters).all() + + +def _events_paginate_query(context, query, model, limit=None, sort_keys=None, + marker=None, sort_dir=None): + default_sort_keys = ['created_at'] + if not sort_keys: + sort_keys = default_sort_keys + if not sort_dir: + sort_dir = 'desc' + + # This assures the order of the stacks will always be the same + # even for sort_key values that are not unique in the database + sort_keys = sort_keys + ['id'] + + model_marker = None + if marker: + # not to use context.session.query(model).get(marker), because + # user can only see the ID(column 'uuid') and the ID as the marker + model_marker = context.session.query( + model).filter_by(uuid=marker).first() + try: + query = utils.paginate_query(query, model, limit, sort_keys, + model_marker, sort_dir) + except utils.InvalidSortKey as exc: + err_msg = encodeutils.exception_to_unicode(exc) + raise exception.Invalid(reason=err_msg) + + return query + + +def _events_filter_and_page_query(context, query, + limit=None, marker=None, + sort_keys=None, sort_dir=None, + filters=None): + if filters is None: + filters = {} + + sort_key_map = {rpc_api.EVENT_TIMESTAMP: models.Event.created_at.key, + rpc_api.EVENT_RES_TYPE: models.Event.resource_type.key} + valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) + + query = db_filters.exact_filter(query, models.Event, filters) + + return _events_paginate_query(context, query, models.Event, limit, + valid_sort_keys, marker, sort_dir) + + +def event_count_all_by_stack(context, stack_id): + query = context.session.query(func.count(models.Event.id)) + return query.filter_by(stack_id=stack_id).scalar() + + +def _find_rpd_references(context, stack_id): + ev_ref_ids = set(e.rsrc_prop_data_id for e + in _query_all_events_by_stack(context, stack_id).all()) + rsrc_ref_ids = set(r.rsrc_prop_data_id for r + in context.session.query(models.Resource).filter_by( + stack_id=stack_id).all()) + return ev_ref_ids | rsrc_ref_ids + + +def _all_backup_stack_ids(context, stack_id): + """Iterate over all the IDs of all stacks related as stack/backup pairs. + + All backup stacks of a main stack, past and present (i.e. including those + that are soft deleted), are included. The main stack itself is also + included if the initial ID passed in is for a backup stack. The initial ID + passed in is never included in the output. + """ + query = context.session.query(models.Stack) + stack = query.get(stack_id) + if stack is None: + LOG.error('Stack %s not found', stack_id) + return + is_backup = stack.name.endswith('*') + + if is_backup: + main = query.get(stack.owner_id) + if main is None: + LOG.error('Main stack for backup "%s" %s not found', + stack.name, stack_id) + return + yield main.id + for backup_id in _all_backup_stack_ids(context, main.id): + if backup_id != stack_id: + yield backup_id + else: + q_backup = query.filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id)) + q_backup = q_backup.filter_by(name=stack.name + '*') + q_backup = q_backup.filter_by(owner_id=stack_id) + for backup in q_backup.all(): + yield backup.id + + +def _delete_event_rows(context, stack_id, limit): + # MySQL does not support LIMIT in subqueries, + # sqlite does not support JOIN in DELETE. + # So we must manually supply the IN() values. + # pgsql SHOULD work with the pure DELETE/JOIN below but that must be + # confirmed via integration tests. + session = context.session + with session.begin(): + query = _query_all_events_by_stack(context, stack_id) + query = query.order_by(models.Event.id).limit(limit) + id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()] + if not id_pairs: + return 0 + (ids, rsrc_prop_ids) = zip(*id_pairs) + max_id = ids[-1] + # delete the events + retval = session.query(models.Event).filter( + models.Event.id <= max_id).filter( + models.Event.stack_id == stack_id).delete() + + # delete unreferenced resource_properties_data + def del_rpd(rpd_ids): + if not rpd_ids: + return + q_rpd = session.query(models.ResourcePropertiesData) + q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids)) + q_rpd.delete(synchronize_session=False) + + if rsrc_prop_ids: + clr_prop_ids = set(rsrc_prop_ids) - _find_rpd_references(context, + stack_id) + clr_prop_ids.discard(None) + try: + del_rpd(clr_prop_ids) + except db_exception.DBReferenceError: + LOG.debug('Checking backup/stack pairs for RPD references') + found = False + for partner_stack_id in _all_backup_stack_ids(context, + stack_id): + found = True + clr_prop_ids -= _find_rpd_references(context, + partner_stack_id) + if not found: + LOG.debug('No backup/stack pairs found for %s', stack_id) + raise + del_rpd(clr_prop_ids) + + return retval + + +@retry_on_db_error +def event_create(context, values): + if 'stack_id' in values and cfg.CONF.max_events_per_stack: + # only count events and purge on average + # 200.0/cfg.CONF.event_purge_batch_size percent of the time. + check = (2.0 / cfg.CONF.event_purge_batch_size) > random.uniform(0, 1) + if (check and + (event_count_all_by_stack(context, values['stack_id']) >= + cfg.CONF.max_events_per_stack)): + # prune + try: + _delete_event_rows(context, values['stack_id'], + cfg.CONF.event_purge_batch_size) + except db_exception.DBError as exc: + LOG.error('Failed to purge events: %s', str(exc)) + event_ref = models.Event() + event_ref.update(values) + event_ref.save(context.session) + return event_ref + + +def software_config_create(context, values): + obj_ref = models.SoftwareConfig() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def software_config_get(context, config_id): + result = context.session.query(models.SoftwareConfig).get(config_id) + if (result is not None and context is not None and not context.is_admin and + result.tenant != context.tenant_id): + result = None + + if not result: + raise exception.NotFound(_('Software config with id %s not found') % + config_id) + return result + + +def software_config_get_all(context, limit=None, marker=None): + query = context.session.query(models.SoftwareConfig) + if not context.is_admin: + query = query.filter_by(tenant=context.tenant_id) + return _paginate_query(context, query, models.SoftwareConfig, + limit=limit, marker=marker).all() + + +def software_config_delete(context, config_id): + config = software_config_get(context, config_id) + # Query if the software config has been referenced by deployment. + result = context.session.query(models.SoftwareDeployment).filter_by( + config_id=config_id).first() + if result: + msg = (_("Software config with id %s can not be deleted as " + "it is referenced.") % config_id) + raise exception.InvalidRestrictedAction(message=msg) + with context.session.begin(): + context.session.delete(config) + + +def software_deployment_create(context, values): + obj_ref = models.SoftwareDeployment() + obj_ref.update(values) + session = context.session + + with session.begin(): + obj_ref.save(session) + + return obj_ref + + +def software_deployment_get(context, deployment_id): + result = context.session.query( + models.SoftwareDeployment).get(deployment_id) + if (result is not None and context is not None and not context.is_admin and + context.tenant_id not in (result.tenant, + result.stack_user_project_id)): + result = None + + if not result: + raise exception.NotFound(_('Deployment with id %s not found') % + deployment_id) + return result + + +def software_deployment_get_all(context, server_id=None): + sd = models.SoftwareDeployment + query = context.session.query(sd).order_by(sd.created_at) + if not context.is_admin: + query = query.filter(sqlalchemy.or_( + sd.tenant == context.tenant_id, + sd.stack_user_project_id == context.tenant_id)) + if server_id: + query = query.filter_by(server_id=server_id) + + return query.all() + + +def software_deployment_update(context, deployment_id, values): + deployment = software_deployment_get(context, deployment_id) + update_and_save(context, deployment, values) + return deployment + + +def software_deployment_delete(context, deployment_id): + deployment = software_deployment_get(context, deployment_id) + session = context.session + with session.begin(): + session.delete(deployment) + + +def snapshot_create(context, values): + obj_ref = models.Snapshot() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def snapshot_get(context, snapshot_id): + result = context.session.query(models.Snapshot).get(snapshot_id) + if (result is not None and context is not None and + context.tenant_id != result.tenant): + result = None + + if not result: + raise exception.NotFound(_('Snapshot with id %s not found') % + snapshot_id) + return result + + +def snapshot_get_by_stack(context, snapshot_id, stack): + snapshot = snapshot_get(context, snapshot_id) + if snapshot.stack_id != stack.id: + raise exception.SnapshotNotFound(snapshot=snapshot_id, + stack=stack.name) + + return snapshot + + +def snapshot_update(context, snapshot_id, values): + snapshot = snapshot_get(context, snapshot_id) + snapshot.update(values) + snapshot.save(context.session) + return snapshot + + +def snapshot_delete(context, snapshot_id): + snapshot = snapshot_get(context, snapshot_id) + with context.session.begin(): + context.session.delete(snapshot) + + +def snapshot_get_all(context, stack_id): + return context.session.query(models.Snapshot).filter_by( + stack_id=stack_id, tenant=context.tenant_id) + + +def service_create(context, values): + service = models.Service() + service.update(values) + service.save(context.session) + return service + + +def service_update(context, service_id, values): + service = service_get(context, service_id) + values.update({'updated_at': timeutils.utcnow()}) + service.update(values) + service.save(context.session) + return service + + +def service_delete(context, service_id, soft_delete=True): + service = service_get(context, service_id) + session = context.session + with session.begin(): + if soft_delete: + delete_softly(context, service) + else: + session.delete(service) + + +def service_get(context, service_id): + result = context.session.query(models.Service).get(service_id) + if result is None: + raise exception.EntityNotFound(entity='Service', name=service_id) + return result + + +def service_get_all(context): + return (context.session.query(models.Service). + filter_by(deleted_at=None).all()) + + +def service_get_all_by_args(context, host, binary, hostname): + return (context.session.query(models.Service). + filter_by(host=host). + filter_by(binary=binary). + filter_by(hostname=hostname).all()) + + +def purge_deleted(age, granularity='days', project_id=None, batch_size=20): + def _validate_positive_integer(val, argname): + try: + val = int(val) + except ValueError: + raise exception.Error(_("%s should be an integer") % argname) + + if val < 0: + raise exception.Error(_("%s should be a positive integer") + % argname) + return val + + age = _validate_positive_integer(age, 'age') + batch_size = _validate_positive_integer(batch_size, 'batch_size') + + if granularity not in ('days', 'hours', 'minutes', 'seconds'): + raise exception.Error( + _("granularity should be days, hours, minutes, or seconds")) + + if granularity == 'days': + age = age * 86400 + elif granularity == 'hours': + age = age * 3600 + elif granularity == 'minutes': + age = age * 60 + + time_line = timeutils.utcnow() - datetime.timedelta(seconds=age) + engine = get_engine() + meta = sqlalchemy.MetaData() + meta.bind = engine + + stack = sqlalchemy.Table('stack', meta, autoload=True) + service = sqlalchemy.Table('service', meta, autoload=True) + + # Purge deleted services + srvc_del = service.delete().where(service.c.deleted_at < time_line) + engine.execute(srvc_del) + + # find the soft-deleted stacks that are past their expiry + sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id, + stack.c.prev_raw_template_id, + stack.c.user_creds_id, + stack.c.action, + stack.c.status, + stack.c.name]) + if project_id: + stack_where = sel.where(and_( + stack.c.tenant == project_id, + stack.c.deleted_at < time_line)) + else: + stack_where = sel.where( + stack.c.deleted_at < time_line) + + stacks = engine.execute(stack_where) + + while True: + next_stacks_to_purge = list(itertools.islice(stacks, batch_size)) + if len(next_stacks_to_purge): + _purge_stacks(next_stacks_to_purge, engine, meta) + else: + break + + +@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, + retry_interval=0.5, inc_retry_interval=True) +def _purge_stacks(stack_infos, engine, meta): + """Purge some stacks and their releated events, raw_templates, etc. + + stack_infos is a list of lists of selected stack columns: + [[id, raw_template_id, prev_raw_template_id, user_creds_id, + action, status, name], ...] + """ + + stack = sqlalchemy.Table('stack', meta, autoload=True) + stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True) + stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True) + resource = sqlalchemy.Table('resource', meta, autoload=True) + resource_data = sqlalchemy.Table('resource_data', meta, autoload=True) + resource_properties_data = sqlalchemy.Table( + 'resource_properties_data', meta, autoload=True) + event = sqlalchemy.Table('event', meta, autoload=True) + raw_template = sqlalchemy.Table('raw_template', meta, autoload=True) + raw_template_files = sqlalchemy.Table('raw_template_files', meta, + autoload=True) + user_creds = sqlalchemy.Table('user_creds', meta, autoload=True) + syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True) + + stack_info_str = ','.join([str(i) for i in stack_infos]) + LOG.info("Purging stacks %s", stack_info_str) + + # TODO(cwolfe): find a way to make this re-entrant with + # reasonably sized transactions (good luck), or add + # a cleanup for orphaned rows. + stack_ids = [stack_info[0] for stack_info in stack_infos] + # delete stack locks (just in case some got stuck) + stack_lock_del = stack_lock.delete().where( + stack_lock.c.stack_id.in_(stack_ids)) + engine.execute(stack_lock_del) + # delete stack tags + stack_tag_del = stack_tag.delete().where( + stack_tag.c.stack_id.in_(stack_ids)) + engine.execute(stack_tag_del) + # delete resource_data + res_where = sqlalchemy.select([resource.c.id]).where( + resource.c.stack_id.in_(stack_ids)) + res_data_del = resource_data.delete().where( + resource_data.c.resource_id.in_(res_where)) + engine.execute(res_data_del) + # clean up any sync_points that may have lingered + sync_del = syncpoint.delete().where( + syncpoint.c.stack_id.in_(stack_ids)) + engine.execute(sync_del) + + # get rsrc_prop_data_ids to delete + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.rsrc_prop_data_id]).where( + resource.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids = set( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.attr_data_id]).where( + resource.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids.update( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + rsrc_prop_data_where = sqlalchemy.select( + [event.c.rsrc_prop_data_id]).where( + event.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids.update( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + # delete events + event_del = event.delete().where(event.c.stack_id.in_(stack_ids)) + engine.execute(event_del) + # delete resources (normally there shouldn't be any) + res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids)) + engine.execute(res_del) + # delete resource_properties_data + if rsrc_prop_data_ids: # keep rpd's in events + rsrc_prop_data_where = sqlalchemy.select( + [event.c.rsrc_prop_data_id]).where( + event.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) + ids = list(engine.execute(rsrc_prop_data_where)) + rsrc_prop_data_ids.difference_update([i[0] for i in ids]) + if rsrc_prop_data_ids: # keep rpd's in resources + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.rsrc_prop_data_id]).where( + resource.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) + ids = list(engine.execute(rsrc_prop_data_where)) + rsrc_prop_data_ids.difference_update([i[0] for i in ids]) + if rsrc_prop_data_ids: # delete if we have any + rsrc_prop_data_del = resource_properties_data.delete().where( + resource_properties_data.c.id.in_(rsrc_prop_data_ids)) + engine.execute(rsrc_prop_data_del) + # delete the stacks + stack_del = stack.delete().where(stack.c.id.in_(stack_ids)) + engine.execute(stack_del) + # delete orphaned raw templates + raw_template_ids = [i[1] for i in stack_infos if i[1] is not None] + raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None) + if raw_template_ids: # keep those still referenced + raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where( + stack.c.raw_template_id.in_(raw_template_ids)) + raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_template_ids = set(raw_template_ids) - set(raw_tmpl) + if raw_template_ids: # keep those still referenced (previous tmpl) + raw_tmpl_sel = sqlalchemy.select( + [stack.c.prev_raw_template_id]).where( + stack.c.prev_raw_template_id.in_(raw_template_ids)) + raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_template_ids = raw_template_ids - set(raw_tmpl) + if raw_template_ids: # delete raw_templates if we have any + raw_tmpl_file_sel = sqlalchemy.select( + [raw_template.c.files_id]).where( + raw_template.c.id.in_(raw_template_ids)) + raw_tmpl_file_ids = [i[0] for i in engine.execute( + raw_tmpl_file_sel)] + raw_templ_del = raw_template.delete().where( + raw_template.c.id.in_(raw_template_ids)) + engine.execute(raw_templ_del) + if raw_tmpl_file_ids: # keep _files still referenced + raw_tmpl_file_sel = sqlalchemy.select( + [raw_template.c.files_id]).where( + raw_template.c.files_id.in_(raw_tmpl_file_ids)) + raw_tmpl_files = [i[0] for i in engine.execute( + raw_tmpl_file_sel)] + raw_tmpl_file_ids = set(raw_tmpl_file_ids) \ + - set(raw_tmpl_files) + if raw_tmpl_file_ids: # delete _files if we have any + raw_tmpl_file_del = raw_template_files.delete().where( + raw_template_files.c.id.in_(raw_tmpl_file_ids)) + engine.execute(raw_tmpl_file_del) + # purge any user creds that are no longer referenced + user_creds_ids = [i[3] for i in stack_infos if i[3] is not None] + if user_creds_ids: # keep those still referenced + user_sel = sqlalchemy.select([stack.c.user_creds_id]).where( + stack.c.user_creds_id.in_(user_creds_ids)) + users = [i[0] for i in engine.execute(user_sel)] + user_creds_ids = set(user_creds_ids) - set(users) + if user_creds_ids: # delete if we have any + usr_creds_del = user_creds.delete().where( + user_creds.c.id.in_(user_creds_ids)) + engine.execute(usr_creds_del) + + +def sync_point_delete_all_by_stack_and_traversal(context, stack_id, + traversal_id): + rows_deleted = context.session.query(models.SyncPoint).filter_by( + stack_id=stack_id, traversal_id=traversal_id).delete() + return rows_deleted + + +@retry_on_db_error +def sync_point_create(context, values): + values['entity_id'] = str(values['entity_id']) + sync_point_ref = models.SyncPoint() + sync_point_ref.update(values) + sync_point_ref.save(context.session) + return sync_point_ref + + +def sync_point_get(context, entity_id, traversal_id, is_update): + entity_id = str(entity_id) + return context.session.query(models.SyncPoint).get( + (entity_id, traversal_id, is_update) + ) + + +@retry_on_db_error +def sync_point_update_input_data(context, entity_id, + traversal_id, is_update, atomic_key, + input_data): + entity_id = str(entity_id) + rows_updated = context.session.query(models.SyncPoint).filter_by( + entity_id=entity_id, + traversal_id=traversal_id, + is_update=is_update, + atomic_key=atomic_key + ).update({"input_data": input_data, "atomic_key": atomic_key + 1}) + return rows_updated + + +def _crypt_action(encrypt): + if encrypt: + return _('encrypt') + return _('decrypt') + + +def _db_encrypt_or_decrypt_template_params( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + from heat.engine import template + session = ctxt.session + excs = [] + query = session.query(models.RawTemplate) + template_batches = _get_batch( + session, ctxt=ctxt, query=query, model=models.RawTemplate, + batch_size=batch_size) + next_batch = list(itertools.islice(template_batches, batch_size)) + while next_batch: + with session.begin(): + for raw_template in next_batch: + try: + if verbose: + LOG.info("Processing raw_template %s...", + raw_template.id) + env = raw_template.environment + needs_update = False + + # using "in env.keys()" so an exception is raised + # if env is something weird like a string. + if env is None or 'parameters' not in env.keys(): + continue + if 'encrypted_param_names' in env: + encrypted_params = env['encrypted_param_names'] + else: + encrypted_params = [] + + if encrypt: + tmpl = template.Template.load( + ctxt, raw_template.id, raw_template) + param_schemata = tmpl.param_schemata() + if not param_schemata: + continue + + for param_name, param_val in env['parameters'].items(): + if (param_name in encrypted_params or + param_name not in param_schemata or + not param_schemata[param_name].hidden): + continue + encrypted_val = crypt.encrypt( + str(param_val), encryption_key) + env['parameters'][param_name] = encrypted_val + encrypted_params.append(param_name) + needs_update = True + if needs_update: + newenv = env.copy() + newenv['encrypted_param_names'] = encrypted_params + else: # decrypt + for param_name in encrypted_params: + method, value = env['parameters'][param_name] + decrypted_val = crypt.decrypt(method, value, + encryption_key) + env['parameters'][param_name] = decrypted_val + needs_update = True + if needs_update: + newenv = env.copy() + newenv['encrypted_param_names'] = [] + + if needs_update: + raw_template_update(ctxt, raw_template.id, + {'environment': newenv}) + except Exception as exc: + LOG.exception('Failed to %(crypt_action)s parameters ' + 'of raw template %(id)d', + {'id': raw_template.id, + 'crypt_action': _crypt_action(encrypt)}) + excs.append(exc) + continue + finally: + if verbose: + LOG.info("Finished %(crypt_action)s processing of " + "raw_template %(id)d.", + {'id': raw_template.id, + 'crypt_action': _crypt_action(encrypt)}) + next_batch = list(itertools.islice(template_batches, batch_size)) + return excs + + +def _db_encrypt_or_decrypt_resource_prop_data_legacy( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + session = ctxt.session + excs = [] + + # Older resources may have properties_data in the legacy column, + # so update those as needed + query = session.query(models.Resource).filter( + models.Resource.properties_data_encrypted.isnot(encrypt)) + resource_batches = _get_batch( + session=session, ctxt=ctxt, query=query, model=models.Resource, + batch_size=batch_size) + next_batch = list(itertools.islice(resource_batches, batch_size)) + while next_batch: + with session.begin(): + for resource in next_batch: + if not resource.properties_data: + continue + try: + if verbose: + LOG.info("Processing resource %s...", + resource.id) + if encrypt: + result = crypt.encrypted_dict(resource.properties_data, + encryption_key) + else: + result = crypt.decrypted_dict(resource.properties_data, + encryption_key) + resource_update(ctxt, resource.id, + {'properties_data': result, + 'properties_data_encrypted': encrypt}, + resource.atomic_key) + except Exception as exc: + LOG.exception('Failed to %(crypt_action)s ' + 'properties_data of resource %(id)d' % + {'id': resource.id, + 'crypt_action': _crypt_action(encrypt)}) + excs.append(exc) + continue + finally: + if verbose: + LOG.info("Finished processing resource %s.", + resource.id) + next_batch = list(itertools.islice(resource_batches, batch_size)) + return excs + + +def _db_encrypt_or_decrypt_resource_prop_data( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + session = ctxt.session + excs = [] + + # Older resources may have properties_data in the legacy column, + # so update those as needed + query = session.query(models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.encrypted.isnot(encrypt)) + rpd_batches = _get_batch( + session=session, ctxt=ctxt, query=query, + model=models.ResourcePropertiesData, batch_size=batch_size) + next_batch = list(itertools.islice(rpd_batches, batch_size)) + while next_batch: + with session.begin(): + for rpd in next_batch: + if not rpd.data: + continue + try: + if verbose: + LOG.info("Processing resource_properties_data " + "%s...", rpd.id) + if encrypt: + result = crypt.encrypted_dict(rpd.data, + encryption_key) + else: + result = crypt.decrypted_dict(rpd.data, + encryption_key) + rpd.update({'data': result, + 'encrypted': encrypt}) + except Exception as exc: + LOG.exception( + "Failed to %(crypt_action)s " + "data of resource_properties_data %(id)d" % + {'id': rpd.id, + 'crypt_action': _crypt_action(encrypt)}) + excs.append(exc) + continue + finally: + if verbose: + LOG.info( + "Finished processing resource_properties_data" + " %s.", rpd.id) + next_batch = list(itertools.islice(rpd_batches, batch_size)) + return excs + + +def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, + verbose=False): + """Encrypt parameters and properties for all templates in db. + + :param ctxt: RPC context + :param encryption_key: key that will be used for parameter and property + encryption + :param batch_size: number of templates requested from DB in each iteration. + 50 means that heat requests 50 templates, encrypt them + and proceed with next 50 items. + :param verbose: log an INFO message when processing of each raw_template or + resource begins or ends + :return: list of exceptions encountered during encryption + """ + excs = [] + excs.extend(_db_encrypt_or_decrypt_template_params( + ctxt, encryption_key, True, batch_size, verbose)) + excs.extend(_db_encrypt_or_decrypt_resource_prop_data( + ctxt, encryption_key, True, batch_size, verbose)) + excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy( + ctxt, encryption_key, True, batch_size, verbose)) + return excs + + +def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50, + verbose=False): + """Decrypt parameters and properties for all templates in db. + + :param ctxt: RPC context + :param encryption_key: key that will be used for parameter and property + decryption + :param batch_size: number of templates requested from DB in each iteration. + 50 means that heat requests 50 templates, encrypt them + and proceed with next 50 items. + :param verbose: log an INFO message when processing of each raw_template or + resource begins or ends + :return: list of exceptions encountered during decryption + """ + excs = [] + excs.extend(_db_encrypt_or_decrypt_template_params( + ctxt, encryption_key, False, batch_size, verbose)) + excs.extend(_db_encrypt_or_decrypt_resource_prop_data( + ctxt, encryption_key, False, batch_size, verbose)) + excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy( + ctxt, encryption_key, False, batch_size, verbose)) + return excs + + +def db_properties_data_migrate(ctxt, batch_size=50): + """Migrate properties data from legacy columns to new location in db. + + :param ctxt: RPC context + :param batch_size: number of templates requested from DB in each iteration. + 50 means that heat requests 50 templates, encrypt them + and proceed with next 50 items. + """ + session = ctxt.session + + query = session.query(models.Resource).filter(and_( + models.Resource.properties_data.isnot(None), + models.Resource.rsrc_prop_data_id.is_(None))) + resource_batches = _get_batch( + session=session, ctxt=ctxt, query=query, + model=models.Resource, batch_size=batch_size) + next_batch = list(itertools.islice(resource_batches, batch_size)) + while next_batch: + with session.begin(): + for resource in next_batch: + try: + encrypted = resource.properties_data_encrypted + if encrypted is None: + LOG.warning( + 'Unexpected: resource.encrypted is None for ' + 'resource id %s for legacy ' + 'resource.properties_data, assuming False.', + resource.id) + encrypted = False + rsrc_prop_data = resource_prop_data_create( + ctxt, {'encrypted': encrypted, + 'data': resource.properties_data}) + resource_update(ctxt, resource.id, + {'properties_data_encrypted': None, + 'properties_data': None, + 'rsrc_prop_data_id': rsrc_prop_data.id}, + resource.atomic_key) + except Exception: + LOG.exception('Failed to migrate properties_data for ' + 'resource %d', resource.id) + continue + next_batch = list(itertools.islice(resource_batches, batch_size)) + + query = session.query(models.Event).filter(and_( + models.Event.resource_properties.isnot(None), + models.Event.rsrc_prop_data_id.is_(None))) + event_batches = _get_batch( + session=session, ctxt=ctxt, query=query, + model=models.Event, batch_size=batch_size) + next_batch = list(itertools.islice(event_batches, batch_size)) + while next_batch: + with session.begin(): + for event in next_batch: + try: + prop_data = event.resource_properties + rsrc_prop_data = resource_prop_data_create( + ctxt, {'encrypted': False, + 'data': prop_data}) + event.update({'resource_properties': None, + 'rsrc_prop_data_id': rsrc_prop_data.id}) + except Exception: + LOG.exception('Failed to migrate resource_properties ' + 'for event %d', event.id) + continue + next_batch = list(itertools.islice(event_batches, batch_size)) + + +def _get_batch(session, ctxt, query, model, batch_size=50): + last_batch_marker = None + while True: + results = _paginate_query( + context=ctxt, query=query, model=model, limit=batch_size, + marker=last_batch_marker).all() + if not results: + break + else: + for result in results: + yield result + last_batch_marker = results[-1].id + + +def reset_stack_status(context, stack_id, stack=None): + session = context.session + if stack is None: + stack = session.query(models.Stack).get(stack_id) + + if stack is None: + raise exception.NotFound(_('Stack with id %s not found') % stack_id) + + with session.begin(): + query = session.query(models.Resource).filter_by( + status='IN_PROGRESS', stack_id=stack_id) + query.update({'status': 'FAILED', + 'status_reason': 'Stack status manually reset', + 'engine_id': None}) + + query = session.query(models.ResourceData) + query = query.join(models.Resource) + query = query.filter_by(stack_id=stack_id) + query = query.filter( + models.ResourceData.key.in_(heat_environment.HOOK_TYPES)) + data_ids = [data.id for data in query] + + if data_ids: + query = session.query(models.ResourceData) + query = query.filter(models.ResourceData.id.in_(data_ids)) + query.delete(synchronize_session='fetch') + + query = session.query(models.Stack).filter_by(owner_id=stack_id) + for child in query: + reset_stack_status(context, child.id, child) + + with session.begin(): + if stack.status == 'IN_PROGRESS': + stack.status = 'FAILED' + stack.status_reason = 'Stack status manually reset' + + session.query( + models.StackLock + ).filter_by(stack_id=stack_id).delete() |