diff options
Diffstat (limited to 'nova/db/main/api.py')
-rw-r--r-- | nova/db/main/api.py | 368 |
1 file changed, 242 insertions, 126 deletions
diff --git a/nova/db/main/api.py b/nova/db/main/api.py index 4c40be905e..7d24f974f9 100644 --- a/nova/db/main/api.py +++ b/nova/db/main/api.py @@ -79,11 +79,28 @@ def _context_manager_from_context(context): pass -def _joinedload_all(column): +def _joinedload_all(lead_entity, column): + """Do a nested load. + + For example, resolve the following:: + + _joinedload_all(models.SecurityGroup, 'instances.info_cache') + + to: + + orm.joinedload( + models.SecurityGroup.instances + ).joinedload( + Instance.info_cache + ) + """ elements = column.split('.') - joined = orm.joinedload(elements.pop(0)) + relationship_attr = getattr(lead_entity, elements.pop(0)) + joined = orm.joinedload(relationship_attr) for element in elements: - joined = joined.joinedload(element) + relationship_entity = relationship_attr.entity.class_ + relationship_attr = getattr(relationship_entity, element) + joined = joined.joinedload(relationship_attr) return joined @@ -562,7 +579,7 @@ def _compute_node_select(context, filters=None, limit=None, marker=None): if filters is None: filters = {} - cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn') + cn_tbl = models.ComputeNode.__table__.alias('cn') select = sa.select(cn_tbl) if context.read_deleted == "no": @@ -595,9 +612,9 @@ def _compute_node_select(context, filters=None, limit=None, marker=None): def _compute_node_fetchall(context, filters=None, limit=None, marker=None): select = _compute_node_select(context, filters, limit=limit, marker=marker) engine = get_engine(context=context) - conn = engine.connect() - results = conn.execute(select).fetchall() + with engine.connect() as conn, conn.begin(): + results = conn.execute(select).fetchall() # Callers expect dict-like objects, not SQLAlchemy RowProxy objects... 
results = [dict(r._mapping) for r in results] @@ -909,7 +926,7 @@ def compute_node_statistics(context): engine = get_engine(context=context) services_tbl = models.Service.__table__ - inner_sel = sa.alias(_compute_node_select(context), name='inner_sel') + inner_sel = _compute_node_select(context).alias('inner_sel') # TODO(sbauza): Remove the service_id filter in a later release # once we are sure that all compute nodes report the host field @@ -966,9 +983,9 @@ def compute_node_statistics(context): ).label('disk_available_least'), ] select = sql.select(*agg_cols).select_from(j) - conn = engine.connect() - results = conn.execute(select).fetchone() + with engine.connect() as conn, conn.begin(): + results = conn.execute(select).fetchone() # Build a dict of the info--making no assumptions about result fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used', @@ -976,7 +993,6 @@ def compute_node_statistics(context): 'current_workload', 'running_vms', 'disk_available_least') results = {field: int(results[idx] or 0) for idx, field in enumerate(fields)} - conn.close() return results @@ -1381,9 +1397,9 @@ def instance_get_by_uuid(context, uuid, columns_to_join=None): def _instance_get_by_uuid(context, uuid, columns_to_join=None): - result = _build_instance_get(context, columns_to_join=columns_to_join).\ - filter_by(uuid=uuid).\ - first() + result = _build_instance_get( + context, columns_to_join=columns_to_join + ).filter_by(uuid=uuid).first() if not result: raise exception.InstanceNotFound(instance_id=uuid) @@ -1411,9 +1427,13 @@ def instance_get(context, instance_id, columns_to_join=None): def _build_instance_get(context, columns_to_join=None): - query = model_query(context, models.Instance, project_only=True).\ - options(_joinedload_all('security_groups.rules')).\ - options(orm.joinedload('info_cache')) + query = model_query( + context, models.Instance, project_only=True, + ).options( + orm.joinedload( + models.Instance.security_groups + 
).joinedload(models.SecurityGroup.rules) + ).options(orm.joinedload(models.Instance.info_cache)) if columns_to_join is None: columns_to_join = ['metadata', 'system_metadata'] for column in columns_to_join: @@ -1421,7 +1441,10 @@ def _build_instance_get(context, columns_to_join=None): # Already always joined above continue if 'extra.' in column: - query = query.options(orm.undefer(column)) + column_ref = getattr(models.InstanceExtra, column.split('.')[1]) + query = query.options( + orm.joinedload(models.Instance.extra).undefer(column_ref) + ) elif column in ['metadata', 'system_metadata']: # NOTE(melwitt): We use subqueryload() instead of joinedload() for # metadata and system_metadata because of the one-to-many @@ -1431,13 +1454,16 @@ def _build_instance_get(context, columns_to_join=None): # in a large data transfer. Instead, the subqueryload() will # perform additional queries to obtain metadata and system_metadata # for the instance. - query = query.options(orm.subqueryload(column)) + column_ref = getattr(models.Instance, column) + query = query.options(orm.subqueryload(column_ref)) else: - query = query.options(orm.joinedload(column)) + column_ref = getattr(models.Instance, column) + query = query.options(orm.joinedload(column_ref)) # NOTE(alaski) Stop lazy loading of columns not needed. - for col in ['metadata', 'system_metadata']: - if col not in columns_to_join: - query = query.options(orm.noload(col)) + for column in ['metadata', 'system_metadata']: + if column not in columns_to_join: + column_ref = getattr(models.Instance, column) + query = query.options(orm.noload(column_ref)) # NOTE(melwitt): We need to use order_by(<unique column>) so that the # additional queries emitted by subqueryload() include the same ordering as # used by the parent query. 
@@ -1530,7 +1556,8 @@ def instance_get_all(context, columns_to_join=None): _manual_join_columns(columns_to_join)) query = model_query(context, models.Instance) for column in columns_to_join_new: - query = query.options(orm.joinedload(column)) + column_ref = getattr(models.Instance, column) + query = query.options(orm.joinedload(column_ref)) if not context.is_admin: # If we're not admin context, add appropriate filter.. if context.project_id: @@ -1671,9 +1698,13 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, query_prefix = context.session.query(models.Instance) for column in columns_to_join_new: if 'extra.' in column: - query_prefix = query_prefix.options(orm.undefer(column)) + column_ref = getattr(models.InstanceExtra, column.split('.')[1]) + query_prefix = query_prefix.options( + orm.joinedload(models.Instance.extra).undefer(column_ref) + ) else: - query_prefix = query_prefix.options(orm.joinedload(column)) + column_ref = getattr(models.Instance, column) + query_prefix = query_prefix.options(orm.joinedload(column_ref)) # Note: order_by is done in the sqlalchemy.utils.py paginate_query(), # no need to do it here as well @@ -1683,9 +1714,9 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, filters = copy.deepcopy(filters) model_object = models.Instance - query_prefix = _get_query_nova_resource_by_changes_time(query_prefix, - filters, - model_object) + query_prefix = _get_query_nova_resource_by_changes_time( + query_prefix, filters, model_object, + ) if 'deleted' in filters: # Instances can be soft or hard deleted and the query needs to @@ -1697,14 +1728,12 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, models.Instance.deleted == models.Instance.id, models.Instance.vm_state == vm_states.SOFT_DELETED ) - query_prefix = query_prefix.\ - filter(delete) + query_prefix = query_prefix.filter(delete) else: query_prefix = query_prefix.\ filter(models.Instance.deleted == 
models.Instance.id) else: - query_prefix = query_prefix.\ - filter_by(deleted=0) + query_prefix = query_prefix.filter_by(deleted=0) if not filters.pop('soft_deleted', False): # It would be better to have vm_state not be nullable # but until then we test it explicitly as a workaround. @@ -1794,19 +1823,25 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, if marker is not None: try: marker = _instance_get_by_uuid( - context.elevated(read_deleted='yes'), marker) + context.elevated(read_deleted='yes'), marker, + ) except exception.InstanceNotFound: raise exception.MarkerNotFound(marker=marker) try: - query_prefix = sqlalchemyutils.paginate_query(query_prefix, - models.Instance, limit, - sort_keys, - marker=marker, - sort_dirs=sort_dirs) + query_prefix = sqlalchemyutils.paginate_query( + query_prefix, + models.Instance, + limit, + sort_keys, + marker=marker, + sort_dirs=sort_dirs, + ) except db_exc.InvalidSortKey: raise exception.InvalidSortKey() - return _instances_fill_metadata(context, query_prefix.all(), manual_joins) + instances = query_prefix.all() + + return _instances_fill_metadata(context, instances, manual_joins) @require_context @@ -2059,9 +2094,13 @@ def instance_get_active_by_window_joined(context, begin, end=None, for column in columns_to_join_new: if 'extra.' 
in column: - query = query.options(orm.undefer(column)) + column_ref = getattr(models.InstanceExtra, column.split('.')[1]) + query = query.options( + orm.joinedload(models.Instance.extra).undefer(column_ref) + ) else: - query = query.options(orm.joinedload(column)) + column_ref = getattr(models.Instance, column) + query = query.options(orm.joinedload(column_ref)) query = query.filter(sql.or_( models.Instance.terminated_at == sql.null(), @@ -2081,23 +2120,31 @@ def instance_get_active_by_window_joined(context, begin, end=None, raise exception.MarkerNotFound(marker=marker) query = sqlalchemyutils.paginate_query( - query, models.Instance, limit, ['project_id', 'uuid'], marker=marker) + query, models.Instance, limit, ['project_id', 'uuid'], marker=marker, + ) + instances = query.all() - return _instances_fill_metadata(context, query.all(), manual_joins) + return _instances_fill_metadata(context, instances, manual_joins) def _instance_get_all_query(context, project_only=False, joins=None): if joins is None: joins = ['info_cache', 'security_groups'] - query = model_query(context, - models.Instance, - project_only=project_only) + query = model_query( + context, + models.Instance, + project_only=project_only, + ) for column in joins: if 'extra.' 
in column: - query = query.options(orm.undefer(column)) + column_ref = getattr(models.InstanceExtra, column.split('.')[1]) + query = query.options( + orm.joinedload(models.Instance.extra).undefer(column_ref) + ) else: - query = query.options(orm.joinedload(column)) + column_ref = getattr(models.Instance, column) + query = query.options(orm.joinedload(column_ref)) return query @@ -2105,9 +2152,12 @@ def _instance_get_all_query(context, project_only=False, joins=None): def instance_get_all_by_host(context, host, columns_to_join=None): """Get all instances belonging to a host.""" query = _instance_get_all_query(context, joins=columns_to_join) - return _instances_fill_metadata(context, - query.filter_by(host=host).all(), - manual_joins=columns_to_join) + instances = query.filter_by(host=host).all() + return _instances_fill_metadata( + context, + instances, + manual_joins=columns_to_join, + ) def _instance_get_all_uuids_by_hosts(context, hosts): @@ -2147,19 +2197,26 @@ def instance_get_all_by_host_and_node( candidates = ['system_metadata', 'metadata'] manual_joins = [x for x in columns_to_join if x in candidates] columns_to_join = list(set(columns_to_join) - set(candidates)) - return _instances_fill_metadata(context, - _instance_get_all_query( - context, - joins=columns_to_join).filter_by(host=host). - filter_by(node=node).all(), manual_joins=manual_joins) + instances = _instance_get_all_query( + context, + joins=columns_to_join, + ).filter_by(host=host).filter_by(node=node).all() + return _instances_fill_metadata( + context, + instances, + manual_joins=manual_joins, + ) @pick_context_manager_reader def instance_get_all_by_host_and_not_type(context, host, type_id=None): """Get all instances belonging to a host with a different type_id.""" - return _instances_fill_metadata(context, - _instance_get_all_query(context).filter_by(host=host). 
- filter(models.Instance.instance_type_id != type_id).all()) + instances = _instance_get_all_query(context).filter_by( + host=host, + ).filter( + models.Instance.instance_type_id != type_id + ).all() + return _instances_fill_metadata(context, instances) # NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0. @@ -2172,11 +2229,14 @@ def instance_get_all_hung_in_rebooting(context, reboot_window): # NOTE(danms): this is only used in the _poll_rebooting_instances() # call in compute/manager, so we can avoid the metadata lookups # explicitly - return _instances_fill_metadata(context, - model_query(context, models.Instance). - filter(models.Instance.updated_at <= reboot_window). - filter_by(task_state=task_states.REBOOTING).all(), - manual_joins=[]) + instances = model_query(context, models.Instance).filter( + models.Instance.updated_at <= reboot_window + ).filter_by(task_state=task_states.REBOOTING).all() + return _instances_fill_metadata( + context, + instances, + manual_joins=[], + ) def _retry_instance_update(): @@ -2505,13 +2565,15 @@ def instance_extra_get_by_instance_uuid( :param instance_uuid: UUID of the instance tied to the topology record :param columns: A list of the columns to load, or None for 'all of them' """ - query = model_query(context, models.InstanceExtra).\ - filter_by(instance_uuid=instance_uuid) + query = model_query(context, models.InstanceExtra).filter_by( + instance_uuid=instance_uuid, + ) if columns is None: columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model', 'trusted_certs', 'resources', 'migration_context'] for column in columns: - query = query.options(orm.undefer(column)) + column_ref = getattr(models.InstanceExtra, column) + query = query.options(orm.undefer(column_ref)) instance_extra = query.first() return instance_extra @@ -2733,7 +2795,8 @@ def _block_device_mapping_get_query(context, columns_to_join=None): query = model_query(context, models.BlockDeviceMapping) for column in columns_to_join: - 
query = query.options(orm.joinedload(column)) + column_ref = getattr(models.BlockDeviceMapping, column) + query = query.options(orm.joinedload(column_ref)) return query @@ -2950,10 +3013,18 @@ def security_group_create(context, values): def _security_group_get_query(context, read_deleted=None, project_only=False, join_rules=True): - query = model_query(context, models.SecurityGroup, - read_deleted=read_deleted, project_only=project_only) + query = model_query( + context, + models.SecurityGroup, + read_deleted=read_deleted, + project_only=project_only, + ) if join_rules: - query = query.options(_joinedload_all('rules.grantee_group')) + query = query.options( + orm.joinedload( + models.SecurityGroup.rules + ).joinedload(models.SecurityGroupIngressRule.grantee_group) + ) return query @@ -2998,8 +3069,7 @@ def security_group_get(context, security_group_id, columns_to_join=None): if columns_to_join is None: columns_to_join = [] for column in columns_to_join: - if column.startswith('instances'): - query = query.options(_joinedload_all(column)) + query = query.options(_joinedload_all(models.SecurityGroup, column)) result = query.first() if not result: @@ -3011,25 +3081,27 @@ def security_group_get(context, security_group_id, columns_to_join=None): @require_context @pick_context_manager_reader -def security_group_get_by_name( - context, project_id, group_name, columns_to_join=None, -): +def security_group_get_by_name(context, project_id, group_name): """Returns a security group with the specified name from a project.""" - query = _security_group_get_query(context, - read_deleted="no", join_rules=False).\ - filter_by(project_id=project_id).\ - filter_by(name=group_name) - - if columns_to_join is None: - columns_to_join = ['instances', 'rules.grantee_group'] - - for column in columns_to_join: - query = query.options(_joinedload_all(column)) + query = _security_group_get_query( + context, read_deleted="no", join_rules=False, + ).filter_by( + project_id=project_id, + 
).filter_by( + name=group_name, + ).options( + orm.joinedload(models.SecurityGroup.instances) + ).options( + orm.joinedload( + models.SecurityGroup.rules + ).joinedload(models.SecurityGroupIngressRule.grantee_group) + ) result = query.first() if not result: raise exception.SecurityGroupNotFoundForProject( - project_id=project_id, security_group_id=group_name) + project_id=project_id, security_group_id=group_name, + ) return result @@ -3077,14 +3149,11 @@ def security_group_in_use(context, group_id): @require_context @pick_context_manager_writer -def security_group_update(context, security_group_id, values, - columns_to_join=None): +def security_group_update(context, security_group_id, values): """Update a security group.""" query = model_query(context, models.SecurityGroup).filter_by( - id=security_group_id) - if columns_to_join: - for column in columns_to_join: - query = query.options(_joinedload_all(column)) + id=security_group_id, + ) security_group_ref = query.first() if not security_group_ref: @@ -3265,20 +3334,36 @@ def migration_get_in_progress_by_host_and_node(context, host, node): # 'finished' means a resize is finished on the destination host # and the instance is in VERIFY_RESIZE state, so the end state # for a resize is actually 'confirmed' or 'reverted'. 
- return model_query(context, models.Migration).\ - filter(sql.or_( - sql.and_( - models.Migration.source_compute == host, - models.Migration.source_node == node), - sql.and_( - models.Migration.dest_compute == host, - models.Migration.dest_node == node))).\ - filter(~models.Migration.status.in_(['confirmed', 'reverted', - 'error', 'failed', - 'completed', 'cancelled', - 'done'])).\ - options(_joinedload_all('instance.system_metadata')).\ - all() + return model_query( + context, models.Migration, + ).filter( + sql.or_( + sql.and_( + models.Migration.source_compute == host, + models.Migration.source_node == node, + ), + sql.and_( + models.Migration.dest_compute == host, + models.Migration.dest_node == node, + ), + ) + ).filter( + ~models.Migration.status.in_( + [ + 'confirmed', + 'reverted', + 'error', + 'failed', + 'completed', + 'cancelled', + 'done', + ] + ) + ).options( + orm.joinedload( + models.Migration.instance + ).joinedload(models.Instance.system_metadata) + ).all() @pick_context_manager_reader @@ -3413,19 +3498,32 @@ def migration_get_in_progress_and_error_by_host_and_node(context, host, node): """Finds all in progress migrations and error migrations for the given host and node. 
""" - return model_query(context, models.Migration).\ - filter(sql.or_( - sql.and_( - models.Migration.source_compute == host, - models.Migration.source_node == node), - sql.and_( - models.Migration.dest_compute == host, - models.Migration.dest_node == node))).\ - filter(~models.Migration.status.in_(['confirmed', 'reverted', - 'failed', 'completed', - 'cancelled', 'done'])).\ - options(_joinedload_all('instance.system_metadata')).\ - all() + return model_query( + context, models.Migration, + ).filter( + sql.or_( + sql.and_( + models.Migration.source_compute == host, + models.Migration.source_node == node), + sql.and_( + models.Migration.dest_compute == host, + models.Migration.dest_node == node, + ), + ) + ).filter( + ~models.Migration.status.in_([ + 'confirmed', + 'reverted', + 'failed', + 'completed', + 'cancelled', + 'done', + ]) + ).options( + orm.joinedload( + models.Migration.instance + ).joinedload(models.Instance.system_metadata) + ).all() ######################## @@ -4176,6 +4274,12 @@ def _get_fk_stmts(metadata, conn, table, column, records): fk_column = fk_table.c.id for fk in fk_table.foreign_keys: + if table != fk.column.table: + # if the foreign key doesn't actually point to the table we're + # archiving entries from then it's not relevant; trying to + # resolve this would result in a cartesian product + continue + # We need to find the records in the referring (child) table that # correspond to the records in our (parent) table so we can archive # them. @@ -4194,7 +4298,8 @@ def _get_fk_stmts(metadata, conn, table, column, records): select = sql.select(fk.column).where( sql.and_(fk.parent == fk.column, column.in_(records)) ) - rows = conn.execute(select).fetchall() + with conn.begin(): + rows = conn.execute(select).fetchall() p_records = [r[0] for r in rows] # Then, select rows in the child table that correspond to the # parent table records that were passed in. 
@@ -4209,7 +4314,8 @@ def _get_fk_stmts(metadata, conn, table, column, records): fk_select = sql.select(fk_column).where( sql.and_(fk.parent == fk.column, fk.column.in_(p_records)) ) - fk_rows = conn.execute(fk_select).fetchall() + with conn.begin(): + fk_rows = conn.execute(fk_select).fetchall() fk_records = [r[0] for r in fk_rows] if fk_records: # If we found any records in the child table, create shadow @@ -4225,6 +4331,7 @@ def _get_fk_stmts(metadata, conn, table, column, records): # deque. fk_delete = fk_table.delete().where(fk_column.in_(fk_records)) deletes.appendleft(fk_delete) + # Repeat for any possible nested child tables. i, d = _get_fk_stmts(metadata, conn, fk_table, fk_column, fk_records) inserts.extendleft(i) @@ -4262,9 +4369,11 @@ def _archive_deleted_rows_for_table( deleted_instance_uuids = [] try: shadow_table = schema.Table( - shadow_tablename, metadata, autoload_with=conn) + shadow_tablename, metadata, autoload_with=conn, + ) except sqla_exc.NoSuchTableError: # No corresponding shadow table; skip it. 
+ conn.close() return rows_archived, deleted_instance_uuids, {} # TODO(stephenfin): Drop this when we drop the table @@ -4296,7 +4405,8 @@ def _archive_deleted_rows_for_table( select = select.where(table.c.updated_at < before) select = select.order_by(column).limit(max_rows) - rows = conn.execute(select).fetchall() + with conn.begin(): + rows = conn.execute(select).fetchall() records = [r[0] for r in rows] # We will archive deleted rows for this table and also generate insert and @@ -4332,7 +4442,8 @@ def _archive_deleted_rows_for_table( query_select = sql.select(table.c.uuid).where( table.c.id.in_(records) ) - rows = conn.execute(query_select).fetchall() + with conn.begin(): + rows = conn.execute(query_select).fetchall() deleted_instance_uuids = [r[0] for r in rows] try: @@ -4354,6 +4465,8 @@ def _archive_deleted_rows_for_table( "%(tablename)s: %(error)s", {'tablename': tablename, 'error': str(ex)}) + conn.close() + return rows_archived, deleted_instance_uuids, extras @@ -4476,7 +4589,8 @@ def purge_shadow_tables(context, before_date, status_fn=None): else: delete = table.delete() - deleted = conn.execute(delete) + with conn.begin(): + deleted = conn.execute(delete) if deleted.rowcount > 0: status_fn(_('Deleted %(rows)i rows from %(table)s based on ' 'timestamp column %(col)s') % { @@ -4485,6 +4599,8 @@ def purge_shadow_tables(context, before_date, status_fn=None): 'col': col is None and '(n/a)' or col.name}) total_deleted += deleted.rowcount + conn.close() + return total_deleted |