summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/orm/loading.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/orm/loading.py')
-rw-r--r--lib/sqlalchemy/orm/loading.py231
1 files changed, 191 insertions, 40 deletions
diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py
index 48641685e..616e757a3 100644
--- a/lib/sqlalchemy/orm/loading.py
+++ b/lib/sqlalchemy/orm/loading.py
@@ -26,6 +26,7 @@ from .base import _SET_DEFERRED_EXPIRED
from .util import _none_set
from .util import state_str
from .. import exc as sa_exc
+from .. import future
from .. import util
from ..engine import result_tuple
from ..engine.result import ChunkedIteratorResult
@@ -36,8 +37,20 @@ from ..sql import util as sql_util
_new_runid = util.counter()
-def instances(query, cursor, context):
- """Return an ORM result as an iterator."""
+def instances(cursor, context):
+ """Return a :class:`.Result` given an ORM query context.
+
+ :param cursor: a :class:`.CursorResult`, generated by a statement
+ which came from :class:`.ORMCompileState`
+
+ :param context: a :class:`.QueryContext` object
+
+ :return: a :class:`.Result` object representing ORM results
+
+ .. versionchanged:: 1.4 The instances() function now uses
+ :class:`.Result` objects and has an all new interface.
+
+ """
context.runid = _new_runid()
context.post_load_paths = {}
@@ -80,7 +93,7 @@ def instances(query, cursor, context):
],
)
- def chunks(size, as_tuples):
+ def chunks(size):
while True:
yield_per = size
@@ -94,7 +107,7 @@ def instances(query, cursor, context):
else:
fetch = cursor.fetchall()
- if not as_tuples:
+ if single_entity:
proc = process[0]
rows = [proc(row) for row in fetch]
else:
@@ -111,20 +124,62 @@ def instances(query, cursor, context):
break
result = ChunkedIteratorResult(
- row_metadata, chunks, source_supports_scalars=single_entity
+ row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor
+ )
+
+ result._attributes = result._attributes.union(
+ dict(filtered=filtered, is_single_entity=single_entity)
)
+
if context.yield_per:
result.yield_per(context.yield_per)
- if single_entity:
- result = result.scalars()
+ return result
- filtered = context.compile_state._has_mapper_entities
- if filtered:
- result = result.unique()
+@util.preload_module("sqlalchemy.orm.context")
+def merge_frozen_result(session, statement, frozen_result, load=True):
+ querycontext = util.preloaded.orm_context
- return result
+ if load:
+ # flush current contents if we expect to load data
+ session._autoflush()
+
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ statement
+ )
+
+ autoflush = session.autoflush
+ try:
+ session.autoflush = False
+ mapped_entities = [
+ i
+ for i, e in enumerate(ctx._entities)
+ if isinstance(e, querycontext._MapperEntity)
+ ]
+ keys = [ent._label_name for ent in ctx._entities]
+
+ keyed_tuple = result_tuple(
+ keys, [ent._extra_entities for ent in ctx._entities]
+ )
+
+ result = []
+ for newrow in frozen_result.rewrite_rows():
+ for i in mapped_entities:
+ if newrow[i] is not None:
+ newrow[i] = session._merge(
+ attributes.instance_state(newrow[i]),
+ attributes.instance_dict(newrow[i]),
+ load=load,
+ _recursive={},
+ _resolve_conflict_map={},
+ )
+
+ result.append(keyed_tuple(newrow))
+
+ return frozen_result.with_new_rows(result)
+ finally:
+ session.autoflush = autoflush
@util.preload_module("sqlalchemy.orm.context")
@@ -145,9 +200,7 @@ def merge_result(query, iterator, load=True):
else:
frozen_result = None
- ctx = querycontext.QueryCompileState._create_for_legacy_query(
- query, entities_only=True
- )
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(query)
autoflush = session.autoflush
try:
@@ -235,12 +288,15 @@ def get_from_identity(session, mapper, key, passive):
def load_on_ident(
- query,
+ session,
+ statement,
key,
+ load_options=None,
refresh_state=None,
with_for_update=None,
only_load_props=None,
no_autoflush=False,
+ bind_arguments=util.immutabledict(),
):
"""Load the given identity key from the database."""
if key is not None:
@@ -249,38 +305,59 @@ def load_on_ident(
else:
ident = identity_token = None
- if no_autoflush:
- query = query.autoflush(False)
-
return load_on_pk_identity(
- query,
+ session,
+ statement,
ident,
+ load_options=load_options,
refresh_state=refresh_state,
with_for_update=with_for_update,
only_load_props=only_load_props,
identity_token=identity_token,
+ no_autoflush=no_autoflush,
+ bind_arguments=bind_arguments,
)
def load_on_pk_identity(
- query,
+ session,
+ statement,
primary_key_identity,
+ load_options=None,
refresh_state=None,
with_for_update=None,
only_load_props=None,
identity_token=None,
+ no_autoflush=False,
+ bind_arguments=util.immutabledict(),
):
"""Load the given primary key identity from the database."""
+ query = statement
+ q = query._clone()
+
+ # TODO: fix these imports ....
+ from .context import QueryContext, ORMCompileState
+
+ if load_options is None:
+ load_options = QueryContext.default_load_options
+
+ compile_options = ORMCompileState.default_compile_options.merge(
+ q.compile_options
+ )
+
+ # checking that query doesnt have criteria on it
+ # just delete it here w/ optional assertion? since we are setting a
+ # where clause also
if refresh_state is None:
- q = query._clone()
- q._get_condition()
- else:
- q = query._clone()
+ _no_criterion_assertion(q, "get", order_by=False, distinct=False)
if primary_key_identity is not None:
- mapper = query._only_full_mapper_zero("load_on_pk_identity")
+ # mapper = query._only_full_mapper_zero("load_on_pk_identity")
+
+ # TODO: error checking?
+ mapper = query._raw_columns[0]._annotations["parententity"]
(_get_clause, _get_params) = mapper._get_clause
@@ -320,9 +397,8 @@ def load_on_pk_identity(
]
)
- q.load_options += {"_params": params}
+ load_options += {"_params": params}
- # with_for_update needs to be query.LockmodeArg()
if with_for_update is not None:
version_check = True
q._for_update_arg = with_for_update
@@ -333,11 +409,15 @@ def load_on_pk_identity(
version_check = False
if refresh_state and refresh_state.load_options:
- # if refresh_state.load_path.parent:
- q = q._with_current_path(refresh_state.load_path.parent)
- q = q.options(refresh_state.load_options)
+ compile_options += {"_current_path": refresh_state.load_path.parent}
+ q = q.options(*refresh_state.load_options)
- q._get_options(
+ # TODO: most of the compile_options that are not legacy only involve this
+ # function, so try to see if handling of them can mostly be local to here
+
+ q.compile_options, load_options = _set_get_options(
+ compile_options,
+ load_options,
populate_existing=bool(refresh_state),
version_check=version_check,
only_load_props=only_load_props,
@@ -346,12 +426,76 @@ def load_on_pk_identity(
)
q._order_by = None
+ if no_autoflush:
+ load_options += {"_autoflush": False}
+
+ result = (
+ session.execute(
+ q,
+ params=load_options._params,
+ execution_options={"_sa_orm_load_options": load_options},
+ bind_arguments=bind_arguments,
+ )
+ .unique()
+ .scalars()
+ )
+
try:
- return q.one()
+ return result.one()
except orm_exc.NoResultFound:
return None
+def _no_criterion_assertion(stmt, meth, order_by=True, distinct=True):
+ if (
+ stmt._where_criteria
+ or stmt.compile_options._statement is not None
+ or stmt._from_obj
+ or stmt._legacy_setup_joins
+ or stmt._limit_clause is not None
+ or stmt._offset_clause is not None
+ or stmt._group_by_clauses
+ or (order_by and stmt._order_by_clauses)
+ or (distinct and stmt._distinct)
+ ):
+ raise sa_exc.InvalidRequestError(
+ "Query.%s() being called on a "
+ "Query with existing criterion. " % meth
+ )
+
+
+def _set_get_options(
+ compile_opt,
+ load_opt,
+ populate_existing=None,
+ version_check=None,
+ only_load_props=None,
+ refresh_state=None,
+ identity_token=None,
+):
+
+ compile_options = {}
+ load_options = {}
+ if version_check:
+ load_options["_version_check"] = version_check
+ if populate_existing:
+ load_options["_populate_existing"] = populate_existing
+ if refresh_state:
+ load_options["_refresh_state"] = refresh_state
+ compile_options["_for_refresh_state"] = True
+ if only_load_props:
+ compile_options["_only_load_props"] = frozenset(only_load_props)
+ if identity_token:
+ load_options["_refresh_identity_token"] = identity_token
+
+ if load_options:
+ load_opt += load_options
+ if compile_options:
+ compile_opt += compile_options
+
+ return compile_opt, load_opt
+
+
def _setup_entity_query(
compile_state,
mapper,
@@ -487,7 +631,7 @@ def _instance_processor(
context, path, mapper, result, adapter, populators
)
- propagate_options = context.propagate_options
+ propagated_loader_options = context.propagated_loader_options
load_path = (
context.compile_state.current_path + path
if context.compile_state.current_path.path
@@ -639,8 +783,8 @@ def _instance_processor(
# be conservative about setting load_path when populate_existing
# is in effect; want to maintain options from the original
# load. see test_expire->test_refresh_maintains_deferred_options
- if isnew and (propagate_options or not populate_existing):
- state.load_options = propagate_options
+ if isnew and (propagated_loader_options or not populate_existing):
+ state.load_options = propagated_loader_options
state.load_path = load_path
_populate_full(
@@ -1055,7 +1199,7 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
result = False
- no_autoflush = passive & attributes.NO_AUTOFLUSH
+ no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
# in the case of inheritance, particularly concrete and abstract
# concrete inheritance, the class manager might have some keys
@@ -1080,10 +1224,16 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
# note: using from_statement() here means there is an adaption
# with adapt_on_names set up. the other option is to make the
# aliased() against a subquery which affects the SQL.
+
+ from .query import FromStatement
+
+ stmt = FromStatement(mapper, statement).options(
+ strategy_options.Load(mapper).undefer("*")
+ )
+
result = load_on_ident(
- session.query(mapper)
- .options(strategy_options.Load(mapper).undefer("*"))
- .from_statement(statement),
+ session,
+ stmt,
None,
only_load_props=attribute_names,
refresh_state=state,
@@ -1121,7 +1271,8 @@ def load_scalar_attributes(mapper, state, attribute_names, passive):
return
result = load_on_ident(
- session.query(mapper),
+ session,
+ future.select(mapper).apply_labels(),
identity_key,
refresh_state=state,
only_load_props=attribute_names,