summaryrefslogtreecommitdiff
path: root/nova/db/main/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/db/main/api.py')
-rw-r--r--nova/db/main/api.py368
1 files changed, 242 insertions, 126 deletions
diff --git a/nova/db/main/api.py b/nova/db/main/api.py
index 4c40be905e..7d24f974f9 100644
--- a/nova/db/main/api.py
+++ b/nova/db/main/api.py
@@ -79,11 +79,28 @@ def _context_manager_from_context(context):
pass
-def _joinedload_all(column):
+def _joinedload_all(lead_entity, column):
+ """Do a nested load.
+
+ For example, resolve the following::
+
+ _joinedload_all(models.SecurityGroup, 'instances.info_cache')
+
+ to:
+
+ orm.joinedload(
+ models.SecurityGroup.instances
+ ).joinedload(
+ Instance.info_cache
+ )
+ """
elements = column.split('.')
- joined = orm.joinedload(elements.pop(0))
+ relationship_attr = getattr(lead_entity, elements.pop(0))
+ joined = orm.joinedload(relationship_attr)
for element in elements:
- joined = joined.joinedload(element)
+ relationship_entity = relationship_attr.entity.class_
+ relationship_attr = getattr(relationship_entity, element)
+ joined = joined.joinedload(relationship_attr)
return joined
@@ -562,7 +579,7 @@ def _compute_node_select(context, filters=None, limit=None, marker=None):
if filters is None:
filters = {}
- cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn')
+ cn_tbl = models.ComputeNode.__table__.alias('cn')
select = sa.select(cn_tbl)
if context.read_deleted == "no":
@@ -595,9 +612,9 @@ def _compute_node_select(context, filters=None, limit=None, marker=None):
def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
select = _compute_node_select(context, filters, limit=limit, marker=marker)
engine = get_engine(context=context)
- conn = engine.connect()
- results = conn.execute(select).fetchall()
+ with engine.connect() as conn, conn.begin():
+ results = conn.execute(select).fetchall()
# Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
results = [dict(r._mapping) for r in results]
@@ -909,7 +926,7 @@ def compute_node_statistics(context):
engine = get_engine(context=context)
services_tbl = models.Service.__table__
- inner_sel = sa.alias(_compute_node_select(context), name='inner_sel')
+ inner_sel = _compute_node_select(context).alias('inner_sel')
# TODO(sbauza): Remove the service_id filter in a later release
# once we are sure that all compute nodes report the host field
@@ -966,9 +983,9 @@ def compute_node_statistics(context):
).label('disk_available_least'),
]
select = sql.select(*agg_cols).select_from(j)
- conn = engine.connect()
- results = conn.execute(select).fetchone()
+ with engine.connect() as conn, conn.begin():
+ results = conn.execute(select).fetchone()
# Build a dict of the info--making no assumptions about result
fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
@@ -976,7 +993,6 @@ def compute_node_statistics(context):
'current_workload', 'running_vms', 'disk_available_least')
results = {field: int(results[idx] or 0)
for idx, field in enumerate(fields)}
- conn.close()
return results
@@ -1381,9 +1397,9 @@ def instance_get_by_uuid(context, uuid, columns_to_join=None):
def _instance_get_by_uuid(context, uuid, columns_to_join=None):
- result = _build_instance_get(context, columns_to_join=columns_to_join).\
- filter_by(uuid=uuid).\
- first()
+ result = _build_instance_get(
+ context, columns_to_join=columns_to_join
+ ).filter_by(uuid=uuid).first()
if not result:
raise exception.InstanceNotFound(instance_id=uuid)
@@ -1411,9 +1427,13 @@ def instance_get(context, instance_id, columns_to_join=None):
def _build_instance_get(context, columns_to_join=None):
- query = model_query(context, models.Instance, project_only=True).\
- options(_joinedload_all('security_groups.rules')).\
- options(orm.joinedload('info_cache'))
+ query = model_query(
+ context, models.Instance, project_only=True,
+ ).options(
+ orm.joinedload(
+ models.Instance.security_groups
+ ).joinedload(models.SecurityGroup.rules)
+ ).options(orm.joinedload(models.Instance.info_cache))
if columns_to_join is None:
columns_to_join = ['metadata', 'system_metadata']
for column in columns_to_join:
@@ -1421,7 +1441,10 @@ def _build_instance_get(context, columns_to_join=None):
# Already always joined above
continue
if 'extra.' in column:
- query = query.options(orm.undefer(column))
+ column_ref = getattr(models.InstanceExtra, column.split('.')[1])
+ query = query.options(
+ orm.joinedload(models.Instance.extra).undefer(column_ref)
+ )
elif column in ['metadata', 'system_metadata']:
# NOTE(melwitt): We use subqueryload() instead of joinedload() for
# metadata and system_metadata because of the one-to-many
@@ -1431,13 +1454,16 @@ def _build_instance_get(context, columns_to_join=None):
# in a large data transfer. Instead, the subqueryload() will
# perform additional queries to obtain metadata and system_metadata
# for the instance.
- query = query.options(orm.subqueryload(column))
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.subqueryload(column_ref))
else:
- query = query.options(orm.joinedload(column))
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.joinedload(column_ref))
# NOTE(alaski) Stop lazy loading of columns not needed.
- for col in ['metadata', 'system_metadata']:
- if col not in columns_to_join:
- query = query.options(orm.noload(col))
+ for column in ['metadata', 'system_metadata']:
+ if column not in columns_to_join:
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.noload(column_ref))
# NOTE(melwitt): We need to use order_by(<unique column>) so that the
# additional queries emitted by subqueryload() include the same ordering as
# used by the parent query.
@@ -1530,7 +1556,8 @@ def instance_get_all(context, columns_to_join=None):
_manual_join_columns(columns_to_join))
query = model_query(context, models.Instance)
for column in columns_to_join_new:
- query = query.options(orm.joinedload(column))
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.joinedload(column_ref))
if not context.is_admin:
# If we're not admin context, add appropriate filter..
if context.project_id:
@@ -1671,9 +1698,13 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
query_prefix = context.session.query(models.Instance)
for column in columns_to_join_new:
if 'extra.' in column:
- query_prefix = query_prefix.options(orm.undefer(column))
+ column_ref = getattr(models.InstanceExtra, column.split('.')[1])
+ query_prefix = query_prefix.options(
+ orm.joinedload(models.Instance.extra).undefer(column_ref)
+ )
else:
- query_prefix = query_prefix.options(orm.joinedload(column))
+ column_ref = getattr(models.Instance, column)
+ query_prefix = query_prefix.options(orm.joinedload(column_ref))
# Note: order_by is done in the sqlalchemy.utils.py paginate_query(),
# no need to do it here as well
@@ -1683,9 +1714,9 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
filters = copy.deepcopy(filters)
model_object = models.Instance
- query_prefix = _get_query_nova_resource_by_changes_time(query_prefix,
- filters,
- model_object)
+ query_prefix = _get_query_nova_resource_by_changes_time(
+ query_prefix, filters, model_object,
+ )
if 'deleted' in filters:
# Instances can be soft or hard deleted and the query needs to
@@ -1697,14 +1728,12 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
models.Instance.deleted == models.Instance.id,
models.Instance.vm_state == vm_states.SOFT_DELETED
)
- query_prefix = query_prefix.\
- filter(delete)
+ query_prefix = query_prefix.filter(delete)
else:
query_prefix = query_prefix.\
filter(models.Instance.deleted == models.Instance.id)
else:
- query_prefix = query_prefix.\
- filter_by(deleted=0)
+ query_prefix = query_prefix.filter_by(deleted=0)
if not filters.pop('soft_deleted', False):
# It would be better to have vm_state not be nullable
# but until then we test it explicitly as a workaround.
@@ -1794,19 +1823,25 @@ def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
if marker is not None:
try:
marker = _instance_get_by_uuid(
- context.elevated(read_deleted='yes'), marker)
+ context.elevated(read_deleted='yes'), marker,
+ )
except exception.InstanceNotFound:
raise exception.MarkerNotFound(marker=marker)
try:
- query_prefix = sqlalchemyutils.paginate_query(query_prefix,
- models.Instance, limit,
- sort_keys,
- marker=marker,
- sort_dirs=sort_dirs)
+ query_prefix = sqlalchemyutils.paginate_query(
+ query_prefix,
+ models.Instance,
+ limit,
+ sort_keys,
+ marker=marker,
+ sort_dirs=sort_dirs,
+ )
except db_exc.InvalidSortKey:
raise exception.InvalidSortKey()
- return _instances_fill_metadata(context, query_prefix.all(), manual_joins)
+ instances = query_prefix.all()
+
+ return _instances_fill_metadata(context, instances, manual_joins)
@require_context
@@ -2059,9 +2094,13 @@ def instance_get_active_by_window_joined(context, begin, end=None,
for column in columns_to_join_new:
if 'extra.' in column:
- query = query.options(orm.undefer(column))
+ column_ref = getattr(models.InstanceExtra, column.split('.')[1])
+ query = query.options(
+ orm.joinedload(models.Instance.extra).undefer(column_ref)
+ )
else:
- query = query.options(orm.joinedload(column))
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.joinedload(column_ref))
query = query.filter(sql.or_(
models.Instance.terminated_at == sql.null(),
@@ -2081,23 +2120,31 @@ def instance_get_active_by_window_joined(context, begin, end=None,
raise exception.MarkerNotFound(marker=marker)
query = sqlalchemyutils.paginate_query(
- query, models.Instance, limit, ['project_id', 'uuid'], marker=marker)
+ query, models.Instance, limit, ['project_id', 'uuid'], marker=marker,
+ )
+ instances = query.all()
- return _instances_fill_metadata(context, query.all(), manual_joins)
+ return _instances_fill_metadata(context, instances, manual_joins)
def _instance_get_all_query(context, project_only=False, joins=None):
if joins is None:
joins = ['info_cache', 'security_groups']
- query = model_query(context,
- models.Instance,
- project_only=project_only)
+ query = model_query(
+ context,
+ models.Instance,
+ project_only=project_only,
+ )
for column in joins:
if 'extra.' in column:
- query = query.options(orm.undefer(column))
+ column_ref = getattr(models.InstanceExtra, column.split('.')[1])
+ query = query.options(
+ orm.joinedload(models.Instance.extra).undefer(column_ref)
+ )
else:
- query = query.options(orm.joinedload(column))
+ column_ref = getattr(models.Instance, column)
+ query = query.options(orm.joinedload(column_ref))
return query
@@ -2105,9 +2152,12 @@ def _instance_get_all_query(context, project_only=False, joins=None):
def instance_get_all_by_host(context, host, columns_to_join=None):
"""Get all instances belonging to a host."""
query = _instance_get_all_query(context, joins=columns_to_join)
- return _instances_fill_metadata(context,
- query.filter_by(host=host).all(),
- manual_joins=columns_to_join)
+ instances = query.filter_by(host=host).all()
+ return _instances_fill_metadata(
+ context,
+ instances,
+ manual_joins=columns_to_join,
+ )
def _instance_get_all_uuids_by_hosts(context, hosts):
@@ -2147,19 +2197,26 @@ def instance_get_all_by_host_and_node(
candidates = ['system_metadata', 'metadata']
manual_joins = [x for x in columns_to_join if x in candidates]
columns_to_join = list(set(columns_to_join) - set(candidates))
- return _instances_fill_metadata(context,
- _instance_get_all_query(
- context,
- joins=columns_to_join).filter_by(host=host).
- filter_by(node=node).all(), manual_joins=manual_joins)
+ instances = _instance_get_all_query(
+ context,
+ joins=columns_to_join,
+ ).filter_by(host=host).filter_by(node=node).all()
+ return _instances_fill_metadata(
+ context,
+ instances,
+ manual_joins=manual_joins,
+ )
@pick_context_manager_reader
def instance_get_all_by_host_and_not_type(context, host, type_id=None):
"""Get all instances belonging to a host with a different type_id."""
- return _instances_fill_metadata(context,
- _instance_get_all_query(context).filter_by(host=host).
- filter(models.Instance.instance_type_id != type_id).all())
+ instances = _instance_get_all_query(context).filter_by(
+ host=host,
+ ).filter(
+ models.Instance.instance_type_id != type_id
+ ).all()
+ return _instances_fill_metadata(context, instances)
# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
@@ -2172,11 +2229,14 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
# NOTE(danms): this is only used in the _poll_rebooting_instances()
# call in compute/manager, so we can avoid the metadata lookups
# explicitly
- return _instances_fill_metadata(context,
- model_query(context, models.Instance).
- filter(models.Instance.updated_at <= reboot_window).
- filter_by(task_state=task_states.REBOOTING).all(),
- manual_joins=[])
+ instances = model_query(context, models.Instance).filter(
+ models.Instance.updated_at <= reboot_window
+ ).filter_by(task_state=task_states.REBOOTING).all()
+ return _instances_fill_metadata(
+ context,
+ instances,
+ manual_joins=[],
+ )
def _retry_instance_update():
@@ -2505,13 +2565,15 @@ def instance_extra_get_by_instance_uuid(
:param instance_uuid: UUID of the instance tied to the topology record
:param columns: A list of the columns to load, or None for 'all of them'
"""
- query = model_query(context, models.InstanceExtra).\
- filter_by(instance_uuid=instance_uuid)
+ query = model_query(context, models.InstanceExtra).filter_by(
+ instance_uuid=instance_uuid,
+ )
if columns is None:
columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model',
'trusted_certs', 'resources', 'migration_context']
for column in columns:
- query = query.options(orm.undefer(column))
+ column_ref = getattr(models.InstanceExtra, column)
+ query = query.options(orm.undefer(column_ref))
instance_extra = query.first()
return instance_extra
@@ -2733,7 +2795,8 @@ def _block_device_mapping_get_query(context, columns_to_join=None):
query = model_query(context, models.BlockDeviceMapping)
for column in columns_to_join:
- query = query.options(orm.joinedload(column))
+ column_ref = getattr(models.BlockDeviceMapping, column)
+ query = query.options(orm.joinedload(column_ref))
return query
@@ -2950,10 +3013,18 @@ def security_group_create(context, values):
def _security_group_get_query(context, read_deleted=None,
project_only=False, join_rules=True):
- query = model_query(context, models.SecurityGroup,
- read_deleted=read_deleted, project_only=project_only)
+ query = model_query(
+ context,
+ models.SecurityGroup,
+ read_deleted=read_deleted,
+ project_only=project_only,
+ )
if join_rules:
- query = query.options(_joinedload_all('rules.grantee_group'))
+ query = query.options(
+ orm.joinedload(
+ models.SecurityGroup.rules
+ ).joinedload(models.SecurityGroupIngressRule.grantee_group)
+ )
return query
@@ -2998,8 +3069,7 @@ def security_group_get(context, security_group_id, columns_to_join=None):
if columns_to_join is None:
columns_to_join = []
for column in columns_to_join:
- if column.startswith('instances'):
- query = query.options(_joinedload_all(column))
+ query = query.options(_joinedload_all(models.SecurityGroup, column))
result = query.first()
if not result:
@@ -3011,25 +3081,27 @@ def security_group_get(context, security_group_id, columns_to_join=None):
@require_context
@pick_context_manager_reader
-def security_group_get_by_name(
- context, project_id, group_name, columns_to_join=None,
-):
+def security_group_get_by_name(context, project_id, group_name):
"""Returns a security group with the specified name from a project."""
- query = _security_group_get_query(context,
- read_deleted="no", join_rules=False).\
- filter_by(project_id=project_id).\
- filter_by(name=group_name)
-
- if columns_to_join is None:
- columns_to_join = ['instances', 'rules.grantee_group']
-
- for column in columns_to_join:
- query = query.options(_joinedload_all(column))
+ query = _security_group_get_query(
+ context, read_deleted="no", join_rules=False,
+ ).filter_by(
+ project_id=project_id,
+ ).filter_by(
+ name=group_name,
+ ).options(
+ orm.joinedload(models.SecurityGroup.instances)
+ ).options(
+ orm.joinedload(
+ models.SecurityGroup.rules
+ ).joinedload(models.SecurityGroupIngressRule.grantee_group)
+ )
result = query.first()
if not result:
raise exception.SecurityGroupNotFoundForProject(
- project_id=project_id, security_group_id=group_name)
+ project_id=project_id, security_group_id=group_name,
+ )
return result
@@ -3077,14 +3149,11 @@ def security_group_in_use(context, group_id):
@require_context
@pick_context_manager_writer
-def security_group_update(context, security_group_id, values,
- columns_to_join=None):
+def security_group_update(context, security_group_id, values):
"""Update a security group."""
query = model_query(context, models.SecurityGroup).filter_by(
- id=security_group_id)
- if columns_to_join:
- for column in columns_to_join:
- query = query.options(_joinedload_all(column))
+ id=security_group_id,
+ )
security_group_ref = query.first()
if not security_group_ref:
@@ -3265,20 +3334,36 @@ def migration_get_in_progress_by_host_and_node(context, host, node):
# 'finished' means a resize is finished on the destination host
# and the instance is in VERIFY_RESIZE state, so the end state
# for a resize is actually 'confirmed' or 'reverted'.
- return model_query(context, models.Migration).\
- filter(sql.or_(
- sql.and_(
- models.Migration.source_compute == host,
- models.Migration.source_node == node),
- sql.and_(
- models.Migration.dest_compute == host,
- models.Migration.dest_node == node))).\
- filter(~models.Migration.status.in_(['confirmed', 'reverted',
- 'error', 'failed',
- 'completed', 'cancelled',
- 'done'])).\
- options(_joinedload_all('instance.system_metadata')).\
- all()
+ return model_query(
+ context, models.Migration,
+ ).filter(
+ sql.or_(
+ sql.and_(
+ models.Migration.source_compute == host,
+ models.Migration.source_node == node,
+ ),
+ sql.and_(
+ models.Migration.dest_compute == host,
+ models.Migration.dest_node == node,
+ ),
+ )
+ ).filter(
+ ~models.Migration.status.in_(
+ [
+ 'confirmed',
+ 'reverted',
+ 'error',
+ 'failed',
+ 'completed',
+ 'cancelled',
+ 'done',
+ ]
+ )
+ ).options(
+ orm.joinedload(
+ models.Migration.instance
+ ).joinedload(models.Instance.system_metadata)
+ ).all()
@pick_context_manager_reader
@@ -3413,19 +3498,32 @@ def migration_get_in_progress_and_error_by_host_and_node(context, host, node):
"""Finds all in progress migrations and error migrations for the given
host and node.
"""
- return model_query(context, models.Migration).\
- filter(sql.or_(
- sql.and_(
- models.Migration.source_compute == host,
- models.Migration.source_node == node),
- sql.and_(
- models.Migration.dest_compute == host,
- models.Migration.dest_node == node))).\
- filter(~models.Migration.status.in_(['confirmed', 'reverted',
- 'failed', 'completed',
- 'cancelled', 'done'])).\
- options(_joinedload_all('instance.system_metadata')).\
- all()
+ return model_query(
+ context, models.Migration,
+ ).filter(
+ sql.or_(
+ sql.and_(
+ models.Migration.source_compute == host,
+ models.Migration.source_node == node),
+ sql.and_(
+ models.Migration.dest_compute == host,
+ models.Migration.dest_node == node,
+ ),
+ )
+ ).filter(
+ ~models.Migration.status.in_([
+ 'confirmed',
+ 'reverted',
+ 'failed',
+ 'completed',
+ 'cancelled',
+ 'done',
+ ])
+ ).options(
+ orm.joinedload(
+ models.Migration.instance
+ ).joinedload(models.Instance.system_metadata)
+ ).all()
########################
@@ -4176,6 +4274,12 @@ def _get_fk_stmts(metadata, conn, table, column, records):
fk_column = fk_table.c.id
for fk in fk_table.foreign_keys:
+ if table != fk.column.table:
+ # if the foreign key doesn't actually point to the table we're
+ # archiving entries from then it's not relevant; trying to
+ # resolve this would result in a cartesian product
+ continue
+
# We need to find the records in the referring (child) table that
# correspond to the records in our (parent) table so we can archive
# them.
@@ -4194,7 +4298,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
select = sql.select(fk.column).where(
sql.and_(fk.parent == fk.column, column.in_(records))
)
- rows = conn.execute(select).fetchall()
+ with conn.begin():
+ rows = conn.execute(select).fetchall()
p_records = [r[0] for r in rows]
# Then, select rows in the child table that correspond to the
# parent table records that were passed in.
@@ -4209,7 +4314,8 @@ def _get_fk_stmts(metadata, conn, table, column, records):
fk_select = sql.select(fk_column).where(
sql.and_(fk.parent == fk.column, fk.column.in_(p_records))
)
- fk_rows = conn.execute(fk_select).fetchall()
+ with conn.begin():
+ fk_rows = conn.execute(fk_select).fetchall()
fk_records = [r[0] for r in fk_rows]
if fk_records:
# If we found any records in the child table, create shadow
@@ -4225,6 +4331,7 @@ def _get_fk_stmts(metadata, conn, table, column, records):
# deque.
fk_delete = fk_table.delete().where(fk_column.in_(fk_records))
deletes.appendleft(fk_delete)
+
# Repeat for any possible nested child tables.
i, d = _get_fk_stmts(metadata, conn, fk_table, fk_column, fk_records)
inserts.extendleft(i)
@@ -4262,9 +4369,11 @@ def _archive_deleted_rows_for_table(
deleted_instance_uuids = []
try:
shadow_table = schema.Table(
- shadow_tablename, metadata, autoload_with=conn)
+ shadow_tablename, metadata, autoload_with=conn,
+ )
except sqla_exc.NoSuchTableError:
# No corresponding shadow table; skip it.
+ conn.close()
return rows_archived, deleted_instance_uuids, {}
# TODO(stephenfin): Drop this when we drop the table
@@ -4296,7 +4405,8 @@ def _archive_deleted_rows_for_table(
select = select.where(table.c.updated_at < before)
select = select.order_by(column).limit(max_rows)
- rows = conn.execute(select).fetchall()
+ with conn.begin():
+ rows = conn.execute(select).fetchall()
records = [r[0] for r in rows]
# We will archive deleted rows for this table and also generate insert and
@@ -4332,7 +4442,8 @@ def _archive_deleted_rows_for_table(
query_select = sql.select(table.c.uuid).where(
table.c.id.in_(records)
)
- rows = conn.execute(query_select).fetchall()
+ with conn.begin():
+ rows = conn.execute(query_select).fetchall()
deleted_instance_uuids = [r[0] for r in rows]
try:
@@ -4354,6 +4465,8 @@ def _archive_deleted_rows_for_table(
"%(tablename)s: %(error)s",
{'tablename': tablename, 'error': str(ex)})
+ conn.close()
+
return rows_archived, deleted_instance_uuids, extras
@@ -4476,7 +4589,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
else:
delete = table.delete()
- deleted = conn.execute(delete)
+ with conn.begin():
+ deleted = conn.execute(delete)
if deleted.rowcount > 0:
status_fn(_('Deleted %(rows)i rows from %(table)s based on '
'timestamp column %(col)s') % {
@@ -4485,6 +4599,8 @@ def purge_shadow_tables(context, before_date, status_fn=None):
'col': col is None and '(n/a)' or col.name})
total_deleted += deleted.rowcount
+ conn.close()
+
return total_deleted