From 43a5f3984e433ec28616cfe98cb060d9ff51af58 Mon Sep 17 00:00:00 2001
From: Stephen Finucane
Date: Thu, 23 Mar 2023 12:15:08 +0000
Subject: db: Remove layer of indirection

We don't have another ORM to contend with here. Simplify
'heat.db.sqlalchemy' to 'heat.db'.

Signed-off-by: Stephen Finucane
Change-Id: Id1db6c0ff126859f436c6c9b1187c250f38ebb62
---
 heat/cmd/manage.py | 4 +-
 heat/common/context.py | 3 +-
 heat/db/alembic.ini | 72 +
 heat/db/api.py | 1971 ++++++++++++++++++++
 heat/db/filters.py | 44 +
 heat/db/migration.py | 117 ++
 heat/db/migrations/README.rst | 15 +
 heat/db/migrations/env.py | 94 +
 heat/db/migrations/script.py.mako | 20 +
 .../versions/c6214ca60943_initial_revision.py | 392 ++++
 heat/db/models.py | 399 ++++
 heat/db/sqlalchemy/__init__.py | 0
 heat/db/sqlalchemy/alembic.ini | 72 -
 heat/db/sqlalchemy/api.py | 1971 --------------------
 heat/db/sqlalchemy/filters.py | 44 -
 heat/db/sqlalchemy/migration.py | 117 --
 heat/db/sqlalchemy/migrations/README.rst | 15 -
 heat/db/sqlalchemy/migrations/env.py | 94 -
 heat/db/sqlalchemy/migrations/script.py.mako | 20 -
 .../versions/c6214ca60943_initial_revision.py | 392 ----
 heat/db/sqlalchemy/models.py | 399 ----
 heat/db/sqlalchemy/types.py | 65 -
 heat/db/sqlalchemy/utils.py | 25 -
 heat/db/types.py | 65 +
 heat/db/utils.py | 25 +
 heat/engine/service_software_config.py | 2 +-
 heat/engine/template_files.py | 2 +-
 heat/engine/worker.py | 2 +-
 heat/objects/event.py | 2 +-
 heat/objects/raw_template.py | 2 +-
 heat/objects/raw_template_files.py | 2 +-
 heat/objects/resource.py | 2 +-
 heat/objects/resource_data.py | 2 +-
 heat/objects/resource_properties_data.py | 2 +-
 heat/objects/service.py | 2 +-
 heat/objects/snapshot.py | 2 +-
 heat/objects/software_config.py | 3 +-
 heat/objects/software_deployment.py | 2 +-
 heat/objects/stack.py | 2 +-
 heat/objects/stack_lock.py | 3 +-
 heat/objects/stack_tag.py | 2 +-
 heat/objects/sync_point.py | 4 +-
 heat/objects/user_creds.py | 2 +-
 heat/tests/convergence/framework/engine_wrapper.py | 2 +-
 heat/tests/convergence/framework/reality.py | 2 +-
 heat/tests/db/test_migrations.py | 4 +-
 heat/tests/db/test_sqlalchemy_api.py | 6 +-
 heat/tests/db/test_sqlalchemy_filters.py | 2 +-
 heat/tests/db/test_sqlalchemy_types.py | 2 +-
 heat/tests/engine/service/test_software_config.py | 2 +-
 heat/tests/engine/service/test_stack_update.py | 2 +-
 heat/tests/engine/test_engine_worker.py | 2 +-
 heat/tests/test_common_service_utils.py | 2 +-
 heat/tests/test_engine_api_utils.py | 2 +-
 heat/tests/test_event.py | 2 +-
 heat/tests/test_resource.py | 4 +-
 heat/tests/test_resource_properties_data.py | 2 +-
 heat/tests/test_signal.py | 2 +-
 heat/tests/test_stack.py | 2 +-
 heat/tests/test_stack_update.py | 2 +-
 heat/tests/utils.py | 4 +-
 61 files changed, 3258 insertions(+), 3263 deletions(-)
 create mode 100644 heat/db/alembic.ini
 create mode 100644 heat/db/api.py
 create mode 100644 heat/db/filters.py
 create mode 100644 heat/db/migration.py
 create mode 100644 heat/db/migrations/README.rst
 create mode 100644 heat/db/migrations/env.py
 create mode 100644 heat/db/migrations/script.py.mako
 create mode 100644 heat/db/migrations/versions/c6214ca60943_initial_revision.py
 create mode 100644 heat/db/models.py
 delete mode 100644 heat/db/sqlalchemy/__init__.py
 delete mode 100644 heat/db/sqlalchemy/alembic.ini
 delete mode 100644 heat/db/sqlalchemy/api.py
 delete mode 100644 heat/db/sqlalchemy/filters.py
 delete mode 100644 heat/db/sqlalchemy/migration.py
 delete mode 100644 heat/db/sqlalchemy/migrations/README.rst
 delete mode 100644
heat/db/sqlalchemy/migrations/env.py delete mode 100644 heat/db/sqlalchemy/migrations/script.py.mako delete mode 100644 heat/db/sqlalchemy/migrations/versions/c6214ca60943_initial_revision.py delete mode 100644 heat/db/sqlalchemy/models.py delete mode 100644 heat/db/sqlalchemy/types.py delete mode 100644 heat/db/sqlalchemy/utils.py create mode 100644 heat/db/types.py create mode 100644 heat/db/utils.py diff --git a/heat/cmd/manage.py b/heat/cmd/manage.py index 9f7b8e316..870c95d1d 100644 --- a/heat/cmd/manage.py +++ b/heat/cmd/manage.py @@ -25,8 +25,8 @@ from heat.common import exception from heat.common.i18n import _ from heat.common import messaging from heat.common import service_utils -from heat.db.sqlalchemy import api as db_api -from heat.db.sqlalchemy import migration as db_migration +from heat.db import api as db_api +from heat.db import migration as db_migration from heat.objects import service as service_objects from heat.rpc import client as rpc_client from heat import version diff --git a/heat/common/context.py b/heat/common/context.py index c72c8367b..854c13467 100644 --- a/heat/common/context.py +++ b/heat/common/context.py @@ -31,12 +31,11 @@ from heat.common import endpoint_utils from heat.common import exception from heat.common import policy from heat.common import wsgi -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.engine import clients LOG = logging.getLogger(__name__) - cfg.CONF.import_opt('client_retry_limit', 'heat.common.config') # Note, we yield the options via list_opts to enable generation of the diff --git a/heat/db/alembic.ini b/heat/db/alembic.ini new file mode 100644 index 000000000..dbee79a3b --- /dev/null +++ b/heat/db/alembic.ini @@ -0,0 +1,72 @@ +[alembic] +# path to migration scripts +script_location = %(here)s/migrations + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# version location specification; This defaults +# to heat/db/migrations/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:heat/db/migrations/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +sqlalchemy.url = sqlite:///heat.db + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/heat/db/api.py b/heat/db/api.py new file mode 100644 index 000000000..8b2c99967 --- /dev/null +++ b/heat/db/api.py @@ -0,0 +1,1971 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of SQLAlchemy backend.""" +import datetime +import functools +import itertools +import random + +from oslo_config import cfg +from oslo_db import api as oslo_db_api +from oslo_db import exception as db_exception +from oslo_db import options +from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import utils +from oslo_log import log as logging +from oslo_utils import encodeutils +from oslo_utils import excutils +from oslo_utils import timeutils +import osprofiler.sqlalchemy +import sqlalchemy +from sqlalchemy import and_ +from sqlalchemy import func +from sqlalchemy import or_ +from sqlalchemy import orm +from sqlalchemy.orm import aliased as orm_aliased + +from heat.common import crypt +from heat.common import exception +from heat.common.i18n import _ +from heat.db import filters as db_filters +from heat.db import models +from heat.db import utils as db_utils +from heat.engine import environment as heat_environment +from heat.rpc import api as rpc_api + +CONF = cfg.CONF +CONF.import_opt('hidden_stack_tags', 'heat.common.config') +CONF.import_opt('max_events_per_stack', 'heat.common.config') +CONF.import_group('profiler', 'heat.common.config') +CONF.import_opt('db_max_retries', 'oslo_db.options', group='database') +CONF.import_opt('db_retry_interval', 'oslo_db.options', group='database') +CONF.import_opt( + 'db_inc_retry_interval', 'oslo_db.options', group='database') +CONF.import_opt( + 'db_max_retry_interval', 'oslo_db.options', group='database') + +options.set_defaults(CONF) + +_facade = None +db_context = enginefacade.transaction_context() + +LOG = logging.getLogger(__name__) + + +# TODO(sbaker): fix tests so that sqlite_fk=True can be passed to configure +# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP +# since it's not compatible with SQLAlchemy 2.0 +db_context.configure(__autocommit=True) + + +def get_facade(): + global _facade + if _facade is None: + + # FIXME: get_facade() is called by the test suite startup, + # but will not be 
called normally for API calls. + # osprofiler / oslo_db / enginefacade currently don't have hooks + # to talk to each other, however one needs to be added to oslo.db + # to allow access to the Engine once constructed. + db_context.configure(**CONF.database) + _facade = db_context.get_legacy_facade() + if CONF.profiler.enabled: + if CONF.profiler.trace_sqlalchemy: + osprofiler.sqlalchemy.add_tracing(sqlalchemy, + _facade.get_engine(), + "db") + return _facade + + +def get_engine(): + return get_facade().get_engine() + + +def get_session(): + return get_facade().get_session() + + +def retry_on_db_error(func): + @functools.wraps(func) + def try_func(context, *args, **kwargs): + if (context.session.transaction is None or + not context.session.autocommit): + wrapped = oslo_db_api.wrap_db_retry( + max_retries=CONF.database.db_max_retries, + retry_on_deadlock=True, + retry_on_disconnect=True, + retry_interval=CONF.database.db_retry_interval, + inc_retry_interval=CONF.database.db_inc_retry_interval, + max_retry_interval=CONF.database.db_max_retry_interval)(func) + return wrapped(context, *args, **kwargs) + else: + try: + return func(context, *args, **kwargs) + except (db_exception.DBDeadlock, db_exception.DBConnectionError): + with excutils.save_and_reraise_exception(): + LOG.debug('Not retrying on DBDeadlock and ' + 'DBConnectionError because ' + 'transaction not closed') + return try_func + + +def update_and_save(context, obj, values): + with context.session.begin(subtransactions=True): + for k, v in values.items(): + setattr(obj, k, v) + + +def delete_softly(context, obj): + """Mark this object as deleted.""" + update_and_save(context, obj, {'deleted_at': timeutils.utcnow()}) + + +def soft_delete_aware_query(context, *args, **kwargs): + """Stack query helper that accounts for context's `show_deleted` field. + + :param show_deleted: if True, overrides context's show_deleted field. 
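+    :param args: model class(es) or columns to query; passed through to
+                 session.query().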
+ """ + + query = context.session.query(*args) + show_deleted = kwargs.get('show_deleted') or context.show_deleted + + if not show_deleted: + query = query.filter_by(deleted_at=None) + return query + + +def raw_template_get(context, template_id): + result = context.session.query(models.RawTemplate).get(template_id) + + if not result: + raise exception.NotFound(_('raw template with id %s not found') % + template_id) + return result + + +def raw_template_create(context, values): + raw_template_ref = models.RawTemplate() + raw_template_ref.update(values) + raw_template_ref.save(context.session) + return raw_template_ref + + +def raw_template_update(context, template_id, values): + raw_template_ref = raw_template_get(context, template_id) + # get only the changed values + values = dict((k, v) for k, v in values.items() + if getattr(raw_template_ref, k) != v) + + if values: + update_and_save(context, raw_template_ref, values) + + return raw_template_ref + + +def raw_template_delete(context, template_id): + try: + raw_template = raw_template_get(context, template_id) + except exception.NotFound: + # Ignore not found + return + raw_tmpl_files_id = raw_template.files_id + session = context.session + with session.begin(subtransactions=True): + session.delete(raw_template) + if raw_tmpl_files_id is None: + return + # If no other raw_template is referencing the same raw_template_files, + # delete that too + if session.query(models.RawTemplate).filter_by( + files_id=raw_tmpl_files_id).first() is None: + try: + raw_tmpl_files = raw_template_files_get( + context, raw_tmpl_files_id) + except exception.NotFound: + # Ignore not found + return + session.delete(raw_tmpl_files) + + +def raw_template_files_create(context, values): + session = context.session + raw_templ_files_ref = models.RawTemplateFiles() + raw_templ_files_ref.update(values) + with session.begin(): + raw_templ_files_ref.save(session) + return raw_templ_files_ref + + +def raw_template_files_get(context, files_id): + result = context.session.query(models.RawTemplateFiles).get(files_id) + if not result: + raise exception.NotFound( + _("raw_template_files with files_id %d not found") % + files_id) + return result + + +def resource_get(context, resource_id, refresh=False, refresh_data=False, + eager=True): + query = context.session.query(models.Resource) + query = query.options(orm.joinedload("data")) + if eager: + query = query.options(orm.joinedload("rsrc_prop_data")) + + result = query.get(resource_id) + if not result: + raise exception.NotFound(_("resource with id %s not found") % + resource_id) + if refresh: + context.session.refresh(result) + if refresh_data: + # ensure data is loaded (lazy or otherwise) + result.data + + return result + + +def resource_get_by_name_and_stack(context, resource_name, stack_id): + result = context.session.query( + models.Resource + ).filter_by( + name=resource_name + ).filter_by( + stack_id=stack_id + ).options(orm.joinedload("data")).first() + return result + + +def resource_get_all_by_physical_resource_id(context, physical_resource_id): + results = (context.session.query(models.Resource) + .filter_by(physical_resource_id=physical_resource_id) + .all()) + + for result in results: + if context is None or context.is_admin or context.tenant_id in ( + result.stack.tenant, result.stack.stack_user_project_id): + yield result + + +def resource_get_by_physical_resource_id(context, physical_resource_id): + results = resource_get_all_by_physical_resource_id(context, + physical_resource_id) + try: + return next(results) 
+ except StopIteration: + return None + + +def resource_get_all(context): + results = context.session.query(models.Resource).all() + + if not results: + raise exception.NotFound(_('no resources were found')) + return results + + +@retry_on_db_error +def resource_purge_deleted(context, stack_id): + filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} + query = context.session.query(models.Resource) + result = query.filter_by(**filters) + attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None] + with context.session.begin(): + result.delete() + if attr_ids: + context.session.query(models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) + + +def _add_atomic_key_to_values(values, atomic_key): + if atomic_key is None: + values['atomic_key'] = 1 + else: + values['atomic_key'] = atomic_key + 1 + + +@retry_on_db_error +def resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + return _try_resource_update(context, resource_id, values, atomic_key, + expected_engine_id) + + +def _try_resource_update(context, resource_id, values, atomic_key, + expected_engine_id=None): + session = context.session + with session.begin(subtransactions=True): + _add_atomic_key_to_values(values, atomic_key) + rows_updated = session.query(models.Resource).filter_by( + id=resource_id, engine_id=expected_engine_id, + atomic_key=atomic_key).update(values) + + return bool(rows_updated) + + +def resource_update_and_save(context, resource_id, values): + resource = context.session.query(models.Resource).get(resource_id) + update_and_save(context, resource, values) + + +def resource_delete(context, resource_id): + session = context.session + with session.begin(subtransactions=True): + resource = session.query(models.Resource).get(resource_id) + if resource: + session.delete(resource) + if resource.attr_data_id is not None: + attr_prop_data = session.query( + models.ResourcePropertiesData).get(resource.attr_data_id) + session.delete(attr_prop_data) + + +def resource_attr_id_set(context, resource_id, atomic_key, attr_id): + session = context.session + with session.begin(): + values = {'attr_data_id': attr_id} + _add_atomic_key_to_values(values, atomic_key) + rows_updated = session.query(models.Resource).filter(and_( + models.Resource.id == resource_id, + models.Resource.atomic_key == atomic_key, + models.Resource.engine_id.is_(None), + or_(models.Resource.attr_data_id == attr_id, + models.Resource.attr_data_id.is_(None)))).update( + values) + if rows_updated > 0: + return True + else: + # Someone else set the attr_id first and/or we have a stale + # view of the resource based on atomic_key, so delete the + # resource_properties_data (attr) DB row. + LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s', + {'rid': resource_id, 'aid': attr_id}) + session.query( + models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.id == attr_id).delete() + return False + + +def resource_attr_data_delete(context, resource_id, attr_id): + session = context.session + with session.begin(): + resource = session.query(models.Resource).get(resource_id) + attr_prop_data = session.query( + models.ResourcePropertiesData).get(attr_id) + if resource: + resource.update({'attr_data_id': None}) + if attr_prop_data: + session.delete(attr_prop_data) + + +def resource_data_get_all(context, resource_id, data=None): + """Looks up resource_data by resource.id. 
+ + If data is encrypted, this method will decrypt the results. + """ + if data is None: + data = (context.session.query(models.ResourceData) + .filter_by(resource_id=resource_id)).all() + + if not data: + raise exception.NotFound(_('no resource data found')) + + ret = {} + + for res in data: + if res.redact: + try: + ret[res.key] = crypt.decrypt(res.decrypt_method, res.value) + continue + except exception.InvalidEncryptionKey: + LOG.exception('Failed to decrypt resource data %(rkey)s ' + 'for %(rid)s, ignoring.', + {'rkey': res.key, 'rid': resource_id}) + ret[res.key] = res.value + return ret + + +def resource_data_get(context, resource_id, key): + """Lookup value of resource's data by key. + + Decrypts resource data if necessary. + """ + result = resource_data_get_by_key(context, + resource_id, + key) + if result.redact: + return crypt.decrypt(result.decrypt_method, result.value) + return result.value + + +def stack_tags_set(context, stack_id, tags): + session = context.session + with session.begin(): + stack_tags_delete(context, stack_id) + result = [] + for tag in tags: + stack_tag = models.StackTag() + stack_tag.tag = tag + stack_tag.stack_id = stack_id + stack_tag.save(session=session) + result.append(stack_tag) + return result or None + + +def stack_tags_delete(context, stack_id): + session = context.session + with session.begin(subtransactions=True): + result = stack_tags_get(context, stack_id) + if result: + for tag in result: + session.delete(tag) + + +def stack_tags_get(context, stack_id): + result = (context.session.query(models.StackTag) + .filter_by(stack_id=stack_id) + .all()) + return result or None + + +def resource_data_get_by_key(context, resource_id, key): + """Looks up resource_data by resource_id and key. + + Does not decrypt resource_data. 
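+    :raises exception.NotFound: if no resource data exists for the
+        given resource_id and key.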
+ """ + result = (context.session.query(models.ResourceData) + .filter_by(resource_id=resource_id) + .filter_by(key=key).first()) + + if not result: + raise exception.NotFound(_('No resource data found')) + return result + + +def resource_data_set(context, resource_id, key, value, redact=False): + """Save resource's key/value pair to database.""" + if redact: + method, value = crypt.encrypt(value) + else: + method = '' + try: + current = resource_data_get_by_key(context, resource_id, key) + except exception.NotFound: + current = models.ResourceData() + current.key = key + current.resource_id = resource_id + current.redact = redact + current.value = value + current.decrypt_method = method + current.save(session=context.session) + return current + + +def resource_exchange_stacks(context, resource_id1, resource_id2): + query = context.session.query(models.Resource) + session = query.session + + with session.begin(): + res1 = query.get(resource_id1) + res2 = query.get(resource_id2) + + res1.stack, res2.stack = res2.stack, res1.stack + + +def resource_data_delete(context, resource_id, key): + result = resource_data_get_by_key(context, resource_id, key) + session = context.session + with session.begin(): + session.delete(result) + + +def resource_create(context, values): + resource_ref = models.Resource() + resource_ref.update(values) + resource_ref.save(context.session) + return resource_ref + + +@retry_on_db_error +def resource_create_replacement(context, + existing_res_id, + new_res_values, + atomic_key, expected_engine_id=None): + session = context.session + try: + with session.begin(): + new_res = resource_create(context, new_res_values) + update_data = {'replaced_by': new_res.id} + if not _try_resource_update(context, + existing_res_id, update_data, + atomic_key, + expected_engine_id=expected_engine_id): + data = {} + if 'name' in new_res_values: + data['resource_name'] = new_res_values['name'] + raise exception.UpdateInProgress(**data) + except db_exception.DBReferenceError as exc: + # New template_id no longer exists + LOG.debug('Not creating replacement resource: %s', exc) + return None + else: + return new_res + + +def resource_get_all_by_stack(context, stack_id, filters=None): + query = context.session.query( + models.Resource + ).filter_by( + stack_id=stack_id + ).options(orm.joinedload("data")).options(orm.joinedload("rsrc_prop_data")) + + query = db_filters.exact_filter(query, models.Resource, filters) + results = query.all() + + return dict((res.name, res) for res in results) + + +def resource_get_all_active_by_stack(context, stack_id): + filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} + subquery = context.session.query(models.Resource.id).filter_by(**filters) + + results = context.session.query(models.Resource).filter_by( + stack_id=stack_id).filter( + models.Resource.id.notin_(subquery.scalar_subquery()) + ).options(orm.joinedload("data")).all() + + return dict((res.id, res) for res in results) + + +def resource_get_all_by_root_stack(context, stack_id, filters=None, + stack_id_only=False): + query = context.session.query( + models.Resource + ).filter_by( + root_stack_id=stack_id + ) + + if stack_id_only: + query = query.options(orm.load_only("id", "stack_id")) + else: + query = query.options(orm.joinedload("data")).options( + orm.joinedload("rsrc_prop_data")) + + query = db_filters.exact_filter(query, models.Resource, filters) + results = query.all() + + return dict((res.id, res) for res in results) + + +def engine_get_all_locked_by_stack(context, 
stack_id): + query = context.session.query( + func.distinct(models.Resource.engine_id) + ).filter( + models.Resource.stack_id == stack_id, + models.Resource.engine_id.isnot(None)) + return set(i[0] for i in query.all()) + + +def resource_prop_data_create_or_update(context, values, rpd_id=None): + obj_ref = None + if rpd_id is not None: + obj_ref = context.session.query( + models.ResourcePropertiesData).filter_by(id=rpd_id).first() + if obj_ref is None: + obj_ref = models.ResourcePropertiesData() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def resource_prop_data_create(context, values): + return resource_prop_data_create_or_update(context, values) + + +def resource_prop_data_get(context, resource_prop_data_id): + result = context.session.query(models.ResourcePropertiesData).get( + resource_prop_data_id) + if result is None: + raise exception.NotFound( + _('ResourcePropertiesData with id %s not found') % + resource_prop_data_id) + return result + + +def stack_get_by_name_and_owner_id(context, stack_name, owner_id): + query = soft_delete_aware_query( + context, models.Stack + ).options(orm.joinedload("raw_template")).filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id) + ).filter_by(name=stack_name).filter_by(owner_id=owner_id) + return query.first() + + +def stack_get_by_name(context, stack_name): + query = soft_delete_aware_query( + context, models.Stack + ).filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id) + ).filter_by(name=stack_name) + return query.order_by(models.Stack.created_at).first() + + +def stack_get(context, stack_id, show_deleted=False, eager_load=True): + query = context.session.query(models.Stack) + if eager_load: + query = query.options(orm.joinedload("raw_template")) + result = query.get(stack_id) + + deleted_ok = show_deleted or context.show_deleted + if result is None or result.deleted_at is not None and not deleted_ok: + return None + + # One exception to normal project scoping is users created by the + # stacks in the stack_user_project_id (in the heat stack user domain) + if (result is not None + and context is not None and not context.is_admin + and context.tenant_id not in (result.tenant, + result.stack_user_project_id)): + return None + return result + + +def stack_get_status(context, stack_id): + query = context.session.query(models.Stack) + query = query.options( + orm.load_only("action", "status", "status_reason", "updated_at")) + result = query.filter_by(id=stack_id).first() + if result is None: + raise exception.NotFound(_('Stack with id %s not found') % stack_id) + + return (result.action, result.status, result.status_reason, + result.updated_at) + + +def stack_get_all_by_owner_id(context, owner_id): + results = soft_delete_aware_query( + context, models.Stack).filter_by(owner_id=owner_id, + backup=False).all() + return results + + +def stack_get_all_by_root_owner_id(context, owner_id): + for stack in stack_get_all_by_owner_id(context, owner_id): + yield stack + for ch_st in stack_get_all_by_root_owner_id(context, stack.id): + yield ch_st + + +def _get_sort_keys(sort_keys, mapping): + """Returns an array containing only allowed keys + + :param sort_keys: an array of strings + :param mapping: a mapping from keys to DB column names + :returns: filtered list of sort keys + """ + if isinstance(sort_keys, str): + sort_keys = [sort_keys] + return [mapping[key] for key in sort_keys or 
[] if key in mapping] + + +def _paginate_query(context, query, model, limit=None, sort_keys=None, + marker=None, sort_dir=None): + default_sort_keys = ['created_at'] + if not sort_keys: + sort_keys = default_sort_keys + if not sort_dir: + sort_dir = 'desc' + + # This assures the order of the stacks will always be the same + # even for sort_key values that are not unique in the database + sort_keys = sort_keys + ['id'] + + model_marker = None + if marker: + model_marker = context.session.query(model).get(marker) + try: + query = utils.paginate_query(query, model, limit, sort_keys, + model_marker, sort_dir) + except utils.InvalidSortKey as exc: + err_msg = encodeutils.exception_to_unicode(exc) + raise exception.Invalid(reason=err_msg) + return query + + +def _query_stack_get_all(context, show_deleted=False, + show_nested=False, show_hidden=False, tags=None, + tags_any=None, not_tags=None, not_tags_any=None): + if show_nested: + query = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ).filter_by(backup=False) + else: + query = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ).filter_by(owner_id=None) + + if not context.is_admin: + query = query.filter_by(tenant=context.tenant_id) + + query = query.options(orm.subqueryload("tags")) + if tags: + for tag in tags: + tag_alias = orm_aliased(models.StackTag) + query = query.join(tag_alias, models.Stack.tags) + query = query.filter(tag_alias.tag == tag) + + if tags_any: + query = query.filter( + models.Stack.tags.any( + models.StackTag.tag.in_(tags_any))) + + if not_tags: + subquery = soft_delete_aware_query( + context, models.Stack, show_deleted=show_deleted + ) + for tag in not_tags: + tag_alias = orm_aliased(models.StackTag) + subquery = subquery.join(tag_alias, models.Stack.tags) + subquery = subquery.filter(tag_alias.tag == tag) + not_stack_ids = [s.id for s in subquery.all()] + query = query.filter(models.Stack.id.notin_(not_stack_ids)) + + if not_tags_any: + query = query.filter( + ~models.Stack.tags.any( + models.StackTag.tag.in_(not_tags_any))) + + if not show_hidden and cfg.CONF.hidden_stack_tags: + query = query.filter( + ~models.Stack.tags.any( + models.StackTag.tag.in_(cfg.CONF.hidden_stack_tags))) + + return query + + +def stack_get_all(context, limit=None, sort_keys=None, marker=None, + sort_dir=None, filters=None, + show_deleted=False, show_nested=False, show_hidden=False, + tags=None, tags_any=None, not_tags=None, + not_tags_any=None, eager_load=False): + query = _query_stack_get_all(context, + show_deleted=show_deleted, + show_nested=show_nested, + show_hidden=show_hidden, tags=tags, + tags_any=tags_any, not_tags=not_tags, + not_tags_any=not_tags_any) + if eager_load: + query = query.options(orm.joinedload("raw_template")) + return _filter_and_page_query(context, query, limit, sort_keys, + marker, sort_dir, filters).all() + + +def _filter_and_page_query(context, query, limit=None, sort_keys=None, + marker=None, sort_dir=None, filters=None): + if filters is None: + filters = {} + + sort_key_map = {rpc_api.STACK_NAME: models.Stack.name.key, + rpc_api.STACK_STATUS: models.Stack.status.key, + rpc_api.STACK_CREATION_TIME: models.Stack.created_at.key, + rpc_api.STACK_UPDATED_TIME: models.Stack.updated_at.key} + valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) + + query = db_filters.exact_filter(query, models.Stack, filters) + return _paginate_query(context, query, models.Stack, limit, + valid_sort_keys, marker, sort_dir) + + +def stack_count_all(context, filters=None, + 
show_deleted=False, show_nested=False, show_hidden=False, + tags=None, tags_any=None, not_tags=None, + not_tags_any=None): + query = _query_stack_get_all(context, + show_deleted=show_deleted, + show_nested=show_nested, + show_hidden=show_hidden, tags=tags, + tags_any=tags_any, not_tags=not_tags, + not_tags_any=not_tags_any) + query = db_filters.exact_filter(query, models.Stack, filters) + return query.count() + + +def stack_create(context, values): + stack_ref = models.Stack() + stack_ref.update(values) + stack_name = stack_ref.name + stack_ref.save(context.session) + + # Even though we just created a stack with this name, we may not find + # it again because some unit tests create stacks with deleted_at set. Also + # some backup stacks may not be found, for reasons that are unclear. + earliest = stack_get_by_name(context, stack_name) + if earliest is not None and earliest.id != stack_ref.id: + context.session.query(models.Stack).filter_by(id=stack_ref.id).delete() + raise exception.StackExists(stack_name=stack_name) + + return stack_ref + + +@retry_on_db_error +def stack_update(context, stack_id, values, exp_trvsl=None): + session = context.session + with session.begin(subtransactions=True): + query = (session.query(models.Stack) + .filter(and_(models.Stack.id == stack_id), + (models.Stack.deleted_at.is_(None)))) + if not context.is_admin: + query = query.filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id)) + if exp_trvsl is not None: + query = query.filter(models.Stack.current_traversal == exp_trvsl) + rows_updated = query.update(values, synchronize_session=False) + if not rows_updated: + LOG.debug('Did not set stack state with values ' + '%(vals)s, stack id: %(id)s with ' + 'expected traversal: %(trav)s', + {'id': stack_id, 'vals': str(values), + 'trav': str(exp_trvsl)}) + if not stack_get(context, stack_id, eager_load=False): + raise exception.NotFound( + _('Attempt to update a stack with id: ' + '%(id)s %(msg)s') % { + 'id': stack_id, + 'msg': 'that does not exist'}) + session.expire_all() + return (rows_updated is not None and rows_updated > 0) + + +def stack_delete(context, stack_id): + s = stack_get(context, stack_id, eager_load=False) + if not s: + raise exception.NotFound(_('Attempt to delete a stack with id: ' + '%(id)s %(msg)s') % { + 'id': stack_id, + 'msg': 'that does not exist'}) + session = context.session + with session.begin(): + attr_ids = [] + # normally the resources are deleted already by this point + for r in s.resources: + if r.attr_data_id is not None: + attr_ids.append(r.attr_data_id) + session.delete(r) + if attr_ids: + session.query( + models.ResourcePropertiesData.id).filter( + models.ResourcePropertiesData.id.in_(attr_ids)).delete( + synchronize_session=False) + delete_softly(context, s) + + +def _is_duplicate_error(exc): + return isinstance(exc, db_exception.DBDuplicateEntry) + + +@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True, + retry_on_disconnect=True, + retry_interval=0.5, + inc_retry_interval=True, + exception_checker=_is_duplicate_error) +def stack_lock_create(context, stack_id, engine_id): + with db_context.writer.independent.using(context) as session: + lock = session.query(models.StackLock).get(stack_id) + if lock is not None: + return lock.engine_id + session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id)) + + +def stack_lock_get_engine_id(context, stack_id): + with db_context.reader.independent.using(context) as session: + lock = 
session.query(models.StackLock).get(stack_id) + if lock is not None: + return lock.engine_id + + +def persist_state_and_release_lock(context, stack_id, engine_id, values): + session = context.session + with session.begin(): + rows_updated = (session.query(models.Stack) + .filter(models.Stack.id == stack_id) + .update(values, synchronize_session=False)) + rows_affected = None + if rows_updated is not None and rows_updated > 0: + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() + session.expire_all() + if not rows_affected: + return True + + +def stack_lock_steal(context, stack_id, old_engine_id, new_engine_id): + with db_context.writer.independent.using(context) as session: + lock = session.query(models.StackLock).get(stack_id) + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=old_engine_id + ).update({"engine_id": new_engine_id}) + if not rows_affected: + return lock.engine_id if lock is not None else True + + +def stack_lock_release(context, stack_id, engine_id): + with db_context.writer.independent.using(context) as session: + rows_affected = session.query( + models.StackLock + ).filter_by(stack_id=stack_id, engine_id=engine_id).delete() + if not rows_affected: + return True + + +def stack_get_root_id(context, stack_id): + s = stack_get(context, stack_id, eager_load=False) + if not s: + return None + while s.owner_id: + s = stack_get(context, s.owner_id, eager_load=False) + return s.id + + +def stack_count_total_resources(context, stack_id): + # count all resources which belong to the root stack + return context.session.query( + func.count(models.Resource.id) + ).filter_by(root_stack_id=stack_id).scalar() + + +def user_creds_create(context): + values = context.to_dict() + user_creds_ref = models.UserCreds() + if values.get('trust_id'): + method, trust_id = crypt.encrypt(values.get('trust_id')) + user_creds_ref.trust_id = trust_id + user_creds_ref.decrypt_method = method + user_creds_ref.trustor_user_id = values.get('trustor_user_id') + user_creds_ref.username = None + user_creds_ref.password = None + user_creds_ref.tenant = values.get('tenant') + user_creds_ref.tenant_id = values.get('tenant_id') + user_creds_ref.auth_url = values.get('auth_url') + user_creds_ref.region_name = values.get('region_name') + else: + user_creds_ref.update(values) + method, password = crypt.encrypt(values['password']) + if len(str(password)) > 255: + raise exception.Error(_("Length of OS_PASSWORD after encryption" + " exceeds Heat limit (255 chars)")) + user_creds_ref.password = password + user_creds_ref.decrypt_method = method + user_creds_ref.save(context.session) + result = dict(user_creds_ref) + + if values.get('trust_id'): + result['trust_id'] = values.get('trust_id') + else: + result['password'] = values.get('password') + + return result + + +def user_creds_get(context, user_creds_id): + db_result = context.session.query(models.UserCreds).get(user_creds_id) + if db_result is None: + return None + # Return a dict copy of DB results, do not decrypt details into db_result + # or it can be committed back to the DB in decrypted form + result = dict(db_result) + del result['decrypt_method'] + result['password'] = crypt.decrypt( + db_result.decrypt_method, result['password']) + result['trust_id'] = crypt.decrypt( + db_result.decrypt_method, result['trust_id']) + return result + + +@db_utils.retry_on_stale_data_error +def user_creds_delete(context, user_creds_id): + creds = 
context.session.query(models.UserCreds).get(user_creds_id) + if not creds: + raise exception.NotFound( + _('Attempt to delete user creds with id ' + '%(id)s that does not exist') % {'id': user_creds_id}) + with context.session.begin(): + context.session.delete(creds) + + +def event_get_all_by_tenant(context, limit=None, marker=None, + sort_keys=None, sort_dir=None, filters=None): + query = context.session.query(models.Event) + query = db_filters.exact_filter(query, models.Event, filters) + query = query.join( + models.Event.stack + ).filter_by(tenant=context.tenant_id).filter_by(deleted_at=None) + filters = None + return _events_filter_and_page_query(context, query, limit, marker, + sort_keys, sort_dir, filters).all() + + +def _query_all_events_by_stack(context, stack_id): + return context.session.query(models.Event).filter_by(stack_id=stack_id) + + +def event_get_all_by_stack(context, stack_id, limit=None, marker=None, + sort_keys=None, sort_dir=None, filters=None): + query = _query_all_events_by_stack(context, stack_id) + if filters and 'uuid' in filters: + # retrieving a single event, so eager load its rsrc_prop_data detail + query = query.options(orm.joinedload("rsrc_prop_data")) + return _events_filter_and_page_query(context, query, limit, marker, + sort_keys, sort_dir, filters).all() + + +def _events_paginate_query(context, query, model, limit=None, sort_keys=None, + marker=None, sort_dir=None): + default_sort_keys = ['created_at'] + if not sort_keys: + sort_keys = default_sort_keys + if not sort_dir: + sort_dir = 'desc' + + # This assures the order of the stacks will always be the same + # even for sort_key values that are not unique in the database + sort_keys = sort_keys + ['id'] + + model_marker = None + if marker: + # not to use context.session.query(model).get(marker), because + # user can only see the ID(column 'uuid') and the ID as the marker + model_marker = context.session.query( + model).filter_by(uuid=marker).first() + try: + query = utils.paginate_query(query, model, limit, sort_keys, + model_marker, sort_dir) + except utils.InvalidSortKey as exc: + err_msg = encodeutils.exception_to_unicode(exc) + raise exception.Invalid(reason=err_msg) + + return query + + +def _events_filter_and_page_query(context, query, + limit=None, marker=None, + sort_keys=None, sort_dir=None, + filters=None): + if filters is None: + filters = {} + + sort_key_map = {rpc_api.EVENT_TIMESTAMP: models.Event.created_at.key, + rpc_api.EVENT_RES_TYPE: models.Event.resource_type.key} + valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map) + + query = db_filters.exact_filter(query, models.Event, filters) + + return _events_paginate_query(context, query, models.Event, limit, + valid_sort_keys, marker, sort_dir) + + +def event_count_all_by_stack(context, stack_id): + query = context.session.query(func.count(models.Event.id)) + return query.filter_by(stack_id=stack_id).scalar() + + +def _find_rpd_references(context, stack_id): + ev_ref_ids = set(e.rsrc_prop_data_id for e + in _query_all_events_by_stack(context, stack_id).all()) + rsrc_ref_ids = set(r.rsrc_prop_data_id for r + in context.session.query(models.Resource).filter_by( + stack_id=stack_id).all()) + return ev_ref_ids | rsrc_ref_ids + + +def _all_backup_stack_ids(context, stack_id): + """Iterate over all the IDs of all stacks related as stack/backup pairs. + + All backup stacks of a main stack, past and present (i.e. including those + that are soft deleted), are included. 
The main stack itself is also + included if the initial ID passed in is for a backup stack. The initial ID + passed in is never included in the output. + """ + query = context.session.query(models.Stack) + stack = query.get(stack_id) + if stack is None: + LOG.error('Stack %s not found', stack_id) + return + is_backup = stack.name.endswith('*') + + if is_backup: + main = query.get(stack.owner_id) + if main is None: + LOG.error('Main stack for backup "%s" %s not found', + stack.name, stack_id) + return + yield main.id + for backup_id in _all_backup_stack_ids(context, main.id): + if backup_id != stack_id: + yield backup_id + else: + q_backup = query.filter(sqlalchemy.or_( + models.Stack.tenant == context.tenant_id, + models.Stack.stack_user_project_id == context.tenant_id)) + q_backup = q_backup.filter_by(name=stack.name + '*') + q_backup = q_backup.filter_by(owner_id=stack_id) + for backup in q_backup.all(): + yield backup.id + + +def _delete_event_rows(context, stack_id, limit): + # MySQL does not support LIMIT in subqueries, + # sqlite does not support JOIN in DELETE. + # So we must manually supply the IN() values. + # pgsql SHOULD work with the pure DELETE/JOIN below but that must be + # confirmed via integration tests. + session = context.session + with session.begin(): + query = _query_all_events_by_stack(context, stack_id) + query = query.order_by(models.Event.id).limit(limit) + id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()] + if not id_pairs: + return 0 + (ids, rsrc_prop_ids) = zip(*id_pairs) + max_id = ids[-1] + # delete the events + retval = session.query(models.Event).filter( + models.Event.id <= max_id).filter( + models.Event.stack_id == stack_id).delete() + + # delete unreferenced resource_properties_data + def del_rpd(rpd_ids): + if not rpd_ids: + return + q_rpd = session.query(models.ResourcePropertiesData) + q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids)) + q_rpd.delete(synchronize_session=False) + + if rsrc_prop_ids: + clr_prop_ids = set(rsrc_prop_ids) - _find_rpd_references(context, + stack_id) + clr_prop_ids.discard(None) + try: + del_rpd(clr_prop_ids) + except db_exception.DBReferenceError: + LOG.debug('Checking backup/stack pairs for RPD references') + found = False + for partner_stack_id in _all_backup_stack_ids(context, + stack_id): + found = True + clr_prop_ids -= _find_rpd_references(context, + partner_stack_id) + if not found: + LOG.debug('No backup/stack pairs found for %s', stack_id) + raise + del_rpd(clr_prop_ids) + + return retval + + +@retry_on_db_error +def event_create(context, values): + if 'stack_id' in values and cfg.CONF.max_events_per_stack: + # only count events and purge on average + # 200.0/cfg.CONF.event_purge_batch_size percent of the time. 
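+        # (for example, with event_purge_batch_size = 200 the check below
+        # fires with probability 2.0/200 = 0.01, i.e. on about 1% of
+        # event creations)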
+ check = (2.0 / cfg.CONF.event_purge_batch_size) > random.uniform(0, 1) + if (check and + (event_count_all_by_stack(context, values['stack_id']) >= + cfg.CONF.max_events_per_stack)): + # prune + try: + _delete_event_rows(context, values['stack_id'], + cfg.CONF.event_purge_batch_size) + except db_exception.DBError as exc: + LOG.error('Failed to purge events: %s', str(exc)) + event_ref = models.Event() + event_ref.update(values) + event_ref.save(context.session) + return event_ref + + +def software_config_create(context, values): + obj_ref = models.SoftwareConfig() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def software_config_get(context, config_id): + result = context.session.query(models.SoftwareConfig).get(config_id) + if (result is not None and context is not None and not context.is_admin and + result.tenant != context.tenant_id): + result = None + + if not result: + raise exception.NotFound(_('Software config with id %s not found') % + config_id) + return result + + +def software_config_get_all(context, limit=None, marker=None): + query = context.session.query(models.SoftwareConfig) + if not context.is_admin: + query = query.filter_by(tenant=context.tenant_id) + return _paginate_query(context, query, models.SoftwareConfig, + limit=limit, marker=marker).all() + + +def software_config_delete(context, config_id): + config = software_config_get(context, config_id) + # Query if the software config has been referenced by deployment. + result = context.session.query(models.SoftwareDeployment).filter_by( + config_id=config_id).first() + if result: + msg = (_("Software config with id %s can not be deleted as " + "it is referenced.") % config_id) + raise exception.InvalidRestrictedAction(message=msg) + with context.session.begin(): + context.session.delete(config) + + +def software_deployment_create(context, values): + obj_ref = models.SoftwareDeployment() + obj_ref.update(values) + session = context.session + + with session.begin(): + obj_ref.save(session) + + return obj_ref + + +def software_deployment_get(context, deployment_id): + result = context.session.query( + models.SoftwareDeployment).get(deployment_id) + if (result is not None and context is not None and not context.is_admin and + context.tenant_id not in (result.tenant, + result.stack_user_project_id)): + result = None + + if not result: + raise exception.NotFound(_('Deployment with id %s not found') % + deployment_id) + return result + + +def software_deployment_get_all(context, server_id=None): + sd = models.SoftwareDeployment + query = context.session.query(sd).order_by(sd.created_at) + if not context.is_admin: + query = query.filter(sqlalchemy.or_( + sd.tenant == context.tenant_id, + sd.stack_user_project_id == context.tenant_id)) + if server_id: + query = query.filter_by(server_id=server_id) + + return query.all() + + +def software_deployment_update(context, deployment_id, values): + deployment = software_deployment_get(context, deployment_id) + update_and_save(context, deployment, values) + return deployment + + +def software_deployment_delete(context, deployment_id): + deployment = software_deployment_get(context, deployment_id) + session = context.session + with session.begin(): + session.delete(deployment) + + +def snapshot_create(context, values): + obj_ref = models.Snapshot() + obj_ref.update(values) + obj_ref.save(context.session) + return obj_ref + + +def snapshot_get(context, snapshot_id): + result = context.session.query(models.Snapshot).get(snapshot_id) + if (result is not None and 
context is not None and + context.tenant_id != result.tenant): + result = None + + if not result: + raise exception.NotFound(_('Snapshot with id %s not found') % + snapshot_id) + return result + + +def snapshot_get_by_stack(context, snapshot_id, stack): + snapshot = snapshot_get(context, snapshot_id) + if snapshot.stack_id != stack.id: + raise exception.SnapshotNotFound(snapshot=snapshot_id, + stack=stack.name) + + return snapshot + + +def snapshot_update(context, snapshot_id, values): + snapshot = snapshot_get(context, snapshot_id) + snapshot.update(values) + snapshot.save(context.session) + return snapshot + + +def snapshot_delete(context, snapshot_id): + snapshot = snapshot_get(context, snapshot_id) + with context.session.begin(): + context.session.delete(snapshot) + + +def snapshot_get_all(context, stack_id): + return context.session.query(models.Snapshot).filter_by( + stack_id=stack_id, tenant=context.tenant_id) + + +def service_create(context, values): + service = models.Service() + service.update(values) + service.save(context.session) + return service + + +def service_update(context, service_id, values): + service = service_get(context, service_id) + values.update({'updated_at': timeutils.utcnow()}) + service.update(values) + service.save(context.session) + return service + + +def service_delete(context, service_id, soft_delete=True): + service = service_get(context, service_id) + session = context.session + with session.begin(): + if soft_delete: + delete_softly(context, service) + else: + session.delete(service) + + +def service_get(context, service_id): + result = context.session.query(models.Service).get(service_id) + if result is None: + raise exception.EntityNotFound(entity='Service', name=service_id) + return result + + +def service_get_all(context): + return (context.session.query(models.Service). + filter_by(deleted_at=None).all()) + + +def service_get_all_by_args(context, host, binary, hostname): + return (context.session.query(models.Service). + filter_by(host=host). + filter_by(binary=binary). 
+            filter_by(hostname=hostname).all())
+
+
+def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
+    def _validate_positive_integer(val, argname):
+        try:
+            val = int(val)
+        except ValueError:
+            raise exception.Error(_("%s should be an integer") % argname)
+
+        if val < 0:
+            raise exception.Error(_("%s should be a positive integer")
+                                  % argname)
+        return val
+
+    age = _validate_positive_integer(age, 'age')
+    batch_size = _validate_positive_integer(batch_size, 'batch_size')
+
+    if granularity not in ('days', 'hours', 'minutes', 'seconds'):
+        raise exception.Error(
+            _("granularity should be days, hours, minutes, or seconds"))
+
+    if granularity == 'days':
+        age = age * 86400
+    elif granularity == 'hours':
+        age = age * 3600
+    elif granularity == 'minutes':
+        age = age * 60
+
+    time_line = timeutils.utcnow() - datetime.timedelta(seconds=age)
+    engine = get_engine()
+    meta = sqlalchemy.MetaData()
+    meta.bind = engine
+
+    stack = sqlalchemy.Table('stack', meta, autoload=True)
+    service = sqlalchemy.Table('service', meta, autoload=True)
+
+    # Purge deleted services
+    srvc_del = service.delete().where(service.c.deleted_at < time_line)
+    engine.execute(srvc_del)
+
+    # find the soft-deleted stacks that are past their expiry
+    sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id,
+                             stack.c.prev_raw_template_id,
+                             stack.c.user_creds_id,
+                             stack.c.action,
+                             stack.c.status,
+                             stack.c.name])
+    if project_id:
+        stack_where = sel.where(and_(
+            stack.c.tenant == project_id,
+            stack.c.deleted_at < time_line))
+    else:
+        stack_where = sel.where(
+            stack.c.deleted_at < time_line)
+
+    stacks = engine.execute(stack_where)
+
+    while True:
+        next_stacks_to_purge = list(itertools.islice(stacks, batch_size))
+        if len(next_stacks_to_purge):
+            _purge_stacks(next_stacks_to_purge, engine, meta)
+        else:
+            break
+
+
+@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
+                           retry_interval=0.5, inc_retry_interval=True)
+def _purge_stacks(stack_infos, engine, meta):
+    """Purge some stacks and their related events, raw_templates, etc.
+
+    stack_infos is a list of lists of selected stack columns:
+    [[id, raw_template_id, prev_raw_template_id, user_creds_id,
+      action, status, name], ...]
+    """
+
+    stack = sqlalchemy.Table('stack', meta, autoload=True)
+    stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True)
+    stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True)
+    resource = sqlalchemy.Table('resource', meta, autoload=True)
+    resource_data = sqlalchemy.Table('resource_data', meta, autoload=True)
+    resource_properties_data = sqlalchemy.Table(
+        'resource_properties_data', meta, autoload=True)
+    event = sqlalchemy.Table('event', meta, autoload=True)
+    raw_template = sqlalchemy.Table('raw_template', meta, autoload=True)
+    raw_template_files = sqlalchemy.Table('raw_template_files', meta,
+                                          autoload=True)
+    user_creds = sqlalchemy.Table('user_creds', meta, autoload=True)
+    syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True)
+
+    stack_info_str = ','.join([str(i) for i in stack_infos])
+    LOG.info("Purging stacks %s", stack_info_str)
+
+    # TODO(cwolfe): find a way to make this re-entrant with
+    # reasonably sized transactions (good luck), or add
+    # a cleanup for orphaned rows.
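+    # The deletes below intentionally run child-first (stack locks, tags,
+    # resource_data, sync points, events, resources) and only then remove
+    # the stack rows themselves, so rows holding foreign keys to a stack
+    # are gone before the stack they reference is deleted.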
+ stack_ids = [stack_info[0] for stack_info in stack_infos] + # delete stack locks (just in case some got stuck) + stack_lock_del = stack_lock.delete().where( + stack_lock.c.stack_id.in_(stack_ids)) + engine.execute(stack_lock_del) + # delete stack tags + stack_tag_del = stack_tag.delete().where( + stack_tag.c.stack_id.in_(stack_ids)) + engine.execute(stack_tag_del) + # delete resource_data + res_where = sqlalchemy.select([resource.c.id]).where( + resource.c.stack_id.in_(stack_ids)) + res_data_del = resource_data.delete().where( + resource_data.c.resource_id.in_(res_where)) + engine.execute(res_data_del) + # clean up any sync_points that may have lingered + sync_del = syncpoint.delete().where( + syncpoint.c.stack_id.in_(stack_ids)) + engine.execute(sync_del) + + # get rsrc_prop_data_ids to delete + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.rsrc_prop_data_id]).where( + resource.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids = set( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.attr_data_id]).where( + resource.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids.update( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + rsrc_prop_data_where = sqlalchemy.select( + [event.c.rsrc_prop_data_id]).where( + event.c.stack_id.in_(stack_ids)) + rsrc_prop_data_ids.update( + [i[0] for i in list(engine.execute(rsrc_prop_data_where))]) + # delete events + event_del = event.delete().where(event.c.stack_id.in_(stack_ids)) + engine.execute(event_del) + # delete resources (normally there shouldn't be any) + res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids)) + engine.execute(res_del) + # delete resource_properties_data + if rsrc_prop_data_ids: # keep rpd's in events + rsrc_prop_data_where = sqlalchemy.select( + [event.c.rsrc_prop_data_id]).where( + event.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) + ids = list(engine.execute(rsrc_prop_data_where)) + rsrc_prop_data_ids.difference_update([i[0] for i in ids]) + if rsrc_prop_data_ids: # keep rpd's in resources + rsrc_prop_data_where = sqlalchemy.select( + [resource.c.rsrc_prop_data_id]).where( + resource.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids)) + ids = list(engine.execute(rsrc_prop_data_where)) + rsrc_prop_data_ids.difference_update([i[0] for i in ids]) + if rsrc_prop_data_ids: # delete if we have any + rsrc_prop_data_del = resource_properties_data.delete().where( + resource_properties_data.c.id.in_(rsrc_prop_data_ids)) + engine.execute(rsrc_prop_data_del) + # delete the stacks + stack_del = stack.delete().where(stack.c.id.in_(stack_ids)) + engine.execute(stack_del) + # delete orphaned raw templates + raw_template_ids = [i[1] for i in stack_infos if i[1] is not None] + raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None) + if raw_template_ids: # keep those still referenced + raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where( + stack.c.raw_template_id.in_(raw_template_ids)) + raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_template_ids = set(raw_template_ids) - set(raw_tmpl) + if raw_template_ids: # keep those still referenced (previous tmpl) + raw_tmpl_sel = sqlalchemy.select( + [stack.c.prev_raw_template_id]).where( + stack.c.prev_raw_template_id.in_(raw_template_ids)) + raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)] + raw_template_ids = raw_template_ids - set(raw_tmpl) + if raw_template_ids: # delete raw_templates if we have any + raw_tmpl_file_sel = sqlalchemy.select( + 
[raw_template.c.files_id]).where( + raw_template.c.id.in_(raw_template_ids)) + raw_tmpl_file_ids = [i[0] for i in engine.execute( + raw_tmpl_file_sel)] + raw_templ_del = raw_template.delete().where( + raw_template.c.id.in_(raw_template_ids)) + engine.execute(raw_templ_del) + if raw_tmpl_file_ids: # keep _files still referenced + raw_tmpl_file_sel = sqlalchemy.select( + [raw_template.c.files_id]).where( + raw_template.c.files_id.in_(raw_tmpl_file_ids)) + raw_tmpl_files = [i[0] for i in engine.execute( + raw_tmpl_file_sel)] + raw_tmpl_file_ids = set(raw_tmpl_file_ids) \ + - set(raw_tmpl_files) + if raw_tmpl_file_ids: # delete _files if we have any + raw_tmpl_file_del = raw_template_files.delete().where( + raw_template_files.c.id.in_(raw_tmpl_file_ids)) + engine.execute(raw_tmpl_file_del) + # purge any user creds that are no longer referenced + user_creds_ids = [i[3] for i in stack_infos if i[3] is not None] + if user_creds_ids: # keep those still referenced + user_sel = sqlalchemy.select([stack.c.user_creds_id]).where( + stack.c.user_creds_id.in_(user_creds_ids)) + users = [i[0] for i in engine.execute(user_sel)] + user_creds_ids = set(user_creds_ids) - set(users) + if user_creds_ids: # delete if we have any + usr_creds_del = user_creds.delete().where( + user_creds.c.id.in_(user_creds_ids)) + engine.execute(usr_creds_del) + + +def sync_point_delete_all_by_stack_and_traversal(context, stack_id, + traversal_id): + rows_deleted = context.session.query(models.SyncPoint).filter_by( + stack_id=stack_id, traversal_id=traversal_id).delete() + return rows_deleted + + +@retry_on_db_error +def sync_point_create(context, values): + values['entity_id'] = str(values['entity_id']) + sync_point_ref = models.SyncPoint() + sync_point_ref.update(values) + sync_point_ref.save(context.session) + return sync_point_ref + + +def sync_point_get(context, entity_id, traversal_id, is_update): + entity_id = str(entity_id) + return context.session.query(models.SyncPoint).get( + (entity_id, traversal_id, is_update) + ) + + +@retry_on_db_error +def sync_point_update_input_data(context, entity_id, + traversal_id, is_update, atomic_key, + input_data): + entity_id = str(entity_id) + rows_updated = context.session.query(models.SyncPoint).filter_by( + entity_id=entity_id, + traversal_id=traversal_id, + is_update=is_update, + atomic_key=atomic_key + ).update({"input_data": input_data, "atomic_key": atomic_key + 1}) + return rows_updated + + +def _crypt_action(encrypt): + if encrypt: + return _('encrypt') + return _('decrypt') + + +def _db_encrypt_or_decrypt_template_params( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + from heat.engine import template + session = ctxt.session + excs = [] + query = session.query(models.RawTemplate) + template_batches = _get_batch( + session, ctxt=ctxt, query=query, model=models.RawTemplate, + batch_size=batch_size) + next_batch = list(itertools.islice(template_batches, batch_size)) + while next_batch: + with session.begin(): + for raw_template in next_batch: + try: + if verbose: + LOG.info("Processing raw_template %s...", + raw_template.id) + env = raw_template.environment + needs_update = False + + # using "in env.keys()" so an exception is raised + # if env is something weird like a string. 
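+                    # NOTE: illustrative values only, not from the
+                    # original patch. An environment eligible for
+                    # encryption looks roughly like
+                    #
+                    #     {'parameters': {'db_password': 'secret'},
+                    #      'encrypted_param_names': []}
+                    #
+                    # After the encrypt branch below runs, the hidden
+                    # value is the (decrypt_method, ciphertext) pair
+                    # returned by crypt.encrypt(), with the name appended
+                    # to 'encrypted_param_names'; the decrypt branch
+                    # unpacks exactly that pair.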
+ if env is None or 'parameters' not in env.keys(): + continue + if 'encrypted_param_names' in env: + encrypted_params = env['encrypted_param_names'] + else: + encrypted_params = [] + + if encrypt: + tmpl = template.Template.load( + ctxt, raw_template.id, raw_template) + param_schemata = tmpl.param_schemata() + if not param_schemata: + continue + + for param_name, param_val in env['parameters'].items(): + if (param_name in encrypted_params or + param_name not in param_schemata or + not param_schemata[param_name].hidden): + continue + encrypted_val = crypt.encrypt( + str(param_val), encryption_key) + env['parameters'][param_name] = encrypted_val + encrypted_params.append(param_name) + needs_update = True + if needs_update: + newenv = env.copy() + newenv['encrypted_param_names'] = encrypted_params + else: # decrypt + for param_name in encrypted_params: + method, value = env['parameters'][param_name] + decrypted_val = crypt.decrypt(method, value, + encryption_key) + env['parameters'][param_name] = decrypted_val + needs_update = True + if needs_update: + newenv = env.copy() + newenv['encrypted_param_names'] = [] + + if needs_update: + raw_template_update(ctxt, raw_template.id, + {'environment': newenv}) + except Exception as exc: + LOG.exception('Failed to %(crypt_action)s parameters ' + 'of raw template %(id)d', + {'id': raw_template.id, + 'crypt_action': _crypt_action(encrypt)}) + excs.append(exc) + continue + finally: + if verbose: + LOG.info("Finished %(crypt_action)s processing of " + "raw_template %(id)d.", + {'id': raw_template.id, + 'crypt_action': _crypt_action(encrypt)}) + next_batch = list(itertools.islice(template_batches, batch_size)) + return excs + + +def _db_encrypt_or_decrypt_resource_prop_data_legacy( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + session = ctxt.session + excs = [] + + # Older resources may have properties_data in the legacy column, + # so update those as needed + query = session.query(models.Resource).filter( + models.Resource.properties_data_encrypted.isnot(encrypt)) + resource_batches = _get_batch( + session=session, ctxt=ctxt, query=query, model=models.Resource, + batch_size=batch_size) + next_batch = list(itertools.islice(resource_batches, batch_size)) + while next_batch: + with session.begin(): + for resource in next_batch: + if not resource.properties_data: + continue + try: + if verbose: + LOG.info("Processing resource %s...", + resource.id) + if encrypt: + result = crypt.encrypted_dict(resource.properties_data, + encryption_key) + else: + result = crypt.decrypted_dict(resource.properties_data, + encryption_key) + resource_update(ctxt, resource.id, + {'properties_data': result, + 'properties_data_encrypted': encrypt}, + resource.atomic_key) + except Exception as exc: + LOG.exception('Failed to %(crypt_action)s ' + 'properties_data of resource %(id)d' % + {'id': resource.id, + 'crypt_action': _crypt_action(encrypt)}) + excs.append(exc) + continue + finally: + if verbose: + LOG.info("Finished processing resource %s.", + resource.id) + next_batch = list(itertools.islice(resource_batches, batch_size)) + return excs + + +def _db_encrypt_or_decrypt_resource_prop_data( + ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False): + session = ctxt.session + excs = [] + + # Older resources may have properties_data in the legacy column, + # so update those as needed + query = session.query(models.ResourcePropertiesData).filter( + models.ResourcePropertiesData.encrypted.isnot(encrypt)) + rpd_batches = _get_batch( + 
session=session, ctxt=ctxt, query=query,
+        model=models.ResourcePropertiesData, batch_size=batch_size)
+    next_batch = list(itertools.islice(rpd_batches, batch_size))
+    while next_batch:
+        with session.begin():
+            for rpd in next_batch:
+                if not rpd.data:
+                    continue
+                try:
+                    if verbose:
+                        LOG.info("Processing resource_properties_data "
+                                 "%s...", rpd.id)
+                    if encrypt:
+                        result = crypt.encrypted_dict(rpd.data,
+                                                      encryption_key)
+                    else:
+                        result = crypt.decrypted_dict(rpd.data,
+                                                      encryption_key)
+                    rpd.update({'data': result,
+                                'encrypted': encrypt})
+                except Exception as exc:
+                    LOG.exception(
+                        "Failed to %(crypt_action)s "
+                        "data of resource_properties_data %(id)d" %
+                        {'id': rpd.id,
+                         'crypt_action': _crypt_action(encrypt)})
+                    excs.append(exc)
+                    continue
+                finally:
+                    if verbose:
+                        LOG.info(
+                            "Finished processing resource_properties_data"
+                            " %s.", rpd.id)
+        next_batch = list(itertools.islice(rpd_batches, batch_size))
+    return excs
+
+
+def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
+                                         verbose=False):
+    """Encrypt parameters and properties for all templates in db.
+
+    :param ctxt: RPC context
+    :param encryption_key: key that will be used for parameter and property
+                           encryption
+    :param batch_size: number of templates requested from DB in each
+                       iteration. 50 means that heat requests 50 templates,
+                       encrypts them and proceeds with the next 50 items.
+    :param verbose: log an INFO message when processing of each raw_template
+                    or resource begins or ends
+    :return: list of exceptions encountered during encryption
+    """
+    excs = []
+    excs.extend(_db_encrypt_or_decrypt_template_params(
+        ctxt, encryption_key, True, batch_size, verbose))
+    excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
+        ctxt, encryption_key, True, batch_size, verbose))
+    excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
+        ctxt, encryption_key, True, batch_size, verbose))
+    return excs
+
+
+def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
+                                         verbose=False):
+    """Decrypt parameters and properties for all templates in db.
+
+    :param ctxt: RPC context
+    :param encryption_key: key that will be used for parameter and property
+                           decryption
+    :param batch_size: number of templates requested from DB in each
+                       iteration. 50 means that heat requests 50 templates,
+                       decrypts them and proceeds with the next 50 items.
+    :param verbose: log an INFO message when processing of each raw_template
+                    or resource begins or ends
+    :return: list of exceptions encountered during decryption
+    """
+    excs = []
+    excs.extend(_db_encrypt_or_decrypt_template_params(
+        ctxt, encryption_key, False, batch_size, verbose))
+    excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
+        ctxt, encryption_key, False, batch_size, verbose))
+    excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
+        ctxt, encryption_key, False, batch_size, verbose))
+    return excs
+
+
+def db_properties_data_migrate(ctxt, batch_size=50):
+    """Migrate properties data from legacy columns to new location in db.
+
+    :param ctxt: RPC context
+    :param batch_size: number of resources or events requested from DB in
+                       each iteration. 50 means that heat requests 50 rows,
+                       migrates them and proceeds with the next 50 items.
+ """ + session = ctxt.session + + query = session.query(models.Resource).filter(and_( + models.Resource.properties_data.isnot(None), + models.Resource.rsrc_prop_data_id.is_(None))) + resource_batches = _get_batch( + session=session, ctxt=ctxt, query=query, + model=models.Resource, batch_size=batch_size) + next_batch = list(itertools.islice(resource_batches, batch_size)) + while next_batch: + with session.begin(): + for resource in next_batch: + try: + encrypted = resource.properties_data_encrypted + if encrypted is None: + LOG.warning( + 'Unexpected: resource.encrypted is None for ' + 'resource id %s for legacy ' + 'resource.properties_data, assuming False.', + resource.id) + encrypted = False + rsrc_prop_data = resource_prop_data_create( + ctxt, {'encrypted': encrypted, + 'data': resource.properties_data}) + resource_update(ctxt, resource.id, + {'properties_data_encrypted': None, + 'properties_data': None, + 'rsrc_prop_data_id': rsrc_prop_data.id}, + resource.atomic_key) + except Exception: + LOG.exception('Failed to migrate properties_data for ' + 'resource %d', resource.id) + continue + next_batch = list(itertools.islice(resource_batches, batch_size)) + + query = session.query(models.Event).filter(and_( + models.Event.resource_properties.isnot(None), + models.Event.rsrc_prop_data_id.is_(None))) + event_batches = _get_batch( + session=session, ctxt=ctxt, query=query, + model=models.Event, batch_size=batch_size) + next_batch = list(itertools.islice(event_batches, batch_size)) + while next_batch: + with session.begin(): + for event in next_batch: + try: + prop_data = event.resource_properties + rsrc_prop_data = resource_prop_data_create( + ctxt, {'encrypted': False, + 'data': prop_data}) + event.update({'resource_properties': None, + 'rsrc_prop_data_id': rsrc_prop_data.id}) + except Exception: + LOG.exception('Failed to migrate resource_properties ' + 'for event %d', event.id) + continue + next_batch = list(itertools.islice(event_batches, batch_size)) + + +def _get_batch(session, ctxt, query, model, batch_size=50): + last_batch_marker = None + while True: + results = _paginate_query( + context=ctxt, query=query, model=model, limit=batch_size, + marker=last_batch_marker).all() + if not results: + break + else: + for result in results: + yield result + last_batch_marker = results[-1].id + + +def reset_stack_status(context, stack_id, stack=None): + session = context.session + if stack is None: + stack = session.query(models.Stack).get(stack_id) + + if stack is None: + raise exception.NotFound(_('Stack with id %s not found') % stack_id) + + with session.begin(): + query = session.query(models.Resource).filter_by( + status='IN_PROGRESS', stack_id=stack_id) + query.update({'status': 'FAILED', + 'status_reason': 'Stack status manually reset', + 'engine_id': None}) + + query = session.query(models.ResourceData) + query = query.join(models.Resource) + query = query.filter_by(stack_id=stack_id) + query = query.filter( + models.ResourceData.key.in_(heat_environment.HOOK_TYPES)) + data_ids = [data.id for data in query] + + if data_ids: + query = session.query(models.ResourceData) + query = query.filter(models.ResourceData.id.in_(data_ids)) + query.delete(synchronize_session='fetch') + + query = session.query(models.Stack).filter_by(owner_id=stack_id) + for child in query: + reset_stack_status(context, child.id, child) + + with session.begin(): + if stack.status == 'IN_PROGRESS': + stack.status = 'FAILED' + stack.status_reason = 'Stack status manually reset' + + session.query( + models.StackLock + 
).filter_by(stack_id=stack_id).delete() diff --git a/heat/db/filters.py b/heat/db/filters.py new file mode 100644 index 000000000..6c7b8cf24 --- /dev/null +++ b/heat/db/filters.py @@ -0,0 +1,44 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def exact_filter(query, model, filters): + """Applies exact match filtering to a query. + + Returns the updated query. Modifies filters argument to remove + filters consumed. + + :param query: query to apply filters to + :param model: model object the query applies to, for IN-style + filtering + :param filters: dictionary of filters; values that are lists, + tuples, sets, or frozensets cause an 'IN' test to + be performed, while exact matching ('==' operator) + is used for other values + """ + + filter_dict = {} + if filters is None: + filters = {} + + for key, value in filters.items(): + if isinstance(value, (list, tuple, set, frozenset)): + column_attr = getattr(model, key) + query = query.filter(column_attr.in_(value)) + else: + filter_dict[key] = value + + if filter_dict: + query = query.filter_by(**filter_dict) + + return query diff --git a/heat/db/migration.py b/heat/db/migration.py new file mode 100644 index 000000000..910656ca1 --- /dev/null +++ b/heat/db/migration.py @@ -0,0 +1,117 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from alembic import command as alembic_api +from alembic import config as alembic_config +from alembic import migration as alembic_migration +from oslo_log import log as logging +import sqlalchemy as sa + +from heat.db import api as db_api + +LOG = logging.getLogger(__name__) + +ALEMBIC_INIT_VERSION = 'c6214ca60943' + + +def _migrate_legacy_database(engine, connection, config): + """Check if database is a legacy sqlalchemy-migrate-managed database. + + If it is, migrate it by "stamping" the initial alembic schema. + """ + # If the database doesn't have the sqlalchemy-migrate legacy migration + # table, we don't have anything to do + if not sa.inspect(engine).has_table('migrate_version'): + return + + # Likewise, if we've already migrated to alembic, we don't have anything to + # do + context = alembic_migration.MigrationContext.configure(connection) + if context.get_current_revision(): + return + + # We have legacy migrations but no alembic migration. Stamp (dummy apply) + # the initial alembic migration. 
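+    # NOTE: for illustration only (not part of this patch): stamping is
+    # the programmatic equivalent of
+    #
+    #     $ alembic stamp c6214ca60943
+    #
+    # which records the revision in the alembic_version table without
+    # executing its schema operations, since the tables already exist.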
+
+    LOG.info(
+        'The database is still under sqlalchemy-migrate control; '
+        'fake applying the initial alembic migration'
+    )
+    alembic_api.stamp(config, ALEMBIC_INIT_VERSION)
+
+
+def _find_alembic_conf():
+    """Get the project's alembic configuration.
+
+    :returns: An instance of ``alembic.config.Config``
+    """
+    path = os.path.join(
+        os.path.abspath(os.path.dirname(__file__)),
+        'alembic.ini',
+    )
+    config = alembic_config.Config(os.path.abspath(path))
+    # we don't want to use the logger configuration from the file, which is
+    # only really intended for the CLI
+    # https://stackoverflow.com/a/42691781/613428
+    config.attributes['configure_logger'] = False
+    return config
+
+
+def _upgrade_alembic(engine, config, version):
+    # re-use the connection rather than creating a new one
+    with engine.begin() as connection:
+        config.attributes['connection'] = connection
+        _migrate_legacy_database(engine, connection, config)
+        alembic_api.upgrade(config, version or 'head')
+
+
+def db_sync(version=None, engine=None):
+    """Migrate the database to `version` or the most recent version."""
+    # if the user requested a specific version, check if it's an integer: if
+    # so, we're almost certainly in sqlalchemy-migrate land and won't support
+    # that
+    if version is not None and version.isdigit():
+        raise ValueError(
+            'You requested an sqlalchemy-migrate database version; this is '
+            'no longer supported'
+        )
+
+    if engine is None:
+        engine = db_api.get_engine()
+
+    config = _find_alembic_conf()
+
+    # discard the URL encoded in alembic.ini in favour of the URL configured
+    # for the engine by the database fixtures, casting from
+    # 'sqlalchemy.engine.url.URL' to str in the process. This returns a
+    # RFC-1738 quoted URL, which means that a password like "foo@" will be
+    # turned into "foo%40". This in turn causes a problem for
+    # set_main_option() because that uses ConfigParser.set, which (by design)
+    # uses *python* interpolation to write the string out ... where "%" is
+    # the special python interpolation character! Avoid this mismatch by
+    # quoting all %'s for the set below.
+    engine_url = str(engine.url).replace('%', '%%')
+    config.set_main_option('sqlalchemy.url', str(engine_url))
+
+    LOG.info('Applying migration(s)')
+    _upgrade_alembic(engine, config, version)
+    LOG.info('Migration(s) applied')
+
+
+def db_version():
+    """Get database version."""
+    engine = db_api.get_engine()
+    with engine.connect() as connection:
+        m_context = alembic_migration.MigrationContext.configure(connection)
+        return m_context.get_current_revision()
diff --git a/heat/db/migrations/README.rst b/heat/db/migrations/README.rst
new file mode 100644
index 000000000..b2283fc82
--- /dev/null
+++ b/heat/db/migrations/README.rst
@@ -0,0 +1,15 @@
+Database migrations
+===================
+
+This directory contains migrations for the database. These are implemented
+using `alembic`__, a lightweight database migration tool designed for usage
+with `SQLAlchemy`__.
+
+The best place to start understanding Alembic is with its own `tutorial`__.
+You can also play around with the :command:`alembic` command::
+
+    $ alembic --help
+
+.. __: https://alembic.sqlalchemy.org/en/latest/
+.. __: https://www.sqlalchemy.org/
+..
__: https://alembic.sqlalchemy.org/en/latest/tutorial.html diff --git a/heat/db/migrations/env.py b/heat/db/migrations/env.py new file mode 100644 index 000000000..932b16a97 --- /dev/null +++ b/heat/db/migrations/env.py @@ -0,0 +1,94 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from logging.config import fileConfig + +from alembic import context +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from heat.db import models + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.attributes.get('configure_logger', True): + fileConfig(config.config_file_name) + +# this is the MetaData object for the various models in the database +target_metadata = models.BASE.metadata + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL and not an Engine, though an + Engine is acceptable here as well. By skipping the Engine creation we + don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine and associate a connection + with the context. + + This is modified from the default based on the below, since we want to + share an engine when unit testing so in-memory database testing actually + works. 
+ + https://alembic.sqlalchemy.org/en/latest/cookbook.html#connection-sharing + """ + connectable = config.attributes.get('connection', None) + + if connectable is None: + # only create Engine if we don't have a Connection from the outside + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + # when connectable is already a Connection object, calling connect() gives + # us a *branched connection* + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/heat/db/migrations/script.py.mako b/heat/db/migrations/script.py.mako new file mode 100644 index 000000000..120937fbc --- /dev/null +++ b/heat/db/migrations/script.py.mako @@ -0,0 +1,20 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} +""" + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} diff --git a/heat/db/migrations/versions/c6214ca60943_initial_revision.py b/heat/db/migrations/versions/c6214ca60943_initial_revision.py new file mode 100644 index 000000000..d3e9d757d --- /dev/null +++ b/heat/db/migrations/versions/c6214ca60943_initial_revision.py @@ -0,0 +1,392 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Initial revision + +Revision ID: c6214ca60943 +Revises: +Create Date: 2023-03-22 18:04:02.387269 +""" + +from alembic import op +import sqlalchemy as sa + +import heat.db.types + +# revision identifiers, used by Alembic. 
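+# NOTE: explanatory aside, not part of the original patch. Because
+# down_revision is None below, Alembic treats this script as the base of
+# the migration graph; a fresh database therefore reaches 'head' by
+# running just this one revision, e.g.
+#
+#     $ alembic -c heat/db/alembic.ini upgrade head
+#
+# (command shown for illustration; heat normally drives this through
+# heat-manage db_sync via migration.db_sync()).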
+revision = 'c6214ca60943' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + 'raw_template_files', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('files', heat.db.types.Json(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'resource_properties_data', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('data', heat.db.types.Json(), nullable=True), + sa.Column('encrypted', sa.Boolean(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'service', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('engine_id', sa.String(length=36), nullable=False), + sa.Column('host', sa.String(length=255), nullable=False), + sa.Column('hostname', sa.String(length=255), nullable=False), + sa.Column('binary', sa.String(length=255), nullable=False), + sa.Column('topic', sa.String(length=255), nullable=False), + sa.Column('report_interval', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'software_config', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('group', sa.String(length=255), nullable=True), + sa.Column('config', heat.db.types.Json(), nullable=True), + sa.Column('tenant', sa.String(length=64), nullable=False), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_index( + op.f('ix_software_config_tenant'), + 'software_config', + ['tenant'], + unique=False, + ) + op.create_table( + 'user_creds', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('username', sa.String(length=255), nullable=True), + sa.Column('password', sa.String(length=255), nullable=True), + sa.Column('region_name', sa.String(length=255), nullable=True), + sa.Column('decrypt_method', sa.String(length=64), nullable=True), + sa.Column('tenant', sa.String(length=1024), nullable=True), + sa.Column('auth_url', sa.Text(), nullable=True), + sa.Column('tenant_id', sa.String(length=256), nullable=True), + sa.Column('trust_id', sa.String(length=255), nullable=True), + sa.Column('trustor_user_id', sa.String(length=64), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'raw_template', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('template', heat.db.types.Json(), nullable=True), + sa.Column('files', heat.db.types.Json(), nullable=True), + sa.Column( + 'environment', heat.db.types.Json(), nullable=True + ), + sa.Column('files_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ['files_id'], + ['raw_template_files.id'], + name='raw_tmpl_files_fkey_ref', + ), + 
sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'software_deployment', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('server_id', sa.String(length=36), nullable=False), + sa.Column('config_id', sa.String(length=36), nullable=False), + sa.Column( + 'input_values', heat.db.types.Json(), nullable=True + ), + sa.Column( + 'output_values', heat.db.types.Json(), nullable=True + ), + sa.Column('action', sa.String(length=255), nullable=True), + sa.Column('status', sa.String(length=255), nullable=True), + sa.Column('status_reason', sa.Text(), nullable=True), + sa.Column('tenant', sa.String(length=64), nullable=False), + sa.Column( + 'stack_user_project_id', sa.String(length=64), nullable=True + ), + sa.ForeignKeyConstraint( + ['config_id'], + ['software_config.id'], + ), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index( + 'ix_software_deployment_created_at', + 'software_deployment', + ['created_at'], + unique=False, + ) + op.create_index( + op.f('ix_software_deployment_server_id'), + 'software_deployment', + ['server_id'], + unique=False, + ) + op.create_index( + op.f('ix_software_deployment_tenant'), + 'software_deployment', + ['tenant'], + unique=False, + ) + op.create_table( + 'stack', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('raw_template_id', sa.Integer(), nullable=False), + sa.Column('prev_raw_template_id', sa.Integer(), nullable=True), + sa.Column('user_creds_id', sa.Integer(), nullable=True), + sa.Column('username', sa.String(length=256), nullable=True), + sa.Column('owner_id', sa.String(length=36), nullable=True), + sa.Column('action', sa.String(length=255), nullable=True), + sa.Column('status', sa.String(length=255), nullable=True), + sa.Column('status_reason', sa.Text(), nullable=True), + sa.Column('timeout', sa.Integer(), nullable=True), + sa.Column('tenant', sa.String(length=256), nullable=True), + sa.Column('disable_rollback', sa.Boolean(), nullable=False), + sa.Column( + 'stack_user_project_id', sa.String(length=64), nullable=True + ), + sa.Column('backup', sa.Boolean(), nullable=True), + sa.Column('nested_depth', sa.Integer(), nullable=True), + sa.Column('convergence', sa.Boolean(), nullable=True), + sa.Column('current_traversal', sa.String(length=36), nullable=True), + sa.Column( + 'current_deps', heat.db.types.Json(), nullable=True + ), + sa.Column( + 'parent_resource_name', sa.String(length=255), nullable=True + ), + sa.ForeignKeyConstraint( + ['prev_raw_template_id'], + ['raw_template.id'], + ), + sa.ForeignKeyConstraint( + ['raw_template_id'], + ['raw_template.id'], + ), + sa.ForeignKeyConstraint( + ['user_creds_id'], + ['user_creds.id'], + ), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index( + 'ix_stack_name', 'stack', ['name'], unique=False, mysql_length=255 + ) + op.create_index( + 'ix_stack_tenant', 'stack', ['tenant'], unique=False, mysql_length=255 + ) + op.create_index( + op.f('ix_stack_owner_id'), 'stack', ['owner_id'], unique=False + ) + op.create_table( + 'event', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('uuid', sa.String(length=36), nullable=True), + sa.Column('stack_id', sa.String(length=36), nullable=False), + 
sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('resource_action', sa.String(length=255), nullable=True), + sa.Column('resource_status', sa.String(length=255), nullable=True), + sa.Column('resource_name', sa.String(length=255), nullable=True), + sa.Column( + 'physical_resource_id', sa.String(length=255), nullable=True + ), + sa.Column( + 'resource_status_reason', sa.String(length=255), nullable=True + ), + sa.Column('resource_type', sa.String(length=255), nullable=True), + sa.Column('resource_properties', sa.PickleType(), nullable=True), + sa.Column('rsrc_prop_data_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ['rsrc_prop_data_id'], + ['resource_properties_data.id'], + name='ev_rsrc_prop_data_ref', + ), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('uuid'), + mysql_engine='InnoDB', + ) + op.create_table( + 'resource', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('uuid', sa.String(length=36), nullable=True), + sa.Column('nova_instance', sa.String(length=255), nullable=True), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('action', sa.String(length=255), nullable=True), + sa.Column('status', sa.String(length=255), nullable=True), + sa.Column('status_reason', sa.Text(), nullable=True), + sa.Column('stack_id', sa.String(length=36), nullable=False), + sa.Column( + 'rsrc_metadata', heat.db.types.Json(), nullable=True + ), + sa.Column( + 'properties_data', heat.db.types.Json(), nullable=True + ), + sa.Column('engine_id', sa.String(length=36), nullable=True), + sa.Column('atomic_key', sa.Integer(), nullable=True), + sa.Column('needed_by', heat.db.types.List(), nullable=True), + sa.Column('requires', heat.db.types.List(), nullable=True), + sa.Column('replaces', sa.Integer(), nullable=True), + sa.Column('replaced_by', sa.Integer(), nullable=True), + sa.Column('current_template_id', sa.Integer(), nullable=True), + sa.Column('properties_data_encrypted', sa.Boolean(), nullable=True), + sa.Column('root_stack_id', sa.String(length=36), nullable=True), + sa.Column('rsrc_prop_data_id', sa.Integer(), nullable=True), + sa.Column('attr_data_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ['attr_data_id'], + ['resource_properties_data.id'], + name='rsrc_attr_data_ref', + ), + sa.ForeignKeyConstraint( + ['current_template_id'], + ['raw_template.id'], + ), + sa.ForeignKeyConstraint( + ['rsrc_prop_data_id'], + ['resource_properties_data.id'], + name='rsrc_rsrc_prop_data_ref', + ), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('uuid'), + mysql_engine='InnoDB', + ) + op.create_index( + op.f('ix_resource_root_stack_id'), + 'resource', + ['root_stack_id'], + unique=False, + ) + op.create_table( + 'snapshot', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('stack_id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('status', sa.String(length=255), nullable=True), + sa.Column('status_reason', sa.String(length=255), nullable=True), + sa.Column('data', heat.db.types.Json(), nullable=True), + sa.Column('tenant', 
sa.String(length=64), nullable=False), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_index( + op.f('ix_snapshot_tenant'), 'snapshot', ['tenant'], unique=False + ) + op.create_table( + 'stack_lock', + sa.Column('stack_id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('engine_id', sa.String(length=36), nullable=True), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('stack_id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'stack_tag', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('tag', sa.Unicode(length=80), nullable=True), + sa.Column('stack_id', sa.String(length=36), nullable=False), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) + op.create_table( + 'sync_point', + sa.Column('entity_id', sa.String(length=36), nullable=False), + sa.Column('traversal_id', sa.String(length=36), nullable=False), + sa.Column('is_update', sa.Boolean(), nullable=False), + sa.Column('atomic_key', sa.Integer(), nullable=False), + sa.Column('stack_id', sa.String(length=36), nullable=False), + sa.Column( + 'input_data', heat.db.types.Json(), nullable=True + ), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint( + ['stack_id'], + ['stack.id'], + ), + sa.PrimaryKeyConstraint('entity_id', 'traversal_id', 'is_update'), + ) + op.create_table( + 'resource_data', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('key', sa.String(length=255), nullable=True), + sa.Column('value', sa.Text(), nullable=True), + sa.Column('redact', sa.Boolean(), nullable=True), + sa.Column('decrypt_method', sa.String(length=64), nullable=True), + sa.Column('resource_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ['resource_id'], + ['resource.id'], + name='fk_resource_id', + ondelete='CASCADE', + ), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB', + ) diff --git a/heat/db/models.py b/heat/db/models.py new file mode 100644 index 000000000..ebc235f5d --- /dev/null +++ b/heat/db/models.py @@ -0,0 +1,399 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
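+# NOTE: illustrative sketch, not part of the original patch. The models
+# below pair declarative classes with the tables created by the initial
+# alembic revision; the same pattern in miniature (hypothetical 'note'
+# table, create_all for dev use only, since the real schema comes from
+# the alembic migration):
+#
+#     import sqlalchemy
+#     from sqlalchemy.ext import declarative
+#
+#     Base = declarative.declarative_base()
+#
+#     class Note(Base):
+#         __tablename__ = 'note'
+#         id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
+#         body = sqlalchemy.Column(sqlalchemy.Text)
+#
+#     engine = sqlalchemy.create_engine('sqlite://')
+#     Base.metadata.create_all(engine)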
+ +"""SQLAlchemy models for heat data.""" + +import uuid + +from oslo_db.sqlalchemy import models +import sqlalchemy +from sqlalchemy.ext import declarative +from sqlalchemy.orm import backref +from sqlalchemy.orm import relationship + +from heat.db import types + +BASE = declarative.declarative_base() + + +class HeatBase(models.ModelBase, models.TimestampMixin): + """Base class for Heat Models.""" + __table_args__ = {'mysql_engine': 'InnoDB'} + + +class SoftDelete(object): + deleted_at = sqlalchemy.Column(sqlalchemy.DateTime) + + +class StateAware(object): + action = sqlalchemy.Column('action', sqlalchemy.String(255)) + status = sqlalchemy.Column('status', sqlalchemy.String(255)) + status_reason = sqlalchemy.Column('status_reason', sqlalchemy.Text) + + +class RawTemplate(BASE, HeatBase): + """Represents an unparsed template which should be in JSON format.""" + + __tablename__ = 'raw_template' + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + template = sqlalchemy.Column(types.Json) + # legacy column + files = sqlalchemy.Column(types.Json) + # modern column, reference to raw_template_files + files_id = sqlalchemy.Column( + sqlalchemy.Integer(), + sqlalchemy.ForeignKey('raw_template_files.id')) + environment = sqlalchemy.Column('environment', types.Json) + + +class RawTemplateFiles(BASE, HeatBase): + """Where template files json dicts are stored.""" + __tablename__ = 'raw_template_files' + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + files = sqlalchemy.Column(types.Json) + + +class StackTag(BASE, HeatBase): + """Key/value store of arbitrary stack tags.""" + + __tablename__ = 'stack_tag' + + id = sqlalchemy.Column('id', + sqlalchemy.Integer, + primary_key=True, + nullable=False) + tag = sqlalchemy.Column('tag', sqlalchemy.Unicode(80)) + stack_id = sqlalchemy.Column('stack_id', + sqlalchemy.String(36), + sqlalchemy.ForeignKey('stack.id'), + nullable=False) + + +class SyncPoint(BASE, HeatBase): + """Represents a syncpoint for a stack that is being worked on.""" + + __tablename__ = 'sync_point' + __table_args__ = ( + sqlalchemy.PrimaryKeyConstraint('entity_id', + 'traversal_id', + 'is_update'), + sqlalchemy.ForeignKeyConstraint(['stack_id'], ['stack.id']) + ) + + entity_id = sqlalchemy.Column(sqlalchemy.String(36)) + traversal_id = sqlalchemy.Column(sqlalchemy.String(36)) + is_update = sqlalchemy.Column(sqlalchemy.Boolean) + # integer field for atomic update operations + atomic_key = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) + stack_id = sqlalchemy.Column(sqlalchemy.String(36), + nullable=False) + input_data = sqlalchemy.Column(types.Json) + + +class Stack(BASE, HeatBase, SoftDelete, StateAware): + """Represents a stack created by the heat engine.""" + + __tablename__ = 'stack' + __table_args__ = ( + sqlalchemy.Index('ix_stack_name', 'name', mysql_length=255), + sqlalchemy.Index('ix_stack_tenant', 'tenant', mysql_length=255), + ) + + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: str(uuid.uuid4())) + name = sqlalchemy.Column(sqlalchemy.String(255)) + raw_template_id = sqlalchemy.Column( + sqlalchemy.Integer, + sqlalchemy.ForeignKey('raw_template.id'), + nullable=False) + raw_template = relationship(RawTemplate, backref=backref('stack'), + foreign_keys=[raw_template_id]) + prev_raw_template_id = sqlalchemy.Column( + 'prev_raw_template_id', + sqlalchemy.Integer, + sqlalchemy.ForeignKey('raw_template.id')) + prev_raw_template = relationship(RawTemplate, + foreign_keys=[prev_raw_template_id]) + username = 
sqlalchemy.Column(sqlalchemy.String(256)) + tenant = sqlalchemy.Column(sqlalchemy.String(256)) + user_creds_id = sqlalchemy.Column( + sqlalchemy.Integer, + sqlalchemy.ForeignKey('user_creds.id')) + owner_id = sqlalchemy.Column(sqlalchemy.String(36), index=True) + parent_resource_name = sqlalchemy.Column(sqlalchemy.String(255)) + timeout = sqlalchemy.Column(sqlalchemy.Integer) + disable_rollback = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False) + stack_user_project_id = sqlalchemy.Column(sqlalchemy.String(64)) + backup = sqlalchemy.Column('backup', sqlalchemy.Boolean) + nested_depth = sqlalchemy.Column('nested_depth', sqlalchemy.Integer) + convergence = sqlalchemy.Column('convergence', sqlalchemy.Boolean) + tags = relationship(StackTag, cascade="all,delete", + backref=backref('stack')) + current_traversal = sqlalchemy.Column('current_traversal', + sqlalchemy.String(36)) + current_deps = sqlalchemy.Column('current_deps', types.Json) + + # Override timestamp column to store the correct value: it should be the + # time the create/update call was issued, not the time the DB entry is + # created/modified. (bug #1193269) + updated_at = sqlalchemy.Column(sqlalchemy.DateTime) + + +class StackLock(BASE, HeatBase): + """Store stack locks for deployments with multiple-engines.""" + + __tablename__ = 'stack_lock' + + stack_id = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('stack.id'), + primary_key=True) + engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + + +class UserCreds(BASE, HeatBase): + """Represents user credentials. + + Also, mirrors the 'context' handed in by wsgi. + """ + + __tablename__ = 'user_creds' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + username = sqlalchemy.Column(sqlalchemy.String(255)) + password = sqlalchemy.Column(sqlalchemy.String(255)) + region_name = sqlalchemy.Column(sqlalchemy.String(255)) + decrypt_method = sqlalchemy.Column(sqlalchemy.String(64)) + tenant = sqlalchemy.Column(sqlalchemy.String(1024)) + auth_url = sqlalchemy.Column(sqlalchemy.Text) + tenant_id = sqlalchemy.Column(sqlalchemy.String(256)) + trust_id = sqlalchemy.Column(sqlalchemy.String(255)) + trustor_user_id = sqlalchemy.Column(sqlalchemy.String(64)) + stack = relationship(Stack, backref=backref('user_creds'), + cascade_backrefs=False) + + +class ResourcePropertiesData(BASE, HeatBase): + """Represents resource properties data, current or older""" + + __tablename__ = 'resource_properties_data' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + data = sqlalchemy.Column('data', types.Json) + encrypted = sqlalchemy.Column('encrypted', sqlalchemy.Boolean) + + +class Event(BASE, HeatBase): + """Represents an event generated by the heat engine.""" + + __tablename__ = 'event' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + stack_id = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('stack.id'), + nullable=False) + stack = relationship(Stack, backref=backref('events')) + + uuid = sqlalchemy.Column(sqlalchemy.String(36), + default=lambda: str(uuid.uuid4()), + unique=True) + resource_action = sqlalchemy.Column(sqlalchemy.String(255)) + resource_status = sqlalchemy.Column(sqlalchemy.String(255)) + resource_name = sqlalchemy.Column(sqlalchemy.String(255)) + physical_resource_id = sqlalchemy.Column(sqlalchemy.String(255)) + _resource_status_reason = sqlalchemy.Column( + 'resource_status_reason', sqlalchemy.String(255)) + resource_type = sqlalchemy.Column(sqlalchemy.String(255)) + rsrc_prop_data_id = 
sqlalchemy.Column(sqlalchemy.Integer, + sqlalchemy.ForeignKey( + 'resource_properties_data.id')) + rsrc_prop_data = relationship(ResourcePropertiesData, + backref=backref('event')) + resource_properties = sqlalchemy.Column(sqlalchemy.PickleType) + + @property + def resource_status_reason(self): + return self._resource_status_reason + + @resource_status_reason.setter + def resource_status_reason(self, reason): + self._resource_status_reason = reason and reason[:255] or '' + + +class ResourceData(BASE, HeatBase): + """Key/value store of arbitrary, resource-specific data.""" + + __tablename__ = 'resource_data' + + id = sqlalchemy.Column('id', + sqlalchemy.Integer, + primary_key=True, + nullable=False) + key = sqlalchemy.Column('key', sqlalchemy.String(255)) + value = sqlalchemy.Column('value', sqlalchemy.Text) + redact = sqlalchemy.Column('redact', sqlalchemy.Boolean) + decrypt_method = sqlalchemy.Column(sqlalchemy.String(64)) + resource_id = sqlalchemy.Column( + 'resource_id', sqlalchemy.Integer, + sqlalchemy.ForeignKey(column='resource.id', name='fk_resource_id', + ondelete='CASCADE'), + nullable=False) + + +class Resource(BASE, HeatBase, StateAware): + """Represents a resource created by the heat engine.""" + + __tablename__ = 'resource' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + uuid = sqlalchemy.Column(sqlalchemy.String(36), + default=lambda: str(uuid.uuid4()), + unique=True) + name = sqlalchemy.Column('name', sqlalchemy.String(255)) + physical_resource_id = sqlalchemy.Column('nova_instance', + sqlalchemy.String(255)) + # odd name as "metadata" is reserved + rsrc_metadata = sqlalchemy.Column('rsrc_metadata', types.Json) + + stack_id = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('stack.id'), + nullable=False) + stack = relationship(Stack, backref=backref('resources')) + root_stack_id = sqlalchemy.Column(sqlalchemy.String(36), index=True) + data = relationship(ResourceData, + cascade="all", + passive_deletes=True, + backref=backref('resource')) + rsrc_prop_data_id = sqlalchemy.Column(sqlalchemy.Integer, + sqlalchemy.ForeignKey( + 'resource_properties_data.id')) + rsrc_prop_data = relationship(ResourcePropertiesData, + foreign_keys=[rsrc_prop_data_id]) + attr_data_id = sqlalchemy.Column(sqlalchemy.Integer, + sqlalchemy.ForeignKey( + 'resource_properties_data.id')) + attr_data = relationship(ResourcePropertiesData, + foreign_keys=[attr_data_id]) + + # Override timestamp column to store the correct value: it should be the + # time the create/update call was issued, not the time the DB entry is + # created/modified. (bug #1193269) + updated_at = sqlalchemy.Column(sqlalchemy.DateTime) + properties_data = sqlalchemy.Column('properties_data', types.Json) + properties_data_encrypted = sqlalchemy.Column('properties_data_encrypted', + sqlalchemy.Boolean) + engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + atomic_key = sqlalchemy.Column(sqlalchemy.Integer) + + needed_by = sqlalchemy.Column('needed_by', types.List) + requires = sqlalchemy.Column('requires', types.List) + replaces = sqlalchemy.Column('replaces', sqlalchemy.Integer, + default=None) + replaced_by = sqlalchemy.Column('replaced_by', sqlalchemy.Integer, + default=None) + current_template_id = sqlalchemy.Column( + 'current_template_id', + sqlalchemy.Integer, + sqlalchemy.ForeignKey('raw_template.id')) + + +class SoftwareConfig(BASE, HeatBase): + """Represents a software configuration resource. + + Represents a software configuration resource to be applied to one or more + servers. 
+ """ + + __tablename__ = 'software_config' + + id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, + default=lambda: str(uuid.uuid4())) + name = sqlalchemy.Column('name', sqlalchemy.String(255)) + group = sqlalchemy.Column('group', sqlalchemy.String(255)) + config = sqlalchemy.Column('config', types.Json) + tenant = sqlalchemy.Column( + 'tenant', sqlalchemy.String(64), nullable=False, index=True) + + +class SoftwareDeployment(BASE, HeatBase, StateAware): + """Represents a software deployment resource. + + Represents applying a software configuration resource to a single server + resource. + """ + + __tablename__ = 'software_deployment' + __table_args__ = ( + sqlalchemy.Index('ix_software_deployment_created_at', 'created_at'),) + + id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, + default=lambda: str(uuid.uuid4())) + config_id = sqlalchemy.Column( + 'config_id', + sqlalchemy.String(36), + sqlalchemy.ForeignKey('software_config.id'), + nullable=False) + config = relationship(SoftwareConfig, backref=backref('deployments')) + server_id = sqlalchemy.Column('server_id', sqlalchemy.String(36), + nullable=False, index=True) + input_values = sqlalchemy.Column('input_values', types.Json) + output_values = sqlalchemy.Column('output_values', types.Json) + tenant = sqlalchemy.Column( + 'tenant', sqlalchemy.String(64), nullable=False, index=True) + stack_user_project_id = sqlalchemy.Column(sqlalchemy.String(64)) + updated_at = sqlalchemy.Column(sqlalchemy.DateTime) + + +class Snapshot(BASE, HeatBase): + + __tablename__ = 'snapshot' + + id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, + default=lambda: str(uuid.uuid4())) + stack_id = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('stack.id'), + nullable=False) + name = sqlalchemy.Column('name', sqlalchemy.String(255)) + data = sqlalchemy.Column('data', types.Json) + tenant = sqlalchemy.Column( + 'tenant', sqlalchemy.String(64), nullable=False, index=True) + status = sqlalchemy.Column('status', sqlalchemy.String(255)) + status_reason = sqlalchemy.Column('status_reason', sqlalchemy.String(255)) + stack = relationship(Stack, backref=backref('snapshot')) + + +class Service(BASE, HeatBase, SoftDelete): + + __tablename__ = 'service' + + id = sqlalchemy.Column('id', + sqlalchemy.String(36), + primary_key=True, + default=lambda: str(uuid.uuid4())) + engine_id = sqlalchemy.Column('engine_id', + sqlalchemy.String(36), + nullable=False) + host = sqlalchemy.Column('host', + sqlalchemy.String(255), + nullable=False) + hostname = sqlalchemy.Column('hostname', + sqlalchemy.String(255), + nullable=False) + binary = sqlalchemy.Column('binary', + sqlalchemy.String(255), + nullable=False) + topic = sqlalchemy.Column('topic', + sqlalchemy.String(255), + nullable=False) + report_interval = sqlalchemy.Column('report_interval', + sqlalchemy.Integer, + nullable=False) diff --git a/heat/db/sqlalchemy/__init__.py b/heat/db/sqlalchemy/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/heat/db/sqlalchemy/alembic.ini b/heat/db/sqlalchemy/alembic.ini deleted file mode 100644 index b4a2f235a..000000000 --- a/heat/db/sqlalchemy/alembic.ini +++ /dev/null @@ -1,72 +0,0 @@ -[alembic] -# path to migration scripts -script_location = %(here)s/migrations - -# sys.path path, will be prepended to sys.path if present. -# defaults to the current working directory. -prepend_sys_path = . - -# version location specification; This defaults -# to heat/db/sqlalchemy/migrations/versions. 
When using multiple version -# directories, initial revisions must be specified with --version-path. -# The path separator used here should be the separator specified by "version_path_separator" below. -# version_locations = %(here)s/bar:%(here)s/bat:heat/db/sqlalchemy/migrations/versions - -# version path separator; As mentioned above, this is the character used to split -# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. -# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. -# Valid values for version_path_separator are: -# -# version_path_separator = : -# version_path_separator = ; -# version_path_separator = space -version_path_separator = os # Use os.pathsep. Default configuration used for new projects. - -sqlalchemy.url = sqlite:///heat.db - - -[post_write_hooks] -# post_write_hooks defines scripts or Python functions that are run -# on newly generated revision scripts. See the documentation for further -# detail and examples - -# format using "black" - use the console_scripts runner, against the "black" entrypoint -# hooks = black -# black.type = console_scripts -# black.entrypoint = black -# black.options = -l 79 REVISION_SCRIPT_FILENAME - -# Logging configuration -[loggers] -keys = root,sqlalchemy,alembic - -[handlers] -keys = console - -[formatters] -keys = generic - -[logger_root] -level = WARN -handlers = console -qualname = - -[logger_sqlalchemy] -level = WARN -handlers = -qualname = sqlalchemy.engine - -[logger_alembic] -level = INFO -handlers = -qualname = alembic - -[handler_console] -class = StreamHandler -args = (sys.stderr,) -level = NOTSET -formatter = generic - -[formatter_generic] -format = %(levelname)-5.5s [%(name)s] %(message)s -datefmt = %H:%M:%S diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py deleted file mode 100644 index 1556d2d76..000000000 --- a/heat/db/sqlalchemy/api.py +++ /dev/null @@ -1,1971 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -"""Implementation of SQLAlchemy backend.""" -import datetime -import functools -import itertools -import random - -from oslo_config import cfg -from oslo_db import api as oslo_db_api -from oslo_db import exception as db_exception -from oslo_db import options -from oslo_db.sqlalchemy import enginefacade -from oslo_db.sqlalchemy import utils -from oslo_log import log as logging -from oslo_utils import encodeutils -from oslo_utils import excutils -from oslo_utils import timeutils -import osprofiler.sqlalchemy -import sqlalchemy -from sqlalchemy import and_ -from sqlalchemy import func -from sqlalchemy import or_ -from sqlalchemy import orm -from sqlalchemy.orm import aliased as orm_aliased - -from heat.common import crypt -from heat.common import exception -from heat.common.i18n import _ -from heat.db.sqlalchemy import filters as db_filters -from heat.db.sqlalchemy import models -from heat.db.sqlalchemy import utils as db_utils -from heat.engine import environment as heat_environment -from heat.rpc import api as rpc_api - -CONF = cfg.CONF -CONF.import_opt('hidden_stack_tags', 'heat.common.config') -CONF.import_opt('max_events_per_stack', 'heat.common.config') -CONF.import_group('profiler', 'heat.common.config') -CONF.import_opt('db_max_retries', 'oslo_db.options', group='database') -CONF.import_opt('db_retry_interval', 'oslo_db.options', group='database') -CONF.import_opt( - 'db_inc_retry_interval', 'oslo_db.options', group='database') -CONF.import_opt( - 'db_max_retry_interval', 'oslo_db.options', group='database') - -options.set_defaults(CONF) - -_facade = None -db_context = enginefacade.transaction_context() - -LOG = logging.getLogger(__name__) - - -# TODO(sbaker): fix tests so that sqlite_fk=True can be passed to configure -# FIXME(stephenfin): we need to remove reliance on autocommit semantics ASAP -# since it's not compatible with SQLAlchemy 2.0 -db_context.configure(__autocommit=True) - - -def get_facade(): - global _facade - if _facade is None: - - # FIXME: get_facade() is called by the test suite startup, - # but will not be called normally for API calls. - # osprofiler / oslo_db / enginefacade currently don't have hooks - # to talk to each other, however one needs to be added to oslo.db - # to allow access to the Engine once constructed. 
-        db_context.configure(**CONF.database)
-        _facade = db_context.get_legacy_facade()
-        if CONF.profiler.enabled:
-            if CONF.profiler.trace_sqlalchemy:
-                osprofiler.sqlalchemy.add_tracing(sqlalchemy,
-                                                  _facade.get_engine(),
-                                                  "db")
-    return _facade
-
-
-def get_engine():
-    return get_facade().get_engine()
-
-
-def get_session():
-    return get_facade().get_session()
-
-
-def retry_on_db_error(func):
-    @functools.wraps(func)
-    def try_func(context, *args, **kwargs):
-        if (context.session.transaction is None or
-                not context.session.autocommit):
-            wrapped = oslo_db_api.wrap_db_retry(
-                max_retries=CONF.database.db_max_retries,
-                retry_on_deadlock=True,
-                retry_on_disconnect=True,
-                retry_interval=CONF.database.db_retry_interval,
-                inc_retry_interval=CONF.database.db_inc_retry_interval,
-                max_retry_interval=CONF.database.db_max_retry_interval)(func)
-            return wrapped(context, *args, **kwargs)
-        else:
-            try:
-                return func(context, *args, **kwargs)
-            except (db_exception.DBDeadlock, db_exception.DBConnectionError):
-                with excutils.save_and_reraise_exception():
-                    LOG.debug('Not retrying on DBDeadlock and '
-                              'DBConnectionError because '
-                              'transaction not closed')
-    return try_func
-
-
-def update_and_save(context, obj, values):
-    with context.session.begin(subtransactions=True):
-        for k, v in values.items():
-            setattr(obj, k, v)
-
-
-def delete_softly(context, obj):
-    """Mark this object as deleted."""
-    update_and_save(context, obj, {'deleted_at': timeutils.utcnow()})
-
-
-def soft_delete_aware_query(context, *args, **kwargs):
-    """Stack query helper that accounts for context's `show_deleted` field.
-
-    :param show_deleted: if True, overrides context's show_deleted field.
-    """
-
-    query = context.session.query(*args)
-    show_deleted = kwargs.get('show_deleted') or context.show_deleted
-
-    if not show_deleted:
-        query = query.filter_by(deleted_at=None)
-    return query
-
-
-def raw_template_get(context, template_id):
-    result = context.session.query(models.RawTemplate).get(template_id)
-
-    if not result:
-        raise exception.NotFound(_('raw template with id %s not found') %
-                                 template_id)
-    return result
-
-
-def raw_template_create(context, values):
-    raw_template_ref = models.RawTemplate()
-    raw_template_ref.update(values)
-    raw_template_ref.save(context.session)
-    return raw_template_ref
-
-
-def raw_template_update(context, template_id, values):
-    raw_template_ref = raw_template_get(context, template_id)
-    # get only the changed values
-    values = dict((k, v) for k, v in values.items()
-                  if getattr(raw_template_ref, k) != v)
-
-    if values:
-        update_and_save(context, raw_template_ref, values)
-
-    return raw_template_ref
-
-
-def raw_template_delete(context, template_id):
-    try:
-        raw_template = raw_template_get(context, template_id)
-    except exception.NotFound:
-        # Ignore not found
-        return
-    raw_tmpl_files_id = raw_template.files_id
-    session = context.session
-    with session.begin(subtransactions=True):
-        session.delete(raw_template)
-        if raw_tmpl_files_id is None:
-            return
-        # If no other raw_template is referencing the same raw_template_files,
-        # delete that too
-        if session.query(models.RawTemplate).filter_by(
-                files_id=raw_tmpl_files_id).first() is None:
-            try:
-                raw_tmpl_files = raw_template_files_get(
-                    context, raw_tmpl_files_id)
-            except exception.NotFound:
-                # Ignore not found
-                return
-            session.delete(raw_tmpl_files)
-
-
-def raw_template_files_create(context, values):
-    session = context.session
-    raw_templ_files_ref = models.RawTemplateFiles()
-    raw_templ_files_ref.update(values)
-    with session.begin():
-        raw_templ_files_ref.save(session)
-    return raw_templ_files_ref
-
-
-def raw_template_files_get(context, files_id):
-    result = context.session.query(models.RawTemplateFiles).get(files_id)
-    if not result:
-        raise exception.NotFound(
-            _("raw_template_files with files_id %d not found") %
-            files_id)
-    return result
-
-
-def resource_get(context, resource_id, refresh=False, refresh_data=False,
-                 eager=True):
-    query = context.session.query(models.Resource)
-    query = query.options(orm.joinedload("data"))
-    if eager:
-        query = query.options(orm.joinedload("rsrc_prop_data"))
-
-    result = query.get(resource_id)
-    if not result:
-        raise exception.NotFound(_("resource with id %s not found") %
-                                 resource_id)
-    if refresh:
-        context.session.refresh(result)
-        if refresh_data:
-            # ensure data is loaded (lazy or otherwise)
-            result.data
-
-    return result
-
-
-def resource_get_by_name_and_stack(context, resource_name, stack_id):
-    result = context.session.query(
-        models.Resource
-    ).filter_by(
-        name=resource_name
-    ).filter_by(
-        stack_id=stack_id
-    ).options(orm.joinedload("data")).first()
-    return result
-
-
-def resource_get_all_by_physical_resource_id(context, physical_resource_id):
-    results = (context.session.query(models.Resource)
-               .filter_by(physical_resource_id=physical_resource_id)
-               .all())
-
-    for result in results:
-        if context is None or context.is_admin or context.tenant_id in (
-                result.stack.tenant, result.stack.stack_user_project_id):
-            yield result
-
-
-def resource_get_by_physical_resource_id(context, physical_resource_id):
-    results = resource_get_all_by_physical_resource_id(context,
-                                                       physical_resource_id)
-    try:
-        return next(results)
-    except StopIteration:
-        return None
-
-
-def resource_get_all(context):
-    results = context.session.query(models.Resource).all()
-
-    if not results:
-        raise exception.NotFound(_('no resources were found'))
-    return results
-
-
-@retry_on_db_error
-def resource_purge_deleted(context, stack_id):
-    filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'}
-    query = context.session.query(models.Resource)
-    result = query.filter_by(**filters)
-    attr_ids = [r.attr_data_id for r in result if r.attr_data_id is not None]
-    with context.session.begin():
-        result.delete()
-        if attr_ids:
-            context.session.query(models.ResourcePropertiesData).filter(
-                models.ResourcePropertiesData.id.in_(attr_ids)).delete(
-                    synchronize_session=False)
-
-
-def _add_atomic_key_to_values(values, atomic_key):
-    if atomic_key is None:
-        values['atomic_key'] = 1
-    else:
-        values['atomic_key'] = atomic_key + 1
-
-
-@retry_on_db_error
-def resource_update(context, resource_id, values, atomic_key,
-                    expected_engine_id=None):
-    return _try_resource_update(context, resource_id, values, atomic_key,
-                                expected_engine_id)
-
-
-def _try_resource_update(context, resource_id, values, atomic_key,
-                         expected_engine_id=None):
-    session = context.session
-    with session.begin(subtransactions=True):
-        _add_atomic_key_to_values(values, atomic_key)
-        rows_updated = session.query(models.Resource).filter_by(
-            id=resource_id, engine_id=expected_engine_id,
-            atomic_key=atomic_key).update(values)
-
-    return bool(rows_updated)
-
-
-def resource_update_and_save(context, resource_id, values):
-    resource = context.session.query(models.Resource).get(resource_id)
-    update_and_save(context, resource, values)
-
-
-def resource_delete(context, resource_id):
-    session = context.session
-    with session.begin(subtransactions=True):
-        resource = session.query(models.Resource).get(resource_id)
-        if resource:
-            session.delete(resource)
-            if resource.attr_data_id is not None:
-                attr_prop_data = session.query(
-                    models.ResourcePropertiesData).get(resource.attr_data_id)
-                session.delete(attr_prop_data)
-
-
-def resource_attr_id_set(context, resource_id, atomic_key, attr_id):
-    session = context.session
-    with session.begin():
-        values = {'attr_data_id': attr_id}
-        _add_atomic_key_to_values(values, atomic_key)
-        rows_updated = session.query(models.Resource).filter(and_(
-            models.Resource.id == resource_id,
-            models.Resource.atomic_key == atomic_key,
-            models.Resource.engine_id.is_(None),
-            or_(models.Resource.attr_data_id == attr_id,
-                models.Resource.attr_data_id.is_(None)))).update(
-                    values)
-        if rows_updated > 0:
-            return True
-        else:
-            # Someone else set the attr_id first and/or we have a stale
-            # view of the resource based on atomic_key, so delete the
-            # resource_properties_data (attr) DB row.
-            LOG.debug('Not updating res_id %(rid)s with attr_id %(aid)s',
-                      {'rid': resource_id, 'aid': attr_id})
-            session.query(
-                models.ResourcePropertiesData).filter(
-                    models.ResourcePropertiesData.id == attr_id).delete()
-            return False
-
-
-def resource_attr_data_delete(context, resource_id, attr_id):
-    session = context.session
-    with session.begin():
-        resource = session.query(models.Resource).get(resource_id)
-        attr_prop_data = session.query(
-            models.ResourcePropertiesData).get(attr_id)
-        if resource:
-            resource.update({'attr_data_id': None})
-        if attr_prop_data:
-            session.delete(attr_prop_data)
-
-
-def resource_data_get_all(context, resource_id, data=None):
-    """Looks up resource_data by resource.id.
-
-    If data is encrypted, this method will decrypt the results.
-    """
-    if data is None:
-        data = (context.session.query(models.ResourceData)
-                .filter_by(resource_id=resource_id)).all()
-
-    if not data:
-        raise exception.NotFound(_('no resource data found'))
-
-    ret = {}
-
-    for res in data:
-        if res.redact:
-            try:
-                ret[res.key] = crypt.decrypt(res.decrypt_method, res.value)
-                continue
-            except exception.InvalidEncryptionKey:
-                LOG.exception('Failed to decrypt resource data %(rkey)s '
-                              'for %(rid)s, ignoring.',
-                              {'rkey': res.key, 'rid': resource_id})
-        ret[res.key] = res.value
-    return ret
-
-
-def resource_data_get(context, resource_id, key):
-    """Lookup value of resource's data by key.
-
-    Decrypts resource data if necessary.
-    """
-    result = resource_data_get_by_key(context,
-                                      resource_id,
-                                      key)
-    if result.redact:
-        return crypt.decrypt(result.decrypt_method, result.value)
-    return result.value
-
-
-def stack_tags_set(context, stack_id, tags):
-    session = context.session
-    with session.begin():
-        stack_tags_delete(context, stack_id)
-        result = []
-        for tag in tags:
-            stack_tag = models.StackTag()
-            stack_tag.tag = tag
-            stack_tag.stack_id = stack_id
-            stack_tag.save(session=session)
-            result.append(stack_tag)
-        return result or None
-
-
-def stack_tags_delete(context, stack_id):
-    session = context.session
-    with session.begin(subtransactions=True):
-        result = stack_tags_get(context, stack_id)
-        if result:
-            for tag in result:
-                session.delete(tag)
-
-
-def stack_tags_get(context, stack_id):
-    result = (context.session.query(models.StackTag)
-              .filter_by(stack_id=stack_id)
-              .all())
-    return result or None
-
-
-def resource_data_get_by_key(context, resource_id, key):
-    """Looks up resource_data by resource_id and key.
-
-    Does not decrypt resource_data.
- """ - result = (context.session.query(models.ResourceData) - .filter_by(resource_id=resource_id) - .filter_by(key=key).first()) - - if not result: - raise exception.NotFound(_('No resource data found')) - return result - - -def resource_data_set(context, resource_id, key, value, redact=False): - """Save resource's key/value pair to database.""" - if redact: - method, value = crypt.encrypt(value) - else: - method = '' - try: - current = resource_data_get_by_key(context, resource_id, key) - except exception.NotFound: - current = models.ResourceData() - current.key = key - current.resource_id = resource_id - current.redact = redact - current.value = value - current.decrypt_method = method - current.save(session=context.session) - return current - - -def resource_exchange_stacks(context, resource_id1, resource_id2): - query = context.session.query(models.Resource) - session = query.session - - with session.begin(): - res1 = query.get(resource_id1) - res2 = query.get(resource_id2) - - res1.stack, res2.stack = res2.stack, res1.stack - - -def resource_data_delete(context, resource_id, key): - result = resource_data_get_by_key(context, resource_id, key) - session = context.session - with session.begin(): - session.delete(result) - - -def resource_create(context, values): - resource_ref = models.Resource() - resource_ref.update(values) - resource_ref.save(context.session) - return resource_ref - - -@retry_on_db_error -def resource_create_replacement(context, - existing_res_id, - new_res_values, - atomic_key, expected_engine_id=None): - session = context.session - try: - with session.begin(): - new_res = resource_create(context, new_res_values) - update_data = {'replaced_by': new_res.id} - if not _try_resource_update(context, - existing_res_id, update_data, - atomic_key, - expected_engine_id=expected_engine_id): - data = {} - if 'name' in new_res_values: - data['resource_name'] = new_res_values['name'] - raise exception.UpdateInProgress(**data) - except db_exception.DBReferenceError as exc: - # New template_id no longer exists - LOG.debug('Not creating replacement resource: %s', exc) - return None - else: - return new_res - - -def resource_get_all_by_stack(context, stack_id, filters=None): - query = context.session.query( - models.Resource - ).filter_by( - stack_id=stack_id - ).options(orm.joinedload("data")).options(orm.joinedload("rsrc_prop_data")) - - query = db_filters.exact_filter(query, models.Resource, filters) - results = query.all() - - return dict((res.name, res) for res in results) - - -def resource_get_all_active_by_stack(context, stack_id): - filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} - subquery = context.session.query(models.Resource.id).filter_by(**filters) - - results = context.session.query(models.Resource).filter_by( - stack_id=stack_id).filter( - models.Resource.id.notin_(subquery.scalar_subquery()) - ).options(orm.joinedload("data")).all() - - return dict((res.id, res) for res in results) - - -def resource_get_all_by_root_stack(context, stack_id, filters=None, - stack_id_only=False): - query = context.session.query( - models.Resource - ).filter_by( - root_stack_id=stack_id - ) - - if stack_id_only: - query = query.options(orm.load_only("id", "stack_id")) - else: - query = query.options(orm.joinedload("data")).options( - orm.joinedload("rsrc_prop_data")) - - query = db_filters.exact_filter(query, models.Resource, filters) - results = query.all() - - return dict((res.id, res) for res in results) - - -def engine_get_all_locked_by_stack(context, 
-    query = context.session.query(
-        func.distinct(models.Resource.engine_id)
-    ).filter(
-        models.Resource.stack_id == stack_id,
-        models.Resource.engine_id.isnot(None))
-    return set(i[0] for i in query.all())
-
-
-def resource_prop_data_create_or_update(context, values, rpd_id=None):
-    obj_ref = None
-    if rpd_id is not None:
-        obj_ref = context.session.query(
-            models.ResourcePropertiesData).filter_by(id=rpd_id).first()
-    if obj_ref is None:
-        obj_ref = models.ResourcePropertiesData()
-    obj_ref.update(values)
-    obj_ref.save(context.session)
-    return obj_ref
-
-
-def resource_prop_data_create(context, values):
-    return resource_prop_data_create_or_update(context, values)
-
-
-def resource_prop_data_get(context, resource_prop_data_id):
-    result = context.session.query(models.ResourcePropertiesData).get(
-        resource_prop_data_id)
-    if result is None:
-        raise exception.NotFound(
-            _('ResourcePropertiesData with id %s not found') %
-            resource_prop_data_id)
-    return result
-
-
-def stack_get_by_name_and_owner_id(context, stack_name, owner_id):
-    query = soft_delete_aware_query(
-        context, models.Stack
-    ).options(orm.joinedload("raw_template")).filter(sqlalchemy.or_(
-        models.Stack.tenant == context.tenant_id,
-        models.Stack.stack_user_project_id == context.tenant_id)
-    ).filter_by(name=stack_name).filter_by(owner_id=owner_id)
-    return query.first()
-
-
-def stack_get_by_name(context, stack_name):
-    query = soft_delete_aware_query(
-        context, models.Stack
-    ).filter(sqlalchemy.or_(
-        models.Stack.tenant == context.tenant_id,
-        models.Stack.stack_user_project_id == context.tenant_id)
-    ).filter_by(name=stack_name)
-    return query.order_by(models.Stack.created_at).first()
-
-
-def stack_get(context, stack_id, show_deleted=False, eager_load=True):
-    query = context.session.query(models.Stack)
-    if eager_load:
-        query = query.options(orm.joinedload("raw_template"))
-    result = query.get(stack_id)
-
-    deleted_ok = show_deleted or context.show_deleted
-    if result is None or result.deleted_at is not None and not deleted_ok:
-        return None
-
-    # One exception to normal project scoping is users created by the
-    # stacks in the stack_user_project_id (in the heat stack user domain)
-    if (result is not None
-            and context is not None and not context.is_admin
-            and context.tenant_id not in (result.tenant,
-                                          result.stack_user_project_id)):
-        return None
-    return result
-
-
-def stack_get_status(context, stack_id):
-    query = context.session.query(models.Stack)
-    query = query.options(
-        orm.load_only("action", "status", "status_reason", "updated_at"))
-    result = query.filter_by(id=stack_id).first()
-    if result is None:
-        raise exception.NotFound(_('Stack with id %s not found') % stack_id)
-
-    return (result.action, result.status, result.status_reason,
-            result.updated_at)
-
-
-def stack_get_all_by_owner_id(context, owner_id):
-    results = soft_delete_aware_query(
-        context, models.Stack).filter_by(owner_id=owner_id,
-                                         backup=False).all()
-    return results
-
-
-def stack_get_all_by_root_owner_id(context, owner_id):
-    for stack in stack_get_all_by_owner_id(context, owner_id):
-        yield stack
-        for ch_st in stack_get_all_by_root_owner_id(context, stack.id):
-            yield ch_st
-
-
-def _get_sort_keys(sort_keys, mapping):
-    """Returns an array containing only allowed keys
-
-    :param sort_keys: an array of strings
-    :param mapping: a mapping from keys to DB column names
-    :returns: filtered list of sort keys
-    """
-    if isinstance(sort_keys, str):
-        sort_keys = [sort_keys]
-    return [mapping[key] for key in sort_keys or [] if key in mapping]
-
-
-def _paginate_query(context, query, model, limit=None, sort_keys=None,
-                    marker=None, sort_dir=None):
-    default_sort_keys = ['created_at']
-    if not sort_keys:
-        sort_keys = default_sort_keys
-        if not sort_dir:
-            sort_dir = 'desc'
-
-    # This assures the order of the stacks will always be the same
-    # even for sort_key values that are not unique in the database
-    sort_keys = sort_keys + ['id']
-
-    model_marker = None
-    if marker:
-        model_marker = context.session.query(model).get(marker)
-    try:
-        query = utils.paginate_query(query, model, limit, sort_keys,
-                                     model_marker, sort_dir)
-    except utils.InvalidSortKey as exc:
-        err_msg = encodeutils.exception_to_unicode(exc)
-        raise exception.Invalid(reason=err_msg)
-    return query
-
-
-def _query_stack_get_all(context, show_deleted=False,
-                         show_nested=False, show_hidden=False, tags=None,
-                         tags_any=None, not_tags=None, not_tags_any=None):
-    if show_nested:
-        query = soft_delete_aware_query(
-            context, models.Stack, show_deleted=show_deleted
-        ).filter_by(backup=False)
-    else:
-        query = soft_delete_aware_query(
-            context, models.Stack, show_deleted=show_deleted
-        ).filter_by(owner_id=None)
-
-    if not context.is_admin:
-        query = query.filter_by(tenant=context.tenant_id)
-
-    query = query.options(orm.subqueryload("tags"))
-    if tags:
-        for tag in tags:
-            tag_alias = orm_aliased(models.StackTag)
-            query = query.join(tag_alias, models.Stack.tags)
-            query = query.filter(tag_alias.tag == tag)
-
-    if tags_any:
-        query = query.filter(
-            models.Stack.tags.any(
-                models.StackTag.tag.in_(tags_any)))
-
-    if not_tags:
-        subquery = soft_delete_aware_query(
-            context, models.Stack, show_deleted=show_deleted
-        )
-        for tag in not_tags:
-            tag_alias = orm_aliased(models.StackTag)
-            subquery = subquery.join(tag_alias, models.Stack.tags)
-            subquery = subquery.filter(tag_alias.tag == tag)
-        not_stack_ids = [s.id for s in subquery.all()]
-        query = query.filter(models.Stack.id.notin_(not_stack_ids))
-
-    if not_tags_any:
-        query = query.filter(
-            ~models.Stack.tags.any(
-                models.StackTag.tag.in_(not_tags_any)))
-
-    if not show_hidden and cfg.CONF.hidden_stack_tags:
-        query = query.filter(
-            ~models.Stack.tags.any(
-                models.StackTag.tag.in_(cfg.CONF.hidden_stack_tags)))
-
-    return query
-
-
-def stack_get_all(context, limit=None, sort_keys=None, marker=None,
-                  sort_dir=None, filters=None,
-                  show_deleted=False, show_nested=False, show_hidden=False,
-                  tags=None, tags_any=None, not_tags=None,
-                  not_tags_any=None, eager_load=False):
-    query = _query_stack_get_all(context,
-                                 show_deleted=show_deleted,
-                                 show_nested=show_nested,
-                                 show_hidden=show_hidden, tags=tags,
-                                 tags_any=tags_any, not_tags=not_tags,
-                                 not_tags_any=not_tags_any)
-    if eager_load:
-        query = query.options(orm.joinedload("raw_template"))
-    return _filter_and_page_query(context, query, limit, sort_keys,
-                                  marker, sort_dir, filters).all()
-
-
-def _filter_and_page_query(context, query, limit=None, sort_keys=None,
-                           marker=None, sort_dir=None, filters=None):
-    if filters is None:
-        filters = {}
-
-    sort_key_map = {rpc_api.STACK_NAME: models.Stack.name.key,
-                    rpc_api.STACK_STATUS: models.Stack.status.key,
-                    rpc_api.STACK_CREATION_TIME: models.Stack.created_at.key,
-                    rpc_api.STACK_UPDATED_TIME: models.Stack.updated_at.key}
-    valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map)
-
-    query = db_filters.exact_filter(query, models.Stack, filters)
-    return _paginate_query(context, query, models.Stack, limit,
-                           valid_sort_keys, marker, sort_dir)
-
-
-def stack_count_all(context, filters=None,
-                    show_deleted=False, show_nested=False, show_hidden=False,
-                    tags=None, tags_any=None, not_tags=None,
-                    not_tags_any=None):
-    query = _query_stack_get_all(context,
-                                 show_deleted=show_deleted,
-                                 show_nested=show_nested,
-                                 show_hidden=show_hidden, tags=tags,
-                                 tags_any=tags_any, not_tags=not_tags,
-                                 not_tags_any=not_tags_any)
-    query = db_filters.exact_filter(query, models.Stack, filters)
-    return query.count()
-
-
-def stack_create(context, values):
-    stack_ref = models.Stack()
-    stack_ref.update(values)
-    stack_name = stack_ref.name
-    stack_ref.save(context.session)
-
-    # Even though we just created a stack with this name, we may not find
-    # it again because some unit tests create stacks with deleted_at set. Also
-    # some backup stacks may not be found, for reasons that are unclear.
-    earliest = stack_get_by_name(context, stack_name)
-    if earliest is not None and earliest.id != stack_ref.id:
-        context.session.query(models.Stack).filter_by(id=stack_ref.id).delete()
-        raise exception.StackExists(stack_name=stack_name)
-
-    return stack_ref
-
-
-@retry_on_db_error
-def stack_update(context, stack_id, values, exp_trvsl=None):
-    session = context.session
-    with session.begin(subtransactions=True):
-        query = (session.query(models.Stack)
-                 .filter(and_(models.Stack.id == stack_id),
-                         (models.Stack.deleted_at.is_(None))))
-        if not context.is_admin:
-            query = query.filter(sqlalchemy.or_(
-                models.Stack.tenant == context.tenant_id,
-                models.Stack.stack_user_project_id == context.tenant_id))
-        if exp_trvsl is not None:
-            query = query.filter(models.Stack.current_traversal == exp_trvsl)
-        rows_updated = query.update(values, synchronize_session=False)
-        if not rows_updated:
-            LOG.debug('Did not set stack state with values '
-                      '%(vals)s, stack id: %(id)s with '
-                      'expected traversal: %(trav)s',
-                      {'id': stack_id, 'vals': str(values),
-                       'trav': str(exp_trvsl)})
-            if not stack_get(context, stack_id, eager_load=False):
-                raise exception.NotFound(
-                    _('Attempt to update a stack with id: '
-                      '%(id)s %(msg)s') % {
-                          'id': stack_id,
-                          'msg': 'that does not exist'})
-    session.expire_all()
-    return (rows_updated is not None and rows_updated > 0)
-
-
-def stack_delete(context, stack_id):
-    s = stack_get(context, stack_id, eager_load=False)
-    if not s:
-        raise exception.NotFound(_('Attempt to delete a stack with id: '
-                                   '%(id)s %(msg)s') % {
-                                       'id': stack_id,
-                                       'msg': 'that does not exist'})
-    session = context.session
-    with session.begin():
-        attr_ids = []
-        # normally the resources are deleted already by this point
-        for r in s.resources:
-            if r.attr_data_id is not None:
-                attr_ids.append(r.attr_data_id)
-            session.delete(r)
-        if attr_ids:
-            session.query(
-                models.ResourcePropertiesData.id).filter(
-                    models.ResourcePropertiesData.id.in_(attr_ids)).delete(
-                        synchronize_session=False)
-        delete_softly(context, s)
-
-
-def _is_duplicate_error(exc):
-    return isinstance(exc, db_exception.DBDuplicateEntry)
-
-
-@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
-                           retry_on_disconnect=True,
-                           retry_interval=0.5,
-                           inc_retry_interval=True,
-                           exception_checker=_is_duplicate_error)
-def stack_lock_create(context, stack_id, engine_id):
-    with db_context.writer.independent.using(context) as session:
-        lock = session.query(models.StackLock).get(stack_id)
-        if lock is not None:
-            return lock.engine_id
-        session.add(models.StackLock(stack_id=stack_id, engine_id=engine_id))
-
-
-def stack_lock_get_engine_id(context, stack_id):
-    with db_context.reader.independent.using(context) as session:
-        lock = session.query(models.StackLock).get(stack_id)
-        if lock is not None:
-            return lock.engine_id
-
-
-def persist_state_and_release_lock(context, stack_id, engine_id, values):
-    session = context.session
-    with session.begin():
-        rows_updated = (session.query(models.Stack)
-                        .filter(models.Stack.id == stack_id)
-                        .update(values, synchronize_session=False))
-        rows_affected = None
-        if rows_updated is not None and rows_updated > 0:
-            rows_affected = session.query(
-                models.StackLock
-            ).filter_by(stack_id=stack_id, engine_id=engine_id).delete()
-    session.expire_all()
-    if not rows_affected:
-        return True
-
-
-def stack_lock_steal(context, stack_id, old_engine_id, new_engine_id):
-    with db_context.writer.independent.using(context) as session:
-        lock = session.query(models.StackLock).get(stack_id)
-        rows_affected = session.query(
-            models.StackLock
-        ).filter_by(stack_id=stack_id, engine_id=old_engine_id
-                    ).update({"engine_id": new_engine_id})
-    if not rows_affected:
-        return lock.engine_id if lock is not None else True
-
-
-def stack_lock_release(context, stack_id, engine_id):
-    with db_context.writer.independent.using(context) as session:
-        rows_affected = session.query(
-            models.StackLock
-        ).filter_by(stack_id=stack_id, engine_id=engine_id).delete()
-    if not rows_affected:
-        return True
-
-
-def stack_get_root_id(context, stack_id):
-    s = stack_get(context, stack_id, eager_load=False)
-    if not s:
-        return None
-    while s.owner_id:
-        s = stack_get(context, s.owner_id, eager_load=False)
-    return s.id
-
-
-def stack_count_total_resources(context, stack_id):
-    # count all resources which belong to the root stack
-    return context.session.query(
-        func.count(models.Resource.id)
-    ).filter_by(root_stack_id=stack_id).scalar()
-
-
-def user_creds_create(context):
-    values = context.to_dict()
-    user_creds_ref = models.UserCreds()
-    if values.get('trust_id'):
-        method, trust_id = crypt.encrypt(values.get('trust_id'))
-        user_creds_ref.trust_id = trust_id
-        user_creds_ref.decrypt_method = method
-        user_creds_ref.trustor_user_id = values.get('trustor_user_id')
-        user_creds_ref.username = None
-        user_creds_ref.password = None
-        user_creds_ref.tenant = values.get('tenant')
-        user_creds_ref.tenant_id = values.get('tenant_id')
-        user_creds_ref.auth_url = values.get('auth_url')
-        user_creds_ref.region_name = values.get('region_name')
-    else:
-        user_creds_ref.update(values)
-        method, password = crypt.encrypt(values['password'])
-        if len(str(password)) > 255:
-            raise exception.Error(_("Length of OS_PASSWORD after encryption"
-                                    " exceeds Heat limit (255 chars)"))
-        user_creds_ref.password = password
-        user_creds_ref.decrypt_method = method
-    user_creds_ref.save(context.session)
-    result = dict(user_creds_ref)
-
-    if values.get('trust_id'):
-        result['trust_id'] = values.get('trust_id')
-    else:
-        result['password'] = values.get('password')
-
-    return result
-
-
-def user_creds_get(context, user_creds_id):
-    db_result = context.session.query(models.UserCreds).get(user_creds_id)
-    if db_result is None:
-        return None
-    # Return a dict copy of DB results, do not decrypt details into db_result
-    # or it can be committed back to the DB in decrypted form
-    result = dict(db_result)
-    del result['decrypt_method']
-    result['password'] = crypt.decrypt(
-        db_result.decrypt_method, result['password'])
-    result['trust_id'] = crypt.decrypt(
-        db_result.decrypt_method, result['trust_id'])
-    return result
-
-
-@db_utils.retry_on_stale_data_error
-def user_creds_delete(context, user_creds_id):
-    creds = context.session.query(models.UserCreds).get(user_creds_id)
-    if not creds:
-        raise exception.NotFound(
-            _('Attempt to delete user creds with id '
-              '%(id)s that does not exist') % {'id': user_creds_id})
-    with context.session.begin():
-        context.session.delete(creds)
-
-
-def event_get_all_by_tenant(context, limit=None, marker=None,
-                            sort_keys=None, sort_dir=None, filters=None):
-    query = context.session.query(models.Event)
-    query = db_filters.exact_filter(query, models.Event, filters)
-    query = query.join(
-        models.Event.stack
-    ).filter_by(tenant=context.tenant_id).filter_by(deleted_at=None)
-    filters = None
-    return _events_filter_and_page_query(context, query, limit, marker,
-                                         sort_keys, sort_dir, filters).all()
-
-
-def _query_all_events_by_stack(context, stack_id):
-    return context.session.query(models.Event).filter_by(stack_id=stack_id)
-
-
-def event_get_all_by_stack(context, stack_id, limit=None, marker=None,
-                           sort_keys=None, sort_dir=None, filters=None):
-    query = _query_all_events_by_stack(context, stack_id)
-    if filters and 'uuid' in filters:
-        # retrieving a single event, so eager load its rsrc_prop_data detail
-        query = query.options(orm.joinedload("rsrc_prop_data"))
-    return _events_filter_and_page_query(context, query, limit, marker,
-                                         sort_keys, sort_dir, filters).all()
-
-
-def _events_paginate_query(context, query, model, limit=None, sort_keys=None,
-                           marker=None, sort_dir=None):
-    default_sort_keys = ['created_at']
-    if not sort_keys:
-        sort_keys = default_sort_keys
-        if not sort_dir:
-            sort_dir = 'desc'
-
-    # This assures the order of the stacks will always be the same
-    # even for sort_key values that are not unique in the database
-    sort_keys = sort_keys + ['id']
-
-    model_marker = None
-    if marker:
-        # not to use context.session.query(model).get(marker), because
-        # user can only see the ID(column 'uuid') and the ID as the marker
-        model_marker = context.session.query(
-            model).filter_by(uuid=marker).first()
-    try:
-        query = utils.paginate_query(query, model, limit, sort_keys,
-                                     model_marker, sort_dir)
-    except utils.InvalidSortKey as exc:
-        err_msg = encodeutils.exception_to_unicode(exc)
-        raise exception.Invalid(reason=err_msg)
-
-    return query
-
-
-def _events_filter_and_page_query(context, query,
-                                  limit=None, marker=None,
-                                  sort_keys=None, sort_dir=None,
-                                  filters=None):
-    if filters is None:
-        filters = {}
-
-    sort_key_map = {rpc_api.EVENT_TIMESTAMP: models.Event.created_at.key,
-                    rpc_api.EVENT_RES_TYPE: models.Event.resource_type.key}
-    valid_sort_keys = _get_sort_keys(sort_keys, sort_key_map)
-
-    query = db_filters.exact_filter(query, models.Event, filters)
-
-    return _events_paginate_query(context, query, models.Event, limit,
-                                  valid_sort_keys, marker, sort_dir)
-
-
-def event_count_all_by_stack(context, stack_id):
-    query = context.session.query(func.count(models.Event.id))
-    return query.filter_by(stack_id=stack_id).scalar()
-
-
-def _find_rpd_references(context, stack_id):
-    ev_ref_ids = set(e.rsrc_prop_data_id for e
-                     in _query_all_events_by_stack(context, stack_id).all())
-    rsrc_ref_ids = set(r.rsrc_prop_data_id for r
-                       in context.session.query(models.Resource).filter_by(
-                           stack_id=stack_id).all())
-    return ev_ref_ids | rsrc_ref_ids
-
-
-def _all_backup_stack_ids(context, stack_id):
-    """Iterate over all the IDs of all stacks related as stack/backup pairs.
-
-    All backup stacks of a main stack, past and present (i.e. including those
-    that are soft deleted), are included. The main stack itself is also
-    included if the initial ID passed in is for a backup stack. The initial ID
-    passed in is never included in the output.
-    """
-    query = context.session.query(models.Stack)
-    stack = query.get(stack_id)
-    if stack is None:
-        LOG.error('Stack %s not found', stack_id)
-        return
-    is_backup = stack.name.endswith('*')
-
-    if is_backup:
-        main = query.get(stack.owner_id)
-        if main is None:
-            LOG.error('Main stack for backup "%s" %s not found',
-                      stack.name, stack_id)
-            return
-        yield main.id
-        for backup_id in _all_backup_stack_ids(context, main.id):
-            if backup_id != stack_id:
-                yield backup_id
-    else:
-        q_backup = query.filter(sqlalchemy.or_(
-            models.Stack.tenant == context.tenant_id,
-            models.Stack.stack_user_project_id == context.tenant_id))
-        q_backup = q_backup.filter_by(name=stack.name + '*')
-        q_backup = q_backup.filter_by(owner_id=stack_id)
-        for backup in q_backup.all():
-            yield backup.id
-
-
-def _delete_event_rows(context, stack_id, limit):
-    # MySQL does not support LIMIT in subqueries,
-    # sqlite does not support JOIN in DELETE.
-    # So we must manually supply the IN() values.
-    # pgsql SHOULD work with the pure DELETE/JOIN below but that must be
-    # confirmed via integration tests.
-    session = context.session
-    with session.begin():
-        query = _query_all_events_by_stack(context, stack_id)
-        query = query.order_by(models.Event.id).limit(limit)
-        id_pairs = [(e.id, e.rsrc_prop_data_id) for e in query.all()]
-        if not id_pairs:
-            return 0
-        (ids, rsrc_prop_ids) = zip(*id_pairs)
-        max_id = ids[-1]
-        # delete the events
-        retval = session.query(models.Event).filter(
-            models.Event.id <= max_id).filter(
-                models.Event.stack_id == stack_id).delete()
-
-        # delete unreferenced resource_properties_data
-        def del_rpd(rpd_ids):
-            if not rpd_ids:
-                return
-            q_rpd = session.query(models.ResourcePropertiesData)
-            q_rpd = q_rpd.filter(models.ResourcePropertiesData.id.in_(rpd_ids))
-            q_rpd.delete(synchronize_session=False)
-
-        if rsrc_prop_ids:
-            clr_prop_ids = set(rsrc_prop_ids) - _find_rpd_references(context,
-                                                                     stack_id)
-            clr_prop_ids.discard(None)
-            try:
-                del_rpd(clr_prop_ids)
-            except db_exception.DBReferenceError:
-                LOG.debug('Checking backup/stack pairs for RPD references')
-                found = False
-                for partner_stack_id in _all_backup_stack_ids(context,
-                                                              stack_id):
-                    found = True
-                    clr_prop_ids -= _find_rpd_references(context,
-                                                         partner_stack_id)
-                if not found:
-                    LOG.debug('No backup/stack pairs found for %s', stack_id)
-                    raise
-                del_rpd(clr_prop_ids)
-
-        return retval
-
-
-@retry_on_db_error
-def event_create(context, values):
-    if 'stack_id' in values and cfg.CONF.max_events_per_stack:
-        # only count events and purge on average
-        # 200.0/cfg.CONF.event_purge_batch_size percent of the time.
-        check = (2.0 / cfg.CONF.event_purge_batch_size) > random.uniform(0, 1)
-        if (check and
-                (event_count_all_by_stack(context, values['stack_id']) >=
-                 cfg.CONF.max_events_per_stack)):
-            # prune
-            try:
-                _delete_event_rows(context, values['stack_id'],
-                                   cfg.CONF.event_purge_batch_size)
-            except db_exception.DBError as exc:
-                LOG.error('Failed to purge events: %s', str(exc))
-    event_ref = models.Event()
-    event_ref.update(values)
-    event_ref.save(context.session)
-    return event_ref
-
-
-def software_config_create(context, values):
-    obj_ref = models.SoftwareConfig()
-    obj_ref.update(values)
-    obj_ref.save(context.session)
-    return obj_ref
-
-
-def software_config_get(context, config_id):
-    result = context.session.query(models.SoftwareConfig).get(config_id)
-    if (result is not None and context is not None and not context.is_admin and
-            result.tenant != context.tenant_id):
-        result = None
-
-    if not result:
-        raise exception.NotFound(_('Software config with id %s not found') %
-                                 config_id)
-    return result
-
-
-def software_config_get_all(context, limit=None, marker=None):
-    query = context.session.query(models.SoftwareConfig)
-    if not context.is_admin:
-        query = query.filter_by(tenant=context.tenant_id)
-    return _paginate_query(context, query, models.SoftwareConfig,
-                           limit=limit, marker=marker).all()
-
-
-def software_config_delete(context, config_id):
-    config = software_config_get(context, config_id)
-    # Query if the software config has been referenced by deployment.
-    result = context.session.query(models.SoftwareDeployment).filter_by(
-        config_id=config_id).first()
-    if result:
-        msg = (_("Software config with id %s can not be deleted as "
-                 "it is referenced.") % config_id)
-        raise exception.InvalidRestrictedAction(message=msg)
-    with context.session.begin():
-        context.session.delete(config)
-
-
-def software_deployment_create(context, values):
-    obj_ref = models.SoftwareDeployment()
-    obj_ref.update(values)
-    session = context.session
-
-    with session.begin():
-        obj_ref.save(session)
-
-    return obj_ref
-
-
-def software_deployment_get(context, deployment_id):
-    result = context.session.query(
-        models.SoftwareDeployment).get(deployment_id)
-    if (result is not None and context is not None and not context.is_admin and
-            context.tenant_id not in (result.tenant,
-                                      result.stack_user_project_id)):
-        result = None
-
-    if not result:
-        raise exception.NotFound(_('Deployment with id %s not found') %
-                                 deployment_id)
-    return result
-
-
-def software_deployment_get_all(context, server_id=None):
-    sd = models.SoftwareDeployment
-    query = context.session.query(sd).order_by(sd.created_at)
-    if not context.is_admin:
-        query = query.filter(sqlalchemy.or_(
-            sd.tenant == context.tenant_id,
-            sd.stack_user_project_id == context.tenant_id))
-    if server_id:
-        query = query.filter_by(server_id=server_id)
-
-    return query.all()
-
-
-def software_deployment_update(context, deployment_id, values):
-    deployment = software_deployment_get(context, deployment_id)
-    update_and_save(context, deployment, values)
-    return deployment
-
-
-def software_deployment_delete(context, deployment_id):
-    deployment = software_deployment_get(context, deployment_id)
-    session = context.session
-    with session.begin():
-        session.delete(deployment)
-
-
-def snapshot_create(context, values):
-    obj_ref = models.Snapshot()
-    obj_ref.update(values)
-    obj_ref.save(context.session)
-    return obj_ref
-
-
-def snapshot_get(context, snapshot_id):
-    result = context.session.query(models.Snapshot).get(snapshot_id)
-    if (result is not None and context is not None and
-            context.tenant_id != result.tenant):
-        result = None
-
-    if not result:
-        raise exception.NotFound(_('Snapshot with id %s not found') %
-                                 snapshot_id)
-    return result
-
-
-def snapshot_get_by_stack(context, snapshot_id, stack):
-    snapshot = snapshot_get(context, snapshot_id)
-    if snapshot.stack_id != stack.id:
-        raise exception.SnapshotNotFound(snapshot=snapshot_id,
-                                         stack=stack.name)
-
-    return snapshot
-
-
-def snapshot_update(context, snapshot_id, values):
-    snapshot = snapshot_get(context, snapshot_id)
-    snapshot.update(values)
-    snapshot.save(context.session)
-    return snapshot
-
-
-def snapshot_delete(context, snapshot_id):
-    snapshot = snapshot_get(context, snapshot_id)
-    with context.session.begin():
-        context.session.delete(snapshot)
-
-
-def snapshot_get_all(context, stack_id):
-    return context.session.query(models.Snapshot).filter_by(
-        stack_id=stack_id, tenant=context.tenant_id)
-
-
-def service_create(context, values):
-    service = models.Service()
-    service.update(values)
-    service.save(context.session)
-    return service
-
-
-def service_update(context, service_id, values):
-    service = service_get(context, service_id)
-    values.update({'updated_at': timeutils.utcnow()})
-    service.update(values)
-    service.save(context.session)
-    return service
-
-
-def service_delete(context, service_id, soft_delete=True):
-    service = service_get(context, service_id)
-    session = context.session
-    with session.begin():
-        if soft_delete:
-            delete_softly(context, service)
-        else:
-            session.delete(service)
-
-
-def service_get(context, service_id):
-    result = context.session.query(models.Service).get(service_id)
-    if result is None:
-        raise exception.EntityNotFound(entity='Service', name=service_id)
-    return result
-
-
-def service_get_all(context):
-    return (context.session.query(models.Service).
-            filter_by(deleted_at=None).all())
-
-
-def service_get_all_by_args(context, host, binary, hostname):
-    return (context.session.query(models.Service).
-            filter_by(host=host).
-            filter_by(binary=binary).
-            filter_by(hostname=hostname).all())
-
-
-def purge_deleted(age, granularity='days', project_id=None, batch_size=20):
-    def _validate_positive_integer(val, argname):
-        try:
-            val = int(val)
-        except ValueError:
-            raise exception.Error(_("%s should be an integer") % argname)
-
-        if val < 0:
-            raise exception.Error(_("%s should be a positive integer")
-                                  % argname)
-        return val
-
-    age = _validate_positive_integer(age, 'age')
-    batch_size = _validate_positive_integer(batch_size, 'batch_size')
-
-    if granularity not in ('days', 'hours', 'minutes', 'seconds'):
-        raise exception.Error(
-            _("granularity should be days, hours, minutes, or seconds"))
-
-    if granularity == 'days':
-        age = age * 86400
-    elif granularity == 'hours':
-        age = age * 3600
-    elif granularity == 'minutes':
-        age = age * 60
-
-    time_line = timeutils.utcnow() - datetime.timedelta(seconds=age)
-    engine = get_engine()
-    meta = sqlalchemy.MetaData()
-    meta.bind = engine
-
-    stack = sqlalchemy.Table('stack', meta, autoload=True)
-    service = sqlalchemy.Table('service', meta, autoload=True)
-
-    # Purge deleted services
-    srvc_del = service.delete().where(service.c.deleted_at < time_line)
-    engine.execute(srvc_del)
-
-    # find the soft-deleted stacks that are past their expiry
-    sel = sqlalchemy.select([stack.c.id, stack.c.raw_template_id,
-                             stack.c.prev_raw_template_id,
-                             stack.c.user_creds_id,
-                             stack.c.action,
-                             stack.c.status,
-                             stack.c.name])
-    if project_id:
-        stack_where = sel.where(and_(
-            stack.c.tenant == project_id,
-            stack.c.deleted_at < time_line))
-    else:
-        stack_where = sel.where(
-            stack.c.deleted_at < time_line)
-
-    stacks = engine.execute(stack_where)
-
-    while True:
-        next_stacks_to_purge = list(itertools.islice(stacks, batch_size))
-        if len(next_stacks_to_purge):
-            _purge_stacks(next_stacks_to_purge, engine, meta)
-        else:
-            break
-
-
-@oslo_db_api.wrap_db_retry(max_retries=3, retry_on_deadlock=True,
-                           retry_interval=0.5, inc_retry_interval=True)
-def _purge_stacks(stack_infos, engine, meta):
-    """Purge some stacks and their related events, raw_templates, etc.
-
-    stack_infos is a list of lists of selected stack columns:
-    [[id, raw_template_id, prev_raw_template_id, user_creds_id,
-      action, status, name], ...]
-    """
-
-    stack = sqlalchemy.Table('stack', meta, autoload=True)
-    stack_lock = sqlalchemy.Table('stack_lock', meta, autoload=True)
-    stack_tag = sqlalchemy.Table('stack_tag', meta, autoload=True)
-    resource = sqlalchemy.Table('resource', meta, autoload=True)
-    resource_data = sqlalchemy.Table('resource_data', meta, autoload=True)
-    resource_properties_data = sqlalchemy.Table(
-        'resource_properties_data', meta, autoload=True)
-    event = sqlalchemy.Table('event', meta, autoload=True)
-    raw_template = sqlalchemy.Table('raw_template', meta, autoload=True)
-    raw_template_files = sqlalchemy.Table('raw_template_files', meta,
-                                          autoload=True)
-    user_creds = sqlalchemy.Table('user_creds', meta, autoload=True)
-    syncpoint = sqlalchemy.Table('sync_point', meta, autoload=True)
-
-    stack_info_str = ','.join([str(i) for i in stack_infos])
-    LOG.info("Purging stacks %s", stack_info_str)
-
-    # TODO(cwolfe): find a way to make this re-entrant with
-    # reasonably sized transactions (good luck), or add
-    # a cleanup for orphaned rows.
-    stack_ids = [stack_info[0] for stack_info in stack_infos]
-    # delete stack locks (just in case some got stuck)
-    stack_lock_del = stack_lock.delete().where(
-        stack_lock.c.stack_id.in_(stack_ids))
-    engine.execute(stack_lock_del)
-    # delete stack tags
-    stack_tag_del = stack_tag.delete().where(
-        stack_tag.c.stack_id.in_(stack_ids))
-    engine.execute(stack_tag_del)
-    # delete resource_data
-    res_where = sqlalchemy.select([resource.c.id]).where(
-        resource.c.stack_id.in_(stack_ids))
-    res_data_del = resource_data.delete().where(
-        resource_data.c.resource_id.in_(res_where))
-    engine.execute(res_data_del)
-    # clean up any sync_points that may have lingered
-    sync_del = syncpoint.delete().where(
-        syncpoint.c.stack_id.in_(stack_ids))
-    engine.execute(sync_del)
-
-    # get rsrc_prop_data_ids to delete
-    rsrc_prop_data_where = sqlalchemy.select(
-        [resource.c.rsrc_prop_data_id]).where(
-            resource.c.stack_id.in_(stack_ids))
-    rsrc_prop_data_ids = set(
-        [i[0] for i in list(engine.execute(rsrc_prop_data_where))])
-    rsrc_prop_data_where = sqlalchemy.select(
-        [resource.c.attr_data_id]).where(
-            resource.c.stack_id.in_(stack_ids))
-    rsrc_prop_data_ids.update(
-        [i[0] for i in list(engine.execute(rsrc_prop_data_where))])
-    rsrc_prop_data_where = sqlalchemy.select(
-        [event.c.rsrc_prop_data_id]).where(
-            event.c.stack_id.in_(stack_ids))
-    rsrc_prop_data_ids.update(
-        [i[0] for i in list(engine.execute(rsrc_prop_data_where))])
-    # delete events
-    event_del = event.delete().where(event.c.stack_id.in_(stack_ids))
-    engine.execute(event_del)
-    # delete resources (normally there shouldn't be any)
-    res_del = resource.delete().where(resource.c.stack_id.in_(stack_ids))
-    engine.execute(res_del)
-    # delete resource_properties_data
-    if rsrc_prop_data_ids:  # keep rpd's in events
-        rsrc_prop_data_where = sqlalchemy.select(
-            [event.c.rsrc_prop_data_id]).where(
-                event.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids))
-        ids = list(engine.execute(rsrc_prop_data_where))
-        rsrc_prop_data_ids.difference_update([i[0] for i in ids])
-    if rsrc_prop_data_ids:  # keep rpd's in resources
-        rsrc_prop_data_where = sqlalchemy.select(
-            [resource.c.rsrc_prop_data_id]).where(
-                resource.c.rsrc_prop_data_id.in_(rsrc_prop_data_ids))
-        ids = list(engine.execute(rsrc_prop_data_where))
-        rsrc_prop_data_ids.difference_update([i[0] for i in ids])
-    if rsrc_prop_data_ids:  # delete if we have any
-        rsrc_prop_data_del = resource_properties_data.delete().where(
-            resource_properties_data.c.id.in_(rsrc_prop_data_ids))
-        engine.execute(rsrc_prop_data_del)
-    # delete the stacks
-    stack_del = stack.delete().where(stack.c.id.in_(stack_ids))
-    engine.execute(stack_del)
-    # delete orphaned raw templates
-    raw_template_ids = [i[1] for i in stack_infos if i[1] is not None]
-    raw_template_ids.extend(i[2] for i in stack_infos if i[2] is not None)
-    if raw_template_ids:  # keep those still referenced
-        raw_tmpl_sel = sqlalchemy.select([stack.c.raw_template_id]).where(
-            stack.c.raw_template_id.in_(raw_template_ids))
-        raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
-        raw_template_ids = set(raw_template_ids) - set(raw_tmpl)
-    if raw_template_ids:  # keep those still referenced (previous tmpl)
-        raw_tmpl_sel = sqlalchemy.select(
-            [stack.c.prev_raw_template_id]).where(
-                stack.c.prev_raw_template_id.in_(raw_template_ids))
-        raw_tmpl = [i[0] for i in engine.execute(raw_tmpl_sel)]
-        raw_template_ids = raw_template_ids - set(raw_tmpl)
-    if raw_template_ids:  # delete raw_templates if we have any
-        raw_tmpl_file_sel = sqlalchemy.select(
-            [raw_template.c.files_id]).where(
-                raw_template.c.id.in_(raw_template_ids))
-        raw_tmpl_file_ids = [i[0] for i in engine.execute(
-            raw_tmpl_file_sel)]
-        raw_templ_del = raw_template.delete().where(
-            raw_template.c.id.in_(raw_template_ids))
-        engine.execute(raw_templ_del)
-        if raw_tmpl_file_ids:  # keep _files still referenced
-            raw_tmpl_file_sel = sqlalchemy.select(
-                [raw_template.c.files_id]).where(
-                    raw_template.c.files_id.in_(raw_tmpl_file_ids))
-            raw_tmpl_files = [i[0] for i in engine.execute(
-                raw_tmpl_file_sel)]
-            raw_tmpl_file_ids = set(raw_tmpl_file_ids) \
-                - set(raw_tmpl_files)
-            if raw_tmpl_file_ids:  # delete _files if we have any
-                raw_tmpl_file_del = raw_template_files.delete().where(
-                    raw_template_files.c.id.in_(raw_tmpl_file_ids))
-                engine.execute(raw_tmpl_file_del)
-    # purge any user creds that are no longer referenced
-    user_creds_ids = [i[3] for i in stack_infos if i[3] is not None]
-    if user_creds_ids:  # keep those still referenced
-        user_sel = sqlalchemy.select([stack.c.user_creds_id]).where(
-            stack.c.user_creds_id.in_(user_creds_ids))
-        users = [i[0] for i in engine.execute(user_sel)]
-        user_creds_ids = set(user_creds_ids) - set(users)
-    if user_creds_ids:  # delete if we have any
-        usr_creds_del = user_creds.delete().where(
-            user_creds.c.id.in_(user_creds_ids))
-        engine.execute(usr_creds_del)
-
-
-def sync_point_delete_all_by_stack_and_traversal(context, stack_id,
-                                                 traversal_id):
-    rows_deleted = context.session.query(models.SyncPoint).filter_by(
-        stack_id=stack_id, traversal_id=traversal_id).delete()
-    return rows_deleted
-
-
-@retry_on_db_error
-def sync_point_create(context, values):
-    values['entity_id'] = str(values['entity_id'])
-    sync_point_ref = models.SyncPoint()
-    sync_point_ref.update(values)
-    sync_point_ref.save(context.session)
-    return sync_point_ref
-
-
-def sync_point_get(context, entity_id, traversal_id, is_update):
-    entity_id = str(entity_id)
-    return context.session.query(models.SyncPoint).get(
-        (entity_id, traversal_id, is_update)
-    )
-
-
-@retry_on_db_error
-def sync_point_update_input_data(context, entity_id,
-                                 traversal_id, is_update, atomic_key,
-                                 input_data):
-    entity_id = str(entity_id)
-    rows_updated = context.session.query(models.SyncPoint).filter_by(
-        entity_id=entity_id,
-        traversal_id=traversal_id,
-        is_update=is_update,
-        atomic_key=atomic_key
-    ).update({"input_data": input_data, "atomic_key": atomic_key + 1})
-    return rows_updated
-
-
-def _crypt_action(encrypt):
-    if encrypt:
-        return _('encrypt')
-    return _('decrypt')
-
-
-def _db_encrypt_or_decrypt_template_params(
-        ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
-    from heat.engine import template
-    session = ctxt.session
-    excs = []
-    query = session.query(models.RawTemplate)
-    template_batches = _get_batch(
-        session, ctxt=ctxt, query=query, model=models.RawTemplate,
-        batch_size=batch_size)
-    next_batch = list(itertools.islice(template_batches, batch_size))
-    while next_batch:
-        with session.begin():
-            for raw_template in next_batch:
-                try:
-                    if verbose:
-                        LOG.info("Processing raw_template %s...",
-                                 raw_template.id)
-                    env = raw_template.environment
-                    needs_update = False
-
-                    # using "in env.keys()" so an exception is raised
-                    # if env is something weird like a string.
-                    if env is None or 'parameters' not in env.keys():
-                        continue
-                    if 'encrypted_param_names' in env:
-                        encrypted_params = env['encrypted_param_names']
-                    else:
-                        encrypted_params = []
-
-                    if encrypt:
-                        tmpl = template.Template.load(
-                            ctxt, raw_template.id, raw_template)
-                        param_schemata = tmpl.param_schemata()
-                        if not param_schemata:
-                            continue
-
-                        for param_name, param_val in env['parameters'].items():
-                            if (param_name in encrypted_params or
-                                    param_name not in param_schemata or
-                                    not param_schemata[param_name].hidden):
-                                continue
-                            encrypted_val = crypt.encrypt(
-                                str(param_val), encryption_key)
-                            env['parameters'][param_name] = encrypted_val
-                            encrypted_params.append(param_name)
-                            needs_update = True
-                        if needs_update:
-                            newenv = env.copy()
-                            newenv['encrypted_param_names'] = encrypted_params
-                    else:  # decrypt
-                        for param_name in encrypted_params:
-                            method, value = env['parameters'][param_name]
-                            decrypted_val = crypt.decrypt(method, value,
-                                                          encryption_key)
-                            env['parameters'][param_name] = decrypted_val
-                            needs_update = True
-                        if needs_update:
-                            newenv = env.copy()
-                            newenv['encrypted_param_names'] = []
-
-                    if needs_update:
-                        raw_template_update(ctxt, raw_template.id,
-                                            {'environment': newenv})
-                except Exception as exc:
-                    LOG.exception('Failed to %(crypt_action)s parameters '
-                                  'of raw template %(id)d',
-                                  {'id': raw_template.id,
-                                   'crypt_action': _crypt_action(encrypt)})
-                    excs.append(exc)
-                    continue
-                finally:
-                    if verbose:
-                        LOG.info("Finished %(crypt_action)s processing of "
-                                 "raw_template %(id)d.",
-                                 {'id': raw_template.id,
-                                  'crypt_action': _crypt_action(encrypt)})
-        next_batch = list(itertools.islice(template_batches, batch_size))
-    return excs
-
-
-def _db_encrypt_or_decrypt_resource_prop_data_legacy(
-        ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
-    session = ctxt.session
-    excs = []
-
-    # Older resources may have properties_data in the legacy column,
-    # so update those as needed
-    query = session.query(models.Resource).filter(
-        models.Resource.properties_data_encrypted.isnot(encrypt))
-    resource_batches = _get_batch(
-        session=session, ctxt=ctxt, query=query, model=models.Resource,
-        batch_size=batch_size)
-    next_batch = list(itertools.islice(resource_batches, batch_size))
-    while next_batch:
-        with session.begin():
-            for resource in next_batch:
-                if not resource.properties_data:
-                    continue
-                try:
-                    if verbose:
-                        LOG.info("Processing resource %s...",
-                                 resource.id)
-                    if encrypt:
-                        result = crypt.encrypted_dict(resource.properties_data,
-                                                      encryption_key)
-                    else:
-                        result = crypt.decrypted_dict(resource.properties_data,
-                                                      encryption_key)
-                    resource_update(ctxt, resource.id,
-                                    {'properties_data': result,
-                                     'properties_data_encrypted': encrypt},
-                                    resource.atomic_key)
-                except Exception as exc:
-                    LOG.exception('Failed to %(crypt_action)s '
-                                  'properties_data of resource %(id)d' %
-                                  {'id': resource.id,
-                                   'crypt_action': _crypt_action(encrypt)})
-                    excs.append(exc)
-                    continue
-                finally:
-                    if verbose:
-                        LOG.info("Finished processing resource %s.",
-                                 resource.id)
-        next_batch = list(itertools.islice(resource_batches, batch_size))
-    return excs
-
-
-def _db_encrypt_or_decrypt_resource_prop_data(
-        ctxt, encryption_key, encrypt=False, batch_size=50, verbose=False):
-    session = ctxt.session
-    excs = []
-
-    # Older resources may have properties_data in the legacy column,
-    # so update those as needed
-    query = session.query(models.ResourcePropertiesData).filter(
-        models.ResourcePropertiesData.encrypted.isnot(encrypt))
-    rpd_batches = _get_batch(
-        session=session, ctxt=ctxt, query=query,
-        model=models.ResourcePropertiesData, batch_size=batch_size)
-    next_batch = list(itertools.islice(rpd_batches, batch_size))
-    while next_batch:
-        with session.begin():
-            for rpd in next_batch:
-                if not rpd.data:
-                    continue
-                try:
-                    if verbose:
-                        LOG.info("Processing resource_properties_data "
-                                 "%s...", rpd.id)
-                    if encrypt:
-                        result = crypt.encrypted_dict(rpd.data,
-                                                      encryption_key)
-                    else:
-                        result = crypt.decrypted_dict(rpd.data,
-                                                      encryption_key)
-                    rpd.update({'data': result,
-                                'encrypted': encrypt})
-                except Exception as exc:
-                    LOG.exception(
-                        "Failed to %(crypt_action)s "
-                        "data of resource_properties_data %(id)d" %
-                        {'id': rpd.id,
-                         'crypt_action': _crypt_action(encrypt)})
-                    excs.append(exc)
-                    continue
-                finally:
-                    if verbose:
-                        LOG.info(
-                            "Finished processing resource_properties_data"
-                            " %s.", rpd.id)
-        next_batch = list(itertools.islice(rpd_batches, batch_size))
-    return excs
-
-
-def db_encrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
-                                         verbose=False):
-    """Encrypt parameters and properties for all templates in db.
-
-    :param ctxt: RPC context
-    :param encryption_key: key that will be used for parameter and property
-                           encryption
-    :param batch_size: number of templates requested from DB in each iteration.
-                       50 means that heat requests 50 templates, encrypts them
-                       and proceeds with the next 50 items.
-    :param verbose: log an INFO message when processing of each raw_template or
-                    resource begins or ends
-    :return: list of exceptions encountered during encryption
-    """
-    excs = []
-    excs.extend(_db_encrypt_or_decrypt_template_params(
-        ctxt, encryption_key, True, batch_size, verbose))
-    excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
-        ctxt, encryption_key, True, batch_size, verbose))
-    excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
-        ctxt, encryption_key, True, batch_size, verbose))
-    return excs
-
-
-def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
-                                         verbose=False):
-    """Decrypt parameters and properties for all templates in db.
-
-    :param ctxt: RPC context
-    :param encryption_key: key that will be used for parameter and property
-                           decryption
-    :param batch_size: number of templates requested from DB in each iteration.
-                       50 means that heat requests 50 templates, decrypts them
-                       and proceeds with the next 50 items.
-    :param verbose: log an INFO message when processing of each raw_template or
-                    resource begins or ends
-    :return: list of exceptions encountered during decryption
-    """
-    excs = []
-    excs.extend(_db_encrypt_or_decrypt_template_params(
-        ctxt, encryption_key, False, batch_size, verbose))
-    excs.extend(_db_encrypt_or_decrypt_resource_prop_data(
-        ctxt, encryption_key, False, batch_size, verbose))
-    excs.extend(_db_encrypt_or_decrypt_resource_prop_data_legacy(
-        ctxt, encryption_key, False, batch_size, verbose))
-    return excs
-
-
-def db_properties_data_migrate(ctxt, batch_size=50):
-    """Migrate properties data from legacy columns to new location in db.
-
-    :param ctxt: RPC context
-    :param batch_size: number of templates requested from DB in each iteration.
-                       50 means that heat requests 50 templates, migrates them
-                       and proceeds with the next 50 items.
- """ - session = ctxt.session - - query = session.query(models.Resource).filter(and_( - models.Resource.properties_data.isnot(None), - models.Resource.rsrc_prop_data_id.is_(None))) - resource_batches = _get_batch( - session=session, ctxt=ctxt, query=query, - model=models.Resource, batch_size=batch_size) - next_batch = list(itertools.islice(resource_batches, batch_size)) - while next_batch: - with session.begin(): - for resource in next_batch: - try: - encrypted = resource.properties_data_encrypted - if encrypted is None: - LOG.warning( - 'Unexpected: resource.encrypted is None for ' - 'resource id %s for legacy ' - 'resource.properties_data, assuming False.', - resource.id) - encrypted = False - rsrc_prop_data = resource_prop_data_create( - ctxt, {'encrypted': encrypted, - 'data': resource.properties_data}) - resource_update(ctxt, resource.id, - {'properties_data_encrypted': None, - 'properties_data': None, - 'rsrc_prop_data_id': rsrc_prop_data.id}, - resource.atomic_key) - except Exception: - LOG.exception('Failed to migrate properties_data for ' - 'resource %d', resource.id) - continue - next_batch = list(itertools.islice(resource_batches, batch_size)) - - query = session.query(models.Event).filter(and_( - models.Event.resource_properties.isnot(None), - models.Event.rsrc_prop_data_id.is_(None))) - event_batches = _get_batch( - session=session, ctxt=ctxt, query=query, - model=models.Event, batch_size=batch_size) - next_batch = list(itertools.islice(event_batches, batch_size)) - while next_batch: - with session.begin(): - for event in next_batch: - try: - prop_data = event.resource_properties - rsrc_prop_data = resource_prop_data_create( - ctxt, {'encrypted': False, - 'data': prop_data}) - event.update({'resource_properties': None, - 'rsrc_prop_data_id': rsrc_prop_data.id}) - except Exception: - LOG.exception('Failed to migrate resource_properties ' - 'for event %d', event.id) - continue - next_batch = list(itertools.islice(event_batches, batch_size)) - - -def _get_batch(session, ctxt, query, model, batch_size=50): - last_batch_marker = None - while True: - results = _paginate_query( - context=ctxt, query=query, model=model, limit=batch_size, - marker=last_batch_marker).all() - if not results: - break - else: - for result in results: - yield result - last_batch_marker = results[-1].id - - -def reset_stack_status(context, stack_id, stack=None): - session = context.session - if stack is None: - stack = session.query(models.Stack).get(stack_id) - - if stack is None: - raise exception.NotFound(_('Stack with id %s not found') % stack_id) - - with session.begin(): - query = session.query(models.Resource).filter_by( - status='IN_PROGRESS', stack_id=stack_id) - query.update({'status': 'FAILED', - 'status_reason': 'Stack status manually reset', - 'engine_id': None}) - - query = session.query(models.ResourceData) - query = query.join(models.Resource) - query = query.filter_by(stack_id=stack_id) - query = query.filter( - models.ResourceData.key.in_(heat_environment.HOOK_TYPES)) - data_ids = [data.id for data in query] - - if data_ids: - query = session.query(models.ResourceData) - query = query.filter(models.ResourceData.id.in_(data_ids)) - query.delete(synchronize_session='fetch') - - query = session.query(models.Stack).filter_by(owner_id=stack_id) - for child in query: - reset_stack_status(context, child.id, child) - - with session.begin(): - if stack.status == 'IN_PROGRESS': - stack.status = 'FAILED' - stack.status_reason = 'Stack status manually reset' - - session.query( - models.StackLock - 
diff --git a/heat/db/sqlalchemy/filters.py b/heat/db/sqlalchemy/filters.py
deleted file mode 100644
index 6c7b8cf24..000000000
--- a/heat/db/sqlalchemy/filters.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-
-def exact_filter(query, model, filters):
-    """Applies exact match filtering to a query.
-
-    Returns the updated query. The filters argument is not modified
-    by this function.
-
-    :param query: query to apply filters to
-    :param model: model object the query applies to, for IN-style
-                  filtering
-    :param filters: dictionary of filters; values that are lists,
-                    tuples, sets, or frozensets cause an 'IN' test to
-                    be performed, while exact matching ('==' operator)
-                    is used for other values
-    """
-
-    filter_dict = {}
-    if filters is None:
-        filters = {}
-
-    for key, value in filters.items():
-        if isinstance(value, (list, tuple, set, frozenset)):
-            column_attr = getattr(model, key)
-            query = query.filter(column_attr.in_(value))
-        else:
-            filter_dict[key] = value
-
-    if filter_dict:
-        query = query.filter_by(**filter_dict)
-
-    return query
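A quick illustration of exact_filter's contract, grounded in the docstring
above; the session and model here are assumed to come from the surrounding
DB API module::

    filters = {'action': ['CREATE', 'UPDATE'], 'status': 'COMPLETE'}
    query = exact_filter(session.query(models.Stack), models.Stack, filters)
    # Roughly: WHERE action IN ('CREATE', 'UPDATE') AND status = 'COMPLETE'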
diff --git a/heat/db/sqlalchemy/migration.py b/heat/db/sqlalchemy/migration.py
deleted file mode 100644
index 67ac4c3b4..000000000
--- a/heat/db/sqlalchemy/migration.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import os
-
-from alembic import command as alembic_api
-from alembic import config as alembic_config
-from alembic import migration as alembic_migration
-from oslo_log import log as logging
-import sqlalchemy as sa
-
-from heat.db.sqlalchemy import api as db_api
-
-LOG = logging.getLogger(__name__)
-
-ALEMBIC_INIT_VERSION = 'c6214ca60943'
-
-
-def _migrate_legacy_database(engine, connection, config):
-    """Check if database is a legacy sqlalchemy-migrate-managed database.
-
-    If it is, migrate it by "stamping" the initial alembic schema.
-    """
-    # If the database doesn't have the sqlalchemy-migrate legacy migration
-    # table, we don't have anything to do
-    if not sa.inspect(engine).has_table('migrate_version'):
-        return
-
-    # Likewise, if we've already migrated to alembic, we don't have anything
-    # to do
-    context = alembic_migration.MigrationContext.configure(connection)
-    if context.get_current_revision():
-        return
-
-    # We have legacy migrations but no alembic migration. Stamp (dummy apply)
-    # the initial alembic migration.
-
-    LOG.info(
-        'The database is still under sqlalchemy-migrate control; '
-        'fake applying the initial alembic migration'
-    )
-    alembic_api.stamp(config, ALEMBIC_INIT_VERSION)
-
-
-def _find_alembic_conf():
-    """Get the project's alembic configuration.
-
-    :returns: An instance of ``alembic.config.Config``
-    """
-    path = os.path.join(
-        os.path.abspath(os.path.dirname(__file__)),
-        'alembic.ini',
-    )
-    config = alembic_config.Config(os.path.abspath(path))
-    # we don't want to use the logger configuration from the file, which is
-    # only really intended for the CLI
-    # https://stackoverflow.com/a/42691781/613428
-    config.attributes['configure_logger'] = False
-    return config
-
-
-def _upgrade_alembic(engine, config, version):
-    # re-use the connection rather than creating a new one
-    with engine.begin() as connection:
-        config.attributes['connection'] = connection
-        _migrate_legacy_database(engine, connection, config)
-        alembic_api.upgrade(config, version or 'head')
-
-
-def db_sync(version=None, engine=None):
-    """Migrate the database to `version` or the most recent version."""
-    # if the user requested a specific version, check if it's an integer: if
-    # so, we're almost certainly in sqlalchemy-migrate land and won't support
-    # that
-    if version is not None and version.isdigit():
-        raise ValueError(
-            'You requested an sqlalchemy-migrate database version; this is '
-            'no longer supported'
-        )
-
-    if engine is None:
-        engine = db_api.get_engine()
-
-    config = _find_alembic_conf()
-
-    # discard the URL encoded in alembic.ini in favour of the URL configured
-    # for the engine by the database fixtures, casting from
-    # 'sqlalchemy.engine.url.URL' to str in the process. This returns an
-    # RFC-1738 quoted URL, which means that a password like "foo@" will be
-    # turned into "foo%40". This in turn causes a problem for
-    # set_main_option() because that uses ConfigParser.set, which (by design)
-    # uses *python* interpolation to write the string out ... where "%" is the
-    # special python interpolation character! Avoid this mismatch by quoting
-    # all %'s for the set below.
-    engine_url = str(engine.url).replace('%', '%%')
-    config.set_main_option('sqlalchemy.url', str(engine_url))
-
-    LOG.info('Applying migration(s)')
-    _upgrade_alembic(engine, config, version)
-    LOG.info('Migration(s) applied')
-
-
-def db_version():
-    """Get database version."""
-    engine = db_api.get_engine()
-    with engine.connect() as connection:
-        m_context = alembic_migration.MigrationContext.configure(connection)
-        return m_context.get_current_revision()
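For orientation, these helpers are what ``heat-manage db_sync`` ultimately
drives. A minimal sketch of calling them directly, assuming the engine has
been configured via the usual heat options::

    from heat.db import migration

    # Upgrade to the newest alembic revision; a database still under
    # sqlalchemy-migrate control is first stamped with the initial revision.
    migration.db_sync()

    # Report the current alembic revision (None for an empty database).
    print(migration.db_version())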
diff --git a/heat/db/sqlalchemy/migrations/README.rst b/heat/db/sqlalchemy/migrations/README.rst
deleted file mode 100644
index b2283fc82..000000000
--- a/heat/db/sqlalchemy/migrations/README.rst
+++ /dev/null
@@ -1,15 +0,0 @@
-Database migrations
-===================
-
-This directory contains migrations for the database. These are implemented
-using `alembic`__, a lightweight database migration tool designed for usage
-with `SQLAlchemy`__.
-
-The best place to start understanding Alembic is with its own `tutorial`__. You
-can also play around with the :command:`alembic` command::
-
-    $ alembic --help
-
-.. __: https://alembic.sqlalchemy.org/en/latest/
-.. __: https://www.sqlalchemy.org/
-.. __: https://alembic.sqlalchemy.org/en/latest/tutorial.html
diff --git a/heat/db/sqlalchemy/migrations/env.py b/heat/db/sqlalchemy/migrations/env.py
deleted file mode 100644
index 84413f313..000000000
--- a/heat/db/sqlalchemy/migrations/env.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from logging.config import fileConfig
-
-from alembic import context
-from sqlalchemy import engine_from_config
-from sqlalchemy import pool
-
-from heat.db.sqlalchemy import models
-
-# this is the Alembic Config object, which provides
-# access to the values within the .ini file in use.
-config = context.config
-
-# Interpret the config file for Python logging.
-# This line essentially sets up the loggers.
-if config.attributes.get('configure_logger', True):
-    fileConfig(config.config_file_name)
-
-# this is the MetaData object for the various models in the database
-target_metadata = models.BASE.metadata
-
-
-def run_migrations_offline() -> None:
-    """Run migrations in 'offline' mode.
-
-    This configures the context with just a URL and not an Engine, though an
-    Engine is acceptable here as well. By skipping the Engine creation we
-    don't even need a DBAPI to be available.
-
-    Calls to context.execute() here emit the given string to the
-    script output.
-    """
-    url = config.get_main_option("sqlalchemy.url")
-    context.configure(
-        url=url,
-        target_metadata=target_metadata,
-        literal_binds=True,
-        dialect_opts={"paramstyle": "named"},
-    )
-
-    with context.begin_transaction():
-        context.run_migrations()
-
-
-def run_migrations_online() -> None:
-    """Run migrations in 'online' mode.
-
-    In this scenario we need to create an Engine and associate a connection
-    with the context.
-
-    This is modified from the default based on the recipe below, because we
-    want to share an engine when unit testing so that in-memory database
-    testing actually works.
- - https://alembic.sqlalchemy.org/en/latest/cookbook.html#connection-sharing - """ - connectable = config.attributes.get('connection', None) - - if connectable is None: - # only create Engine if we don't have a Connection from the outside - connectable = engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) - - # when connectable is already a Connection object, calling connect() gives - # us a *branched connection* - - with connectable.connect() as connection: - context.configure( - connection=connection, - target_metadata=target_metadata, - ) - - with context.begin_transaction(): - context.run_migrations() - - -if context.is_offline_mode(): - run_migrations_offline() -else: - run_migrations_online() diff --git a/heat/db/sqlalchemy/migrations/script.py.mako b/heat/db/sqlalchemy/migrations/script.py.mako deleted file mode 100644 index 120937fbc..000000000 --- a/heat/db/sqlalchemy/migrations/script.py.mako +++ /dev/null @@ -1,20 +0,0 @@ -"""${message} - -Revision ID: ${up_revision} -Revises: ${down_revision | comma,n} -Create Date: ${create_date} -""" - -from alembic import op -import sqlalchemy as sa -${imports if imports else ""} - -# revision identifiers, used by Alembic. -revision = ${repr(up_revision)} -down_revision = ${repr(down_revision)} -branch_labels = ${repr(branch_labels)} -depends_on = ${repr(depends_on)} - - -def upgrade() -> None: - ${upgrades if upgrades else "pass"} diff --git a/heat/db/sqlalchemy/migrations/versions/c6214ca60943_initial_revision.py b/heat/db/sqlalchemy/migrations/versions/c6214ca60943_initial_revision.py deleted file mode 100644 index b6a4e7bbc..000000000 --- a/heat/db/sqlalchemy/migrations/versions/c6214ca60943_initial_revision.py +++ /dev/null @@ -1,392 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Initial revision - -Revision ID: c6214ca60943 -Revises: -Create Date: 2023-03-22 18:04:02.387269 -""" - -from alembic import op -import sqlalchemy as sa - -import heat.db.sqlalchemy.types - -# revision identifiers, used by Alembic. 
-revision = 'c6214ca60943' -down_revision = None -branch_labels = None -depends_on = None - - -def upgrade() -> None: - op.create_table( - 'raw_template_files', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('files', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'resource_properties_data', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('data', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column('encrypted', sa.Boolean(), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'service', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('engine_id', sa.String(length=36), nullable=False), - sa.Column('host', sa.String(length=255), nullable=False), - sa.Column('hostname', sa.String(length=255), nullable=False), - sa.Column('binary', sa.String(length=255), nullable=False), - sa.Column('topic', sa.String(length=255), nullable=False), - sa.Column('report_interval', sa.Integer(), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('deleted_at', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'software_config', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('group', sa.String(length=255), nullable=True), - sa.Column('config', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column('tenant', sa.String(length=64), nullable=False), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_index( - op.f('ix_software_config_tenant'), - 'software_config', - ['tenant'], - unique=False, - ) - op.create_table( - 'user_creds', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('username', sa.String(length=255), nullable=True), - sa.Column('password', sa.String(length=255), nullable=True), - sa.Column('region_name', sa.String(length=255), nullable=True), - sa.Column('decrypt_method', sa.String(length=64), nullable=True), - sa.Column('tenant', sa.String(length=1024), nullable=True), - sa.Column('auth_url', sa.Text(), nullable=True), - sa.Column('tenant_id', sa.String(length=256), nullable=True), - sa.Column('trust_id', sa.String(length=255), nullable=True), - sa.Column('trustor_user_id', sa.String(length=64), nullable=True), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'raw_template', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('template', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column('files', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column( - 'environment', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column('files_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint( - ['files_id'], - 
['raw_template_files.id'], - name='raw_tmpl_files_fkey_ref', - ), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'software_deployment', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('server_id', sa.String(length=36), nullable=False), - sa.Column('config_id', sa.String(length=36), nullable=False), - sa.Column( - 'input_values', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column( - 'output_values', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column('action', sa.String(length=255), nullable=True), - sa.Column('status', sa.String(length=255), nullable=True), - sa.Column('status_reason', sa.Text(), nullable=True), - sa.Column('tenant', sa.String(length=64), nullable=False), - sa.Column( - 'stack_user_project_id', sa.String(length=64), nullable=True - ), - sa.ForeignKeyConstraint( - ['config_id'], - ['software_config.id'], - ), - sa.PrimaryKeyConstraint('id'), - ) - op.create_index( - 'ix_software_deployment_created_at', - 'software_deployment', - ['created_at'], - unique=False, - ) - op.create_index( - op.f('ix_software_deployment_server_id'), - 'software_deployment', - ['server_id'], - unique=False, - ) - op.create_index( - op.f('ix_software_deployment_tenant'), - 'software_deployment', - ['tenant'], - unique=False, - ) - op.create_table( - 'stack', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('deleted_at', sa.DateTime(), nullable=True), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('raw_template_id', sa.Integer(), nullable=False), - sa.Column('prev_raw_template_id', sa.Integer(), nullable=True), - sa.Column('user_creds_id', sa.Integer(), nullable=True), - sa.Column('username', sa.String(length=256), nullable=True), - sa.Column('owner_id', sa.String(length=36), nullable=True), - sa.Column('action', sa.String(length=255), nullable=True), - sa.Column('status', sa.String(length=255), nullable=True), - sa.Column('status_reason', sa.Text(), nullable=True), - sa.Column('timeout', sa.Integer(), nullable=True), - sa.Column('tenant', sa.String(length=256), nullable=True), - sa.Column('disable_rollback', sa.Boolean(), nullable=False), - sa.Column( - 'stack_user_project_id', sa.String(length=64), nullable=True - ), - sa.Column('backup', sa.Boolean(), nullable=True), - sa.Column('nested_depth', sa.Integer(), nullable=True), - sa.Column('convergence', sa.Boolean(), nullable=True), - sa.Column('current_traversal', sa.String(length=36), nullable=True), - sa.Column( - 'current_deps', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column( - 'parent_resource_name', sa.String(length=255), nullable=True - ), - sa.ForeignKeyConstraint( - ['prev_raw_template_id'], - ['raw_template.id'], - ), - sa.ForeignKeyConstraint( - ['raw_template_id'], - ['raw_template.id'], - ), - sa.ForeignKeyConstraint( - ['user_creds_id'], - ['user_creds.id'], - ), - sa.PrimaryKeyConstraint('id'), - ) - op.create_index( - 'ix_stack_name', 'stack', ['name'], unique=False, mysql_length=255 - ) - op.create_index( - 'ix_stack_tenant', 'stack', ['tenant'], unique=False, mysql_length=255 - ) - op.create_index( - op.f('ix_stack_owner_id'), 'stack', ['owner_id'], unique=False - ) - op.create_table( - 'event', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('uuid', 
sa.String(length=36), nullable=True), - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('resource_action', sa.String(length=255), nullable=True), - sa.Column('resource_status', sa.String(length=255), nullable=True), - sa.Column('resource_name', sa.String(length=255), nullable=True), - sa.Column( - 'physical_resource_id', sa.String(length=255), nullable=True - ), - sa.Column( - 'resource_status_reason', sa.String(length=255), nullable=True - ), - sa.Column('resource_type', sa.String(length=255), nullable=True), - sa.Column('resource_properties', sa.PickleType(), nullable=True), - sa.Column('rsrc_prop_data_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint( - ['rsrc_prop_data_id'], - ['resource_properties_data.id'], - name='ev_rsrc_prop_data_ref', - ), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('uuid'), - mysql_engine='InnoDB', - ) - op.create_table( - 'resource', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('uuid', sa.String(length=36), nullable=True), - sa.Column('nova_instance', sa.String(length=255), nullable=True), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('action', sa.String(length=255), nullable=True), - sa.Column('status', sa.String(length=255), nullable=True), - sa.Column('status_reason', sa.Text(), nullable=True), - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.Column( - 'rsrc_metadata', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column( - 'properties_data', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column('engine_id', sa.String(length=36), nullable=True), - sa.Column('atomic_key', sa.Integer(), nullable=True), - sa.Column('needed_by', heat.db.sqlalchemy.types.List(), nullable=True), - sa.Column('requires', heat.db.sqlalchemy.types.List(), nullable=True), - sa.Column('replaces', sa.Integer(), nullable=True), - sa.Column('replaced_by', sa.Integer(), nullable=True), - sa.Column('current_template_id', sa.Integer(), nullable=True), - sa.Column('properties_data_encrypted', sa.Boolean(), nullable=True), - sa.Column('root_stack_id', sa.String(length=36), nullable=True), - sa.Column('rsrc_prop_data_id', sa.Integer(), nullable=True), - sa.Column('attr_data_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint( - ['attr_data_id'], - ['resource_properties_data.id'], - name='rsrc_attr_data_ref', - ), - sa.ForeignKeyConstraint( - ['current_template_id'], - ['raw_template.id'], - ), - sa.ForeignKeyConstraint( - ['rsrc_prop_data_id'], - ['resource_properties_data.id'], - name='rsrc_rsrc_prop_data_ref', - ), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('uuid'), - mysql_engine='InnoDB', - ) - op.create_index( - op.f('ix_resource_root_stack_id'), - 'resource', - ['root_stack_id'], - unique=False, - ) - op.create_table( - 'snapshot', - sa.Column('id', sa.String(length=36), nullable=False), - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.Column('name', sa.String(length=255), nullable=True), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('status', sa.String(length=255), nullable=True), - 
sa.Column('status_reason', sa.String(length=255), nullable=True), - sa.Column('data', heat.db.sqlalchemy.types.Json(), nullable=True), - sa.Column('tenant', sa.String(length=64), nullable=False), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_index( - op.f('ix_snapshot_tenant'), 'snapshot', ['tenant'], unique=False - ) - op.create_table( - 'stack_lock', - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('engine_id', sa.String(length=36), nullable=True), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('stack_id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'stack_tag', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('tag', sa.Unicode(length=80), nullable=True), - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) - op.create_table( - 'sync_point', - sa.Column('entity_id', sa.String(length=36), nullable=False), - sa.Column('traversal_id', sa.String(length=36), nullable=False), - sa.Column('is_update', sa.Boolean(), nullable=False), - sa.Column('atomic_key', sa.Integer(), nullable=False), - sa.Column('stack_id', sa.String(length=36), nullable=False), - sa.Column( - 'input_data', heat.db.sqlalchemy.types.Json(), nullable=True - ), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.ForeignKeyConstraint( - ['stack_id'], - ['stack.id'], - ), - sa.PrimaryKeyConstraint('entity_id', 'traversal_id', 'is_update'), - ) - op.create_table( - 'resource_data', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('created_at', sa.DateTime(), nullable=True), - sa.Column('updated_at', sa.DateTime(), nullable=True), - sa.Column('key', sa.String(length=255), nullable=True), - sa.Column('value', sa.Text(), nullable=True), - sa.Column('redact', sa.Boolean(), nullable=True), - sa.Column('decrypt_method', sa.String(length=64), nullable=True), - sa.Column('resource_id', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint( - ['resource_id'], - ['resource.id'], - name='fk_resource_id', - ondelete='CASCADE', - ), - sa.PrimaryKeyConstraint('id'), - mysql_engine='InnoDB', - ) diff --git a/heat/db/sqlalchemy/models.py b/heat/db/sqlalchemy/models.py deleted file mode 100644 index ca208bef0..000000000 --- a/heat/db/sqlalchemy/models.py +++ /dev/null @@ -1,399 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -"""SQLAlchemy models for heat data.""" - -import uuid - -from oslo_db.sqlalchemy import models -import sqlalchemy -from sqlalchemy.ext import declarative -from sqlalchemy.orm import backref -from sqlalchemy.orm import relationship - -from heat.db.sqlalchemy import types - -BASE = declarative.declarative_base() - - -class HeatBase(models.ModelBase, models.TimestampMixin): - """Base class for Heat Models.""" - __table_args__ = {'mysql_engine': 'InnoDB'} - - -class SoftDelete(object): - deleted_at = sqlalchemy.Column(sqlalchemy.DateTime) - - -class StateAware(object): - action = sqlalchemy.Column('action', sqlalchemy.String(255)) - status = sqlalchemy.Column('status', sqlalchemy.String(255)) - status_reason = sqlalchemy.Column('status_reason', sqlalchemy.Text) - - -class RawTemplate(BASE, HeatBase): - """Represents an unparsed template which should be in JSON format.""" - - __tablename__ = 'raw_template' - id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - template = sqlalchemy.Column(types.Json) - # legacy column - files = sqlalchemy.Column(types.Json) - # modern column, reference to raw_template_files - files_id = sqlalchemy.Column( - sqlalchemy.Integer(), - sqlalchemy.ForeignKey('raw_template_files.id')) - environment = sqlalchemy.Column('environment', types.Json) - - -class RawTemplateFiles(BASE, HeatBase): - """Where template files json dicts are stored.""" - __tablename__ = 'raw_template_files' - id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - files = sqlalchemy.Column(types.Json) - - -class StackTag(BASE, HeatBase): - """Key/value store of arbitrary stack tags.""" - - __tablename__ = 'stack_tag' - - id = sqlalchemy.Column('id', - sqlalchemy.Integer, - primary_key=True, - nullable=False) - tag = sqlalchemy.Column('tag', sqlalchemy.Unicode(80)) - stack_id = sqlalchemy.Column('stack_id', - sqlalchemy.String(36), - sqlalchemy.ForeignKey('stack.id'), - nullable=False) - - -class SyncPoint(BASE, HeatBase): - """Represents a syncpoint for a stack that is being worked on.""" - - __tablename__ = 'sync_point' - __table_args__ = ( - sqlalchemy.PrimaryKeyConstraint('entity_id', - 'traversal_id', - 'is_update'), - sqlalchemy.ForeignKeyConstraint(['stack_id'], ['stack.id']) - ) - - entity_id = sqlalchemy.Column(sqlalchemy.String(36)) - traversal_id = sqlalchemy.Column(sqlalchemy.String(36)) - is_update = sqlalchemy.Column(sqlalchemy.Boolean) - # integer field for atomic update operations - atomic_key = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) - stack_id = sqlalchemy.Column(sqlalchemy.String(36), - nullable=False) - input_data = sqlalchemy.Column(types.Json) - - -class Stack(BASE, HeatBase, SoftDelete, StateAware): - """Represents a stack created by the heat engine.""" - - __tablename__ = 'stack' - __table_args__ = ( - sqlalchemy.Index('ix_stack_name', 'name', mysql_length=255), - sqlalchemy.Index('ix_stack_tenant', 'tenant', mysql_length=255), - ) - - id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, - default=lambda: str(uuid.uuid4())) - name = sqlalchemy.Column(sqlalchemy.String(255)) - raw_template_id = sqlalchemy.Column( - sqlalchemy.Integer, - sqlalchemy.ForeignKey('raw_template.id'), - nullable=False) - raw_template = relationship(RawTemplate, backref=backref('stack'), - foreign_keys=[raw_template_id]) - prev_raw_template_id = sqlalchemy.Column( - 'prev_raw_template_id', - sqlalchemy.Integer, - sqlalchemy.ForeignKey('raw_template.id')) - prev_raw_template = relationship(RawTemplate, - foreign_keys=[prev_raw_template_id]) - username = 
-
-
-class Stack(BASE, HeatBase, SoftDelete, StateAware):
-    """Represents a stack created by the heat engine."""
-
-    __tablename__ = 'stack'
-    __table_args__ = (
-        sqlalchemy.Index('ix_stack_name', 'name', mysql_length=255),
-        sqlalchemy.Index('ix_stack_tenant', 'tenant', mysql_length=255),
-    )
-
-    id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
-                           default=lambda: str(uuid.uuid4()))
-    name = sqlalchemy.Column(sqlalchemy.String(255))
-    raw_template_id = sqlalchemy.Column(
-        sqlalchemy.Integer,
-        sqlalchemy.ForeignKey('raw_template.id'),
-        nullable=False)
-    raw_template = relationship(RawTemplate, backref=backref('stack'),
-                                foreign_keys=[raw_template_id])
-    prev_raw_template_id = sqlalchemy.Column(
-        'prev_raw_template_id',
-        sqlalchemy.Integer,
-        sqlalchemy.ForeignKey('raw_template.id'))
-    prev_raw_template = relationship(RawTemplate,
-                                     foreign_keys=[prev_raw_template_id])
-    username = sqlalchemy.Column(sqlalchemy.String(256))
-    tenant = sqlalchemy.Column(sqlalchemy.String(256))
-    user_creds_id = sqlalchemy.Column(
-        sqlalchemy.Integer,
-        sqlalchemy.ForeignKey('user_creds.id'))
-    owner_id = sqlalchemy.Column(sqlalchemy.String(36), index=True)
-    parent_resource_name = sqlalchemy.Column(sqlalchemy.String(255))
-    timeout = sqlalchemy.Column(sqlalchemy.Integer)
-    disable_rollback = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
-    stack_user_project_id = sqlalchemy.Column(sqlalchemy.String(64))
-    backup = sqlalchemy.Column('backup', sqlalchemy.Boolean)
-    nested_depth = sqlalchemy.Column('nested_depth', sqlalchemy.Integer)
-    convergence = sqlalchemy.Column('convergence', sqlalchemy.Boolean)
-    tags = relationship(StackTag, cascade="all,delete",
-                        backref=backref('stack'))
-    current_traversal = sqlalchemy.Column('current_traversal',
-                                          sqlalchemy.String(36))
-    current_deps = sqlalchemy.Column('current_deps', types.Json)
-
-    # Override timestamp column to store the correct value: it should be the
-    # time the create/update call was issued, not the time the DB entry is
-    # created/modified. (bug #1193269)
-    updated_at = sqlalchemy.Column(sqlalchemy.DateTime)
-
-
-class StackLock(BASE, HeatBase):
-    """Store stack locks for deployments with multiple engines."""
-
-    __tablename__ = 'stack_lock'
-
-    stack_id = sqlalchemy.Column(sqlalchemy.String(36),
-                                 sqlalchemy.ForeignKey('stack.id'),
-                                 primary_key=True)
-    engine_id = sqlalchemy.Column(sqlalchemy.String(36))
-
-
-class UserCreds(BASE, HeatBase):
-    """Represents user credentials.
-
-    Also, mirrors the 'context' handed in by wsgi.
-    """
-
-    __tablename__ = 'user_creds'
-
-    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
-    username = sqlalchemy.Column(sqlalchemy.String(255))
-    password = sqlalchemy.Column(sqlalchemy.String(255))
-    region_name = sqlalchemy.Column(sqlalchemy.String(255))
-    decrypt_method = sqlalchemy.Column(sqlalchemy.String(64))
-    tenant = sqlalchemy.Column(sqlalchemy.String(1024))
-    auth_url = sqlalchemy.Column(sqlalchemy.Text)
-    tenant_id = sqlalchemy.Column(sqlalchemy.String(256))
-    trust_id = sqlalchemy.Column(sqlalchemy.String(255))
-    trustor_user_id = sqlalchemy.Column(sqlalchemy.String(64))
-    stack = relationship(Stack, backref=backref('user_creds'),
-                         cascade_backrefs=False)
-
-
-class ResourcePropertiesData(BASE, HeatBase):
-    """Represents resource properties data, current or older."""
-
-    __tablename__ = 'resource_properties_data'
-
-    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
-    data = sqlalchemy.Column('data', types.Json)
-    encrypted = sqlalchemy.Column('encrypted', sqlalchemy.Boolean)
-
-
-class Event(BASE, HeatBase):
-    """Represents an event generated by the heat engine."""
-
-    __tablename__ = 'event'
-
-    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
-    stack_id = sqlalchemy.Column(sqlalchemy.String(36),
-                                 sqlalchemy.ForeignKey('stack.id'),
-                                 nullable=False)
-    stack = relationship(Stack, backref=backref('events'))
-
-    uuid = sqlalchemy.Column(sqlalchemy.String(36),
-                             default=lambda: str(uuid.uuid4()),
-                             unique=True)
-    resource_action = sqlalchemy.Column(sqlalchemy.String(255))
-    resource_status = sqlalchemy.Column(sqlalchemy.String(255))
-    resource_name = sqlalchemy.Column(sqlalchemy.String(255))
-    physical_resource_id = sqlalchemy.Column(sqlalchemy.String(255))
-    _resource_status_reason = sqlalchemy.Column(
-        'resource_status_reason', sqlalchemy.String(255))
-    resource_type = sqlalchemy.Column(sqlalchemy.String(255))
-    rsrc_prop_data_id =
sqlalchemy.Column(sqlalchemy.Integer, - sqlalchemy.ForeignKey( - 'resource_properties_data.id')) - rsrc_prop_data = relationship(ResourcePropertiesData, - backref=backref('event')) - resource_properties = sqlalchemy.Column(sqlalchemy.PickleType) - - @property - def resource_status_reason(self): - return self._resource_status_reason - - @resource_status_reason.setter - def resource_status_reason(self, reason): - self._resource_status_reason = reason and reason[:255] or '' - - -class ResourceData(BASE, HeatBase): - """Key/value store of arbitrary, resource-specific data.""" - - __tablename__ = 'resource_data' - - id = sqlalchemy.Column('id', - sqlalchemy.Integer, - primary_key=True, - nullable=False) - key = sqlalchemy.Column('key', sqlalchemy.String(255)) - value = sqlalchemy.Column('value', sqlalchemy.Text) - redact = sqlalchemy.Column('redact', sqlalchemy.Boolean) - decrypt_method = sqlalchemy.Column(sqlalchemy.String(64)) - resource_id = sqlalchemy.Column( - 'resource_id', sqlalchemy.Integer, - sqlalchemy.ForeignKey(column='resource.id', name='fk_resource_id', - ondelete='CASCADE'), - nullable=False) - - -class Resource(BASE, HeatBase, StateAware): - """Represents a resource created by the heat engine.""" - - __tablename__ = 'resource' - - id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - uuid = sqlalchemy.Column(sqlalchemy.String(36), - default=lambda: str(uuid.uuid4()), - unique=True) - name = sqlalchemy.Column('name', sqlalchemy.String(255)) - physical_resource_id = sqlalchemy.Column('nova_instance', - sqlalchemy.String(255)) - # odd name as "metadata" is reserved - rsrc_metadata = sqlalchemy.Column('rsrc_metadata', types.Json) - - stack_id = sqlalchemy.Column(sqlalchemy.String(36), - sqlalchemy.ForeignKey('stack.id'), - nullable=False) - stack = relationship(Stack, backref=backref('resources')) - root_stack_id = sqlalchemy.Column(sqlalchemy.String(36), index=True) - data = relationship(ResourceData, - cascade="all", - passive_deletes=True, - backref=backref('resource')) - rsrc_prop_data_id = sqlalchemy.Column(sqlalchemy.Integer, - sqlalchemy.ForeignKey( - 'resource_properties_data.id')) - rsrc_prop_data = relationship(ResourcePropertiesData, - foreign_keys=[rsrc_prop_data_id]) - attr_data_id = sqlalchemy.Column(sqlalchemy.Integer, - sqlalchemy.ForeignKey( - 'resource_properties_data.id')) - attr_data = relationship(ResourcePropertiesData, - foreign_keys=[attr_data_id]) - - # Override timestamp column to store the correct value: it should be the - # time the create/update call was issued, not the time the DB entry is - # created/modified. (bug #1193269) - updated_at = sqlalchemy.Column(sqlalchemy.DateTime) - properties_data = sqlalchemy.Column('properties_data', types.Json) - properties_data_encrypted = sqlalchemy.Column('properties_data_encrypted', - sqlalchemy.Boolean) - engine_id = sqlalchemy.Column(sqlalchemy.String(36)) - atomic_key = sqlalchemy.Column(sqlalchemy.Integer) - - needed_by = sqlalchemy.Column('needed_by', types.List) - requires = sqlalchemy.Column('requires', types.List) - replaces = sqlalchemy.Column('replaces', sqlalchemy.Integer, - default=None) - replaced_by = sqlalchemy.Column('replaced_by', sqlalchemy.Integer, - default=None) - current_template_id = sqlalchemy.Column( - 'current_template_id', - sqlalchemy.Integer, - sqlalchemy.ForeignKey('raw_template.id')) - - -class SoftwareConfig(BASE, HeatBase): - """Represents a software configuration resource. - - Represents a software configuration resource to be applied to one or more - servers. 
- """ - - __tablename__ = 'software_config' - - id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, - default=lambda: str(uuid.uuid4())) - name = sqlalchemy.Column('name', sqlalchemy.String(255)) - group = sqlalchemy.Column('group', sqlalchemy.String(255)) - config = sqlalchemy.Column('config', types.Json) - tenant = sqlalchemy.Column( - 'tenant', sqlalchemy.String(64), nullable=False, index=True) - - -class SoftwareDeployment(BASE, HeatBase, StateAware): - """Represents a software deployment resource. - - Represents applying a software configuration resource to a single server - resource. - """ - - __tablename__ = 'software_deployment' - __table_args__ = ( - sqlalchemy.Index('ix_software_deployment_created_at', 'created_at'),) - - id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, - default=lambda: str(uuid.uuid4())) - config_id = sqlalchemy.Column( - 'config_id', - sqlalchemy.String(36), - sqlalchemy.ForeignKey('software_config.id'), - nullable=False) - config = relationship(SoftwareConfig, backref=backref('deployments')) - server_id = sqlalchemy.Column('server_id', sqlalchemy.String(36), - nullable=False, index=True) - input_values = sqlalchemy.Column('input_values', types.Json) - output_values = sqlalchemy.Column('output_values', types.Json) - tenant = sqlalchemy.Column( - 'tenant', sqlalchemy.String(64), nullable=False, index=True) - stack_user_project_id = sqlalchemy.Column(sqlalchemy.String(64)) - updated_at = sqlalchemy.Column(sqlalchemy.DateTime) - - -class Snapshot(BASE, HeatBase): - - __tablename__ = 'snapshot' - - id = sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, - default=lambda: str(uuid.uuid4())) - stack_id = sqlalchemy.Column(sqlalchemy.String(36), - sqlalchemy.ForeignKey('stack.id'), - nullable=False) - name = sqlalchemy.Column('name', sqlalchemy.String(255)) - data = sqlalchemy.Column('data', types.Json) - tenant = sqlalchemy.Column( - 'tenant', sqlalchemy.String(64), nullable=False, index=True) - status = sqlalchemy.Column('status', sqlalchemy.String(255)) - status_reason = sqlalchemy.Column('status_reason', sqlalchemy.String(255)) - stack = relationship(Stack, backref=backref('snapshot')) - - -class Service(BASE, HeatBase, SoftDelete): - - __tablename__ = 'service' - - id = sqlalchemy.Column('id', - sqlalchemy.String(36), - primary_key=True, - default=lambda: str(uuid.uuid4())) - engine_id = sqlalchemy.Column('engine_id', - sqlalchemy.String(36), - nullable=False) - host = sqlalchemy.Column('host', - sqlalchemy.String(255), - nullable=False) - hostname = sqlalchemy.Column('hostname', - sqlalchemy.String(255), - nullable=False) - binary = sqlalchemy.Column('binary', - sqlalchemy.String(255), - nullable=False) - topic = sqlalchemy.Column('topic', - sqlalchemy.String(255), - nullable=False) - report_interval = sqlalchemy.Column('report_interval', - sqlalchemy.Integer, - nullable=False) diff --git a/heat/db/sqlalchemy/types.py b/heat/db/sqlalchemy/types.py deleted file mode 100644 index d454024c6..000000000 --- a/heat/db/sqlalchemy/types.py +++ /dev/null @@ -1,65 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_serialization import jsonutils -from sqlalchemy.dialects import mysql -from sqlalchemy import types - - -dumps = jsonutils.dumps -loads = jsonutils.loads - - -class LongText(types.TypeDecorator): - - impl = types.Text - cache_ok = True - - def load_dialect_impl(self, dialect): - if dialect.name == 'mysql': - return dialect.type_descriptor(mysql.LONGTEXT()) - else: - return self.impl - - -class Json(LongText): - - cache_ok = True - - def process_bind_param(self, value, dialect): - return dumps(value) - - def process_result_value(self, value, dialect): - if value is None: - return None - return loads(value) - - -class List(types.TypeDecorator): - - impl = types.Text - cache_ok = True - - def load_dialect_impl(self, dialect): - if dialect.name == 'mysql': - return dialect.type_descriptor(mysql.LONGTEXT()) - else: - return self.impl - - def process_bind_param(self, value, dialect): - return dumps(value) - - def process_result_value(self, value, dialect): - if value is None: - return None - return loads(value) diff --git a/heat/db/sqlalchemy/utils.py b/heat/db/sqlalchemy/utils.py deleted file mode 100644 index 67e2d541e..000000000 --- a/heat/db/sqlalchemy/utils.py +++ /dev/null @@ -1,25 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# SQLAlchemy helper functions - -from sqlalchemy.orm import exc -import tenacity - - -def retry_on_stale_data_error(func): - wrapper = tenacity.retry( - stop=tenacity.stop_after_attempt(3), - retry=tenacity.retry_if_exception_type(exc.StaleDataError), - reraise=True) - return wrapper(func) diff --git a/heat/db/types.py b/heat/db/types.py new file mode 100644 index 000000000..d454024c6 --- /dev/null +++ b/heat/db/types.py @@ -0,0 +1,65 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from oslo_serialization import jsonutils +from sqlalchemy.dialects import mysql +from sqlalchemy import types + + +dumps = jsonutils.dumps +loads = jsonutils.loads + + +class LongText(types.TypeDecorator): + + impl = types.Text + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == 'mysql': + return dialect.type_descriptor(mysql.LONGTEXT()) + else: + return self.impl + + +class Json(LongText): + + cache_ok = True + + def process_bind_param(self, value, dialect): + return dumps(value) + + def process_result_value(self, value, dialect): + if value is None: + return None + return loads(value) + + +class List(types.TypeDecorator): + + impl = types.Text + cache_ok = True + + def load_dialect_impl(self, dialect): + if dialect.name == 'mysql': + return dialect.type_descriptor(mysql.LONGTEXT()) + else: + return self.impl + + def process_bind_param(self, value, dialect): + return dumps(value) + + def process_result_value(self, value, dialect): + if value is None: + return None + return loads(value) diff --git a/heat/db/utils.py b/heat/db/utils.py new file mode 100644 index 000000000..67e2d541e --- /dev/null +++ b/heat/db/utils.py @@ -0,0 +1,25 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# SQLAlchemy helper functions + +from sqlalchemy.orm import exc +import tenacity + + +def retry_on_stale_data_error(func): + wrapper = tenacity.retry( + stop=tenacity.stop_after_attempt(3), + retry=tenacity.retry_if_exception_type(exc.StaleDataError), + reraise=True) + return wrapper(func) diff --git a/heat/engine/service_software_config.py b/heat/engine/service_software_config.py index 637d7874e..195397101 100644 --- a/heat/engine/service_software_config.py +++ b/heat/engine/service_software_config.py @@ -23,7 +23,7 @@ from urllib import parse from heat.common import crypt from heat.common import exception from heat.common.i18n import _ -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.engine import api from heat.engine import resource from heat.engine import scheduler diff --git a/heat/engine/template_files.py b/heat/engine/template_files.py index 55a54e0d6..bcbe68b07 100644 --- a/heat/engine/template_files.py +++ b/heat/engine/template_files.py @@ -17,7 +17,7 @@ import weakref from heat.common import context from heat.common import exception from heat.common.i18n import _ -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import raw_template_files _d = weakref.WeakValueDictionary() diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 28e2424e0..c2163ff59 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -24,7 +24,7 @@ from osprofiler import profiler from heat.common import context from heat.common import messaging as rpc_messaging -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.engine import check_resource from heat.engine import node_data from heat.engine import stack as parser diff --git a/heat/objects/event.py 
b/heat/objects/event.py index f51a6ac8b..c9a356e97 100644 --- a/heat/objects/event.py +++ b/heat/objects/event.py @@ -19,7 +19,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields from heat.common import identifier -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import resource_properties_data as rpd diff --git a/heat/objects/raw_template.py b/heat/objects/raw_template.py index 3a1ba35db..616411247 100644 --- a/heat/objects/raw_template.py +++ b/heat/objects/raw_template.py @@ -24,7 +24,7 @@ from oslo_versionedobjects import fields from heat.common import crypt from heat.common import environment_format as env_fmt -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields diff --git a/heat/objects/raw_template_files.py b/heat/objects/raw_template_files.py index 1afed6e7e..36d6cde91 100644 --- a/heat/objects/raw_template_files.py +++ b/heat/objects/raw_template_files.py @@ -16,7 +16,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields diff --git a/heat/objects/resource.py b/heat/objects/resource.py index c5aacd4fc..590ea5a59 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -26,7 +26,7 @@ import tenacity from heat.common import crypt from heat.common import exception from heat.common.i18n import _ -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields from heat.objects import resource_data diff --git a/heat/objects/resource_data.py b/heat/objects/resource_data.py index 4b6419bcc..7602bc040 100644 --- a/heat/objects/resource_data.py +++ b/heat/objects/resource_data.py @@ -19,7 +19,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields from heat.common import exception -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base diff --git a/heat/objects/resource_properties_data.py b/heat/objects/resource_properties_data.py index c1c313c25..4b6a18dad 100644 --- a/heat/objects/resource_properties_data.py +++ b/heat/objects/resource_properties_data.py @@ -19,7 +19,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields from heat.common import crypt -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import fields as heat_fields diff --git a/heat/objects/service.py b/heat/objects/service.py index 416e9081c..121af6145 100644 --- a/heat/objects/service.py +++ b/heat/objects/service.py @@ -18,7 +18,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields from heat.common import service_utils -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base diff --git a/heat/objects/snapshot.py b/heat/objects/snapshot.py index 95c758d05..23d569a40 100644 --- a/heat/objects/snapshot.py +++ b/heat/objects/snapshot.py @@ -18,7 +18,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api 
as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields diff --git a/heat/objects/software_config.py b/heat/objects/software_config.py index 02ac8a908..e0d9d42c7 100644 --- a/heat/objects/software_config.py +++ b/heat/objects/software_config.py @@ -12,13 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. - """SoftwareConfig object.""" from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields diff --git a/heat/objects/software_deployment.py b/heat/objects/software_deployment.py index 2ff47ad93..564a48465 100644 --- a/heat/objects/software_deployment.py +++ b/heat/objects/software_deployment.py @@ -18,7 +18,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields from heat.objects import software_config diff --git a/heat/objects/stack.py b/heat/objects/stack.py index 349aaada8..a5c781d16 100644 --- a/heat/objects/stack.py +++ b/heat/objects/stack.py @@ -22,7 +22,7 @@ from oslo_versionedobjects import fields from heat.common import exception from heat.common.i18n import _ from heat.common import identifier -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields from heat.objects import raw_template diff --git a/heat/objects/stack_lock.py b/heat/objects/stack_lock.py index 1a00c40e9..9d699ee00 100644 --- a/heat/objects/stack_lock.py +++ b/heat/objects/stack_lock.py @@ -11,13 +11,12 @@ # License for the specific language governing permissions and limitations # under the License. - """StackLock object.""" from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base diff --git a/heat/objects/stack_tag.py b/heat/objects/stack_tag.py index 3a75fffbf..50d86deda 100644 --- a/heat/objects/stack_tag.py +++ b/heat/objects/stack_tag.py @@ -17,7 +17,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base diff --git a/heat/objects/sync_point.py b/heat/objects/sync_point.py index 264d323b8..750a551b1 100644 --- a/heat/objects/sync_point.py +++ b/heat/objects/sync_point.py @@ -11,14 +11,12 @@ # License for the specific language governing permissions and limitations # under the License. 
- """SyncPoint object.""" - from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base from heat.objects import fields as heat_fields diff --git a/heat/objects/user_creds.py b/heat/objects/user_creds.py index abfa3610a..d482ee229 100644 --- a/heat/objects/user_creds.py +++ b/heat/objects/user_creds.py @@ -18,7 +18,7 @@ from oslo_versionedobjects import base from oslo_versionedobjects import fields -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.objects import base as heat_base diff --git a/heat/tests/convergence/framework/engine_wrapper.py b/heat/tests/convergence/framework/engine_wrapper.py index b1dc5e6b4..c1fb47c01 100644 --- a/heat/tests/convergence/framework/engine_wrapper.py +++ b/heat/tests/convergence/framework/engine_wrapper.py @@ -11,7 +11,7 @@ # License for the specific language governing permissions and limitations # under the License. -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.engine import service from heat.engine import stack from heat.tests.convergence.framework import message_processor diff --git a/heat/tests/convergence/framework/reality.py b/heat/tests/convergence/framework/reality.py index 055782b38..0618c758d 100644 --- a/heat/tests/convergence/framework/reality.py +++ b/heat/tests/convergence/framework/reality.py @@ -14,7 +14,7 @@ from unittest import mock from heat.common import exception -from heat.db.sqlalchemy import api as db_api +from heat.db import api as db_api from heat.tests import utils diff --git a/heat/tests/db/test_migrations.py b/heat/tests/db/test_migrations.py index b627e963e..ffad49f2e 100644 --- a/heat/tests/db/test_migrations.py +++ b/heat/tests/db/test_migrations.py @@ -22,8 +22,8 @@ from oslotest import base as test_base import sqlalchemy import testtools -from heat.db.sqlalchemy import migration -from heat.db.sqlalchemy import models +from heat.db import migration +from heat.db import models class DBNotAllowed(Exception): diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index 07a1c2670..a4f3309f0 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -30,8 +30,8 @@ from heat.common import context from heat.common import exception from heat.common import short_id from heat.common import template_format -from heat.db.sqlalchemy import api as db_api -from heat.db.sqlalchemy import models +from heat.db import api as db_api +from heat.db import models from heat.engine.clients.os import glance from heat.engine.clients.os import nova from heat.engine import environment @@ -2308,7 +2308,7 @@ class DBAPIStackTest(common.HeatTestCase): create_stack(self.ctx, self.template, self.user_creds, deleted_at=deleted) - with mock.patch('heat.db.sqlalchemy.api._purge_stacks') as mock_ps: + with mock.patch('heat.db.api._purge_stacks') as mock_ps: db_api.purge_deleted(age=0, batch_size=2) self.assertEqual(4, mock_ps.call_count) diff --git a/heat/tests/db/test_sqlalchemy_filters.py b/heat/tests/db/test_sqlalchemy_filters.py index e3424aee7..6e8f2d312 100644 --- a/heat/tests/db/test_sqlalchemy_filters.py +++ b/heat/tests/db/test_sqlalchemy_filters.py @@ -13,7 +13,7 @@ from unittest import mock -from heat.db.sqlalchemy import filters as db_filters +from heat.db import filters as db_filters from heat.tests import common diff --git 
index 47b460a55..051962d18 100644
--- a/heat/tests/db/test_sqlalchemy_types.py
+++ b/heat/tests/db/test_sqlalchemy_types.py
@@ -15,7 +15,7 @@
 from sqlalchemy.dialects.mysql import base as mysql_base
 from sqlalchemy.dialects.sqlite import base as sqlite_base
 from sqlalchemy import types
 
-from heat.db.sqlalchemy import types as db_types
+from heat.db import types as db_types
 from heat.tests import common
 
diff --git a/heat/tests/engine/service/test_software_config.py b/heat/tests/engine/service/test_software_config.py
index ecd7bef62..9aba366a5 100644
--- a/heat/tests/engine/service/test_software_config.py
+++ b/heat/tests/engine/service/test_software_config.py
@@ -22,7 +22,7 @@ from oslo_utils import timeutils
 from heat.common import crypt
 from heat.common import exception
 from heat.common import template_format
-from heat.db.sqlalchemy import api as db_api
+from heat.db import api as db_api
 from heat.engine.clients.os import swift
 from heat.engine.clients.os import zaqar
 from heat.engine import service
diff --git a/heat/tests/engine/service/test_stack_update.py b/heat/tests/engine/service/test_stack_update.py
index f1df832af..0712e888f 100644
--- a/heat/tests/engine/service/test_stack_update.py
+++ b/heat/tests/engine/service/test_stack_update.py
@@ -24,7 +24,7 @@ from heat.common import exception
 from heat.common import messaging
 from heat.common import service_utils
 from heat.common import template_format
-from heat.db.sqlalchemy import api as db_api
+from heat.db import api as db_api
 from heat.engine.clients.os import glance
 from heat.engine.clients.os import nova
 from heat.engine.clients.os import swift
diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py
index 0eed84fb0..de873b2aa 100644
--- a/heat/tests/engine/test_engine_worker.py
+++ b/heat/tests/engine/test_engine_worker.py
@@ -15,7 +15,7 @@
 
 from unittest import mock
 
-from heat.db.sqlalchemy import api as db_api
+from heat.db import api as db_api
 from heat.engine import check_resource
 from heat.engine import stack as parser
 from heat.engine import template as templatem
diff --git a/heat/tests/test_common_service_utils.py b/heat/tests/test_common_service_utils.py
index 0745e8f85..2b363607c 100644
--- a/heat/tests/test_common_service_utils.py
+++ b/heat/tests/test_common_service_utils.py
@@ -18,7 +18,7 @@ from oslo_utils import timeutils
 import uuid
 
 from heat.common import service_utils
-from heat.db.sqlalchemy import models
+from heat.db import models
 from heat.tests import common
 
 
diff --git a/heat/tests/test_engine_api_utils.py b/heat/tests/test_engine_api_utils.py
index 8703fcdda..6cee13b03 100644
--- a/heat/tests/test_engine_api_utils.py
+++ b/heat/tests/test_engine_api_utils.py
@@ -21,7 +21,7 @@ from oslo_utils import timeutils
 from heat.common import exception
 from heat.common import template_format
 from heat.common import timeutils as heat_timeutils
-from heat.db.sqlalchemy import models
+from heat.db import models
 from heat.engine import api
 from heat.engine.cfn import parameters as cfn_param
 from heat.engine import event
diff --git a/heat/tests/test_event.py b/heat/tests/test_event.py
index 8d9a2521b..0efdf9caf 100644
--- a/heat/tests/test_event.py
+++ b/heat/tests/test_event.py
@@ -16,7 +16,7 @@ from unittest import mock
 from oslo_config import cfg
 import uuid
 
-from heat.db.sqlalchemy import models
+from heat.db import models
 from heat.engine import event
 from heat.engine import stack
 from heat.engine import template
diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py
index b4f8f05c0..e3d0c97ef 100644
--- a/heat/tests/test_resource.py
+++ b/heat/tests/test_resource.py
@@ -27,8 +27,8 @@ from heat.common import exception
 from heat.common.i18n import _
 from heat.common import short_id
 from heat.common import timeutils
-from heat.db.sqlalchemy import api as db_api
-from heat.db.sqlalchemy import models
+from heat.db import api as db_api
+from heat.db import models
 from heat.engine import attributes
 from heat.engine.cfn import functions as cfn_funcs
 from heat.engine import clients
diff --git a/heat/tests/test_resource_properties_data.py b/heat/tests/test_resource_properties_data.py
index 2ef5374cf..bcf677564 100644
--- a/heat/tests/test_resource_properties_data.py
+++ b/heat/tests/test_resource_properties_data.py
@@ -13,7 +13,7 @@
 
 from oslo_config import cfg
 
-from heat.db.sqlalchemy import models
+from heat.db import models
 from heat.objects import resource_properties_data as rpd_object
 from heat.tests import common
 from heat.tests import utils
diff --git a/heat/tests/test_signal.py b/heat/tests/test_signal.py
index bb0b8313b..b2d754abb 100644
--- a/heat/tests/test_signal.py
+++ b/heat/tests/test_signal.py
@@ -20,7 +20,7 @@ from oslo_utils import timeutils
 
 from heat.common import exception
 from heat.common import template_format
-from heat.db.sqlalchemy import models
+from heat.db import models
 from heat.engine.clients.os import heat_plugin
 from heat.engine.clients.os.keystone import fake_keystoneclient as fake_ks
 from heat.engine.clients.os import swift
diff --git a/heat/tests/test_stack.py b/heat/tests/test_stack.py
index 35374190c..278df8e23 100644
--- a/heat/tests/test_stack.py
+++ b/heat/tests/test_stack.py
@@ -27,7 +27,7 @@ from heat.common import context
 from heat.common import exception
 from heat.common import template_format
 from heat.common import timeutils
-from heat.db.sqlalchemy import api as db_api
+from heat.db import api as db_api
 from heat.engine.clients.os import keystone
 from heat.engine.clients.os.keystone import fake_keystoneclient as fake_ks
 from heat.engine.clients.os import nova
diff --git a/heat/tests/test_stack_update.py b/heat/tests/test_stack_update.py
index a466cef4f..b441f3ebf 100644
--- a/heat/tests/test_stack_update.py
+++ b/heat/tests/test_stack_update.py
@@ -17,7 +17,7 @@ from unittest import mock
 
 from heat.common import exception
 from heat.common import template_format
-from heat.db.sqlalchemy import api as db_api
+from heat.db import api as db_api
 from heat.engine.clients.os.keystone import fake_keystoneclient
 from heat.engine import environment
 from heat.engine import resource
diff --git a/heat/tests/utils.py b/heat/tests/utils.py
index ea9aa6329..44c81c294 100644
--- a/heat/tests/utils.py
+++ b/heat/tests/utils.py
@@ -24,8 +24,8 @@ import sqlalchemy
 from sqlalchemy import exc as sqla_exc
 
 from heat.common import context
-from heat.db.sqlalchemy import api as db_api
-from heat.db.sqlalchemy import models
+from heat.db import api as db_api
+from heat.db import models
 from heat.engine import environment
 from heat.engine import node_data
 from heat.engine import resource
-- 
cgit v1.2.1